Documentation
Complete API reference for the Events module in the xpander.ai SDK, including setup, usage, and examples.
@on_task
from xpander_sdk import on_task


# Asynchronous handler
@on_task
async def handle_task(task):
    """Process a task: log its id, store a result on it, and return it.

    Args:
        task: The task object passed to the handler; must expose ``id`` and
            accept assignment to ``result``.

    Returns:
        The same ``task`` object, with ``result`` set.
    """
    print(f"Processing task: {task.id}")
    # Custom task logic
    task.result = "Task processed successfully"
    return task
from xpander_sdk import on_task


# Synchronous handler
@on_task
def handle_task(task):
    """Process a task: log its id, store a result on it, and return it.

    Args:
        task: The task object passed to the handler; must expose ``id`` and
            accept assignment to ``result``.

    Returns:
        The same ``task`` object, with ``result`` set.
    """
    print(f"Processing task: {task.id}")
    # Custom task logic
    task.result = "Task processed successfully"
    return task
import asyncio

from xpander_sdk import Tasks, Agent
from xpander_sdk.models.events import TaskUpdateEventType


async def main():
    """Create a task with event streaming enabled and print its events.

    Iteration stops as soon as an event of type
    ``TaskUpdateEventType.TaskFinished`` is received.
    """
    # Load agent
    agent = Agent.load("agent-id")

    # Create task with event streaming enabled
    task = await agent.acreate_task(
        prompt="Analyze this dataset",
        file_urls=["https://example.com/data.csv"],
        events_streaming=True,
    )

    # Stream events
    async for event in task.aevents():
        print(f"Event Type: {event.type}")
        if event.type == TaskUpdateEventType.TaskFinished:
            break


# `await` and `async for` are only valid inside a coroutine, so the example
# is driven through asyncio.run(). In a notebook or an asyncio REPL, where an
# event loop is already running, use `await main()` instead.
if __name__ == "__main__":
    asyncio.run(main())
# Monitor task events
# `task` is assumed to be an already-created task object.
for event in task.events():
    print(f"Event: {event.type}")
    # NOTE(review): the async example compares against
    # TaskUpdateEventType.TaskFinished; confirm that the plain string
    # "task_finished" is equivalent for the synchronous API.
    if event.type == "task_finished":
        break
Was this page helpful?