> ## Documentation Index
> Fetch the complete documentation index at: https://docs.xpander.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Task class

> Task object attributes and instance methods.

`Task` represents a single agent execution. You get one back from `Agent.acreate_task`, `Tasks.aget`, `Tasks.acreate`, and the `@on_task` handler. It carries the full task state (input, status, result, deep-planning, tokens, …) and the methods needed to drive it forward.

```python theme={"dark"}
from xpander_sdk import Tasks

task = await Tasks().aget(task_id="task_xyz")
print(task.id, task.status.value, task.result)
```

## Class methods

### `Task.aload(task_id, configuration=None) -> Task`

Same as `Tasks.aget` but available without a `Tasks` instance. Useful when you have a `Configuration` but don't want to instantiate the module class.

```python theme={"dark"}
from xpander_sdk import Task, Configuration

config = Configuration(api_key="...", organization_id="...")
task = await Task.aload(task_id="task_xyz", configuration=config)
```

The sync sibling is `Task.load(...)`.

### `Task.areport_external_task(...)` (classmethod)

See the dedicated [`report_external_task`](/developers/sdk-reference/tasks/report-external-task) page.

## Attributes

### Identity

| Attribute             | Type          | Description                                      |
| --------------------- | ------------- | ------------------------------------------------ |
| `id`                  | `str`         | Task ID.                                         |
| `agent_id`            | `str`         | Owning agent.                                    |
| `organization_id`     | `str`         | Owning organization.                             |
| `agent_version`       | `str \| None` | Agent version pinned for this run.               |
| `triggering_agent_id` | `str \| None` | Agent that triggered this (sub-agent flows).     |
| `parent_execution`    | `str \| None` | Parent task ID for sub-tasks.                    |
| `sub_executions`      | `list[str]`   | Child task IDs.                                  |
| `source`              | `str \| None` | Origin tag (`"sdk"`, `"webhook"`, `"slack"`, …). |
| `title`               | `str \| None` | Display title.                                   |

### Input

| Attribute                      | Type                     | Description                                                 |
| ------------------------------ | ------------------------ | ----------------------------------------------------------- |
| `input`                        | `AgentExecutionInput`    | Holds `text`, `files: list[str]`, and `user: User \| None`. |
| `payload_extension`            | `dict \| None`           | Extra fields merged into every tool-call payload.           |
| `additional_context`           | `str \| None`            | Extra context appended to the system prompt.                |
| `instructions_override`        | `str \| None`            | Extra instructions for this run.                            |
| `expected_output`              | `str \| None`            | Expected output description.                                |
| `mcp_servers`                  | `list[MCPServerDetails]` | Per-task MCP servers.                                       |
| `user_oidc_token`              | `str \| None`            | OIDC token for connector pre-auth.                          |
| `user_tokens`                  | `dict \| None`           | Pre-computed user tokens for MCP auth.                      |
| `disable_attachment_injection` | `bool`                   | Skip auto-inlining of human-readable file content.          |
| `output_format`                | `OutputFormat \| None`   | Output format.                                              |
| `output_schema`                | `dict \| None`           | JSON schema for structured output.                          |
| `voice_id`                     | `str \| None`            | Voice ID for `OutputFormat.Voice`.                          |
| `think_mode`                   | `ThinkMode`              | `Default` or `Harder`.                                      |

### State

| Attribute               | Type                            | Description                                                                         |
| ----------------------- | ------------------------------- | ----------------------------------------------------------------------------------- |
| `status`                | `AgentExecutionStatus`          | One of `Pending`, `Executing`, `Paused`, `Error`, `Failed`, `Completed`, `Stopped`. |
| `internal_status`       | `str \| None`                   | Finer-grained status used by the platform internally.                               |
| `last_executed_node_id` | `str \| None`                   | Cursor in the execution graph (workflow agents).                                    |
| `is_manually_stopped`   | `bool`                          | `True` after `astop`.                                                               |
| `is_orchestration`      | `bool`                          | `True` for orchestration tasks.                                                     |
| `events_streaming`      | `bool`                          | Whether SSE streaming is enabled.                                                   |
| `hitl_request`          | `HumanInTheLoopRequest \| None` | Set when the task is paused awaiting human approval.                                |
| `pending_eca_request`   | `PendingECARequest \| None`     | Set when an ECA (External Credential Authorization) is pending.                     |
| `deep_planning`         | `DeepPlanning`                  | Deep-planning state (plan items, completion flags).                                 |
| `execution_attempts`    | `int`                           | Number of plan-following attempts (1 for first execution).                          |
| `return_metrics`        | `bool`                          | Whether the task returns metrics.                                                   |

### Output

| Attribute    | Type             | Description                        |
| ------------ | ---------------- | ---------------------------------- |
| `result`     | `str \| None`    | Final result.                      |
| `tokens`     | `Tokens \| None` | Token usage (prompt + completion). |
| `used_tools` | `list[str]`      | Names of tools the task invoked.   |
| `duration`   | `float`          | Wall-clock duration in seconds.    |

