Introduction

AI agents that autonomously execute multi-step tasks are transitioning from experimental prototypes to production systems. Unlike traditional API calls, agents make decisions, use tools, and interact with external systems--introducing new challenges around reliability, cost, safety, and observability. This article covers the patterns and practices needed to deploy AI agents safely and efficiently.

Deploying AI Agents to Production

Agent Orchestration

Production agents typically follow a structured execution loop:

import asyncio

from typing import List, Optional

from dataclasses import dataclass, field

@dataclass

class AgentContext:

task: str

max_steps: int = 20

current_step: int = 0

history: List[dict] = field(default_factory=list)

tools_used: List[str] = field(default_factory=list)

total_cost: float = 0.0

class AgentOrchestrator:

def init(self, model: str = "claude-sonnet-4-20260512"):

self.model = model

self.max_retries = 3

self.cost_per_token = {"input": 0.000003, "output": 0.000015}

async def run(self, task: str) -> dict:

ctx = AgentContext(task=task)

while ctx.current_step < ctx.max_steps:

ctx.current_step += 1

try:

1\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\. Think: decide next action

action = await self.think(ctx)

2\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\. Act: execute tool or respond

if action["type"] == "tool_call":

result = await self.execute_tool(action["tool"], action["args"])

ctx.tools_used.append(action["tool"]["name"])

elif action["type"] == "final_answer":

return {

"status": "success",

"answer": action["content"],

"steps": ctx.current_step,

"tools_used": ctx.tools_used,

"total_cost": ctx.total_cost,

}

3\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\. Observe: store result

ctx.history.append({

"step": ctx.current_step,

"action": action,

"observation": result,

})

4\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\. Cost tracking

ctx.total_cost += self._calculate_cost(action)

except ToolError as e:

Handle tool failures with retry

await self.handle_tool_error(ctx, e)

except Exception as e:

Catch-all for unexpected errors

await self.handle_unexpected_error(ctx, e)

return {"status": "max_steps_exceeded", "steps": ctx.max_steps}

Error Handling and Retry Logic

Agents must gracefully handle failures across multiple dimensions:

class AgentErrorHandler:

def init(self):

self.retry_policies = {

"rate_limit": RetryPolicy(max_retries=5, backoff="exponential"),

"timeout": RetryPolicy(max_retries=3, backoff="linear"),

"auth_error": RetryPolicy(max_retries=1, backoff="none"),

"tool_crash": RetryPolicy(max_retries=2, backoff="constant"),

}

async def execute_with_retry(self, tool_call: dict) -> dict:

policy = self._get_policy(tool_call["tool"]["name"])

for attempt in range(policy.max_retries):

try:

return await self._execute_tool(tool_call)

except RateLimitError as e:

wait = self._calculate_backoff(attempt, policy.backoff, e.reset_at)

await self._log_retry(tool_call, attempt, wait)

await asyncio.sleep(wait)

except TimeoutError:

if attempt == policy.max_retries - 1:

return self._graceful_degradation(tool_call)

await asyncio.sleep(policy.backoff_delay * (attempt + 1))

except AuthError:

await self._refresh_credentials(tool_call["tool"]["name"])

continue

return {"error": "max_retries_exceeded", "tool": tool_call["tool"]["name"]}

def _graceful_degradation(self, tool_call: dict) -> dict:

"""Return a safe default when a tool is unavailable."""

return {

"status": "unavailable",

"message": f"{tool_call['tool']['name']} is temporarily unavailable",

"suggestion": "Try again later or use an alternative approach",

}

Human-in-the-Loop

Critical agent actions require human approval before execution:

class HumanInTheLoop:

def init(self, approval_thresholds: dict):

self.thresholds = approval_thresholds

self.pending_approvals = {}

async def request_approval(

self, action: dict, context: AgentContext

) -> bool:

Determine if approval is needed

if not self._requires_approval(action):

return True

approval_id = str(uuid.uuid4())

