Comprehensive guide to integrating DSPy with Microsoft Agent Framework in AgenticFleet, covering typed signatures, assertions, routing cache, GEPA optimization, and agent handoffs.
DSPy + Microsoft Agent Framework Integration
A comprehensive guide to the integration patterns between DSPy and Microsoft Agent Framework in AgenticFleet. This skill documents how to leverage DSPy's structured reasoning capabilities with the Agent Framework's orchestration primitives.
Overview
AgenticFleet combines DSPy for intelligent prompt optimization and structured outputs with Microsoft Agent Framework for reliable multi-agent orchestration. This integration enables:
- Typed Signatures: Pydantic-validated DSPy outputs for type-safe orchestration
- DSPy-Enhanced Agents: ChatAgent wrappers with Chain of Thought, ReAct, and Program of Thought reasoning
- Routing Cache: TTL-based caching of routing decisions to reduce latency
- GEPA Optimization: Offline genetic prompt algorithm optimization
- Checkpoint Storage: Workflow resumption via agent-framework storage
- Agent Handoffs: Direct agent-to-agent transfers with context preservation
Architecture
┌─────────────────────────────────────────────────────────────┐
│                  AgenticFleet Integration                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐      ┌─────────────┐      ┌─────────────┐  │
│  │ DSPyReasoner│─────►│ AgentFactory│─────►│  ChatAgent  │  │
│  │ (Signatures)│      │(YAML Config)│      │ (Enhanced)  │  │
│  └──────┬──────┘      └──────┬──────┘      └──────┬──────┘  │
│         │                    │                    │         │
│  ┌──────▼────────────────────▼────────────────────▼──────┐  │
│  │              Microsoft Agent Framework                │  │
│  │  ┌──────────┐  ┌───────────┐  ┌──────────────────┐    │  │
│  │  │ Workflow │  │AgentThread│  │CheckpointStorage │    │  │
│  │  └──────────┘  └───────────┘  └──────────────────┘    │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Typed Signatures with Pydantic
Signature Definition Pattern
All DSPy signatures in AgenticFleet use Pydantic models for structured outputs:
# src/agentic_fleet/dspy_modules/signatures.py
import dspy
from pydantic import BaseModel, Field
from typing import Literal


# Define the output model before the signature that references it
class TaskAnalysisOutput(BaseModel):
    """Pydantic model for typed signature output."""

    complexity: Literal["low", "medium", "high"] = Field(
        description="Estimated task complexity"
    )
    required_capabilities: list[str] = Field(
        description="List of required capabilities"
    )
    estimated_steps: int = Field(ge=1, le=50)
    preferred_tools: list[str] = Field(default_factory=list)
    needs_web_search: bool = Field(description="Whether web search is needed")
    reasoning: str = Field(description="Reasoning behind the analysis")


class TaskAnalysis(dspy.Signature):
    """Analyze a task with structured output."""

    task: str = dspy.InputField(desc="The user's task description")
    analysis: TaskAnalysisOutput = dspy.OutputField(
        desc="Structured analysis of the task"
    )
Using TypedPredictor
# src/agentic_fleet/dspy_modules/reasoner.py
import dspy
from dspy import TypedPredictor


class DSPyReasoner(dspy.Module):
    def __init__(self):
        super().__init__()
        self.analyzer = TypedPredictor(TaskAnalysis)

    def analyze(self, task: str) -> TaskAnalysisOutput:
        result = self.analyzer(task=task)
        return result.analysis
Field Validators
Normalize inputs with Pydantic validators:
from typing import Literal

from pydantic import BaseModel, Field, field_validator


class RoutingDecisionOutput(BaseModel):
    assigned_to: list[str] = Field(min_length=1)
    execution_mode: Literal["delegated", "sequential", "parallel"]

    @field_validator("assigned_to", mode="before")
    @classmethod
    def normalize_agents(cls, v: str | list[str]) -> list[str]:
        # Accept a comma-separated string from the LLM and split it
        if isinstance(v, str):
            return [a.strip() for a in v.split(",") if a.strip()]
        return v

    @field_validator("execution_mode", mode="before")
    @classmethod
    def normalize_mode(cls, v: str) -> str:
        # Map common model paraphrases onto the canonical literals
        mapping = {
            "delegate": "delegated",
            "single": "delegated",
            "sequence": "sequential",
            "concurrent": "parallel",
        }
        return mapping.get(v.strip().lower(), v)
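Because the normalizers are plain functions at heart, they can be unit-tested without DSPy, Pydantic, or an LLM in the loop. A stdlib-only sketch of the same logic:

```python
def normalize_agents(v):
    """Split a comma-separated agent string into a clean list."""
    if isinstance(v, str):
        return [a.strip() for a in v.split(",") if a.strip()]
    return v


def normalize_mode(v: str) -> str:
    """Map common paraphrases onto the canonical execution modes."""
    mapping = {
        "delegate": "delegated",
        "single": "delegated",
        "sequence": "sequential",
        "concurrent": "parallel",
    }
    return mapping.get(v.strip().lower(), v)


print(normalize_agents("Researcher, Writer"))  # ['Researcher', 'Writer']
print(normalize_mode(" Concurrent "))          # parallel
```

Unrecognized modes pass through unchanged, so the Pydantic `Literal` check still rejects genuinely invalid values.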
DSPy Assertions for Validation
Hard and Soft Constraints
DSPy's assertion API provides two constraint types for routing validation:
# src/agentic_fleet/dspy_modules/assertions.py
import dspy
# Hard constraint: causes backtracking on failure
dspy.Assert(condition, "error message")
# Soft constraint: guides optimization without failure
dspy.Suggest(condition, "guidance message")
Agent Assignment Validation
import dspy


def validate_agent_exists(
    assigned_agents: list[str],
    available_agents: list[str],
) -> bool:
    """Check all assigned agents exist in the available pool."""
    # Hard constraint: must assign at least one agent
    dspy.Assert(len(assigned_agents) > 0, "Must assign at least one agent")
    # Hard constraint: every assigned agent must exist (case-insensitive)
    lowered = [a.lower() for a in available_agents]
    for agent in assigned_agents:
        dspy.Assert(
            agent.lower() in lowered,
            f"Agent '{agent}' not in available pool",
        )
    return True
Execution Mode Validation
def validate_execution_mode(
    assigned_agents: list[str],
    execution_mode: str,
) -> bool:
    """Ensure the execution mode matches the agent count."""
    if len(assigned_agents) > 1 and execution_mode == "delegated":
        # The condition is always False in this branch, so the
        # suggestion fires and guides the optimizer toward 'parallel'
        dspy.Suggest(
            len(assigned_agents) == 1,
            "Consider using 'parallel' for multiple agents",
        )
    return True
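Outside a running DSPy program, the same checks can be smoke-tested with simple stand-ins for `Assert` and `Suggest` (a hypothetical sketch: real DSPy assertions trigger backtracking and prompt repair rather than raising exceptions):

```python
def Assert(condition: bool, message: str) -> None:
    # Stand-in: a real dspy.Assert backtracks; here we just raise
    if not condition:
        raise ValueError(message)


def Suggest(condition: bool, message: str) -> None:
    # Stand-in: a real dspy.Suggest only feeds guidance to the optimizer
    if not condition:
        print(f"suggestion: {message}")


def validate_agent_exists(assigned, available) -> bool:
    Assert(len(assigned) > 0, "Must assign at least one agent")
    lowered = [a.lower() for a in available]
    for agent in assigned:
        Assert(agent.lower() in lowered, f"Agent '{agent}' not in available pool")
    return True


print(validate_agent_exists(["researcher"], ["Researcher", "Writer"]))  # True
try:
    validate_agent_exists([], ["Researcher"])
except ValueError as e:
    print(e)  # Must assign at least one agent
```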
Usage in Signatures
A signature is declarative, so the validation runs in a module that wraps its predictor rather than in the signature itself:

class TaskRouting(dspy.Signature):
    """Route a task to agents from the available team."""

    task: str = dspy.InputField(desc="The task to route")
    team: str = dspy.InputField(desc="Available agents")
    context: str = dspy.InputField(desc="Execution context")
    decision: RoutingDecisionOutput = dspy.OutputField()


class ValidatedRouter(dspy.Module):
    """Run the routing predictor, then validate its decision."""

    def __init__(self):
        super().__init__()
        self.router = dspy.Predict(TaskRouting)

    def forward(self, task, team, context):
        # Extract agent names from the team description
        available_agents = extract_agent_names(team)
        result = self.router(task=task, team=team, context=context)
        # Validate the routing decision before returning it
        validate_agent_exists(result.decision.assigned_to, available_agents)
        validate_execution_mode(
            result.decision.assigned_to,
            result.decision.execution_mode,
        )
        return result
DSPy-Enhanced Agents
Wrapping ChatAgent
# src/agentic_fleet/agents/base.py
from agent_framework._agents import ChatAgent
import dspy


class DSPyEnhancedAgent(ChatAgent):
    def __init__(
        self,
        name: str,
        chat_client,
        instructions: str = "",
        enable_dspy: bool = True,
        reasoning_strategy: str = "chain_of_thought",
        **kwargs,
    ):
        super().__init__(
            name=name,
            instructions=instructions,
            chat_client=chat_client,
            **kwargs,
        )
        self.enable_dspy = enable_dspy
        self.reasoning_strategy = reasoning_strategy
        # Initialize reasoning modules
        if enable_dspy:
            self._init_reasoning_modules()

    def _init_reasoning_modules(self):
        """Initialize DSPy reasoning strategies."""
        if self.reasoning_strategy == "react":
            self.react_module = dspy.ReAct(
                "question -> answer",
                tools=self.tools,
            )
        elif self.reasoning_strategy == "program_of_thought":
            self.pot_module = dspy.ProgramOfThought("question -> answer")
        elif self.reasoning_strategy == "chain_of_thought":
            self.cot_module = dspy.ChainOfThought("question -> answer")
Task Enhancement
class DSPyEnhancedAgent(ChatAgent):
    def _enhance_task_with_dspy(self, task: str, context: str = "") -> str:
        """Enhance task using DSPy reasoning."""
        if not self.enable_dspy:
            return task
        # Use Chain of Thought for complex tasks
        enhancer = dspy.ChainOfThought("task, context -> enhanced_task")
        result = enhancer(
            task=task,
            context=context or "No prior context",
        )
        return result.enhanced_task

    async def run(self, message, **kwargs):
        # Enhance the task before execution
        enhanced_message = self._enhance_task_with_dspy(
            message,
            kwargs.get("context", ""),
        )
        # Run with the enhanced task
        return await super().run(enhanced_message, **kwargs)
Routing Cache
TTL-Based Cache Implementation
# src/agentic_fleet/dspy_modules/reasoner_cache.py
import time
from typing import Any
from collections import OrderedDict


class RoutingCache:
    """TTL-based cache for routing decisions."""

    def __init__(self, ttl_seconds: int = 300, max_size: int = 1024):
        self.ttl = ttl_seconds
        self.max_size = max_size
        self._cache: OrderedDict[str, tuple[Any, float]] = OrderedDict()

    def get(self, key: str) -> Any | None:
        """Get cached value if not expired."""
        if key not in self._cache:
            return None
        value, timestamp = self._cache[key]
        # Check TTL
        if time.time() - timestamp > self.ttl:
            del self._cache[key]
            return None
        # Move to end (LRU)
        self._cache.move_to_end(key)
        return value

    def set(self, key: str, value: Any) -> None:
        """Cache value with current timestamp."""
        # Evict oldest if at capacity
        if len(self._cache) >= self.max_size:
            self._cache.popitem(last=False)
        self._cache[key] = (value, time.time())

    def clear(self) -> None:
        """Clear all cached entries."""
        self._cache.clear()
Integration with DSPyReasoner
# src/agentic_fleet/dspy_modules/reasoner.py
import hashlib


class DSPyReasoner(dspy.Module):
    def __init__(self, enable_routing_cache: bool = True, **kwargs):
        super().__init__()
        self.enable_routing_cache = enable_routing_cache
        self._routing_cache = RoutingCache(
            ttl_seconds=kwargs.get("cache_ttl_seconds", 300),
            max_size=kwargs.get("cache_max_entries", 1024),
        )

    def _generate_cache_key(self, task: str, team: str, context: str) -> str:
        """Generate a cache key from the routing inputs."""
        content = f"{task}:{team}:{context}"
        return hashlib.md5(content.encode()).hexdigest()

    def route(self, task: str, team: str, context: str) -> RoutingDecisionOutput:
        # Check the cache first
        if self.enable_routing_cache:
            cache_key = self._generate_cache_key(task, team, context)
            cached = self._routing_cache.get(cache_key)
            if cached is not None:
                return cached
        # Execute routing
        result = self.router(task=task, team=team, context=context)
        decision = result.decision
        # Cache the result
        if self.enable_routing_cache:
            self._routing_cache.set(cache_key, decision)
        return decision
GEPA Optimization
Configuration
# src/agentic_fleet/config/workflow_config.yaml
dspy:
  optimization:
    enabled: true
    examples_path: src/agentic_fleet/data/supervisor_examples.json
    use_gepa: true
    gepa_auto: light  # light|medium|heavy
    gepa_reflection_model: gpt-5-mini
    gepa_history_min_quality: 8.0
    gepa_history_limit: 200
    gepa_val_split: 0.2
    gepa_seed: 13
    gepa_log_dir: .var/logs/dspy/gepa
Optimization Command
# Run GEPA optimization
agentic-fleet optimize
# Output: .var/cache/dspy/compiled_reasoner.json
Loading Compiled Modules
# src/agentic_fleet/dspy_modules/reasoner.py
import json
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def _load_compiled_module(self) -> None:
    """Load optimized prompt weights from disk."""
    compiled_path = get_configured_compiled_reasoner_path()
    meta_path = Path(f"{compiled_path}.meta")
    if compiled_path.exists():
        # Verify the source hash recorded at compile time still matches
        if meta_path.exists():
            meta = json.loads(meta_path.read_text())
            expected_hash = meta.get("reasoner_source_hash")
            if expected_hash != get_reasoner_source_hash():
                logger.info("Compiled reasoner ignored (source hash mismatch)")
                return
        logger.info(f"Loading compiled reasoner from {compiled_path}")
        self.load(str(compiled_path))
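The staleness check itself is plain hashing and JSON. A stdlib-only sketch (the `source_hash` helper is hypothetical; the real code uses `get_reasoner_source_hash`):

```python
import hashlib
import json
import tempfile
from pathlib import Path


def source_hash(text: str) -> str:
    """Hypothetical stand-in for get_reasoner_source_hash()."""
    return hashlib.sha256(text.encode()).hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    compiled = Path(tmp) / "compiled_reasoner.json"
    meta = Path(tmp) / "compiled_reasoner.json.meta"
    compiled.write_text("{}")
    # Record the hash of the reasoner source at "compile" time
    meta.write_text(json.dumps({"reasoner_source_hash": source_hash("v1")}))

    current = source_hash("v2")  # source changed since compilation
    recorded = json.loads(meta.read_text()).get("reasoner_source_hash")
    print(recorded == current)   # False: the stale artifact is skipped
```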
Agent Framework Integration
Creating ChatAgent from YAML
# src/agentic_fleet/agents/coordinator.py
from agent_framework._agents import ChatAgent


class AgentFactory:
    def create_agent(self, name: str, config: dict) -> ChatAgent:
        """Create a ChatAgent from YAML configuration."""
        model_id = config.get("model")
        instructions = self._resolve_instructions(config.get("instructions", ""))
        tools = self._resolve_tools(config.get("tools", []))
        return ChatAgent(
            name=name,
            description=config.get("description", ""),
            instructions=instructions,
            chat_client=self._create_chat_client(model_id),
            tools=tools,
        )

    def _resolve_instructions(self, instructions_ref: str) -> str:
        """Resolve dynamic prompts or static references."""
        if instructions_ref.startswith("prompts."):
            # Dynamic DSPy prompt generation
            return self._generate_dynamic_prompt(instructions_ref)
        # Static prompt lookup
        return get_static_prompt(instructions_ref)
Dynamic Prompt Generation
# src/agentic_fleet/agents/coordinator.py
from dspy import ChainOfThought
from agentic_fleet.dspy_modules.signatures import PlannerInstructionSignature


class AgentFactory:
    def __init__(self):
        self.instruction_generator = ChainOfThought(PlannerInstructionSignature)

    def _generate_dynamic_prompt(self, ref: str) -> str:
        """Generate a prompt using DSPy."""
        if ref == "prompts.planner":
            result = self.instruction_generator(
                available_agents=self._get_agent_descriptions(),
                task_goals="Plan and coordinate multi-agent workflows",
            )
            return result.instructions
        return ""
Workflow with Checkpointing
# src/agentic_fleet/workflows/supervisor.py
from agent_framework._workflows import (
    WorkflowStartedEvent,
    WorkflowStatusEvent,
    WorkflowOutputEvent,
    ExecutorCompletedEvent,
    RequestInfoEvent,  # HITL support
    FileCheckpointStorage,
)


class SupervisorWorkflow:
    def __init__(self, context, checkpoint_dir: str = ".var/checkpoints"):
        self.context = context
        self.checkpoint_storage = FileCheckpointStorage(checkpoint_dir)

    async def run_stream(self, task: str, checkpoint_id: str | None = None):
        """Run workflow with optional checkpoint resume."""
        if checkpoint_id:
            # Resume from checkpoint and re-stream its events
            async for event in self._resume_from_checkpoint(checkpoint_id):
                yield event
        else:
            # Start fresh
            async for event in self._execute_pipeline(task):
                yield event

    async def _resume_from_checkpoint(self, checkpoint_id: str):
        """Resume workflow execution from a checkpoint."""
        state = self.checkpoint_storage.load(checkpoint_id)
        # Restore workflow state
        self.context.restore_from_state(state)
        # Continue execution
        async for event in self._continue_pipeline():
            yield event
Agent Handoffs
# src/agentic_fleet/workflows/strategies.py
from agent_framework._agents import ChatAgent


class HandoffManager:
    """Manage agent-to-agent transfers with context preservation."""

    def __init__(self):
        self._handoff_history: list[dict] = []

    def prepare_handoff(
        self,
        from_agent: ChatAgent | None,
        to_agent: ChatAgent,
        context: dict,
    ) -> dict:
        """Prepare handoff input with accumulated context."""
        handoff_input = {
            "task": context.get("original_task"),
            "findings": context.get("findings", []),
            "decisions": context.get("decisions", []),
            "remaining_work": context.get("remaining_work", []),
            # The first agent in a chain has no predecessor to summarize
            "from_agent_summary": (
                self._summarize_agent_work(from_agent) if from_agent else ""
            ),
        }
        self._handoff_history.append({
            "from": from_agent.name if from_agent else None,
            "to": to_agent.name,
            "input": handoff_input,
        })
        return handoff_input

    def execute_sequential_with_handoffs(
        self,
        agents: list[ChatAgent],
        tasks: list[str],
    ) -> list[dict]:
        """Execute tasks with agent handoffs."""
        context = {"original_task": tasks[0], "findings": [], "decisions": []}
        results = []
        for i, (agent, task) in enumerate(zip(agents, tasks)):
            context["remaining_work"] = tasks[i + 1:]
            handoff_input = self.prepare_handoff(
                from_agent=agents[i - 1] if i > 0 else None,
                to_agent=agent,
                context=context,
            )
            result = self._run_agent_with_context(agent, task, handoff_input)
            context["findings"].extend(result.get("findings", []))
            context["decisions"].extend(result.get("decisions", []))
            results.append(result)
        return results
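The context-accumulation behavior can be exercised without real agents. A condensed sketch using a hypothetical `StubAgent` stand-in for `ChatAgent`:

```python
from dataclasses import dataclass, field


@dataclass
class StubAgent:
    """Hypothetical stand-in for ChatAgent, for this demo only."""
    name: str


@dataclass
class HandoffManager:
    """Condensed copy of prepare_handoff from the class above."""
    _handoff_history: list = field(default_factory=list)

    def prepare_handoff(self, from_agent, to_agent, context):
        handoff_input = {
            "task": context.get("original_task"),
            "findings": context.get("findings", []),
            "remaining_work": context.get("remaining_work", []),
        }
        self._handoff_history.append({
            "from": from_agent.name if from_agent else None,
            "to": to_agent.name,
            "input": handoff_input,
        })
        return handoff_input


mgr = HandoffManager()
ctx = {"original_task": "write report", "findings": ["draft outline"]}
handoff = mgr.prepare_handoff(None, StubAgent("writer"), ctx)
print(handoff["findings"])            # ['draft outline']
print(mgr._handoff_history[0]["to"])  # writer
```

The `None` first argument mirrors the real chain: the first agent has no predecessor, so no summary is recorded.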
Configuration Reference
workflow_config.yaml
# DSPy Configuration
dspy:
  model: gpt-5-mini
  routing_model: gpt-5-mini
  use_typed_signatures: true
  enable_routing_cache: true
  routing_cache_ttl_seconds: 300
  require_compiled: false  # true in production

  # Dynamic Prompts
  dynamic_prompts:
    enabled: true
    signatures_path: src/agentic_fleet/dspy_modules/signatures.py

  # GEPA Optimization
  optimization:
    enabled: true
    use_gepa: true
    gepa_auto: light

# Workflow Configuration
workflow:
  supervisor:
    max_rounds: 15
    enable_streaming: true
  checkpointing:
    checkpoint_dir: .var/checkpoints

# Agent Configuration
agents:
  researcher:
    model: gpt-4.1-mini
    tools: [TavilySearchTool]
    reasoning:
      effort: medium
      verbosity: normal
Common Patterns
1. Simple Task (Fast-Path)
# src/agentic_fleet/workflows/helpers.py
import re


def is_simple_task(task: str) -> bool:
    """Check if a task qualifies for fast-path processing."""
    simple_patterns = [
        r"^(hi|hello|hey|how are you|what's up)",
        r"^\d+\s*[\+\-\*/]\s*\d+$",  # simple math
        r"^(what is|who is|where is|when did)\s+\w+",  # simple facts
    ]
    return any(re.match(p, task.lower()) for p in simple_patterns)
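A quick check of the patterns (self-contained copy of the helper):

```python
import re

SIMPLE_PATTERNS = [
    r"^(hi|hello|hey|how are you|what's up)",
    r"^\d+\s*[\+\-\*/]\s*\d+$",  # simple math
    r"^(what is|who is|where is|when did)\s+\w+",  # simple facts
]


def is_simple_task(task: str) -> bool:
    # Lowercase once so the patterns only need lowercase alternatives
    return any(re.match(p, task.lower()) for p in SIMPLE_PATTERNS)


print(is_simple_task("Hello there"))  # True: greeting
print(is_simple_task("12 * 7"))       # True: simple math
print(is_simple_task("Refactor the auth module to use OAuth"))  # False
```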
2. Multi-Agent Parallel Execution
# src/agentic_fleet/workflows/strategies.py
import asyncio


async def execute_parallel(
    agents: list[ChatAgent],
    task: str,
) -> list[dict]:
    """Execute a task across multiple agents concurrently."""

    async def run_agent(agent):
        return {
            "agent": agent.name,
            "result": await agent.run(task),
        }

    results = await asyncio.gather(*[run_agent(a) for a in agents])
    return results
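The fan-out can be demonstrated with stub agents in place of real `ChatAgent` instances (hypothetical `StubAgent` simulating an I/O-bound model call):

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubAgent:
    """Hypothetical stand-in for ChatAgent, for this demo only."""
    name: str

    async def run(self, task: str) -> str:
        await asyncio.sleep(0.01)  # simulate an I/O-bound model call
        return f"{self.name} handled: {task}"


async def execute_parallel(agents, task):
    async def run_agent(agent):
        return {"agent": agent.name, "result": await agent.run(task)}

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*[run_agent(a) for a in agents])


results = asyncio.run(execute_parallel(
    [StubAgent("researcher"), StubAgent("writer")],
    "summarize the findings",
))
print([r["agent"] for r in results])  # ['researcher', 'writer']
```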
3. Quality-Based Refinement Loop
# src/agentic_fleet/workflows/executors.py
async def run_quality_phase(
    self,
    task: str,
    result: str,
    threshold: float = 7.0,
) -> tuple[str, bool]:
    """Evaluate quality and refine if needed (method of an executor class)."""
    assessment = await self.reasoner.assess_quality(task, result)
    if assessment.score < threshold:
        # Refine the result using the assessor's feedback
        refined = await self._refine_result(task, result, assessment.feedback)
        return refined, True
    return result, False
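The loop can be exercised with a stub reasoner (hypothetical `StubReasoner` that scores short drafts below the threshold):

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Assessment:
    score: float
    feedback: str


class StubReasoner:
    """Hypothetical reasoner: short drafts score poorly."""

    async def assess_quality(self, task: str, result: str) -> Assessment:
        score = 9.0 if len(result) > 20 else 4.0
        return Assessment(score, "Add more detail")


async def run_quality_phase(reasoner, task, result, threshold=7.0):
    assessment = await reasoner.assess_quality(task, result)
    if assessment.score < threshold:
        # Stand-in refinement step: append the feedback to the draft
        refined = f"{result} (refined: {assessment.feedback})"
        return refined, True
    return result, False


refined, was_refined = asyncio.run(
    run_quality_phase(StubReasoner(), "write summary", "Too short")
)
print(was_refined)  # True: the low-scoring draft was refined
```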
Debugging Tips
- Routing issues: check `.var/logs/execution_history.jsonl` for routing decisions
- Slow workflows: reduce `gepa_max_metric_calls` in config
- DSPy fallback: if no compiled cache exists, the system falls back to zero-shot prompting
- Type errors: run `make type-check` before commits
Related Documentation
- DSPy Documentation
- Microsoft Agent Framework
- AgenticFleet: `docs/guides/dspy-agent-framework-integration.md`