Workflow Management¶
Creating Workflows¶
A workflow in Flux is created by applying the @workflow decorator to a Python async function that awaits tasks. Workflows are the primary way to organize and orchestrate task execution.
from flux import workflow, ExecutionContext, task

@task
async def process_data(data: str):
    return data.upper()

@workflow
async def my_workflow(ctx: ExecutionContext[str]):
    # Workflows must take an ExecutionContext as first argument
    # The type parameter [str] indicates the expected input type
    result = await process_data(ctx.input)
    return result
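To make the control flow concrete without the framework, here is a minimal stdlib-only sketch of the same shape: a context object carries the input, and the workflow body awaits a task-like coroutine. SketchContext is a hypothetical stand-in written for this illustration and is not Flux's ExecutionContext; the decorators are left out on purpose.

```python
import asyncio
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass
class SketchContext(Generic[T]):
    """Hypothetical stand-in for an execution context (not Flux's class)."""
    input: T


async def process_data(data: str) -> str:
    # Plays the role of a task: one awaitable unit of work.
    return data.upper()


async def my_workflow(ctx: SketchContext[str]) -> str:
    # The workflow reads its input from the context and awaits the task.
    return await process_data(ctx.input)


result = asyncio.run(my_workflow(SketchContext(input="hello")))
print(result)  # HELLO
```

In Flux itself the decorators add what this sketch omits, such as tracking the execution so it can later be paused or resumed.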
Workflow Configuration¶
Workflows can be configured using with_options:
# custom_storage is assumed to be an output storage object defined elsewhere
@workflow.with_options(
    name="custom_workflow",         # Custom name for the workflow
    secret_requests=["API_KEY"],    # Secrets required by the workflow
    output_storage=custom_storage   # Custom storage for workflow outputs
)
async def configured_workflow(ctx: ExecutionContext):
    pass
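The pattern of a decorator that works both bare and through with_options can be sketched in plain Python. This only illustrates the general decorator-factory technique; SketchWorkflow and sketch_workflow are hypothetical names, and nothing here claims to mirror how Flux implements it.

```python
import asyncio


class SketchWorkflow:
    """Hypothetical wrapper that keeps options next to the wrapped function."""

    def __init__(self, func, name=None, secret_requests=None):
        self.func = func
        # Fall back to the function's own name when no custom name is given.
        self.name = name or func.__name__
        self.secret_requests = list(secret_requests or [])

    async def __call__(self, *args, **kwargs):
        return await self.func(*args, **kwargs)


def sketch_workflow(func):
    # Bare form: wrap the function with default options.
    return SketchWorkflow(func)


def _with_options(name=None, secret_requests=None):
    # Configured form: capture the options, then return the real decorator.
    def decorator(func):
        return SketchWorkflow(func, name=name, secret_requests=secret_requests)
    return decorator


# Expose the factory as an attribute so both spellings share one entry point.
sketch_workflow.with_options = _with_options


@sketch_workflow
async def plain(ctx):
    return ctx


@sketch_workflow.with_options(name="custom_workflow", secret_requests=["API_KEY"])
async def configured(ctx):
    return ctx


print(plain.name)       # plain
print(configured.name)  # custom_workflow
```

Keeping the options on the wrapper object, rather than in globals, lets each workflow carry its own configuration.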
Workflow Lifecycle¶
A workflow goes through several stages during its execution:
- Initialization
- Execution
- Completion or Failure
- Replay or Resume (if needed)
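One common way to support the "Replay or Resume" stage is to record each completed step's result and reuse it on the next run, so only unfinished steps execute again. The sketch below shows that idea with the standard library only. ReplayLog and its step method are hypothetical and written for this illustration; the source does not describe how Flux stores or replays its events.

```python
import asyncio


class ReplayLog:
    """Hypothetical log of step results keyed by step name (not part of Flux)."""

    def __init__(self):
        self.results = {}
        self.executed = []  # steps that actually ran, kept for demonstration

    async def step(self, name, func, *args):
        if name in self.results:
            # Replay: reuse the recorded result instead of running again.
            return self.results[name]
        value = await func(*args)
        self.results[name] = value
        self.executed.append(name)
        return value


async def fetch():
    return 21


async def double(x):
    return x * 2


attempts = {"count": 0}


async def flaky_store(x):
    # Fails on the first call only, to simulate a transient error.
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("transient failure")
    return f"stored {x}"


async def run(log):
    raw = await log.step("fetch", fetch)
    doubled = await log.step("double", double, raw)
    return await log.step("store", flaky_store, doubled)


log = ReplayLog()
first_attempt_failed = False
try:
    asyncio.run(run(log))
except RuntimeError:
    first_attempt_failed = True

# Second run with the same log: "fetch" and "double" are replayed, not re-run.
output = asyncio.run(run(log))
print(output)        # stored 42
print(log.executed)  # ['fetch', 'double', 'store']
```

Each step name appears once in log.executed even though the workflow function ran twice, which is the property that makes resuming after a failure safe for steps with side effects.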
Workflow States¶
Workflows can be in several states:
- Running
- Paused
    # Workflow with pause point
    # some_task is assumed to be a @task defined elsewhere
    from flux.tasks import pause

    @workflow
    async def pausable_workflow(ctx: ExecutionContext):
        await some_task()
        await pause("manual_approval")
        return "Complete"

    ctx = pausable_workflow.run()
    print(ctx.is_paused)     # True when paused
    print(ctx.has_finished)  # False when paused

    # Resume paused workflow
    ctx = pausable_workflow.run(execution_id=ctx.execution_id)
- Completed
- Failed
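The four states above can be related to the two flags used in the pause example, is_paused and has_finished, with a small enum. This is a sketch under one assumption: that has_finished is true for both Completed and Failed. The source only confirms the flag values for the paused case. SketchState and the helper functions are hypothetical names, not Flux API.

```python
from enum import Enum


class SketchState(Enum):
    """Hypothetical enum mirroring the four listed states."""
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"


def is_paused(state: SketchState) -> bool:
    return state is SketchState.PAUSED


def has_finished(state: SketchState) -> bool:
    # Assumption: both terminal states count as finished.
    return state in (SketchState.COMPLETED, SketchState.FAILED)


# Map each state name to (is_paused, has_finished).
flags = {s.name: (is_paused(s), has_finished(s)) for s in SketchState}
for name, (paused, finished) in flags.items():
    print(f"{name}: is_paused={paused}, has_finished={finished}")
```

A paused workflow reports is_paused=True and has_finished=False here, matching the comments in the pause example.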