Skip to content

Task System

Task Creation

Tasks in Flux are Python functions decorated with @task. They represent individual units of work that can be composed into workflows.

Basic Task Creation

from flux import task

@task
async def simple_task(data: str):
    return data.upper()

Configured Task Creation

@task.with_options(
    name="process_data",                 # Custom task name
    retry_max_attempts=3,                # Maximum retry attempts
    retry_delay=1,                       # Initial delay between retries
    retry_backoff=2,                     # Backoff multiplier
    timeout=30,                          # Task timeout in seconds
    fallback=fallback_function,          # Fallback function
    rollback=rollback_function,          # Rollback function
    secret_requests=["API_KEY"],         # Required secrets
    output_storage=custom_storage        # Custom output storage
)
async def complex_task(data: str, secrets: dict = {}):
    # Task implementation using secrets
    return process_data(data, secrets["API_KEY"])

Task Options

Retry Configuration

@task.with_options(
    retry_max_attempts=3,    # Try up to 3 times
    retry_delay=1,          # Wait 1 second initially
    retry_backoff=2         # Double delay after each retry
)
def retrying_task():
    # Task will retry with delays: 1s, 2s, 4s
    pass

Timeout Configuration

@task.with_options(timeout=30)
def timed_task():
    # Task will fail if it exceeds 30 seconds
    pass

Fallback Configuration

def fallback_handler(input_data):
    # Handle task failure
    return "fallback result"

@task.with_options(fallback=fallback_handler)
def task_with_fallback(input_data):
    # If this fails, fallback_handler is called
    pass

Rollback Configuration

def rollback_handler(input_data):
    # Clean up after task failure
    pass

@task.with_options(rollback=rollback_handler)
def task_with_rollback(input_data):
    # If this fails, rollback_handler is called
    pass

Task Composition

Flux provides several ways to compose tasks:

Parallel Execution

from flux.tasks import parallel

@workflow
async def parallel_workflow(ctx: WorkflowExecutionContext):
    results = await parallel(
        task1(),
        task2(),
        task3()
    )
    return results

Pipeline Processing

from flux.tasks import pipeline

@workflow
async def pipeline_workflow(ctx: WorkflowExecutionContext):
    result = await pipeline(
        task1,
        task2,
        task3,
        input=ctx.input
    )
    return result

Task Mapping

@task
async def process_item(item: str):
    return item.upper()

@workflow
async def mapping_workflow(ctx: WorkflowExecutionContext):
    items = ["a", "b", "c"]
    results = await process_item.map(items)
    return results

Error Handling

Tasks support multiple error handling strategies that can be combined:

Retry Mechanism

@task.with_options(
    retry_max_attempts=3,
    retry_delay=1,
    retry_backoff=2
)
def retrying_task():
    if random.random() < 0.5:
        raise ValueError("Task failed")
    return "success"

Fallback Strategy

def fallback_handler():
    return "fallback result"

@task.with_options(
    fallback=fallback_handler,
    retry_max_attempts=3
)
def task_with_fallback():
    # Retries first, then fallback if all retries fail
    pass

Rollback Operations

def rollback_handler():
    # Clean up resources
    pass

@task.with_options(rollback=rollback_handler)
def task_with_rollback():
    # Rollback is called if task fails
    pass

Timeout Handling

@task.with_options(
    timeout=30,
    fallback=fallback_handler
)
def timed_task():
    # Fails if exceeds 30 seconds, then calls fallback
    pass

Built-in Tasks

Flux provides several built-in tasks for common operations:

Time Operations

from flux.tasks import now, sleep

@workflow
async def timing_workflow(ctx: WorkflowExecutionContext):
    start_time = await now()           # Get current time
    await sleep(timedelta(seconds=5))   # Sleep for duration
    end_time = await now()
    return end_time - start_time

Random Operations

from flux.tasks import choice, randint, randrange

@workflow
async def random_workflow(ctx: WorkflowExecutionContext):
    chosen = await choice(["a", "b", "c"])    # Random choice
    number = await randint(1, 10)             # Random integer
    range_num = await randrange(0, 10, 2)     # Random range

UUID Generation

from flux.tasks import uuid4

@workflow
async def id_workflow(ctx: WorkflowExecutionContext):
    new_id = await uuid4()  # Generate UUID

Workflow Pauses

from flux.tasks import pause

@workflow
async def approval_workflow(ctx: WorkflowExecutionContext):
    # Process something
    result = await process_data()

    # Pause the workflow with a named pause point
    await pause("wait_for_approval")

    # This code will only execute after the workflow is resumed
    return f"Approved: {result}"

Graph-based Task Composition

The Graph task allows you to create complex task dependencies using a directed acyclic graph (DAG):

from flux.tasks import Graph

@task
def get_name(input: str) -> str:
    return input

@task
def say_hello(name: str) -> str:
    return f"Hello, {name}"

@workflow
async def graph_workflow(ctx: WorkflowExecutionContext[str]):
    # Create a graph named "hello_world"
    hello = (
        Graph("hello_world")
        # Add nodes (tasks)
        .add_node("get_name", get_name)
        .add_node("say_hello", say_hello)
        # Define edges (dependencies)
        .add_edge("get_name", "say_hello")
        # Define entry and exit points
        .start_with("get_name")
        .end_with("say_hello")
    )

    # Execute the graph
    return await hello(ctx.input)

Graph features: - Create complex task dependencies - Define conditional execution paths - Validate graph structure - Automatic task ordering

Graph Construction

# Create a more complex graph with conditions
graph = (
    Graph("complex_workflow")
    # Add nodes
    .add_node("task1", task1_func)
    .add_node("task2", task2_func)
    .add_node("task3", task3_func)

    # Add edges with conditions
    .add_edge("task1", "task2", condition=lambda result: result > 0)
    .add_edge("task1", "task3", condition=lambda result: result <= 0)

    # Define workflow boundaries
    .start_with("task1")
    .end_with("task2")
    .end_with("task3")

    # Validate graph structure
    .validate()
)

Graph Properties

  • Nodes can have custom actions
  • Edges can have conditions
  • Automatic cycle detection
  • Guaranteed execution order
  • Built-in validation

AI Agent Tasks

The agent() factory creates tasks that call LLMs. Each agent is a regular @task with retry, timeout, and observability support:

from flux.tasks.ai import agent

assistant = agent(
    "You are a helpful assistant.",
    model="ollama/llama3.2",
    tools=[search_web, get_weather],
)

result = await assistant("What's the weather in London?")

Agents support tool calling, structured output, stateful conversations, and reusable skills. See AI Agents and Agent Skills for details.

MCP Client Tasks

The mcp() task connects workflows to external MCP servers. Each discovered tool becomes a Flux @task:

from flux.tasks.mcp import mcp

async with mcp("http://localhost:8080/mcp") as client:
    tools = await client.discover()
    result = await tools.list_workflows()

See MCP Client for full documentation.

Error Handling in Graphs

def check_condition(result):
    return isinstance(result, str)

graph = (
    Graph("error_handling")
    .add_node("task1", safe_task)
    .add_node("fallback", fallback_task)
    .add_edge("task1", "fallback", condition=lambda r: not check_condition(r))
    .start_with("task1")
    .end_with("fallback")
)