Google ADK (Agent Development Kit)
By Himanshu Shekhar | 25 Mar 2025
Module 01: Google ADK Architecture & Agent Runtime
Learning Objectives
- Understand ADK's core architecture and design principles
- Master the AgentKit orchestrator and event loop
- Implement custom tools with proper validation
- Configure memory providers for production
- Build multi-agent coordination systems
- Deploy agents with proper configuration
Prerequisites
Before starting this module, ensure you have:
- Python 3.9+ installed on your system
- Basic understanding of LLMs and prompt engineering
- Google Cloud account (for deployment sections)
- Familiarity with async Python concepts
1.1 ADK High-Level Design: AgentKit Orchestrator
What is AgentKit Orchestrator?
The AgentKit orchestrator is Google's enterprise-grade orchestration engine for AI agents. It's a sophisticated runtime that manages the complete lifecycle of agent execution, from request routing to state persistence.
📋 Core Definition
The orchestrator is a distributed system component that:
- Maintains a registry of all available agents
- Routes incoming requests to appropriate agents
- Manages conversation state across turns
- Coordinates tool execution and result handling
- Handles multi-agent handoffs and delegation
🎯 Why Use It?
- Scalability: Handles millions of concurrent conversations
- Reliability: Built-in retry and error handling
- Flexibility: Pluggable components for customization
- Observability: Native integration with Cloud Trace and Logging
AgentKit Orchestrator Architecture
┌─────────────────────────────────────────────────────────────────┐
│ AGENTKIT ORCHESTRATOR │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ ROUTER LAYER │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Intent │→│ Agent │→│ Context │→│ Session │ │ │
│ │ │ Classifier│ │ Selector │ │ Builder │ │ Manager │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ EXECUTION LAYER │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Agent │→│ Tool │→│ Memory │→│ Model │ │ │
│ │ │ Runtime │ │ Executor │ │ Manager │ │ Gateway │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ PERSISTENCE LAYER │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ State │→│ History │→│ Vector │→│ Cache │ │ │
│ │ │ Store │ │ Store │ │ Store │ │ Store │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Deep Dive: Orchestrator Internals
Component Breakdown:
The Agent Registry maintains metadata about all available agents:
- Agent Capabilities: What tasks each agent can perform
- Tool Associations: Which tools each agent has access to
- Model Requirements: Specific LLM configurations per agent
- Resource Limits: Memory, timeouts, and concurrent session limits
// Agent Registry Entry Structure
{
    "agent_id": "customer-support-v2",
    "version": "2.1.0",
    "capabilities": ["ticket_management", "knowledge_search", "escalation"],
    "tools": ["create_ticket", "search_kb", "get_customer_info"],
    "model": {
        "name": "gemini-2.0-flash",
        "temperature": 0.3,
        "max_tokens": 2048
    },
    "resources": {
        "max_concurrent": 100,
        "timeout_seconds": 30,
        "memory_mb": 512
    }
}
The Session Context is a shared blackboard that persists across agent turns:
| Component | Purpose | Persistence |
|---|---|---|
| Conversation History | Message exchange log | Full session |
| Entity Cache | Extracted entities (names, dates, etc.) | Session with TTL |
| Tool Results | Cached responses from tools | Configurable TTL |
| Agent Scratchpad | Temporary working memory | Current turn only |
| User Preferences | Learned user patterns | Cross-session |
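The persistence scopes in the table can be sketched as a tiny in-process blackboard. This is a hypothetical stand-in, not the ADK API: entries live for the full session, until a TTL expires, or only for the current turn.

```python
import time

class SessionContext:
    """Minimal session blackboard with per-entry lifetimes."""

    def __init__(self):
        self._store = {}        # key -> (value, expires_at or None)
        self._turn_scratch = {}  # agent scratchpad, cleared every turn

    def set(self, key, value, ttl_seconds=None):
        expires = time.time() + ttl_seconds if ttl_seconds else None
        self._store[key] = (value, expires)

    def get(self, key, default=None):
        if key in self._turn_scratch:
            return self._turn_scratch[key]
        value, expires = self._store.get(key, (default, None))
        if expires and time.time() > expires:
            del self._store[key]   # lazy TTL eviction
            return default
        return value

    def scratch(self, key, value):
        """Agent scratchpad: temporary working memory for the current turn."""
        self._turn_scratch[key] = value

    def end_turn(self):
        self._turn_scratch.clear()

ctx = SessionContext()
ctx.set("history", ["Hi there"])               # full-session entry
ctx.set("entity:name", "Ada", ttl_seconds=60)  # entity cache with TTL
ctx.scratch("plan", "call get_weather")        # turn-only working memory
ctx.end_turn()
print(ctx.get("history"), ctx.get("plan"))     # scratchpad is gone after the turn
```
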
How to Use: Implementation Guide
Step 1: Initialize the Orchestrator
from google.adk import Orchestrator, OrchestratorConfig
from google.adk.memory import FirestoreMemoryProvider
from google.adk.tracing import CloudTraceConfig

# Configure orchestrator with production settings
config = OrchestratorConfig(
    default_model="gemini-2.0-flash",
    memory_provider=FirestoreMemoryProvider(
        project_id="my-project",
        collection_name="agent-sessions",
        ttl_seconds=3600  # Sessions expire after 1 hour
    ),
    tracing=CloudTraceConfig(
        enabled=True,
        sample_rate=0.1  # Trace 10% of requests
    ),
    max_concurrent_turns=1000,
    default_timeout_seconds=30
)

orchestrator = Orchestrator(config=config)
Step 2: Register Agents
from google.adk import Agent
from google.adk.tools import ToolRegistry

# Create tools
tool_registry = ToolRegistry()
tool_registry.register(get_weather_tool)
tool_registry.register(calculate_shipping_tool)

# Create agent
support_agent = Agent(
    name="support_bot",
    description="Handles customer support inquiries",
    system_prompt="""You are a helpful customer support agent for an e-commerce platform.
You can check order status, process returns, and provide shipping information.
Always be polite and professional.""",
    tools=tool_registry,
    model_config={
        "temperature": 0.3,
        "max_output_tokens": 1024
    }
)

# Register with orchestrator
orchestrator.register_agent(support_agent)
Step 3: Process Conversations
# Process a user message
response = await orchestrator.process_turn(
    session_id="user-123-abc",
    user_message="Where's my order #ORD-456?",
    context={
        "user_id": "12345",
        "channel": "web",
        "language": "en"
    }
)

print(f"Agent: {response.text}")
print(f"Tools used: {response.tool_calls}")
print(f"Latency: {response.latency_ms}ms")
print(f"Token usage: {response.token_usage}")
1.2 Agent Runtime & Event Loop
Understanding the Event Loop
The ADK runtime is built on an asynchronous event-driven architecture. The event loop is the heart of agent execution, processing each interaction as a series of discrete events.
Event Lifecycle
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ User │────▶│ Agent │────▶│ Tool │────▶│ Model │────▶│Response │
│ Message │ │Reasoning│ │Execution│ │Generation│ │ Delivery│
└─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────┐
│ EVENT QUEUE (Priority-based) │
└─────────────────────────────────────────────────────────────────────────┘
Event Types and Priorities
| Event Type | Priority | Description | Handler |
|---|---|---|---|
| USER_MESSAGE | High | New user input requiring immediate attention | MessageHandler.process() |
| TOOL_CALL | Medium | Agent requests tool execution | ToolExecutor.execute() |
| TOOL_RESULT | Medium | Tool execution completed with result | Agent.continue() |
| MODEL_REQUEST | Low | LLM inference request | ModelGateway.generate() |
| STATE_SAVE | Background | Persist session state asynchronously | MemoryProvider.save() |
Event Loop Implementation Details
Core Event Loop Code (Simplified)
import asyncio
import logging
import time

logger = logging.getLogger(__name__)

class EventLoop:
    def __init__(self):
        self.queue = asyncio.PriorityQueue()
        self.handlers = {}
        self.running = False
        self.stats = EventLoopStats()

    async def start(self):
        """Start the event loop"""
        self.running = True
        while self.running:
            try:
                # Get next event with timeout
                priority, event = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0
                )
                # Process event
                await self.process_event(event)
                # Update statistics
                self.stats.record_event(event.type)
            except asyncio.TimeoutError:
                # No events, check for cleanup
                await self.cleanup_idle_sessions()
            except Exception as e:
                # Log error but continue
                logger.error(f"Event loop error: {e}", exc_info=True)

    async def process_event(self, event):
        """Process a single event"""
        start_time = time.time()
        try:
            # Find handler
            handler = self.handlers.get(event.type)
            if not handler:
                raise NoHandlerError(f"No handler for {event.type}")
            # Execute handler with timeout
            result = await asyncio.wait_for(
                handler(event),
                timeout=event.timeout
            )
            # Generate follow-up events if needed
            if result.next_events:
                for next_event in result.next_events:
                    await self.queue.put((next_event.priority, next_event))
            # Log success
            self.stats.record_success(event.type, time.time() - start_time)
        except asyncio.TimeoutError:
            self.stats.record_timeout(event.type)
            await self.handle_timeout(event)
        except Exception as e:
            self.stats.record_error(event.type, e)
            await self.handle_error(event, e)
Event Prioritization Strategy
class EventPriority:
    """Priority levels for events"""
    CRITICAL = 0    # User-facing, must process immediately
    HIGH = 1        # Important but can wait briefly
    NORMAL = 2      # Standard processing
    LOW = 3         # Background tasks
    BACKGROUND = 4  # Non-essential tasks

class Event:
    def __init__(self, type, data, priority=EventPriority.NORMAL):
        self.type = type
        self.data = data
        self.priority = priority
        self.created_at = time.time()
        self.timeout = self.calculate_timeout()
        self.retry_count = 0
        self.max_retries = 3

    def calculate_timeout(self):
        """Calculate timeout based on priority"""
        timeouts = {
            EventPriority.CRITICAL: 5,     # 5 seconds
            EventPriority.HIGH: 10,        # 10 seconds
            EventPriority.NORMAL: 30,      # 30 seconds
            EventPriority.LOW: 60,         # 60 seconds
            EventPriority.BACKGROUND: 300  # 5 minutes
        }
        return timeouts.get(self.priority, 30)
Performance Optimization
Event Loop Tuning Parameters
- Queue Size: Max 10,000 events pending
- Worker Pool: 10-100 concurrent handlers
- Batch Size: Process up to 50 events per batch
- Idle Timeout: 30 seconds before cleanup
Monitoring Metrics
- Event Latency: P95 < 100ms
- Queue Depth: Alert if > 1000
- Error Rate: < 0.1% of events
- Throughput: Events/second
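As a rough illustration of how these thresholds might be evaluated, here is a small self-contained sketch (the function names are hypothetical, not ADK APIs):

```python
import math

def percentile(samples, pct):
    """Nearest-rank percentile of a list of latency samples (ms)."""
    ranked = sorted(samples)
    rank = max(1, math.ceil(pct / 100 * len(ranked)))
    return ranked[rank - 1]

def check_health(latencies_ms, queue_depth, errors, total_events):
    """Evaluate the monitoring thresholds above; returns triggered alerts."""
    alerts = []
    if percentile(latencies_ms, 95) >= 100:   # P95 latency target: < 100ms
        alerts.append("p95_latency")
    if queue_depth > 1000:                    # queue depth alert threshold
        alerts.append("queue_depth")
    if total_events and errors / total_events >= 0.001:  # error rate < 0.1%
        alerts.append("error_rate")
    return alerts

# 100 fast events plus one slow outlier: P95 stays healthy,
# but the deep queue trips the queue-depth alert.
samples = [10] * 100 + [250]
print(check_health(samples, queue_depth=1200, errors=0, total_events=101))
# → ['queue_depth']
```
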
1.3 Tool Registry & Function Calling
Complete Guide to ADK Tools
What are Tools?
Tools are functions that agents can call to interact with external systems, APIs, or perform specific actions. ADK provides a flexible framework for defining, registering, and executing tools.
Tool Types
🔧 Built-in Tools
- Google Workspace (Gmail, Calendar, Drive)
- Web Search
- Code Interpreter
- Calculator
- Weather API
📦 Custom Tools
- Database queries
- REST APIs
- gRPC services
- Internal services
- File operations
🤝 Composite Tools
- Multi-step workflows
- Conditional logic
- Parallel execution
- Retry policies
- Circuit breakers
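Two of the composite-tool behaviors listed above, retry policies and circuit breakers, can be sketched as a plain wrapper around any async tool function. This is a minimal illustration under assumed semantics (consecutive-failure breaker, exponential backoff), not the ADK implementation:

```python
import asyncio

class CircuitOpenError(Exception):
    pass

class CompositeTool:
    """Wrap a tool call with retries and a simple circuit breaker."""

    def __init__(self, fn, max_retries=3, failure_threshold=5):
        self.fn = fn
        self.max_retries = max_retries
        self.failure_threshold = failure_threshold
        self.consecutive_failures = 0

    async def __call__(self, *args, **kwargs):
        if self.consecutive_failures >= self.failure_threshold:
            raise CircuitOpenError("breaker open; skipping call")
        for attempt in range(self.max_retries):
            try:
                result = await self.fn(*args, **kwargs)
                self.consecutive_failures = 0  # success closes the breaker
                return result
            except Exception:
                self.consecutive_failures += 1
                if attempt == self.max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt * 0.01)  # exponential backoff

async def flaky(state={"calls": 0}):
    """Simulated tool that fails twice before succeeding."""
    state["calls"] += 1
    if state["calls"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(asyncio.run(CompositeTool(flaky)()))  # retries twice, then prints "ok"
```
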
Tool Schema Definition
import aiohttp
from google.adk.tools import tool, ToolSchema
from pydantic import BaseModel, Field

# Method 1: Decorator-based (Simplest)
@tool(
    name="get_weather",
    description="Get current weather for a location",
    parameters={
        "location": {
            "type": "string",
            "description": "City name or coordinates",
            "required": True
        },
        "units": {
            "type": "string",
            "enum": ["celsius", "fahrenheit"],
            "default": "celsius"
        }
    },
    timeout_seconds=10,
    retry_config={
        "max_retries": 3,
        "backoff_factor": 2
    }
)
async def get_weather(location: str, units: str = "celsius") -> dict:
    """
    Fetch weather data from external API.

    Args:
        location: City name (e.g., "New York") or coordinates ("40.71,-74.01")
        units: Temperature units (celsius/fahrenheit)

    Returns:
        Weather data dictionary
    """
    # API call logic here
    async with aiohttp.ClientSession() as session:
        params = {
            "q": location,
            "units": "metric" if units == "celsius" else "imperial"
        }
        async with session.get("https://api.weather.com/v1", params=params) as resp:
            data = await resp.json()
            return {
                "temperature": data["main"]["temp"],
                "conditions": data["weather"][0]["description"],
                "humidity": data["main"]["humidity"],
                "wind_speed": data["wind"]["speed"]
            }
from typing import List, Optional

# Method 2: Pydantic model-based (Type-safe)
class WeatherInput(BaseModel):
    location: str = Field(description="City name or coordinates")
    units: str = Field(
        default="celsius",
        description="Temperature units",
        enum=["celsius", "fahrenheit"]
    )
    include_forecast: bool = Field(
        default=False,
        description="Include 5-day forecast"
    )

class WeatherOutput(BaseModel):
    current_temp: float
    conditions: str
    humidity: int
    wind_speed: float
    forecast: Optional[list] = None

@tool(schema=WeatherInput, output_schema=WeatherOutput)
async def get_weather_detailed(input: WeatherInput) -> WeatherOutput:
    """Type-safe tool implementation"""
    # Implementation here
    pass
# Method 3: Class-based (For complex tools)
class DatabaseQueryTool(Tool):
    def __init__(self, connection_pool):
        super().__init__(
            name="query_database",
            description="Execute SQL queries on the database"
        )
        self.pool = connection_pool
        self.stats = QueryStats()

    def get_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "SQL query to execute"
                },
                "params": {
                    "type": "array",
                    "description": "Query parameters"
                },
                "timeout": {
                    "type": "integer",
                    "default": 30
                }
            },
            "required": ["query"]
        }

    async def execute(self, **kwargs):
        start_time = time.time()
        try:
            async with self.pool.acquire() as conn:
                result = await conn.execute(
                    kwargs["query"],
                    kwargs.get("params", []),
                    timeout=kwargs.get("timeout", 30)
                )
                self.stats.record_success(time.time() - start_time)
                return {"rows": result, "count": len(result)}
        except Exception as e:
            self.stats.record_error(str(e))
            raise ToolExecutionError(f"Database query failed: {e}")
Advanced Tool Features
1. Parallel Function Calling
# ADK automatically handles parallel calls
@tool(parallel_calls=True)
async def check_multiple_stocks(symbols: List[str]) -> List[dict]:
    """Check multiple stock prices in parallel"""
    tasks = [get_stock_price(symbol) for symbol in symbols]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, Exception)]

# Agent can request multiple tools at once
# LLM Response:
# {
#   "tool_calls": [
#     {"name": "get_weather", "args": {"location": "New York"}},
#     {"name": "get_weather", "args": {"location": "London"}},
#     {"name": "calculate_shipping", "args": {"order_id": "123"}}
#   ]
# }
2. Tool Middleware & Hooks
class ToolMiddleware:
    async def before_execution(self, tool_name: str, args: dict):
        """Called before tool execution"""
        logger.info(f"Executing {tool_name} with args: {args}")
        # Add tracing
        with tracer.start_span(tool_name) as span:
            span.set_attribute("args", str(args))

    async def after_execution(self, tool_name: str, result: any):
        """Called after successful execution"""
        logger.info(f"Tool {tool_name} completed")
        # Cache result if needed
        await cache.set(f"tool:{tool_name}", result, ttl=300)

    async def on_error(self, tool_name: str, error: Exception):
        """Called on tool failure"""
        logger.error(f"Tool {tool_name} failed: {error}")
        # Increment metrics
        metrics.increment(f"tool.errors.{tool_name}")

# Register middleware
tool_registry.add_middleware(ToolMiddleware())
3. Tool Versioning & Compatibility
@tool(
    name="search_products",
    version="2.0.0",
    deprecated_versions=["1.0.0"],
    migration_guide="Use 'query' instead of 'search_term'"
)
async def search_products_v2(
    query: str,
    category: Optional[str] = None,
    limit: int = 10
) -> List[dict]:
    """
    v2.0.0: Enhanced search with better relevance
    v1.0.0: Deprecated - use search_products_v2 instead
    """
    pass

# Backward compatibility wrapper
@tool(name="search_products", version="1.0.0")
async def search_products_v1(search_term: str):
    """Legacy version - redirects to v2"""
    result = await search_products_v2(query=search_term)
    logger.warning("Using deprecated tool v1.0.0")
    return result
1.4 State Persistence & Memory Providers
ADK Memory Systems
Memory Architecture
┌─────────────────────────────────────────────────────────────┐
│ MEMORY HIERARCHY │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────────────┐ │
│ │ Working Memory │ Current conversation context │
│ │ (Session Cache) │ Fast, ephemeral (Redis) │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ Conversation Store │ Full history, user profiles │
│ │ (Document Store) │ Durable, queryable (Firestore) │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ Semantic Memory │ Vector embeddings, knowledge │
│ │ (Vector Store) │ Similarity search (AlloyDB AI) │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Memory Provider Comparison
| Provider | Best For | Persistence | Latency | Scalability | Cost |
|---|---|---|---|---|---|
| InMemory | Development, testing | ❌ Ephemeral | < 1ms | Single instance | Free |
| Redis | Session cache, real-time | ⚠️ Configurable TTL | 1-5ms | High (cluster) | $$ |
| Firestore | Production serverless | ✅ Persistent | 50-200ms | Auto-scaling | $ |
| AlloyDB | Structured memory, analytics | ✅ Persistent | 10-50ms | Very high | $$$ |
| BigQuery | Analytics, historical analysis | ✅ Persistent | 1-5 seconds | Massive | $$ |
Step-by-Step Implementation
1. Configure Firestore Memory Provider
from google.cloud import firestore
from google.adk.memory import FirestoreMemoryProvider, MemoryConfig

# Initialize Firestore client
db = firestore.AsyncClient(project="my-project")

# Configure memory provider
memory_provider = FirestoreMemoryProvider(
    client=db,
    collection_name="agent_memory",
    session_collection="sessions",
    history_collection="conversations",
    config=MemoryConfig(
        ttl_seconds=86400,  # 24 hours
        max_history_turns=50,
        compression=True,
        encryption_key=os.getenv("ENCRYPTION_KEY")
    )
)

# Define memory schema
class SessionMemory(BaseModel):
    session_id: str
    user_id: str
    created_at: datetime
    updated_at: datetime
    context: Dict[str, Any]
    history: List[ConversationTurn]
    metadata: Dict[str, Any]

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

# Memory operations
async def save_session_memory(session_id: str, memory: SessionMemory):
    """Save session state to Firestore"""
    await memory_provider.save(
        key=f"session:{session_id}",
        value=memory.dict(),
        metadata={
            "user_id": memory.user_id,
            "turn_count": len(memory.history)
        }
    )

async def load_session_memory(session_id: str) -> Optional[SessionMemory]:
    """Load session state from Firestore"""
    data = await memory_provider.get(f"session:{session_id}")
    if data:
        return SessionMemory(**data)
    return None
2. Redis for High-Performance Caching
import hashlib
import json

import redis.asyncio as redis
from google.adk.memory import RedisMemoryProvider

# Redis configuration (from_url is synchronous; no await needed)
redis_client = redis.from_url(
    "redis://localhost:6379",
    encoding="utf-8",
    decode_responses=True,
    max_connections=50
)

# Create Redis memory provider
redis_memory = RedisMemoryProvider(
    client=redis_client,
    prefix="adk:",
    default_ttl=3600,  # 1 hour
    serializer="json",
    compression=True
)

def stable_key(obj) -> str:
    """Deterministic cache key; built-in hash() varies between processes."""
    return hashlib.sha256(json.dumps(obj, sort_keys=True, default=str).encode()).hexdigest()

# Cache strategies
class CacheStrategy:
    @staticmethod
    async def cache_tool_result(tool_name: str, args: dict, result: any):
        """Cache expensive tool results"""
        cache_key = f"tool:{tool_name}:{stable_key(args)}"
        await redis_memory.set(
            cache_key,
            result,
            ttl=300,  # 5 minutes
            tags=["tool_result", tool_name]
        )

    @staticmethod
    async def cache_embedding(text: str, embedding: List[float]):
        """Cache text embeddings"""
        cache_key = f"embedding:{stable_key(text)}"
        await redis_memory.set(cache_key, embedding, ttl=86400)  # 24 hours

    @staticmethod
    async def cache_session_context(session_id: str, context: dict):
        """Cache active session context"""
        await redis_memory.set(
            f"session:{session_id}",
            context,
            ttl=1800  # 30 minutes
        )
3. AlloyDB for Vector Memory
from google.adk.memory import AlloyDBMemoryProvider
from pgvector.asyncpg import register_vector

# Initialize AlloyDB connection
alloydb_memory = AlloyDBMemoryProvider(
    connection_string=os.getenv("ALLOYDB_CONNECTION_STRING"),
    vector_dimension=768,  # Embedding dimension
    similarity_function="cosine",
    index_type="ivfflat"  # or "hnsw" for better performance
)

# Create vector memory table
await alloydb_memory.create_table("""
    CREATE TABLE IF NOT EXISTS vector_memory (
        id SERIAL PRIMARY KEY,
        content TEXT,
        embedding vector(768),
        metadata JSONB,
        created_at TIMESTAMP DEFAULT NOW()
    )
""")

# Store with vector embedding
async def store_with_embedding(content: str, embedding: List[float], metadata: dict):
    """Store content with its vector embedding"""
    await alloydb_memory.execute(
        "INSERT INTO vector_memory (content, embedding, metadata) VALUES ($1, $2, $3)",
        content, embedding, metadata
    )

# Semantic search
async def semantic_search(query_embedding: List[float], limit: int = 5):
    """Find similar content using vector similarity (<=> is cosine distance)"""
    results = await alloydb_memory.execute(
        """
        SELECT content, metadata,
               1 - (embedding <=> $1) AS similarity
        FROM vector_memory
        ORDER BY similarity DESC
        LIMIT $2
        """,
        query_embedding, limit
    )
    return results
1.5 Multi-Agent Coordination
Understanding Multi-Agent Coordination
Multi-agent coordination enables multiple AI agents to work together, sharing context and delegating tasks to solve complex problems that single agents cannot handle efficiently.
🤝 Coordination Patterns
- Orchestrator-Worker: Central coordinator delegates to specialized agents
- Peer-to-Peer: Agents communicate directly with each other
- Hierarchical: Multi-level agent organization
- Blackboard: Shared memory space for agent communication
🎯 When to Use Multi-Agent
- Complex workflows: Multiple specialized skills required
- Scalability: Distribute load across agents
- Resilience: Failover and redundancy
- Specialization: Each agent focuses on specific domain
Multi-Agent Coordination Architecture
┌─────────────────────────────────────────────────────────────────┐
│ MULTI-AGENT COORDINATION │
│ │
│ ┌─────────────────┐ │
│ │ Orchestrator │ │
│ │ Agent │ │
│ └────────┬────────┘ │
│ │ │
│ ┌──────────────────────┼──────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Search │ │ Data │ │ Analysis │ │
│ │ Agent │◄────────┤ Agent │◄────────┤ Agent │ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ │ │ │ │
│ └──────────────────────┼──────────────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Shared Memory │ │
│ │ (Blackboard) │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Multi-Agent Implementation
Creating Specialized Agents
from google.adk import Agent, Orchestrator
from google.adk.tools import ToolRegistry

# Create specialized agents
search_agent = Agent(
    name="search_agent",
    description="Handles web searches and information retrieval",
    system_prompt="You are a search specialist. Find accurate information from the web.",
    tools=[web_search_tool, document_search_tool]
)

data_agent = Agent(
    name="data_agent",
    description="Processes and analyzes data",
    system_prompt="You are a data analyst. Process and analyze structured data.",
    tools=[database_tool, calculation_tool, visualization_tool]
)

analysis_agent = Agent(
    name="analysis_agent",
    description="Provides insights and recommendations",
    system_prompt="You are a business analyst. Provide insights and recommendations.",
    tools=[reporting_tool, ml_model_tool]
)
Configuring Orchestrator with Routing Rules
from google.adk.orchestration import RoutingConfig, AgentRouter

# Define routing rules based on intent
routing_config = RoutingConfig(
    rules=[
        {
            "intent": "search|find|lookup",
            "agent": "search_agent",
            "confidence": 0.8
        },
        {
            "intent": "analyze|calculate|compute",
            "agent": "data_agent",
            "confidence": 0.7
        },
        {
            "intent": "recommend|advise|suggest",
            "agent": "analysis_agent",
            "confidence": 0.6
        }
    ],
    default_agent="orchestrator",
    enable_fallback=True
)

# Create router
agent_router = AgentRouter(
    agents=[search_agent, data_agent, analysis_agent],
    routing_config=routing_config
)

# Configure orchestrator with router
orchestrator = Orchestrator(
    router=agent_router,
    enable_multi_agent=True,
    shared_memory_provider=redis_memory
)
Agent Handoff and Delegation
# Agent can delegate tasks to other agents
@agent.capability
async def delegate_task(task_description: str, target_agent: str):
    """Delegate a subtask to another agent"""
    # Create handoff context
    handoff_context = {
        "original_request": task_description,
        "delegating_agent": agent.name,
        "session_id": current_session.id,
        "required_output": "analysis_results"
    }
    # Hand off to target agent
    response = await orchestrator.handoff(
        target_agent=target_agent,
        task=task_description,
        context=handoff_context
    )
    # Process response when agent completes
    return {
        "status": "completed",
        "result": response.output,
        "delegated_to": target_agent
    }
Shared Memory and Context
from google.adk.memory import SharedMemoryProvider

# Configure shared memory for multi-agent coordination
shared_memory = SharedMemoryProvider(
    backend="redis",
    namespace="multi_agent",
    ttl=3600,
    synchronization=True
)

# Agents can read/write to shared context
async def update_shared_context(agent_name: str, data: dict):
    """Update shared context from agent"""
    await shared_memory.update(
        key="shared_context",
        value={
            "last_updated_by": agent_name,
            "timestamp": time.time(),
            "data": data
        }
    )

# Coordination protocol
class CoordinationProtocol:
    @staticmethod
    async def request_help(agent_name: str, task: str, capability: str):
        """Request help from other agents"""
        await shared_memory.publish(
            channel="agent_requests",
            message={
                "from": agent_name,
                "task": task,
                "required_capability": capability
            }
        )

    @staticmethod
    async def respond_to_request(request_id: str, response: any):
        """Respond to help request"""
        await shared_memory.publish(
            channel=f"response_{request_id}",
            message=response
        )
Multi-Agent Best Practices
✅ Do's
- Define clear agent boundaries and responsibilities
- Implement timeout mechanisms for agent handoffs
- Use shared memory for context preservation
- Log all inter-agent communications for debugging
- Implement circuit breakers for failing agents
❌ Don'ts
- Avoid circular dependencies between agents
- Don't overload orchestrator with too many agents
- Prevent infinite delegation loops
- Avoid sharing large data payloads directly
- Don't ignore agent failure handling
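The "prevent infinite delegation loops" rule can be enforced with a small guard run before every handoff. This is an illustrative sketch under assumed context keys (`delegation_chain` is hypothetical), not an ADK feature:

```python
MAX_DELEGATION_DEPTH = 3

class DelegationLoopError(Exception):
    pass

def guard_handoff(context: dict, target_agent: str) -> dict:
    """Cap delegation depth and reject a target already in the chain."""
    chain = context.get("delegation_chain", [])
    if target_agent in chain:
        raise DelegationLoopError(f"cycle: {' -> '.join(chain + [target_agent])}")
    if len(chain) >= MAX_DELEGATION_DEPTH:
        raise DelegationLoopError("max delegation depth exceeded")
    # Return a new context carrying the extended chain to the target agent
    return {**context, "delegation_chain": chain + [target_agent]}

ctx = guard_handoff({"delegation_chain": ["orchestrator"]}, "search_agent")
print(ctx["delegation_chain"])  # ['orchestrator', 'search_agent']
```

Passing the chain along in the handoff context means every agent in the path can see, and refuse, a delegation that would revisit an earlier agent.
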
1.6 ADK vs LangChain / Semantic Kernel
Framework Comparison: ADK vs Alternatives
Understanding the key differences between Google ADK, LangChain, and Microsoft's Semantic Kernel helps you choose the right framework for your use case.
| Feature | Google ADK | LangChain | Semantic Kernel |
|---|---|---|---|
| Primary Backer | Google | Open Source (Community) | Microsoft |
| Architecture | Orchestrator-based with AgentKit | Chain-based with LCEL | Kernel-based with planners |
| Multi-Agent Support | ✅ Native (Orchestrator) | ⚠️ Via LangGraph | ⚠️ Via planners |
| Google Cloud Integration | ✅ Deep (Vertex AI, Firestore, etc.) | ⚠️ Via integrations | ❌ Limited |
| Azure Integration | ❌ None | ⚠️ Via integrations | ✅ Deep (Azure OpenAI) |
| Tool Registry | ✅ Built-in with schema validation | ✅ Via tools module | ✅ Native plugins |
| Memory Providers | Redis, Firestore, AlloyDB, BigQuery | Vector stores, Redis, SQL | Volatile, persistent, vector |
| Learning Curve | Moderate | Steep | Moderate |
| Enterprise Features | ✅ Built-in (tracing, monitoring) | ⚠️ Requires additional tools | ✅ Built-in telemetry |
| Language Support | Python | Python, JavaScript | Python, C#, Java |
Detailed Framework Analysis
When to Choose Google ADK
- Google Cloud Ecosystem: If you're already using GCP services
- Enterprise Production: Built-in observability, security, and scaling
- Multi-Agent Systems: Native orchestrator for complex agent coordination
- Gemini Models: Deep integration with Google's LLMs
- Serverless Deployment: Cloud Run, Firebase integration
When to Choose LangChain
- Maximum Flexibility: Largest ecosystem of integrations
- Multi-Cloud: Works with any LLM provider
- Community Support: Extensive documentation and examples
- RAG Applications: Advanced retrieval patterns
- JavaScript/TypeScript: Full-stack JavaScript applications
When to Choose Semantic Kernel
- Microsoft Stack: Azure OpenAI, .NET applications
- Enterprise Integration: Microsoft 365, Dynamics
- Multi-Language: C#, Python, Java support
- Planner-Based: Automatic task decomposition
- Plugin Architecture: Native Microsoft Graph integration
Migration Guide: LangChain to ADK
LangChain to ADK Concept Mapping
| LangChain Concept | ADK Equivalent | Migration Notes |
|---|---|---|
| Chain | Agent with tools | ADK agents are more declarative |
| Runnable | Tool or Capability | Use @tool decorator |
| Memory | MemoryProvider | Pluggable backend (Redis/Firestore) |
| LCEL | Orchestrator workflows | Declarative YAML or Python config |
| AgentExecutor | Agent Runtime | Built-in event loop |
| Tool | @tool decorator | Schema validation built-in |
Example: Migrating a Simple Chain
# LangChain Version
from langchain import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate

prompt = PromptTemplate(
    input_variables=["question"],
    template="Answer this question: {question}"
)
chain = LLMChain(llm=OpenAI(), prompt=prompt)
result = chain.run("What is machine learning?")

# ADK Version
from google.adk import Agent

agent = Agent(
    name="qa_agent",
    system_prompt="Answer questions accurately and concisely.",
    model="gemini-2.0-flash"
)
result = await agent.process("What is machine learning?")
1.7 ADK Configuration & Initialization
ADK Configuration System
ADK provides a flexible, hierarchical configuration system that supports multiple formats and sources, making it easy to configure agents for different environments.
📝 Configuration Sources
- Environment variables
- YAML/JSON files
- Python dictionaries
- Secret managers
- Remote config servers
⚙️ Configuration Types
- Agent configuration
- Model configuration
- Memory configuration
- Tool configuration
- Orchestrator settings
🔄 Configuration Hierarchy
- Default values
- Environment overrides
- File-based configs
- Runtime overrides
- Secret injection
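The hierarchy resolves bottom-up: each later source overrides the one before it, and nested sections merge key by key. A minimal sketch of that precedence (hypothetical helper names, not the ADK implementation):

```python
def deep_merge(base: dict, override: dict) -> dict:
    """Later layers win; nested dicts merge key by key instead of replacing."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

def resolve(*layers: dict) -> dict:
    """Apply layers in precedence order: defaults first, runtime overrides last."""
    config = {}
    for layer in layers:
        config = deep_merge(config, layer)
    return config

defaults = {"model": {"name": "gemini-2.0-flash", "temperature": 0.7}}
file_cfg = {"model": {"temperature": 0.3}}      # file overrides the default temp
runtime = {"model": {"max_tokens": 2048}}       # runtime adds a new key
print(resolve(defaults, file_cfg, runtime))
# → {'model': {'name': 'gemini-2.0-flash', 'temperature': 0.3, 'max_tokens': 2048}}
```
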
Configuration Methods
1. YAML Configuration
# config.yaml
project:
  name: customer-support-agent
  environment: production

agent:
  name: support_bot
  description: "Customer support agent for e-commerce"
  system_prompt: "You are a helpful support agent..."
  model:
    provider: vertex
    name: gemini-2.0-flash
    temperature: 0.3
    max_tokens: 2048
    safety_settings:
      harassment: BLOCK_MEDIUM_AND_ABOVE

memory:
  provider: firestore
  config:
    collection: agent_sessions
    ttl: 3600
    max_history: 50

tools:
  - name: search_knowledge_base
    enabled: true
    timeout: 30
  - name: create_ticket
    enabled: true
    required_role: agent

orchestrator:
  max_concurrent: 1000
  default_timeout: 30

tracing:
  enabled: true
  sample_rate: 0.1

monitoring:
  metrics_port: 9090
  health_check_path: /health
2. Loading Configuration
from google.adk.config import ConfigLoader, Config
from google.adk import Agent, Orchestrator
import os

# Load from YAML file
config_loader = ConfigLoader()
config = config_loader.from_yaml("config.yaml")

# Override with environment variables
config = config.merge({
    "agent.model.temperature": float(os.getenv("MODEL_TEMP", 0.3)),
    "orchestrator.max_concurrent": int(os.getenv("MAX_CONCURRENT", 1000))
})

# Create agent from config
agent = Agent.from_config(config["agent"])

# Create orchestrator
orchestrator = Orchestrator.from_config(config["orchestrator"])
3. Environment-Based Configuration
# .env file
ADK_ENVIRONMENT=production
ADK_PROJECT_ID=my-project-123
ADK_DEFAULT_MODEL=gemini-2.0-flash
ADK_MEMORY_PROVIDER=firestore
ADK_REDIS_URL=redis://redis:6379
ADK_ENABLE_TRACING=true
ADK_SAMPLE_RATE=0.1
ADK_LOG_LEVEL=INFO

# Python configuration with environment variables
from google.adk.config import EnvConfig

class AppConfig(EnvConfig):
    """Application configuration from environment"""
    environment: str = "development"
    project_id: str = None

    # Agent settings
    default_model: str = "gemini-2.0-flash"
    temperature: float = 0.3

    # Memory settings
    memory_provider: str = "inmemory"
    redis_url: Optional[str] = None

    # Observability
    enable_tracing: bool = False
    sample_rate: float = 0.0

    class Config:
        env_prefix = "ADK_"

# Load configuration
config = AppConfig()

# Use configuration
agent = Agent(
    name="support_bot",
    model=config.default_model,
    temperature=config.temperature
)
Initialization Patterns
Basic Initialization
from google.adk import ADK, Agent, Orchestrator
from google.adk.memory import RedisMemoryProvider
from google.adk.tracing import CloudTrace

# Initialize ADK
adk = ADK(project="my-project", environment="production")

# Configure memory
memory = RedisMemoryProvider.from_url("redis://localhost:6379")

# Create agent
agent = Agent(
    name="assistant",
    system_prompt="You are a helpful assistant.",
    memory_provider=memory
)

# Initialize orchestrator with agent
orchestrator = Orchestrator(
    agents=[agent],
    default_timeout=30,
    enable_tracing=True
)

# Start the application
await adk.start(orchestrator)
Factory Pattern
from google.adk import AgentFactory, ToolFactory
from google.adk.memory import RedisMemoryProvider, FirestoreMemoryProvider, InMemoryProvider

DEFAULT_PROMPT = "You are a helpful support agent."

class SupportAgentFactory(AgentFactory):
    """Factory for creating support agents"""

    def create_agent(self, config: dict) -> Agent:
        """Create a configured support agent"""
        # Create tools
        tools = ToolFactory.create_many([
            {"name": "search_kb", "config": config.get("kb_config", {})},
            {"name": "create_ticket", "config": config.get("ticket_config", {})},
            {"name": "get_customer_info", "config": config.get("customer_config", {})}
        ])
        # Configure memory
        memory = self.create_memory(config.get("memory", {}))
        # Create agent
        return Agent(
            name=config.get("name", "support_agent"),
            system_prompt=config.get("system_prompt", DEFAULT_PROMPT),
            tools=tools,
            memory_provider=memory,
            model_config=config.get("model", {})
        )

    def create_memory(self, config: dict):
        """Create a memory provider based on config"""
        provider_type = config.get("type", "inmemory")
        if provider_type == "redis":
            return RedisMemoryProvider(**config.get("params", {}))
        elif provider_type == "firestore":
            return FirestoreMemoryProvider(**config.get("params", {}))
        else:
            return InMemoryProvider()

# Usage
factory = SupportAgentFactory()
agent = factory.create_agent({
    "name": "premium_support",
    "system_prompt": "You are a premium support agent...",
    "memory": {"type": "redis", "params": {"url": "redis://localhost"}},
    "model": {"temperature": 0.2}
})
Dependency Injection
from google.adk import inject, Container, Provide

# Define dependencies
class AgentDependencies:
    def __init__(self):
        self.memory = RedisMemoryProvider()
        self.tracing = CloudTrace()
        self.metrics = MetricsCollector()
        self.logger = StructuredLogger()

# Configure container
container = Container()
container.register(AgentDependencies, scope="singleton")

@inject
async def create_support_agent(deps: AgentDependencies = Provide[AgentDependencies]):
    """Create agent with injected dependencies"""
    return Agent(
        name="support_agent",
        memory_provider=deps.memory,
        tracing=deps.tracing,
        logger=deps.logger
    )

# Use with DI
agent = await create_support_agent()
Configuration Best Practices
🔐 Secrets Management
- Never hardcode credentials
- Use Google Secret Manager
- Environment variables for local development
- Rotate secrets regularly
📦 Environment Separation
- dev.yaml for development
- staging.yaml for testing
- prod.yaml for production
- Use environment overrides
🔄 Version Control
- Version your configurations
- Use semantic versioning
- Document breaking changes
- Maintain changelog
Example: Multi-Environment Configuration
# Base config (config/base.yaml)
agent:
  model: gemini-2.0-flash
  temperature: 0.3
memory:
  provider: firestore
  ttl: 3600

# Development override (config/dev.yaml)
agent:
  temperature: 0.5  # Higher temperature for creativity
memory:
  provider: inmemory  # No persistence in dev
tracing:
  enabled: false

# Production override (config/prod.yaml)
agent:
  temperature: 0.2  # More deterministic
memory:
  provider: redis
  ttl: 7200  # Longer sessions
tracing:
  enabled: true
  sample_rate: 0.1
# Load environment-specific config
import os
env = os.getenv("ADK_ENV", "dev")
config = ConfigLoader().load([
"config/base.yaml",
f"config/{env}.yaml"
])
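Layered loading of this kind relies on a recursive merge in which the environment override wins on conflicts while untouched base keys survive. ADK's `ConfigLoader` handles this internally; a minimal sketch of the merge semantics in plain Python (illustrative, not the library's actual implementation):

```python
def deep_merge(base: dict, override: dict) -> dict:
    """Recursively merge `override` into `base`; override wins on conflicts."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are mappings: recurse instead of replacing wholesale
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

base = {"agent": {"model": "gemini-2.0-flash", "temperature": 0.3},
        "memory": {"provider": "firestore", "ttl": 3600}}
prod = {"agent": {"temperature": 0.2},
        "memory": {"provider": "redis", "ttl": 7200}}
config = deep_merge(base, prod)
```

After the merge, `config["agent"]` keeps the base `model` but takes the production `temperature`, which is exactly the behavior the base/dev/prod YAML layering above depends on.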
Complete Installation Guide
System Requirements
- Python 3.9 - 3.11
- 8GB RAM minimum (16GB recommended)
- 10GB free disk space
- Linux/macOS/Windows (WSL2 recommended for Windows)
Step 1: Environment Setup
# Create virtual environment
python -m venv adk-env
source adk-env/bin/activate # Linux/macOS
# or
adk-env\Scripts\activate # Windows
# Upgrade pip
python -m pip install --upgrade pip
# Install ADK core
pip install google-adk
# Install optional dependencies
pip install google-adk[all] # All features
# Or select specific ones:
pip install google-adk[vertex] # Vertex AI integration
pip install google-adk[firestore] # Firestore memory
pip install google-adk[redis] # Redis support
pip install google-adk[alloydb] # AlloyDB support
pip install google-adk[tracing] # OpenTelemetry tracing
Step 2: Google Cloud Setup
# Install Google Cloud CLI
# https://cloud.google.com/sdk/docs/install
# Initialize and authenticate
gcloud init
gcloud auth application-default login
# Enable required APIs
gcloud services enable \
aiplatform.googleapis.com \
firestore.googleapis.com \
redis.googleapis.com \
alloydb.googleapis.com \
cloudtrace.googleapis.com \
logging.googleapis.com
# Create service account
gcloud iam service-accounts create adk-agent \
--display-name="ADK Agent Service Account"
# Download credentials
gcloud iam service-accounts keys create credentials.json \
--iam-account=adk-agent@PROJECT_ID.iam.gserviceaccount.com
# Set environment variable
export GOOGLE_APPLICATION_CREDENTIALS=credentials.json
Step 3: Verify Installation
# Create test script: test_adk.py
import asyncio
from google.adk import __version__
from google.adk import Agent, Orchestrator

async def test_adk():
    print(f"ADK Version: {__version__}")
    # Create a simple agent
    agent = Agent(
        name="test_bot",
        system_prompt="You are a helpful assistant."
    )
    orchestrator = Orchestrator(agents=[agent])
    response = await orchestrator.process_turn(
        session_id="test-123",
        user_message="Hello, are you working?"
    )
    print(f"Response: {response.text}")
    print("✅ ADK is working!")

if __name__ == "__main__":
    asyncio.run(test_adk())

# Run test
python test_adk.py
Step 4: Docker Setup (Optional)
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Run
CMD ["python", "main.py"]
# docker-compose.yml
version: '3.8'
services:
  adk-agent:
    build: .
    ports:
      - "8080:8080"
    environment:
      - GOOGLE_APPLICATION_CREDENTIALS=/app/credentials.json
      - REDIS_URL=redis://redis:6379
    volumes:
      - ./credentials.json:/app/credentials.json
    depends_on:
      - redis
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
volumes:
  redis-data:
🎓 Module 01 : Google ADK Architecture & Agent Runtime Successfully Completed
You have successfully completed this module of Google ADK (Agent Development Kit).
Module 02: Agent Types & Persona Design
Learning Objectives
- Understand different agent types and their use cases
- Master conversational vs task-oriented agent design
- Implement RAG agents with knowledge bases
- Design multi-modal agent interactions
- Create dynamic personas with prompt layering
- Implement system prompt engineering techniques
Prerequisites
Before starting this module, ensure you have:
- Completed Module 01 (ADK Architecture fundamentals)
- Understanding of prompt engineering basics
- Familiarity with different LLM capabilities
- Basic knowledge of user experience design
2.1 Conversational Agents
Understanding Conversational Agents
Conversational agents are AI systems designed to engage in natural, human-like dialogue. They maintain context, understand nuance, and create engaging interactions that feel natural to users.
💬 Key Characteristics
- Natural Language Understanding: Interpret user intent and context
- Context Maintenance: Remember conversation history
- Turn-Taking: Manage dialogue flow naturally
- Persona Consistency: Maintain consistent character
- Emotional Intelligence: Detect and respond to sentiment
🎯 Common Use Cases
- Customer Support: Handle inquiries and complaints
- Virtual Assistants: Schedule tasks and answer questions
- Companion Bots: Provide emotional support
- Educational Tutors: Teach through dialogue
- Entertainment: Games and interactive stories
Conversational Agent Architecture
┌─────────────────────────────────────────────────────────────────┐
│ CONVERSATIONAL AGENT │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ User │───▶│ NLU Layer │───▶│ Dialogue │ │
│ │ Input │ │ (Intent/Parsing)│ │ Management │ │
│ └──────────────┘ └──────────────┘ └───────┬──────┘ │
│ │ │
│ ┌──────────────┐ ┌──────────────┐ ┌───────▼──────┐ │
│ │ Response │◀───│ NLG Layer │◀───│ Context │ │
│ │ Generation │ │ (Text/Speech)│ │ Manager │ │
│ └──────────────┘ └──────────────┘ └───────┬──────┘ │
│ │ │
│ ┌─────────▼─────────┐ │
│ │ Conversation │ │
│ │ History Store │ │
│ └───────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Building a Conversational Agent
Basic Conversational Agent
from google.adk import Agent
from google.adk.memory import ConversationBuffer
from google.adk.nlu import IntentClassifier

class ConversationalAgent:
    def __init__(self, name: str, personality: str):
        self.agent = Agent(
            name=name,
            system_prompt=f"""You are {name}, a conversational AI with this personality: {personality}

Guidelines:
- Be natural and engaging in conversation
- Show empathy when users share feelings
- Ask follow-up questions to keep dialogue flowing
- Remember details from earlier in the conversation
- Adapt your tone to match the user's emotional state
"""
        )
        # Add conversation memory
        self.memory = ConversationBuffer(
            max_turns=50,
            summary_threshold=20
        )
        # Add intent classification
        self.intent_classifier = IntentClassifier(
            intents=["greeting", "question", "complaint", "farewell", "small_talk"],
            confidence_threshold=0.7
        )

    async def process_message(self, user_message: str, session_id: str):
        # Classify intent
        intent = await self.intent_classifier.classify(user_message)
        # Load conversation history
        history = await self.memory.get_history(session_id)
        # Generate response with context
        response = await self.agent.process(
            user_message=user_message,
            context={
                "history": history,
                "intent": intent,
                "session_id": session_id
            }
        )
        # Store in memory
        await self.memory.add_turn(
            session_id=session_id,
            user_message=user_message,
            agent_response=response.text
        )
        return response

# Create a friendly assistant
assistant = ConversationalAgent(
    name="FriendlyHelper",
    personality="warm, empathetic, and enthusiastic. You love helping people and making them smile."
)

# Example conversation
response = await assistant.process_message(
    "Hi! I'm feeling a bit stressed about work today.",
    "session_123"
)
print(response.text)
Advanced Features: Emotion Detection
from google.adk.sentiment import EmotionDetector
from google.adk.response import EmotionalResponse

class EmotionallyAwareAgent(ConversationalAgent):
    def __init__(self, name: str, personality: str):
        super().__init__(name, personality)
        self.emotion_detector = EmotionDetector(
            emotions=["joy", "sadness", "anger", "fear", "surprise", "neutral"],
            model="emotion-bert-base"
        )
        # Response templates for different emotions
        self.emotion_responses = {
            "joy": "I'm so happy to hear that! 😊",
            "sadness": "I'm sorry you're feeling this way. I'm here to listen.",
            "anger": "I understand you're frustrated. Let's work through this together.",
            "fear": "That sounds concerning. How can I help address your worries?",
            "surprise": "Wow, that's unexpected! Tell me more.",
            "neutral": "I understand. How can I help you today?"
        }

    async def process_with_emotion(self, user_message: str, session_id: str):
        # Detect emotion
        emotion = await self.emotion_detector.detect(user_message)
        # Get base response
        response = await self.process_message(user_message, session_id)
        # Add emotional acknowledgment
        if emotion.emotion in self.emotion_responses:
            response.text = f"{self.emotion_responses[emotion.emotion]} {response.text}"
        # Adjust response parameters based on emotion
        if emotion.emotion in ["sadness", "fear"]:
            response.temperature = 0.7  # More empathetic
            response.max_tokens = 150   # Longer responses
        elif emotion.emotion == "anger":
            response.temperature = 0.5  # More measured
            response.calm_tone = True   # Special flag for calmer responses
        return response

# Usage
emotion_agent = EmotionallyAwareAgent(
    name="EmpathyBot",
    personality="deeply empathetic and supportive"
)
response = await emotion_agent.process_with_emotion(
    "I just lost my job and I'm really worried about the future.",
    "session_456"
)
print(response.text)
Conversational Agent Best Practices
🗣️ Natural Dialogue
- Use conversational fillers naturally
- Vary response patterns
- Acknowledge user input before answering
- Use appropriate humor when suitable
🧠 Context Management
- Remember user preferences
- Reference past conversations
- Handle topic changes gracefully
- Summarize long conversations
❤️ Emotional Intelligence
- Detect emotional cues
- Match user's emotional tone
- Know when to escalate to humans
- Maintain appropriate boundaries
Conversational Agent Types Comparison
| Type | Primary Focus | Memory Requirements | Typical Use Cases | Complexity |
|---|---|---|---|---|
| Chit-Chat Bot | Social conversation | Short-term | Entertainment, companionship | Low |
| Task-Oriented | Goal completion | Session-based | Booking, ordering | Medium |
| Knowledge Bot | Information retrieval | Long-term + KB | FAQ, research | High |
| Therapeutic Bot | Emotional support | Long-term history | Mental health, coaching | Very High |
2.2 Task-Oriented Agents
Understanding Task-Oriented Agents
Task-oriented agents are designed to accomplish specific goals efficiently. They focus on completing tasks accurately, with minimal conversational overhead, making them ideal for transactional interactions.
⚙️ Key Characteristics
- Goal-Driven: Focused on task completion
- Efficient: Minimal chit-chat, straight to business
- Structured: Follow predefined workflows
- Verification: Confirm actions before executing
- Error Recovery: Handle failures gracefully
🎯 Common Use Cases
- Booking Systems: Flights, hotels, appointments
- E-commerce: Order placement, tracking, returns
- Banking: Transfers, bill payments, balance checks
- IT Support: Password resets, software installation
- HR Automation: Leave requests, expense reporting
Task-Oriented Agent Architecture
┌─────────────────────────────────────────────────────────────────┐
│ TASK-ORIENTED AGENT │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ User │───▶│ Intent │───▶│ Task │ │
│ │ Request │ │ Recognition │ │ Parser │ │
│ └──────────────┘ └──────────────┘ └───────┬──────┘ │
│ │ │
│ ┌──────────────┐ ┌──────────────┐ ┌───────▼──────┐ │
│ │ Action │◀───│ Verification│◀───│ Parameter │ │
│ │ Execution │ │ Layer │ │ Collection │ │
│ └───────┬──────┘ └──────────────┘ └──────────────┘ │
│ │ │
│ ┌───────▼──────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ External │───▶│ Result │───▶│ Confirmation│ │
│ │ APIs/Tools │ │ Processing │ │ to User │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Building a Task-Oriented Agent
Flight Booking Agent Example
from google.adk import Agent
from google.adk.tools import ToolRegistry, tool
from google.adk.dialogue import TaskDialogueManager
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime

# Define task models
class FlightSearchParams(BaseModel):
    origin: str = Field(description="Departure city or airport code")
    destination: str = Field(description="Arrival city or airport code")
    departure_date: str = Field(description="Date of departure (YYYY-MM-DD)")
    return_date: Optional[str] = Field(None, description="Return date for round trips")
    passengers: int = Field(1, description="Number of passengers")
    cabin_class: str = Field("economy", description="economy, premium, business, first")

class BookingParams(BaseModel):
    flight_id: str = Field(description="Selected flight identifier")
    passenger_names: List[str] = Field(description="Full names of all passengers")
    payment_method: str = Field(description="Credit card or other payment method")
    special_requests: Optional[str] = Field(None, description="Meal preferences, assistance, etc.")
class TaskOrientedFlightAgent:
    def __init__(self):
        # Initialize tools
        self.tools = ToolRegistry()
        self.tools.register(self.search_flights)
        self.tools.register(self.check_availability)
        self.tools.register(self.book_flight)
        self.tools.register(self.process_payment)
        # External clients (self.airline_api, self.payment_gateway) are
        # assumed to be configured elsewhere in the application.
        # Task-specific system prompt
        self.agent = Agent(
            name="FlightBookingBot",
            system_prompt="""You are a flight booking assistant. Your goal is to help users book flights efficiently.

Guidelines:
- Be concise and focus on gathering required information
- Ask for one piece of information at a time
- Confirm all details before booking
- Handle errors gracefully and provide alternatives
- Never book without explicit user confirmation
- Keep track of the booking state (searching → selecting → confirming → booking)
""",
            tools=self.tools
        )
        # Task state management
        self.dialogue_manager = TaskDialogueManager(
            required_fields={
                "flight_search": ["origin", "destination", "departure_date"],
                "booking": ["flight_id", "passenger_names", "payment_method"]
            },
            confirmation_required=True
        )

    @tool(
        name="search_flights",
        description="Search for available flights based on criteria"
    )
    async def search_flights(self, params: FlightSearchParams) -> dict:
        """Search for flights using an external API"""
        # Call airline API (simplified)
        flights = await self.airline_api.search(params.dict())
        return {
            "status": "success",
            "flights": flights,
            "count": len(flights),
            "search_params": params.dict()
        }

    @tool(
        name="check_availability",
        description="Check if a specific flight is still available"
    )
    async def check_availability(self, flight_id: str) -> dict:
        """Verify flight availability"""
        available = await self.airline_api.check_seats(flight_id)
        return {
            "flight_id": flight_id,
            "available": bool(available),
            "seats_remaining": available.seats if available else 0
        }

    @tool(
        name="book_flight",
        description="Book a flight with passenger details"
    )
    async def book_flight(self, params: BookingParams) -> dict:
        """Complete the flight booking"""
        # Verify availability again
        available = await self.check_availability(params.flight_id)
        if not available["available"]:
            return {
                "status": "error",
                "message": "Flight no longer available",
                "suggestions": await self.find_alternatives(params.flight_id)
            }
        # Create booking
        booking = await self.airline_api.create_booking(
            flight_id=params.flight_id,
            passengers=params.passenger_names,
            special_requests=params.special_requests
        )
        return {
            "status": "success",
            "booking_reference": booking.reference,
            "total_price": booking.price,
            "confirmation_sent": booking.confirmation_email
        }

    @tool(
        name="process_payment",
        description="Process payment for the booking"
    )
    async def process_payment(self, booking_ref: str, amount: float, payment_details: dict) -> dict:
        """Handle payment processing"""
        # Validate payment (in production, use a PCI-compliant service)
        result = await self.payment_gateway.charge(
            amount=amount,
            payment_method=payment_details
        )
        return {
            "status": "success" if result.success else "failed",
            "transaction_id": result.transaction_id,
            "receipt_url": result.receipt_url
        }

    async def handle_booking_session(self, user_message: str, session_id: str):
        """Main session handler with state management"""
        # Get current task state
        state = await self.dialogue_manager.get_state(session_id)
        # Process based on state
        if state.current_task == "initial":
            # Start a new booking
            response = await self.agent.process(
                user_message,
                context={
                    "task": "flight_search",
                    "collected_params": {}
                }
            )
            # Extract parameters from response
            params = self.extract_booking_params(response.text)
            await self.dialogue_manager.update_state(
                session_id,
                "searching",
                params
            )
        elif state.current_task == "searching":
            # Handle flight selection
            if "select" in user_message.lower():
                flight_id = self.extract_flight_id(user_message)
                response = await self.agent.process(
                    f"User selected flight {flight_id}. Now ask for passenger details.",
                    context={"task": "passenger_info"}
                )
            else:
                # Refine search
                response = await self.agent.process(
                    user_message,
                    context={"task": "refine_search"}
                )
        elif state.current_task == "passenger_info":
            # Collect passenger details
            response = await self.agent.process(
                user_message,
                context={"task": "collect_passenger_info"}
            )
            if self.all_passenger_info_collected(response):
                await self.dialogue_manager.update_state(
                    session_id,
                    "confirming",
                    self.extract_passenger_info(response)
                )
        elif state.current_task == "confirming":
            # Confirm booking
            if "confirm" in user_message.lower():
                booking = await self.book_flight(BookingParams(**state.collected_params))
                response = f"Booking confirmed! Your reference is {booking['booking_reference']}"
                await self.dialogue_manager.complete_task(session_id)
            elif "change" in user_message.lower():
                response = "Let's modify your booking. What would you like to change?"
                await self.dialogue_manager.update_state(session_id, "searching", {})
            else:
                response = "Please confirm or modify your booking details."
        return response
# Usage
booking_agent = TaskOrientedFlightAgent()
# Example session
response = await booking_agent.handle_booking_session(
"I need to book a flight from New York to London next Friday",
"booking_123"
)
print(response)
State Machine for Task Management
from enum import Enum
from typing import Dict, List, Any
from datetime import datetime

class InvalidTransitionError(Exception):
    """Raised when a transition is not allowed by the state table."""

class TaskState(Enum):
    INITIAL = "initial"
    GATHERING_INFO = "gathering_info"
    VERIFYING = "verifying"
    EXECUTING = "executing"
    CONFIRMING = "confirming"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class TaskStateMachine:
    def __init__(self, task_name: str):
        self.task_name = task_name
        self.current_state = TaskState.INITIAL
        self.context: Dict[str, Any] = {}
        self.history: List[Dict] = []
        self.start_time = datetime.now()
        self.end_time = None
        # Define valid transitions
        self.transitions = {
            TaskState.INITIAL: [TaskState.GATHERING_INFO, TaskState.CANCELLED],
            TaskState.GATHERING_INFO: [TaskState.VERIFYING, TaskState.FAILED, TaskState.CANCELLED],
            TaskState.VERIFYING: [TaskState.EXECUTING, TaskState.GATHERING_INFO, TaskState.FAILED],
            TaskState.EXECUTING: [TaskState.CONFIRMING, TaskState.FAILED],
            TaskState.CONFIRMING: [TaskState.COMPLETED, TaskState.GATHERING_INFO, TaskState.FAILED],
            TaskState.COMPLETED: [],
            TaskState.FAILED: [TaskState.GATHERING_INFO],
            TaskState.CANCELLED: []
        }

    async def transition(self, new_state: TaskState, context_update: Dict = None):
        """Attempt to transition to a new state"""
        if new_state in self.transitions[self.current_state]:
            # Record history
            self.history.append({
                "from": self.current_state,
                "to": new_state,
                "timestamp": datetime.now(),
                "context": self.context.copy()
            })
            # Update state
            self.current_state = new_state
            if context_update:
                self.context.update(context_update)
            if new_state == TaskState.COMPLETED:
                self.end_time = datetime.now()
            return True
        else:
            raise InvalidTransitionError(
                f"Cannot transition from {self.current_state} to {new_state}"
            )

    def get_required_info(self) -> List[str]:
        """Get required information for the current state"""
        requirements = {
            TaskState.GATHERING_INFO: self.context.get("required_fields", []),
            TaskState.VERIFYING: self.context.get("verification_needed", []),
            TaskState.EXECUTING: self.context.get("execution_params", []),
        }
        return requirements.get(self.current_state, [])

    def is_complete(self) -> bool:
        """Check if the task has reached a terminal state"""
        return self.current_state in [TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELLED]

    def get_duration(self) -> float:
        """Get task duration in seconds"""
        end = self.end_time or datetime.now()
        return (end - self.start_time).total_seconds()
# Example usage in task agent
class TaskManager:
    def __init__(self):
        self.tasks: Dict[str, TaskStateMachine] = {}

    async def create_task(self, session_id: str, task_name: str, required_fields: List[str]):
        """Create a new task state machine"""
        task = TaskStateMachine(task_name)
        task.context["required_fields"] = required_fields
        self.tasks[session_id] = task
        return task

    async def process_step(self, session_id: str, user_input: str, extracted_info: Dict):
        """Process a step in the task workflow"""
        task = self.tasks.get(session_id)
        if not task:
            return {"error": "No active task"}
        # Update context with new info
        task.context.update(extracted_info)
        # Check if we have all required info
        required = task.get_required_info()
        missing = [field for field in required if field not in task.context]
        if not missing and task.current_state == TaskState.GATHERING_INFO:
            await task.transition(TaskState.VERIFYING)
        # Handle different states
        if task.current_state == TaskState.VERIFYING:
            # Present info for verification
            return {
                "state": "verifying",
                "message": "Please verify this information:",
                "info": {k: task.context[k] for k in required},
                "actions": ["confirm", "edit", "cancel"]
            }
        elif task.current_state == TaskState.EXECUTING:
            # Execute the task
            result = await self.execute_task(task.task_name, task.context)
            await task.transition(TaskState.CONFIRMING, {"result": result})
            return result
        elif task.current_state == TaskState.CONFIRMING:
            # Confirm completion
            return {
                "state": "completed",
                "message": "Task completed successfully",
                "result": task.context.get("result"),
                "duration": task.get_duration()
            }
        # Still gathering info
        return {
            "state": "gathering",
            "missing_fields": missing,
            "collected_so_far": {k: task.context[k] for k in task.context if k in required}
        }
Task-Oriented Agent Patterns
📋 Form-Filling
Collect structured information sequentially
- Step-by-step data collection
- Validation per field
- Progress tracking
🔄 Wizard Pattern
Guided workflow with conditional branches
- Dynamic next steps
- Context-aware questions
- Skip logic based on answers
⚡ Command Pattern
Direct execution with parameters
- Single-turn completion
- Rich parameter parsing
- Immediate feedback
🛡️ Verification Pattern
Double-check before executing
- Summary confirmation
- Risk assessment
- Undo capabilities
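The verification pattern boils down to one rule: never execute a side effect until the user has confirmed a summary of it. A minimal sketch of that idea, independent of any ADK types (the class and its names are illustrative):

```python
from typing import Callable

class PendingAction:
    """Holds a side-effecting action until it is explicitly confirmed."""
    def __init__(self, summary: str, action: Callable[[], str]):
        self.summary = summary
        self._action = action
        self.executed = False

    def confirm(self) -> str:
        """Execute only on explicit confirmation; refuses to run twice."""
        if self.executed:
            raise RuntimeError("Action already executed")
        self.executed = True
        return self._action()

    def cancel(self) -> str:
        """Discard the action without side effects."""
        return f"Cancelled: {self.summary}"

pending = PendingAction(
    summary="Book flight NYC -> LON on Friday for 1 passenger",
    action=lambda: "booking_ref=ABC123",
)
# The agent first shows pending.summary to the user, then, only after
# the user says "confirm":
result = pending.confirm()
```

The `executed` flag also gives a cheap idempotency guard, so a duplicated "confirm" message cannot trigger the same booking twice.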
2.3 Retrieval Augmented Agents
Understanding Retrieval Augmented Agents
Retrieval Augmented Generation (RAG) agents combine the power of LLMs with external knowledge bases. They retrieve relevant information from documents, databases, or vector stores to provide accurate, up-to-date, and verifiable responses.
📚 Key Components
- Vector Database: Store document embeddings
- Retriever: Find relevant context
- Ranker: Score and select best matches
- Context Window: Manage retrieved information
- Citation Engine: Track information sources
🎯 Common Use Cases
- Knowledge Base Q&A: Answer from company docs
- Research Assistants: Academic paper analysis
- Legal Document Review: Contract analysis
- Medical Information: Clinical guidelines
- Technical Support: Product documentation
RAG Agent Architecture
┌─────────────────────────────────────────────────────────────────┐
│ RETRIEVAL AUGMENTED AGENT │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ User │───▶│ Query │───▶│ Embedding │ │
│ │ Query │ │ Processor │ │ Generator │ │
│ └──────────────┘ └──────────────┘ └───────┬──────┘ │
│ │ │
│ ┌──────────────┐ ┌──────────────┐ ┌───────▼──────┐ │
│ │ Vector │◀───│ Retriever │◀───│ Vector │ │
│ │ Database │ │ │ │ Search │ │
│ └───────┬──────┘ └──────────────┘ └──────────────┘ │
│ │ │
│ ┌───────▼──────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Retrieved │───▶│ Context │───▶│ LLM with │ │
│ │ Chunks │ │ Builder │ │ Context │ │
│ └──────────────┘ └──────────────┘ └───────┬──────┘ │
│ │ │
│ ┌──────────────┐ ┌──────────────┐ ┌───────▼──────┐ │
│ │ Response │◀───│ Citation │◀───│ Response │ │
│ │ with Cites │ │ Formatter │ │ Generator │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
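Under the hood, the retriever stage in this diagram ranks stored chunks by vector similarity to the query embedding. The core scoring step can be sketched in a few lines of plain Python (no ADK types; the toy 3-dimensional vectors are illustrative only):

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query_vec, docs, k=2, threshold=0.0):
    """Return the k most similar (doc_id, score) pairs above threshold."""
    scored = [(doc_id, cosine(query_vec, vec)) for doc_id, vec in docs.items()]
    scored = [pair for pair in scored if pair[1] >= threshold]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]

docs = {
    "doc_a": [1.0, 0.0, 0.0],
    "doc_b": [0.9, 0.1, 0.0],
    "doc_c": [0.0, 1.0, 0.0],
}
results = top_k([1.0, 0.0, 0.0], docs, k=2)
```

A real vector store replaces the linear scan with an approximate nearest-neighbor index, but the similarity metric and the threshold/top-k cutoff work exactly as shown.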
Building a RAG Agent
Complete RAG Agent Implementation
from google.adk import Agent
from google.adk.rag import (
    VectorStore,
    Retriever,
    EmbeddingGenerator,
    ContextBuilder,
    CitationEngine
)
from google.adk.memory import CacheProvider
from typing import List, Dict, Any, Optional
import numpy as np
from dataclasses import dataclass

@dataclass
class Document:
    id: str
    content: str
    metadata: Dict[str, Any]
    embedding: Optional[np.ndarray] = None
class RAGAgent:
    def __init__(
        self,
        name: str,
        vector_store: VectorStore,
        embedding_model: str = "text-embedding-004",
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        top_k: int = 5,
        similarity_threshold: float = 0.7
    ):
        self.name = name
        self.vector_store = vector_store
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.top_k = top_k
        self.similarity_threshold = similarity_threshold
        # Initialize components
        self.embedding_generator = EmbeddingGenerator(
            model=embedding_model,
            dimension=768  # Embedding dimension
        )
        self.retriever = Retriever(
            vector_store=vector_store,
            similarity_metric="cosine",
            max_results=top_k
        )
        self.context_builder = ContextBuilder(
            max_tokens=4000,  # Context window size
            strategy="relevance_ranked",
            include_metadata=True
        )
        self.citation_engine = CitationEngine(
            format="markdown",
            include_page_numbers=True,
            include_urls=True
        )
        # Cache for frequent queries
        self.cache = CacheProvider(
            backend="redis",
            ttl=3600  # 1 hour cache
        )
        # Main agent
        self.agent = Agent(
            name=f"{name}_rag_agent",
            system_prompt="""You are a knowledgeable assistant that answers questions based on retrieved documents.

Guidelines:
- Only answer based on the provided context
- If the context doesn't contain the answer, say so
- Always cite your sources using the provided citations
- Be precise and factual
- Include relevant quotes when appropriate
- If multiple sources conflict, present different perspectives
"""
        )
    async def ingest_documents(self, documents: List[Document]):
        """Process and store documents in the vector database"""
        for doc in documents:
            # Split into chunks if needed
            chunks = self._chunk_document(doc.content)
            for i, chunk in enumerate(chunks):
                # Generate embedding
                embedding = await self.embedding_generator.embed(chunk)
                # Create chunk document
                chunk_doc = Document(
                    id=f"{doc.id}_chunk_{i}",
                    content=chunk,
                    metadata={
                        **doc.metadata,
                        "chunk_index": i,
                        "total_chunks": len(chunks),
                        "source_doc": doc.id
                    },
                    embedding=embedding
                )
                # Store in vector DB
                await self.vector_store.add_document(chunk_doc)

    def _chunk_document(self, text: str) -> List[str]:
        """Split a document into overlapping chunks"""
        words = text.split()
        chunks = []
        for i in range(0, len(words), self.chunk_size - self.chunk_overlap):
            chunk_words = words[i:i + self.chunk_size]
            chunks.append(" ".join(chunk_words))
        return chunks

    async def retrieve_context(self, query: str) -> List[Dict]:
        """Retrieve relevant documents for a query"""
        # Check the cache first
        cache_key = f"query:{hash(query)}"
        cached = await self.cache.get(cache_key)
        if cached:
            return cached
        # Generate query embedding
        query_embedding = await self.embedding_generator.embed(query)
        # Search the vector store
        results = await self.retriever.search(
            query_embedding=query_embedding,
            top_k=self.top_k,
            threshold=self.similarity_threshold
        )
        # Cache results
        await self.cache.set(cache_key, results)
        return results
    async def answer_question(
        self,
        query: str,
        session_id: str,
        conversation_history: List[Dict] = None
    ) -> Dict:
        """Answer a question using RAG"""
        # Step 1: Retrieve relevant context
        retrieved_docs = await self.retrieve_context(query)
        if not retrieved_docs:
            return {
                "answer": "I couldn't find any relevant information to answer your question.",
                "sources": [],
                "confidence": 0.0
            }
        # Step 2: Build context with citations
        context, citations = await self.context_builder.build(
            retrieved_docs,
            query=query,
            conversation_history=conversation_history
        )
        # Step 3: Generate an answer with context
        response = await self.agent.process(
            query,
            context={
                "retrieved_context": context,
                "citations": citations,
                "conversation_history": conversation_history
            }
        )
        # Step 4: Add citations to the response
        formatted_response = await self.citation_engine.format(
            response.text,
            citations
        )
        # Step 5: Return a comprehensive result
        return {
            "answer": formatted_response,
            "sources": [
                {
                    "document_id": doc["id"],
                    "title": doc["metadata"].get("title", "Unknown"),
                    "relevance": doc["score"],
                    "excerpt": doc["content"][:200] + "...",
                    "url": doc["metadata"].get("url"),
                    "page": doc["metadata"].get("page")
                }
                for doc in retrieved_docs
            ],
            "confidence": np.mean([doc["score"] for doc in retrieved_docs]),
            "query": query,
            "num_sources": len(retrieved_docs)
        }
# Usage Example
async def create_knowledge_agent():
# Initialize vector store (using AlloyDB with pgvector)
vector_store = await VectorStore.create(
provider="alloydb",
connection_string="postgresql://user:pass@localhost:5432/vectors",
table_name="document_embeddings",
dimension=768
)
# Create RAG agent
rag_agent = RAGAgent(
name="KnowledgeBot",
vector_store=vector_store,
embedding_model="text-embedding-004",
top_k=10,
similarity_threshold=0.65
)
# Ingest documents
documents = [
Document(
id="doc1",
content="Artificial intelligence is transforming industries...",
metadata={"title": "AI Overview", "source": "handbook", "page": 1}
),
# More documents...
]
await rag_agent.ingest_documents(documents)
# Answer questions
result = await rag_agent.answer_question(
"What are the main applications of AI in healthcare?",
session_id="user_123"
)
print(f"Answer: {result['answer']}")
print(f"Sources: {len(result['sources'])}")
for source in result['sources']:
print(f" - {source['title']} (relevance: {source['relevance']:.2f})")
return rag_agent
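The overlapping chunking used by `_chunk_document` above is easier to see in isolation. A minimal standalone sketch (small sizes chosen for readability; the class uses its configured `chunk_size` and `chunk_overlap`):

```python
from typing import List

def chunk_document(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Word-based chunking; each chunk repeats the last `chunk_overlap`
    words of the previous chunk, mirroring _chunk_document above."""
    words = text.split()
    step = chunk_size - chunk_overlap  # how far the window advances each time
    return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), step)]

chunks = chunk_document("one two three four five six seven eight nine ten", 4, 1)
for c in chunks:
    print(c)
```

Note that the final chunk may be shorter than `chunk_size`; the method above has the same behavior, which is usually fine for retrieval.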
Advanced RAG Techniques
class AdvancedRAGTechniques:
"""Collection of advanced RAG optimization techniques"""
@staticmethod
async def hypothetical_document_embeddings(
query: str,
llm: Agent,
embedder: EmbeddingGenerator
) -> np.ndarray:
"""HyDE: Generate hypothetical document for better retrieval"""
# Ask LLM to generate a hypothetical perfect document
hypo_doc = await llm.generate(
f"Write a paragraph that would perfectly answer: {query}"
)
# Embed the hypothetical document
return await embedder.embed(hypo_doc)
@staticmethod
async def multi_query_retrieval(
query: str,
llm: Agent,
retriever: Retriever,
num_queries: int = 3
) -> List[Dict]:
"""Generate multiple query variations for better coverage"""
variations = await llm.generate(
f"Generate {num_queries} different phrasings of: {query}"
)
all_results = []
for var in variations:
results = await retriever.search(var)
all_results.extend(results)
# Deduplicate by document id (keeping the highest score), then rerank
seen = {}
for result in all_results:
doc_id = result["id"]
if doc_id not in seen or result["score"] > seen[doc_id]["score"]:
seen[doc_id] = result
return sorted(seen.values(), key=lambda x: x["score"], reverse=True)
@staticmethod
async def rerank_with_cross_encoder(
query: str,
documents: List[Dict],
cross_encoder_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
) -> List[Dict]:
"""Rerank retrieved documents with cross-encoder"""
from sentence_transformers import CrossEncoder
pairs = [[query, doc["content"]] for doc in documents]
# Load the model by name first; CrossEncoder.predict is synchronous
scores = CrossEncoder(cross_encoder_model).predict(pairs)
for doc, score in zip(documents, scores):
doc["rerank_score"] = score
return sorted(documents, key=lambda x: x["rerank_score"], reverse=True)
@staticmethod
async def contextual_compression(
query: str,
documents: List[Dict],
llm: Agent
) -> List[Dict]:
"""Extract only relevant parts from documents"""
compressed = []
for doc in documents:
# Ask LLM to extract relevant parts
extracted = await llm.generate(
f"Query: {query}\n\nDocument: {doc['content']}\n\n"
"Extract only the parts relevant to the query, word for word."
)
doc["compressed_content"] = extracted
compressed.append(doc)
return compressed
RAG Evaluation Metrics
| Metric | Description | Target | Measurement Method |
|---|---|---|---|
| Hit Rate | Percentage of queries where relevant documents are retrieved | > 90% | Human evaluation or annotated dataset |
| Mean Reciprocal Rank (MRR) | Rank of first relevant document | > 0.8 | Position of relevant doc in results |
| Normalized Discounted Cumulative Gain (NDCG) | Measures ranking quality with graded relevance | > 0.85 | Relevance scores (0-3) per document |
| Context Precision | How much of retrieved context is actually used | > 0.7 | Token overlap with generated answer |
| Answer Faithfulness | Answer aligns with retrieved context | > 0.9 | Factual consistency checks |
| Citation Accuracy | Citations correctly support the claims | > 0.95 | Claim-citation verification |
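Hit Rate and MRR from the table above can be computed directly once you have ranked result IDs per query. A minimal sketch, assuming a single gold-relevant document per query (real evaluations usually allow several relevant documents):

```python
from typing import List

def hit_rate(ranked_ids: List[List[str]], gold: List[str]) -> float:
    """Fraction of queries whose relevant document appears anywhere in results."""
    hits = sum(1 for ids, rel in zip(ranked_ids, gold) if rel in ids)
    return hits / len(gold)

def mean_reciprocal_rank(ranked_ids: List[List[str]], gold: List[str]) -> float:
    """Average of 1/rank of the first relevant document (0 when absent)."""
    total = 0.0
    for ids, rel in zip(ranked_ids, gold):
        if rel in ids:
            total += 1.0 / (ids.index(rel) + 1)
    return total / len(gold)

results = [["d3", "d1", "d7"],  # relevant d1 at rank 2
           ["d2", "d9"],        # relevant d2 at rank 1
           ["d5", "d4"]]        # relevant d8 missing
gold = ["d1", "d2", "d8"]
print(round(hit_rate(results, gold), 3))    # 2 of 3 queries hit
print(mean_reciprocal_rank(results, gold))  # (1/2 + 1 + 0) / 3 = 0.5
```

NDCG, context precision, and faithfulness need graded relevance labels or an LLM judge, so they are typically measured with an evaluation harness rather than a few lines of code.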
2.4 Multi-Modal Agent Patterns
Understanding Multi-Modal Agents
Multi-modal agents can process and generate multiple types of data: text, images, audio, video, and more. They integrate different modalities to provide richer, more natural interactions.
🎨 Supported Modalities
- Text: Natural language input/output
- Image: Visual recognition and generation
- Audio: Speech recognition and synthesis
- Video: Motion analysis and generation
- Structured Data: Tables, graphs, charts
🎯 Common Use Cases
- Visual Q&A: Answer questions about images
- Content Creation: Generate images from text
- Accessibility: Describe scenes for visually impaired users
- Multimedia Analysis: Analyze videos with audio
- Interactive Assistants: Voice + visual interfaces
Multi-Modal Agent Architecture
┌─────────────────────────────────────────────────────────────────┐
│ MULTI-MODAL AGENT │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Text │───▶│ Text │ │ Image │ │
│ │ Input │ │ Encoder │ │ Encoder │◀──┐ │
│ └──────────────┘ └───────┬──────┘ └───────┬──────┘ │ │
│ │ │ │ │
│ ┌──────────────┐ ┌───────▼──────┐ ┌───────▼──────┐ │ │
│ │ Audio │───▶│ Audio │───▶│ Fusion │ │ │
│ │ Input │ │ Encoder │ │ Layer │ │ │
│ └──────────────┘ └───────┬──────┘ └───────┬──────┘ │ │
│ │ │ │ │
│ ┌──────────────┐ ┌───────▼──────┐ ┌───────▼──────┐ │ │
│ │ Video │───▶│ Video │───▶│ Joint │ │ │
│ │ Input │ │ Encoder │ │ Embedding │ │ │
│ └──────────────┘ └──────────────┘ └───────┬──────┘ │ │
│ │ │ │
│ ┌─────────────────────┼──────────┘ │
│ │ │ │
│ ┌──────────────┐ ┌───────▼──────┐ ┌───────▼──────┐ │
│ │ Text │◀───│ Decoder │◀───│ Multi-Modal│ │
│ │ Output │ │ Network │ │ LLM │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Image │◀───│ Image │ │ Audio │ │
│ │ Output │ │ Generator │ │ Generator │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Building a Multi-Modal Agent
Image + Text Understanding Agent
from google.adk import Agent
from google.adk.multimodal import (
ImageEncoder,
AudioEncoder,
VideoEncoder,
MultiModalFusion,
ImageGenerator
)
from typing import Dict
from PIL import Image
import io
import base64
class MultiModalAgent:
def __init__(self, name: str):
self.name = name
# Initialize encoders for different modalities
self.image_encoder = ImageEncoder(
model="vit-large-patch16-224",
embedding_dim=768
)
self.audio_encoder = AudioEncoder(
model="wav2vec2-base",
sample_rate=16000
)
self.video_encoder = VideoEncoder(
frame_model="vit",
temporal_model="timesformer",
fps=5
)
# Fusion layer for combining modalities
self.fusion = MultiModalFusion(
strategy="cross_attention",
hidden_dim=512,
num_heads=8
)
# Image generation capability
self.image_generator = ImageGenerator(
model="imagen",
style_preset="photorealistic"
)
# Main multi-modal agent
self.agent = Agent(
name=f"{name}_multimodal",
system_prompt="""You are a multi-modal AI assistant that can understand and generate:
- Text (natural language)
- Images (analyze, describe, and generate)
- Audio (transcribe, understand, and respond with voice)
- Video (analyze frames and temporal patterns)
Guidelines:
- When given images, describe them in detail
- Answer questions about visual content accurately
- Generate images from text descriptions when requested
- Combine information from multiple modalities
- Be precise about what you see and hear
"""
)
async def process_image(self, image_data: bytes, query: str = None) -> Dict:
"""Process an image with optional text query"""
# Encode image
image_embedding = await self.image_encoder.encode(image_data)
if query:
# Process text query about the image
text_embedding = await self.agent.embed(query)
# Fuse modalities
fused = await self.fusion.fuse(
modalities={
"image": image_embedding,
"text": text_embedding
},
query=query
)
# Generate response
response = await self.agent.process(
query,
context={
"image_embedding": image_embedding,
"fused_features": fused,
"task": "visual_qa"
}
)
else:
# Just describe the image
response = await self.agent.process(
"Describe this image in detail.",
context={
"image_embedding": image_embedding,
"task": "image_captioning"
}
)
return {
"description": response.text,
"image_embedding": image_embedding.tolist()[:10] # Sample
}
async def generate_image(self, prompt: str, style: str = "photorealistic") -> bytes:
"""Generate an image from text description"""
image = await self.image_generator.generate(
prompt=prompt,
style=style,
size=(1024, 1024),
num_images=1
)
return image[0] # Return image bytes
async def process_audio(self, audio_data: bytes, task: str = "transcribe") -> Dict:
"""Process audio input (speech)"""
if task == "transcribe":
transcript = await self.audio_encoder.transcribe(audio_data)
return {"transcript": transcript}
elif task == "analyze":
# Analyze audio features (emotion, speaker, etc.)
features = await self.audio_encoder.analyze(audio_data)
return {
"emotion": features.get("emotion"),
"speaker_id": features.get("speaker_id"),
"confidence": features.get("confidence")
}
elif task == "respond":
# Generate spoken response
transcript = await self.audio_encoder.transcribe(audio_data)
response_text = await self.agent.process(transcript)
audio_response = await self.audio_encoder.synthesize(response_text.text)
return {
"transcript": transcript,
"response_text": response_text.text,
"audio_response": base64.b64encode(audio_response).decode('utf-8')
}
raise ValueError(f"Unknown audio task: {task}")
async def process_video(self, video_path: str, query: str = None) -> Dict:
"""Process video with optional query"""
# Extract frames and audio
frames = await self.video_encoder.extract_frames(video_path, fps=2)
audio = await self.video_encoder.extract_audio(video_path)
# Encode frames
frame_embeddings = []
for frame in frames[:10]: # Limit to 10 frames
emb = await self.image_encoder.encode(frame)
frame_embeddings.append(emb)
# Encode audio if present
audio_embedding = None
if audio:
audio_embedding = await self.audio_encoder.encode(audio)
# Temporal analysis
video_features = await self.video_encoder.analyze(
frames=frames,
audio=audio_embedding
)
# Answer query if provided
if query:
response = await self.agent.process(
query,
context={
"video_features": video_features,
"frame_count": len(frames),
"duration": video_features.get("duration", 0)
}
)
return {
"answer": response.text,
"key_moments": video_features.get("key_moments", []),
"scene_changes": video_features.get("scene_changes", [])
}
# Return summary
return {
"summary": video_features.get("summary", ""),
"duration": video_features.get("duration", 0),
"num_frames": len(frames),
"has_audio": audio is not None,
"tags": video_features.get("tags", [])
}
# Usage examples
async def demonstrate_multimodal():
agent = MultiModalAgent("OmniAssistant")
# 1. Image understanding
with open("photo.jpg", "rb") as f:
image_data = f.read()
result = await agent.process_image(image_data, "What's in this image?")
print(f"Image description: {result['description']}")
# 2. Image generation
image = await agent.generate_image("A serene mountain landscape at sunset")
# 3. Audio processing
with open("speech.wav", "rb") as f:
audio_data = f.read()
result = await agent.process_audio(audio_data, "respond")
print(f"User said: {result['transcript']}")
print(f"Response: {result['response_text']}")
# 4. Video analysis
result = await agent.process_video("meeting_recording.mp4", "What were the key discussion points?")
print(f"Video analysis: {result['answer']}")
Multi-Modal Integration Patterns
🔄 Early Fusion
Combine modalities at input level before processing.
- Concatenate embeddings
- Simple implementation
- Good for aligned modalities
⚡ Late Fusion
Combine decisions from separate modality processors.
- Process modalities independently
- Vote or average results
- Robust to missing data
🧠 Cross-Attention
Attention mechanisms between modalities.
- Learn cross-modal relationships
- State-of-the-art performance
- Handles complex interactions
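The difference between early and late fusion can be sketched in a few lines of NumPy (toy dimensions and hand-picked scores for illustration; real systems fuse learned embeddings and trained classifier outputs):

```python
import numpy as np

rng = np.random.default_rng(0)
text_emb = rng.normal(size=256)   # toy per-modality embeddings
image_emb = rng.normal(size=256)

# Early fusion: concatenate embeddings BEFORE the model processes them
early = np.concatenate([text_emb, image_emb])  # shape (512,)

# Late fusion: each modality votes with its own class scores, then average;
# if one modality is missing, simply drop it from the average
text_scores = np.array([0.7, 0.2, 0.1])
image_scores = np.array([0.5, 0.4, 0.1])
late = (text_scores + image_scores) / 2

print(early.shape)    # (512,)
print(late.argmax())  # class 0 wins
```

Cross-attention replaces both with learned attention between the modality streams, which is why it handles misaligned inputs better but costs more to train and serve.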
2.5 Agent Personality & Prompt Layering
Understanding Agent Personality
Agent personality defines how an agent communicates, behaves, and interacts with users. Prompt layering is a technique to build complex, nuanced personalities by combining multiple prompt components.
🎭 Personality Dimensions
- Tone: Formal, casual, friendly, professional
- Formality: Level of language sophistication
- Empathy: Emotional responsiveness
- Humor: Use of jokes, wit, playfulness
- Culture: Regional and cultural references
📚 Prompt Layers
- Base Layer: Core capabilities and constraints
- Persona Layer: Character definition
- Tone Layer: Communication style
- Context Layer: Situational awareness
- Instruction Layer: Task-specific guidance
Building Personality with Prompt Layering
Persona Definition Framework
from google.adk import Agent
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
class PersonalityTrait(Enum):
FORMALITY = "formality"
EMPATHY = "empathy"
HUMOR = "humor"
ENTHUSIASM = "enthusiasm"
PATIENCE = "patience"
DIRECTNESS = "directness"
CREATIVITY = "creativity"
ANALYTICAL = "analytical"
@dataclass
class Persona:
"""Define a complete agent persona"""
name: str
traits: Dict[PersonalityTrait, float] # 0-1 scale
background: str
communication_style: str
expertise_areas: List[str]
catchphrases: List[str]
restrictions: List[str]
def to_system_prompt(self) -> str:
"""Convert persona to system prompt"""
prompt = f"""You are {self.name}, an AI assistant with the following personality and background:
BACKGROUND:
{self.background}
COMMUNICATION STYLE:
{self.communication_style}
PERSONALITY TRAITS:
"""
for trait, value in self.traits.items():
if value > 0.7:
prompt += f"- Highly {trait.value}\n"
elif value > 0.4:
prompt += f"- Moderately {trait.value}\n"
prompt += f"\nEXPERTISE AREAS:\n"
for area in self.expertise_areas:
prompt += f"- {area}\n"
if self.catchphrases:
prompt += f"\nYou occasionally use these phrases:\n"
for phrase in self.catchphrases:
prompt += f"- {phrase}\n"
if self.restrictions:
prompt += f"\nRESTRICTIONS:\n"
for restriction in self.restrictions:
prompt += f"- {restriction}\n"
return prompt
class PromptLayer:
"""A single layer in the prompt hierarchy"""
def __init__(self, name: str, priority: int, content: str):
self.name = name
self.priority = priority # Higher priority overrides lower
self.content = content
self.active = True
def render(self, context: Dict = None) -> str:
"""Render the layer with context variables"""
if context and self.name in context:
return self.content.format(**context[self.name])
return self.content
class LayeredPromptAgent:
"""Agent with multi-layer prompt management"""
def __init__(self, base_persona: Persona):
self.persona = base_persona
self.layers: List[PromptLayer] = []
self.context: Dict = {}
# Add base persona layer
self.add_layer(PromptLayer(
name="persona",
priority=100,
content=base_persona.to_system_prompt()
))
# Add capabilities layer
self.add_layer(PromptLayer(
name="capabilities",
priority=90,
content="""CAPABILITIES:
- Answer questions accurately and helpfully
- Solve problems step by step
- Admit when you don't know something
- Ask clarifying questions when needed
- Provide examples to illustrate concepts
- Break down complex topics into simple parts
"""
))
# Initialize agent
self.agent = Agent(
name=base_persona.name,
system_prompt=self._build_system_prompt()
)
def add_layer(self, layer: PromptLayer):
"""Add a new prompt layer"""
self.layers.append(layer)
self.layers.sort(key=lambda x: x.priority, reverse=True)
self._update_system_prompt()
def remove_layer(self, layer_name: str):
"""Remove a prompt layer"""
self.layers = [l for l in self.layers if l.name != layer_name]
self._update_system_prompt()
def update_context(self, **kwargs):
"""Update context variables"""
self.context.update(kwargs)
self._update_system_prompt()
def _build_system_prompt(self) -> str:
"""Build complete system prompt from all layers"""
prompt_parts = []
for layer in self.layers:
if layer.active:
rendered = layer.render(self.context)
if rendered.strip():
prompt_parts.append(f"=== {layer.name.upper()} ===\n{rendered}\n")
return "\n".join(prompt_parts)
def _update_system_prompt(self):
"""Update the agent's system prompt"""
self.agent.system_prompt = self._build_system_prompt()
async def process(self, user_message: str, session_id: str = None):
"""Process a user message"""
return await self.agent.process(user_message, session_id=session_id)
# Example: Creating different personas
def create_support_persona() -> Persona:
"""Create a customer support persona"""
return Persona(
name="SupportPro",
traits={
PersonalityTrait.EMPATHY: 0.9,
PersonalityTrait.PATIENCE: 0.9,
PersonalityTrait.FORMALITY: 0.5,
PersonalityTrait.DIRECTNESS: 0.4,
PersonalityTrait.ENTHUSIASM: 0.6
},
background="You are a senior customer support specialist with 10 years of experience helping users solve technical problems. You've helped thousands of customers and know exactly how to make them feel heard and valued.",
communication_style="Professional yet warm. You listen carefully, acknowledge feelings, and provide clear solutions. You use phrases like 'I understand' and 'Let me help you with that'.",
expertise_areas=["Technical troubleshooting", "Account management", "Product guidance", "Billing issues"],
catchphrases=["I'm here to help!", "Let's solve this together", "Great question!"],
restrictions=["Never share sensitive customer data", "Escalate complex issues appropriately"]
)
def create_technical_expert_persona() -> Persona:
"""Create a technical expert persona"""
return Persona(
name="TechExpert",
traits={
PersonalityTrait.ANALYTICAL: 0.9,
PersonalityTrait.DIRECTNESS: 0.8,
PersonalityTrait.FORMALITY: 0.7,
PersonalityTrait.CREATIVITY: 0.5,
PersonalityTrait.HUMOR: 0.2
},
background="You are a senior software architect with deep expertise in system design, algorithms, and best practices. You love explaining complex technical concepts in a clear, structured way.",
communication_style="Precise and systematic. You provide step-by-step explanations, use technical terms appropriately, and always explain the reasoning behind your recommendations.",
expertise_areas=["System architecture", "Algorithms", "Code optimization", "Design patterns", "Cloud computing"],
catchphrases=["Let me break this down", "The key concept here is", "Consider this approach"],
restrictions=["Keep explanations accessible", "Provide code examples when helpful"]
)
# Usage example
support_agent = LayeredPromptAgent(create_support_persona())
# Add domain-specific layer
support_agent.add_layer(PromptLayer(
name="product_knowledge",
priority=80,
content="""PRODUCT KNOWLEDGE:
You support 'TaskFlow Pro' - a project management tool.
Key features:
- Task management with dependencies
- Team collaboration with comments
- File sharing and version control
- Time tracking and reporting
- Integration with Slack, GitHub, and Google Workspace
Common issues:
- Login problems (clear cache, reset password)
- Notification delays (check settings)
- Integration errors (re-authenticate)
"""
))
# (run inside an async function, or via asyncio.run(...))
response = await support_agent.process(
"I can't log into my account! This is urgent!",
session_id="user_789"
)
Common Personality Archetypes
👔 The Professional
Formal, concise, business-like
- Uses proper language
- Sticks to facts
- Minimal emotional language
- Respectful and courteous
😊 The Friendly Guide
Warm, encouraging, supportive
- Uses emojis and exclamations
- Offers encouragement
- Builds rapport
- Celebrates user wins
🔧 The Technician
Precise, detailed, systematic
- Step-by-step instructions
- Technical specifications
- Explains underlying principles
- Uses diagrams in text
🎓 The Teacher
Educational, patient, explanatory
- Breaks down concepts
- Uses analogies
- Checks understanding
- Encourages questions
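Any of these archetypes can be expressed with the persona framework from earlier in this section. A trimmed, self-contained stand-in (`ArchetypePersona` here is a simplified illustration of the full `Persona` dataclass, not ADK API):

```python
from dataclasses import dataclass
from typing import Dict

@dataclass
class ArchetypePersona:
    """Trimmed stand-in for the full Persona dataclass defined earlier."""
    name: str
    traits: Dict[str, float]  # 0-1 scale, as in the Persona framework above
    communication_style: str

    def to_system_prompt(self) -> str:
        lines = [f"You are {self.name}.",
                 f"STYLE: {self.communication_style}",
                 "TRAITS:"]
        for trait, value in sorted(self.traits.items()):
            level = "Highly" if value > 0.7 else "Moderately" if value > 0.4 else "Slightly"
            lines.append(f"- {level} {trait}")
        return "\n".join(lines)

teacher = ArchetypePersona(
    name="The Teacher",
    traits={"patience": 0.9, "empathy": 0.6, "humor": 0.3},
    communication_style="Breaks down concepts, uses analogies, checks understanding.",
)
prompt = teacher.to_system_prompt()
print(prompt)
```

The other archetypes differ only in their trait weights and style string, which keeps archetype definitions as data rather than duplicated prompt text.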
2.6 System Prompt Engineering
Understanding System Prompt Engineering
System prompt engineering is the art and science of crafting effective instructions for AI agents. Well-engineered prompts guide agent behavior, improve response quality, and ensure consistency across interactions.
📝 Prompt Components
- Role Definition: Who the agent is
- Instructions: What to do and how
- Constraints: Boundaries and limitations
- Examples: Few-shot demonstrations
- Output Format: Expected response structure
⚙️ Engineering Principles
- Clarity: Be specific and unambiguous
- Conciseness: Essential information only
- Structure: Logical organization
- Testing: Iterative refinement
- Versioning: Track prompt changes
Prompt Engineering Techniques
1. Role-Based Prompting
# Role-based prompt template
ROLE_BASED_PROMPT = """You are an expert {role} with {years} years of experience.
Your expertise includes:
{expertise}
Your task is to: {task}
Guidelines:
{guidelines}
Now, respond to this query: {query}
Remember to: {reminders}
"""
# Example usage
prompt = ROLE_BASED_PROMPT.format(
role="cybersecurity analyst",
years="15",
expertise="- Threat detection and analysis\n- Incident response\n- Security architecture\n- Risk assessment",
task="analyze potential security threats in the described scenario",
guidelines="- Think step by step\n- Consider multiple attack vectors\n- Prioritize risks by severity\n- Recommend mitigations",
query="Our company is moving to cloud infrastructure. What security concerns should we address?",
reminders="- Mention industry standards\n- Consider compliance requirements\n- Suggest monitoring tools"
)
2. Chain of Thought Prompting
CHAIN_OF_THOUGHT_PROMPT = """Solve this problem step by step:
Problem: {problem}
Let's think through this systematically:
Step 1: Understand what's being asked
{step1}
Step 2: Identify key information and constraints
{step2}
Step 3: Break down the problem into smaller parts
{step3}
Step 4: Solve each part
{step4}
Step 5: Combine solutions
{step5}
Step 6: Verify the answer
{step6}
Final Answer: {answer}
Make sure to show all reasoning steps clearly.
"""
3. Few-Shot Learning
FEW_SHOT_PROMPT = """Here are examples of how to {task_type}:
Example 1:
Input: {example1_input}
Output: {example1_output}
Reasoning: {example1_reasoning}
Example 2:
Input: {example2_input}
Output: {example2_output}
Reasoning: {example2_reasoning}
Example 3:
Input: {example3_input}
Output: {example3_output}
Reasoning: {example3_reasoning}
Now, apply the same pattern to this new input:
Input: {new_input}
Follow the same reasoning process and provide the output.
"""
# Example for sentiment analysis
sentiment_prompt = FEW_SHOT_PROMPT.format(
task_type="analyze sentiment in customer reviews",
example1_input="This product is amazing! Best purchase ever.",
example1_output="POSITIVE (confidence: 0.95)",
example1_reasoning="Uses positive words 'amazing', 'best', exclamation marks indicate enthusiasm",
example2_input="The delivery was late and the item was damaged.",
example2_output="NEGATIVE (confidence: 0.90)",
example2_reasoning="Mentions problems 'late', 'damaged', expresses frustration",
example3_input="The product is okay, does what it says but nothing special.",
example3_output="NEUTRAL (confidence: 0.80)",
example3_reasoning="Mixed feelings, no strong positive or negative language",
new_input="I've been using this for a week and it's working well so far."
)
Prompt Testing & Evaluation
class PromptTester:
"""Test and evaluate prompt effectiveness"""
def __init__(self):
self.test_cases = []
self.results = []
def add_test_case(self, input_text: str, expected_output: str, criteria: List[str]):
"""Add a test case"""
self.test_cases.append({
"input": input_text,
"expected": expected_output,
"criteria": criteria
})
async def test_prompt(self, prompt: str, agent: Agent) -> Dict:
"""Test a prompt against all test cases"""
agent.system_prompt = prompt
results = []
for case in self.test_cases:
response = await agent.process(case["input"])
# Evaluate response
score = self.evaluate_response(
response.text,
case["expected"],
case["criteria"]
)
results.append({
"input": case["input"],
"response": response.text,
"expected": case["expected"],
"score": score,
"passed": score > 0.7
})
# Calculate metrics
pass_rate = sum(1 for r in results if r["passed"]) / len(results)
avg_score = sum(r["score"] for r in results) / len(results)
return {
"results": results,
"pass_rate": pass_rate,
"average_score": avg_score,
"total_tests": len(results)
}
def evaluate_response(self, response: str, expected: str, criteria: List[str]) -> float:
"""Evaluate response quality"""
score = 0.0
weights = {
"contains_keywords": 0.3,
"length_appropriate": 0.2,
"format_correct": 0.3,
"reasoning_shown": 0.2
}
# Check for expected keywords
expected_words = set(expected.lower().split())
response_words = set(response.lower().split())
common_words = expected_words.intersection(response_words)
keyword_score = len(common_words) / max(len(expected_words), 1)
score += keyword_score * weights["contains_keywords"]
# Check length appropriateness
expected_len = len(expected.split())
actual_len = len(response.split())
length_ratio = min(actual_len, expected_len) / max(actual_len, expected_len)
score += length_ratio * weights["length_appropriate"]
# Check format criteria
format_score = 0
for criterion in criteria:
if criterion == "json" and self.is_valid_json(response):
format_score += 1
elif criterion == "bullets" and ("•" in response or "- " in response):
format_score += 1
elif criterion == "steps" and "step" in response.lower():
format_score += 1
format_score = format_score / len(criteria) if criteria else 0
score += format_score * weights["format_correct"]
# Check reasoning
reasoning_indicators = ["because", "therefore", "since", "as a result", "first", "second"]
reasoning_score = sum(1 for ind in reasoning_indicators if ind in response.lower())
reasoning_score = min(reasoning_score / 3, 1.0) # Cap at 1.0
score += reasoning_score * weights["reasoning_shown"]
return score
def is_valid_json(self, text: str) -> bool:
"""Check if text is valid JSON"""
try:
import json
json.loads(text)
return True
except (ValueError, TypeError):
return False
2.7 Dynamic Persona Switching
Understanding Dynamic Persona Switching
Dynamic persona switching allows agents to change their personality, communication style, or role based on context, user needs, or conversation state. This enables more adaptive and personalized interactions.
🔄 Switch Triggers
- User Intent: Detect what user needs
- Emotion Detection: Respond to user mood
- Task Complexity: Match expertise level
- Conversation Stage: Greeting vs deep discussion
- User Preference: Learned over time
🎯 Switching Strategies
- Gradual Transition: Slowly shift tone
- Immediate Switch: Clear context change
- Blended Persona: Combine multiple traits
- Context-Aware: Based on situation
- User-Requested: Explicit user choice
Dynamic Persona Switching Architecture
┌─────────────────────────────────────────────────────────────────┐
│ DYNAMIC PERSONA SWITCHING │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ User │───▶│ Context │───▶│ Persona │ │
│ │ Input │ │ Analyzer │ │ Selector │ │
│ └──────────────┘ └──────────────┘ └───────┬──────┘ │
│ │ │
│ ┌────────────────────────────────────────────────▼──────┐ │
│ │ PERSONA REGISTRY │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Formal │ │ Friendly │ │Technical │ │ Teacher │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────┼───────────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Transition │───▶│ Current │───▶│ Response │ │
│ │ Manager │ │ Persona │ │ Generation │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Building a Dynamic Persona Switcher
Persona Switching System
from google.adk import Agent
from google.adk.classifiers import IntentClassifier, EmotionClassifier
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class PersonaType(Enum):
FORMAL = "formal"
FRIENDLY = "friendly"
TECHNICAL = "technical"
EMPATHETIC = "empathetic"
HUMOROUS = "humorous"
TEACHER = "teacher"
@dataclass
class Persona:
name: str
type: PersonaType
system_prompt: str
traits: Dict[str, float]
triggers: List[str]
class DynamicPersonaSwitcher:
def __init__(self, default_persona: str = "friendly"):
self.personas: Dict[str, Persona] = {}
self.current_persona: str = default_persona
self.switch_history: List[Dict] = []
# Classifiers
self.intent_classifier = IntentClassifier(
intents=["greeting", "question", "problem", "technical", "emotional"]
)
self.emotion_classifier = EmotionClassifier(
emotions=["neutral", "happy", "sad", "angry", "confused"]
)
# Switch thresholds
self.min_confidence = 0.6
def register_persona(self, persona: Persona):
"""Register a new persona"""
self.personas[persona.name] = persona
async def analyze_context(self, message: str) -> Dict:
"""Analyze conversation context"""
intent = await self.intent_classifier.classify(message)
emotion = await self.emotion_classifier.classify(message)
# Check for triggers
triggered = []
for name, persona in self.personas.items():
for trigger in persona.triggers:
if trigger.lower() in message.lower():
triggered.append({"name": name, "trigger": trigger})
return {
"intent": intent.intent,
"emotion": emotion.emotion,
"triggers": triggered,
"complexity": "high" if len(message.split()) > 20 else "medium" if len(message.split()) > 10 else "low",
"has_question": "?" in message
}
async def select_persona(self, context: Dict) -> str:
"""Select best persona based on context"""
scores = {}
for name, persona in self.personas.items():
score = 0.0
# Intent matching
if context["intent"] == "technical" and persona.type == PersonaType.TECHNICAL:
score += 0.4
elif context["intent"] == "emotional" and persona.type == PersonaType.EMPATHETIC:
score += 0.4
# Emotion matching
if context["emotion"] == "sad" and persona.type == PersonaType.EMPATHETIC:
score += 0.3
elif context["emotion"] == "confused" and persona.type == PersonaType.TEACHER:
score += 0.3
# Trigger matching
for trigger in context["triggers"]:
if trigger["name"] == name:
score += 0.5
# Complexity matching
if context["complexity"] == "high" and persona.type == PersonaType.TECHNICAL:
score += 0.2
elif context["complexity"] == "low" and persona.type == PersonaType.FRIENDLY:
score += 0.2
scores[name] = score
# Return best match above threshold
best = max(scores.items(), key=lambda x: x[1])
return best[0] if best[1] >= self.min_confidence else self.current_persona
async def switch_persona(self, new_persona: str, session_id: str) -> Dict:
"""Switch to new persona"""
if new_persona == self.current_persona:
return {"switched": False, "persona": self.current_persona}
# Record switch
self.switch_history.append({
"timestamp": datetime.now(),
"from": self.current_persona,
"to": new_persona,
"session_id": session_id
})
old = self.current_persona
self.current_persona = new_persona
return {
"switched": True,
"from": old,
"to": new_persona,
"message": self.get_transition_message(old, new_persona)
}
def get_transition_message(self, from_p: str, to_p: str) -> Optional[str]:
"""Get transition message for persona switch"""
transitions = {
("formal", "friendly"): "I'll switch to a more casual tone to help you better.",
("friendly", "technical"): "Let me put on my technical hat to address this.",
("technical", "teacher"): "I'll explain this in a more educational way.",
}
return transitions.get((from_p, to_p))
async def process(self, message: str, session_id: str) -> Dict:
"""Process message with persona switching"""
# Analyze context
context = await self.analyze_context(message)
# Select persona
selected = await self.select_persona(context)
# Switch if needed
switch_result = await self.switch_persona(selected, session_id)
# Get current persona and process
persona = self.personas[self.current_persona]
agent = Agent(name=persona.name, system_prompt=persona.system_prompt)
response = await agent.process(message, session_id=session_id)
# Add transition message if switched
if switch_result["switched"] and switch_result.get("message"):
response.text = f"{switch_result['message']}\n\n{response.text}"
return {
"response": response,
"persona_used": self.current_persona,
"switched": switch_result["switched"],
"context": context
}
Persona Switching Strategies
🎯 Intent-Based
Switch based on user intent
- Technical questions → Technical
- Emotional content → Empathetic
😊 Emotion-Based
Respond to user emotion
- Frustrated → Calm, patient
- Happy → Enthusiastic
📊 Complexity-Based
Match task complexity
- Simple → Friendly
- Complex → Technical
👤 User-Based
Learn user preferences
- Returning users → Preferred
- New users → Friendly
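The four strategies can be combined into a single selection function with a fixed priority order. A minimal sketch, assuming illustrative persona names and priorities (user preference first, then emotion, then intent/complexity); none of this is ADK API surface:

```python
from typing import Optional

# Hypothetical combination of the four strategies above, applied in a
# fixed priority order. Persona names here are illustrative, not ADK built-ins.
def select_strategy_persona(intent: str, emotion: str, complexity: str,
                            preferred: Optional[str]) -> str:
    if preferred:                       # User-based: returning users get their preference
        return preferred
    if emotion == "frustrated":         # Emotion-based: calm, patient persona
        return "empathetic"
    if intent == "technical" or complexity == "high":
        return "technical"              # Intent- and complexity-based
    return "friendly"                   # Default for new users and simple tasks

print(select_strategy_persona("technical", "neutral", "high", None))  # technical
print(select_strategy_persona("chat", "frustrated", "low", None))     # empathetic
```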
🎓 Module 02: Agent Types & Persona Design Successfully Completed
You have successfully completed this module of Google ADK (Agent Development Kit).
You've learned about:
- Conversational Agents
- Task-Oriented Agents
- RAG Agents
- Multi-Modal Patterns
- Persona Design
- Prompt Engineering
- Dynamic Switching
Module 03: Tools & Function Calling Internals
Learning Objectives
- Master OpenAPI and gRPC tool wrapper implementations
- Implement robust tool validation and schema generation
- Design parallel function calling architectures
- Create comprehensive error handling and retry policies
- Leverage built-in Google Workspace, Search, and Code tools
- Develop custom tools with best practices
- Implement tool versioning and backward compatibility
Prerequisites
Before starting this module, ensure you have:
- Completed Module 01 (ADK Architecture) and Module 02 (Agent Types)
- Understanding of REST APIs and gRPC concepts
- Familiarity with JSON Schema and data validation
- Experience with asynchronous programming in Python
- Google Cloud project with enabled APIs (for built-in tools)
3.1 OpenAPI / gRPC Tool Wrappers
📖 Definition: What are OpenAPI/gRPC Tool Wrappers?
OpenAPI and gRPC tool wrappers are adapters that transform external API specifications into callable functions that AI agents can use. They bridge the gap between API definitions and agent tool interfaces.
🔍 OpenAPI Wrappers
Convert REST API specifications (OpenAPI/Swagger) into agent-callable tools with automatic request/response handling, parameter validation, and error management.
⚡ gRPC Wrappers
Transform gRPC service definitions into high-performance bidirectional streaming tools with protocol buffer serialization and built-in load balancing.
🔄 Hybrid Wrappers
Combine both REST and gRPC capabilities, allowing agents to seamlessly switch between protocols based on performance needs and data requirements.
🎯 Why Use API Tool Wrappers?
Key Benefits
- Automatic Schema Translation: Converts OpenAPI specs to JSON Schema for tool validation
- Protocol Abstraction: Agents don't need to know underlying protocol details
- Built-in Error Handling: Standardized error responses across different APIs
- Authentication Management: Handles OAuth, API keys, and service accounts automatically
- Rate Limiting: Built-in throttling to respect API limits
- Request/Response Transformation: Converts between API formats and agent-friendly structures
Business Value
- 50-70% reduction in API integration code
- 90% faster time-to-market for new API integrations
- Built-in monitoring and observability
- Automatic documentation for agent capabilities
- Version management across API updates
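To make "automatic schema translation" concrete: a toy illustration of how an OpenAPI parameter list maps to the JSON Schema an agent validates tool calls against. The real wrapper handles far more (request bodies, formats, auth), but the core mapping looks like this:

```python
# Toy illustration only: convert OpenAPI-style parameter definitions into a
# JSON Schema object for tool-call validation.
def params_to_json_schema(parameters):
    schema = {"type": "object", "properties": {}, "required": []}
    for p in parameters:
        schema["properties"][p["name"]] = p.get("schema", {"type": "string"})
        if p.get("required"):
            schema["required"].append(p["name"])
    return schema

params = [
    {"name": "city", "required": True, "schema": {"type": "string"}},
    {"name": "units", "schema": {"type": "string", "enum": ["metric", "imperial"]}},
]
print(params_to_json_schema(params)["required"])  # ['city']
```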
OpenAPI/gRPC Wrapper Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ OPENAPI / GRPC TOOL WRAPPER ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Agent │────▶│ Tool Call │────▶│ Wrapper │ │
│ │ Request │ │ Router │ │ Selector │ │
│ └──────────────┘ └──────────────┘ └───────┬──────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ WRAPPER LAYER │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ OpenAPI │ │ gRPC │ │ Hybrid │ │ │
│ │ │ Parser │ │ Compiler │ │ Router │ │ │
│ │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Schema │ │ Protobuf │ │ Protocol │ │ │
│ │ │ Converter │ │ Generator │ │ Negotiator │ │ │
│ │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │
│ └─────────┼──────────────────┼──────────────────┼──────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ HTTP Client │ │ gRPC Client │ │ Circuit │ │
│ │ (REST) │ │ │ │ Breaker │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ RESPONSE PROCESSING LAYER │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Response │ │ Error │ │ Metrics │ │ │
│ │ │ Transformer │ │ Handler │ │ Collector │ │ │
│ │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │
│ └─────────┼──────────────────┼──────────────────┼──────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Agent Response │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
How to Use: OpenAPI Tool Wrapper Implementation
Step 1: Basic OpenAPI Wrapper
from google.adk.tools import Tool, ToolRegistry
from google.adk.api_wrappers import OpenAPITool, OpenAPIConfig
from typing import Dict, Any, Optional, List
import yaml
import json
import os
import aiohttp
import asyncio
import time
from datetime import datetime
import hashlib
class OpenAPIToolWrapper:
"""
Comprehensive OpenAPI wrapper for agent tools
"""
def __init__(self, spec_path: str, base_url: str = None, cache_ttl: int = 300):
"""
Initialize OpenAPI wrapper from specification file
Args:
spec_path: Path to OpenAPI YAML/JSON file
base_url: Optional override for API base URL
cache_ttl: Cache TTL in seconds for API responses
"""
self.spec_path = spec_path
self.spec = self._load_spec(spec_path)
self.base_url = base_url or self._extract_base_url()
self.cache_ttl = cache_ttl
self.tools = []
self.operations = self._parse_operations()
self.response_cache = {}
self.metrics = {
'total_calls': 0,
'cache_hits': 0,
'errors': 0,
'avg_latency': 0
}
def _load_spec(self, path: str) -> Dict:
"""Load OpenAPI specification from file"""
if not os.path.exists(path):
raise FileNotFoundError(f"OpenAPI spec not found: {path}")
with open(path, 'r') as f:
if path.endswith(('.yaml', '.yml')):
return yaml.safe_load(f)
else:
return json.load(f)
def _extract_base_url(self) -> str:
"""Extract base URL from OpenAPI spec"""
servers = self.spec.get('servers', [])
if servers:
return servers[0].get('url', '')
# Try to extract from host/schemes (OpenAPI 2.0)
host = self.spec.get('host')
schemes = self.spec.get('schemes', ['https'])
if host:
return f"{schemes[0]}://{host}{self.spec.get('basePath', '')}"
return ''
def _parse_operations(self) -> List[Dict]:
"""Parse all operations from OpenAPI spec"""
operations = []
paths = self.spec.get('paths', {})
for path, methods in paths.items():
for method, operation in methods.items():
if method.lower() in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']:
# Parse parameters
parameters = operation.get('parameters', [])
# Parse request body
request_body = operation.get('requestBody', {})
content_types = list(request_body.get('content', {}).keys())
# Parse responses
responses = operation.get('responses', {})
success_responses = [code for code in responses.keys() if str(code).startswith('2')]  # YAML may load bare status codes as ints
operations.append({
'path': path,
'method': method.upper(),
'operation_id': operation.get('operationId'),
'summary': operation.get('summary', ''),
'description': operation.get('description', ''),
'parameters': parameters,
'request_body': request_body,
'responses': responses,
'success_codes': success_responses,
'content_types': content_types,
'tags': operation.get('tags', []),
'deprecated': operation.get('deprecated', False),
'security': operation.get('security', [])
})
return operations
def create_tools(self, auth_config: Dict = None) -> List[Tool]:
"""
Create agent tools from OpenAPI operations
Args:
auth_config: Authentication configuration (API key, OAuth, etc.)
Returns:
List of Tool objects ready for agent registration
"""
auth_config = auth_config or {}  # Guard against None so the .get() calls below are safe
tools = []
for op in self.operations:
# Skip deprecated operations if configured
if op['deprecated'] and auth_config.get('skip_deprecated', False):
continue
# Generate tool name
tool_name = op.get('operation_id')
if not tool_name:
# Generate from path and method
path_part = op['path'].replace('/', '_').replace('{', '').replace('}', '')
tool_name = f"{op['method'].lower()}_{path_part}"
# Create enhanced tool configuration
config = EnhancedOpenAPIConfig(
operation_id=op.get('operation_id'),
method=op['method'],
path=op['path'],
base_url=self.base_url,
parameters=op['parameters'],
request_body=op.get('request_body'),
success_codes=op['success_codes'],
content_types=op['content_types'],
auth=auth_config,
timeout=auth_config.get('timeout', 30),
retry_config={
'max_retries': auth_config.get('max_retries', 3),
'backoff_factor': auth_config.get('backoff_factor', 1.5),
'retry_on': [429, 500, 502, 503, 504]
},
cache_ttl=self.cache_ttl if op['method'] == 'GET' else 0
)
# Create tool with enhanced functionality
tool = EnhancedOpenAPITool(
name=tool_name,
description=op['description'] or op['summary'],
tags=op['tags'],
config=config,
metrics=self.metrics,
cache=self.response_cache
)
tools.append(tool)
return tools
class EnhancedOpenAPITool(OpenAPITool):
"""
Enhanced OpenAPI tool with caching, metrics, and advanced error handling
"""
def __init__(self, name: str, description: str, tags: List[str],
config: 'EnhancedOpenAPIConfig', metrics: Dict, cache: Dict):
super().__init__(name, description, config)
self.tags = tags
self.metrics = metrics
self.cache = cache
self.semaphore = asyncio.Semaphore(10) # Max 10 concurrent calls
async def execute(self, **kwargs) -> Any:
"""
Execute the API call with caching and rate limiting
"""
start_time = datetime.now()
self.metrics['total_calls'] += 1
# Generate cache key for GET requests
cache_key = None
if self.config.method == 'GET' and self.config.cache_ttl > 0:
cache_key = self._generate_cache_key(kwargs)
if cache_key in self.cache:
cached = self.cache[cache_key]
if (datetime.now() - cached['timestamp']).total_seconds() < self.config.cache_ttl:
self.metrics['cache_hits'] += 1
return cached['response']
# Rate limiting
async with self.semaphore:
try:
# Execute with timeout
response = await asyncio.wait_for(
self._make_request(kwargs),
timeout=self.config.timeout
)
# Cache response
if cache_key:
self.cache[cache_key] = {
'response': response,
'timestamp': datetime.now()
}
# Update metrics
latency = (datetime.now() - start_time).total_seconds() * 1000
self.metrics['avg_latency'] = (
self.metrics['avg_latency'] * (self.metrics['total_calls'] - 1) + latency
) / self.metrics['total_calls']
return response
except Exception as e:
self.metrics['errors'] += 1
raise
def _generate_cache_key(self, kwargs: Dict) -> str:
"""Generate cache key from request parameters"""
content = f"{self.config.method}:{self.config.path}:{sorted(kwargs.items())}"
return hashlib.md5(content.encode()).hexdigest()
async def _make_request(self, kwargs: Dict) -> Any:
"""Make the actual HTTP request"""
async with aiohttp.ClientSession() as session:
# Build URL
url = self.config.base_url + self._format_path(kwargs)
# Extract query parameters
params = {k: v for k, v in kwargs.items()
if k in self._get_query_params()}
# Extract body parameters
body = {k: v for k, v in kwargs.items()
if k in self._get_body_params()}
# Make request with retry logic
for attempt in range(self.config.retry_config['max_retries']):
try:
async with session.request(
method=self.config.method,
url=url,
params=params,
json=body if body else None,
headers=self._get_headers(kwargs)
) as response:
if response.status in self.config.success_codes:
return await response.json()
elif response.status in self.config.retry_config['retry_on']:
if attempt < self.config.retry_config['max_retries'] - 1:
wait = self.config.retry_config['backoff_factor'] ** attempt
await asyncio.sleep(wait)
continue
response.raise_for_status()
except aiohttp.ClientError as e:
if attempt < self.config.retry_config['max_retries'] - 1:
wait = self.config.retry_config['backoff_factor'] ** attempt
await asyncio.sleep(wait)
else:
raise
def _format_path(self, kwargs: Dict) -> str:
"""Format path with path parameters"""
path = self.config.path
for key, value in kwargs.items():
path = path.replace(f'{{{key}}}', str(value))
return path
def _get_query_params(self) -> List[str]:
"""Get list of query parameter names"""
return [p['name'] for p in self.config.parameters
if p.get('in') == 'query']
def _get_body_params(self) -> List[str]:
"""Get list of body parameter names"""
if self.config.request_body:
content = self.config.request_body.get('content', {}).get('application/json', {})
schema = content.get('schema', {})
return list(schema.get('properties', {}).keys())
return []
def _get_headers(self, kwargs: Dict) -> Dict:
"""Get request headers including auth"""
headers = {
'Content-Type': self.config.content_types[0] if self.config.content_types else 'application/json',
'Accept': 'application/json'
}
# Add authentication
if self.config.auth:
if self.config.auth.get('type') == 'api_key':
headers[self.config.auth.get('header_name', 'X-API-Key')] = self.config.auth['api_key']
elif self.config.auth.get('type') == 'bearer':
headers['Authorization'] = f"Bearer {self.config.auth['token']}"
return headers
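The caching layer above keys GET responses on method, path, and sorted arguments, so equivalent calls with differently ordered kwargs share one cache entry. The same scheme in isolation:

```python
import hashlib

# Standalone version of the cache-key scheme used by EnhancedOpenAPITool:
# method, path, and sorted kwargs are hashed into a stable key.
def cache_key(method: str, path: str, kwargs: dict) -> str:
    content = f"{method}:{path}:{sorted(kwargs.items())}"
    return hashlib.md5(content.encode()).hexdigest()

k1 = cache_key("GET", "/users", {"page": 1, "limit": 10})
k2 = cache_key("GET", "/users", {"limit": 10, "page": 1})
print(k1 == k2)  # True: argument order does not affect the key
```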
# Advanced: Streaming gRPC Wrapper
class StreamingGRPCToolWrapper:
"""
Advanced gRPC wrapper with streaming support
"""
def __init__(self, proto_path: str, service_name: str, server_address: str,
max_message_size: int = 4 * 1024 * 1024): # 4MB default
self.proto_path = proto_path
self.service_name = service_name
self.server_address = server_address
self.max_message_size = max_message_size
self.channel = None
self.stub = None
self._init_channel()
def _init_channel(self):
"""Initialize gRPC channel with options"""
import grpc
channel_options = [
('grpc.max_send_message_length', self.max_message_size),
('grpc.max_receive_message_length', self.max_message_size),
('grpc.enable_retries', 1),
('grpc.keepalive_time_ms', 10000),
('grpc.keepalive_timeout_ms', 5000),
('grpc.http2.max_pings_without_data', 0),
('grpc.keepalive_permit_without_calls', 1)
]
self.channel = grpc.aio.insecure_channel(
self.server_address,
options=channel_options
)
# Load proto and create stub
self._load_proto()
def _load_proto(self):
"""Load proto file and create stub"""
from grpc_tools import protoc
import sys
import tempfile
# Compile proto to temporary directory
with tempfile.TemporaryDirectory() as tmpdir:
protoc.main([
'protoc',
f'--proto_path={os.path.dirname(self.proto_path)}',
f'--python_out={tmpdir}',
f'--grpc_python_out={tmpdir}',
self.proto_path
])
# Add to path and import
sys.path.insert(0, tmpdir)
module_name = os.path.basename(self.proto_path).replace('.proto', '_pb2')
grpc_module = os.path.basename(self.proto_path).replace('.proto', '_pb2_grpc')
self.pb2_module = __import__(module_name)
self.pb2_grpc_module = __import__(grpc_module)
# Get stub class
stub_class = getattr(self.pb2_grpc_module, f'{self.service_name}Stub')
self.stub = stub_class(self.channel)
def create_streaming_tools(self) -> List[Tool]:
"""Create streaming tools from gRPC methods"""
tools = []
for method in self._get_service_methods():
if method.client_streaming and method.server_streaming:
tool = BidirectionalStreamingTool(
name=f"stream_{method.name}",
description=f"Bidirectional streaming gRPC method: {method.name}",
stub=self.stub,
method_name=method.name,
request_type=getattr(self.pb2_module, method.input_type.name),
response_type=getattr(self.pb2_module, method.output_type.name)
)
elif method.client_streaming:
tool = ClientStreamingTool(
name=f"client_stream_{method.name}",
description=f"Client streaming gRPC method: {method.name}",
stub=self.stub,
method_name=method.name,
request_type=getattr(self.pb2_module, method.input_type.name),
response_type=getattr(self.pb2_module, method.output_type.name)
)
elif method.server_streaming:
tool = ServerStreamingTool(
name=f"server_stream_{method.name}",
description=f"Server streaming gRPC method: {method.name}",
stub=self.stub,
method_name=method.name,
request_type=getattr(self.pb2_module, method.input_type.name),
response_type=getattr(self.pb2_module, method.output_type.name)
)
else:
tool = UnaryGRPCTool(
name=f"unary_{method.name}",
description=f"Unary gRPC method: {method.name}",
stub=self.stub,
method_name=method.name,
request_type=getattr(self.pb2_module, method.input_type.name),
response_type=getattr(self.pb2_module, method.output_type.name)
)
tools.append(tool)
return tools
# Hybrid REST/gRPC Router
class HybridProtocolRouter:
"""
Router that automatically selects best protocol (REST or gRPC) based on context
"""
def __init__(self, rest_tools: Dict[str, Tool], grpc_tools: Dict[str, Tool]):
self.rest_tools = rest_tools
self.grpc_tools = grpc_tools
self.routing_rules = self._build_routing_rules()
def _build_routing_rules(self) -> Dict[str, Dict]:
"""Build routing rules based on tool characteristics"""
rules = {}
for tool_name, tool in self.rest_tools.items():
rules[tool_name] = {
'rest': tool,
'grpc': self.grpc_tools.get(tool_name),
'preferences': {
'large_payload': 'grpc', # gRPC better for large payloads
'low_latency': 'grpc', # gRPC has lower latency
'simple_requests': 'rest', # REST simpler for simple requests
'streaming': 'grpc', # Only gRPC supports streaming
'browser': 'rest' # REST works better in browsers
}
}
return rules
async def route_call(self, tool_name: str, context: Dict, **kwargs) -> Any:
"""
Route call to appropriate protocol based on context
Args:
tool_name: Name of the tool to call
context: Context including payload size, latency requirements, etc.
**kwargs: Tool arguments
"""
rule = self.routing_rules.get(tool_name)
if not rule:
raise ValueError(f"Unknown tool: {tool_name}")
# Determine best protocol
protocol = self._select_protocol(rule, context)
# Execute with selected protocol
tool = rule[protocol]
start_time = time.time()
try:
result = await tool.execute(**kwargs)
latency = time.time() - start_time
# Log routing decision for analytics
self._log_routing(tool_name, protocol, context, latency)
return result
except Exception as e:
# Fallback to other protocol on failure
if protocol == 'grpc' and rule['rest']:
return await rule['rest'].execute(**kwargs)
elif protocol == 'rest' and rule['grpc']:
return await rule['grpc'].execute(**kwargs)
raise
def _select_protocol(self, rule: Dict, context: Dict) -> str:
"""Select best protocol based on context"""
if not rule['grpc']:
return 'rest'
# Check context indicators
if context.get('requires_streaming'):
return 'grpc'
if context.get('payload_size', 0) > 1024 * 100: # > 100KB
return 'grpc'
if context.get('latency_sensitive'):
return 'grpc'
if context.get('client_type') == 'browser':
return 'rest'
# Default to REST for simplicity
return 'rest'
def _log_routing(self, tool_name: str, protocol: str, context: Dict, latency: float):
"""Log routing decision for analytics"""
# In production, send to monitoring system
print(f"Routed {tool_name} to {protocol} (latency: {latency:.3f}s)")
# Usage Examples
async def demonstrate_advanced_wrappers():
"""Example: Using advanced API wrappers"""
# 1. Enhanced OpenAPI wrapper with caching
openapi_wrapper = OpenAPIToolWrapper(
spec_path='complex_api.yaml',
base_url='https://api.example.com/v1',
cache_ttl=600 # 10 minute cache
)
enhanced_tools = openapi_wrapper.create_tools({
'type': 'oauth2',
'client_id': 'your-client-id',
'client_secret': 'your-client-secret',
'timeout': 60,
'max_retries': 5,
'skip_deprecated': True
})
# 2. Streaming gRPC wrapper
grpc_wrapper = StreamingGRPCToolWrapper(
proto_path='streaming_service.proto',
service_name='StreamingService',
server_address='streaming.example.com:443',
max_message_size=16 * 1024 * 1024 # 16MB
)
streaming_tools = grpc_wrapper.create_streaming_tools()
# 3. Hybrid router
rest_dict = {t.name: t for t in enhanced_tools}
grpc_dict = {t.name: t for t in streaming_tools}
router = HybridProtocolRouter(rest_dict, grpc_dict)
# 4. Use with context-aware routing
result = await router.route_call(
'get_large_dataset',
context={
'payload_size': 1024 * 1024 * 5, # 5MB
'latency_sensitive': False,
'client_type': 'backend'
},
query='SELECT * FROM large_table'
)
# 5. Monitor performance
print(f"OpenAPI metrics: {openapi_wrapper.metrics}")
return router
Advanced OpenAPI Features
- Response Caching: Intelligent caching with TTL and invalidation strategies
- Rate Limiting: Token bucket algorithm for API rate limit compliance
- Circuit Breaking: Automatic failure detection and circuit breaking
- Request Retry: Smart retry with exponential backoff and jitter
- Metrics Collection: Comprehensive performance and error metrics
- Protocol Negotiation: Automatic REST/gRPC selection based on context
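The rate-limiting feature listed above goes beyond the concurrency semaphore used in EnhancedOpenAPITool. A minimal token-bucket sketch (illustrative, not an ADK component): tokens refill continuously, and a call waits only when the bucket is empty.

```python
import asyncio
import time

class TokenBucket:
    """Minimal token bucket: 'rate' tokens refill per second, up to 'capacity'."""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            # Refill based on elapsed time, capped at capacity
            self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            await asyncio.sleep((1 - self.tokens) / self.rate)

async def demo():
    bucket = TokenBucket(rate=100, capacity=2)
    for _ in range(3):          # the third call waits ~10 ms for a refill
        await bucket.acquire()

asyncio.run(demo())
```

In a wrapper, `acquire()` would be awaited just before `_make_request`, alongside the existing semaphore.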
OpenAPI vs gRPC Tool Wrappers Comparison
| Feature | OpenAPI Wrapper | gRPC Wrapper | Use Case |
|---|---|---|---|
| Protocol | HTTP/1.1, HTTP/2 | HTTP/2 | Choose gRPC for high-performance, OpenAPI for broad compatibility |
| Data Format | JSON, XML, Form Data | Protocol Buffers | gRPC: 3-10x faster serialization, 60-80% smaller payloads |
| Streaming | Server-Sent Events, WebSockets | Bidirectional, Client, Server streaming | gRPC for real-time data, OpenAPI for simple request-response |
| Code Generation | OpenAPI Generator (50+ languages) | protoc compiler (12 languages) | Both excellent, gRPC more type-safe with native enums |
| Error Handling | HTTP status codes, custom error bodies | Rich error model with status codes | gRPC provides structured error details |
| Load Balancing | HTTP load balancers (layer 7) | Client-side load balancing, transparent | gRPC better for microservices with client-side LB |
| Authentication | OAuth2, JWT, API Keys, Basic Auth | OAuth2, JWT, TLS mutual auth | Both support standard auth mechanisms |
| Browser Support | Native through Fetch/XHR | Requires gRPC-web proxy | OpenAPI for web clients, gRPC for backend services |
| Tool Complexity | Simple to implement | More complex but more powerful | OpenAPI for quick integrations, gRPC for complex systems |
Performance Benchmarks
| Operation | OpenAPI (JSON) | gRPC (Protobuf) | Improvement |
|---|---|---|---|
| Serialization (1KB message) | 50-100 μs | 5-15 μs | 5-10x faster |
| Deserialization (1KB message) | 40-80 μs | 5-10 μs | 4-8x faster |
| Message Size (1KB data) | ~1.2 KB | ~0.3 KB | 75% smaller |
| RPC Latency (simple) | 5-15 ms | 2-5 ms | 2-3x faster |
| Streaming Throughput | 10-50 msg/s | 1000-5000 msg/s | 100x higher |
| Connection Overhead | HTTP/1.1: high | HTTP/2: low | Better multiplexing |
3.2 Tool Validation & Schema Generation
📖 Definition: What is Tool Validation & Schema Generation?
Tool validation ensures that inputs to agent tools meet expected formats, types, and constraints. Schema generation creates structured definitions of tool inputs and outputs that agents can understand and use for intelligent function calling.
🔍 Validation Components
- Type Checking: Verify data types (string, number, boolean, array, object)
- Format Validation: Check formats like email, date, UUID, URL, IP address
- Range Validation: Ensure numeric values within acceptable bounds
- Required Fields: Verify all mandatory parameters are present
- Cross-field Validation: Check relationships between fields
- Business Rules: Apply domain-specific validation logic
- Schema Validation: Validate against JSON Schema or other schemas
📊 Schema Generation Types
- JSON Schema: Industry standard for JSON data validation (draft-04 to 2020-12)
- Pydantic Models: Python type hints with validation and serialization
- Protocol Buffers: Schema for gRPC services with versioning
- GraphQL Schemas: Type system for GraphQL APIs
- OpenAPI Schemas: REST API parameter definitions
- Avro Schemas: For Apache Kafka and big data
- Thrift IDL: For cross-language services
🎯 Why Use Tool Validation & Schema Generation?
🔒 Reliability
- Prevents invalid tool calls before execution
- Reduces runtime errors by 80%
- Ensures consistent data quality
- Catches type mismatches early
- Prevents injection attacks
🤖 Agent Intelligence
- Schemas help agents understand tool requirements
- Enables automatic parameter extraction from user input
- Improves function calling accuracy by 60%
- Guides agents with descriptions and examples
- Enables auto-completion in agent development
⚡ Performance
- Fast validation with compiled schemas
- Reduces unnecessary API calls
- Early rejection of invalid requests
- Optimized serialization/deserialization
- Enables request caching
📋 Documentation
- Self-documenting APIs
- Automatic API documentation generation
- Client SDK generation
- Testing data generation
How to Use: Advanced Tool Validation & Schema Generation
1. Comprehensive Validation System
from pydantic import BaseModel, Field, validator, root_validator, ValidationError
from typing import Optional, List, Dict, Any, Union
from datetime import datetime, date, time
from enum import Enum
import re
import json
import jsonschema
from jsonschema import Draft202012Validator
import time
# Advanced validation with multiple schema formats
class ComprehensiveValidator:
"""
Validator supporting multiple schema formats and validation strategies
"""
def __init__(self):
self.validators = {}
self.schemas = {}
self.compiled_validators = {}
self.validation_stats = {
'total_validations': 0,
'successful': 0,
'failed': 0,
'avg_validation_time': 0
}
def register_pydantic_model(self, name: str, model: BaseModel):
"""Register a Pydantic model for validation"""
self.validators[name] = {
'type': 'pydantic',
'model': model,
'schema': model.schema()
}
def register_json_schema(self, name: str, schema: Dict, version: str = '2020-12'):
"""Register a JSON Schema for validation"""
self.validators[name] = {
'type': 'jsonschema',
'schema': schema,
'version': version,
'validator': self._create_json_validator(schema, version)
}
def _create_json_validator(self, schema: Dict, version: str):
"""Create a JSON Schema validator"""
if version == '2020-12':
return Draft202012Validator(schema)
else:
return jsonschema.Draft7Validator(schema)
def validate(self, name: str, data: Dict) -> Dict:
"""
Validate data against registered schema
Returns:
Validated and possibly transformed data
"""
start_time = time.time()
self.validation_stats['total_validations'] += 1
validator_info = self.validators.get(name)
if not validator_info:
raise ValueError(f"No validator registered for: {name}")
try:
if validator_info['type'] == 'pydantic':
# Pydantic validation
model = validator_info['model']
validated = model(**data)
result = validated.dict()
elif validator_info['type'] == 'jsonschema':
# JSON Schema validation
validator = validator_info['validator']
validator.validate(data)
result = data
self.validation_stats['successful'] += 1
return result
except Exception as e:
self.validation_stats['failed'] += 1
raise ValueError(f"Validation failed for {name}: {str(e)}") from e
finally:
validation_time = time.time() - start_time
self._update_stats(validation_time)
def _update_stats(self, validation_time: float):
"""Update validation statistics"""
total = self.validation_stats['total_validations']
avg = self.validation_stats['avg_validation_time']
self.validation_stats['avg_validation_time'] = (
(avg * (total - 1) + validation_time) / total
)
# Advanced Pydantic Models with Complex Validation
class Address(BaseModel):
"""Address model with comprehensive validation"""
street: str = Field(..., min_length=5, max_length=100)
city: str = Field(..., min_length=2, max_length=50)
state: str = Field(..., min_length=2, max_length=2, regex=r'^[A-Z]{2}$')
zip_code: str = Field(..., regex=r'^\d{5}(-\d{4})?$')
country: str = Field(default='US', min_length=2, max_length=2)
@validator('zip_code')
def validate_zip(cls, v):
"""Validate US zip code format"""
if not re.match(r'^\d{5}(-\d{4})?$', v):
raise ValueError('Invalid ZIP code format')
return v
class PaymentMethod(str, Enum):
CREDIT_CARD = 'credit_card'
DEBIT_CARD = 'debit_card'
PAYPAL = 'paypal'
BANK_TRANSFER = 'bank_transfer'
class CreditCard(BaseModel):
"""Credit card details with PCI compliance validation"""
card_number: str = Field(..., min_length=13, max_length=19)
expiry_month: int = Field(..., ge=1, le=12)
expiry_year: int = Field(..., ge=datetime.now().year, le=datetime.now().year + 10)
cvv: str = Field(..., min_length=3, max_length=4, regex=r'^\d{3,4}$')
cardholder_name: str = Field(..., min_length=2, max_length=100)
@validator('card_number')
def validate_luhn(cls, v):
"""Validate credit card number using Luhn algorithm"""
def luhn_checksum(card_number):
def digits_of(n):
return [int(d) for d in str(n)]
digits = digits_of(card_number)
odd_digits = digits[-1::-2]
even_digits = digits[-2::-2]
checksum = sum(odd_digits)
for d in even_digits:
checksum += sum(digits_of(d * 2))
return checksum % 10
if luhn_checksum(v) != 0:
raise ValueError('Invalid credit card number')
return v
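The Luhn check above is easy to exercise standalone with the widely published Visa test number 4111 1111 1111 1111, which is Luhn-valid by construction:

```python
# Standalone run of the Luhn checksum used in CreditCard.validate_luhn.
def luhn_valid(card_number: str) -> bool:
    digits = [int(d) for d in card_number]
    checksum = sum(digits[-1::-2])                 # digits in odd positions from the right
    for d in digits[-2::-2]:                       # double every second digit
        checksum += sum(int(x) for x in str(d * 2))
    return checksum % 10 == 0

print(luhn_valid("4111111111111111"))  # True
print(luhn_valid("4111111111111112"))  # False: one changed digit breaks the checksum
```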
class OrderItem(BaseModel):
"""Order item with validation"""
product_id: str = Field(..., min_length=5, max_length=20)
quantity: int = Field(..., ge=1, le=100)
unit_price: float = Field(..., ge=0.01, le=10000)
@property
def total_price(self) -> float:
return self.quantity * self.unit_price
class Order(BaseModel):
"""Complete order model with cross-field validation"""
order_id: str = Field(..., min_length=8, max_length=20)
customer_id: str = Field(..., min_length=5, max_length=20)
order_date: datetime = Field(default_factory=datetime.now)
items: List[OrderItem] = Field(..., min_items=1, max_items=100)
shipping_address: Address
billing_address: Optional[Address] = None
payment_method: PaymentMethod
credit_card: Optional[CreditCard] = None
coupon_code: Optional[str] = Field(None, min_length=5, max_length=20)
notes: Optional[str] = Field(None, max_length=500)
@validator('coupon_code')
def validate_coupon(cls, v):
"""Validate coupon code format"""
if v and not re.match(r'^[A-Z0-9]{5,20}$', v):
raise ValueError('Invalid coupon code format')
return v
@root_validator
def validate_payment(cls, values):
"""Validate payment method consistency"""
payment_method = values.get('payment_method')
credit_card = values.get('credit_card')
if payment_method == PaymentMethod.CREDIT_CARD and not credit_card:
raise ValueError('Credit card details required for credit card payment')
if credit_card and payment_method != PaymentMethod.CREDIT_CARD:
raise ValueError('Credit card provided but payment method is not credit card')
return values
@root_validator
def validate_addresses(cls, values):
"""Validate billing address if provided"""
shipping = values.get('shipping_address')
billing = values.get('billing_address')
if not billing:
values['billing_address'] = shipping
return values
@property
def subtotal(self) -> float:
return sum(item.total_price for item in self.items)
@property
def tax(self) -> float:
return self.subtotal * 0.1 # 10% tax
@property
def total(self) -> float:
total = self.subtotal + self.tax
if self.coupon_code:
total *= 0.9 # 10% discount
return total
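The pricing properties above compose simply. A standalone check of the arithmetic as defined in the model (10% tax on the subtotal, then a flat 10% coupon discount), independent of Pydantic:

```python
# Standalone check of Order's pricing logic: subtotal, 10% tax, optional
# flat 10% coupon discount applied after tax.
def order_total(items, coupon_code=None):
    subtotal = sum(qty * price for qty, price in items)
    total = subtotal * 1.10                  # subtotal plus 10% tax
    if coupon_code:
        total *= 0.9                         # flat 10% discount
    return round(total, 2)

print(order_total([(2, 19.99), (1, 5.00)]))            # 49.48
print(order_total([(2, 19.99), (1, 5.00)], "SAVE10"))  # 44.53
```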
# Dynamic Schema Generation
class DynamicSchemaGenerator:
"""
Generate schemas dynamically from various sources
"""
@staticmethod
def from_database_table(table_name: str, connection) -> Dict:
"""Generate JSON Schema from database table"""
import sqlalchemy
inspector = sqlalchemy.inspect(connection)
columns = inspector.get_columns(table_name)
schema = {
'type': 'object',
'properties': {},
'required': []
}
type_mapping = {
'INTEGER': 'integer',
'VARCHAR': 'string',
'TEXT': 'string',
'BOOLEAN': 'boolean',
'DATE': 'string',
'DATETIME': 'string',
'FLOAT': 'number',
'DECIMAL': 'number'
}
for col in columns:
col_name = col['name']
col_type = str(col['type']).split('(')[0].upper()
schema['properties'][col_name] = {
'type': type_mapping.get(col_type, 'string'),
'description': f"Column: {col_name}"
}
if not col['nullable']:
schema['required'].append(col_name)
# Add length constraints for strings
if 'VARCHAR' in str(col['type']):
import re
match = re.search(r'VARCHAR\((\d+)\)', str(col['type']))
if match:
schema['properties'][col_name]['maxLength'] = int(match.group(1))
return schema
@staticmethod
def from_csv_sample(csv_path: str, sample_size: int = 100) -> Dict:
"""Generate schema from CSV data sample"""
import pandas as pd
df = pd.read_csv(csv_path, nrows=sample_size)
schema = {
'type': 'object',
'properties': {},
'required': []
}
type_mapping = {
'int64': 'integer',
'float64': 'number',
'object': 'string',
'bool': 'boolean',
'datetime64': 'string'
}
for col in df.columns:
dtype = str(df[col].dtype)
schema['properties'][col] = {
'type': type_mapping.get(dtype, 'string'),
'description': f"Column: {col}"
}
# Add sample values as examples
if not df[col].isna().all():
schema['properties'][col]['examples'] = df[col].dropna().head(3).tolist()
return schema
@staticmethod
def from_json_sample(json_data: List[Dict]) -> Dict:
"""Generate schema from JSON sample data"""
def infer_type(value):
if isinstance(value, bool):
return 'boolean'
elif isinstance(value, int):
return 'integer'
elif isinstance(value, float):
return 'number'
elif isinstance(value, str):
return 'string'
elif isinstance(value, list):
return 'array'
elif isinstance(value, dict):
return 'object'
else:
return 'string'
schema = {
'type': 'object',
'properties': {},
'required': []
}
if not json_data:
return schema
# Analyze all samples to collect properties
for item in json_data:
for key, value in item.items():
if key not in schema['properties']:
schema['properties'][key] = {
'type': infer_type(value),
'description': f"Field: {key}"
}
# A field is required only if it appears in every sample
required = set(json_data[0].keys())
for item in json_data[1:]:
required &= set(item.keys())
schema['required'] = sorted(required)
return schema
# Context-Aware Validation
class ContextualValidator:
"""
Validation that adapts based on user context and conversation history
"""
def __init__(self):
self.rules = {}
self.context_cache = {}
self.validation_history = []
def add_rule(self, field: str, condition: callable, message: str,
context_required: List[str] = None):
"""Add a validation rule with context requirements"""
if field not in self.rules:
self.rules[field] = []
self.rules[field].append({
'condition': condition,
'message': message,
'context_required': context_required or []
})
async def validate(self, data: Dict, context: Dict,
conversation_history: List[Dict]) -> Dict[str, List[str]]:
"""
Validate data with context awareness
Args:
data: Data to validate
context: Current context (user tier, location, etc.)
conversation_history: Previous conversation turns
Returns:
Dictionary of field errors
"""
errors = {}
for field, value in data.items():
field_errors = []
if field in self.rules:
for rule in self.rules[field]:
# Check if rule applies in current context
applies = True
for ctx_req in rule['context_required']:
if ctx_req not in context:
applies = False
break
if applies:
if not rule['condition'](value, context, conversation_history):
field_errors.append(rule['message'])
if field_errors:
errors[field] = field_errors
# Cross-field validation
cross_errors = await self._validate_cross_fields(data, context)
if cross_errors:
errors.update(cross_errors)
# Record validation for learning
self.validation_history.append({
'timestamp': datetime.now(),
'data': data,
'context': context,
'errors': errors
})
return errors
async def _validate_cross_fields(self, data: Dict, context: Dict) -> Dict[str, List[str]]:
"""Validate relationships between fields"""
errors = {}
# Example: Date range validation
if 'start_date' in data and 'end_date' in data:
if data['start_date'] > data['end_date']:
errors['date_range'] = ['Start date must be before end date']
# Example: Location-based validation
if 'country' in data and 'state' in data:
country_states = {
'US': ['CA', 'NY', 'TX', 'FL'],
'CA': ['ON', 'QC', 'BC']
}
if data['country'] in country_states:
if data['state'] not in country_states[data['country']]:
errors['location'] = [f"Invalid state for country {data['country']}"]
return errors
# Performance-Optimized Validation with Caching
class CachedValidator:
"""
High-performance validator with multiple caching strategies
"""
def __init__(self, cache_size: int = 1000, cache_ttl: int = 300):
self.cache = {}
self.cache_size = cache_size
self.cache_ttl = cache_ttl
self.hits = 0
self.misses = 0
def _get_cache_key(self, schema: Dict, data: Dict) -> str:
"""Generate a deterministic cache key from schema and data"""
# Avoid the built-in hash(): it is salted per process, so keys would
# differ between runs and workers
content = f"{sorted(schema.items())}:{sorted(data.items())}"
return hashlib.sha256(content.encode()).hexdigest()
def _cleanup_cache(self):
"""Remove expired cache entries"""
now = time.time()
expired = [k for k, v in self.cache.items()
if now - v['timestamp'] > self.cache_ttl]
for k in expired:
del self.cache[k]
# Limit cache size
if len(self.cache) > self.cache_size:
oldest = sorted(self.cache.items(), key=lambda x: x[1]['timestamp'])[:len(self.cache) - self.cache_size]
for k, _ in oldest:
del self.cache[k]
async def validate(self, schema: Dict, data: Dict) -> Optional[Dict[str, List[str]]]:
"""
Validate with caching
Returns:
Errors dict or None if valid
"""
cache_key = self._get_cache_key(schema, data)
# Check cache
if cache_key in self.cache:
cached = self.cache[cache_key]
if time.time() - cached['timestamp'] < self.cache_ttl:
self.hits += 1
return cached['errors']
self.misses += 1
# Perform validation
errors = await self._validate_internal(schema, data)
# Cache result
self.cache[cache_key] = {
'errors': errors,
'timestamp': time.time()
}
# Cleanup cache
self._cleanup_cache()
return errors
async def _validate_internal(self, schema: Dict, data: Dict) -> Optional[Dict[str, List[str]]]:
"""Internal validation logic"""
errors = {}
properties = schema.get('properties', {})
for field, rules in properties.items():
value = data.get(field)
field_type = rules.get('type')
# Required check
if field in schema.get('required', []) and value is None:
errors[field] = errors.get(field, []) + ['Field is required']
continue
if value is not None:
# Type validation
if field_type == 'string' and not isinstance(value, str):
errors[field] = errors.get(field, []) + [f'Expected string, got {type(value).__name__}']
elif field_type == 'integer' and not isinstance(value, int):
errors[field] = errors.get(field, []) + [f'Expected integer, got {type(value).__name__}']
elif field_type == 'number' and not isinstance(value, (int, float)):
errors[field] = errors.get(field, []) + [f'Expected number, got {type(value).__name__}']
elif field_type == 'boolean' and not isinstance(value, bool):
errors[field] = errors.get(field, []) + [f'Expected boolean, got {type(value).__name__}']
# String length validation
if field_type == 'string':
if 'minLength' in rules and len(value) < rules['minLength']:
errors[field] = errors.get(field, []) + [f'Minimum length {rules["minLength"]}']
if 'maxLength' in rules and len(value) > rules['maxLength']:
errors[field] = errors.get(field, []) + [f'Maximum length {rules["maxLength"]}']
# Number range validation
elif field_type in ['integer', 'number']:
if 'minimum' in rules and value < rules['minimum']:
errors[field] = errors.get(field, []) + [f'Minimum value {rules["minimum"]}']
if 'maximum' in rules and value > rules['maximum']:
errors[field] = errors.get(field, []) + [f'Maximum value {rules["maximum"]}']
# Pattern validation
if 'pattern' in rules and not re.match(rules['pattern'], str(value)):
errors[field] = errors.get(field, []) + [f'Must match pattern: {rules["pattern"]}']
return errors if errors else None
def get_stats(self) -> Dict:
"""Get cache statistics"""
return {
'hits': self.hits,
'misses': self.misses,
'hit_ratio': self.hits / (self.hits + self.misses) if (self.hits + self.misses) > 0 else 0,
'cache_size': len(self.cache),
'max_size': self.cache_size
}
# Usage Example
async def demonstrate_advanced_validation():
"""Example: Using advanced validation system"""
# 1. Create comprehensive validator
validator = ComprehensiveValidator()
# 2. Register Pydantic models
validator.register_pydantic_model('order', Order)
# 3. Register JSON Schema
json_schema = {
'type': 'object',
'properties': {
'name': {'type': 'string', 'minLength': 2},
'age': {'type': 'integer', 'minimum': 18},
'email': {'type': 'string', 'format': 'email'}
},
'required': ['name', 'email']
}
validator.register_json_schema('user', json_schema)
# 4. Context-aware validation
contextual = ContextualValidator()
contextual.add_rule(
field='amount',
condition=lambda v, ctx, hist: v <= ctx.get('daily_limit', 1000),
message='Amount exceeds daily limit',
context_required=['daily_limit']
)
# 5. Cached validation
cached_validator = CachedValidator(cache_size=500, cache_ttl=600)
# Example validation
try:
# Validate order
order_data = {
'order_id': 'ORD123456',
'customer_id': 'CUST12345',
'items': [
{'product_id': 'PROD001', 'quantity': 2, 'unit_price': 29.99}
],
'shipping_address': {
'street': '123 Main St',
'city': 'San Francisco',
'state': 'CA',
'zip_code': '94105',
'country': 'US'
},
'payment_method': 'credit_card',
'credit_card': {
'card_number': '4111111111111111',
'expiry_month': 12,
'expiry_year': 2025,
'cvv': '123',
'cardholder_name': 'John Doe'
}
}
validated = validator.validate('order', order_data)
print(f"Order validated: {validated['order_id']}")
except ValidationError as e:
print(f"Validation failed: {e}")
# Validate with context
context = {'daily_limit': 500, 'user_tier': 'premium'}
history = [{'intent': 'payment', 'amount': 100}]
errors = await contextual.validate(
{'amount': 600, 'payment_method': 'credit_card'},
context,
history
)
if errors:
print(f"Contextual validation errors: {errors}")
# Cached validation stats
for i in range(100):
await cached_validator.validate(json_schema, {
'name': 'John Doe',
'age': 30,
'email': 'john@example.com'
})
print(f"Cache stats: {cached_validator.get_stats()}")
return {
'validator': validator,
'contextual': contextual,
'cache_stats': cached_validator.get_stats()
}
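The required-field rule used by `from_json_sample` above can be checked in isolation. A minimal sketch (the function name is illustrative): a field counts as required only when it appears in every sample:

```python
def infer_required(samples):
    """Return the fields present in every sample; [] for an empty sample set."""
    if not samples:
        return []
    common = set(samples[0])
    for item in samples[1:]:
        common &= set(item)
    return sorted(common)

infer_required([{"id": 1, "email": "a@x.com"}, {"id": 2}])  # ['id']
```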
Validation Strategies Comparison
| Strategy | Performance | Flexibility | Use Case | Example |
|---|---|---|---|---|
| Pydantic Models | ⚡ Fast (compiled via Rust) | High | Complex business objects with relationships | Orders, user profiles, nested data |
| JSON Schema | ⚡ Very Fast | Medium | API request validation, configuration files | REST endpoints, config validation |
| Marshmallow | 🐢 Slower (pure Python) | Very High | Complex serialization/deserialization | Nested objects with custom transformations |
| Cerberus | ⚡ Fast | High | Document validation, MongoDB | JSON document validation |
| Voluptuous | ⚡ Fast | Medium | Simple schema validation | Form data, simple APIs |
| Custom Validators | Variable | Maximum | Business rules, cross-field validation | Domain-specific logic |
| Type Hints Only | ⚡ Fastest | Low | Simple type checking | Primitive parameters, internal functions |
Schema Format Comparison
| Format | Language Support | Versioning | Validation Features | Best For |
|---|---|---|---|---|
| JSON Schema | 40+ languages | Draft 4-7, 2019-09, 2020-12 | Types, formats, patterns, conditionals | REST APIs, configuration, data validation |
| Protocol Buffers | 12 languages | Backward/forward compatible | Strong typing, required/optional | gRPC services, high-performance systems |
| Avro | 11 languages | Schema evolution rules | Rich types, default values | Apache Kafka, Hadoop, big data |
| Thrift | 28 languages | Field IDs for compatibility | Strong typing, enums, structs | Cross-language services |
| GraphQL SDL | 20+ languages | Deprecation, schema stitching | Rich type system, interfaces, unions | GraphQL APIs, real-time queries |
| Pydantic | Python only | Semantic versioning | Python type hints, validators, JSON Schema export | Python applications, data validation |
| OpenAPI | 30+ languages | OpenAPI versioning | Request/response schemas, parameters, security | REST API documentation and validation |
3.3 Parallel Function Calling
📖 Definition: What is Parallel Function Calling?
Parallel function calling enables agents to execute multiple tool calls simultaneously, significantly reducing response latency and improving throughput. Instead of sequential execution, the agent can invoke independent functions concurrently, aggregating results for complex queries.
⚡ Key Concepts
- Concurrent Execution: Multiple tools run simultaneously
- Dependency Management: Handle tool interdependencies
- Result Aggregation: Combine parallel results
- Error Isolation: Failures don't affect other calls
- Resource Pooling: Manage connection limits
📈 Performance Benefits
- 3-10x faster response times
- 70% reduction in total latency
- Better resource utilization
- Improved user experience
- Higher throughput for batch operations
🔄 Parallel Patterns
- Fan-out / Fan-in
- Map-Reduce
- Data parallelism
- Task parallelism
- Pipeline parallelism
🎯 Why Use Parallel Function Calling?
🚀 Performance
- Cut total latency from the sum of sequential call times to roughly the slowest single call
- Handle multiple API calls simultaneously
- Process large datasets in parallel
- Utilize multi-core processors
💰 Cost Efficiency
- Better resource utilization
- Fewer sequential timeouts
- Optimized connection pooling
- Reduced infrastructure costs
🎨 User Experience
- Faster responses to complex queries
- Real-time data aggregation
- Progressive result display
- Reduced perceived latency
🛡️ Resilience
- Isolated failures
- Partial results available
- Automatic retry per task
- Graceful degradation
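Before the full adaptive executor below, the core fan-out/fan-in idea fits in a few lines. A minimal sketch with `asyncio.gather` (the `fetch` function and source names are placeholders); `return_exceptions=True` provides the error isolation described above:

```python
import asyncio

async def fetch(source: str) -> str:
    # Placeholder for a real I/O call (HTTP request, DB query, ...)
    await asyncio.sleep(0.1)
    if source == "flaky":
        raise RuntimeError("source unavailable")
    return f"data from {source}"

async def fan_out(sources):
    # Fan-out: start every call concurrently; fan-in: gather the results.
    # return_exceptions=True keeps one failure from discarding the rest.
    return await asyncio.gather(*(fetch(s) for s in sources),
                                return_exceptions=True)

results = asyncio.run(fan_out(["profile", "orders", "flaky"]))
# → ['data from profile', 'data from orders', RuntimeError('source unavailable')]
```

Total wall time is roughly one `fetch` (~0.1s) rather than three, which is the latency win the bullets above describe.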
How to Use: Advanced Parallel Function Calling
1. Advanced Parallel Execution System
import asyncio
from typing import List, Dict, Any, Callable, Optional
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import threading
from dataclasses import dataclass
from enum import Enum
import queue
from collections import defaultdict
import psutil  # third-party dependency: pip install psutil
import heapq
class ParallelStrategy(Enum):
THREAD = "thread" # For I/O bound tasks
PROCESS = "process" # For CPU bound tasks
ASYNC = "async" # For asyncio native tasks
HYBRID = "hybrid" # Automatically choose best strategy
@dataclass
class Task:
"""Represents a task to be executed in parallel"""
id: str
name: str
func: Callable
args: tuple
kwargs: dict
priority: int = 0
dependencies: Optional[List[str]] = None
timeout: Optional[float] = None
retry_count: int = 0
max_retries: int = 3
@dataclass
class TaskResult:
"""Result of a parallel task execution"""
task_id: str
status: str # 'success', 'failed', 'timeout'
result: Any = None
error: Optional[str] = None
start_time: float = 0
end_time: float = 0
worker_id: Optional[str] = None
@property
def duration(self) -> float:
return self.end_time - self.start_time
class AdaptiveParallelExecutor:
"""
Advanced parallel executor with adaptive strategy selection
"""
def __init__(self, max_workers: int = None):
self.max_workers = max_workers or psutil.cpu_count() * 4
self.thread_pool = ThreadPoolExecutor(max_workers=self.max_workers)
self.process_pool = ProcessPoolExecutor(max_workers=psutil.cpu_count())
# Task queues by priority
self.task_queues = {
i: asyncio.Queue() for i in range(5) # 5 priority levels
}
# Performance tracking
self.strategy_performance = defaultdict(list)
self.worker_stats = defaultdict(lambda: {'tasks': 0, 'total_time': 0})
# Result tracking
self.results = {}
self.futures = []
# Start worker tasks
self.workers = []
self.running = True
self._start_workers()
def _start_workers(self):
"""Start worker tasks for each priority queue.
Must be called with an event loop running, since asyncio.create_task() requires one."""
for priority in range(5):
for _ in range(max(1, self.max_workers // 5)):
worker = asyncio.create_task(self._worker_loop(priority))
self.workers.append(worker)
async def _worker_loop(self, priority: int):
"""Worker loop processing tasks from a priority queue"""
worker_id = f"worker-{priority}-{len(self.workers)}"
while self.running:
try:
# Get task from queue with timeout
task = await asyncio.wait_for(
self.task_queues[priority].get(),
timeout=1.0
)
# Execute task
result = await self._execute_task(task, worker_id)
# Store result
self.results[task.id] = result
# Update statistics
self.worker_stats[worker_id]['tasks'] += 1
self.worker_stats[worker_id]['total_time'] += result.duration
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"Worker error: {e}")
async def _execute_task(self, task: Task, worker_id: str) -> TaskResult:
"""Execute a single task with appropriate strategy"""
result = TaskResult(
task_id=task.id,
start_time=time.time(),
worker_id=worker_id
)
# Determine best execution strategy
strategy = await self._select_strategy(task)
try:
# Execute with timeout
if task.timeout:
coro = asyncio.wait_for(
self._execute_with_strategy(task, strategy),
timeout=task.timeout
)
result.result = await coro
else:
result.result = await self._execute_with_strategy(task, strategy)
result.status = 'success'
except asyncio.TimeoutError:
result.status = 'timeout'
result.error = f"Task timed out after {task.timeout}s"
# Retry logic
if task.retry_count < task.max_retries:
task.retry_count += 1
await self.submit_task(task)
except Exception as e:
result.status = 'failed'
result.error = str(e)
# Retry logic for failures
if task.retry_count < task.max_retries:
task.retry_count += 1
await self.submit_task(task)
result.end_time = time.time()
# Record strategy performance
self.strategy_performance[strategy].append(result.duration)
return result
async def _select_strategy(self, task: Task) -> ParallelStrategy:
"""Intelligently select execution strategy"""
# Check if it's a coroutine function
if asyncio.iscoroutinefunction(task.func):
return ParallelStrategy.ASYNC
# Analyze function for CPU intensity
if self._is_cpu_intensive(task.func):
return ParallelStrategy.PROCESS
# Check historical performance
best_strategy = self._get_best_strategy(task.func.__name__)
if best_strategy:
return best_strategy
# Default to thread pool for I/O bound
return ParallelStrategy.THREAD
def _is_cpu_intensive(self, func: Callable) -> bool:
"""Heuristic to determine if function is CPU intensive"""
# Check function name for common CPU-intensive patterns
cpu_keywords = ['calculate', 'compute', 'process', 'analyze',
'transform', 'encode', 'decode', 'encrypt', 'decrypt']
func_name = func.__name__.lower()
for keyword in cpu_keywords:
if keyword in func_name:
return True
# Check if function has loops or heavy operations
import inspect
try:
source = inspect.getsource(func)
loop_indicators = ['for ', 'while ', 'recursion', 'numpy', 'pandas']
for indicator in loop_indicators:
if indicator in source:
return True
except (OSError, TypeError):  # source unavailable (builtins, C extensions)
pass
return False
def _get_best_strategy(self, func_name: str) -> Optional[ParallelStrategy]:
"""Get best performing strategy from historical data"""
# Performance is recorded per strategy (see _execute_task); compare
# average durations across strategies
strategy_avgs = {}
for strategy in ParallelStrategy:
durations = self.strategy_performance.get(strategy, [])
if durations:
strategy_avgs[strategy] = sum(durations) / len(durations)
if strategy_avgs:
return min(strategy_avgs.items(), key=lambda x: x[1])[0]
return None
async def _execute_with_strategy(self, task: Task, strategy: ParallelStrategy) -> Any:
"""Execute task with selected strategy"""
if strategy == ParallelStrategy.ASYNC:
return await task.func(*task.args, **task.kwargs)
elif strategy == ParallelStrategy.THREAD:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.thread_pool,
lambda: task.func(*task.args, **task.kwargs)
)
elif strategy == ParallelStrategy.PROCESS:
import functools
loop = asyncio.get_event_loop()
# A lambda cannot be pickled for a process pool; functools.partial of a
# picklable callable (e.g. a top-level function) can
return await loop.run_in_executor(
self.process_pool,
functools.partial(task.func, *task.args, **task.kwargs)
)
else:
# Hybrid: try awaiting directly, fall back to the thread pool
try:
return await task.func(*task.args, **task.kwargs)
except TypeError:  # result was not awaitable
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.thread_pool,
lambda: task.func(*task.args, **task.kwargs)
)
async def submit_task(self, task: Task) -> str:
"""Submit a task for execution"""
# Check dependencies
if task.dependencies:
for dep_id in task.dependencies:
if dep_id not in self.results:
# Dependency not ready, queue with higher priority
task.priority = min(task.priority + 1, 4)
break
# Add to appropriate priority queue
await self.task_queues[task.priority].put(task)
return task.id
async def submit_batch(self, tasks: List[Task]) -> List[str]:
"""Submit multiple tasks and return their IDs"""
task_ids = []
for task in tasks:
task_id = await self.submit_task(task)
task_ids.append(task_id)
return task_ids
async def wait_for_results(self, task_ids: List[str],
timeout: Optional[float] = None) -> Dict[str, TaskResult]:
"""Wait for specific tasks to complete"""
start_time = time.time()
results = {}
while len(results) < len(task_ids):
# Check if timeout exceeded
if timeout and (time.time() - start_time) > timeout:
break
# Collect available results
for task_id in task_ids:
if task_id in self.results and task_id not in results:
results[task_id] = self.results[task_id]
await asyncio.sleep(0.01) # Small delay to prevent CPU spinning
return results
async def wait_all(self) -> Dict[str, TaskResult]:
"""Wait for all submitted tasks to complete"""
while any(q.qsize() > 0 for q in self.task_queues.values()):
await asyncio.sleep(0.1)
# Wait for in-progress tasks
while len(self.results) < self._get_total_submitted():
await asyncio.sleep(0.1)
return self.results.copy()
def _get_total_submitted(self) -> int:
"""Approximate count of submitted tasks (queued + completed).
Tasks currently executing are not counted, so wait_all() may return slightly early."""
total = 0
for q in self.task_queues.values():
total += q.qsize()
return total + len(self.results)
def get_stats(self) -> Dict:
"""Get executor statistics"""
return {
'workers': len(self.workers),
'max_workers': self.max_workers,
'queued_tasks': sum(q.qsize() for q in self.task_queues.values()),
'completed_tasks': len(self.results),
'strategy_performance': {
s.value: {
'avg_duration': sum(d) / len(d) if d else 0,
'count': len(d)
}
for s, d in self.strategy_performance.items()
},
'worker_stats': dict(self.worker_stats)
}
async def shutdown(self):
"""Gracefully shut down the executor"""
self.running = False
# Wait for workers to finish
for worker in self.workers:
worker.cancel()
await asyncio.gather(*self.workers, return_exceptions=True)
# Shutdown pools
self.thread_pool.shutdown(wait=True)
self.process_pool.shutdown(wait=True)
# Dependency-Aware Parallel Execution
class DependencyGraph:
"""
Manages task dependencies for parallel execution
"""
def __init__(self):
self.graph = defaultdict(set)
self.reverse_graph = defaultdict(set)
self.task_data = {}
def add_task(self, task_id: str, task: Task):
"""Add a task to the dependency graph"""
self.task_data[task_id] = task
if task.dependencies:
for dep_id in task.dependencies:
self.graph[task_id].add(dep_id)
self.reverse_graph[dep_id].add(task_id)
def get_ready_tasks(self) -> List[str]:
"""Get tasks with no pending dependencies"""
ready = []
for task_id in self.task_data:
# A task with no entry in self.graph has no dependencies and is ready
# immediately; completed tasks are removed from task_data by
# mark_completed, so any dependency still in task_data is pending
deps = self.graph.get(task_id, set())
if not any(dep_id in self.task_data for dep_id in deps):
ready.append(task_id)
return ready
def mark_completed(self, task_id: str):
"""Mark a task as completed and update dependencies"""
if task_id in self.task_data:
del self.task_data[task_id]
# Remove from dependency graphs
if task_id in self.graph:
del self.graph[task_id]
# Update reverse dependencies
for dep_id in list(self.reverse_graph[task_id]):
self.graph[dep_id].discard(task_id)
if task_id in self.reverse_graph:
del self.reverse_graph[task_id]
def get_execution_levels(self) -> List[List[str]]:
"""
Group tasks into parallel execution levels
"""
levels = []
remaining = set(self.task_data.keys())
while remaining:
# Find tasks with no dependencies in remaining set
current_level = []
for task_id in remaining:
deps = self.graph[task_id]
if not any(dep in remaining for dep in deps):
current_level.append(task_id)
if not current_level:
# Circular dependency detected
raise ValueError("Circular dependency detected")
levels.append(current_level)
remaining -= set(current_level)
return levels
# Example: Complex Parallel Workflow
class ParallelWorkflowExample:
"""
Example demonstrating complex parallel execution patterns
"""
def __init__(self):
self.executor = AdaptiveParallelExecutor()
self.dependency_graph = DependencyGraph()
async def run_analytics_pipeline(self, user_id: str) -> Dict:
"""
Run a complex analytics pipeline with multiple parallel stages
Stages:
1. Fetch user data from multiple sources (parallel)
2. Process each data stream (parallel)
3. Aggregate results (sequential after processing)
4. Generate insights (parallel)
5. Compile report (sequential)
"""
tasks = []
# Stage 1: Parallel data fetching
fetch_tasks = [
Task(
id=f"fetch_profile_{user_id}",
name="fetch_profile",
func=self._fetch_user_profile,
args=(user_id,),
kwargs={},
priority=3
),
Task(
id=f"fetch_orders_{user_id}",
name="fetch_orders",
func=self._fetch_user_orders,
args=(user_id, 100),
kwargs={},
priority=3
),
Task(
id=f"fetch_activity_{user_id}",
name="fetch_activity",
func=self._fetch_user_activity,
args=(user_id, 30),
kwargs={},
priority=3
),
Task(
id=f"fetch_preferences_{user_id}",
name="fetch_preferences",
func=self._fetch_user_preferences,
args=(user_id,),
kwargs={},
priority=3
)
]
# Submit fetch tasks
fetch_ids = await self.executor.submit_batch(fetch_tasks)
for task in fetch_tasks:
self.dependency_graph.add_task(task.id, task)
# Wait for fetch results
fetch_results = await self.executor.wait_for_results(fetch_ids, timeout=10)
# Stage 2: Process each data stream in parallel
process_tasks = []
for result in fetch_results.values():
if result.status == 'success':
data = result.result
task = Task(
id=f"process_{result.task_id}",
name="process_data",
func=self._process_data_stream,
args=(data,),
kwargs={},
dependencies=[result.task_id],
priority=2
)
process_tasks.append(task)
process_ids = await self.executor.submit_batch(process_tasks)
for task in process_tasks:
self.dependency_graph.add_task(task.id, task)
# Stage 3: Aggregate results (sequential after processing)
process_results = await self.executor.wait_for_results(process_ids, timeout=15)
# Stage 4: Generate insights in parallel
insight_tasks = []
insight_types = ['behavioral', 'purchase', 'engagement', 'churn']
for insight_type in insight_types:
task = Task(
id=f"insight_{insight_type}",
name="generate_insight",
func=self._generate_insight,
args=(process_results, insight_type),
kwargs={},
dependencies=[t.id for t in process_tasks],
priority=1
)
insight_tasks.append(task)
insight_ids = await self.executor.submit_batch(insight_tasks)
for task in insight_tasks:
self.dependency_graph.add_task(task.id, task)
# Stage 5: Compile final report (sequential)
insight_results = await self.executor.wait_for_results(insight_ids, timeout=10)
final_report = await self._compile_report(
fetch_results, process_results, insight_results
)
# Get execution statistics
stats = self.executor.get_stats()
return {
'report': final_report,
'stats': stats,
'execution_levels': self.dependency_graph.get_execution_levels()
}
async def _fetch_user_profile(self, user_id: str) -> Dict:
"""Simulate fetching user profile"""
await asyncio.sleep(0.5)
return {
'user_id': user_id,
'name': 'John Doe',
'email': 'john@example.com',
'member_since': '2020-01-01',
'tier': 'premium'
}
async def _fetch_user_orders(self, user_id: str, limit: int) -> List[Dict]:
"""Simulate fetching user orders"""
await asyncio.sleep(0.8)
return [
{'order_id': 'ORD001', 'amount': 299.99, 'date': '2024-01-15'},
{'order_id': 'ORD002', 'amount': 149.50, 'date': '2024-02-01'},
{'order_id': 'ORD003', 'amount': 89.99, 'date': '2024-02-15'}
][:limit]
async def _fetch_user_activity(self, user_id: str, days: int) -> Dict:
"""Simulate fetching user activity"""
await asyncio.sleep(0.3)
return {
'last_login': '2024-02-20',
'total_visits': 45,
'pages_viewed': 120,
'avg_session_duration': 180 # seconds
}
async def _fetch_user_preferences(self, user_id: str) -> Dict:
"""Simulate fetching user preferences"""
await asyncio.sleep(0.2)
return {
'theme': 'dark',
'notifications': True,
'language': 'en',
'currency': 'USD'
}
async def _process_data_stream(self, data: Any) -> Dict:
"""Process a data stream"""
# Simulate CPU-intensive processing
await asyncio.sleep(0.5)
return {'processed': True, 'insights': data}
async def _generate_insight(self, data: Dict, insight_type: str) -> Dict:
"""Generate specific insight from data"""
await asyncio.sleep(0.3)
return {
'type': insight_type,
'score': 0.85,
'recommendations': ['action1', 'action2']
}
async def _compile_report(self, fetch: Dict, process: Dict, insights: Dict) -> Dict:
"""Compile final report"""
return {
'summary': 'User analytics report',
'fetch_stats': {k: v.status for k, v in fetch.items()},
'process_stats': {k: v.status for k, v in process.items()},
'insights': {k: v.result for k, v in insights.items() if v.status == 'success'},
'generated_at': time.time()
}
# Usage Example
async def demonstrate_parallel_execution():
"""Example: Using advanced parallel execution"""
# 1. Basic parallel execution
executor = AdaptiveParallelExecutor(max_workers=10)
# Create various task types
tasks = [
Task(
id="io_task_1",
name="io_bound",
func=lambda x: f"IO result: {x}",
args=("data1",),
kwargs={},
priority=2
),
Task(
id="cpu_task_1",
name="cpu_bound",
func=lambda x: sum(i * i for i in range(x)),
args=(1000000,),
kwargs={},
priority=1
),
Task(
id="async_task_1",
name="async_task",
func=asyncio.sleep,
args=(0.5,),
kwargs={},
priority=3
)
]
# Submit tasks
task_ids = await executor.submit_batch(tasks)
# Wait for results
results = await executor.wait_for_results(task_ids)
# 2. Complex workflow
workflow = ParallelWorkflowExample()
report = await workflow.run_analytics_pipeline("user_123")
# 3. Get executor statistics
stats = executor.get_stats()
print(f"Executor stats: {stats}")
# 4. Cleanup
await executor.shutdown()
return {
'basic_results': results,
'workflow_report': report,
'stats': stats
}
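The level-grouping that `DependencyGraph.get_execution_levels` computes is also available in the standard library via `graphlib` (Python 3.9+, matching this module's prerequisites). A sketch with hypothetical task names:

```python
from graphlib import TopologicalSorter

# task -> set of prerequisite tasks, like DependencyGraph.graph above
deps = {
    "fetch_a": set(),
    "fetch_b": set(),
    "process": {"fetch_a", "fetch_b"},
    "report": {"process"},
}

ts = TopologicalSorter(deps)
ts.prepare()  # raises CycleError on circular dependencies

levels = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # every task whose deps are done
    levels.append(ready)
    ts.done(*ready)

print(levels)  # [['fetch_a', 'fetch_b'], ['process'], ['report']]
```

`prepare()` detecting cycles mirrors the circular-dependency check in `get_execution_levels`.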
Parallel Execution Patterns
| Pattern | Description | Use Case | Example | Performance Gain |
|---|---|---|---|---|
| Fan-Out/Fan-In | Distribute work to multiple workers, collect results | Parallel API calls, data fetching | Get user data from 5 services | 5x faster |
| Map-Reduce | Process chunks in parallel, combine results | Large dataset processing | Analyze 1M records | 10-100x faster |
| Pipeline | Parallel stages with dependencies | ETL workflows | Extract → Transform → Load | 3x faster |
| Scatter-Gather | Broadcast query, aggregate responses | Distributed search | Search across databases | Nx faster (N = sources) |
| Master-Worker | Coordinator distributes tasks to workers | Task queues, job processing | Image processing queue | Linear with workers |
| Divide and Conquer | Recursively split problem, solve subproblems | Sorting, searching algorithms | Parallel merge sort | O(log n) depth |
| Data Parallelism | Same operation on different data chunks | Matrix operations, image processing | Apply filter to 1000 images | Linear with cores |
| Task Parallelism | Different operations on same/different data | Complex workflows | Analytics pipeline | 3-5x faster |
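The Map-Reduce row above can be sketched with the stdlib alone (names and chunk size are illustrative): split the data into chunks, map each chunk on a worker, then reduce the partial results. `combine` must be associative for the result to be well defined:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

def map_reduce(data, map_fn, combine, chunk_size=3):
    """Map chunks in parallel, then combine the per-chunk partial results."""
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    with ThreadPoolExecutor() as pool:
        partials = pool.map(lambda c: reduce(combine, map(map_fn, c)), chunks)
    return reduce(combine, partials)

total = map_reduce(list(range(1, 11)), lambda x: x * x, lambda a, b: a + b)
print(total)  # 385: the sum of squares 1..10
```

For a genuinely CPU-bound `map_fn`, swap in `ProcessPoolExecutor` with top-level functions instead of lambdas, since process-pool arguments must be picklable.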
Parallel Execution Optimization Tips
🚀 Performance Tips
- Right-size worker pools: Too many workers cause context switching overhead
- Use async for I/O: Async tasks are more efficient than threads for I/O
- Batch small tasks: Combine many tiny tasks to reduce overhead
- Monitor memory usage: Parallel tasks can consume significant memory
- Implement backpressure: Prevent overwhelming downstream systems
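The backpressure tip can be implemented with a semaphore. A minimal sketch (function names are illustrative) that caps the number of in-flight tasks:

```python
import asyncio

async def bounded_gather(coros, limit=5):
    """Run coroutines concurrently, but never more than `limit` at once."""
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:  # waits while `limit` tasks are already in flight
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))

async def demo():
    # 20 tiny jobs, at most 5 running at any moment
    return await bounded_gather((asyncio.sleep(0.01, result=i) for i in range(20)))

out = asyncio.run(demo())  # [0, 1, ..., 19], order preserved by gather
```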
⚠️ Common Pitfalls
- Thread safety: Ensure shared data is properly synchronized
- Deadlocks: Avoid circular dependencies between tasks
- Resource exhaustion: Database connections, file handles, etc.
- Non-idempotent operations: Retries may cause duplicate effects
- Debugging complexity: Parallel bugs are harder to reproduce
3.4 Tool Retry & Error Policies
📖 Definition: What are Tool Retry & Error Policies?
Retry and error policies define how tools handle failures, transient errors, and exceptional conditions. They ensure robustness, reliability, and graceful degradation of agent capabilities through intelligent error classification, retry strategies, and circuit breaking.
🔄 Retry Strategies
- Fixed Delay: Wait constant time between retries
- Exponential Backoff: Increasing delay with each retry
- Jitter: Add randomness to prevent thundering herd
- Linear Backoff: Linear increase in wait time
- Fibonacci Backoff: Fibonacci sequence for delays
- Immediate Retry: Retry instantly (use with caution)
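The strategies above differ only in how the delay grows with the attempt number. A minimal standalone sketch (function name and strategy keys are illustrative; the full ADK-style implementation later in this section adds Fibonacci and decorrelated-jitter variants):

```python
import random

def backoff_delay(strategy: str, attempt: int, base: float = 1.0,
                  cap: float = 60.0) -> float:
    """Delay in seconds before retry number `attempt` (0-based)."""
    if strategy == "fixed":
        delay = base
    elif strategy == "linear":
        delay = base * (attempt + 1)
    elif strategy == "exponential":
        delay = base * (2 ** attempt)
    elif strategy == "jitter":
        # Full jitter: randomize over the exponential window to avoid a
        # thundering herd of synchronized retries.
        delay = random.uniform(0, base * (2 ** attempt))
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    return min(delay, cap)  # never wait longer than the cap

# Exponential delays for attempts 0..4: 1, 2, 4, 8, 16 seconds
print([backoff_delay("exponential", a) for a in range(5)])
```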
⚠️ Error Types
- Transient Errors: Network timeouts, rate limits (retryable)
- Permanent Errors: Invalid input, auth failure (non-retryable)
- Business Errors: Domain-specific failures
- System Errors: Infrastructure failures
- Timeout Errors: Operation exceeded time limit
- Resource Errors: Out of memory, disk full
🛡️ Circuit Breaker States
- CLOSED: Normal operation, requests pass through
- OPEN: Failing, requests rejected immediately
- HALF_OPEN: Testing if service recovered
- HALF_OPEN_LIMITED: Limited test requests
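A circuit breaker is a small state machine over those states. A minimal sketch of the core transitions, CLOSED → OPEN → HALF_OPEN → CLOSED (the comprehensive implementation later in this section adds rolling windows, metrics, and the HALF_OPEN_LIMITED variant; class and threshold names here are illustrative):

```python
import time

class MiniBreaker:
    """Minimal circuit breaker: CLOSED -> OPEN -> HALF_OPEN -> CLOSED."""

    def __init__(self, failure_threshold: int = 3, recovery_timeout: float = 1.0):
        self.state = "CLOSED"
        self.failures = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.opened_at = 0.0

    def call(self, func):
        if self.state == "OPEN":
            if time.time() - self.opened_at >= self.recovery_timeout:
                self.state = "HALF_OPEN"  # probe whether the service recovered
            else:
                raise RuntimeError("circuit open: request rejected")
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
                self.state = "OPEN"  # trip (or re-trip) the breaker
                self.opened_at = time.time()
            raise
        else:
            self.failures = 0
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"  # probe succeeded; resume normal operation
            return result
```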
🎯 Why Use Retry & Error Policies?
📈 Reliability
- 99.9%+ success rate with retries
- Handle temporary failures automatically
- Graceful degradation
- Self-healing systems
💰 Cost Optimization
- Avoid unnecessary retries
- Smart error classification
- Circuit breaking prevents cascading failures
- Reduce resource waste
👥 User Experience
- Fewer visible errors
- Better error messages
- Self-healing systems
- Consistent behavior
📊 Observability
- Track error patterns
- Monitor retry effectiveness
- Alert on critical failures
- Identify problematic services
How to Use: Advanced Retry & Error Policies
1. Comprehensive Retry System with Circuit Breaker
from typing import Callable, Any, Optional, Type, Dict, List
import asyncio
import time
import random
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import logging
from collections import deque
import threading
class RetryStrategy(Enum):
"""Available retry strategies"""
FIXED = "fixed"
EXPONENTIAL = "exponential"
LINEAR = "linear"
FIBONACCI = "fibonacci"
JITTERED_EXPONENTIAL = "jittered_exponential"
DECORRELATED_JITTER = "decorrelated_jitter"
class ErrorCategory(Enum):
"""Error categories for classification"""
TRANSIENT = "transient" # Retryable
PERMANENT = "permanent" # Non-retryable
BUSINESS = "business" # Domain error
SYSTEM = "system" # Infrastructure error
TIMEOUT = "timeout" # Timeout error
RATE_LIMIT = "rate_limit" # Rate limiting
AUTHENTICATION = "authentication" # Auth failure
VALIDATION = "validation" # Input validation
RESOURCE_EXHAUSTION = "resource_exhaustion" # Out of memory/disk
class CircuitState(Enum):
"""Circuit breaker states"""
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing recovery
HALF_OPEN_LIMITED = "half_open_limited" # Limited test requests
@dataclass
class RetryConfig:
"""Configuration for retry behavior"""
max_retries: int = 3
strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
base_delay: float = 1.0
max_delay: float = 60.0
jitter: bool = True
jitter_factor: float = 0.1
retry_on_timeout: bool = True
retry_on_rate_limit: bool = True
    retry_on_exceptions: Optional[List[Type[Exception]]] = None
    no_retry_on_exceptions: Optional[List[Type[Exception]]] = None
    retry_on_http_status: Optional[List[int]] = None
    no_retry_on_http_status: Optional[List[int]] = None
@dataclass
class CircuitBreakerConfig:
"""Configuration for circuit breaker"""
failure_threshold: int = 5
recovery_timeout: float = 60.0
half_open_max_calls: int = 3
success_threshold: int = 2
rolling_window_seconds: float = 60.0
minimum_calls: int = 10
class RollingCounter:
"""Rolling window counter for metrics"""
def __init__(self, window_seconds: float):
self.window_seconds = window_seconds
self.buckets = deque()
self.lock = threading.Lock()
def add(self, value: float = 1):
"""Add a value to the counter"""
with self.lock:
now = time.time()
self.buckets.append((now, value))
self._cleanup(now)
def _cleanup(self, now: float):
"""Remove old buckets"""
while self.buckets and now - self.buckets[0][0] > self.window_seconds:
self.buckets.popleft()
def sum(self) -> float:
"""Get sum of values in window"""
with self.lock:
now = time.time()
self._cleanup(now)
return sum(v for _, v in self.buckets)
def count(self) -> int:
"""Get count of events in window"""
with self.lock:
now = time.time()
self._cleanup(now)
return len(self.buckets)
class CircuitBreakerOpenError(Exception):
    """Raised when the circuit breaker rejects a call"""
    pass

class MaxRetriesExceededError(Exception):
    """Raised when all retry attempts are exhausted"""
    pass

class CircuitBreaker:
    """
    Advanced circuit breaker with rolling windows and metrics
    """
def __init__(self, name: str, config: CircuitBreakerConfig):
self.name = name
self.config = config
self.state = CircuitState.CLOSED
self.failure_counter = RollingCounter(config.rolling_window_seconds)
self.success_counter = RollingCounter(config.rolling_window_seconds)
self.total_counter = RollingCounter(config.rolling_window_seconds)
self.last_failure_time = None
self.half_open_calls = 0
self.consecutive_successes = 0
self.lock = asyncio.Lock()
self.logger = logging.getLogger(f"circuit_breaker.{name}")
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""
Call function with circuit breaker protection
"""
# Check state
await self._check_state()
# Record attempt
self.total_counter.add()
if self.state == CircuitState.OPEN:
if await self._should_attempt_recovery():
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
self.consecutive_successes = 0
else:
raise CircuitBreakerOpenError(f"Circuit breaker {self.name} is OPEN")
if self.state == CircuitState.HALF_OPEN_LIMITED:
if self.half_open_calls >= self.config.half_open_max_calls:
raise CircuitBreakerOpenError(f"Circuit breaker {self.name} in HALF_OPEN with max calls")
if self.state in [CircuitState.HALF_OPEN, CircuitState.HALF_OPEN_LIMITED]:
self.half_open_calls += 1
# Execute function
try:
result = await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
# Success - record and potentially close circuit
await self._handle_success()
return result
except Exception as e:
await self._handle_failure(e)
raise e
async def _check_state(self):
"""Update state based on metrics"""
async with self.lock:
total_calls = self.total_counter.count()
failures = self.failure_counter.count()
if total_calls < self.config.minimum_calls:
return
failure_rate = failures / total_calls if total_calls > 0 else 0
if self.state == CircuitState.CLOSED and failure_rate > 0.5:
self.state = CircuitState.OPEN
self.last_failure_time = time.time()
self.logger.warning(f"Circuit breaker {self.name} OPEN due to {failure_rate:.2%} failure rate")
async def _should_attempt_recovery(self) -> bool:
"""Determine if we should attempt recovery"""
if not self.last_failure_time:
return True
elapsed = time.time() - self.last_failure_time
return elapsed > self.config.recovery_timeout
async def _handle_success(self):
"""Handle successful call"""
async with self.lock:
self.success_counter.add()
if self.state in [CircuitState.HALF_OPEN, CircuitState.HALF_OPEN_LIMITED]:
self.consecutive_successes += 1
if self.consecutive_successes >= self.config.success_threshold:
self.state = CircuitState.CLOSED
self.failure_counter = RollingCounter(self.config.rolling_window_seconds)
self.success_counter = RollingCounter(self.config.rolling_window_seconds)
self.total_counter = RollingCounter(self.config.rolling_window_seconds)
self.logger.info(f"Circuit breaker {self.name} CLOSED after successful recovery")
async def _handle_failure(self, error: Exception):
"""Handle failed call"""
async with self.lock:
self.failure_counter.add()
self.last_failure_time = time.time()
if self.state in [CircuitState.HALF_OPEN, CircuitState.HALF_OPEN_LIMITED]:
self.state = CircuitState.OPEN
self.logger.warning(f"Circuit breaker {self.name} OPEN after failure in HALF_OPEN state")
class AdvancedRetryPolicy:
"""
Advanced retry policy with multiple strategies and circuit breaking
"""
def __init__(self, name: str, retry_config: RetryConfig,
circuit_config: CircuitBreakerConfig = None):
self.name = name
self.retry_config = retry_config
self.circuit_breaker = CircuitBreaker(name, circuit_config) if circuit_config else None
self.stats = {
'total_calls': 0,
'successful_calls': 0,
'failed_calls': 0,
'retried_calls': 0,
'circuit_open_calls': 0,
'total_retries': 0,
'avg_retry_delay': 0
}
self.logger = logging.getLogger(f"retry_policy.{name}")
async def execute(self, func: Callable, *args, **kwargs) -> Any:
"""
Execute function with retry and circuit breaker
"""
self.stats['total_calls'] += 1
last_exception = None
for attempt in range(self.retry_config.max_retries + 1):
try:
# Check circuit breaker
if self.circuit_breaker:
result = await self.circuit_breaker.call(func, *args, **kwargs)
else:
result = await self._execute_func(func, *args, **kwargs)
self.stats['successful_calls'] += 1
return result
except Exception as e:
last_exception = e
# Classify error
category = self._classify_error(e)
# Check if retryable
if not self._is_retryable(e, category):
self.stats['failed_calls'] += 1
raise
# Check if max retries reached
if attempt >= self.retry_config.max_retries:
self.stats['failed_calls'] += 1
raise MaxRetriesExceededError(
f"Max retries ({self.retry_config.max_retries}) exceeded"
) from e
# Calculate delay
delay = self._calculate_delay(attempt)
self.stats['total_retries'] += 1
self.stats['avg_retry_delay'] = (
self.stats['avg_retry_delay'] * (self.stats['total_retries'] - 1) + delay
) / self.stats['total_retries']
# Log retry
self.logger.warning(
f"Retry {attempt + 1}/{self.retry_config.max_retries} for {func.__name__} "
f"after {delay:.2f}s due to: {e}"
)
await asyncio.sleep(delay)
async def _execute_func(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with timeout"""
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
def _calculate_delay(self, attempt: int) -> float:
"""Calculate delay based on strategy"""
if self.retry_config.strategy == RetryStrategy.FIXED:
delay = self.retry_config.base_delay
elif self.retry_config.strategy == RetryStrategy.EXPONENTIAL:
delay = self.retry_config.base_delay * (2 ** attempt)
elif self.retry_config.strategy == RetryStrategy.LINEAR:
delay = self.retry_config.base_delay * (attempt + 1)
elif self.retry_config.strategy == RetryStrategy.FIBONACCI:
fib = [1, 1]
for i in range(2, attempt + 2):
fib.append(fib[i-1] + fib[i-2])
delay = self.retry_config.base_delay * fib[attempt]
elif self.retry_config.strategy == RetryStrategy.JITTERED_EXPONENTIAL:
exp_delay = self.retry_config.base_delay * (2 ** attempt)
jitter = random.uniform(0, exp_delay * self.retry_config.jitter_factor)
delay = exp_delay + jitter
elif self.retry_config.strategy == RetryStrategy.DECORRELATED_JITTER:
# AWS recommended jitter strategy
delay = min(
self.retry_config.max_delay,
random.uniform(
self.retry_config.base_delay,
self.retry_config.base_delay * 3 ** attempt
)
)
else:
delay = self.retry_config.base_delay
# Apply jitter if configured
if self.retry_config.jitter and self.retry_config.strategy not in [
RetryStrategy.JITTERED_EXPONENTIAL,
RetryStrategy.DECORRELATED_JITTER
]:
delay += random.uniform(0, delay * self.retry_config.jitter_factor)
return min(delay, self.retry_config.max_delay)
def _classify_error(self, error: Exception) -> ErrorCategory:
"""Classify error type"""
error_str = str(error).lower()
if isinstance(error, asyncio.TimeoutError):
return ErrorCategory.TIMEOUT
if "rate limit" in error_str or "too many requests" in error_str:
return ErrorCategory.RATE_LIMIT
if isinstance(error, (ConnectionError, ConnectionRefusedError,
ConnectionResetError, ConnectionAbortedError)):
return ErrorCategory.TRANSIENT
if isinstance(error, ValueError) or "invalid" in error_str:
return ErrorCategory.VALIDATION
if isinstance(error, PermissionError) or "auth" in error_str or "unauthorized" in error_str:
return ErrorCategory.AUTHENTICATION
if "business" in error_str or "domain" in error_str:
return ErrorCategory.BUSINESS
if "memory" in error_str or "disk" in error_str or "resource" in error_str:
return ErrorCategory.RESOURCE_EXHAUSTION
return ErrorCategory.SYSTEM
def _is_retryable(self, error: Exception, category: ErrorCategory) -> bool:
"""Determine if error is retryable"""
# Check HTTP status codes if available
if hasattr(error, 'status_code'):
if self.retry_config.no_retry_on_http_status:
if error.status_code in self.retry_config.no_retry_on_http_status:
return False
if self.retry_config.retry_on_http_status:
return error.status_code in self.retry_config.retry_on_http_status
# Check custom exception lists
if self.retry_config.retry_on_exceptions:
if any(isinstance(error, exc) for exc in self.retry_config.retry_on_exceptions):
return True
if self.retry_config.no_retry_on_exceptions:
if any(isinstance(error, exc) for exc in self.retry_config.no_retry_on_exceptions):
return False
# Classify by category
retryable_categories = [
ErrorCategory.TRANSIENT,
ErrorCategory.TIMEOUT,
ErrorCategory.RATE_LIMIT,
ErrorCategory.SYSTEM
]
if category in retryable_categories:
return True
return False
def get_stats(self) -> Dict:
"""Get retry policy statistics"""
stats = self.stats.copy()
if self.circuit_breaker:
stats['circuit_state'] = self.circuit_breaker.state.value
stats['circuit_failures'] = self.circuit_breaker.failure_counter.count()
return stats
# Rate Limiting Handler
class RateLimiter:
"""
Token bucket rate limiter with multiple strategies
"""
def __init__(self, rate: float, capacity: int = None):
"""
Initialize rate limiter
Args:
rate: Requests per second
capacity: Maximum burst capacity (defaults to rate)
"""
self.rate = rate
self.capacity = capacity or int(rate)
self.tokens = self.capacity
self.last_refill = time.time()
self.lock = asyncio.Lock()
async def acquire(self, tokens: int = 1) -> bool:
"""
Acquire tokens from the bucket
Returns:
True if tokens acquired, False if rate limited
"""
async with self.lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
async def wait_and_acquire(self, tokens: int = 1):
"""Wait until tokens are available"""
while True:
if await self.acquire(tokens):
return
# Calculate wait time
wait_time = (tokens - self.tokens) / self.rate
await asyncio.sleep(max(0.001, wait_time))
def _refill(self):
"""Refill tokens based on elapsed time"""
now = time.time()
elapsed = now - self.last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self.last_refill = now
class DistributedRateLimiter:
"""
Distributed rate limiter using Redis
"""
def __init__(self, redis_client, key: str, rate: float, capacity: int):
self.redis = redis_client
self.key = key
self.rate = rate
self.capacity = capacity
    async def acquire(self, tokens: int = 1) -> bool:
        """Acquire tokens using a 1-second sliding window in Redis"""
        now = time.time()
        pipeline = self.redis.pipeline()
        # Drop entries older than the 1-second window
        pipeline.zremrangebyscore(self.key, 0, now - 1)
        # Count requests currently in the window (before adding this one)
        pipeline.zcard(self.key)
        # Record the current request
        pipeline.zadd(self.key, {str(now): now})
        # Expire the key so idle limiters clean themselves up
        pipeline.expire(self.key, 60)
        results = await pipeline.execute()
        current_tokens = results[1]
        return current_tokens < self.capacity
# Usage Example
async def demonstrate_retry_policies():
"""Example: Using advanced retry policies"""
# 1. Configure retry policy
retry_config = RetryConfig(
max_retries=5,
strategy=RetryStrategy.JITTERED_EXPONENTIAL,
base_delay=1.0,
max_delay=30.0,
jitter=True,
retry_on_http_status=[429, 500, 502, 503, 504],
no_retry_on_http_status=[400, 401, 403, 404]
)
circuit_config = CircuitBreakerConfig(
failure_threshold=5,
recovery_timeout=60,
half_open_max_calls=3,
success_threshold=2,
rolling_window_seconds=60,
minimum_calls=10
)
# 2. Create retry policy with circuit breaker
retry_policy = AdvancedRetryPolicy(
name="api_caller",
retry_config=retry_config,
circuit_config=circuit_config
)
# 3. Define unreliable function
async def unreliable_api_call(param: str):
"""Simulate unreliable API"""
import random
r = random.random()
if r < 0.6: # 60% failure rate
if r < 0.2:
raise asyncio.TimeoutError("API timeout")
elif r < 0.4:
raise ConnectionError("Network error")
else:
# Simulate HTTP error
class HTTPError(Exception):
def __init__(self, status_code):
self.status_code = status_code
raise HTTPError(500)
return f"Success: {param}"
# 4. Execute with retry policy
try:
result = await retry_policy.execute(
unreliable_api_call,
"test_param"
)
print(f"Result: {result}")
except MaxRetriesExceededError:
print("All retries failed")
# 5. Get statistics
stats = retry_policy.get_stats()
print(f"Retry stats: {stats}")
# 6. Rate limiter example
rate_limiter = RateLimiter(rate=10, capacity=20) # 10 req/sec, burst 20
async def rate_limited_call(n):
if await rate_limiter.acquire():
return f"Call {n} succeeded"
else:
return f"Call {n} rate limited"
# Make 30 rapid calls
tasks = [rate_limited_call(i) for i in range(30)]
results = await asyncio.gather(*tasks)
# Count successes and rate limits
successes = sum(1 for r in results if "succeeded" in r)
limited = sum(1 for r in results if "rate limited" in r)
print(f"Rate limiter: {successes} succeeded, {limited} rate limited")
return {
'retry_stats': stats,
'rate_limiter_results': {'successes': successes, 'limited': limited}
}
3.5 Built-in Tools: Google Workspace, Search, Code
📖 Definition: What are Built-in Google Tools?
Google ADK provides a comprehensive set of built-in tools that integrate directly with Google services. These pre-built tools enable agents to interact with Gmail, Calendar, Drive, Google Search, and execute code in sandboxed environments, providing enterprise-grade functionality out of the box.
📧 Google Workspace (15+ tools)
Tools for Gmail, Calendar, Drive, Docs, Sheets, and Meet. Enable agents to send emails, schedule meetings, manage files, and collaborate on documents with full OAuth support.
🔍 Google Search (4 search types)
Web search, image search, news search, and custom search capabilities. Agents can retrieve real-time information from the internet with filtering and safe search.
💻 Code Execution (5+ languages)
Sandboxed Python, JavaScript, and other language execution. Agents can run code, analyze results, and generate dynamic content with resource limits and security.
🤖 AI Services (20+ APIs)
Integration with Vertex AI, Translation, Vision API, Natural Language, and other Google AI services for advanced capabilities.
🎯 Why Use Built-in Google Tools?
⚡ Instant Integration
- Zero configuration for basic usage
- Automatic OAuth handling with token refresh
- Pre-built error handling for Google APIs
- Optimized for agent workflows
- Batch operations for efficiency
🔒 Enterprise Security
- Google-grade authentication
- Fine-grained permission scopes
- Audit logging built-in
- Compliant with SOC2, HIPAA, GDPR
- Data residency controls
🚀 High Performance
- Optimized API calls with connection pooling
- Built-in caching at multiple levels
- Automatic retries with exponential backoff
- Rate limit management
- Quota tracking and alerts
Built-in Tools Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ BUILT-IN GOOGLE TOOLS ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ AUTHENTICATION LAYER │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ OAuth 2.0 │ │ Service │ │ API Key │ Token │ │
│ │ │ Flow │ │ Account │ │ Management │ Refresh │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ & Cache │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ DISCOVERY & REGISTRY │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ API │ │ Schema │ │ Version │ Capability │ │
│ │ │ Discovery │ │ Registry │ │ Manager │ Detection │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ TOOL CATEGORIES │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ WORKSPACE TOOLS │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ Gmail │ │ Calendar │ │ Drive │ │ Docs │ │ │ │
│ │ │ │ Tools │ │ Tools │ │ Tools │ │ Tools │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ Sheets │ │ Slides │ │ Meet │ │ Forms │ │ │ │
│ │ │ │ Tools │ │ Tools │ │ Tools │ │ Tools │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ SEARCH TOOLS │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ Web │ │ Image │ │ News │ │ Custom │ │ │ │
│ │ │ │ Search │ │ Search │ │ Search │ │ Search │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │
│ │ │ │ SafeSearch, Language, Country Filters │ │ │ │
│ │ │ └──────────────────────────────────────────────────────┘ │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ CODE EXECUTION │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ Python │ │ Java │ │ Node │ │ Go │ │ │ │
│ │ │ │ Runtime │ │ Runtime │ │ Runtime │ │ Runtime │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │
│ │ │ │ Sandboxing, Resource Limits, Security Scanner │ │ │ │
│ │ │ └──────────────────────────────────────────────────────┘ │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ AI SERVICES │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ Vertex │ │ Vision │ │ Lang │ │ Speech │ │ │ │
│ │ │ │ AI │ │ API │ │ API │ │ API │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ AutoML │ │ Dialog │ │ Natural │ │ Translate│ │ │ │
│ │ │ │ │ │ Flow │ │ Language │ │ API │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ MONITORING & METRICS │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Usage │ │ Latency │ │ Error │ Quota │ │
│ │ │ Tracking │ │ Metrics │ │ Tracking │ Alerts │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
How to Use: Advanced Built-in Tools Integration
1. Comprehensive Google Workspace Integration
from google.adk.tools import workspace, tool  # `tool` decorator, as used throughout this module
from google.adk.auth import OAuth2Manager, ServiceAccountManager
from typing import List, Dict, Optional, Any
import base64
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import os
import mimetypes
from datetime import datetime, timedelta
import asyncio
import hashlib
class AdvancedWorkspaceToolkit:
"""
Comprehensive Google Workspace integration with advanced features
"""
def __init__(self, credentials_path: str = None, use_service_account: bool = False):
"""
Initialize Workspace toolkit with multiple auth methods
Args:
credentials_path: Path to OAuth credentials or service account JSON
use_service_account: Use service account instead of OAuth
"""
self.use_service_account = use_service_account
self.credentials_path = credentials_path
# Initialize auth managers
if use_service_account:
self.auth = ServiceAccountManager(
credentials_path=credentials_path,
scopes=self._get_all_scopes()
)
else:
self.auth = OAuth2Manager(
credentials_path=credentials_path,
scopes=self._get_all_scopes()
)
# Initialize service clients
self.services = self._init_services()
# Cache for rate limiting and quotas
self.request_cache = {}
self.quota_tracker = {}
self.batch_operations = []
def _get_all_scopes(self) -> List[str]:
"""Get all required OAuth scopes"""
return [
# Gmail scopes
'https://www.googleapis.com/auth/gmail.modify',
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.labels',
'https://www.googleapis.com/auth/gmail.settings.basic',
# Calendar scopes
'https://www.googleapis.com/auth/calendar',
'https://www.googleapis.com/auth/calendar.events',
'https://www.googleapis.com/auth/calendar.settings.readonly',
# Drive scopes
'https://www.googleapis.com/auth/drive',
'https://www.googleapis.com/auth/drive.file',
'https://www.googleapis.com/auth/drive.metadata',
'https://www.googleapis.com/auth/drive.readonly',
# Docs scopes
'https://www.googleapis.com/auth/documents',
'https://www.googleapis.com/auth/documents.readonly',
# Sheets scopes
'https://www.googleapis.com/auth/spreadsheets',
'https://www.googleapis.com/auth/spreadsheets.readonly',
# Slides scopes
'https://www.googleapis.com/auth/presentations',
'https://www.googleapis.com/auth/presentations.readonly',
# Meet scopes
'https://www.googleapis.com/auth/meetings.space.created',
'https://www.googleapis.com/auth/meetings.space.readonly'
]
def _init_services(self) -> Dict:
"""Initialize all Google API services"""
from googleapiclient.discovery import build
services = {}
api_versions = {
'gmail': 'v1',
'calendar': 'v3',
'drive': 'v3',
'docs': 'v1',
'sheets': 'v4',
'slides': 'v1',
'meet': 'v2'
}
for api_name, version in api_versions.items():
credentials = self.auth.get_credentials()
services[api_name] = build(api_name, version, credentials=credentials)
return services
# ==================== GMAIL TOOLS ====================
class GmailTools:
"""Advanced Gmail operations"""
def __init__(self, parent):
self.parent = parent
self.service = parent.services['gmail']
@tool(
name="send_advanced_email",
description="Send email with advanced features like templates, tracking, and scheduling"
)
async def send_advanced_email(
self,
to: List[str],
subject: str,
template_name: str = None,
template_data: Dict = None,
body: str = None,
cc: List[str] = None,
bcc: List[str] = None,
attachments: List[str] = None,
schedule_time: datetime = None,
track_opens: bool = False,
track_clicks: bool = False,
priority: str = 'normal',
labels: List[str] = None,
thread_id: str = None
) -> Dict:
"""
Send email with advanced features
Args:
to: List of recipients
subject: Email subject
template_name: Name of template to use
template_data: Data for template rendering
body: Plain text or HTML body
cc: Carbon copy recipients
bcc: Blind carbon copy recipients
attachments: List of file paths
schedule_time: Schedule delivery time
track_opens: Track email opens
track_clicks: Track link clicks
priority: 'high', 'normal', 'low'
labels: List of Gmail labels to apply
thread_id: Thread ID for replies
"""
try:
# Build email message
msg = MIMEMultipart('mixed' if attachments else 'alternative')
msg['To'] = ', '.join(to)
msg['Subject'] = subject
if cc:
msg['Cc'] = ', '.join(cc)
if bcc:
msg['Bcc'] = ', '.join(bcc)
if thread_id:
msg['In-Reply-To'] = thread_id
msg['References'] = thread_id
# Add priority header
if priority == 'high':
msg['X-Priority'] = '1'
msg['Importance'] = 'high'
elif priority == 'low':
msg['X-Priority'] = '5'
msg['Importance'] = 'low'
# Render template or use provided body
if template_name:
body = await self._render_template(template_name, template_data or {})
# Add tracking pixels if needed
if track_opens:
tracking_pixel = self._generate_tracking_pixel()
                # Insert the tracking pixel just before the closing </body> tag
                if '</body>' in body:
                    body = body.replace('</body>', f'{tracking_pixel}</body>')