Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.xpander.ai/llms.txt

Use this file to discover all available pages before exploring further.

@on_task registers a function as the agent’s task executor. The decorated function runs every time the platform dispatches a task to this worker: over SSE in production, and via the embedded HTTP server (POST /invoke on port 59321) for local invocations and Cloud Run-style integrations. The runtime auto-detects whether your function is a regular handler (returns Task) or a streaming handler (async generator that yields TaskUpdateEvent).
from xpander_sdk import Backend, on_task
from xpander_sdk.modules.tasks.sub_modules.task import Task
from agno.agent import Agent as AgnoAgent

backend = Backend()

@on_task
async def handler(task: Task) -> Task:
    args = await backend.aget_args(
        agent_id=task.agent_id,
        agent_version=task.agent_version,
        task=task,
    )
    agno_agent = AgnoAgent(**args)
    result = await agno_agent.arun(input=task.to_message())
    task.result = result.content
    return task

Decorator forms

@on_task
def fn(task: Task) -> Task: ...

@on_task(configuration=config)
def fn(task: Task) -> Task: ...

@on_task(test_task=local_test_task)
async def fn(task: Task) -> Task: ...
ParameterTypeDefaultDescription
configurationConfigurationNoneSDK config for the underlying Events module. Falls back to env vars.
test_taskLocalTaskTestNoneA simulated task to run locally. The runtime invokes the handler once with this task and exits, instead of subscribing to the SSE stream.

Required signature

The handler must accept exactly one parameter named task. Either positional or keyword form works.
@on_task
def handler(task: Task) -> Task:        # ✓
    ...

@on_task
async def handler(task: Task) -> Task:  # ✓
    ...

@on_task
async def handler(thing: Task):         # ✗ raises TypeError: must be named `task`
    ...

Handler types

The decorator detects which kind you wrote by inspecting whether your function is an async generator.

Regular handler

Sync or async, returns the (possibly mutated) Task.
@on_task
async def handler(task: Task) -> Task:
    task.result = "done"
    return task
The runtime persists the returned task automatically. Don’t call task.asave() yourself unless you need to checkpoint mid-handler.

Streaming handler

Async generator that yields TaskUpdateEvents. Used for token-by-token streaming responses. A streaming handler emits two kinds of events. Each token (or text chunk) from the LLM is yielded as a TaskUpdateEventType.Chunk event, which the platform forwards to subscribers in real time. After the loop ends, the handler yields one final TaskUpdateEventType.TaskFinished event carrying the completed Task object. The platform uses that final event to mark the task complete and persist its result.
from datetime import datetime, timezone
from xpander_sdk import on_task, TaskUpdateEvent, TaskUpdateEventType

@on_task
async def handler(task: Task):
    async for chunk in stream_from_llm(task.to_message()):
        yield TaskUpdateEvent(
            type=TaskUpdateEventType.Chunk,
            task_id=task.id,
            organization_id=task.organization_id,
            time=datetime.now(timezone.utc),
            data=chunk,
        )

    yield TaskUpdateEvent(
        type=TaskUpdateEventType.TaskFinished,
        task_id=task.id,
        organization_id=task.organization_id,
        time=datetime.now(timezone.utc),
        data=task,  # final Task object
    )
Streaming handlers are exposed via POST /invoke only: the SSE listener wraps them in an adapter that consumes the generator and tracks the final Task from the TaskFinished event.

What the runtime does

When you decorate a function with @on_task:
  1. Validates the signature (must accept task).
  2. Detects handler type (regular vs streaming).
  3. Starts an embedded HTTP server on port 59321 for POST /invoke (always, in a daemon thread). Override the port with XPANDER_STREAMING_PORT.
  4. Registers an SSE listener with the Events module. The listener:
    • Reads XPANDER_API_KEY, XPANDER_ORGANIZATION_ID, XPANDER_AGENT_ID from env (raises ModuleException if any are missing).
    • Subscribes to the platform’s task-dispatch stream.
    • Acquires a semaphore (max_sync_workers=6 by default) before dispatching.
    • Sets task status to Executing before calling your handler.
    • Calls your handler.
    • Persists the returned task; if your handler raised, marks the task Error with the exception string as the result.
    • Reports metrics if task.tokens is set.
    • Re-runs the handler with a continuation prompt if deep planning is enabled (multi-step task plans tracked in task.deep_planning) and items remain incomplete, up to MAX_PLAN_RETRIES = 5. Pre-retry the runtime triggers session compaction.

Test mode

Pass test_task (or use the CLI override) to invoke the handler once locally instead of subscribing to the platform.

With a LocalTaskTest

from xpander_sdk import on_task
from xpander_sdk.modules.tasks.models.task import LocalTaskTest, AgentExecutionInput
from xpander_sdk.models.shared import OutputFormat

local_test_task = LocalTaskTest(
    input=AgentExecutionInput(text="What can you do?"),
    output_format=OutputFormat.Json,
    output_schema={"capabilities": "list of capabilities"},
)

@on_task(test_task=local_test_task)
async def handler(task: Task) -> Task:
    task.result = {"capabilities": ["Search", "Summarize"]}
    return task
The runtime registers a worker, creates the test task, dispatches it to the handler, and exits after completion (printing the final result). Useful for local iteration without invoking from the dashboard.

From the CLI

The decorator also responds to --invoke / --prompt arguments on sys.argv, so you can run any @on_task-decorated module with:
python worker.py --invoke --prompt "Try this prompt" --output_format json
Supported flags:
FlagPurpose
--invokeSwitches the runtime into single-task test mode.
--promptRequired when --invoke is set. Becomes task.input.text.
--output_formatOne of json, markdown, text.
--output_schemaJSON-encoded schema string.

Runtime caveats

  • One handler per process. The decorator subscribes to the SSE stream on import. Decorating multiple functions in the same module isn’t useful: the most recently registered handler wins. Use multiple Python processes (or container instances) if you need to host different agents.
  • HTTP server starts immediately. The POST /invoke endpoint is up before the SSE listener finishes connecting. If port 59321 is in use the runtime logs a warning and continues. Override the port with the XPANDER_STREAMING_PORT env var.
  • Synchronous handlers are dispatched on a thread pool. A thread pool of max_sync_workers=6 handles sync handlers concurrently. Async handlers run on the event loop.

Errors raised by the decorator

  • TypeError: handler doesn’t accept task, or returns/yields the wrong type.
  • ModuleException: required env vars are missing when the SSE listener starts.