@on_task Decorator

The @on_task decorator provides a simple way to register functions as event-driven task handlers for agent deployment and background task processing.

Overview

The @on_task decorator automatically:
  • Registers your function as a task handler
  • Sets up event listening for incoming task execution requests
  • Handles both synchronous and asynchronous functions
  • Validates that the decorated function has a task parameter
  • Works with default configuration from environment variables

Syntax

@on_task
def your_task_handler(task):
    # Your task processing logic
    return task

Parameters

The decorator itself takes no parameters, but the decorated function must:
  • Accept a task parameter as its first argument
  • Return the modified task object

Usage Examples

Basic Synchronous Handler

from xpander_sdk import on_task

@on_task
def handle_task(task):
    print(f"Processing task: {task.id}")
    print(f"Task input: {task.input.text}")
    
    # Your custom logic here
    task.result = "Task processed successfully"
    
    return task

Asynchronous Handler

from xpander_sdk import on_task
import asyncio

@on_task
async def handle_task_async(task):
    print(f"Processing task: {task.id}")
    
    # Simulate async work
    await asyncio.sleep(1)
    
    # Your custom async logic here
    task.result = "Async task processed successfully"
    
    return task

Task Processing with Error Handling

from xpander_sdk import on_task
from xpander_sdk.modules.tasks.models.task import AgentExecutionStatus

@on_task
def handle_task_with_error_handling(task):
    try:
        print(f"Processing task: {task.id}")
        
        # Your processing logic
        if task.input.text == "error":
            raise ValueError("Simulated error")
            
        task.result = "Task completed successfully"
        task.status = AgentExecutionStatus.Completed
        
    except Exception as e:
        print(f"Task failed: {str(e)}")
        task.result = f"Task failed: {str(e)}"
        task.status = AgentExecutionStatus.Failed
    
    return task

Processing Different Input Types

from xpander_sdk import on_task

@on_task
def handle_multimodal_task(task):
    print(f"Processing task: {task.id}")
    
    # Handle text input
    if task.input.text:
        print(f"Text input: {task.input.text}")
        # Process text
    
    # Handle file inputs
    if task.input.files:
        print(f"Files to process: {len(task.input.files)}")
        for file_url in task.input.files:
            print(f"Processing file: {file_url}")
            # Process each file
    
    task.result = "Multimodal processing completed"
    return task

Behavior

Automatic Event Listening

When you use the @on_task decorator, it automatically:
  1. Sets up event listening for your agent
  2. Waits for incoming task execution requests
  3. Calls your decorated function when tasks arrive
  4. Handles the task lifecycle (status updates, result saving)

Function Requirements

Your decorated function must:
  • Accept a task parameter
  • Return the modified task object
  • Handle any exceptions internally (recommended)

Configuration

The decorator uses:
  • Environment variables for SDK configuration (XPANDER_API_KEY, XPANDER_ORGANIZATION_ID, etc.)
  • Default agent ID from XPANDER_AGENT_ID environment variable

Task Object Properties

The task parameter passed to your function contains:
# Task identification
task.id                    # str: Unique task identifier
task.agent_id             # str: Associated agent ID
task.organization_id      # str: Organization identifier

# Task input
task.input.text           # str: Text prompt/instruction
task.input.files          # List[str]: URLs of input files
task.input.user           # User: User details (if provided)

# Task status and results
task.status               # AgentExecutionStatus: Current status
task.result               # str: Task result (set by your handler)

# Timestamps
task.created_at           # datetime: When task was created
task.started_at           # datetime: When execution started
task.finished_at          # datetime: When execution finished

# Configuration
task.output_format        # OutputFormat: Desired output format
task.output_schema        # Dict: JSON schema for output validation
task.events_streaming     # bool: Whether event streaming is enabled

Best Practices

1. Always Return the Task

@on_task
def good_handler(task):
    # Process task
    task.result = "Done"
    return task  # ✅ Always return

2. Handle Errors Gracefully

@on_task
def robust_handler(task):
    try:
        # Your logic here
        task.result = "Success"
    except Exception as e:
        task.result = f"Error: {str(e)}"
        task.status = AgentExecutionStatus.Failed
    return task

3. Set Appropriate Status

@on_task
def status_aware_handler(task):
    # Process task
    if success:
        task.status = AgentExecutionStatus.Completed
    else:
        task.status = AgentExecutionStatus.Failed
    return task

4. Validate Input

@on_task
def validating_handler(task):
    if not task.input.text and not task.input.files:
        task.result = "Error: No input provided"
        task.status = AgentExecutionStatus.Failed
        return task
    
    # Process valid input
    task.result = "Processing completed"
    return task