Managing agent tasks and executions in the xpander.ai SDK
The xpander.ai SDK provides methods for managing agent executions through the Agent class. An execution represents a task assigned to an agent, along with its status and results.
Assigns a new task to the agent, creating a new execution.
from xpander_sdk import XpanderClient# Initialize client and get agentxpander_client = XpanderClient(api_key="your-api-key")agent = xpander_client.agents.get(agent_id="agent-1234")# Add a taskagent.add_task(input="Research the impact of quantum computing on cryptography", use_worker=True)print(f"Added task with execution ID: {agent.execution.id}")
# Check if execution is finishedif agent.is_finished():print("The task has been completed") result = agent.retrieve_execution_result()print(f"Result: {result.result}")else:print("The task is still in progress")print(f"Current status: {agent.execution.status}")
Fetches the latest status of the current execution.
# Get the current execution statusstatus = agent.get_execution_status()print(f"Execution ID: {status.id}")print(f"Status: {status.status}")print(f"Created at: {status.created_at}")print(f"Updated at: {status.updated_at}")
# Get the result of a completed executionresult = agent.retrieve_execution_result()print(f"Status: {result.status}")if result.status =="COMPLETED":print(f"Result: {result.result}")elif result.status =="FAILED":print(f"Error: {result.error}")else:print(f"Execution is {result.status}")
# List executions for the agentexecutions = agent.list_executions()print(f"Found {len(executions)} executions:")for execution in executions:print(f"- {execution.id}: {execution.status} (created: {execution.created_at})")
# Get a specific executionexecution_id ="execution-1234"execution = agent.get_execution(execution_id=execution_id)print(f"Execution: {execution.id}")print(f"Status: {execution.status}")print(f"Input message: {execution.input_message}")
from xpander_sdk import XpanderClient, LLMProviderfrom openai import OpenAIimport osimport time# Initialize clientsxpander_client = XpanderClient(api_key=os.environ["XPANDER_API_KEY"])openai_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])# Get the agentagent = xpander_client.agents.get(agent_id=os.environ["AGENT_ID"])print(f"Agent: {agent.name}")# Add a taskagent.add_task(input="What are the top 3 emerging technologies of 2023?")print(f"Added task with execution ID: {agent.execution.id}")# Initialize memoryagent.memory.init_messages(input=agent.execution.input_message, instructions=agent.instructions, llm_provider=LLMProvider.OPEN_AI)# Run the agent until the task is completemax_iterations =10current_iteration =0whilenot agent.is_finished()and current_iteration < max_iterations: current_iteration +=1print(f"\nIteration {current_iteration}")# Get current status status = agent.get_execution_status()print(f"Status: {status.status}")# Get next action from LLM response = openai_client.chat.completions.create( model="gpt-4o", messages=agent.messages, tools=agent.get_tools(), tool_choice="auto", temperature=0.0)# Add LLM response to memory agent.add_messages(messages=response.model_dump())# Extract and run tool calls tool_calls = XpanderClient.extract_tool_calls( llm_response=response.model_dump(), llm_provider=LLMProvider.OPEN_AI)if tool_calls: results = agent.run_tools(tool_calls=tool_calls)print(f"Executed {len(results)} tools")else:print("No tools were called in this iteration")# Short delay to prevent rate limiting time.sleep(1)# Get final resultresult = agent.retrieve_execution_result()if agent.is_finished():print("\nFINAL RESULT:")print("="*50)print(result.result)else:print("\nMaximum iterations reached before completion")print(f"Current status: {agent.get_execution_status().status}")