Documentation Index
Fetch the complete documentation index at: https://docs.xpander.ai/llms.txt
Use this file to discover all available pages before exploring further.
Task represents a single agent execution. You get one back from Agent.acreate_task, Tasks.aget, Tasks.acreate, and the @on_task handler. It carries the full task state (input, status, result, deep-planning, tokens, …) and the methods needed to drive it forward.
from xpander_sdk import Tasks
task = await Tasks().aget(task_id="task_xyz")
print(task.id, task.status.value, task.result)
Class methods
Task.aload(task_id, configuration=None) -> Task
Same as Tasks.aget but available without a Tasks instance. Useful when you have a Configuration but don’t want to instantiate the module class.
from xpander_sdk import Task, Configuration
config = Configuration(api_key="...", organization_id="...")
task = await Task.aload(task_id="task_xyz", configuration=config)
The sync sibling is Task.load(...).
Task.areport_external_task(...) (classmethod)
See the dedicated report_external_task page.
Attributes
Identity
| Attribute | Type | Description |
|---|
id | str | Task ID. |
agent_id | str | Owning agent. |
organization_id | str | Owning organization. |
agent_version | str | None | Agent version pinned for this run. |
triggering_agent_id | str | None | Agent that triggered this (sub-agent flows). |
parent_execution | str | None | Parent task ID for sub-tasks. |
sub_executions | list[str] | Child task IDs. |
source | str | None | Origin tag ("sdk", "webhook", "slack", …). |
title | str | None | Display title. |
| Attribute | Type | Description |
|---|
input | AgentExecutionInput | Holds text, files: list[str], and user: User | None. |
payload_extension | dict | None | Extra fields merged into every tool-call payload. |
additional_context | str | None | Extra context appended to the system prompt. |
instructions_override | str | None | Extra instructions for this run. |
expected_output | str | None | Expected output description. |
mcp_servers | list[MCPServerDetails] | Per-task MCP servers. |
user_oidc_token | str | None | OIDC token for connector pre-auth. |
user_tokens | dict | None | Pre-computed user tokens for MCP auth. |
disable_attachment_injection | bool | Skip auto-inlining of human-readable file content. |
output_format | OutputFormat | None | Output format. |
output_schema | dict | None | JSON schema for structured output. |
voice_id | str | None | Voice ID for OutputFormat.Voice. |
think_mode | ThinkMode | Default or Harder. |
State
| Attribute | Type | Description |
|---|
status | AgentExecutionStatus | One of Pending, Executing, Paused, Error, Failed, Completed, Stopped. |
internal_status | str | None | Finer-grained status used by the platform internally. |
last_executed_node_id | str | None | Cursor in the execution graph (workflow agents). |
is_manually_stopped | bool | True after astop. |
is_orchestration | bool | True for orchestration tasks. |
events_streaming | bool | Whether SSE streaming is enabled. |
hitl_request | HumanInTheLoopRequest | None | Set when the task is paused awaiting human approval. |
pending_eca_request | PendingECARequest | None | Set when an ECA (External Credential Authorization) is pending. |
deep_planning | DeepPlanning | Deep-planning state (plan items, completion flags). |
execution_attempts | int | Number of plan-following attempts (1 for first execution). |
return_metrics | bool | Whether the task returns metrics. |
Output
| Attribute | Type | Description |
|---|
result | str | None | Final result. |
tokens | Tokens | None | Token usage (prompt + completion). |
used_tools | list[str] | Names of tools the task invoked. |
duration | float | Wall-clock duration in seconds. |
Timestamps
| Attribute | Type | Description |
|---|
created_at | datetime | Creation timestamp. |
started_at | datetime | None | When the worker picked it up. |
paused_at | datetime | None | When it last paused. |
finished_at | datetime | None | When it reached a terminal state. |
Internal
| Attribute | Type | Description |
|---|
configuration | Configuration | None | SDK configuration. Excluded from serialization. |
test_run_node_id | str | None | (Internal) Test workflow node id. |
Instance methods
save
task.asave(with_deep_plan_update: bool = False): PATCHes the task on the platform with the current in-memory state. By default deep_planning is excluded (the platform’s plan state is authoritative); pass with_deep_plan_update=True to push your plan updates back.
task.result = "Done"
task.status = AgentExecutionStatus.Completed
await task.asave()
After saving, the response repopulates the local instance, but deep_planning and additional_context are restored from local memory if the API returns empty values (defensive against stale API state during retries). Sync: task.save(...).
set_status
task.aset_status(status, result=None): convenience wrapper that sets status (and optionally result) and calls asave() in one step.
await task.aset_status(AgentExecutionStatus.Executing)
Sync: task.set_status(...).
stop
task.astop(): terminates the task and updates the local instance with the platform’s response. Sync: task.stop().
reload
task.areload(): re-fetches the task from the platform and updates the in-memory instance. deep_planning and additional_context are restored from local memory if the API returns empty values. Sync: task.reload().
events
task.aevents() / task.events(): SSE stream of TaskUpdateEvent. Requires events_streaming=True at task creation. See streaming events.
get_activity_log
task.aget_activity_log() / task.get_activity_log(): full message / tool-call / reasoning thread for the task. See activity log.
report_metrics
task.areport_metrics() / task.report_metrics(): pushes token counts and tool usage to the platform. See report metrics.
get_plan_following_status
task.aget_plan_following_status() -> PlanFollowingStatus: checks whether deep-planning tasks are complete. Returns PlanFollowingStatus(can_finish: bool, uncompleted_tasks: list[DeepPlanningItem]). Used by the runtime to retry the agent until the plan is satisfied. Sync: task.get_plan_following_status().
acompact_session_for_retry
task.acompact_session_for_retry() -> CompactRetryResult: forces L2 (auto) compaction on the agent’s session before a plan-following retry. Used internally by the runtime; rarely needed from user code.
Agno helpers
These methods help shape the task input for Agno agents.
get_files
task.get_files() -> list[Any]
Returns PDF files from task.input.files as Agno File objects (when Agno is installed). Returns the URL strings as a fallback. Empty list if the task has no PDFs.
get_images
task.get_images() -> list[Any]
Returns image files as Agno Image objects (or URL strings as fallback). Empty list if no images.
get_human_readable_files
task.get_human_readable_files() -> list[dict[str, str]]
Fetches and parses text-based files (CSV, JSON, TXT, .py, …) and returns [{"url": ..., "content": ...}]. Used by to_message() to inline file contents into the prompt. Honors task.disable_attachment_injection.
to_message
task.to_message(retry_count: int = 0) -> str
Builds the message string to pass to the framework agent. Format:
{input.text}
Files: url1, url2, ...
Files contents:
{json of each readable file}
On retries (retry_count > 0) and when deep planning is active, it generates a continuation prompt that lists completed/uncompleted tasks and pinpoints the next one to work on. This is the prompt format the runtime feeds to Agno when retrying after an incomplete plan.
@on_task
async def handler(task: Task) -> Task:
args = await backend.aget_args(agent_id=task.agent_id, task=task)
agno_agent = AgnoAgent(**args)
result = await agno_agent.arun(
input=task.to_message(), # text + files + readable contents
files=task.get_files(), # PDFs as Agno File
images=task.get_images(), # images as Agno Image
)
task.result = result.content
return task