AI Workflow Automation: LangChain, Temporal, Event-Driven Agents

Introduction

Production AI systems need more than a single LLM call or a simple chain. They require reliable, long-running workflows that handle failures, retries, human-in-the-loop interventions, and complex state management. This article covers three complementary approaches to AI workflow automation: LangChain for LLM orchestration, Temporal for durable execution, and event-driven architectures for scalable pipelines.

LangChain Workflows

LangChain provides abstractions for building LLM-powered workflows:

# LLMChain and SequentialChain are legacy classes; these imports target LangChain 0.x.
from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate
# ChatAnthropic is provided by the langchain-anthropic package.
from langchain_anthropic import ChatAnthropic

# Substitute a model ID that is available to your account.
llm = ChatAnthropic(model="claude-sonnet-4-20260512")

# Define individual chain steps
extract_prompt = PromptTemplate(
    input_variables=["text"],
    template="Extract key requirements from this text:\n{text}",
)
extract_chain = LLMChain(llm=llm, prompt=extract_prompt, output_key="requirements")

analyze_prompt = PromptTemplate(
    input_variables=["requirements"],
    template="Analyze these requirements and identify potential issues:\n{requirements}",
)
analyze_chain = LLMChain(llm=llm, prompt=analyze_prompt, output_key="analysis")

generate_prompt = PromptTemplate(
    input_variables=["requirements", "analysis"],
    template="Based on requirements and analysis, generate a solution:\nRequirements: {requirements}\nAnalysis: {analysis}",
)
generate_chain = LLMChain(llm=llm, prompt=generate_prompt, output_key="solution")

# Compose into a sequential workflow: each chain's output_key feeds
# the input_variables of the chains that follow it.
workflow = SequentialChain(
    chains=[extract_chain, analyze_chain, generate_chain],
    input_variables=["text"],
    output_variables=["requirements", "analysis", "solution"],
    verbose=True,
)

result = workflow.invoke({"text": "Build a REST API for user management with authentication"})
print(result["solution"])
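
To make the data flow concrete, here is a minimal sketch of what sequential composition does with named outputs, using plain Python functions in place of LLM calls. The `Step` class, `run_sequential`, and the stub lambdas are illustrative assumptions and not LangChain APIs; the sketch only shows how each step's output key becomes an input for later steps.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Step:
    """One pipeline stage: reads named inputs, writes one named output."""
    input_keys: list[str]
    output_key: str
    fn: Callable[..., str]  # stand-in for an LLM call (assumption)


def run_sequential(steps: list[Step], inputs: dict[str, str]) -> dict[str, str]:
    """Run steps in order, accumulating every output in a shared state dict."""
    state = dict(inputs)
    for step in steps:
        missing = [k for k in step.input_keys if k not in state]
        if missing:
            raise KeyError(f"step '{step.output_key}' is missing inputs: {missing}")
        state[step.output_key] = step.fn(**{k: state[k] for k in step.input_keys})
    return state


# Stub functions standing in for the extract, analyze, and generate prompts.
steps = [
    Step(["text"], "requirements", lambda text: f"requirements({text})"),
    Step(["requirements"], "analysis", lambda requirements: f"analysis({requirements})"),
    Step(
        ["requirements", "analysis"],
        "solution",
        lambda requirements, analysis: f"solution({requirements}; {analysis})",
    ),
]

state = run_sequential(steps, {"text": "user API"})
print(state["solution"])
```

Because every intermediate value stays in the shared state, the final step can read both `requirements` and `analysis`, which mirrors the three-chain example above.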

Temporal for Durable Execution

Temporal provides reliability guarantees for long-running AI workflows:

import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.worker import Worker

# vector_search, call_llm, and request_human_review are placeholders
# for your own retrieval, LLM, and review integrations.

# Define activities (individual steps)
@activity.defn
async def retrieve_documents(query: str) -> list[str]:
    return await vector_search(query)

@activity.defn
async def generate_answer(context: list[str], question: str) -> str:
    return await call_llm(f"Context: {context}\nQuestion: {question}")

@activity.defn
async def review_output(answer: str) -> str:
    """Human-in-the-loop review with timeout."""
    review = await request_human_review(answer, timeout=3600)
    if review["approved"]:
        return answer
    return f"Needs revision: {review['feedback']}"

# Define workflow
@workflow.defn
class DocumentQAWorkflow:
    @workflow.run
    async def run(self, question: str) -> dict:
        # Step 1: Retrieve documents
        docs = await workflow.execute_activity(
            retrieve_documents,
            question,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
        # Step 2: Generate answer (multiple arguments are passed via args=)
        answer = await workflow.execute_activity(
            generate_answer,
            args=[docs, question],
            start_to_close_timeout=timedelta(minutes=2),
        )
        # Step 3: Human review
        final = await workflow.execute_activity(
            review_output,
            answer,
            start_to_close_timeout=timedelta(hours=2),
        )
        return {"question": question, "answer": final}

# Host the workflow and activities on a worker (run this in its own process)
async def run_worker():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="ai-tasks",
        workflows=[DocumentQAWorkflow],
        activities=[retrieve_documents, generate_answer, review_output],
    )
    await worker.run()

# Run the workflow
async def start_workflow():
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        DocumentQAWorkflow.run,
        "What is our GDPR compliance policy?",
        id="doc-qa-workflow-001",
        task_queue="ai-tasks",
    )
    result = await handle.result()
    print(result)

if __name__ == "__main__":
    asyncio.run(start_workflow())

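Temporal automatically retries failed activities according to their retry policy, persists workflow state so a run can resume after a worker crash or restart, and tolerates long waits such as the two-hour human review step above.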
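
The idea behind durable execution can be illustrated without a Temporal server. The sketch below is a toy and not how Temporal is implemented: it records each completed step in a JSON file so that a rerun after a crash skips work that already finished. The names `run_durable`, `retrieve`, `generate`, and `checkpoint.json` are hypothetical.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def run_durable(steps: list[tuple[str, Callable[[dict], object]]], path: Path) -> dict:
    """Run named steps in order, persisting each result so reruns skip finished steps."""
    state = json.loads(path.read_text()) if path.exists() else {}
    for name, fn in steps:
        if name in state:
            continue  # completed in an earlier run; reuse the recorded result
        state[name] = fn(state)
        path.write_text(json.dumps(state))  # checkpoint after every step
    return state


calls: list[str] = []          # records which steps actually executed
crash_once = {"armed": True}   # makes the second step fail on its first call


def retrieve(state: dict) -> list[str]:
    calls.append("retrieve")
    return ["doc-a", "doc-b"]


def generate(state: dict) -> str:
    calls.append("generate")
    if crash_once["armed"]:
        crash_once["armed"] = False
        raise RuntimeError("simulated crash")
    return f"answer from {len(state['retrieve'])} docs"


steps = [("retrieve", retrieve), ("generate", generate)]
checkpoint = Path(tempfile.mkdtemp()) / "checkpoint.json"

try:
    run_durable(steps, checkpoint)  # first run crashes in the second step
except RuntimeError:
    pass

result = run_durable(steps, checkpoint)  # second run resumes after "retrieve"
print(result["generate"])
print(calls)
```

The retrieval step runs once even though the pipeline was started twice. A real durable execution engine adds much more (event histories, timers, distributed workers), but the resume-from-recorded-results behavior is the part that matters for expensive LLM calls.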

Event-Driven Architecture

Event-driven workflows react to events with loosely coupled agents:

import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Awaitable, Callable

class EventType(Enum):
    DOCUMENT_UPLOADED = "document.uploaded"
    QUERY_RECEIVED = "query.received"
    ANALYSIS_COMPLETE = "analysis.complete"
    HUMAN_REVIEW_NEEDED = "human.review.needed"
    WORKFLOW_COMPLETE = "workflow.complete"

@dataclass
class Event:
    type: EventType
    data: dict
    source: str
    timestamp: float = field(default_factory=time.time)  # set when the event is created

# Handlers are async callables that take an Event
Handler = Callable[[Event], Awaitable[Any]]

class EventBus:
    def __init__(self):
        self.subscribers: dict[EventType, list[Handler]] = {}
        self.event_log: list[Event] = []

    def subscribe(self, event_type: EventType, handler: Handler):
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    async def emit(self, event: Event):
        self.event_log.append(event)
        handlers = self.subscribers.get(event.type, [])
        # Run handlers concurrently. With return_exceptions=True a failing
        # handler is returned as an exception object, so callers must inspect
        # the results rather than rely on an exception being raised.
        results = await asyncio.gather(
            *[handler(event) for handler in handlers],
            return_exceptions=True,
        )
        return results

# Event-driven document processing pipeline
# chunk_document, embed_chunks, store_embeddings, vector_search, and call_llm
# are placeholders for your own implementations.
class DocumentProcessingPipeline:
    def __init__(self, event_bus: EventBus):
        self.bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        self.bus.subscribe(EventType.DOCUMENT_UPLOADED, self.handle_document)
        self.bus.subscribe(EventType.ANALYSIS_COMPLETE, self.handle_analysis)
        self.bus.subscribe(EventType.QUERY_RECEIVED, self.handle_query)

    async def handle_document(self, event: Event):
        doc = event.data["document"]
        # Extract text, chunk, embed
        chunks = chunk_document(doc)
        embeddings = embed_chunks(chunks)
        store_embeddings(embeddings)
        await self.bus.emit(Event(
            type=EventType.ANALYSIS_COMPLETE,
            data={"doc_id": doc["id"], "chunks": len(chunks)},
            source="document_processor",
        ))

    async def handle_analysis(self, event: Event):
        # Placeholder: add follow-up logic for an analyzed document here
        print(f"Analysis complete for {event.data['doc_id']}: {event.data['chunks']} chunks")

    async def handle_query(self, event: Event):
        query = event.data["query"]
        # Awaited here, assuming the same async helpers as the Temporal activities
        docs = await vector_search(query)
        answer = await call_llm(f"Context: {docs}\nQuery: {query}")
        await self.bus.emit(Event(
            type=EventType.WORKFLOW_COMPLETE,
            data={"query": query, "answer": answer},
            source="query_handler",
        ))
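
The following is a short, self-contained sketch of the publish/subscribe pattern above, reduced to string event types so it runs on its own. It is meant to show two properties of the `emit` design: every handler for an event runs concurrently, and with `return_exceptions=True` a failing handler comes back as a value instead of cancelling its siblings. `MiniBus`, `index_document`, and `flaky_notifier` are hypothetical names.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[dict], Awaitable[Any]]


class MiniBus:
    """A stripped-down event bus keyed by string event types."""

    def __init__(self) -> None:
        self.subscribers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self.subscribers[event_type].append(handler)

    async def emit(self, event_type: str, data: dict) -> list[Any]:
        handlers = self.subscribers.get(event_type, [])
        # Failures are collected as exception objects, not raised
        return await asyncio.gather(
            *(h(data) for h in handlers), return_exceptions=True
        )


async def index_document(data: dict) -> str:
    return f"indexed {data['doc_id']}"


async def flaky_notifier(data: dict) -> None:
    raise ConnectionError("notification service unavailable")


async def main() -> list[Any]:
    bus = MiniBus()
    bus.subscribe("document.uploaded", index_document)
    bus.subscribe("document.uploaded", flaky_notifier)
    results = await bus.emit("document.uploaded", {"doc_id": "42"})
    for r in results:
        if isinstance(r, Exception):
            print(f"handler failed: {r!r}")  # surface failures explicitly
    return results


results = asyncio.run(main())
```

The indexing handler still succeeds even though the notifier fails. The trade-off is that errors are silent unless the caller checks the returned list, so a production bus would typically log or re-emit them.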

Error Recovery and Retry

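All three approaches need explicit error handling. The executor below retries transient failures such as rate limits, shrinks the context when the model's context window overflows, and escalates everything else to a human: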

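from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

# LLMAPIError and ContextWindowError are placeholders for the exception
# types raised by your LLM client.

def _is_rate_limit(exc: BaseException) -> bool:
    return isinstance(exc, LLMAPIError) and exc.is_rate_limit()

class WorkflowExecutor:
    @retry(
        retry=retry_if_exception(_is_rate_limit),  # retry only rate-limit errors
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=60),
        reraise=True,  # raise the original exception once attempts run out
    )
    async def _call(self, workflow_fn, *args, **kwargs):
        return await workflow_fn(*args, **kwargs)

    async def execute_with_retry(self, workflow_fn, *args, **kwargs):
        try:
            return await self._call(workflow_fn, *args, **kwargs)
        except ContextWindowError:
            # Reduce context size and retry, down to an illustrative floor
            max_context = kwargs.get("max_context", 4000) // 2
            if max_context < 500:
                await self.escalate_to_human(workflow_fn, args, kwargs)
                raise
            kwargs["max_context"] = max_context
            return await self.execute_with_retry(workflow_fn, *args, **kwargs)
        except Exception:
            # Log and escalate to human (escalate_to_human is left to implement)
            await self.escalate_to_human(workflow_fn, args, kwargs)
            raise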
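
To see what `wait_exponential(multiplier=1, min=4, max=60)` implies in practice, the sketch below reimplements a clamped exponential backoff in plain Python. It assumes the wait after failed attempt n is multiplier × 2^(n−1), clamped to the range [min, max]; this is an assumption about how recent tenacity releases compute the delay, so verify it against your installed version. `backoff_schedule` is a hypothetical helper, not part of tenacity.

```python
def backoff_schedule(attempts: int, multiplier: float = 1, min_wait: float = 4,
                     max_wait: float = 60, exp_base: float = 2) -> list[float]:
    """Seconds to wait after each failed attempt that is followed by a retry."""
    waits = []
    for failed_attempt in range(1, attempts):  # no wait after the final attempt
        raw = multiplier * exp_base ** (failed_attempt - 1)
        waits.append(max(min_wait, min(raw, max_wait)))  # clamp to [min, max]
    return waits


print(backoff_schedule(3))  # waits between the three attempts configured above
print(backoff_schedule(8))  # how the schedule would grow with more attempts
```

Under this assumption, the `min=4` floor dominates for three attempts, so both waits are 4 seconds. The exponential growth only becomes visible with more attempts, and the `max=60` cap applies from the seventh wait onward.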

Conclusion

AI workflow automation requires reliability guarantees beyond simple scripting. LangChain provides the LLM orchestration layer with composable chains. Temporal adds durability with automatic retries, state persistence, and human-in-the-loop capabilities. Event-driven architectures enable loosely coupled, scalable pipelines. For production systems, combine all three: use LangChain for LLM logic within Temporal activities, and wire everything together with an event bus for scalability.