Task Progress Streaming¶
Tasks can report progress during execution using the `progress()` function. Progress events are ephemeral — they stream to connected clients in real time but are never persisted to the database and never replayed. This makes them ideal for:
- LLM token streaming — see agent responses as they generate
- Data processing updates — track rows processed, percentage complete
- Pipeline step reporting — show which phase is active
Quick Start¶
```python
from flux import task, workflow, ExecutionContext
from flux.tasks import progress

@task
async def process_data(items: list):
    results = []
    for i, item in enumerate(items):
        results.append(item * 2)
        await progress({"processed": i + 1, "total": len(items)})
    return results

@workflow
async def my_workflow(ctx: ExecutionContext):
    return await process_data(ctx.input["items"])
```
Consuming Progress Events¶
Progress events are delivered via SSE when using stream mode. There are two ways to consume them:
CLI¶
HTTP API¶
```bash
curl -N -X POST http://localhost:8000/workflows/my_workflow/run/stream \
  -H "Content-Type: application/json" \
  -d '{"items": [1, 2, 3]}'
```
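If you want to consume the stream programmatically rather than with `curl`, you can parse the raw SSE text yourself. The sketch below is not part of any Flux client API; it assumes the simple `event:`/`data:` line pairs shown in this document and ignores other SSE fields.

```python
import json

def parse_sse(stream_text: str):
    """Parse a raw SSE payload into (event_name, data_dict) pairs.

    Minimal sketch: assumes each event is an `event:` line followed by a
    single `data:` line carrying JSON, with events separated by blank lines.
    """
    events = []
    current_event = None
    for line in stream_text.splitlines():
        if line.startswith("event:"):
            current_event = line[len("event:"):].strip()
        elif line.startswith("data:") and current_event is not None:
            payload = json.loads(line[len("data:"):].strip())
            events.append((current_event, payload))
            current_event = None
    return events

sample = (
    'event: task.progress\n'
    'data: {"type": "TASK_PROGRESS", "value": {"processed": 1, "total": 3}}\n'
    '\n'
    'event: my_workflow.execution.completed\n'
    'data: {"state": "COMPLETED"}\n'
)
for name, data in parse_sse(sample):
    print(name, data)
```

A real client would read the response body incrementally (for example with `httpx.stream`) and feed each chunk through the same parsing logic.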
Both produce the same SSE event stream. Progress events use the `task.progress` event type and carry the standard `ExecutionEvent` structure:

```
event: task.progress
data: {"type": "TASK_PROGRESS", "source_id": "process_data_123", "name": "process_data", "value": {"processed": 1, "total": 3}, "time": "2026-03-19 17:50:08.410"}

event: task.progress
data: {"type": "TASK_PROGRESS", "source_id": "process_data_123", "name": "process_data", "value": {"processed": 2, "total": 3}, "time": "2026-03-19 17:50:08.511"}

event: task.progress
data: {"type": "TASK_PROGRESS", "source_id": "process_data_123", "name": "process_data", "value": {"processed": 3, "total": 3}, "time": "2026-03-19 17:50:08.613"}

event: my_workflow.execution.completed
data: {"state": "COMPLETED", "output": {...}}
```
Progress events are interleaved with durable execution state events (`CLAIMED`, `RUNNING`, `COMPLETED`). Clients distinguish them by the `event` field — `task.progress` for ephemeral progress, `{workflow}.execution.{state}` for durable state changes.
When no stream-mode client is connected (async or sync mode), progress events are silently dropped. There is zero memory or CPU overhead.
Agent Token Streaming¶
The agent() task uses progress() to stream LLM tokens. Streaming is enabled by default:
```python
from flux.tasks.ai import agent

assistant = agent(
    "You are a helpful assistant.",
    model="openai/gpt-4o",
)

result = await assistant("Explain quantum computing")
```
Each token is emitted as `progress({"token": "..."})`. The return value is still the complete response string — streaming is a side effect for connected clients.
SSE output for a streaming agent:
```
event: task.progress
data: {"type": "TASK_PROGRESS", "name": "agent_openai_gpt_4o", "value": {"token": "Quantum"}, ...}

event: task.progress
data: {"type": "TASK_PROGRESS", "name": "agent_openai_gpt_4o", "value": {"token": " computing"}, ...}

event: task.progress
data: {"type": "TASK_PROGRESS", "name": "agent_openai_gpt_4o", "value": {"token": " is"}, ...}

event: my_workflow.execution.completed
data: {"state": "COMPLETED", "output": {"response": "Quantum computing is..."}}
```
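A client that renders the streamed text can fold the `{"token": ...}` progress values back into a single string. A minimal sketch, assuming the value shapes shown above (this helper is illustrative, not part of Flux):

```python
def accumulate_tokens(progress_values: list[dict]) -> str:
    """Join streamed token fragments into the full response text.

    Sketch only: assumes each progress value is a {"token": "..."} dict
    as emitted by agent(); values of other shapes are ignored.
    """
    return "".join(v["token"] for v in progress_values if "token" in v)

values = [{"token": "Quantum"}, {"token": " computing"}, {"token": " is"}]
print(accumulate_tokens(values))  # Quantum computing is
```

Because the complete response is also stored durably in the `TASK_COMPLETED` event, client-side accumulation is only needed for live display, never for correctness.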
Disabling Streaming¶
Structured Output¶
When `response_format` is set, streaming is automatically disabled. Partial JSON tokens are not useful to clients.
```python
from pydantic import BaseModel

from flux.tasks.ai import agent

class Analysis(BaseModel):
    summary: str
    score: float

analyst = agent(
    "Analyze the text.",
    model="openai/gpt-4o",
    response_format=Analysis,
)
```
Tool Calling¶
When an agent uses tools, the tool-calling iterations use non-streaming calls (tool call JSON must be complete to parse). Only the final response (after all tool calls) is streamed.
Progress Value¶
The progress() function accepts any value — the shape is up to the task author:
```python
# Numeric progress
await progress({"processed": 500, "total": 10000})

# Step-based progress
await progress({"step": "building image"})

# Token streaming (used by agent())
await progress({"token": "Hello"})

# Custom structure
await progress({"phase": "training", "epoch": 3, "loss": 0.042})
```
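Because the shape is free-form, a consumer typically branches on which keys are present. A hedged sketch that renders a one-line status from the shapes above (the key names mirror these examples; your tasks may emit anything):

```python
def describe_progress(value: dict) -> str:
    """Render a human-readable status line from common progress shapes.

    Illustrative only: the recognized keys match the examples above,
    and unknown shapes fall through to a plain repr.
    """
    if "processed" in value and "total" in value:
        pct = 100 * value["processed"] / value["total"]
        return f"{value['processed']}/{value['total']} ({pct:.0f}%)"
    if "step" in value:
        return f"step: {value['step']}"
    if "token" in value:
        return value["token"]
    return str(value)

print(describe_progress({"processed": 500, "total": 10000}))  # 500/10000 (5%)
print(describe_progress({"step": "building image"}))          # step: building image
```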
Durability Guarantees¶
Progress events have zero impact on Flux's durability model:
| Aspect | Behavior |
|---|---|
| Event log | Progress is never added to ctx.events |
| Database | Progress is never persisted |
| Checkpoint | Progress does not trigger checkpoints |
| Replay | Progress events don't exist during replay |
| Event count | Still 2 per task (STARTED + COMPLETED) |
If the worker crashes mid-streaming, the task re-executes from scratch on resume. The client gets a fresh token stream. The complete response is durably stored only in the TASK_COMPLETED event.
How It Works¶
```mermaid
sequenceDiagram
    participant Task
    participant Worker
    participant Server
    participant Client
    Task->>Worker: await progress(value)
    Note over Worker: put_nowait(queue)
    Worker->>Server: POST /progress/{id} (batch, no DB write)
    Server->>Client: SSE: task.progress (ephemeral)
    Task->>Worker: return result
    Note over Worker: TASK_COMPLETED event
    Worker->>Server: POST /checkpoint/{id} (DB write)
    Server->>Client: SSE: execution.completed (durable)
```
- Task calls `progress(value)` — enqueued in an in-memory per-execution queue on the worker (fire-and-forget, never blocks)
- A background flusher batches progress items and POSTs them to `POST /workers/{name}/progress/{execution_id}` (no database write)
- Server buffers in-memory (only when a stream-mode client is connected — zero overhead otherwise)
- SSE handler yields `TASK_PROGRESS` events to the client, interleaved with durable checkpoint events
- On cancellation, the flusher drains remaining items before exiting (final tokens are the most valuable)
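The enqueue-and-batch pattern can be sketched with `asyncio`. This is not Flux's internal code; `flusher` and `post_batch` are hypothetical names, with `post_batch` standing in for the HTTP call to the server.

```python
import asyncio

async def flusher(queue: asyncio.Queue, post_batch, interval: float = 0.05):
    """Drain the progress queue and POST items in batches.

    Sketch of the worker-side pattern described above: wait for one item,
    then grab everything else already queued so a single POST carries
    many progress values.
    """
    while True:
        item = await queue.get()
        batch = [item]
        while not queue.empty():
            batch.append(queue.get_nowait())
        await post_batch(batch)
        await asyncio.sleep(interval)

async def demo():
    sent = []
    q = asyncio.Queue()
    for i in range(5):
        q.put_nowait({"processed": i + 1, "total": 5})

    async def post_batch(batch):
        sent.append(batch)  # stand-in for POST /workers/{name}/progress/...

    task = asyncio.create_task(flusher(q, post_batch))
    await asyncio.sleep(0.01)
    task.cancel()
    return sent

batches = asyncio.run(demo())
print(sum(len(b) for b in batches))  # 5
```

Batching is what keeps per-token progress cheap: many rapid `progress()` calls collapse into a single HTTP request.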
Back-pressure¶
The worker queue has a max size of 1000 items. If progress events are produced faster than they can be flushed, excess events are silently dropped. This prevents unbounded memory growth without blocking task execution.
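The drop-on-full rule can be illustrated with `asyncio.Queue` and `put_nowait` (the `offer` helper is hypothetical; only the 1000-item limit comes from the text above):

```python
import asyncio

def offer(queue: asyncio.Queue, item) -> bool:
    """Enqueue a progress item, silently dropping it if the queue is full.

    Sketch of the back-pressure rule above: producers never block, and
    excess items are discarded instead of growing memory without bound.
    """
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False

q = asyncio.Queue(maxsize=3)  # the real worker queue holds 1000 items
accepted = [offer(q, {"processed": i}) for i in range(5)]
print(accepted)  # [True, True, True, False, False]
```

Dropping on the producer side trades completeness of the ephemeral stream for a hard memory bound, which is acceptable because progress carries no durability guarantees.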
Examples¶
See the included examples:
- `examples/task_progress_example.py` — Numeric and step-based progress reporting
- `examples/ai/streaming_progress_agent_ollama.py` — LLM token streaming with Ollama