# Long-Running Executions
When agents need to execute multiple tool calls or perform complex operations (like deep research, data analysis, or multi-step workflows), processing time can vary significantly.
Letta supports various ways to handle long-running agents, so you can choose the approach that best fits your use case:
| Use Case | Duration | Recommendation | Key Benefits |
|---|---|---|---|
| Few-step invocations | < 1 minute | Standard streaming | Simplest approach |
| Variable-length runs | 1-10 minutes | Background mode (keepalive pings + extended timeouts as a fallback) | Easy way to avoid client timeouts |
| Deep research | 10+ minutes | Background mode, or async polling | Survives disconnects, resumable streams |
| Batch jobs | Any | Async polling | Fire-and-forget, check results later |
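For short, few-step invocations (the first row above), plain foreground streaming is enough. A minimal sketch, assuming the same Python client used throughout this page and an `agent_state` for an agent you've already created:

```python
import os
from letta_client import Letta

client = Letta(token=os.getenv("LETTA_API_KEY"))

# Standard foreground streaming: the simplest option for runs that finish quickly
stream = client.agents.messages.create_stream(
    agent_id=agent_state.id,  # assumes an existing agent
    messages=[{"role": "user", "content": "Summarize the latest results"}],
)
for chunk in stream:
    print(chunk)
```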
## Option 1: Background Mode with Resumable Streaming

Background mode decouples agent execution from your client connection. The agent processes your request on the server while streaming results to a persistent store, allowing you to reconnect and resume from any point, even if your application crashes or the network fails.
```bash
# Step 1: Start a background stream
curl --request POST \
  --url "https://api.letta.com/v1/agents/$AGENT_ID/messages/stream" \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Content-Type: application/json' \
  --data '{
    "messages": [
      {
        "role": "user",
        "content": "Run comprehensive analysis on this dataset"
      }
    ],
    "stream_tokens": true,
    "background": true
  }'

# Response stream includes run_id and seq_id for each chunk:
data: {"run_id":"run-123","seq_id":0,"message_type":"reasoning_message","reasoning":"Analyzing"}
data: {"run_id":"run-123","seq_id":1,"message_type":"reasoning_message","reasoning":" the dataset"}
data: {"run_id":"run-123","seq_id":2,"message_type":"tool_call","tool_call":{...}}
# ... stream continues

# Step 2: If disconnected, resume from the last received seq_id
curl --request GET \
  --url "https://api.letta.com/v1/runs/$RUN_ID/stream?starting_after=57" \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Accept: text/event-stream'
```

```python
stream = client.agents.messages.create_stream(
    agent_id=agent_state.id,
    messages=[
        {
            "role": "user",
            "content": "Run comprehensive analysis on this dataset"
        }
    ],
    stream_tokens=True,
    background=True,
)

run_id = None
last_seq_id = None
for chunk in stream:
    if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"):
        run_id = chunk.run_id       # Save this to reconnect if your connection drops
        last_seq_id = chunk.seq_id  # Save this as your resumption point for cursor-based pagination
    print(chunk)

# If disconnected, resume from the last received seq_id:
for chunk in client.runs.stream(run_id, starting_after=last_seq_id):
    print(chunk)
```

```typescript
const stream = await client.agents.messages.createStream({
  agentId: agentState.id,
  requestBody: {
    messages: [
      {
        role: "user",
        content: "Run comprehensive analysis on this dataset",
      },
    ],
    streamTokens: true,
    background: true,
  },
});

let runId = null;
let lastSeqId = null;
for await (const chunk of stream) {
  if (chunk.run_id && chunk.seq_id) {
    runId = chunk.run_id;      // Save this to reconnect if your connection drops
    lastSeqId = chunk.seq_id;  // Save this as your resumption point for cursor-based pagination
  }
  console.log(chunk);
}

// If disconnected, resume from the last received seq_id
for await (const chunk of client.runs.stream(runId, {
  startingAfter: lastSeqId,
})) {
  console.log(chunk);
}
```
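To make reconnection automatic, you can wrap this pattern in a retry loop. A minimal sketch, assuming the `create_stream`/`runs.stream` surface shown above; the broad `except Exception` and the retry policy are placeholders to adapt to your HTTP client's actual disconnect errors:

```python
import time

def consume_with_resume(client, agent_id, messages, max_retries=5):
    """Start a background stream and resume automatically after a disconnect."""
    stream = client.agents.messages.create_stream(
        agent_id=agent_id,
        messages=messages,
        stream_tokens=True,
        background=True,
    )
    run_id, last_seq_id = None, None
    retries = 0
    while True:
        try:
            for chunk in stream:
                if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"):
                    run_id, last_seq_id = chunk.run_id, chunk.seq_id
                yield chunk
            return  # stream finished normally
        except Exception:  # placeholder: catch your client's disconnect errors
            retries += 1
            if run_id is None or retries > max_retries:
                raise
            time.sleep(min(2 ** retries, 30))  # capped exponential backoff
            stream = client.runs.stream(run_id, starting_after=last_seq_id)
```

Persisting `run_id` and `last_seq_id` outside the process (a database, a file) extends the same idea to full application restarts, as covered in "Discovering and Resuming Active Streams" below.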
### HITL in Background Mode
When Human-in-the-Loop (HITL) approval is enabled for a tool, your background stream may pause and emit an `approval_request_message`. In background mode, send the approval via a separate background stream and capture that stream's `run_id`/`seq_id`.
```bash
# 1) Start a background stream; capture the approval request
curl --request POST \
  --url "https://api.letta.com/v1/agents/$AGENT_ID/messages/stream" \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Content-Type: application/json' \
  --data '{
    "messages": [{"role": "user", "content": "Do a sensitive operation"}],
    "stream_tokens": true,
    "background": true
  }'

# Example stream output (approval request arrives):
data: {"run_id":"run-abc","seq_id":0,"message_type":"reasoning_message","reasoning":"..."}
data: {"run_id":"run-abc","seq_id":1,"message_type":"approval_request_message","id":"message-abc","tool_call":{"name":"sensitive_operation","arguments":"{...}","tool_call_id":"tool-xyz"}}

# 2) Approve in background; capture the approval stream cursor (this creates a new run)
curl --request POST \
  --url "https://api.letta.com/v1/agents/$AGENT_ID/messages/stream" \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Content-Type: application/json' \
  --data '{
    "messages": [{"type": "approval", "approve": true, "approval_request_id": "message-abc"}],
    "stream_tokens": true,
    "background": true
  }'

# Example approval stream output (tool result arrives here):
data: {"run_id":"run-new","seq_id":0,"message_type":"tool_return_message","status":"success","tool_return":"..."}

# 3) Resume the approval stream's run to continue
curl --request GET \
  --url "https://api.letta.com/v1/runs/$RUN_ID/stream?starting_after=0" \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Accept: text/event-stream'
```

```python
# 1) Start a background stream and capture the approval request
stream = client.agents.messages.create_stream(
    agent_id=agent.id,
    messages=[{"role": "user", "content": "Do a sensitive operation"}],
    stream_tokens=True,
    background=True,
)

approval_request_id = None
orig_run_id = None
last_seq_id = 0
for chunk in stream:
    if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"):
        orig_run_id = chunk.run_id
        last_seq_id = chunk.seq_id
    if getattr(chunk, "message_type", None) == "approval_request_message":
        approval_request_id = chunk.id
        break

# 2) Approve in background; capture the approval stream cursor (this creates a new run)
approve = client.agents.messages.create_stream(
    agent_id=agent.id,
    messages=[{"type": "approval", "approve": True, "approval_request_id": approval_request_id}],
    stream_tokens=True,
    background=True,
)

run_id = None
approve_seq = 0
for chunk in approve:
    if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"):
        run_id = chunk.run_id
        approve_seq = chunk.seq_id
    if getattr(chunk, "message_type", None) == "tool_return_message":
        # Tool result arrives here on the approval stream
        break

# 3) Resume that run to read follow-up tokens
for chunk in client.runs.stream(run_id, starting_after=approve_seq):
    print(chunk)
```

```typescript
// 1) Start a background stream and capture the approval request
const stream = await client.agents.messages.createStream({
  agentId: agent.id,
  requestBody: {
    messages: [{ role: "user", content: "Do a sensitive operation" }],
    streamTokens: true,
    background: true,
  },
});

let approvalRequestId: string | null = null;
let origRunId: string | null = null;
let lastSeqId = 0;
for await (const chunk of stream) {
  if (chunk.run_id && chunk.seq_id) {
    origRunId = chunk.run_id;
    lastSeqId = chunk.seq_id;
  }
  if (chunk.message_type === "approval_request_message") {
    approvalRequestId = chunk.id;
    break;
  }
}

// 2) Approve in background; capture the approval stream cursor (this creates a new run)
const approve = await client.agents.messages.createStream({
  agentId: agent.id,
  requestBody: {
    messages: [{ type: "approval", approve: true, approvalRequestId }],
    streamTokens: true,
    background: true,
  },
});

let runId: string | null = null;
let approveSeq = 0;
for await (const chunk of approve) {
  if (chunk.run_id && chunk.seq_id) {
    runId = chunk.run_id;
    approveSeq = chunk.seq_id;
  }
  if (chunk.message_type === "tool_return_message") {
    // Tool result arrives here on the approval stream
    break;
  }
}

// 3) Resume that run to read follow-up tokens
const resume = await client.runs.stream(runId!, { startingAfter: approveSeq });
for await (const chunk of resume) {
  console.log(chunk);
}
```
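The approval stream only carries messages from the new run. If you also need the chunks that preceded the approval request (for example, to rebuild UI state after a restart), a sketch replaying the original run using the `orig_run_id` captured in step 1:

```python
# Replay the original run's chunks (reasoning, the approval request itself, etc.);
# the continuation after approval lives on the approval stream's run instead
for chunk in client.runs.stream(orig_run_id, starting_after=0):
    print(chunk)
```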
### Discovering and Resuming Active Streams

When your application starts or recovers from a crash, you can check for any active background streams and resume them. This is particularly useful for:
- Application restarts: Resume processing after deployments or crashes
- Load balancing: Pick up streams started by other instances
- Monitoring: Check progress of long-running operations from different clients
```bash
# Step 1: Find active background streams for your agents
curl --request GET \
  --url "https://api.letta.com/v1/runs/active?agent_ids=agent-123&agent_ids=agent-456&background=true" \
  --header "Authorization: Bearer $LETTA_API_KEY"
# Returns: [{"run_id": "run-abc", "agent_id": "agent-123", "status": "processing", ...}]

# Step 2: Resume streaming from the beginning (or any specified seq_id)
curl --request GET \
  --url "https://api.letta.com/v1/runs/$RUN_ID/stream?starting_after=0&batch_size=1000" \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Accept: text/event-stream'
# starting_after=0 starts from the beginning; batch_size fetches historical chunks in larger batches
```

```python
# Find and resume active background streams
active_runs = client.runs.active(
    agent_ids=["agent-123", "agent-456"],
    background=True,
)

if active_runs:
    # Resume the first active stream from the beginning
    run = active_runs[0]
    print(f"Resuming stream for run {run.id}, status: {run.status}")

    stream = client.runs.stream(
        run_id=run.id,
        starting_after=0,  # Start from the beginning
        batch_size=1000,   # Fetch historical chunks in larger batches
    )

    # Each historical chunk is streamed one at a time, followed by new chunks as they become available
    for chunk in stream:
        print(chunk)
```

```typescript
// Find and resume active background streams
const activeRuns = await client.runs.active({
  agentIds: ["agent-123", "agent-456"],
  background: true,
});

if (activeRuns.length > 0) {
  // Resume the first active stream from the beginning
  const run = activeRuns[0];
  console.log(`Resuming stream for run ${run.id}, status: ${run.status}`);

  const stream = await client.runs.stream(run.id, {
    startingAfter: 0, // Start from the beginning
    batchSize: 1000,  // Fetch historical chunks in larger batches
  });

  // Each historical chunk is streamed one at a time, followed by new chunks as they become available
  for await (const chunk of stream) {
    console.log(chunk);
  }
}
```
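Extending this to a startup routine that drains every active run, rather than just the first, is mostly a loop. A sketch, assuming the same `runs.active`/`runs.stream` surface; `handle_chunk` is a hypothetical callback for your own processing:

```python
def recover_active_streams(client, agent_ids, handle_chunk):
    """On startup, find all active background runs and drain each one in turn."""
    for run in client.runs.active(agent_ids=agent_ids, background=True):
        print(f"Recovering run {run.id} (status: {run.status})")
        # Replay from the beginning; substitute a persisted cursor if you saved one
        for chunk in client.runs.stream(run_id=run.id, starting_after=0, batch_size=1000):
            handle_chunk(chunk)
```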
## Option 2: Async Operations with Polling

Ideal for batch processing, scheduled jobs, or when you don't need real-time updates. The async SDK method queues your request and returns immediately, letting you check results later:
```bash
# Start async operation (returns immediately with a run ID)
curl --request POST \
  --url "https://api.letta.com/v1/agents/$AGENT_ID/messages/async" \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Content-Type: application/json' \
  --data '{
    "messages": [
      {
        "role": "user",
        "content": "Run comprehensive analysis on this dataset"
      }
    ]
  }'

# Poll for results using the returned run ID
curl --request GET \
  --url "https://api.letta.com/v1/runs/$RUN_ID" \
  --header "Authorization: Bearer $LETTA_API_KEY"
```

```python
import time

# Start async operation (returns immediately with a run ID)
run = client.agents.messages.create_async(
    agent_id=agent_state.id,
    messages=[
        {
            "role": "user",
            "content": "Run comprehensive analysis on this dataset"
        }
    ],
)

# Poll for completion (see the more robust polling sketch below)
while run.status != "completed":
    time.sleep(2)
    run = client.runs.retrieve(run_id=run.id)

# Get the messages once complete
messages = client.runs.messages.list(run_id=run.id)
```

```typescript
// Start async operation (returns immediately with a run ID)
let run = await client.agents.createAgentMessageAsync({
  agentId: agentState.id,
  requestBody: {
    messages: [
      {
        role: "user",
        content: "Run comprehensive analysis on this dataset",
      },
    ],
  },
});

// Poll for completion
while (run.status !== "completed") {
  await new Promise((resolve) => setTimeout(resolve, 2000));
  run = await client.runs.retrieveRun({ runId: run.id });
}

// Get the messages once complete
const messages = await client.runs.listRunMessages({ runId: run.id });
```
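In production you will usually want the polling loop to back off between checks and to stop on failure, not just success. A minimal sketch, assuming the `create_async`/`runs.retrieve` surface above and that `"failed"` is a terminal status alongside `"completed"`:

```python
import time

def run_and_wait(client, agent_id, messages, poll_interval=2.0, max_interval=30.0):
    """Queue an async run, then poll with exponential backoff until it finishes."""
    run = client.agents.messages.create_async(agent_id=agent_id, messages=messages)
    interval = poll_interval
    while run.status not in ("completed", "failed"):  # assumed terminal statuses
        time.sleep(interval)
        interval = min(interval * 1.5, max_interval)  # back off between checks
        run = client.runs.retrieve(run_id=run.id)
    if run.status == "failed":
        raise RuntimeError(f"Run {run.id} failed")
    return client.runs.messages.list(run_id=run.id)
```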
## Option 3: Configure Streaming with Keepalive Pings and Longer Timeouts

Use this for operations under 10 minutes that need real-time updates without the complexity of background processing. Configure keepalive pings and extended timeouts to maintain a stable connection:
```bash
curl --request POST \
  --url "https://api.letta.com/v1/agents/$AGENT_ID/messages/stream" \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Content-Type: application/json' \
  --data '{
    "messages": [
      {
        "role": "user",
        "content": "Execute this long-running analysis"
      }
    ],
    "include_pings": true
  }'
```

```python
# Configure client with extended timeout
import os
from letta_client import Letta

client = Letta(token=os.getenv("LETTA_API_KEY"))

# Enable pings to prevent timeout during long operations
stream = client.agents.messages.create_stream(
    agent_id=agent_state.id,
    messages=[
        {
            "role": "user",
            "content": "Execute this long-running analysis"
        }
    ],
    include_pings=True,  # Sends periodic keepalive messages
    request_options={"timeout_in_seconds": 600},  # 10 min timeout
)

# Process the stream (pings will keep the connection alive)
for chunk in stream:
    if chunk.message_type == "ping":
        # Keepalive ping received; the connection is still active
        continue
    print(chunk)
```

```typescript
// Configure client with extended timeout
import { Letta } from '@letta/sdk';

const client = new Letta({
  token: process.env.LETTA_API_KEY,
});

// Enable pings to prevent timeout during long operations
const stream = await client.agents.createAgentMessageStream(
  {
    agentId: agentState.id,
    requestBody: {
      messages: [
        {
          role: "user",
          content: "Execute this long-running analysis",
        },
      ],
      includePings: true, // Sends periodic keepalive messages
    },
  },
  {
    timeoutInSeconds: 600, // 10 minute timeout
  },
);

// Process the stream (pings will keep the connection alive)
for await (const chunk of stream) {
  if (chunk.message_type === "ping") {
    // Keepalive ping received; the connection is still active
    continue;
  }
  console.log(chunk);
}
```
### Configuration Guidelines

| Parameter | Purpose | When to Use |
|---|---|---|
| `timeout_in_seconds` | Extends the request timeout beyond the 60s default | Set to ~1.5x your expected max duration |
| `include_pings` | Sends keepalive messages every ~30s | Enable for operations with long gaps between outputs |
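Putting the two settings together: a sketch applying the 1.5x guideline from the table, where `expected_max_seconds` is your own estimate of the longest plausible run:

```python
expected_max_seconds = 400  # your estimate of the longest plausible run

stream = client.agents.messages.create_stream(
    agent_id=agent_state.id,
    messages=[{"role": "user", "content": "Execute this long-running analysis"}],
    include_pings=True,  # keepalive messages bridge long silent gaps
    request_options={"timeout_in_seconds": int(expected_max_seconds * 1.5)},
)
```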