### Timestamps

| Attribute     | Type               | Description                       |
| ------------- | ------------------ | --------------------------------- |
| `created_at`  | `datetime`         | Creation timestamp.               |
| `started_at`  | `datetime \| None` | When the worker picked it up.     |
| `paused_at`   | `datetime \| None` | When it last paused.              |
| `finished_at` | `datetime \| None` | When it reached a terminal state. |

### Internal

| Attribute          | Type                    | Description                                     |
| ------------------ | ----------------------- | ----------------------------------------------- |
| `configuration`    | `Configuration \| None` | SDK configuration. Excluded from serialization. |
| `test_run_node_id` | `str \| None`           | (Internal) Test workflow node id.               |

## Instance methods

### `save`

`task.asave(with_deep_plan_update: bool = False)`: PATCHes the task on the platform with the current in-memory state. By default `deep_planning` is excluded (the platform's plan state is authoritative); pass `with_deep_plan_update=True` to push your plan updates back.

```python theme={"dark"}
task.result = "Done"
task.status = AgentExecutionStatus.Completed
await task.asave()
```

After saving, the response repopulates the local instance, but `deep_planning` and `additional_context` are restored from local memory if the API returns empty values (defensive against stale API state during retries). Sync: `task.save(...)`.

### `set_status`

`task.aset_status(status, result=None)`: convenience wrapper that sets `status` (and optionally `result`) and calls `asave()` in one step.

```python theme={"dark"}
await task.aset_status(AgentExecutionStatus.Executing)
```

Sync: `task.set_status(...)`.

### `stop`

`task.astop()`: terminates the task and updates the local instance with the platform's response. Sync: `task.stop()`.

### `reload`

`task.areload()`: re-fetches the task from the platform and updates the in-memory instance. `deep_planning` and `additional_context` are restored from local memory if the API returns empty values. Sync: `task.reload()`.

### `events`

`task.aevents()` / `task.events()`: SSE stream of `TaskUpdateEvent`. Requires `events_streaming=True` at task creation. See [streaming events](/developers/sdk-reference/tasks/events).

### `get_activity_log`

`task.aget_activity_log()` / `task.get_activity_log()`: full message / tool-call / reasoning thread for the task. See [activity log](/developers/sdk-reference/tasks/activity-log).

### `report_metrics`

`task.areport_metrics()` / `task.report_metrics()`: pushes token counts and tool usage to the platform. See [report metrics](/developers/sdk-reference/tasks/report-metrics).

### `get_plan_following_status`

`task.aget_plan_following_status() -> PlanFollowingStatus`: checks whether deep-planning tasks are complete. Returns `PlanFollowingStatus(can_finish: bool, uncompleted_tasks: list[DeepPlanningItem])`. Used by the runtime to retry the agent until the plan is satisfied. Sync: `task.get_plan_following_status()`.

### `acompact_session_for_retry`

`task.acompact_session_for_retry() -> CompactRetryResult`: forces L2 (auto) compaction on the agent's session before a plan-following retry. Used internally by the runtime; rarely needed from user code.

### Agno helpers

These methods help shape the task input for Agno agents.

#### `get_files`

```python theme={"dark"}
task.get_files() -> list[Any]
```

Returns PDF files from `task.input.files` as Agno `File` objects (when Agno is installed). Returns the URL strings as a fallback. Empty list if the task has no PDFs.

#### `get_images`

```python theme={"dark"}
task.get_images() -> list[Any]
```

Returns image files as Agno `Image` objects (or URL strings as fallback). Empty list if no images.

#### `get_human_readable_files`

```python theme={"dark"}
task.get_human_readable_files() -> list[dict[str, str]]
```

Fetches and parses text-based files (CSV, JSON, TXT, .py, …) and returns `[{"url": ..., "content": ...}]`. Used by `to_message()` to inline file contents into the prompt. Honors `task.disable_attachment_injection`.

#### `to_message`

```python theme={"dark"}
task.to_message(retry_count: int = 0) -> str
```

Builds the message string to pass to the framework agent. Format:

```
{input.text}
Files: url1, url2, ...
Files contents:
{json of each readable file}
```

On retries (`retry_count > 0`) and when deep planning is active, it generates a continuation prompt that lists completed/uncompleted tasks and pinpoints the next one to work on. This is the prompt format the runtime feeds to Agno when retrying after an incomplete plan.

```python theme={"dark"}
@on_task
async def handler(task: Task) -> Task:
    args = await backend.aget_args(agent_id=task.agent_id, task=task)
    agno_agent = AgnoAgent(**args)

    result = await agno_agent.arun(
        input=task.to_message(),       # text + files + readable contents
        files=task.get_files(),        # PDFs as Agno File
        images=task.get_images(),      # images as Agno Image
    )
    task.result = result.content
    return task
```
