Introduction
Production AI systems need more than a single LLM call or a simple chain. They require reliable, long-running workflows that handle failures, retries, human-in-the-loop interventions, and complex state management. This article covers three complementary approaches to AI workflow automation: LangChain for LLM orchestration, Temporal for durable execution, and event-driven architectures for scalable pipelines.

LangChain Workflows
LangChain provides abstractions for building LLM-powered workflows:
from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
from langchain_anthropic import ChatAnthropic  # provided by the langchain-anthropic package

# LLMChain and SequentialChain are legacy classes; recent LangChain releases
# deprecate them in favor of runnable pipelines, so pin your version accordingly.
llm = ChatAnthropic(model="claude-sonnet-4-20260512")  # substitute a model ID available to you

# Define individual chain steps
extract_prompt = PromptTemplate(
    input_variables=["text"],
    template="Extract key requirements from this text:\n{text}",
)
extract_chain = LLMChain(llm=llm, prompt=extract_prompt, output_key="requirements")

analyze_prompt = PromptTemplate(
    input_variables=["requirements"],
    template="Analyze these requirements and identify potential issues:\n{requirements}",
)
analyze_chain = LLMChain(llm=llm, prompt=analyze_prompt, output_key="analysis")

generate_prompt = PromptTemplate(
    input_variables=["requirements", "analysis"],
    template="Based on requirements and analysis, generate a solution:\nRequirements: {requirements}\nAnalysis: {analysis}",
)
generate_chain = LLMChain(llm=llm, prompt=generate_prompt, output_key="solution")

# Compose into a sequential workflow: each chain's output_key feeds later prompts
workflow = SequentialChain(
    chains=[extract_chain, analyze_chain, generate_chain],
    input_variables=["text"],
    output_variables=["requirements", "analysis", "solution"],
    verbose=True,
)
result = workflow.invoke({"text": "Build a REST API for user management with authentication"})
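To make the data flow concrete, the sketch below mimics how a sequential chain threads named outputs from one step into the next. It is a plain-Python illustration, not LangChain's implementation: `Step`, `run_sequential`, and `fake_llm` are hypothetical names, and `fake_llm` stands in for a real model call.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Step:
    """One prompt template plus the key its output is stored under."""
    template: str
    output_key: str


def run_sequential(steps: list[Step], inputs: dict[str, str],
                   llm: Callable[[str], str]) -> dict[str, str]:
    """Run steps in order; each step can read every value produced so far."""
    state = dict(inputs)
    for step in steps:
        # Raises KeyError if the template needs a variable no earlier step produced
        prompt = step.template.format(**state)
        state[step.output_key] = llm(prompt)
    return state


def fake_llm(prompt: str) -> str:
    """Hypothetical stand-in for a model call: echoes the prompt's first line."""
    return f"<response to: {prompt.splitlines()[0]}>"


steps = [
    Step("Extract key requirements from this text:\n{text}", "requirements"),
    Step("Analyze these requirements:\n{requirements}", "analysis"),
    Step("Generate a solution:\n{requirements}\n{analysis}", "solution"),
]
result = run_sequential(steps, {"text": "Build a REST API"}, fake_llm)
print(sorted(result))
```

The shared `state` dictionary is the key design point: a later step can consume any earlier output by name, which is why the third prompt can reference both `requirements` and `analysis`.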
Temporal for Durable Execution
Temporal provides reliability guarantees for long-running AI workflows:
import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.worker import Worker

# vector_search, call_llm, and request_human_review are placeholders for
# application-specific helpers that this article does not define.

# Define activities (individual steps)
@activity.defn
async def retrieve_documents(query: str) -> list[str]:
    return await vector_search(query)

@activity.defn
async def generate_answer(context: list[str], question: str) -> str:
    return await call_llm(f"Context: {context}\nQuestion: {question}")

@activity.defn
async def review_output(answer: str) -> str:
    """Human-in-the-loop review with timeout."""
    review = await request_human_review(answer, timeout=3600)
    if review["approved"]:
        return answer
    return f"Needs revision: {review['feedback']}"

# Define workflow
@workflow.defn
class DocumentQAWorkflow:
    @workflow.run
    async def run(self, question: str) -> dict:
        # Step 1: Retrieve documents
        docs = await workflow.execute_activity(
            retrieve_documents, question,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
        # Step 2: Generate answer (multiple arguments are passed via args=)
        answer = await workflow.execute_activity(
            generate_answer, args=[docs, question],
            start_to_close_timeout=timedelta(minutes=2),
        )
        # Step 3: Human review
        final = await workflow.execute_activity(
            review_output, answer,
            start_to_close_timeout=timedelta(hours=2),
        )
        return {"question": question, "answer": final}

# Run the workflow
async def start_workflow():
    client = await Client.connect("localhost:7233")
    # A worker must poll the "ai-tasks" queue for the workflow to make progress;
    # here it runs in-process for the duration of the call.
    async with Worker(
        client,
        task_queue="ai-tasks",
        workflows=[DocumentQAWorkflow],
        activities=[retrieve_documents, generate_answer, review_output],
    ):
        handle = await client.start_workflow(
            DocumentQAWorkflow.run,
            "What is our GDPR compliance policy?",
            id="doc-qa-workflow-001",
            task_queue="ai-tasks",
        )
        result = await handle.result()
        print(result)

if __name__ == "__main__":
    asyncio.run(start_workflow())
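Temporal retries failed activities according to their retry policy, persists workflow state as an event history so that execution can resume after a worker crash, and lets a workflow wait on human-in-the-loop steps that take hours.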
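The core idea behind durable execution can be illustrated without Temporal: record each step's result as it completes, and on restart reuse the recorded results instead of re-running the step. The sketch below is a toy, single-process illustration under that assumption. `DurableRunner` and its JSON-file journal are hypothetical and do not reflect how Temporal stores or replays its event history.

```python
import json
import os
import tempfile
from typing import Callable


class DurableRunner:
    """Persists each step's result to a JSON journal so reruns skip finished steps."""

    def __init__(self, journal_path: str):
        self.journal_path = journal_path
        self.journal: dict[str, object] = {}
        if os.path.exists(journal_path):
            with open(journal_path) as f:
                self.journal = json.load(f)

    def step(self, name: str, fn: Callable[[], object]) -> object:
        if name in self.journal:          # completed in an earlier run: reuse result
            return self.journal[name]
        result = fn()                     # if this raises, nothing is recorded
        self.journal[name] = result
        with open(self.journal_path, "w") as f:
            json.dump(self.journal, f)    # checkpoint after every completed step
        return result


calls = {"retrieve": 0, "generate": 0}


def retrieve() -> list[str]:
    calls["retrieve"] += 1
    return ["doc-a", "doc-b"]


def generate_flaky() -> str:
    calls["generate"] += 1
    if calls["generate"] == 1:
        raise RuntimeError("simulated crash")
    return "answer"


def run_workflow(path: str) -> object:
    runner = DurableRunner(path)
    runner.step("retrieve", retrieve)
    return runner.step("generate", generate_flaky)


path = os.path.join(tempfile.mkdtemp(), "journal.json")
try:
    run_workflow(path)            # first run fails in the second step
except RuntimeError:
    pass
final = run_workflow(path)        # second run resumes from the journal
print(final, calls)
```

On the second run `retrieve` is not called again because its result is already in the journal; only the failed step re-executes. This is also why step results in such a scheme need to be serializable.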
Event-Driven Architecture
In an event-driven workflow, loosely coupled handlers subscribe to event types and react whenever a matching event is emitted:
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Awaitable, Callable

class EventType(Enum):
    DOCUMENT_UPLOADED = "document.uploaded"
    QUERY_RECEIVED = "query.received"
    ANALYSIS_COMPLETE = "analysis.complete"
    HUMAN_REVIEW_NEEDED = "human.review.needed"
    WORKFLOW_COMPLETE = "workflow.complete"

@dataclass
class Event:
    type: EventType
    data: dict
    source: str
    timestamp: float = field(default_factory=time.time)  # set at creation time

Handler = Callable[[Event], Awaitable[Any]]

class EventBus:
    def __init__(self):
        self.subscribers: dict[EventType, list[Handler]] = {}
        self.event_log: list[Event] = []

    def subscribe(self, event_type: EventType, handler: Handler):
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    async def emit(self, event: Event):
        self.event_log.append(event)
        handlers = self.subscribers.get(event.type, [])
        # Handlers run concurrently; exceptions are returned in the results, not raised
        results = await asyncio.gather(
            *[handler(event) for handler in handlers],
            return_exceptions=True,
        )
        return results

# Event-driven document processing pipeline.
# chunk_document, embed_chunks, store_embeddings, vector_search, and call_llm
# are placeholders for application-specific helpers.
class DocumentProcessingPipeline:
    def __init__(self, event_bus: EventBus):
        self.bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        self.bus.subscribe(EventType.DOCUMENT_UPLOADED, self.handle_document)
        self.bus.subscribe(EventType.ANALYSIS_COMPLETE, self.handle_analysis)
        self.bus.subscribe(EventType.QUERY_RECEIVED, self.handle_query)

    async def handle_document(self, event: Event):
        doc = event.data["document"]
        # Extract text, chunk, embed
        chunks = chunk_document(doc)
        embeddings = embed_chunks(chunks)
        store_embeddings(embeddings)
        await self.bus.emit(Event(
            type=EventType.ANALYSIS_COMPLETE,
            data={"doc_id": doc["id"], "chunks": len(chunks)},
            source="document_processor",
        ))

    async def handle_analysis(self, event: Event):
        # Minimal placeholder: extend with follow-up steps such as requesting review
        print(f"Indexed {event.data['chunks']} chunks for document {event.data['doc_id']}")

    async def handle_query(self, event: Event):
        query = event.data["query"]
        docs = await vector_search(query)
        answer = await call_llm(f"Context: {docs}\nQuery: {query}")
        await self.bus.emit(Event(
            type=EventType.WORKFLOW_COMPLETE,
            data={"query": query, "answer": answer},
            source="query_handler",
        ))
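A short runnable demonstration may help show two properties of this pattern: handlers chain stages by emitting follow-up events, and one failing handler does not stop the others. The sketch uses a compact stand-in for the bus keyed by plain event names. `MiniBus`, `index_document`, and `broken_notifier` are hypothetical names chosen for illustration.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[object]]


class MiniBus:
    """Compact stand-in for an event bus, keyed by event name."""

    def __init__(self) -> None:
        self.subscribers: dict[str, list[Handler]] = defaultdict(list)
        self.log: list[str] = []

    def subscribe(self, name: str, handler: Handler) -> None:
        self.subscribers[name].append(handler)

    async def emit(self, name: str, data: dict) -> list[object]:
        self.log.append(name)
        # return_exceptions=True: a failing handler does not cancel the others
        return await asyncio.gather(
            *(handler(data) for handler in self.subscribers[name]),
            return_exceptions=True,
        )


async def demo() -> tuple[list[str], list[object]]:
    bus = MiniBus()

    async def index_document(data: dict) -> str:
        # Chain to the next stage by emitting a follow-up event
        await bus.emit("analysis.complete", {"doc_id": data["doc_id"]})
        return "indexed"

    async def broken_notifier(data: dict) -> None:
        raise RuntimeError("notification service down")

    bus.subscribe("document.uploaded", index_document)
    bus.subscribe("document.uploaded", broken_notifier)
    results = await bus.emit("document.uploaded", {"doc_id": "42"})
    return bus.log, results


log, results = asyncio.run(demo())
print(log)
print(results)
```

Because exceptions come back as ordinary values in the results list, the caller has to inspect that list (or log it) to notice failures; otherwise a broken handler fails silently.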
Error Recovery and Retry
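Whichever approach you choose, LLM calls fail in predictable ways, such as rate limits and oversized contexts, and each failure mode needs its own handling: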
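from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

# LLMAPIError and ContextWindowError are placeholders for the exception types
# raised by your LLM client; escalate_to_human is an application-specific hook.

def is_rate_limit(exc: BaseException) -> bool:
    return isinstance(exc, LLMAPIError) and exc.is_rate_limit()

class WorkflowExecutor:
    @retry(
        retry=retry_if_exception(is_rate_limit),  # only rate limits are retried
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=60),
        reraise=True,  # surface the original error once attempts are exhausted
    )
    async def execute_with_retry(self, workflow_fn, *args, **kwargs):
        try:
            return await workflow_fn(*args, **kwargs)
        except LLMAPIError:
            raise  # rate-limit errors are retried by tenacity; others propagate
        except ContextWindowError:
            # Halve the context budget and try again, with a floor to stop recursion
            kwargs["max_context"] = kwargs.get("max_context", 4000) // 2
            if kwargs["max_context"] < 250:
                raise
            return await self.execute_with_retry(workflow_fn, *args, **kwargs)
        except Exception:
            # Log and escalate to human
            await self.escalate_to_human(workflow_fn, args, kwargs)
            raise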
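To see what an exponential wait with a minimum of 4 seconds and a maximum of 60 seconds implies in practice, the sketch below computes a clamped backoff schedule and wires it into a small retry loop. It is a standard-library illustration modeled on the parameters used above, not tenacity's code, and tenacity's exact formula may differ between versions. `backoff_delay` and `retry_async` are hypothetical names.

```python
import asyncio
import random
from typing import Awaitable, Callable


def backoff_delay(attempt: int, multiplier: float = 1.0,
                  min_s: float = 4.0, max_s: float = 60.0) -> float:
    """Delay in seconds before retrying after the given 1-based failed attempt."""
    raw = multiplier * 2 ** (attempt - 1)   # 1, 2, 4, 8, ...
    return max(min_s, min(raw, max_s))      # clamp into [min_s, max_s]


async def retry_async(fn: Callable[[], Awaitable[object]], attempts: int = 3,
                      sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
                      jitter: float = 0.0) -> object:
    """Call fn until it succeeds or attempts run out, then re-raise the last error."""
    for attempt in range(1, attempts + 1):
        try:
            return await fn()
        except Exception:
            if attempt == attempts:
                raise
            # Optional jitter spreads out retries from many concurrent callers
            await sleep(backoff_delay(attempt) + random.uniform(0, jitter))


print([backoff_delay(n) for n in range(1, 8)])
# [4.0, 4.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

With only three attempts, both waits sit at the 4-second floor, so the exponential growth matters only if the attempt limit is raised. The injectable `sleep` parameter exists so the loop can be tested without real delays.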
Conclusion
AI workflow automation requires reliability guarantees beyond simple scripting. LangChain provides the LLM orchestration layer with composable chains. Temporal adds durability with automatic retries, state persistence, and human-in-the-loop capabilities. Event-driven architectures enable loosely coupled, scalable pipelines. For production systems, combine all three: use LangChain for LLM logic within Temporal activities, and wire everything together with an event bus for scalability.