Introduction
The Events Module in the xpander.ai SDK handles background task execution and event streaming. It connects to the xpander.ai platform so that tasks can be executed and monitored in real time.
Overview
The SDK provides three event-handling mechanisms:
- Agent Event Listeners
  - Set up using the @on_task decorators
  - Handle incoming task execution requests
  - Integrate with Server Sent Events (SSE) for real-time communication
  - Includes strict validation to ensure the task parameter is correctly specified and returned
- Application Lifecycle Management
  - Use @on_boot decorators for initialization tasks before event listeners start
  - Use @on_shutdown decorators for cleanup tasks during application shutdown
  - Support both synchronous and asynchronous functions
- Task Event Streaming
  - Monitor updates in real-time during task execution
  - Track task progress, tool calls, and lifecycle events
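For readers unfamiliar with Server-Sent Events, the sketch below parses a raw SSE stream into (event name, data) pairs. It only illustrates the generic SSE wire format; it is not the SDK's internal client, and the `parse_sse` helper, the `task` event name, and the payloads are invented for illustration.

```python
from typing import Iterable, Iterator, Tuple


def _field_value(line: str, field: str) -> str:
    """Return the value after 'field:', dropping one optional leading space."""
    value = line[len(field) + 1:]
    return value[1:] if value.startswith(" ") else value


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (event_name, data) pairs from the lines of an SSE stream."""
    event, data = "message", []      # "message" is the default SSE event name
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":               # a blank line dispatches the buffered event
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith(":"):   # comment / keep-alive line, ignored
            continue
        elif line.startswith("event:"):
            event = _field_value(line, "event")
        elif line.startswith("data:"):
            data.append(_field_value(line, "data"))


# Hypothetical sample stream; real payloads from the platform will differ.
sample = [
    ": keep-alive\n",
    "event: task\n",
    'data: {"id": "t-1"}\n',
    "\n",
    "data: ping\n",
    "\n",
]
events = list(parse_sse(sample))
```

When using the SDK, this parsing is handled for you; the handler registered with @on_task only sees the resulting task object.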
Strict Validation
The @on_task decorator validates how the handler is defined and what it returns, so that misuse surfaces as an explicit error:
Parameter Validation
- Required task Parameter: The decorated function must have a parameter named task
- Task Type Validation: The task parameter must be an instance of the Task class
- Return Type Validation: The function must return an instance of the Task class
- Test Task Validation: If provided, test_task must be an instance of LocalTaskTest
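To make these rules concrete, here is a minimal sketch of how checks of this kind can be written with the standard library. It is not the SDK's implementation: `checked_on_task` is a hypothetical decorator, the `Task` class is a stand-in defined only for this example, and only synchronous handlers are covered.

```python
import functools
import inspect


class Task:
    """Stand-in for the SDK's Task class, used only in this sketch."""

    def __init__(self, id: str):
        self.id = id
        self.result = None


def checked_on_task(func):
    """Hypothetical decorator mirroring the validation rules listed above."""
    # Signature check: runs once, when the decorator is applied.
    if "task" not in inspect.signature(func).parameters:
        raise TypeError(f"{func.__name__} must accept a parameter named 'task'")

    @functools.wraps(func)
    def wrapper(task):
        # Argument and return checks: run each time the handler is called.
        if not isinstance(task, Task):
            raise TypeError("'task' must be a Task instance")
        result = func(task)
        if not isinstance(result, Task):
            raise TypeError(f"{func.__name__} must return a Task instance")
        return result

    return wrapper


@checked_on_task
def handler(task: Task) -> Task:
    task.result = "Processed successfully"
    return task


processed = handler(Task("t-1"))
```

The split matters in practice: a missing or misnamed parameter can be rejected as soon as the module is imported, while a wrong return value can only be detected when the handler actually runs.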
Validation Examples
Valid Function Signature
from xpander_sdk import on_task
from xpander_sdk.modules.tasks.sub_modules.task import Task

@on_task
def valid_handler(task: Task) -> Task:
    # Process the task
    task.result = "Processed successfully"
    return task
Invalid Function Signatures (Will Raise Errors)
# Missing 'task' parameter - TypeError will be raised
@on_task
def invalid_no_task():
    pass

# Wrong parameter name - TypeError will be raised
@on_task
def invalid_wrong_param(my_task):
    pass

# Not returning Task instance - TypeError will be raised at runtime
@on_task
def invalid_return_type(task):
    return "string"  # Should return Task instance
Examples
Agent Event Listeners
Handle incoming task execution requests for agent deployment.
Asynchronous Example
Using @on_task decorator to create an event handler:
from xpander_sdk import on_task

@on_task
async def handle_task(task):
    print(f"Processing task: {task.id}")
    # Custom task logic
    task.result = "Task processed successfully"
    return task
Synchronous Example
from xpander_sdk import on_task

@on_task
def handle_task(task):
    print(f"Processing task: {task.id}")
    # Custom task logic
    task.result = "Task processed successfully"
    return task
Advanced Decorator Usage
The @on_task decorator supports additional parameters for advanced use cases:
Using Custom Configuration
from xpander_sdk import on_task, Configuration

# Create custom configuration
custom_config = Configuration(
    api_key="your-api-key",
    base_url="https://custom.api.url"
)

@on_task(configuration=custom_config)
async def handle_task_with_config(task):
    # Task will use the custom configuration
    print(f"Processing task with custom config: {task.id}")
    task.result = "Processed with custom configuration"
    return task
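Hard-coding an API key is fine for a demo but rarely for deployed code. As a hedged sketch, the keyword arguments for Configuration can be assembled from environment variables instead. The helper `configuration_kwargs` and the variable names `MY_XPANDER_API_KEY` and `MY_XPANDER_BASE_URL` are assumptions made for this example, not names defined by the SDK.

```python
import os
from typing import Dict, Mapping


def configuration_kwargs(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Build keyword arguments for Configuration from environment variables."""
    # Hypothetical variable name; raises KeyError if the key is missing.
    kwargs = {"api_key": env["MY_XPANDER_API_KEY"]}
    # Hypothetical variable name; optional, omitted when unset or empty.
    base_url = env.get("MY_XPANDER_BASE_URL")
    if base_url:
        kwargs["base_url"] = base_url
    return kwargs


# A plain dict stands in for os.environ so the sketch runs anywhere.
kwargs = configuration_kwargs({"MY_XPANDER_API_KEY": "your-api-key"})
```

The result could then be passed along as `Configuration(**kwargs)`, using the same two parameters shown in the example above.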
Using Test Tasks for Local Development
from xpander_sdk import on_task
from xpander_sdk.modules.tasks.models.task import LocalTaskTest, AgentExecutionInput
from xpander_sdk.models.shared import OutputFormat

# Define a local test task for development
test_task = LocalTaskTest(
    input=AgentExecutionInput(text="What can you do?"),
    output_format=OutputFormat.Json,
    output_schema={"capabilities": "list of capabilities"}
)

@on_task(test_task=test_task)
async def handle_test_task(task):
    print(f"Testing with local task: {task.input.text}")
    task.result = {
        "capabilities": [
            "Data analysis",
            "Text processing",
            "API integration"
        ]
    }
    return task
Task Event Streaming
Monitor task execution in real-time.
Asynchronous Example
import asyncio

from xpander_sdk import Agent
from xpander_sdk.models.events import TaskUpdateEventType

async def main():
    # Load agent
    agent = Agent.load("agent-id")

    # Create task with event streaming enabled
    task = await agent.acreate_task(
        prompt="Analyze this dataset",
        file_urls=["https://example.com/data.csv"],
        events_streaming=True
    )

    # Stream events until the task finishes
    async for event in task.aevents():
        print(f"Event Type: {event.type}")
        if event.type == TaskUpdateEventType.TaskFinished:
            break

# await is only valid inside a coroutine, so run the example through asyncio
asyncio.run(main())
Synchronous Example
# Monitor task events
for event in task.events():
print(f"Event: {event.type}")
if event.type == "task_finished":
break
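The loop-and-break pattern above can be wrapped in a small helper so that it is easy to test without a live task. The sketch below is an illustration only: `collect_until_finished` is a hypothetical helper, the events are stand-in objects built with `SimpleNamespace`, and every type name other than "task_finished" is invented.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List


def collect_until_finished(events: Iterable[Any], finished_type: Any = "task_finished") -> List[Any]:
    """Return events up to and including the first one whose type is finished_type."""
    seen = []
    for event in events:
        seen.append(event)
        if event.type == finished_type:
            break                     # stop consuming the stream once the task is done
    return seen


# Stand-in events; a real stream would come from task.events().
fake_stream = [
    SimpleNamespace(type="task_updated"),
    SimpleNamespace(type="tool_call_request"),
    SimpleNamespace(type="task_finished"),
    SimpleNamespace(type="never_reached"),
]
seen = collect_until_finished(fake_stream)
types = [event.type for event in seen]
```

With a real task, the call would look like `collect_until_finished(task.events())`. Passing `finished_type` as a parameter lets the same helper work with either the string or the enum member used in the examples above.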
Lifecycle Management
Manage application initialization and cleanup with lifecycle decorators:
Using @on_boot for Initialization
from xpander_sdk import on_boot

@on_boot
async def initialize_database():
    """Set up database connections before task processing starts."""
    print("Initializing database...")
    # Database setup logic
    print("Database ready")
Using @on_shutdown for Cleanup
from xpander_sdk import on_shutdown

@on_shutdown
async def cleanup_resources():
    """Clean up resources during application shutdown."""
    print("Cleaning up resources...")
    # Cleanup logic
    print("Cleanup complete")
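To illustrate the ordering described in the overview (boot hooks first, then listeners, then shutdown hooks) and how synchronous and asynchronous hooks can coexist, here is a minimal stand-alone sketch. It does not use or reproduce the SDK's internals: `my_on_boot`, `my_on_shutdown`, and `run_hooks` are hypothetical names defined only for this example.

```python
import asyncio
import inspect

boot_hooks, shutdown_hooks = [], []


def my_on_boot(func):
    """Hypothetical stand-in for a boot decorator: register and return func."""
    boot_hooks.append(func)
    return func


def my_on_shutdown(func):
    """Hypothetical stand-in for a shutdown decorator: register and return func."""
    shutdown_hooks.append(func)
    return func


async def run_hooks(hooks):
    """Run hooks in registration order, awaiting those that return awaitables."""
    for hook in hooks:
        outcome = hook()
        if inspect.isawaitable(outcome):
            await outcome


log = []


@my_on_boot
def load_settings():
    log.append("boot: sync")


@my_on_boot
async def open_connections():
    log.append("boot: async")


@my_on_shutdown
async def close_connections():
    log.append("shutdown: async")


async def main():
    await run_hooks(boot_hooks)
    log.append("listening")           # event listeners would start here
    await run_hooks(shutdown_hooks)


asyncio.run(main())
```

Checking the return value with `inspect.isawaitable` lets a single runner accept both plain functions and coroutines, which is one common way to support both styles.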
Continue to the [Events API Reference](/API reference/events/API reference) for detailed @on_task documentation or see [Lifecycle Decorators](/API reference/events/API reference/lifecycle) for @on_boot and @on_shutdown reference.
Support
For additional help, contact support: dev@xpander.ai