Task Class

The Task class represents a single task with full execution details and event streaming capabilities. It provides methods for monitoring task progress, streaming events, and managing task lifecycle.

Properties

id

id: str
Unique identifier for the task.

agent_id

agent_id: str
Identifier for the associated agent.

status

status: AgentExecutionStatus
Current execution status of the task.

input

input: AgentExecutionInput
Input parameters provided to the task.

result

result: Optional[str]
Execution result if the task has completed.

events_streaming

events_streaming: bool
Whether real-time event streaming is enabled for this task.

created_at

created_at: datetime
Timestamp when the task was created.

updated_at

updated_at: datetime
Timestamp when the task was last updated.

output_format

output_format: Optional[OutputFormat]
Configured output format for the task results.

output_schema

output_schema: Optional[Dict]
JSON schema for output validation, if specified.

Class Methods

aload(task_id, configuration=None)

Asynchronously load a task by its ID.
@classmethod
async def aload(
    cls, 
    task_id: str, 
    configuration: Optional[Configuration] = None
) -> Task
Parameters:
  • task_id (str): Unique identifier for the task
  • configuration (Optional[Configuration]): SDK configuration
Returns: Complete Task object with full details. Example:
task = await Task.aload("task-456")
print(f"Task status: {task.status}")
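Raises:
  • ModuleException: If the task cannot be loaded (for example, status code 404 when no task exists with the given ID).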

load(task_id, configuration=None)

Synchronously load a task by its ID.
@classmethod
def load(
    cls, 
    task_id: str, 
    configuration: Optional[Configuration] = None
) -> Task
Parameters: Same as aload().
Returns: Complete Task object.
Example:
task = Task.load("task-456")
print(f"Task status: {task.status}")

Instance Methods

aset_status(status)

Asynchronously change the task’s execution status.
async def aset_status(status: AgentExecutionStatus) -> None
Parameters:
  • status (AgentExecutionStatus): New status for the task
Example:
await task.aset_status(AgentExecutionStatus.InProgress)
print(f"Task status updated to: {task.status}")
Related: AgentExecutionStatus

set_status(status)

Synchronously change the task’s execution status.
def set_status(status: AgentExecutionStatus) -> None
Parameters: Same as aset_status().
Example:
task.set_status(AgentExecutionStatus.Completed)

asave()

Asynchronously save task changes back to the xpander platform.
async def asave() -> Task
Returns: Updated Task object with server-side changes. Example:
task.result = "Processing completed successfully"
updated_task = await task.asave()
print(f"Task saved with result: {updated_task.result}")
Raises:
  • ModuleException: If the save request fails.

save()

Synchronously save task changes.
def save() -> Task
Returns: Updated Task object. Example:
task.result = "Processing completed successfully"
updated_task = task.save()

astop()

Asynchronously stop the task execution.
async def astop() -> Task
Returns: Stopped Task object with updated status. Example:
stopped_task = await task.astop()
print(f"Task {stopped_task.id} has been stopped")
print(f"Final status: {stopped_task.status}")
Raises:
  • ModuleException: If the stop request fails.

stop()

Synchronously stop the task execution.
def stop() -> Task
Returns: Stopped Task object. Example:
stopped_task = task.stop()
print(f"Task stopped with status: {stopped_task.status}")

aevents()

Asynchronously stream task events in real-time.
async def aevents() -> AsyncGenerator[TaskUpdateEvent, None]
Returns: Async generator yielding TaskUpdateEvent objects. Example:
async for event in task.aevents():
    print(f"Event: {event.type} at {event.time}")
    print(f"Data: {event.data}")
    
    if event.type == TaskUpdateEventType.TaskFinished:
        print("Task completed!")
        break
Related: TaskUpdateEvent, TaskUpdateEventType
Note: This method requires events_streaming=True when creating the task.

events()

Synchronously stream task events.
def events() -> Generator[TaskUpdateEvent, None, None]
Returns: Generator yielding TaskUpdateEvent objects. Example:
for event in task.events():
    print(f"Event: {event.type}")
    if event.type == "task_finished":
        break

Usage Examples

Complete Task Lifecycle Management

from xpander_sdk import Tasks, Agent
from xpander_sdk.models.shared import AgentExecutionStatus, OutputFormat

async def manage_task_lifecycle():
    """Create a streaming task, follow its events, and print the final result.

    The task is created with ``events_streaming=True`` because the event
    stream consumed below is only available when streaming was requested
    at creation time.
    """
    agent = await Agent.aload("agent-123")
    task = await agent.acreate_task(
        prompt="Analyze sales data",
        events_streaming=True,
        output_format=OutputFormat.Json,
    )

    print(f"Created task: {task.id}")
    print(f"Initial status: {task.status}")

    # Consume events until the task reports that it has finished.
    async for update in task.aevents():
        kind = update.type
        print(f"Event: {kind}")

        if kind == "tool_call_request":
            # The event payload describes the tool being requested.
            requested = update.data
            print(f"Tool requested: {requested.tool_name}")
        elif kind == "task_finished":
            # Reload by ID so the printed result comes from a fresh copy
            # of the task rather than the object created above.
            refreshed = await task.aload(task.id)
            print(f"Final result: {refreshed.result}")
            break

Task Status Management

from xpander_sdk import Task
from xpander_sdk.models.shared import AgentExecutionStatus

async def update_task_status():
    """Load a task, mark it in progress, attach a result, and save it."""
    existing = await Task.aload("task-456")

    # The status change goes through its own call, separate from the save.
    await existing.aset_status(AgentExecutionStatus.InProgress)

    # The result is set locally first, then persisted by asave().
    existing.result = "Custom processing completed"
    saved = await existing.asave()

    print(f"Task updated: {saved.status}")

Error Handling with Tasks

from xpander_sdk import Task
from xpander_sdk.exceptions import ModuleException

async def safe_task_operations():
    """Load and stop a task, reporting SDK failures instead of raising.

    A failed load ends the function early; a failed stop is only reported.
    """
    try:
        task = await Task.aload("non-existent-task")
    except ModuleException as load_err:
        # A 404 gets a dedicated message; anything else shows the
        # description carried by the exception.
        print(
            "Task not found"
            if load_err.status_code == 404
            else f"Error loading task: {load_err.description}"
        )
        return

    try:
        await task.astop()
        print("Task stopped successfully")
    except ModuleException as stop_err:
        print(f"Failed to stop task: {stop_err.description}")

Batch Task Monitoring

from xpander_sdk import Tasks
import asyncio

async def monitor_multiple_tasks():
    """Stream events for every in-progress task of one agent, concurrently.

    Lists the agent's tasks, loads the full details of each one, keeps
    those whose status is ``InProgress``, and then follows all of their
    event streams at once until each reports ``task_finished``.
    """
    # Imported here so the example runs as shown: the status enum is used
    # in the filter below but is not part of this example's import block.
    from xpander_sdk.models.shared import AgentExecutionStatus

    tasks = Tasks()

    # Get all tasks for an agent
    task_list = await tasks.alist("agent-123")

    # Load full details for running tasks; the status check is made on the
    # fully loaded object, not on the list entry.
    running_tasks = []
    for task_item in task_list:
        full_task = await task_item.aload()
        if full_task.status == AgentExecutionStatus.InProgress:
            running_tasks.append(full_task)

    print(f"Monitoring {len(running_tasks)} running tasks")

    async def monitor_task(task):
        """Print each event of one task until it reports completion."""
        async for event in task.aevents():
            print(f"Task {task.id}: {event.type}")
            if event.type == "task_finished":
                break

    # Run one monitor per task concurrently; gather returns once every
    # monitor has finished.
    await asyncio.gather(*[monitor_task(task) for task in running_tasks])

Related

  • Agent: The agent that creates and manages tasks