Skip to main content

Overview

Tool hooks provide a powerful mechanism to intercept and react to tool invocations throughout their lifecycle. This allows you to implement cross-cutting concerns like logging, monitoring, validation, and error handling without modifying tool implementations.

Available Hooks

The xpander.ai SDK provides three decorators for tool lifecycle hooks:
  • @on_tool_before: Execute before tool invocation
  • @on_tool_after: Execute after successful tool invocation
  • @on_tool_error: Execute when tool invocation fails

Quick Start

from typing import Optional, Dict, Any
from xpander_sdk import on_tool_before, on_tool_after, on_tool_error, Tool
from loguru import logger

@on_tool_before
def log_invocation(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None
):
    logger.info(f"Invoking tool: {tool.name}")

@on_tool_after
def log_success(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None,
    result: Any = None
):
    logger.info(f"Tool {tool.name} succeeded")

@on_tool_error
def log_failure(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None,
    error: Optional[Exception] = None
):
    logger.error(f"Tool {tool.name} failed: {error}")

Hook Parameters

All hooks receive the following parameters:
ParameterTypeDescription
toolToolThe Tool object being invoked
payloadAnyThe payload sent to the tool
payload_extensionOptional[Dict[str, Any]]Additional payload data
tool_call_idOptional[str]Unique ID of the tool call
agent_versionOptional[str]Version of the agent making the call
Additional parameters:
  • After hooks also receive: result - The result returned by the tool
  • Error hooks also receive: error - The exception that occurred

Async Support

Both sync and async hooks are supported:
@on_tool_before
async def async_validation(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None
):
    # Async validation logic
    await validate_payload(payload)

@on_tool_after
async def async_metrics(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None,
    result: Any = None
):
    # Send metrics to async service
    await metrics_service.record(tool.name, result)

Common Use Cases

Request Validation

@on_tool_before
def validate_request(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None
):
    if not payload:
        logger.warning(f"Empty payload for tool {tool.name}")
    
    # Additional validation logic
    if tool.name == "sensitive_tool" and not agent_version:
        logger.error("Sensitive tool requires agent version")

Performance Monitoring

import time
from collections import defaultdict

execution_times = defaultdict(dict)

@on_tool_before
def record_start_time(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None
):
    execution_times[tool_call_id] = {"start": time.time(), "tool": tool.name}

@on_tool_after
def record_duration(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None,
    result: Any = None
):
    if tool_call_id in execution_times:
        duration = time.time() - execution_times[tool_call_id]["start"]
        logger.info(f"Tool {tool.name} took {duration:.2f}s")
        metrics.gauge(f"tool.{tool.name}.duration", duration)

Error Alerting

@on_tool_error
async def alert_on_failure(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None,
    error: Optional[Exception] = None
):
    # Send alert for critical tools
    if tool.name in ["payment_processor", "auth_service"]:
        await alert_service.send(
            title=f"Critical Tool Failure: {tool.name}",
            message=f"Error: {str(error)}\nCall ID: {tool_call_id}",
            severity="critical"
        )

Audit Logging

import json
from datetime import datetime

@on_tool_before
def audit_log_request(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None
):
    audit_entry = {
        "timestamp": datetime.utcnow().isoformat(),
        "event": "tool_invocation",
        "tool_name": tool.name,
        "tool_id": tool.id,
        "call_id": tool_call_id,
        "agent_version": agent_version,
        "payload": payload
    }
    audit_log.write(json.dumps(audit_entry) + "\n")

Hook Execution Flow

Tool Invocation Request

Execute @on_tool_before hooks (all registered)

Execute Tool Logic

    Success? ───No──→ Execute @on_tool_error hooks
         ↓                        ↓
        Yes                  Return Result

Execute @on_tool_after hooks

    Return Result

Multiple Hooks

You can register multiple hooks of the same type - they execute in registration order:
@on_tool_before
def first_hook(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None
):
    logger.info("First before hook")

@on_tool_before
def second_hook(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None
):
    logger.info("Second before hook")

Best Practices

Hooks should be fast to avoid delaying tool execution. Move expensive operations to background tasks or use async hooks for I/O operations.
If your hook does I/O (database, API calls), make it async to avoid blocking tool execution.
Hooks should observe, not alter tool execution. They’re for cross-cutting concerns like logging and monitoring.
Hook failures shouldn’t break tool invocations. Exceptions in hooks are logged but don’t prevent tool execution.
When logging payloads, ensure you’re not exposing sensitive information like API keys or personal data.
Include tool_call_id in your logs for traceability across your system.

Integration with @on_task

Tool hooks work seamlessly with @on_task handlers:
from xpander_sdk import on_tool_before, on_task

@on_tool_before
def log_tool_call(
    tool: Tool,
    payload: Any,
    payload_extension: Optional[Dict[str, Any]] = None,
    tool_call_id: Optional[str] = None,
    agent_version: Optional[str] = None
):
    logger.info(f"Tool {tool.name} called")

@on_task
async def handle_task(task):
    # Tool hooks will be triggered for any tools invoked here
    result = await tool.ainvoke(agent_id=task.agent_id, payload={...})
    return task

Troubleshooting

Ensure hooks are registered before any tool invocations. Register hooks at module level, not inside functions.
If hooks slow down execution, consider:
  • Making hooks async if they do I/O
  • Moving expensive operations to background tasks
  • Using sampling for high-frequency tools
Enable debug logging to see hook execution:
from loguru import logger
logger.add("tool_hooks.log", level="DEBUG")

Next Steps