self.pending_approvals[approval_id] = {

"action": action,

"context": context,

"status": "pending",

"created_at": datetime.utcnow(),

}

Notify human reviewer

await self._notify_reviewer(

approval_id=approval_id,

action_description=action["description"],

risk_level=action.get("risk", "low"),

current_state=context.history[-3:],

)

Wait for approval (with timeout)

try:

approved = await self._wait_for_approval(approval_id, timeout=300)

self.pending_approvals[approval_id]["status"] = (

"approved" if approved else "rejected"

)

return approved

except TimeoutError:

self.pending_approvals[approval_id]["status"] = "timed_out"

return False

def _requires_approval(self, action: dict) -> bool:

return any([

action.get("risk") in self.thresholds.get("high_risk_actions", []),

action["tool"].get("name") in self.thresholds.get("protected_tools", []),

action.get("amount", 0) > self.thresholds.get("max_amount", 1000),

action.get("destructive", False),

])

Monitoring Agent Behavior

Track agent decisions and outcomes with structured telemetry:

class AgentTelemetry:

def init(self):

self.metrics = MetricsClient()

self.tracer = Tracer()

def record_step(self, ctx: AgentContext, action: dict, observation: dict):

span = self.tracer.start_span("agent_step")

span.set_attributes({

"agent.step": ctx.current_step,

"agent.task_hash": hash(ctx.task),

"action.type": action["type"],

"action.tool": action.get("tool", {}).get("name", "none"),

"action.duration_ms": action.get("duration_ms", 0),

"observation.status": observation.get("status", "unknown"),

})

self.metrics.histogram(

"agent.step.duration",

value=action.get("duration_ms", 0),

tags={

"tool": action.get("tool", {}).get("name", "none"),

"status": observation.get("status", "unknown"),

},

)

span.end()

def detect_anomalies(self, ctx: AgentContext) -> List[str]:

warnings = []

Looping detection

recent_actions = [h["action"]["tool"]["name"]

for h in ctx.history[-10:]]

if len(set(recent_actions)) < 3 and len(recent_actions) >= 5:

warnings.append("POSSIBLE_LOOP")

Cost anomaly

if ctx.total_cost > 0.50:

warnings.append("HIGH_COST")

Token usage

total_tokens = sum(

h["action"].get("tokens", 0) for h in ctx.history

)

if total_tokens > 50000:

warnings.append("HIGH_TOKEN_USAGE")

return warnings

Cost Tracking and Rate Limiting

class AgentCostManager:

def init(self, daily_budget: float = 10.0):

self.daily_budget = daily_budget

self.daily_spend = 0.0

self.token_buckets = {}

async def check_budget(self, estimated_cost: float) -> bool:

Reset daily counter

if self._is_new_day():

self.daily_spend = 0.0

if self.daily_spend + estimated_cost > self.daily_budget:

return False # Budget exceeded

self.daily_spend += estimated_cost

return True

async def rate_limit_check(self, tool: str) -> bool:

bucket = self.token_buckets.get(tool, TokenBucket(

capacity=10,

refill_rate=1,

refill_interval=60

))

return bucket.consume()

Observability

Log agent decision traces for debugging and audit:

{

"timestamp": "2026-05-12T10:30:00Z",

"agent_id": "agent-payment-v3",

"session_id": "sess_abc123",

"step": 4,

"action": {

"type": "tool_call",

"tool": "stripe_charge",

"args": {"amount": 49.99, "currency": "usd"},

"reasoning": "Customer requested payment for order ord-789"

},

"observation": {

"status": "success",

"charge_id": "ch_xyz456",

"duration_ms": 234

},

"cost": {

"input_tokens": 1245,

"output_tokens": 89,

"estimated_cost": 0.005

},

"warnings": []

}

Deploy agents incrementally: start with read-only tools, add human-in-the-loop for destructive actions, and only move to fully autonomous mode after extensive monitoring and failure mode analysis.