Documentation Index
Fetch the complete documentation index at: https://docs.xpander.ai/llms.txt
Use this file to discover all available pages before exploring further.
Task Class
The Task class represents a single task with full execution details and event streaming capabilities. It provides methods for monitoring task progress, streaming events, and managing task lifecycle.
Properties
Unique identifier for the task.
agent_id
Identifier for the associated agent.
status
status: AgentExecutionStatus
Current execution status of the task.
internal_status
internal_status: Optional[str]
Optional internal status field that customers can use to report their own status alongside the standard task status. Limited to 255 characters.
input: AgentExecutionInput
Input parameters provided to the task.
result
Execution result if the task has completed.
events_streaming
Whether real-time event streaming is enabled for this task.
created_at
Timestamp when the task was created.
updated_at
Timestamp when the task was last updated.
output_format: Optional[OutputFormat]
Configured output format for the task results.
output_schema
output_schema: Optional[Dict]
JSON schema for output validation, if specified.
llm_model_provider
llm_model_provider: Optional[str]
Effective LLM provider used for this task (e.g. "openai", "anthropic"). Reflects the per-task override when supplied to acreate() / acreate_task(), otherwise the agent’s configured provider.
llm_model_name
llm_model_name: Optional[str]
Effective model name used for this task (e.g. "gpt-5", "claude-sonnet-4-6"). Reflects the per-task override when supplied, otherwise the agent’s configured model.
llm_reasoning_effort
llm_reasoning_effort: Optional[LLMReasoningEffort]
Effective reasoning effort used for this task. One of LLMReasoningEffort.Low, Medium, High, XHigh. Reflects the per-task override when supplied, otherwise the agent’s configured reasoning effort.
Class Methods
aload(task_id, configuration=None)
Asynchronously load a task by its ID.
@classmethod
async def aload(
cls,
task_id: str,
configuration: Optional[Configuration] = None
) -> Task
Parameters:
task_id (str): Unique identifier for the task
configuration (Optional[Configuration]): SDK configuration
Returns: Complete Task object with full details.
Example:
task = await Task.aload("task-456")
print(f"Task status: {task.status}")
Raises:
- [
ModuleException](/API reference/exceptions): If the task is not found or access is denied.
load(task_id, configuration=None)
Synchronously load a task by its ID.
@classmethod
def load(
cls,
task_id: str,
configuration: Optional[Configuration] = None
) -> Task
Parameters: Same as aload()
Returns: Complete Task object.
Example:
task = Task.load("task-456")
print(f"Task status: {task.status}")
Instance Methods
aset_status(status)
Asynchronously change the task’s execution status.
async def aset_status(status: AgentExecutionStatus) -> None
Parameters:
status (AgentExecutionStatus): New status for the task
Example:
await task.aset_status(AgentExecutionStatus.InProgress)
print(f"Task status updated to: {task.status}")
Related: [AgentExecutionStatus](/API reference/types#agentexecutionstatus)
set_status(status)
Synchronously change the task’s execution status.
def set_status(status: AgentExecutionStatus) -> None
Parameters: Same as aset_status()
Example:
task.set_status(AgentExecutionStatus.Completed)
asave()
Asynchronously save task changes back to the xpander platform.
async def asave() -> Task
Returns: Updated Task object with server-side changes.
Example:
task.result = "Processing completed successfully"
updated_task = await task.asave()
print(f"Task saved with result: {updated_task.result}")
Raises:
- [
ModuleException](/API reference/exceptions): If the save operation fails.
save()
Synchronously save task changes.
Returns: Updated Task object.
Example:
task.result = "Processing completed successfully"
updated_task = task.save()
astop()
Asynchronously stop the task execution.
async def astop() -> Task
Returns: Stopped Task object with updated status.
Example:
stopped_task = await task.astop()
print(f"Task {stopped_task.id} has been stopped")
print(f"Final status: {stopped_task.status}")
Raises:
- [
ModuleException](/API reference/exceptions): If the task cannot be stopped.
stop()
Synchronously stop the task execution.
Returns: Stopped Task object.
Example:
stopped_task = task.stop()
print(f"Task stopped with status: {stopped_task.status}")
aevents()
Asynchronously stream task events in real-time.
async def aevents() -> AsyncGenerator[TaskUpdateEvent, None]
Returns: Async generator yielding TaskUpdateEvent objects.
Example:
async for event in task.aevents():
print(f"Event: {event.type} at {event.time}")
print(f"Data: {event.data}")
if event.type == TaskUpdateEventType.TaskFinished:
print("Task completed!")
break
Related: [TaskUpdateEvent](/API reference/types#taskupdateevent), [TaskUpdateEventType](/API reference/types#taskupdateeventtype)
Note: This method requires events_streaming=True when creating the task.
events()
Synchronously stream task events.
def events() -> Generator[TaskUpdateEvent, None, None]
Returns: Generator yielding TaskUpdateEvent objects.
Example:
for event in task.events():
print(f"Event: {event.type}")
if event.type == "task_finished":
break
get_files()
Get PDF files from task input, formatted for Agno integration.
def get_files() -> list[Any]
Returns: List of Agno File objects (when Agno is available) or URL strings. Returns empty list if no PDF files are present.
Example:
files = task.get_files()
result = await agno_agent.arun(
input=task.to_message(),
files=files
)
get_images()
Get image files from task input, formatted for Agno integration.
def get_images() -> list[Any]
Returns: List of Agno Image objects (when Agno is available) or URL strings. Returns empty list if no image files are present.
Example:
images = task.get_images()
result = await agno_agent.arun(
input=task.to_message(),
images=images
)
get_human_readable_files()
Get human-readable files from task input with their content.
def get_human_readable_files() -> list[dict[str, str]]
Returns: List of dictionaries with ‘url’ and ‘content’ keys. Returns empty list if no human-readable files are present.
Example:
readable_files = task.get_human_readable_files()
for file_data in readable_files:
print(f"File: {file_data['url']}")
print(f"Content: {file_data['content']}")
aget_activity_log()
Asynchronously retrieve the activity log for this task.
async def aget_activity_log() -> AgentActivityThread
Returns: AgentActivityThread object containing complete activity log including messages, tool calls, reasoning steps, sub-agent triggers, and authentication events.
Example:
from xpander_sdk.models.activity import (
AgentActivityThreadMessage,
AgentActivityThreadToolCall
)
task = await Task.aload("task-123")
activity_log = await task.aget_activity_log()
for message in activity_log.messages:
if isinstance(message, AgentActivityThreadMessage):
print(f"{message.role}: {message.content.text}")
elif isinstance(message, AgentActivityThreadToolCall):
print(f"Tool: {message.tool_name}")
print(f"Result: {message.result}")
Raises:
- [
ModuleException](/API reference/exceptions): If the activity log cannot be retrieved or doesn’t exist.
Related: [AgentActivityThread](/API reference/types#agentactivitythread)
get_activity_log()
Synchronously retrieve the activity log for this task.
def get_activity_log() -> AgentActivityThread
Returns: AgentActivityThread object with complete activity log.
Example:
from xpander_sdk.models.activity import AgentActivityThreadMessage
task = Task.load("task-123")
activity_log = task.get_activity_log()
for message in activity_log.messages:
if isinstance(message, AgentActivityThreadMessage):
print(f"{message.role}: {message.content.text}")
Raises:
- [
ModuleException](/API reference/exceptions): If the activity log cannot be retrieved or doesn’t exist.
to_message()
Convert task input to a formatted message string.
Returns: Formatted message string including text, file URLs, and readable file content.
Example:
message = task.to_message()
result = await agno_agent.arun(input=message)
Usage Examples
Complete Task Lifecycle Management
from xpander_sdk import Tasks, Agent
from xpander_sdk.models.shared import AgentExecutionStatus, OutputFormat
async def manage_task_lifecycle():
# Create task with event streaming
agent = await Agent.aload("agent-123")
task = await agent.acreate_task(
prompt="Analyze sales data",
events_streaming=True,
output_format=OutputFormat.Json
)
print(f"Created task: {task.id}")
print(f"Initial status: {task.status}")
# Monitor task events
async for event in task.aevents():
print(f"Event: {event.type}")
if event.type == "task_finished":
# Reload task to get final result
final_task = await task.aload(task.id)
print(f"Final result: {final_task.result}")
break
elif event.type == "tool_call_request":
tool_call = event.data
print(f"Tool requested: {tool_call.tool_name}")
Task Status Management
from xpander_sdk import Task
from xpander_sdk.models.shared import AgentExecutionStatus
async def update_task_status():
# Load existing task
task = await Task.aload("task-456")
# Update status
await task.aset_status(AgentExecutionStatus.InProgress)
# Add custom result
task.result = "Custom processing completed"
# Set custom internal status for tracking
task.internal_status = "Data validation completed"
# Save changes
updated_task = await task.asave()
print(f"Task updated: {updated_task.status}")
print(f"Internal status: {updated_task.internal_status}")
Internal Status Tracking
The internal_status field provides optional custom context information with a 255 character limit. It’s set once before the task is saved or returned.
from xpander_sdk import Tasks
async def track_internal_status():
tasks = Tasks()
# Create task
task = await tasks.acreate(
agent_id="agent-123",
prompt="Process customer orders"
)
# Your processing logic here
order_count = 0
# ... process customer orders ...
order_count = 25 # Example: processed 25 orders
# Set result
task.result = f"Successfully processed {order_count} customer orders"
# Optional: Set internal status once before saving (max 255 characters)
task.internal_status = f"Order processing completed: {order_count} orders validated and shipped"
await task.asave() # internal_status is persisted here
print(f"Task Status: {task.status}") # e.g., "completed"
print(f"Internal Status: {task.internal_status}") # "Order processing completed: 25 orders validated and shipped"
# Example: Handle character limit validation (255 characters max)
detailed_status = f"Comprehensive order processing completed with validation, payment processing, and shipping for {order_count} orders including special handling for priority customers"
if len(detailed_status) > 255:
task.internal_status = detailed_status[:255] # Truncate to fit limit
else:
task.internal_status = detailed_status
await task.asave() # Save the final internal status
Error Handling with Tasks
from xpander_sdk import Task
from xpander_sdk.exceptions import ModuleException
async def safe_task_operations():
try:
# Attempt to load task
task = await Task.aload("non-existent-task")
except ModuleException as e:
if e.status_code == 404:
print("Task not found")
else:
print(f"Error loading task: {e.description}")
return
try:
# Attempt to stop task
await task.astop()
print("Task stopped successfully")
except ModuleException as e:
print(f"Failed to stop task: {e.description}")
Batch Task Monitoring
from xpander_sdk import Tasks
import asyncio
async def monitor_multiple_tasks():
tasks = Tasks()
# Get all tasks for an agent
task_list = await tasks.alist("agent-123")
# Load full details for running tasks
running_tasks = []
for task_item in task_list:
full_task = await task_item.aload()
if full_task.status == AgentExecutionStatus.InProgress:
running_tasks.append(full_task)
print(f"Monitoring {len(running_tasks)} running tasks")
# Monitor all running tasks concurrently
async def monitor_task(task):
async for event in task.aevents():
print(f"Task {task.id}: {event.type}")
if event.type == "task_finished":
break
# Start monitoring all tasks
await asyncio.gather(*[monitor_task(task) for task in running_tasks])
File Handling with Agno Integration
from xpander_sdk import Tasks, Backend
from agno.agent import Agent
async def process_task_with_files():
# Create task with various file types
tasks = Tasks()
task = await tasks.acreate(
agent_id="agent-123",
prompt="Analyze the attached documents and images",
file_urls=[
"https://example.com/report.pdf", # PDF document
"https://example.com/chart.png", # Image file
"https://example.com/data.csv", # Human-readable file
"https://example.com/logo.jpg" # Another image
]
)
# Get files categorized by type for Agno integration
files = task.get_files() # PDF files as Agno File objects
images = task.get_images() # Image files as Agno Image objects
readable_files = task.get_human_readable_files() # Text files with content
print(f"Task {task.id} has:")
print(f" {len(files)} PDF files")
print(f" {len(images)} image files")
print(f" {len(readable_files)} readable files")
# Initialize Agno agent with xpander backend
backend = Backend()
agno_agent = Agent(**backend.get_args())
# Pass files and images directly to Agno
result = await agno_agent.arun(
input=task.to_message(), # Includes text + file URLs + readable content
files=files, # PDF files as Agno File objects
images=images # Image files as Agno Image objects
)
print(f"Analysis result: {result.content}")
# Access individual readable file contents if needed
for file_data in readable_files:
print(f"\nFile: {file_data['url']}")
print(f"Content preview: {file_data['content'][:200]}...")
Task Activity Log Analysis
from xpander_sdk import Task
from xpander_sdk.models.activity import (
AgentActivityThreadMessage,
AgentActivityThreadToolCall,
AgentActivityThreadReasoning,
AgentActivityThreadSubAgentTrigger
)
async def analyze_task_execution():
# Load a completed task
task = await Task.aload("task-456")
# Retrieve detailed activity log
activity_log = await task.aget_activity_log()
print(f"Task ID: {activity_log.id}")
print(f"Created: {activity_log.created_at}")
# Check if user information is available
if activity_log.user:
print(f"User: {activity_log.user.email}")
# Process different message types
tool_calls = []
reasoning_steps = []
messages = []
for message in activity_log.messages:
if isinstance(message, AgentActivityThreadMessage):
# User/agent conversation
messages.append({
'role': message.role,
'text': message.content.text,
'time': message.created_at
})
print(f"\n[{message.created_at}] {message.role}: {message.content.text}")
elif isinstance(message, AgentActivityThreadToolCall):
# Tool invocations
tool_calls.append({
'name': message.tool_name,
'payload': message.payload,
'result': message.result,
'is_error': message.is_error
})
print(f"\n[{message.created_at}] Tool: {message.tool_name}")
print(f" Payload: {message.payload}")
if message.result:
print(f" Result: {message.result}")
if message.is_error:
print(f" ⚠️ Error occurred")
elif isinstance(message, AgentActivityThreadReasoning):
# Agent reasoning
reasoning_steps.append({
'type': message.type,
'thought': message.thought,
'confidence': message.confidence
})
print(f"\n[{message.created_at}] Reasoning ({message.type}):")
print(f" Title: {message.title}")
print(f" Confidence: {message.confidence}")
if message.thought:
print(f" Thought: {message.thought}")
if message.action:
print(f" Action: {message.action}")
elif isinstance(message, AgentActivityThreadSubAgentTrigger):
# Sub-agent calls
print(f"\n[{message.created_at}] Sub-agent triggered:")
print(f" Agent ID: {message.agent_id}")
print(f" Query: {message.query}")
if message.files:
print(f" Files: {', '.join(message.files)}")
# Summary statistics
print(f"\n=== Execution Summary ===")
print(f"Total messages: {len(messages)}")
print(f"Tool calls: {len(tool_calls)}")
print(f"Reasoning steps: {len(reasoning_steps)}")
# Calculate tool usage
if tool_calls:
print(f"\nTools used:")
tool_usage = {}
for call in tool_calls:
tool_name = call['name']
tool_usage[tool_name] = tool_usage.get(tool_name, 0) + 1
for tool, count in tool_usage.items():
print(f" - {tool}: {count} times")
# Synchronous version
def analyze_task_execution_sync():
from xpander_sdk.models.activity import AgentActivityThreadMessage
task = Task.load("task-456")
activity_log = task.get_activity_log()
# Process activity log (same logic as above)
for message in activity_log.messages:
if isinstance(message, AgentActivityThreadMessage):
print(f"{message.role}: {message.content.text}")
- [
Agent](/API reference/agents/API reference/agent): Agent that creates and manages tasks
- [AgentExecutionStatus](/API reference/types#agentexecutionstatus): Task execution statuses
- [TaskUpdateEvent](/API reference/types#taskupdateevent): Event data structure
- [TaskUpdateEventType](/API reference/types#taskupdateeventtype): Event type enumeration
- [AgentExecutionInput](/API reference/types#agentexecutioninput): Task input configuration
- [Agents Module](/API reference/agents): Create and manage agents
- [Events Module](/API reference/events): Event-driven programming
- [Tools Repository](/API reference/tools): Tool integration and management