Introduction

The Events module in the xpander.ai SDK handles background tasks and event streaming. It integrates with the xpander.ai platform for real-time task execution and monitoring.

Overview

The SDK supports three kinds of event handling:
  • Agent Event Listeners
    • Set up using the @on_task decorator
    • Handle incoming task execution requests
    • Integrate with Server-Sent Events (SSE) for real-time communication
    • Include strict validation to ensure the task parameter is correctly specified and returned
  • Application Lifecycle Management
    • Use @on_boot decorators for initialization tasks before event listeners start
    • Use @on_shutdown decorators for cleanup tasks during application shutdown
    • Support both synchronous and asynchronous functions
  • Task Event Streaming
    • Monitor updates in real time during task execution
    • Track task progress, tool calls, and lifecycle events

Strict Validation

The @on_task decorator validates the handler's signature, its task argument, and its return value, so that misuse fails early with a clear error:

Parameter Validation

  • Required task Parameter: The decorated function must have a parameter named task
  • Task Type Validation: The task parameter must be an instance of the Task class
  • Return Type Validation: The function must return an instance of the Task class
  • Test Task Validation: If provided, test_task must be an instance of LocalTaskTest
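
The rules above can be illustrated with a minimal, self-contained sketch. This is not the SDK's implementation: `checked_on_task` and the stand-in `Task` class are hypothetical names introduced here, and the sketch covers only synchronous handlers. It shows one plausible way to check the parameter name when the decorator is applied and the argument and return types when the handler is called.

```python
import inspect
from functools import wraps


class Task:
    """Stand-in for the SDK's Task class, used only for this sketch."""

    def __init__(self, result=None):
        self.result = result


def checked_on_task(func):
    """Hypothetical decorator mirroring the validation rules listed above."""
    # Decoration-time check: the handler must declare a parameter named 'task'.
    if "task" not in inspect.signature(func).parameters:
        raise TypeError(f"{func.__name__} must accept a parameter named 'task'")

    @wraps(func)
    def wrapper(task):
        # Call-time checks: the argument and the return value must be Task instances.
        if not isinstance(task, Task):
            raise TypeError("'task' must be an instance of Task")
        result = func(task)
        if not isinstance(result, Task):
            raise TypeError(f"{func.__name__} must return a Task instance")
        return result

    return wrapper


@checked_on_task
def handler(task):
    task.result = "ok"
    return task
```

Splitting the checks this way means a missing or misnamed parameter is reported as soon as the module is imported, while type errors surface only when a task is actually processed.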

Validation Examples

Valid Function Signature

from xpander_sdk import on_task
from xpander_sdk.modules.tasks.sub_modules.task import Task

@on_task
def valid_handler(task: Task) -> Task:
    # Process the task
    task.result = "Processed successfully"
    return task

Invalid Function Signatures (Will Raise Errors)

# Missing 'task' parameter - TypeError will be raised
@on_task
def invalid_no_task():
    pass

# Wrong parameter name - TypeError will be raised
@on_task
def invalid_wrong_param(my_task):
    pass

# Not returning Task instance - TypeError will be raised at runtime
@on_task
def invalid_return_type(task):
    return "string"  # Should return Task instance

Examples

Agent Event Listeners

Handle incoming task execution requests for agent deployment.

Asynchronous Example

Use the @on_task decorator to create an event handler:

from xpander_sdk import on_task

@on_task
async def handle_task(task):
    print(f"Processing task: {task.id}")
    # Custom task logic
    task.result = "Task processed successfully"
    return task

Synchronous Example

from xpander_sdk import on_task

@on_task
def handle_task(task):
    print(f"Processing task: {task.id}")
    # Custom task logic
    task.result = "Task processed successfully"
    return task

Advanced Decorator Usage

The @on_task decorator supports additional parameters for advanced use cases:

Using Custom Configuration

from xpander_sdk import on_task, Configuration

# Create custom configuration
custom_config = Configuration(
    api_key="your-api-key",
    base_url="https://custom.api.url"
)

@on_task(configuration=custom_config)
async def handle_task_with_config(task):
    # Task will use the custom configuration
    print(f"Processing task with custom config: {task.id}")
    task.result = "Processed with custom configuration"
    return task

Using Test Tasks for Local Development

from xpander_sdk import on_task
from xpander_sdk.modules.tasks.models.task import LocalTaskTest, AgentExecutionInput
from xpander_sdk.models.shared import OutputFormat

# Define a local test task for development
test_task = LocalTaskTest(
    input=AgentExecutionInput(text="What can you do?"),
    output_format=OutputFormat.Json,
    output_schema={"capabilities": "list of capabilities"}
)

@on_task(test_task=test_task)
async def handle_test_task(task):
    print(f"Testing with local task: {task.input.text}")
    task.result = {
        "capabilities": [
            "Data analysis",
            "Text processing",
            "API integration"
        ]
    }
    return task

Task Event Streaming

Monitor task execution in real time.

Asynchronous Example

import asyncio

from xpander_sdk import Agent
from xpander_sdk.models.events import TaskUpdateEventType

async def main():
    # Load agent
    agent = Agent.load("agent-id")

    # Create task with event streaming enabled
    task = await agent.acreate_task(
        prompt="Analyze this dataset",
        file_urls=["https://example.com/data.csv"],
        events_streaming=True
    )

    # Stream events until the task finishes
    async for event in task.aevents():
        print(f"Event Type: {event.type}")
        if event.type == TaskUpdateEventType.TaskFinished:
            break

# Run the coroutine from synchronous code
asyncio.run(main())

Synchronous Example

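from xpander_sdk.models.events import TaskUpdateEventType

# Monitor task events. `task` is a task created with events_streaming=True,
# as in the asynchronous example.
for event in task.events():
    print(f"Event: {event.type}")
    if event.type == TaskUpdateEventType.TaskFinished:
        break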
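
Both streaming examples follow the same pattern: consume events until a "finished" event arrives, then stop. A minimal sketch of that pattern as a reusable generator is shown below. The `EventType` enum, the `Event` dataclass, and `until_finished` are stand-ins invented for this illustration and are not part of the SDK; only the idea of a terminal "task finished" event comes from the examples above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Iterable, Iterator


class EventType(str, Enum):
    """Stand-in event types for this sketch, not the SDK's TaskUpdateEventType."""

    UPDATED = "updated"
    FINISHED = "finished"


@dataclass
class Event:
    type: EventType


def until_finished(events: Iterable[Event]) -> Iterator[Event]:
    """Yield events up to and including the first FINISHED event, then stop."""
    for event in events:
        yield event
        if event.type == EventType.FINISHED:
            return


# A fake stream: anything after the FINISHED event is never consumed.
stream = [Event(EventType.UPDATED), Event(EventType.FINISHED), Event(EventType.UPDATED)]
seen = [event.type for event in until_finished(stream)]
```

Wrapping the stop condition in a generator keeps the consuming loop free of `break` logic, which can be convenient when several consumers need the same termination rule.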

Lifecycle Management

Manage application initialization and cleanup with lifecycle decorators:

Using @on_boot for Initialization

from xpander_sdk import on_boot

@on_boot
async def initialize_database():
    """Set up database connections before task processing starts."""
    print("Initializing database...")
    # Database setup logic
    print("Database ready")

Using @on_shutdown for Cleanup

from xpander_sdk import on_shutdown

@on_shutdown
async def cleanup_resources():
    """Clean up resources during application shutdown."""
    print("Cleaning up resources...")
    # Cleanup logic
    print("Cleanup complete")
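
The overview states that lifecycle decorators accept both synchronous and asynchronous functions. The sketch below shows one way a runner could support both kinds of hook. It is an illustration under assumptions, not the SDK's internals: `register_boot`, `boot_hooks`, and `run_hooks` are hypothetical names.

```python
import asyncio
import inspect

boot_hooks = []  # hypothetical registry, not the SDK's internal structure


def register_boot(func):
    """Hypothetical stand-in for a boot decorator: record the hook, return it unchanged."""
    boot_hooks.append(func)
    return func


async def run_hooks(hooks):
    """Run hooks in registration order, awaiting those that return awaitables."""
    results = []
    for hook in hooks:
        outcome = hook()
        if inspect.isawaitable(outcome):
            outcome = await outcome
        results.append(outcome)
    return results


@register_boot
def load_settings():
    # Synchronous hook
    return "settings loaded"


@register_boot
async def connect_cache():
    # Asynchronous hook
    await asyncio.sleep(0)
    return "cache connected"


results = asyncio.run(run_hooks(boot_hooks))
```

Checking the return value with `inspect.isawaitable` rather than inspecting the function itself means the runner also handles plain functions that return a coroutine.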

Continue to the [Events API Reference](/API reference/events/API reference) for detailed @on_task documentation or see [Lifecycle Decorators](/API reference/events/API reference/lifecycle) for @on_boot and @on_shutdown reference.

Contact

For further assistance, contact support: dev@xpander.ai