
# @on_task

> Register a function as the agent's task handler.

`@on_task` registers a function as the agent's task executor. The decorated function runs every time the platform dispatches a task to this worker: over SSE in production, and via the embedded HTTP server (`POST /invoke` on port 59321) for local invocations and Cloud Run-style integrations.

The runtime auto-detects whether your function is a regular handler (returns `Task`) or a streaming handler (async generator that yields `TaskUpdateEvent`).

```python theme={"dark"}
from xpander_sdk import Backend, on_task
from xpander_sdk.modules.tasks.sub_modules.task import Task
from agno.agent import Agent as AgnoAgent

backend = Backend()

@on_task
async def handler(task: Task) -> Task:
    args = await backend.aget_args(
        agent_id=task.agent_id,
        agent_version=task.agent_version,
        task=task,
    )
    agno_agent = AgnoAgent(**args)
    result = await agno_agent.arun(input=task.to_message())
    task.result = result.content
    return task
```

## Decorator forms

```python theme={"dark"}
@on_task
def fn(task: Task) -> Task: ...

@on_task(configuration=config)
def fn(task: Task) -> Task: ...

@on_task(test_task=local_test_task)
async def fn(task: Task) -> Task: ...
```

| Parameter       | Type            | Default | Description                                                                                                                               |
| --------------- | --------------- | ------- | ----------------------------------------------------------------------------------------------------------------------------------------- |
| `configuration` | `Configuration` | `None`  | SDK config for the underlying `Events` module. Falls back to env vars.                                                                    |
| `test_task`     | `LocalTaskTest` | `None`  | A simulated task to run locally. The runtime invokes the handler once with this task and exits, instead of subscribing to the SSE stream. |

### Required signature

The handler must accept exactly one parameter named `task`. Either positional or keyword form works.

```python theme={"dark"}
@on_task
def handler(task: Task) -> Task:        # ✓
    ...

@on_task
async def handler(task: Task) -> Task:  # ✓
    ...

@on_task
async def handler(thing: Task):         # ✗ raises TypeError: must be named `task`
    ...
```
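The signature rule can be checked with the standard library. The following is a minimal sketch of such a check, not the SDK's actual validation code; `check_task_signature`, `good`, and `bad` are hypothetical names used only for illustration.

```python
import inspect

def check_task_signature(fn) -> None:
    """Raise TypeError unless fn takes exactly one parameter, named `task`."""
    params = list(inspect.signature(fn).parameters.values())
    if len(params) != 1 or params[0].name != "task":
        raise TypeError(
            f"{fn.__name__} must accept exactly one parameter named `task`"
        )

def good(task): ...
def bad(thing): ...

check_task_signature(good)  # passes silently

error_message = None
try:
    check_task_signature(bad)
except TypeError as exc:
    error_message = str(exc)
print(error_message)
```

Running a check like this in a unit test surfaces a misnamed parameter before the worker is deployed.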

## Handler types

The decorator detects which kind you wrote by inspecting whether your function is an async generator.
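Python exposes this distinction through `inspect.isasyncgenfunction`. The sketch below shows how the classification could work; it assumes nothing about the SDK's internals, and `classify_handler` is a hypothetical helper.

```python
import inspect

def classify_handler(fn) -> str:
    """Return "streaming" for async generator functions, "regular" otherwise."""
    return "streaming" if inspect.isasyncgenfunction(fn) else "regular"

async def regular(task):
    return task

async def streaming(task):
    yield task  # the presence of `yield` makes this an async generator

def sync_regular(task):
    return task

print(classify_handler(regular))       # regular
print(classify_handler(streaming))     # streaming
print(classify_handler(sync_regular))  # regular
```

Note that a single `yield` anywhere in an `async def` body changes the function's kind, so a stray `yield` in a regular handler would turn it into a streaming one.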

### Regular handler

Sync or async, returns the (possibly mutated) `Task`.

```python theme={"dark"}
@on_task
async def handler(task: Task) -> Task:
    task.result = "done"
    return task
```

The runtime persists the returned task automatically. Don't call `task.asave()` yourself unless you need to checkpoint mid-handler.

### Streaming handler

Async generator that yields `TaskUpdateEvent`s. Used for token-by-token streaming responses.

A streaming handler emits two kinds of events. Each token (or text chunk) from the LLM is yielded as a `TaskUpdateEventType.Chunk` event, which the platform forwards to subscribers in real time. After the loop ends, the handler yields one final `TaskUpdateEventType.TaskFinished` event carrying the completed `Task` object. The platform uses that final event to mark the task complete and persist its result.

```python theme={"dark"}
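from datetime import datetime, timezone
from xpander_sdk import on_task, TaskUpdateEvent, TaskUpdateEventType
from xpander_sdk.modules.tasks.sub_modules.task import Task

@on_task
async def handler(task: Task):
    # stream_from_llm is a placeholder for your own async generator of text chunks
    async for chunk in stream_from_llm(task.to_message()):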
        yield TaskUpdateEvent(
            type=TaskUpdateEventType.Chunk,
            task_id=task.id,
            organization_id=task.organization_id,
            time=datetime.now(timezone.utc),
            data=chunk,
        )

    yield TaskUpdateEvent(
        type=TaskUpdateEventType.TaskFinished,
        task_id=task.id,
        organization_id=task.organization_id,
        time=datetime.now(timezone.utc),
        data=task,  # final Task object
    )
```

Chunk-by-chunk streaming reaches callers only through `POST /invoke`. When a task arrives over SSE instead, the listener wraps the streaming handler in an adapter that consumes the generator and keeps the final `Task` from the `TaskFinished` event.
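The adapter idea can be sketched as a coroutine that drains the generator and returns whatever the final event carries. This is an illustration under stated assumptions, not the SDK's adapter: `FakeEvent` stands in for `TaskUpdateEvent`, plain dicts stand in for `Task`, and the event type strings are invented for the example.

```python
import asyncio
from dataclasses import dataclass
from typing import Any

# Stand-in type for illustration only; the real TaskUpdateEvent has more fields.
@dataclass
class FakeEvent:
    type: str  # "chunk" or "task_finished" (hypothetical labels)
    data: Any

async def streaming_handler(task: dict):
    for piece in ("Hel", "lo"):
        yield FakeEvent(type="chunk", data=piece)
    task["result"] = "Hello"
    yield FakeEvent(type="task_finished", data=task)

async def run_to_completion(handler, task):
    """Drain the generator and return the task carried by the final event."""
    final = None
    async for event in handler(task):
        if event.type == "task_finished":
            final = event.data  # chunk events are simply discarded here
    if final is None:
        raise RuntimeError("handler ended without a task_finished event")
    return final

result = asyncio.run(
    run_to_completion(streaming_handler, {"id": "t1", "result": None})
)
print(result["result"])  # Hello
```

The sketch also shows why the final event matters: without it, the adapter has no completed task to hand back.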

## What the runtime does

When you decorate a function with `@on_task`:

1. **Validates the signature** (must accept `task`).
2. **Detects handler type** (regular vs streaming).
3. **Starts an embedded HTTP server** on port `59321` for `POST /invoke` (always, in a daemon thread). Override the port with `XPANDER_STREAMING_PORT`.
4. **Registers an SSE listener** with the `Events` module. The listener:
   * Reads `XPANDER_API_KEY`, `XPANDER_ORGANIZATION_ID`, `XPANDER_AGENT_ID` from env (raises `ModuleException` if any are missing).
   * Subscribes to the platform's task-dispatch stream.
   * Acquires a semaphore (`max_sync_workers=6` by default) before dispatching.
   * Sets task status to `Executing` before calling your handler.
   * Calls your handler.
   * Persists the returned task; if your handler raised, marks the task `Error` with the exception string as the result.
   * Reports metrics if `task.tokens` is set.
   * Re-runs the handler with a continuation prompt if [deep planning](/developers/sdk-reference/tasks/task#state) is enabled (multi-step task plans tracked in `task.deep_planning`) and items remain incomplete, up to `MAX_PLAN_RETRIES = 5`. Before each retry, the runtime triggers session compaction.
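The dispatch portion of the listener steps above (acquire a slot, mark the task, call the handler, record the outcome) can be sketched with `asyncio.Semaphore`. This is a simplified model, not the SDK's code: tasks are plain dicts, `dispatch` is a hypothetical helper, and the `"Completed"` status name is an assumption. Only `Executing` and `Error` come from the list above.

```python
import asyncio

MAX_CONCURRENT = 6  # mirrors the documented max_sync_workers default

async def dispatch(handler, task: dict, slots: asyncio.Semaphore) -> dict:
    """Run one task through acquire -> mark -> call -> record outcome."""
    async with slots:                     # wait for a free slot
        task["status"] = "Executing"      # set before the handler runs
        try:
            task = await handler(task)
            task["status"] = "Completed"  # assumed name for the success status
        except Exception as exc:
            task["status"] = "Error"
            task["result"] = str(exc)     # exception string becomes the result
    return task

async def ok_handler(task):
    task["result"] = "done"
    return task

async def failing_handler(task):
    raise ValueError("boom")

async def main():
    slots = asyncio.Semaphore(MAX_CONCURRENT)
    return await asyncio.gather(
        dispatch(ok_handler, {"id": "a"}, slots),
        dispatch(failing_handler, {"id": "b"}, slots),
    )

ok, failed = asyncio.run(main())
print(ok["status"], ok["result"])          # Completed done
print(failed["status"], failed["result"])  # Error boom
```

Because the exception is caught inside the dispatch step, one failing task does not affect others that hold a slot at the same time.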

## Test mode

Pass `test_task` (or use the CLI override) to invoke the handler once locally instead of subscribing to the platform.

### With a `LocalTaskTest`

```python theme={"dark"}
from xpander_sdk import on_task
from xpander_sdk.modules.tasks.sub_modules.task import Task
from xpander_sdk.modules.tasks.models.task import LocalTaskTest, AgentExecutionInput
from xpander_sdk.models.shared import OutputFormat

local_test_task = LocalTaskTest(
    input=AgentExecutionInput(text="What can you do?"),
    output_format=OutputFormat.Json,
    output_schema={"capabilities": "list of capabilities"},
)

@on_task(test_task=local_test_task)
async def handler(task: Task) -> Task:
    task.result = {"capabilities": ["Search", "Summarize"]}
    return task
```

The runtime registers a worker, creates the test task, dispatches it to the handler, and exits after completion (printing the final result). Useful for local iteration without invoking from the dashboard.

### From the CLI

The decorator also responds to `--invoke` / `--prompt` arguments on `sys.argv`, so you can run any `@on_task`-decorated module with:

```bash theme={"dark"}
python worker.py --invoke --prompt "Try this prompt" --output_format json
```

Supported flags:

| Flag              | Purpose                                                     |
| ----------------- | ----------------------------------------------------------- |
| `--invoke`        | Switches the runtime into single-task test mode.            |
| `--prompt`        | Required when `--invoke` is set. Becomes `task.input.text`. |
| `--output_format` | One of `json`, `markdown`, `text`.                          |
| `--output_schema` | JSON-encoded schema string.                                 |
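To make the flag semantics concrete, here is a sketch of how these four flags could be parsed with `argparse`. The SDK may parse `sys.argv` differently; `parse_test_args` is a hypothetical helper, and the use of `parse_known_args` to skip unrelated flags is an assumption.

```python
import argparse
import json

def parse_test_args(argv):
    """Parse the documented test-mode flags from an argv-style list."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--invoke", action="store_true")
    parser.add_argument("--prompt")
    parser.add_argument("--output_format", choices=["json", "markdown", "text"])
    parser.add_argument("--output_schema", type=json.loads)  # JSON string -> dict
    # parse_known_args leaves flags owned by the rest of the program alone
    args, _unknown = parser.parse_known_args(argv)
    if args.invoke and not args.prompt:
        parser.error("--prompt is required when --invoke is set")
    return args

args = parse_test_args(
    ["--invoke", "--prompt", "Try this prompt", "--output_format", "json"]
)
print(args.invoke, args.prompt, args.output_format)
```

When passing `--output_schema` from a shell, wrap the JSON in single quotes so the inner double quotes survive.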

## Runtime caveats

* **One handler per process.** The decorator subscribes to the SSE stream on import. Decorating multiple functions in the same module isn't useful: the most recently registered handler wins. Use multiple Python processes (or container instances) if you need to host different agents.
* **HTTP server starts immediately.** The `POST /invoke` endpoint is up before the SSE listener finishes connecting. If port `59321` is in use the runtime logs a warning and continues. Override the port with the `XPANDER_STREAMING_PORT` env var.
* **Synchronous handlers are dispatched on a thread pool.** A thread pool of `max_sync_workers=6` handles sync handlers concurrently. Async handlers run on the event loop.
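The split between sync and async handlers in the last caveat can be illustrated with a small dispatcher. This sketch assumes a `ThreadPoolExecutor` sized to match `max_sync_workers=6`; `call_handler` is a hypothetical helper and the SDK's own scheduling may differ.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=6)  # mirrors max_sync_workers=6

async def call_handler(handler, task):
    """Await coroutine handlers directly; run plain functions in the pool."""
    if inspect.iscoroutinefunction(handler):
        return await handler(task)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, handler, task)

def sync_handler(task):
    task["result"] = "sync"
    return task

async def async_handler(task):
    task["result"] = "async"
    return task

async def main():
    return await asyncio.gather(
        call_handler(sync_handler, {}),
        call_handler(async_handler, {}),
    )

sync_result, async_result = asyncio.run(main())
print(sync_result["result"], async_result["result"])  # sync async
pool.shutdown()
```

One practical consequence: blocking calls inside an async handler stall the event loop for every task, whereas the same calls inside a sync handler occupy only one pool thread.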

## Errors raised by the decorator

* `TypeError`: handler doesn't accept `task`, or returns/yields the wrong type.
* `ModuleException`: required env vars are missing when the SSE listener starts.
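A pre-flight check can report missing variables before the worker starts, rather than waiting for the listener to raise. The sketch below uses only the standard library; `missing_env_vars` is a hypothetical helper, not part of the SDK, and it treats empty values as missing by assumption.

```python
import os

REQUIRED_ENV_VARS = (
    "XPANDER_API_KEY",
    "XPANDER_ORGANIZATION_ID",
    "XPANDER_AGENT_ID",
)

def missing_env_vars(environ=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]

# Pass a dict to test the check without touching the real environment.
missing = missing_env_vars({"XPANDER_API_KEY": "key"})
print(missing)  # ['XPANDER_ORGANIZATION_ID', 'XPANDER_AGENT_ID']
```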
