# events

> Stream task updates as they happen.

`Task.aevents()` is an async generator over `TaskUpdateEvent`s emitted by the platform as the task runs: message chunks, tool-call requests/results, reasoning steps, sub-agent triggers, plan updates, and auth events. Use it to build live UIs or to react to specific event types in real time.

```python theme={"dark"}
from xpander_sdk import TaskUpdateEventType

# Task must be created with events_streaming=True
task = await agent.acreate_task(
    prompt="Long-running research task",
    events_streaming=True,
)

async for event in task.aevents():
    print(event.type, event.task_id)
    if event.type == TaskUpdateEventType.TaskFinished:
        break
```

<Warning>
  The task **must** be created with `events_streaming=True`. Calling `aevents()` on a non-streaming task raises `ValueError` immediately.
</Warning>

### Parameters

None.

### Yields `TaskUpdateEvent`

Each event has:

| Field             | Type                  | Description                    |
| ----------------- | --------------------- | ------------------------------ |
| `type`            | `TaskUpdateEventType` | What kind of event. See below. |
| `task_id`         | `str`                 | Task this event belongs to.    |
| `organization_id` | `str`                 | Owning organization.           |
| `time`            | `datetime`            | Event timestamp.               |
| `data`            | `Any`                 | Event-specific payload.        |

#### Event types

| `TaskUpdateEventType` | When                                                         | `data` payload                                                          |
| --------------------- | ------------------------------------------------------------ | ----------------------------------------------------------------------- |
| `TaskCreated`         | Task is created.                                             | A `Task` object.                                                        |
| `TaskUpdated`         | Any non-final task mutation (status change, result patched). | A `Task` object.                                                        |
| `TaskFinished`        | Task reaches a terminal state.                               | A `Task` object.                                                        |
| `Chunk`               | Streaming text from the LLM.                                 | `str` (text chunk).                                                     |
| `AuthEvent`           | Auth required (MCP OAuth, ECA).                              | Auth-specific dict. Also dispatched to `@on_auth_event` handlers.       |
| `ToolCallRequest`     | Agent invokes a tool.                                        | A `ToolCallRequest` (operation\_id, tool\_name, payload, reasoning, …). |
| `ToolCallResult`      | Tool finishes.                                               | A `ToolCallResult` (operation\_id, result, is\_error, …).               |
| `SubAgentTrigger`     | A sub-agent task is created.                                 | Triggering details.                                                     |
| `Think`               | Reasoning step (think tool).                                 | Reasoning data.                                                         |
| `Analyze`             | Reasoning step (analyze tool).                               | Reasoning data.                                                         |
| `PlanUpdated`         | Deep-planning state changes.                                 | A `DeepPlanning` object.                                                |
| `TaskCompactization`  | Auto-compaction event (Layer 2).                             | A `TaskCompactizationEvent`.                                            |

If a typed payload fails to parse (for example, because of schema drift), the SDK still yields the event with `data` set to the raw dict, so the envelope fields (`type`, `task_id`, `time`) always reach your code.
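Because `data` can arrive either as a typed object or as a raw dict, consumer code may want to read fields in a way that tolerates both shapes. The following is a minimal sketch, not part of the SDK: `payload_field` is a hypothetical helper, and the `SimpleNamespace` and dict values are stand-ins for a parsed and an unparsed `ToolCallResult` payload.

```python
from types import SimpleNamespace
from typing import Any


def payload_field(data: Any, name: str, default: Any = None) -> Any:
    """Read `name` from a typed payload (attribute) or a raw dict (key)."""
    if isinstance(data, dict):
        return data.get(name, default)
    return getattr(data, name, default)


# Stand-ins for the two shapes `event.data` can take (assumed for illustration).
typed = SimpleNamespace(operation_id="op-1", is_error=False)
raw = {"operation_id": "op-1", "is_error": True}

print(payload_field(typed, "is_error"))  # False
print(payload_field(raw, "is_error"))    # True
```

In a real loop you would pass `event.data` in place of the stand-ins, for example `payload_field(event.data, "is_error", False)`.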

## Examples

### Filter on event type

```python theme={"dark"}
async for event in task.aevents():
    if event.type == TaskUpdateEventType.Chunk:
        print(event.data, end="", flush=True)
    elif event.type == TaskUpdateEventType.TaskFinished:
        print()
        break
```

`Chunk` events have a string `data` payload (the streaming text), making it easy to render token-by-token output.
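If you also need the complete text once streaming ends, the chunks can be accumulated while they are rendered. This is a sketch under stated assumptions: `EventType`, `Event`, and `fake_stream` are hypothetical stand-ins for `TaskUpdateEventType`, `TaskUpdateEvent`, and `task.aevents()`, so the block runs without the SDK or a live task.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, AsyncIterator


class EventType(Enum):  # hypothetical stand-in for TaskUpdateEventType
    Chunk = "chunk"
    TaskFinished = "task_finished"


@dataclass
class Event:  # hypothetical stand-in for TaskUpdateEvent
    type: EventType
    data: Any = None


async def collect_text(events: AsyncIterator[Event]) -> str:
    """Join all Chunk payloads seen before TaskFinished into one string."""
    parts = []
    async for event in events:
        if event.type == EventType.Chunk:
            parts.append(event.data)
        elif event.type == EventType.TaskFinished:
            break
    return "".join(parts)


async def fake_stream() -> AsyncIterator[Event]:
    """Simulated event stream: two text chunks, then a finish event."""
    for text in ("Hel", "lo"):
        yield Event(EventType.Chunk, text)
    yield Event(EventType.TaskFinished)


print(asyncio.run(collect_text(fake_stream())))  # Hello
```

With the SDK, the same loop body would compare against `TaskUpdateEventType.Chunk` and iterate over `task.aevents()`.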

### Render tool calls

```python theme={"dark"}
async for event in task.aevents():
    if event.type == TaskUpdateEventType.ToolCallRequest:
        req = event.data
        print(f"→ {req.tool_name}({req.payload})")
    elif event.type == TaskUpdateEventType.ToolCallResult:
        res = event.data
        status = "✗" if res.is_error else "✓"
        print(f"{status} {res.tool_name}: {res.result}")
```
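The event-types table lists `operation_id` on both `ToolCallRequest` and `ToolCallResult`. Assuming a result carries the same `operation_id` as the request that started it (the table implies this but does not state it), you can pair the two and label each result with the tool name taken from its request. `ToolCallTracker` below is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins for the SDK payloads.

```python
from types import SimpleNamespace


class ToolCallTracker:
    """Pair tool-call results with their requests by operation_id."""

    def __init__(self) -> None:
        self._pending = {}  # operation_id -> tool_name

    def on_request(self, req) -> None:
        self._pending[req.operation_id] = req.tool_name

    def on_result(self, res) -> str:
        # Fall back to a placeholder if the request was never seen.
        name = self._pending.pop(res.operation_id, "<unknown tool>")
        status = "✗" if res.is_error else "✓"
        return f"{status} {name}: {res.result}"


# Stand-in payloads (assumed shapes, for illustration only).
tracker = ToolCallTracker()
tracker.on_request(SimpleNamespace(operation_id="op-1", tool_name="search"))
line = tracker.on_result(
    SimpleNamespace(operation_id="op-1", result="3 hits", is_error=False)
)
print(line)  # ✓ search: 3 hits
```

In the streaming loop, call `tracker.on_request(event.data)` for `ToolCallRequest` events and print `tracker.on_result(event.data)` for `ToolCallResult` events.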

### Stop on first failure

```python theme={"dark"}
async for event in task.aevents():
    if event.type == TaskUpdateEventType.ToolCallResult and event.data.is_error:
        print("Tool failed, stopping.")
        await task.astop()
        break
```

### Track auth flows in-band

```python theme={"dark"}
async for event in task.aevents():
    if event.type == TaskUpdateEventType.AuthEvent:
        print(f"Auth required: {event.data}")
        # Show URL to user, etc.
```

If you've registered an `@on_auth_event` handler, it still fires automatically; the in-band event lets you handle the same auth request inline as well.

## Sync version

```python theme={"dark"}
for event in task.events():
    print(event.type, event.data)
```

`task.events()` consumes the async generator on a synchronous executor and yields events in order. Don't call it from inside a running event loop; use `aevents()` there instead.

## Lifecycle

The generator reconnects automatically if the SSE stream drops mid-task. It exits when:

1. The platform closes the stream (typically after `TaskFinished`).
2. The connection fails permanently (network, auth).
3. You `break` out of the loop.
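Since the loop can end without a `TaskFinished` event (for example, when the stream simply closes), it can be useful to record which case occurred. The sketch below is an assumption-laden illustration: `drain` is a hypothetical helper, and the simulated streams use plain strings for `type` in place of `TaskUpdateEventType` members.

```python
import asyncio
from types import SimpleNamespace


async def drain(events, is_final) -> bool:
    """Consume events; return True if a final event arrived before the stream ended."""
    async for event in events:
        if is_final(event):
            return True
    return False


async def finished_stream():
    # Simulated stream that ends with a terminal event.
    yield SimpleNamespace(type="Chunk", data="hi")
    yield SimpleNamespace(type="TaskFinished", data=None)


async def dropped_stream():
    # Simulated stream that closes without a terminal event.
    yield SimpleNamespace(type="Chunk", data="hi")


def is_finished(event) -> bool:
    return event.type == "TaskFinished"


print(asyncio.run(drain(finished_stream(), is_finished)))  # True
print(asyncio.run(drain(dropped_stream(), is_finished)))   # False
```

With the SDK you would pass `task.aevents()` and a predicate such as `lambda e: e.type == TaskUpdateEventType.TaskFinished`; a `False` result suggests reloading the task to check its actual state.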

## Errors

* `ValueError`: task wasn't created with `events_streaming=True`.
* Network failures propagate after the SSE retry budget is exhausted.

## When *not* to stream

For batch jobs that don't care about progress, skip `events_streaming` and poll `task.areload()` (or use webhooks). Streaming holds an HTTP connection open per task: fine for one or two, expensive for hundreds.
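A polling loop for the non-streaming case might look like the following sketch. `poll_until` is a hypothetical helper, and how "done" is detected is an assumption: this page does not document the task's status fields, so the check is passed in as a callable. `FakeTask` is a stand-in used only to make the example runnable.

```python
import asyncio
import time


async def poll_until(reload, is_done, interval: float = 2.0, timeout: float = 300.0) -> bool:
    """Call `reload()` repeatedly until `is_done()` is true or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        await reload()
        if is_done():
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(interval)


class FakeTask:
    """Stand-in task that reports completion after three reloads."""

    def __init__(self) -> None:
        self.reloads = 0

    async def areload(self) -> None:
        self.reloads += 1

    def done(self) -> bool:
        return self.reloads >= 3


fake = FakeTask()
ok = asyncio.run(poll_until(fake.areload, fake.done, interval=0.0, timeout=5.0))
print(ok, fake.reloads)  # True 3
```

With a real task, `reload` would be `task.areload` and `is_done` would inspect whatever terminal-state field the task exposes.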
