Execution Model¶
Local Execution¶
Local execution runs workflows directly in your Python application.
Direct Python Execution¶
from flux import workflow, WorkflowExecutionContext
@workflow
async def my_workflow(ctx: WorkflowExecutionContext[str]):
result = await some_task(ctx.input)
return result
# Execute the workflow
ctx = my_workflow.run("input_data")
# Access results
print(ctx.output)
print(ctx.succeeded)
Command Line Execution¶
The flux
CLI provides workflow execution through the command line:
# Basic execution
flux exec workflow_file.py workflow_name "input_data"
# Example with hello_world workflow
flux exec hello_world.py hello_world "World"
API-based Execution¶
Flux provides a built-in HTTP API server for remote workflow execution.
Starting the API Server¶
Making API Requests¶
# Execute a workflow
curl --location 'localhost:8000/hello_world' \
--header 'Content-Type: application/json' \
--data '"World"'
# Get execution details
curl --location 'localhost:8000/inspect/[execution_id]'
Available endpoints:
- POST /{workflow_name}
- Execute a workflow
- POST /{workflow_name}/{execution_id}
- Resume a workflow
- GET /inspect/{execution_id}
- Get execution details
HTTP API Response Format¶
{
"execution_id": "unique_execution_id",
"name": "workflow_name",
"input": "input_data",
"output": "result_data"
}
Add ?inspect=true
to get detailed execution information including events:
curl --location 'localhost:8000/hello_world?inspect=true' \
--header 'Content-Type: application/json' \
--data '"World"'
Execution Context¶
The execution context maintains the state and progression of workflow execution:
# Create execution context
ctx = my_workflow.run("input_data")
# Execution identification
execution_id = ctx.execution_id # Unique identifier
workflow_name = ctx.name # Workflow name
# Execution state
is_finished = ctx.finished # Execution completed
has_succeeded = ctx.succeeded # Execution succeeded
has_failed = ctx.failed # Execution failed
is_paused = ctx.paused # Execution paused
# Data access
input_data = ctx.input # Input data
output_data = ctx.output # Output/result data
event_list = ctx.events # Execution events
Paused Workflows¶
Flux supports pausing and resuming workflows:
from flux import workflow, ExecutionContext
from flux.tasks import pause
@workflow
async def pausable_workflow(ctx: ExecutionContext):
# Run until the pause point
result = await initial_task()
# Pause execution
await pause("approval_required")
# This code runs only after resuming
return await final_task(result)
# Start execution (runs until pause point)
ctx = pausable_workflow.run()
print(f"Paused: {ctx.paused}") # True
# Resume execution with the same execution_id
ctx = pausable_workflow.run(execution_id=ctx.execution_id)
print(f"Completed: {ctx.finished}") # True
Resuming Execution¶
# Start workflow
ctx = pausable_workflow.run()
# Resume using execution ID
ctx = pausable_workflow.run(execution_id=ctx.execution_id)
State Management¶
Flux automatically manages workflow state using SQLite for persistence. The state includes:
- Execution context
- Task results
- Events
- Execution status
State is automatically: - Persisted after each step - Loaded when resuming execution - Used for workflow replay - Managed for error recovery
Event System¶
Events track the progression of workflow execution:
Workflow Events¶
from flux.events import ExecutionEventType
# Main workflow lifecycle
ExecutionEventType.WORKFLOW_STARTED # Workflow begins
ExecutionEventType.WORKFLOW_COMPLETED # Workflow succeeds
ExecutionEventType.WORKFLOW_FAILED # Workflow fails
Task Events¶
# Task lifecycle
ExecutionEventType.TASK_STARTED # Task begins
ExecutionEventType.TASK_COMPLETED # Task succeeds
ExecutionEventType.TASK_FAILED # Task fails
# Task retry events
ExecutionEventType.TASK_RETRY_STARTED
ExecutionEventType.TASK_RETRY_COMPLETED
ExecutionEventType.TASK_RETRY_FAILED
# Task fallback events
ExecutionEventType.TASK_FALLBACK_STARTED
ExecutionEventType.TASK_FALLBACK_COMPLETED
ExecutionEventType.TASK_FALLBACK_FAILED
# Task rollback events
ExecutionEventType.TASK_ROLLBACK_STARTED
ExecutionEventType.TASK_ROLLBACK_COMPLETED
ExecutionEventType.TASK_ROLLBACK_FAILED