Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.xpander.ai/llms.txt

Use this file to discover all available pages before exploring further.

Task represents a single agent execution. You get one back from Agent.acreate_task, Tasks.aget, Tasks.acreate, and the @on_task handler. It carries the full task state (input, status, result, deep-planning, tokens, …) and the methods needed to drive it forward.
from xpander_sdk import Tasks

task = await Tasks().aget(task_id="task_xyz")
print(task.id, task.status.value, task.result)

Class methods

Task.aload(task_id, configuration=None) -> Task

Same as Tasks.aget but available without a Tasks instance. Useful when you have a Configuration but don’t want to instantiate the module class.
from xpander_sdk import Task, Configuration

config = Configuration(api_key="...", organization_id="...")
task = await Task.aload(task_id="task_xyz", configuration=config)
The sync sibling is Task.load(...).

Task.areport_external_task(...) (classmethod)

See the dedicated report_external_task page.

Attributes

Identity

AttributeTypeDescription
idstrTask ID.
agent_idstrOwning agent.
organization_idstrOwning organization.
agent_versionstr | NoneAgent version pinned for this run.
triggering_agent_idstr | NoneAgent that triggered this (sub-agent flows).
parent_executionstr | NoneParent task ID for sub-tasks.
sub_executionslist[str]Child task IDs.
sourcestr | NoneOrigin tag ("sdk", "webhook", "slack", …).
titlestr | NoneDisplay title.

Input

AttributeTypeDescription
inputAgentExecutionInputHolds text, files: list[str], and user: User | None.
payload_extensiondict | NoneExtra fields merged into every tool-call payload.
additional_contextstr | NoneExtra context appended to the system prompt.
instructions_overridestr | NoneExtra instructions for this run.
expected_outputstr | NoneExpected output description.
mcp_serverslist[MCPServerDetails]Per-task MCP servers.
user_oidc_tokenstr | NoneOIDC token for connector pre-auth.
user_tokensdict | NonePre-computed user tokens for MCP auth.
disable_attachment_injectionboolSkip auto-inlining of human-readable file content.
output_formatOutputFormat | NoneOutput format.
output_schemadict | NoneJSON schema for structured output.
voice_idstr | NoneVoice ID for OutputFormat.Voice.
think_modeThinkModeDefault or Harder.

State

AttributeTypeDescription
statusAgentExecutionStatusOne of Pending, Executing, Paused, Error, Failed, Completed, Stopped.
internal_statusstr | NoneFiner-grained status used by the platform internally.
last_executed_node_idstr | NoneCursor in the execution graph (workflow agents).
is_manually_stoppedboolTrue after astop.
is_orchestrationboolTrue for orchestration tasks.
events_streamingboolWhether SSE streaming is enabled.
hitl_requestHumanInTheLoopRequest | NoneSet when the task is paused awaiting human approval.
pending_eca_requestPendingECARequest | NoneSet when an ECA (External Credential Authorization) is pending.
deep_planningDeepPlanningDeep-planning state (plan items, completion flags).
execution_attemptsintNumber of plan-following attempts (1 for first execution).
return_metricsboolWhether the task returns metrics.

Output

AttributeTypeDescription
resultstr | NoneFinal result.
tokensTokens | NoneToken usage (prompt + completion).
used_toolslist[str]Names of tools the task invoked.
durationfloatWall-clock duration in seconds.

Timestamps

AttributeTypeDescription
created_atdatetimeCreation timestamp.
started_atdatetime | NoneWhen the worker picked it up.
paused_atdatetime | NoneWhen it last paused.
finished_atdatetime | NoneWhen it reached a terminal state.

Internal

AttributeTypeDescription
configurationConfiguration | NoneSDK configuration. Excluded from serialization.
test_run_node_idstr | None(Internal) Test workflow node id.

Instance methods

save

task.asave(with_deep_plan_update: bool = False): PATCHes the task on the platform with the current in-memory state. By default deep_planning is excluded (the platform’s plan state is authoritative); pass with_deep_plan_update=True to push your plan updates back.
task.result = "Done"
task.status = AgentExecutionStatus.Completed
await task.asave()
After saving, the response repopulates the local instance, but deep_planning and additional_context are restored from local memory if the API returns empty values (defensive against stale API state during retries). Sync: task.save(...).

set_status

task.aset_status(status, result=None): convenience wrapper that sets status (and optionally result) and calls asave() in one step.
await task.aset_status(AgentExecutionStatus.Executing)
Sync: task.set_status(...).

stop

task.astop(): terminates the task and updates the local instance with the platform’s response. Sync: task.stop().

reload

task.areload(): re-fetches the task from the platform and updates the in-memory instance. deep_planning and additional_context are restored from local memory if the API returns empty values. Sync: task.reload().

events

task.aevents() / task.events(): SSE stream of TaskUpdateEvent. Requires events_streaming=True at task creation. See streaming events.

get_activity_log

task.aget_activity_log() / task.get_activity_log(): full message / tool-call / reasoning thread for the task. See activity log.

report_metrics

task.areport_metrics() / task.report_metrics(): pushes token counts and tool usage to the platform. See report metrics.

get_plan_following_status

task.aget_plan_following_status() -> PlanFollowingStatus: checks whether deep-planning tasks are complete. Returns PlanFollowingStatus(can_finish: bool, uncompleted_tasks: list[DeepPlanningItem]). Used by the runtime to retry the agent until the plan is satisfied. Sync: task.get_plan_following_status().

acompact_session_for_retry

task.acompact_session_for_retry() -> CompactRetryResult: forces L2 (auto) compaction on the agent’s session before a plan-following retry. Used internally by the runtime; rarely needed from user code.

Agno helpers

These methods help shape the task input for Agno agents.

get_files

task.get_files() -> list[Any]
Returns PDF files from task.input.files as Agno File objects (when Agno is installed). Returns the URL strings as a fallback. Empty list if the task has no PDFs.

get_images

task.get_images() -> list[Any]
Returns image files as Agno Image objects (or URL strings as fallback). Empty list if no images.

get_human_readable_files

task.get_human_readable_files() -> list[dict[str, str]]
Fetches and parses text-based files (CSV, JSON, TXT, .py, …) and returns [{"url": ..., "content": ...}]. Used by to_message() to inline file contents into the prompt. Honors task.disable_attachment_injection.

to_message

task.to_message(retry_count: int = 0) -> str
Builds the message string to pass to the framework agent. Format:
{input.text}
Files: url1, url2, ...
Files contents:
{json of each readable file}
On retries (retry_count > 0) and when deep planning is active, it generates a continuation prompt that lists completed/uncompleted tasks and pinpoints the next one to work on. This is the prompt format the runtime feeds to Agno when retrying after an incomplete plan.
@on_task
async def handler(task: Task) -> Task:
    args = await backend.aget_args(agent_id=task.agent_id, task=task)
    agno_agent = AgnoAgent(**args)

    result = await agno_agent.arun(
        input=task.to_message(),       # text + files + readable contents
        files=task.get_files(),        # PDFs as Agno File
        images=task.get_images(),      # images as Agno Image
    )
    task.result = result.content
    return task