chore: remove .claude folder from version control

The .claude folder contains local Claude Code skills and configuration
that should not be tracked in the repository. Already in .gitignore.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: nicoferdi96
Date: 2026-01-09 15:34:13 +01:00
Parent: 31c22658db
Commit: 8341a43310
18 changed files with 0 additions and 6441 deletions


@@ -1,47 +0,0 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Repository Overview
This is a Claude Code Skills repository containing custom skills that extend Claude Code's capabilities. Skills are knowledge modules that Claude Code can invoke when specific conditions are met.
## Skill Structure
Each skill lives in `skills/<skill-name>/` with this structure:
```
skills/<skill-name>/
├── SKILL.md # Required: frontmatter (name, description) + knowledge content
└── references/ # Optional: supplementary reference files
    └── *.md
```
### SKILL.md Format
```markdown
---
name: skill-name
description: |
  Multi-line description of when to use this skill.
  Include specific trigger conditions.
---
# Main content follows...
```
The `description` field is critical - it tells Claude Code **when** to activate this skill. Be specific about trigger conditions.
## Writing Effective Skills
1. **Trigger Description**: Write clear, specific conditions in the YAML `description` field
2. **Actionable Content**: The markdown body should be directly usable guidance, not abstract theory
3. **Code Examples**: Include working code patterns that can be adapted
4. **Reference Files**: Split detailed reference material into `references/*.md` and link from SKILL.md
## Current Skills
- **crewai-architect**: Flow-first design patterns for CrewAI applications
- **crewai-enterprise-endpoint-manager**: REST API integration for deployed CrewAI crews/flows
- **crewai-tool-creator**: Custom tool development following best practices
- **software-architect**: Clean code with SOLID principles
- **streamlit**: Building interactive Python web apps


@@ -1,103 +0,0 @@
# Claude Code Skills
A collection of custom skills that extend Claude Code's capabilities with specialized domain knowledge.
## What Are Skills?
Skills are knowledge modules that Claude Code automatically activates when working on specific tasks. Each skill contains:
- **Trigger conditions** - When to activate (defined in YAML frontmatter)
- **Expert knowledge** - Patterns, best practices, and code examples
- **Reference materials** - Deep-dive documentation for complex topics
## Available Skills
| Skill | Description |
|-------|-------------|
| **[crewai-architect](skills/crewai-architect/)** | Flow-first design patterns for CrewAI applications. Covers direct LLM calls, single agents, and crews within flows. |
| **[crewai-enterprise-endpoint-manager](skills/crewai-enterprise-endpoint-manager/)** | REST API integration for deployed CrewAI crews/flows in Enterprise (AOP). Authentication, monitoring, and result retrieval. |
| **[crewai-tool-creator](skills/crewai-tool-creator/)** | Custom tool development following CrewAI and Anthropic best practices. Input schemas, error handling, caching. |
| **[software-architect](skills/software-architect/)** | Clean code with SOLID principles. Single responsibility, dependency injection, interface segregation. |
| **[streamlit](skills/streamlit/)** | Building interactive Python web apps. Widgets, layouts, caching, session state, multipage apps. |
## Using These Skills
### Option 1: Clone to Claude Code Skills Directory
```bash
# Clone into Claude Code's skills directory
git clone https://github.com/YOUR_USERNAME/skills.git ~/.claude/skills/my-skills
```
Skills are automatically discovered and activated based on their trigger descriptions.
### Option 2: Reference in Project
Add to your project's `.claude/settings.json`:
```json
{
  "skills": {
    "paths": ["/path/to/this/repo/skills"]
  }
}
```
## Skill Structure
```
skills/<skill-name>/
├── SKILL.md # Required: frontmatter + knowledge content
└── references/ # Optional: supplementary deep-dives
    └── *.md
```
### SKILL.md Format
```markdown
---
name: skill-name
description: |
  When to activate this skill.
  Be specific about trigger conditions.
---
# Main Content
Actionable guidance, patterns, and code examples.
```
## Contributing a New Skill
1. Create a folder: `skills/<your-skill-name>/`
2. Add `SKILL.md` with:
- YAML frontmatter (`name`, `description`)
- Practical, actionable content
- Working code examples
3. Optionally add `references/*.md` for detailed documentation
4. Submit a PR
### Tips for Effective Skills
- **Trigger Description**: Be specific about *when* the skill should activate
- **Actionable Content**: Include patterns and examples, not just theory
- **Code Examples**: Provide copy-paste-ready snippets
- **Reference Files**: Split detailed material into separate files
## Repository Structure
```
.
├── CLAUDE.md # Instructions for Claude Code
├── README.md # This file
└── skills/ # Skill modules
    ├── crewai-architect/
    ├── crewai-enterprise-endpoint-manager/
    ├── crewai-tool-creator/
    ├── software-architect/
    └── streamlit/
```
## License
MIT


@@ -1,461 +0,0 @@
---
name: crewai-architect
description: |
  Guide for architecting CrewAI applications with mastery of different execution patterns.
  Use this skill when: (1) Designing new CrewAI projects, (2) Choosing between direct LLM calls vs single agents vs crews within flows,
  (3) Implementing flow-based orchestration with @start/@listen/@router decorators, (4) Embedding single agents or LLM calls within flow methods,
  (5) Adding crews to flows with `crewai flow add-crew`, (6) Choosing kickoff methods (kickoff, kickoff_async, akickoff),
  (7) Implementing structured outputs with Pydantic. Always start with Flows and add agency incrementally.
---
# CrewAI Architecture Guide
**Core Principle: Start with Flows, add agency as needed.**
Flows provide deterministic orchestration. Within flow methods, add agency incrementally:
1. Direct LLM calls (simplest)
2. Single agents (when tools needed)
3. Crews (when collaboration needed)
## Architecture Decision Framework
**Always start with a Flow.** Then choose the right level of agency for each step:
| Agency Level | Within Flow Method | When to Use |
|--------------|-------------------|-------------|
| **Direct LLM** | `llm.call(messages)` | Structured extraction, no tools |
| **Single Agent** | `Agent(...).kickoff()` | Tool usage, single perspective |
| **Crew** | `MyCrew().crew().kickoff()` | Multi-agent collaboration |
### Decision Tree for Each Flow Step
```
What does this step need?
├── Simple structured response → Direct LLM call
├── Tool usage, single perspective → Single Agent (inline)
└── Multi-perspective reasoning → Crew (use add-crew command)
```
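
The decision tree above can be read as a tiny dispatch function. This is a framework-free sketch; the predicate names (`needs_tools`, `needs_collaboration`) are illustrative, not CrewAI API:

```python
def choose_agency(needs_tools: bool, needs_collaboration: bool) -> str:
    """Map a flow step's needs to the lightest agency level that satisfies them."""
    if needs_collaboration:
        return "crew"          # multi-perspective reasoning
    if needs_tools:
        return "single_agent"  # tool usage, single perspective
    return "direct_llm"        # simple structured response

print(choose_agency(needs_tools=False, needs_collaboration=False))  # direct_llm
```

The point is the ordering: only escalate to the heavier construct when the lighter one cannot do the job.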
## Multi-Agent Pattern Selection
For complex applications requiring multiple agents working together, choose the appropriate orchestration pattern:
| Pattern | Best For | Key Construct |
|---------|----------|---------------|
| **Generator-Critic** | Quality assurance with validation loops | Flow + @router + "revise" loop |
| **Iterative Refinement** | Progressive improvement to threshold | Flow + iteration counter |
| **Orchestrator-Worker** | Dynamic task decomposition | Flow + dynamic Agent() + asyncio.gather |
| **Task Guardrails** | Output validation with auto-retry | Task(guardrail=func, guardrail_max_retries=N) |
| **State Persistence** | Crash recovery, HITL resume | @persist() on Flow class |
| **HITL Webhooks** | Enterprise approval workflows | humanInputWebhook + /resume API |
| **Custom Manager** | Coordinated hierarchical delegation | Crew(manager_agent=..., process=hierarchical) |
| **Composite** | Enterprise production systems | Multiple patterns combined |
### Quick Pattern Selection
```
Output quality critical? → Generator-Critic or Task Guardrails
Iterative improvement needed? → Iterative Refinement
Unknown task complexity? → Orchestrator-Worker (Dynamic Spawning)
Independent parallel tasks? → Parallel Fan-Out with asyncio.gather
Crash recovery needed? → State Persistence with @persist()
Human approval required? → HITL Webhooks
Enterprise production? → Composite (combine patterns)
```
See [references/multi-agent-patterns.md](references/multi-agent-patterns.md) for complete implementations.
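
As orientation before the reference file: the Generator-Critic control flow reduces to a bounded revise loop. A framework-free sketch, where `generate` and `critique` are stand-ins for agent or crew calls:

```python
def generator_critic(generate, critique, max_revisions: int = 3):
    """Generate a draft, let the critic request revisions, stop at approval or the cap."""
    draft = generate(feedback=None)
    for _ in range(max_revisions):
        verdict = critique(draft)  # e.g. {"approved": bool, "feedback": str}
        if verdict["approved"]:
            break
        draft = generate(feedback=verdict["feedback"])
    return draft

# Toy usage: the critic approves once the draft reaches "v2"
drafts = iter(["v1", "v2"])
result = generator_critic(
    generate=lambda feedback: next(drafts),
    critique=lambda d: {"approved": "v2" in d, "feedback": "revise"},
)
print(result)  # v2
```

In a real flow the loop is expressed with `@router` returning `"revise"` or `"approved"`, and the revision counter lives in flow state.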
---
## Pattern 1: Flow with Direct LLM Calls
**Use when:** Structured extraction, classification, simple transformations.
```python
from crewai.flow.flow import Flow, start, listen
from crewai import LLM
from pydantic import BaseModel
class TaskClassification(BaseModel):
    category: str
    priority: int
    confidence: float

class PipelineState(BaseModel):
    input: str = ""
    classification: TaskClassification | None = None

class ClassificationFlow(Flow[PipelineState]):
    def __init__(self):
        super().__init__()
        self.llm = LLM(model="gpt-4o")

    @start()
    def classify_input(self):
        llm = LLM(model="gpt-4o", response_format=TaskClassification)
        result = llm.call(messages=[
            {"role": "user", "content": f"Classify: {self.state.input}"}
        ])
        self.state.classification = result  # Returns typed model directly
        return result

# Execute
flow = ClassificationFlow()
flow.state.input = "Urgent bug in production"
result = flow.kickoff()
```
## Pattern 2: Flow with Single Agents
**Use when:** Step requires tools, memory, or multi-step reasoning from one perspective.
**Define agents directly in flow methods:**
```python
from crewai.flow.flow import Flow, start, listen, router
from crewai import Agent, LLM
from pydantic import BaseModel
class AnalysisState(BaseModel):
    data: str = ""
    analysis: str = ""
    needs_deep_dive: bool = False

class AnalysisFlow(Flow[AnalysisState]):
    @start()
    def quick_scan(self):
        # Single agent defined inline for tool usage
        scanner = Agent(
            role="Data Scanner",
            goal="Quickly scan data for anomalies",
            backstory="Expert at rapid data assessment",
            tools=[DataScanTool()],
            llm=LLM(model="gpt-4o")
        )
        result = scanner.kickoff(f"Scan: {self.state.data}")
        self.state.needs_deep_dive = "anomaly" in result.raw.lower()
        return result

    @router(quick_scan)
    def route_analysis(self):
        return "deep_dive" if self.state.needs_deep_dive else "summary"

    @listen("deep_dive")
    def detailed_analysis(self):
        # Another inline agent for a different task
        analyst = Agent(
            role="Deep Analyst",
            goal="Conduct thorough analysis",
            backstory="Meticulous investigator",
            tools=[AnalysisTool(), ChartTool()]
        )
        result = analyst.kickoff(f"Deep dive: {self.state.data}")
        self.state.analysis = result.raw
        return result

    @listen("summary")
    def quick_summary(self):
        # Direct LLM call when no tools needed
        llm = LLM(model="gpt-4o")
        result = llm.call([{"role": "user", "content": f"Summarize: {self.state.data}"}])
        self.state.analysis = result
        return result
```
**Async agent execution:**
```python
@listen(some_method)
async def async_analysis(self, data):
    agent = Agent(role="Analyst", ...)
    result = await agent.kickoff_async(f"Analyze: {data}")
    return result
```
## Pattern 3: Flow with Crews
**Use when:** Step requires multi-agent collaboration and autonomous problem-solving.
### Adding a Crew to a Flow
Use the CLI command:
```bash
crewai flow add-crew research_crew
```
This creates the crew structure under `src/your_project/crews/research_crew/`:
```
research_crew/
├── __init__.py
├── research_crew.py
└── config/
    ├── agents.yaml
    └── tasks.yaml
```
### Using Crews in Flow Methods
```python
from crewai.flow.flow import Flow, start, listen, router, or_
from crewai import LLM
from pydantic import BaseModel
from .crews.research_crew.research_crew import ResearchCrew
from .crews.writing_crew.writing_crew import WritingCrew

class TopicValidation(BaseModel):
    # Minimal schema for the validation call below
    is_valid: bool = True
    reason: str = ""

class ContentState(BaseModel):
    topic: str = ""
    research: str = ""
    article: str = ""
    confidence: float = 0.0

class ContentPipeline(Flow[ContentState]):
    @start()
    def validate_topic(self):
        # Quick LLM validation
        llm = LLM(model="gpt-4o", response_format=TopicValidation)
        return llm.call([{"role": "user", "content": f"Validate topic: {self.state.topic}"}])

    @listen(validate_topic)
    def research_topic(self, validation):
        # Crew for complex research
        crew = ResearchCrew().crew()
        result = crew.kickoff(inputs={"topic": self.state.topic})
        self.state.research = result.raw
        self.state.confidence = 0.85  # From crew output
        return result

    @router(research_topic)
    def route_by_confidence(self):
        if self.state.confidence > 0.8:
            return "write_article"
        return "needs_more_research"

    @listen("write_article")
    def write_content(self):
        # Another crew for writing
        crew = WritingCrew().crew()
        result = crew.kickoff(inputs={
            "topic": self.state.topic,
            "research": self.state.research
        })
        self.state.article = result.raw
        return result

    @listen("needs_more_research")
    def request_human_input(self):
        return "Research inconclusive - human review needed"
```
## Flow Decorators Reference
| Decorator | Purpose | Example |
|-----------|---------|---------|
| `@start()` | Entry point (multiple allowed, run parallel) | `@start()` |
| `@listen(method)` | Trigger on method completion | `@listen(validate)` |
| `@listen("label")` | Trigger on router label | `@listen("approved")` |
| `@router(method)` | Conditional routing, returns label | Returns `"approved"` or `"rejected"` |
| `and_(a, b)` | Trigger when ALL complete | `@listen(and_(task_a, task_b))` |
| `or_(a, b)` | Trigger when ANY completes | `@listen(or_("pass", "fail"))` |
### Parallel Starts
```python
class ParallelFlow(Flow[State]):
    @start()  # Runs in parallel
    def fetch_data_a(self):
        return "Data A"

    @start()  # Runs in parallel
    def fetch_data_b(self):
        return "Data B"

    @listen(and_(fetch_data_a, fetch_data_b))
    def combine_results(self, result_a, result_b):
        return f"{result_a} + {result_b}"
```
### Conditional Routing
```python
@router(process_step)
def decide_path(self):
    if self.state.score > 90:
        return "excellent"
    elif self.state.score > 70:
        return "good"
    return "needs_improvement"

@listen("excellent")
def handle_excellent(self): ...

@listen(or_("good", "needs_improvement"))
def handle_other(self): ...
```
## Kickoff Methods Reference
### Crew Kickoff (within Flow methods)
| Method | Use Case |
|--------|----------|
| `crew.kickoff(inputs={})` | Standard synchronous |
| `await crew.kickoff_async(inputs={})` | Thread-based async |
| `await crew.akickoff(inputs={})` | Native async (preferred) |
| `crew.kickoff_for_each(inputs=[...])` | Sequential batch |
| `await crew.akickoff_for_each([...])` | Concurrent batch (preferred) |
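
The difference between the sequential and concurrent batch methods is essentially the `asyncio.gather` shape. A framework-free sketch, with a hypothetical `run_one` coroutine standing in for a single crew run:

```python
import asyncio

async def run_one(inputs: dict) -> str:
    # Stand-in for one crew execution
    await asyncio.sleep(0)
    return f"done:{inputs['item']}"

async def sequential_batch(batch: list[dict]) -> list[str]:
    # Mirrors kickoff_for_each: one run at a time
    return [await run_one(i) for i in batch]

async def concurrent_batch(batch: list[dict]) -> list[str]:
    # Mirrors akickoff_for_each: all runs in flight at once
    return await asyncio.gather(*(run_one(i) for i in batch))

print(asyncio.run(concurrent_batch([{"item": 1}, {"item": 2}])))  # ['done:1', 'done:2']
```

Both return results in input order; the concurrent form simply overlaps the waiting.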
### Flow Kickoff
```python
flow = MyFlow()
flow.state.input = "data" # Set state
result = flow.kickoff()
print(flow.state.result) # Access final state
```
## Structured Output Patterns
### At LLM Level (Direct calls)
```python
from pydantic import BaseModel, Field
class Analysis(BaseModel):
    summary: str
    key_points: list[str]
    confidence: float = Field(ge=0, le=1)

@start()
def analyze(self):
    llm = LLM(model="gpt-4o", response_format=Analysis)
    result = llm.call([...])
    return result  # Returns typed Analysis instance directly
```
### At Agent Level
```python
@listen(previous_step)
def agent_analysis(self, data):
    agent = Agent(role="Analyst", ...)
    result = agent.kickoff(
        f"Analyze: {data}",
        response_format=Analysis
    )
    return result.pydantic
```
### At Task Level (Crew)
```python
# In crew task definition
task = Task(
    description="Generate report",
    expected_output="Structured report",
    agent=analyst,
    output_pydantic=Report  # Enforces structure
)

# Access in flow
result = crew.kickoff(inputs={...})
report = result.pydantic  # Typed Report
```
## Complete Example: Adaptive Research Pipeline
```python
from crewai.flow.flow import Flow, start, listen, router, or_
from crewai import Agent, LLM
from pydantic import BaseModel
from .crews.research_crew.research_crew import ResearchCrew
class QueryClassification(BaseModel):
    # Minimal schema for the classification call below
    complexity: str  # "simple", "moderate", or "complex"

class ResearchState(BaseModel):
    query: str = ""
    complexity: str = "simple"
    findings: str = ""
    confidence: float = 0.0

class AdaptiveResearch(Flow[ResearchState]):
    @start()
    def classify_query(self):
        """LLM call for quick classification"""
        llm = LLM(model="gpt-4o", response_format=QueryClassification)
        result = llm.call([
            {"role": "user", "content": f"Classify complexity: {self.state.query}"}
        ])
        self.state.complexity = result.complexity  # LLM returns model directly
        return result

    @router(classify_query)
    def route_by_complexity(self):
        return self.state.complexity  # "simple", "moderate", or "complex"

    @listen("simple")
    def quick_search(self):
        """Single agent for simple queries"""
        searcher = Agent(
            role="Quick Researcher",
            goal="Find answer efficiently",
            tools=[SearchTool()],
            llm=LLM(model="gpt-4o-mini")  # Faster model
        )
        result = searcher.kickoff(self.state.query)
        self.state.findings = result.raw
        self.state.confidence = 0.7
        return result

    @listen("moderate")
    def standard_research(self):
        """Single agent with more tools"""
        researcher = Agent(
            role="Researcher",
            goal="Thorough research",
            tools=[SearchTool(), AnalysisTool()],
            llm=LLM(model="gpt-4o")
        )
        result = researcher.kickoff(self.state.query)
        self.state.findings = result.raw
        self.state.confidence = 0.85
        return result

    @listen("complex")
    def deep_research(self):
        """Full crew for complex queries"""
        crew = ResearchCrew().crew()
        result = crew.kickoff(inputs={"query": self.state.query})
        self.state.findings = result.raw
        self.state.confidence = 0.95
        return result

    @listen(or_("simple", "moderate", "complex"))
    def finalize(self):
        return {
            "findings": self.state.findings,
            "confidence": self.state.confidence
        }

# Execute
flow = AdaptiveResearch()
flow.state.query = "What are the implications of quantum computing on cryptography?"
result = flow.kickoff()
```
## Best Practices
1. **Start with Flow** - Always use Flow as the orchestration layer
2. **Add agency incrementally** - LLM → Agent → Crew, only as needed
3. **Use `crewai flow add-crew`** - For creating crews within flows
4. **Define agents inline** - In flow methods for single-use agents
5. **Initialize LLMs in `__init__`** - Reuse LLM instances for efficiency
6. **Type your state** - Always use Pydantic BaseModel
7. **Use structured outputs** - `response_format` and `output_pydantic`
8. **Prefer native async** - `akickoff()` over `kickoff_async()`
9. **Route by state** - Use `@router` for conditional paths
## Reference Files
- [references/multi-agent-patterns.md](references/multi-agent-patterns.md) - **Multi-agent orchestration**: Generator-Critic, Iterative Refinement, Orchestrator-Worker, Task Guardrails, State Persistence, HITL Webhooks, Composite patterns
- [references/flow-patterns.md](references/flow-patterns.md) - Advanced flow patterns, HITL, resumable flows
- [references/crew-patterns.md](references/crew-patterns.md) - YAML config, process types, delegation
- [references/llm-patterns.md](references/llm-patterns.md) - Custom LLM integration, providers


@@ -1,357 +0,0 @@
# Crew Patterns Reference
## Table of Contents
- [YAML Configuration](#yaml-configuration)
- [Crew Class Structure](#crew-class-structure)
- [Process Types](#process-types)
- [Task Dependencies](#task-dependencies)
- [Agent Delegation](#agent-delegation)
- [Memory and Knowledge](#memory-and-knowledge)
## YAML Configuration
### Creating a Crew with CLI
```bash
crewai flow add-crew research_crew
```
Creates:
```
src/project/crews/research_crew/
├── __init__.py
├── research_crew.py
└── config/
    ├── agents.yaml
    └── tasks.yaml
```
### agents.yaml
```yaml
researcher:
  role: "Senior Research Analyst"
  goal: "Conduct thorough research on {topic} and identify key insights"
  backstory: |
    You are a veteran researcher with 15 years of experience
    in market analysis. You excel at finding hidden patterns
    and connecting disparate data points.
  verbose: true
  allow_delegation: false

analyst:
  role: "Data Analyst"
  goal: "Analyze research data and produce actionable recommendations"
  backstory: |
    Expert at transforming raw research into strategic insights.
    Known for clear, data-driven conclusions.
  verbose: true
  allow_delegation: true
**Variable interpolation:** Use `{variable}` for dynamic values passed at kickoff.
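
Conceptually, interpolation behaves like Python string formatting: the kickoff `inputs` dict fills the `{placeholders}`. A minimal illustration (not the library's internal code):

```python
goal_template = "Conduct thorough research on {topic} and identify key insights"
inputs = {"topic": "AI agents"}

goal = goal_template.format(**inputs)
print(goal)  # Conduct thorough research on AI agents and identify key insights
```

Every `{variable}` in the YAML must have a matching key in `inputs`, or the run fails at interpolation time.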
### tasks.yaml
```yaml
research_task:
  description: |
    Research the following topic thoroughly: {topic}
    Focus on:
    - Current market trends
    - Key players and competitors
    - Recent developments (last 6 months)
    - Potential risks and opportunities
  expected_output: |
    Comprehensive research report with:
    - Executive summary
    - Detailed findings
    - Data sources cited
  agent: researcher

analysis_task:
  description: |
    Analyze the research findings and provide recommendations.
    Consider: {analysis_focus}
  expected_output: |
    Strategic analysis with:
    - Key insights
    - Recommendations (prioritized)
    - Risk assessment
  agent: analyst
  context:
    - research_task
## Crew Class Structure
```python
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List
@CrewBase
class ResearchCrew:
    """Research crew for thorough topic investigation."""

    agents: List[BaseAgent]
    tasks: List[Task]

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @agent
    def researcher(self) -> Agent:
        return Agent(
            config=self.agents_config["researcher"],  # type: ignore[index]
            tools=[SearchTool(), WebScrapeTool()],
            verbose=True,
        )

    @agent
    def analyst(self) -> Agent:
        return Agent(
            config=self.agents_config["analyst"],  # type: ignore[index]
            tools=[AnalysisTool()],
            verbose=True,
        )

    @task
    def research_task(self) -> Task:
        return Task(
            config=self.tasks_config["research_task"],  # type: ignore[index]
        )

    @task
    def analysis_task(self) -> Task:
        return Task(
            config=self.tasks_config["analysis_task"],  # type: ignore[index]
            output_pydantic=AnalysisReport,  # Structured output
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
        )
```
**Critical:** Always use `# type: ignore[index]` for config access.
## Process Types
### Sequential (Default)
Tasks execute in order, each receiving context from previous.
```python
@crew
def crew(self) -> Crew:
    return Crew(
        agents=self.agents,
        tasks=self.tasks,
        process=Process.sequential,
    )
```
### Hierarchical
Manager agent coordinates work distribution.
```python
from crewai import LLM
@crew
def crew(self) -> Crew:
    return Crew(
        agents=self.agents,
        tasks=self.tasks,
        process=Process.hierarchical,
        manager_llm=LLM(model="gpt-4o"),  # Required for hierarchical
    )
```
### Parallel Task Groups
```python
@crew
def crew(self) -> Crew:
    return Crew(
        agents=self.agents,
        tasks=self.tasks,
        process=Process.sequential,
        parallel_task_execution=True,  # Tasks without dependencies run parallel
    )
```
## Task Dependencies
### Context Chaining
```yaml
# tasks.yaml
gather_task:
  description: "Gather initial data"
  agent: gatherer

analyze_task:
  description: "Analyze gathered data"
  agent: analyst
  context:
    - gather_task  # Receives gather_task output

report_task:
  description: "Write final report"
  agent: writer
  context:
    - gather_task   # Has access to both
    - analyze_task  # previous outputs
```
### Programmatic Context
```python
@task
def synthesis_task(self) -> Task:
    return Task(
        config=self.tasks_config["synthesis_task"],  # type: ignore[index]
        context=[self.research_task(), self.analysis_task()],
    )
```
## Agent Delegation
### Enable Delegation
```yaml
# agents.yaml
manager:
  role: "Project Manager"
  goal: "Coordinate team and delegate effectively"
  allow_delegation: true  # Can delegate to other agents

specialist:
  role: "Technical Specialist"
  goal: "Handle technical implementation"
  allow_delegation: false  # Handles own work
```
### Delegation in Hierarchical
```python
@crew
def crew(self) -> Crew:
    return Crew(
        agents=self.agents,
        tasks=self.tasks,
        process=Process.hierarchical,
        manager_llm=LLM(model="gpt-4o"),
        manager_agent=self.manager(),  # Optional custom manager
    )
```
## Memory and Knowledge
### Enable Memory
```python
@crew
def crew(self) -> Crew:
    return Crew(
        agents=self.agents,
        tasks=self.tasks,
        process=Process.sequential,
        memory=True,  # Enable short-term memory
        verbose=True,
    )
```
### Knowledge Sources
```python
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
@crew
def crew(self) -> Crew:
    knowledge = TextFileKnowledgeSource(
        file_paths=["docs/guidelines.md", "docs/reference.md"]
    )
    return Crew(
        agents=self.agents,
        tasks=self.tasks,
        knowledge_sources=[knowledge],
    )
```
## Output Patterns
### Structured Task Output
```python
from pydantic import BaseModel, Field
class ResearchReport(BaseModel):
    summary: str
    findings: list[str]
    sources: list[str]
    confidence: float = Field(ge=0, le=1)

@task
def research_task(self) -> Task:
    return Task(
        config=self.tasks_config["research_task"],  # type: ignore[index]
        output_pydantic=ResearchReport,
    )
```
### File Output
```python
@task
def report_task(self) -> Task:
    return Task(
        config=self.tasks_config["report_task"],  # type: ignore[index]
        output_file="outputs/report.md",
    )
```
### Accessing Outputs
```python
# In flow method
result = ResearchCrew().crew().kickoff(inputs={"topic": "AI"})
# Raw output
print(result.raw)
# Structured output (if output_pydantic set)
report = result.pydantic
print(report.summary)
# Dictionary access
print(result["summary"])
# JSON (if output_json set)
data = result.json
```
## Using Crew in Flow
```python
from project_name.crews.research_crew.research_crew import ResearchCrew
class ResearchFlow(Flow[State]):
    @listen(validate_input)
    def do_research(self, validated):
        crew = ResearchCrew().crew()
        result = crew.kickoff(inputs={
            "topic": self.state.topic,
            "analysis_focus": self.state.focus
        })
        self.state.research = result.pydantic
        return result
```


@@ -1,297 +0,0 @@
# Advanced Flow Patterns
## Table of Contents
- [Resumable Flows](#resumable-flows)
- [Human-in-the-Loop (HITL)](#human-in-the-loop-hitl)
- [Parallel Execution](#parallel-execution)
- [Error Handling and Retries](#error-handling-and-retries)
- [State Persistence](#state-persistence)
## Resumable Flows
Flows can be resumed from specific points using conditional starts:
```python
from crewai.flow.flow import Flow, start, listen, and_

class ResumableFlow(Flow[State]):
    @start()  # Unconditional start
    def init(self):
        self.state.initialized = True
        return "initialized"

    @start("init")  # Conditional: runs after init OR as external trigger
    def maybe_begin(self):
        return "began"

    @listen(and_(init, maybe_begin))
    def proceed(self):
        return "proceeding"
```
### Resuming from External Trigger
```python
# Start from beginning
flow = MyFlow()
result = flow.kickoff()
# Resume from specific method
flow = MyFlow()
flow.state.previous_data = loaded_state
result = flow.kickoff(start_method="maybe_begin")
```
## Human-in-the-Loop (HITL)
### Pause for Human Review
```python
class HITLFlow(Flow[ReviewState]):
    @start()
    def generate_draft(self):
        agent = Agent(role="Writer", ...)
        result = agent.kickoff("Write initial draft")
        self.state.draft = result.raw
        return result

    @router(generate_draft)
    def check_confidence(self):
        if self.state.confidence < 0.7:
            return "needs_human_review"
        return "auto_approve"

    @listen("needs_human_review")
    def pause_for_review(self):
        # Save state for later resume
        self.state.status = "awaiting_review"
        self.state.save()  # Persist state
        return "Paused for human review"

    @listen("auto_approve")
    def proceed_automatically(self):
        return self.finalize()

    @start("human_approved")  # Resume point after human approval
    def after_human_review(self):
        # Human has reviewed and approved
        return self.finalize()

    def finalize(self):
        return {"final": self.state.draft}
```
### Integration with External Systems
```python
@listen("needs_approval")
def request_approval(self):
# Send to Slack, email, or queue
send_approval_request(
content=self.state.draft,
callback_id=self.state.flow_id
)
self.state.status = "pending_approval"
return "Approval requested"
```
## Parallel Execution
### Multiple Start Points
```python
class ParallelDataFlow(Flow[DataState]):
    @start()
    def fetch_source_a(self):
        return api_client.get_data_a()

    @start()
    def fetch_source_b(self):
        return api_client.get_data_b()

    @start()
    def fetch_source_c(self):
        return api_client.get_data_c()

    @listen(and_(fetch_source_a, fetch_source_b, fetch_source_c))
    def merge_all_sources(self, a, b, c):
        self.state.merged = {**a, **b, **c}
        return self.state.merged
```
### Async Within Methods
```python
@listen(previous_step)
async def parallel_processing(self, data):
    # Process multiple items concurrently
    tasks = [
        self.process_item(item)
        for item in data["items"]
    ]
    results = await asyncio.gather(*tasks)
    return results

async def process_item(self, item):
    agent = Agent(role="Processor", ...)
    return await agent.kickoff_async(f"Process: {item}")
```
### Crew Batch Processing
```python
@listen(gather_inputs)
async def batch_analyze(self, inputs):
    crew = AnalysisCrew().crew()
    # Concurrent batch processing
    results = await crew.akickoff_for_each([
        {"item": item} for item in inputs
    ])
    self.state.analyses = [r.raw for r in results]
    return results
```
## Error Handling and Retries
### Try-Catch Pattern
```python
class RobustFlow(Flow[State]):
    @start()
    def risky_operation(self):
        try:
            result = external_api.call()
            self.state.result = result
            return "success"
        except APIError as e:
            self.state.error = str(e)
            self.state.retry_count = getattr(self.state, 'retry_count', 0) + 1
            return "error"

    @router(risky_operation)
    def handle_result(self):
        if hasattr(self.state, 'error'):
            if self.state.retry_count < 3:
                return "retry"
            return "failed"
        return "success"

    @listen("retry")
    def retry_operation(self):
        import time
        time.sleep(2 ** self.state.retry_count)  # Exponential backoff
        return self.risky_operation()

    @listen("failed")
    def handle_failure(self):
        return {"error": self.state.error, "retries": self.state.retry_count}
```
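
The `time.sleep(2 ** retry_count)` line above gives a 2s/4s/8s schedule. A small helper with an explicit cap (a sketch, not part of CrewAI) keeps the schedule from growing unbounded:

```python
def backoff_delay(retry_count: int, base: float = 2.0, cap: float = 60.0) -> float:
    """Exponential backoff: base ** retry_count, clamped to cap seconds."""
    return min(base ** retry_count, cap)

print([backoff_delay(n) for n in range(1, 7)])  # [2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

In production you would typically also add random jitter so concurrent retries don't synchronize.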
### Graceful Degradation
```python
@listen(process_step)
def with_fallback(self, primary_result):
    if not primary_result or primary_result.get("status") == "failed":
        # Fallback to simpler approach
        llm = LLM(model="gpt-4o-mini")
        return llm.call([{"role": "user", "content": "Simple fallback..."}])
    return primary_result
```
## State Persistence
### Pydantic State with Validation
```python
import uuid
from pydantic import BaseModel, Field, field_validator
from typing import Optional
from datetime import datetime

class PersistentState(BaseModel):
    flow_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    created_at: datetime = Field(default_factory=datetime.now)
    status: str = "pending"
    data: dict = {}
    error: Optional[str] = None
    retry_count: int = 0

    @field_validator('status')
    @classmethod
    def validate_status(cls, v):
        valid = ['pending', 'processing', 'completed', 'failed', 'paused']
        if v not in valid:
            raise ValueError(f'Status must be one of {valid}')
        return v

    def save(self):
        with open(f"state_{self.flow_id}.json", "w") as f:
            f.write(self.model_dump_json())

    @classmethod
    def load(cls, flow_id: str):
        with open(f"state_{flow_id}.json") as f:
            return cls.model_validate_json(f.read())
```
### Using Persistent State
```python
class PersistentFlow(Flow[PersistentState]):
    @start()
    def begin_processing(self):
        self.state.status = "processing"
        self.state.save()
        return self.do_work()

    @listen(begin_processing)
    def checkpoint(self, result):
        self.state.data["checkpoint_1"] = result
        self.state.save()
        return result

# Resume from saved state
saved_state = PersistentState.load("flow-123")
flow = PersistentFlow()
flow.state = saved_state
flow.kickoff(start_method="checkpoint")
```
## Flow Composition
### Nested Flows
```python
class SubFlow(Flow[SubState]):
    @start()
    def sub_process(self):
        return "sub result"

class MainFlow(Flow[MainState]):
    @listen(setup)
    def run_subflow(self):
        sub = SubFlow()
        sub.state.input = self.state.data
        result = sub.kickoff()
        self.state.sub_result = result
        return result
```
### Flow Factory Pattern
```python
def create_flow(flow_type: str) -> Flow:
    flows = {
        "analysis": AnalysisFlow,
        "research": ResearchFlow,
        "writing": WritingFlow
    }
    return flows[flow_type]()
# Usage
flow = create_flow(config["flow_type"])
flow.state.input = data
result = flow.kickoff()
```


@@ -1,269 +0,0 @@
# LLM Integration Patterns
## Table of Contents
- [Built-in LLM Configuration](#built-in-llm-configuration)
- [Provider-Specific Setup](#provider-specific-setup)
- [Custom LLM Implementation](#custom-llm-implementation)
- [Direct LLM Calls in Flows](#direct-llm-calls-in-flows)
- [Structured Responses](#structured-responses)
## Built-in LLM Configuration
### Basic LLM Class
```python
from crewai import LLM
# OpenAI
llm = LLM(model="gpt-4o", temperature=0.7)
# Anthropic
llm = LLM(model="anthropic/claude-sonnet-4-20250514")
# Google
llm = LLM(model="gemini/gemini-2.0-flash")
# Local (Ollama)
llm = LLM(model="ollama/llama3.2")
```
### LLM Parameters
```python
llm = LLM(
model="gpt-4o",
temperature=0.7, # Creativity (0-1)
max_tokens=4096, # Max response length
top_p=0.9, # Nucleus sampling
frequency_penalty=0.0, # Reduce repetition
presence_penalty=0.0, # Encourage new topics
seed=42, # Reproducibility
response_format=MyModel, # Pydantic model for structured output
)
```
## Provider-Specific Setup
### OpenAI
```bash
# .env
OPENAI_API_KEY=sk-...
```
```python
llm = LLM(model="gpt-4o")
# or
llm = LLM(model="gpt-4o-mini") # Faster, cheaper
```
### Anthropic
```bash
# .env
ANTHROPIC_API_KEY=sk-ant-...
```
```python
llm = LLM(model="anthropic/claude-sonnet-4-20250514")
```
### Google Gemini
```bash
# .env
GOOGLE_API_KEY=AIza...
```
```python
# Via LiteLLM
llm = LLM(model="gemini/gemini-2.0-flash")
# Via OpenAI-compatible endpoint
llm = LLM(
model="openai/gemini-2.0-flash",
base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
api_key="your-gemini-key"
)
```
### Azure OpenAI
```bash
# .env
AZURE_API_KEY=...
AZURE_API_BASE=https://your-resource.openai.azure.com/
AZURE_API_VERSION=2024-02-01
```
```python
llm = LLM(
model="azure/your-deployment-name",
api_key=os.getenv("AZURE_API_KEY"),
base_url=os.getenv("AZURE_API_BASE"),
)
```
### Local Models (Ollama)
```bash
# Start Ollama
ollama serve
ollama pull llama3.2
```
```python
llm = LLM(
model="ollama/llama3.2",
base_url="http://localhost:11434"
)
```
## Direct LLM Calls in Flows
### Simple Call
```python
class MyFlow(Flow[State]):
def __init__(self):
super().__init__()
self.llm = LLM(model="gpt-4o")
@start()
def process(self):
response = self.llm.call(messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": f"Process: {self.state.input}"}
])
return response
```
### With Message History
```python
@listen(previous_step)
def conversation(self, context):
messages = [
{"role": "system", "content": "Expert analyst."},
{"role": "user", "content": "Initial question"},
{"role": "assistant", "content": context}, # Previous response
{"role": "user", "content": "Follow-up question"}
]
return self.llm.call(messages=messages)
```
## Structured Responses
### With LLM Class
```python
from pydantic import BaseModel, Field
class Analysis(BaseModel):
summary: str = Field(description="Brief summary")
key_points: list[str] = Field(description="Main findings")
confidence: float = Field(ge=0, le=1, description="Confidence score")
recommendation: str
@start()
def analyze(self):
llm = LLM(model="gpt-4o", response_format=Analysis)
result = llm.call(messages=[
{"role": "user", "content": f"Analyze: {self.state.data}"}
])
self.state.analysis = result # LLM.call() returns model directly
return result
```
### Complex Nested Models
```python
from pydantic import BaseModel
from typing import Optional
class Finding(BaseModel):
title: str
description: str
severity: str # "low", "medium", "high"
class RiskAssessment(BaseModel):
overall_risk: str
findings: list[Finding]
mitigations: list[str]
class FullReport(BaseModel):
executive_summary: str
risk_assessment: RiskAssessment
recommendations: list[str]
next_steps: Optional[list[str]] = None
llm = LLM(model="gpt-4o", response_format=FullReport)
result = llm.call(messages=[...])
report = result # Returns fully typed FullReport directly
```
### Validation and Error Handling
```python
@start()
def safe_extraction(self):
llm = LLM(model="gpt-4o", response_format=DataModel)
try:
result = llm.call(messages=[...])
if result: # LLM returns model directly when response_format is set
self.state.data = result
return "success"
else:
self.state.error = "No structured output"
return "failed"
except Exception as e:
self.state.error = str(e)
return "failed"
```
## Model Selection Guide
| Use Case | Recommended Model | Notes |
|----------|------------------|-------|
| Complex reasoning | gpt-4o, claude-sonnet-4-20250514 | Best quality |
| Fast responses | gpt-4o-mini, gemini-2.0-flash | Good balance |
| Cost-sensitive | gpt-4o-mini, ollama/llama3.2 | Lowest cost |
| Long context | gpt-4o (128k), claude (200k) | Large documents |
| Structured output | gpt-4o, gpt-4o-mini | Best JSON mode |
| Privacy-sensitive | ollama/*, local models | Data never leaves your infrastructure |
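The table above can be encoded as a small lookup helper for flows that pick models at runtime. This is a minimal sketch; the keys, values, and `pick_model` name are illustrative defaults, not part of the CrewAI API:

```python
# Illustrative mapping from use case (table above) to a model string
# suitable for LLM(model=...). Adjust to your own providers and budgets.
MODEL_BY_USE_CASE = {
    "complex_reasoning": "gpt-4o",
    "fast": "gpt-4o-mini",
    "cost_sensitive": "gpt-4o-mini",
    "long_context": "anthropic/claude-sonnet-4-20250514",
    "structured_output": "gpt-4o",
    "privacy": "ollama/llama3.2",
}

def pick_model(use_case: str, default: str = "gpt-4o-mini") -> str:
    """Return a model identifier for the given use case, falling back to a cheap default."""
    return MODEL_BY_USE_CASE.get(use_case, default)
```

A flow can then do `LLM(model=pick_model("fast"))` instead of hard-coding model names in every method.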
## Caching and Optimization
### LLM Instance Reuse
```python
class OptimizedFlow(Flow[State]):
def __init__(self):
super().__init__()
# Initialize once, reuse across methods
self.fast_llm = LLM(model="gpt-4o-mini")
self.smart_llm = LLM(model="gpt-4o")
@start()
def quick_classify(self):
# Use fast model for simple tasks
return self.fast_llm.call([...])
@listen(quick_classify)
def deep_analysis(self, classification):
# Use smart model for complex tasks
return self.smart_llm.call([...])
```
### Temperature by Task
```python
# Classification: low temperature for consistency
classifier = LLM(model="gpt-4o", temperature=0.1)
# Creative: higher temperature for variety
creative = LLM(model="gpt-4o", temperature=0.9)
# Structured extraction: zero temperature
extractor = LLM(model="gpt-4o", temperature=0, response_format=DataModel)
```

# Multi-Agent Design Patterns
Advanced patterns for orchestrating multiple agents in complex applications. These patterns address common challenges: quality assurance, dynamic scaling, fault tolerance, and human oversight.
## Pattern Selection Guide
### Decision Tree
```
What is your primary concern?
├── Output Quality Critical → Generator-Critic or Task Guardrails
├── Iterative Improvement → Iterative Refinement
├── Unknown Task Complexity → Orchestrator-Worker (Dynamic Spawning)
├── Independent Parallel Tasks → Parallel Fan-Out
├── Coordinator Needed → Hierarchical with Custom Manager
├── Crash Recovery / Long Tasks → State Persistence
├── Human Approval Required → HITL Webhooks
└── Enterprise Production → Composite (combine patterns)
```
### Quick Reference
| Pattern | Best For | Key CrewAI Construct |
|---------|----------|---------------------|
| **Generator-Critic** | Quality gates, validation loops | Flow + @router + "revise" loop |
| **Iterative Refinement** | Progressive improvement | Flow + iteration counter + exit condition |
| **Orchestrator-Worker** | Dynamic task decomposition | Flow + inline Agent() + asyncio.gather |
| **Parallel Fan-Out** | Concurrent independent tasks | asyncio.gather + akickoff() |
| **Task Guardrails** | Output validation | Task(guardrail=func, guardrail_max_retries=N) |
| **State Persistence** | Crash recovery, HITL | @persist() on Flow class |
| **HITL Webhooks** | Enterprise approval flows | humanInputWebhook + /resume API |
| **Custom Manager** | Coordinated delegation | Crew(manager_agent=..., process=Process.hierarchical) |
| **Composite** | Enterprise systems | Nested patterns combined |
---
## Generator-Critic Pattern
**Use when:** Output quality is critical and requires validation before acceptance (legal, medical, financial content).
**Concept:** One agent/crew generates content, another critiques it. Based on quality score, either approve or loop back for revision.
```python
from crewai.flow.flow import Flow, start, listen, router
from crewai import Crew
from pydantic import BaseModel
class GeneratorCriticState(BaseModel):
    topic: str = ""
    content: str = ""
critique: str = ""
quality_score: float = 0.0
iteration: int = 0
max_iterations: int = 3
class GeneratorCriticFlow(Flow[GeneratorCriticState]):
@start()
def generate(self):
"""Generator crew creates content"""
crew = GeneratorCrew().crew()
result = crew.kickoff(inputs={"topic": self.state.topic})
self.state.content = result.raw
self.state.iteration += 1
return result
@listen(generate)
def critique(self, generated):
"""Critic crew evaluates quality"""
crew = CriticCrew().crew()
result = crew.kickoff(inputs={"content": self.state.content})
self.state.critique = result.raw
self.state.quality_score = result.pydantic.score # Assumes structured output
return result
@router(critique)
def check_quality(self):
if self.state.quality_score >= 0.8:
return "approved"
elif self.state.iteration >= self.state.max_iterations:
return "max_iterations"
return "revise"
@listen("revise")
def revise_content(self):
"""Feed critique back to generator"""
crew = GeneratorCrew().crew()
result = crew.kickoff(inputs={
"topic": self.state.topic,
"previous_attempt": self.state.content,
"feedback": self.state.critique
})
self.state.content = result.raw
self.state.iteration += 1
return result
@listen(revise_content)
def re_critique(self, revised):
"""Loop back to critique"""
return self.critique(revised)
@listen("approved")
def finalize(self):
return {"content": self.state.content, "iterations": self.state.iteration}
@listen("max_iterations")
def handle_max(self):
return {"content": self.state.content, "warning": "Max iterations reached"}
```
**When NOT to use:** Simple tasks where quality is easily validated, or when iteration cost is too high.
---
## Iterative Refinement Pattern
**Use when:** Output requires progressive improvement toward a quality threshold (optimization, polishing).
**Difference from Generator-Critic:** Focuses on continuous improvement rather than pass/fail validation.
```python
from crewai.flow.flow import Flow, start, listen, router
from crewai import Agent, LLM
from pydantic import BaseModel
class QualityAssessment(BaseModel):
    score: float = 0.0
    improvements: str = ""
class RefinementState(BaseModel):
    topic: str = ""
    draft: str = ""
    quality_score: float = 0.0
    iteration: int = 0
    max_iterations: int = 5
    target_quality: float = 0.9
class IterativeRefinementFlow(Flow[RefinementState]):
@start()
def create_initial(self):
"""Create initial draft"""
writer = Agent(
role="Content Writer",
goal="Create high-quality initial draft",
llm=LLM(model="gpt-4o")
)
result = writer.kickoff(f"Write about: {self.state.topic}")
self.state.draft = result.raw
return result
@listen(create_initial)
def assess_and_refine(self, draft):
"""Assess quality and refine if needed"""
self.state.iteration += 1
# Assess quality
llm = LLM(model="gpt-4o", response_format=QualityAssessment)
assessment = llm.call([
{"role": "user", "content": f"Score 0-1 and suggest improvements:\n\n{self.state.draft}"}
])
self.state.quality_score = assessment.score
if self.state.quality_score >= self.state.target_quality:
return "converged"
if self.state.iteration >= self.state.max_iterations:
return "max_reached"
# Refine
refiner = Agent(role="Editor", goal="Improve based on feedback", llm=LLM(model="gpt-4o"))
result = refiner.kickoff(f"Improve:\n{self.state.draft}\n\nFeedback:\n{assessment.improvements}")
self.state.draft = result.raw
return "continue"
    @router(assess_and_refine)
    def route(self, decision):
        # assess_and_refine already returns the route label
        # ("converged", "max_reached", or "continue"); pass it through.
        return decision
@listen("continue")
def continue_refinement(self):
"""Loop back for another iteration"""
return self.assess_and_refine(self.state.draft)
@listen("converged")
def output_final(self):
return {"content": self.state.draft, "iterations": self.state.iteration}
```
---
## Orchestrator-Worker Pattern (Dynamic Spawning)
**Use when:** Task complexity is unknown upfront and requires dynamic scaling of subagents.
**Concept:** Lead agent analyzes complexity, spawns appropriate number of specialized workers, then synthesizes results.
```python
from crewai.flow.flow import Flow, start, listen, router
from crewai import Agent, LLM
from pydantic import BaseModel
import asyncio
class SubTask(BaseModel):
    objective: str
    domain: str
    instructions: str
class TaskPlan(BaseModel):
    complexity: str
    subtasks: list[SubTask]
class OrchestratorState(BaseModel):
query: str = ""
complexity: str = "moderate"
worker_count: int = 1
worker_results: list[str] = []
synthesis: str = ""
class OrchestratorWorkerFlow(Flow[OrchestratorState]):
@start()
def analyze_complexity(self):
"""Orchestrator analyzes task and plans"""
llm = LLM(model="gpt-4o", response_format=TaskPlan)
plan = llm.call([
{"role": "system", "content": "Analyze query complexity. Return subtasks."},
{"role": "user", "content": f"Query: {self.state.query}"}
])
self.state.complexity = plan.complexity
# Scale workers to complexity: simple=1, moderate=3, complex=5
complexity_map = {"simple": 1, "moderate": 3, "complex": 5}
self.state.worker_count = complexity_map.get(plan.complexity, 3)
return plan
@listen(analyze_complexity)
async def spawn_workers(self, plan):
"""Dynamically create and run worker agents in parallel"""
worker_tasks = []
for i, subtask in enumerate(plan.subtasks[:self.state.worker_count]):
# Create fresh agent per subtask (clean context)
agent = Agent(
role=f"Research Specialist #{i+1}",
goal=subtask.objective,
backstory=f"Expert in {subtask.domain}",
tools=[SearchTool()],
llm=LLM(model="gpt-4o-mini") # Cheaper model for workers
)
worker_tasks.append(agent.akickoff(subtask.instructions))
# Run all workers concurrently
results = await asyncio.gather(*worker_tasks)
self.state.worker_results = [r.raw for r in results]
return results
@listen(spawn_workers)
def synthesize(self, worker_outputs):
"""Orchestrator synthesizes all findings"""
synthesizer = Agent(
role="Lead Researcher",
goal="Synthesize findings into coherent answer",
llm=LLM(model="gpt-4o") # Best model for synthesis
)
combined = "\n\n---\n\n".join([
f"Worker {i+1}:\n{result}" for i, result in enumerate(self.state.worker_results)
])
result = synthesizer.kickoff(
f"Query: {self.state.query}\n\nFindings:\n{combined}\n\nSynthesize into answer."
)
self.state.synthesis = result.raw
return result
```
**Scaling guidance:**
- Simple queries: 1 worker, 3-10 tool calls
- Moderate: 3 workers in parallel
- Complex: 5+ workers, may need multiple rounds
---
## Parallel Fan-Out Pattern
**Use when:** Multiple independent subtasks can run concurrently for speed.
```python
from crewai.flow.flow import Flow, start, listen, and_
from crewai import Crew, LLM
from pydantic import BaseModel
import asyncio
class ParallelState(BaseModel):
data: dict = {}
security_result: str = ""
performance_result: str = ""
style_result: str = ""
final_report: str = ""
class ParallelAnalysisFlow(Flow[ParallelState]):
@start()
async def fan_out_analysis(self):
"""Run multiple crews in parallel"""
results = await asyncio.gather(
SecurityCrew().crew().akickoff(inputs=self.state.data),
PerformanceCrew().crew().akickoff(inputs=self.state.data),
StyleCrew().crew().akickoff(inputs=self.state.data)
)
self.state.security_result = results[0].raw
self.state.performance_result = results[1].raw
self.state.style_result = results[2].raw
return results
@listen(fan_out_analysis)
def gather_and_synthesize(self, parallel_results):
"""Aggregate all parallel results"""
llm = LLM(model="gpt-4o")
self.state.final_report = llm.call([
{"role": "user", "content": f"""Combine analyses:
Security: {self.state.security_result}
Performance: {self.state.performance_result}
Style: {self.state.style_result}
"""}
])
return self.state.final_report
```
**Batch processing with akickoff_for_each:**
```python
async def batch_analysis(self):
datasets = [{"id": 1, ...}, {"id": 2, ...}, {"id": 3, ...}]
results = await AnalysisCrew().crew().akickoff_for_each(datasets)
return results
```
---
## Task Guardrails Pattern
**Use when:** Task outputs must meet specific validation criteria before acceptance.
**Key feature:** Automatic retry with feedback when validation fails.
```python
from typing import Tuple, Any
from crewai import Task, TaskOutput
import json
def validate_json_output(result: TaskOutput) -> Tuple[bool, Any]:
"""Validate output is valid JSON"""
try:
data = json.loads(result.raw)
return (True, data)
except json.JSONDecodeError as e:
return (False, f"Invalid JSON: {e}")
def validate_length(result: TaskOutput) -> Tuple[bool, Any]:
"""Validate minimum length"""
if len(result.raw) < 100:
return (False, "Output too short, needs more detail")
return (True, result.raw)
def validate_no_pii(result: TaskOutput) -> Tuple[bool, Any]:
"""Check for PII"""
pii_patterns = ["SSN:", "credit card"]
for pattern in pii_patterns:
if pattern.lower() in result.raw.lower():
return (False, f"Contains PII pattern: {pattern}")
return (True, result.raw)
# Single guardrail
task = Task(
description="Generate JSON report",
expected_output="Valid JSON object",
agent=analyst,
guardrail=validate_json_output,
guardrail_max_retries=3 # Retry up to 3x if validation fails
)
# Multiple sequential guardrails
task = Task(
description="Generate customer report",
expected_output="Detailed report without PII",
agent=writer,
guardrails=[validate_length, validate_no_pii], # Run in order
guardrail_max_retries=3
)
```
**When validation fails:** The agent receives feedback and retries automatically.
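The retry behavior follows from the guardrail contract: a guardrail returns `(ok, value)`, and on failure the message is fed back to the agent before the next attempt. CrewAI runs this loop internally; the sketch below only illustrates the contract, and `run_with_guardrail` is a hypothetical name:

```python
# Standalone illustration of the (ok, value) guardrail contract.
# `produce` stands in for the agent executing the task; it receives the
# previous failure message (or None on the first attempt).
def run_with_guardrail(produce, guardrail, max_retries: int = 3):
    feedback = None
    for _ in range(max_retries + 1):
        output = produce(feedback)
        ok, value = guardrail(output)
        if ok:
            return value
        feedback = value  # failure reason becomes the retry feedback
    raise ValueError(f"Guardrail still failing after {max_retries} retries: {feedback}")
```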
---
## State Persistence Pattern
**Use when:** Long-running flows need crash recovery, or HITL requires resume capability.
```python
from crewai.flow.flow import Flow, start, listen
from crewai.flow.persistence import persist
from pydantic import BaseModel
class LongRunningState(BaseModel):
step: str = "initialized"
checkpoint_data: dict = {}
results: list[str] = []
@persist() # Saves state after EVERY method
class ResilientFlow(Flow[LongRunningState]):
@start()
def phase_one(self):
self.state.step = "phase_one_complete"
self.state.checkpoint_data["phase1"] = "data"
# If crash here, flow resumes from this state
return "Phase 1 done"
@listen(phase_one)
def phase_two(self, prev):
self.state.step = "phase_two_complete"
# State automatically persisted
return "Phase 2 done"
@listen(phase_two)
def phase_three(self, prev):
self.state.step = "complete"
return "All phases done"
# First run (crashes at phase_two)
flow1 = ResilientFlow()
result1 = flow1.kickoff()
# Second run - automatically resumes from persisted state
flow2 = ResilientFlow()
result2 = flow2.kickoff() # Continues from last checkpoint
```
**Integration with HITL:**
```python
@persist()
class ApprovalFlow(Flow[ApprovalState]):
@listen(generate_content)
def await_approval(self, content):
self.state.awaiting_approval = True
self.state.pending_content = content
# State persisted - can resume after human approves
return "Awaiting human approval"
```
---
## HITL Webhooks Pattern (Enterprise)
**Use when:** Enterprise deployments require human approval workflows with external system integration.
```python
import requests
BASE_URL = "https://your-crewai-deployment.com"
# Kickoff with human input webhook
response = requests.post(f"{BASE_URL}/kickoff", json={
"inputs": {"topic": "Quarterly Report"},
"humanInputWebhook": {
"url": "https://your-app.com/hitl-callback",
"authentication": {
"strategy": "bearer",
"token": "your-secret-token"
}
}
})
execution_id = response.json()["execution_id"]
# When human reviews (via your webhook handler)
# Resume with approval/feedback
def resume_after_human_review(execution_id, task_id, approved, feedback=""):
response = requests.post(f"{BASE_URL}/resume", json={
"execution_id": execution_id,
"task_id": task_id,
"human_feedback": feedback,
"is_approve": approved
})
return response.json()
# Approve and continue
resume_after_human_review(execution_id, "review_task", True, "Looks good!")
# Reject and retry
resume_after_human_review(execution_id, "review_task", False, "Needs more data on Q3")
```
---
## Custom Manager Pattern
**Use when:** Hierarchical coordination requires specialized management behavior.
**`manager_agent` vs `manager_llm`:**
- Use `manager_llm` for simple coordination with default behavior
- Use `manager_agent` for custom role, goals, and delegation rules
```python
from crewai import Agent, Crew, Task, Process
# Custom manager with specific behavior
manager = Agent(
role="Senior Project Manager",
goal="Coordinate team efficiently, prioritize quality over speed",
backstory="""Experienced PM who excels at delegation.
Always validates work before final delivery.
Escalates blockers immediately.""",
allow_delegation=True, # Required for manager
verbose=True
)
# Specialist agents
researcher = Agent(
role="Research Analyst",
goal="Provide accurate, thorough research",
allow_delegation=False # Specialists don't delegate
)
writer = Agent(
role="Technical Writer",
goal="Create clear, accurate documentation",
allow_delegation=False
)
# Hierarchical crew with custom manager
crew = Crew(
    agents=[researcher, writer],  # manager is passed separately, not listed here
    tasks=[
        Task(
            description="Research and document API changes",
            expected_output="Complete, reviewed documentation of the API changes"
        )
    ],
    process=Process.hierarchical,
    manager_agent=manager  # Use custom manager
)
# Alternative: Use LLM as manager (simpler)
crew_simple = Crew(
agents=[researcher, writer],
tasks=[...],
process=Process.hierarchical,
manager_llm="gpt-4o" # Default manager behavior
)
```
---
## Post-Processing Pattern
**Use when:** Final output requires dedicated processing (citations, formatting, compliance).
```python
class PostProcessingFlow(Flow[ContentState]):
@listen(research_complete)
def add_citations(self, raw_research):
"""Dedicated agent for citations"""
citation_agent = Agent(
role="Citation Specialist",
goal="Add proper academic citations",
backstory="Editor with expertise in attribution",
llm=LLM(model="gpt-4o")
)
result = citation_agent.kickoff(
f"Add inline citations [Author, Year] to:\n{raw_research.raw}\n"
f"Sources:\n{json.dumps(self.state.sources)}"
)
self.state.cited_content = result.raw
return result
@listen(add_citations)
def format_for_publication(self, cited):
"""Formatting specialist"""
formatter = Agent(
role="Publication Formatter",
goal="Format for target publication",
llm=LLM(model="gpt-4o")
)
return formatter.kickoff(f"Format for {self.state.target_format}:\n{cited.raw}")
```
---
## Composite Patterns
**Use when:** Enterprise applications require multiple patterns working together.
### Example: Customer Support System
Combines: Coordinator, Parallel Fan-Out, Generator-Critic, Task Guardrails, State Persistence, HITL.
```python
from crewai.flow.flow import Flow, start, listen, router, or_
from crewai.flow.persistence import persist
from crewai import Agent, Crew, Task, Process, LLM
from pydantic import BaseModel
import asyncio
class SupportState(BaseModel):
ticket: dict = {}
category: str = ""
parallel_analyses: dict = {}
draft_response: str = ""
quality_score: float = 0.0
iteration: int = 0
escalated: bool = False
@persist() # Crash recovery
class CustomerSupportFlow(Flow[SupportState]):
@start()
def classify_ticket(self):
"""Route to appropriate specialist"""
llm = LLM(model="gpt-4o", response_format=TicketClassification)
result = llm.call([
{"role": "user", "content": f"Classify: {self.state.ticket}"}
])
self.state.category = result.category
return result
@router(classify_ticket)
def route_to_specialist(self):
if self.state.category in ["billing", "technical", "account"]:
return self.state.category
return "general"
@listen("billing")
async def handle_billing(self):
"""Parallel analysis for billing issues"""
results = await asyncio.gather(
BillingCrew().crew().akickoff(inputs=self.state.ticket),
ComplianceCrew().crew().akickoff(inputs=self.state.ticket)
)
self.state.parallel_analyses = {
"billing": results[0].raw,
"compliance": results[1].raw
}
return results
@listen("technical")
def handle_technical(self):
"""Hierarchical crew for technical issues"""
manager = Agent(role="Tech Lead", allow_delegation=True)
crew = Crew(
agents=[manager, Agent(role="Backend Expert"), Agent(role="Frontend Expert")],
tasks=[Task(description=f"Resolve: {self.state.ticket}", agent=manager)],
process=Process.hierarchical,
manager_agent=manager
)
result = crew.kickoff()
self.state.parallel_analyses["technical"] = result.raw
return result
@listen(or_("billing", "technical", "account", "general"))
def generate_response(self):
"""Generator creates response"""
crew = ResponseCrew().crew()
result = crew.kickoff(inputs={
"ticket": self.state.ticket,
"analyses": self.state.parallel_analyses
})
self.state.draft_response = result.raw
self.state.iteration += 1
return result
@listen(generate_response)
def critique_response(self, draft):
"""Critic evaluates quality"""
# Task guardrail validates tone
task = Task(
description="Critique response for tone and accuracy",
agent=QACritic(),
guardrail=validate_professional_tone,
guardrail_max_retries=2
)
crew = Crew(agents=[QACritic()], tasks=[task])
result = crew.kickoff(inputs={"response": self.state.draft_response})
self.state.quality_score = result.pydantic.score
return result
@router(critique_response)
def quality_gate(self):
if self.state.quality_score >= 0.85:
return "approved"
elif self.state.iteration >= 2:
return "escalate" # HITL escalation
return "revise"
@listen("revise")
def revise_response(self):
# Loop back to generator with feedback
return self.generate_response()
@listen("escalate")
def escalate_to_human(self):
"""HITL escalation"""
self.state.escalated = True
return {"status": "escalated", "draft": self.state.draft_response}
@listen("approved")
def send_response(self):
return {"status": "sent", "response": self.state.draft_response}
```
---
## Anti-Patterns
### 1. Over-Engineering Simple Tasks
**Wrong:** Using Orchestrator-Worker for a simple classification task.
**Right:** Use direct LLM call with structured output.
### 2. Missing Exit Conditions
**Wrong:** Iterative loop without max_iterations.
**Right:** Always include `max_iterations` and check in router.
### 3. Not Using @persist() in Production
**Wrong:** Long-running flow without persistence.
**Right:** Add `@persist()` to any flow that could fail mid-execution.
### 4. Synchronous When Async Available
**Wrong:** `crew.kickoff()` for multiple independent crews.
**Right:** `await asyncio.gather(*[crew.akickoff() for crew in crews])`.
### 5. Forgetting Guardrails for Critical Outputs
**Wrong:** Financial report task without validation.
**Right:** Add `guardrail` for compliance and accuracy checks.
### 6. Skipping Post-Processing
**Wrong:** Sending raw LLM output to customers.
**Right:** Add citation, formatting, and compliance agents.
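Anti-pattern #2 in particular is cheap to fix. A minimal sketch of a bounded quality gate, with illustrative names and thresholds, that could back a `@router` method:

```python
# Router decision with a hard exit condition: the loop can never run
# forever because iteration count is checked alongside quality.
def quality_gate(quality_score: float, iteration: int,
                 threshold: float = 0.85, max_iterations: int = 3) -> str:
    """Return the route label for a revise loop."""
    if quality_score >= threshold:
        return "approved"
    if iteration >= max_iterations:
        return "max_iterations"  # bounded: give up and hand off
    return "revise"
```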

---
name: crewai-enterprise-endpoint-manager
description: |
Guide for interacting with deployed CrewAI Enterprise (AOP) crews and flows via API endpoints.
Use this skill when: (1) Kicking off deployed crews/flows programmatically, (2) Monitoring execution status and progress,
(3) Retrieving results from completed executions, (4) Understanding the REST API workflow for deployed agentic workflows,
(5) Implementing authentication with Bearer tokens, (6) Building integrations that consume deployed CrewAI endpoints,
(7) Handling human-in-the-loop webhooks with deployed crews, (8) Managing concurrent executions with semaphore patterns.
---
# CrewAI Enterprise Endpoint Manager
Deployed crews and flows in CrewAI Enterprise (AOP) are accessible via REST API endpoints for programmatic execution, monitoring, and result retrieval.
## Contents
- [API Workflow](#api-workflow-overview)
- [Authentication](#authentication)
- [Endpoint Reference](#endpoint-reference)
- [Python Integration](#python-integration)
- [Human-in-the-Loop](#human-in-the-loop-hitl-webhooks)
- [Flow Integration](#integration-with-crewai-flows)
- [Best Practices](#best-practices)
- [Status States](#status-states-reference)
**Detailed References:**
- [references/python-client.md](references/python-client.md) - Full Python client class, async patterns, batch execution with semaphore
- [references/error-handling.md](references/error-handling.md) - Retry strategies, rate limiting, circuit breaker patterns
## API Workflow Overview
```
1. GET /inputs → Discover required input parameters
2. POST /kickoff → Start execution (returns kickoff_id)
3. GET /{id}/status → Monitor progress and retrieve results
```
## Authentication
All requests require a Bearer token from the **Status tab** of your crew's detail page in the AOP dashboard.
```bash
curl -H "Authorization: Bearer YOUR_CREW_TOKEN" \
https://your-crew-url.crewai.com/endpoint
```
## Endpoint Reference
### 1. Discover Inputs - `GET /inputs`
```bash
curl -H "Authorization: Bearer YOUR_CREW_TOKEN" \
https://your-crew-url.crewai.com/inputs
```
**Response:**
```json
{
"inputs": [
{"name": "topic", "type": "string", "required": true},
{"name": "max_results", "type": "integer", "required": false}
]
}
```
### 2. Start Execution - `POST /kickoff`
```bash
curl -X POST \
-H "Authorization: Bearer YOUR_CREW_TOKEN" \
-H "Content-Type: application/json" \
-d '{"inputs": {"topic": "AI Research", "max_results": 10}}' \
https://your-crew-url.crewai.com/kickoff
```
**Response:**
```json
{"kickoff_id": "a1b2c3d4-e5f6-7890-1234-567890abcdef"}
```
### 3. Monitor Status - `GET /{kickoff_id}/status`
```bash
curl -H "Authorization: Bearer YOUR_CREW_TOKEN" \
https://your-crew-url.crewai.com/a1b2c3d4-e5f6-7890-1234-567890abcdef/status
```
**Running:**
```json
{
"status": "running",
"current_task": "research_task",
"progress": {"completed_tasks": 1, "total_tasks": 3}
}
```
**Completed:**
```json
{
"status": "completed",
"result": {
"output": "Final output from the crew...",
"tasks": [
{"task_id": "research_task", "output": "Research findings...", "agent": "Travel Researcher", "execution_time": 45.2}
]
},
"execution_time": 108.5
}
```
**Error:**
```json
{"status": "error", "error_message": "Failed to complete task due to invalid input."}
```
## Python Integration
Basic synchronous pattern for simple integrations:
```python
import requests
import time
BASE_URL = "https://your-crew-url.crewai.com"
TOKEN = "YOUR_CREW_TOKEN"
headers = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}
def kickoff_crew(inputs: dict) -> str:
"""Start execution and return kickoff_id."""
resp = requests.post(f"{BASE_URL}/kickoff", headers=headers, json={"inputs": inputs})
resp.raise_for_status()
return resp.json()["kickoff_id"]
def wait_for_completion(kickoff_id: str, poll_interval: float = 2.0) -> dict:
"""Poll until execution completes or fails."""
while True:
resp = requests.get(f"{BASE_URL}/{kickoff_id}/status", headers=headers)
resp.raise_for_status()
status = resp.json()
if status["status"] in ("completed", "error"):
return status
time.sleep(poll_interval)
# Usage
kickoff_id = kickoff_crew({"topic": "AI Research"})
result = wait_for_completion(kickoff_id)
print(result["result"]["output"])
```
**For advanced patterns, see [references/python-client.md](references/python-client.md):**
- Full `CrewAIClient` class with sync/async methods
- Batch execution with semaphore-controlled concurrency
- Rate limiting and progress callbacks
- Structured output parsing with Pydantic
**For error handling, see [references/error-handling.md](references/error-handling.md):**
- Exponential backoff and retry strategies
- Rate limit handling with adaptive concurrency
- Circuit breaker pattern for resilience
## Human-in-the-Loop (HITL) Webhooks
For crews requiring human input during execution:
```bash
curl -X POST {BASE_URL}/kickoff \
-H "Authorization: Bearer YOUR_API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"inputs": {"topic": "AI Research"},
"humanInputWebhook": {
"url": "https://your-webhook.com/hitl",
"authentication": {"strategy": "bearer", "token": "your-webhook-secret"}
}
}'
```
**Webhook payload received:**
```json
{"kickoff_id": "abc123", "task_id": "review_task", "prompt": "Please review...", "context": {...}}
```
**Respond:**
```json
{"response": "Approved with minor edits...", "continue": true}
```
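On your side, the webhook handler only needs to verify the bearer token and return the JSON body shown above. A framework-agnostic sketch — the function names and wiring are illustrative, not part of the CrewAI API:

```python
# Helpers a Flask/FastAPI handler could call; routing/serialization omitted.
def verify_bearer(headers: dict, expected_token: str) -> bool:
    """Check the Authorization header the deployment sends to your webhook."""
    return headers.get("Authorization", "") == f"Bearer {expected_token}"

def build_hitl_response(reviewer_text: str, approve: bool = True) -> dict:
    """Shape of the JSON body your webhook returns (see example above)."""
    return {"response": reviewer_text, "continue": approve}
```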
## Integration with CrewAI Flows
Deployed Flows expose the same API. Inputs map to your Flow's state model:
```python
# Local Flow definition
class ResearchState(BaseModel):
topic: str = ""
depth: int = 1
class ResearchFlow(Flow[ResearchState]):
@start()
def begin_research(self):
...
```
```bash
# API call maps to state fields
curl -X POST -H "Authorization: Bearer YOUR_FLOW_TOKEN" \
-H "Content-Type: application/json" \
-d '{"inputs": {"topic": "Quantum Computing", "depth": 3}}' \
https://your-flow-url.crewai.com/kickoff
```
## Best Practices
1. **Discover inputs first** - Always call `GET /inputs` to understand required parameters
2. **Handle all status states** - Check for "running", "completed", and "error"
3. **Use semaphores for batches** - Limit concurrent executions (see [python-client.md](references/python-client.md))
4. **Implement exponential backoff** - For retries: `2^attempt` seconds (see [error-handling.md](references/error-handling.md))
5. **Store kickoff_ids** - Persist IDs for debugging and resumption
6. **Set appropriate timeouts** - Long-running crews may need 10+ minute timeouts
## Status States Reference
| Status | Meaning | Next Action |
|--------|---------|-------------|
| `pending` | Queued for execution | Continue polling |
| `running` | Execution in progress | Continue polling |
| `completed` | Successfully finished | Extract results |
| `error` | Execution failed | Check error_message, retry if transient |
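As a sketch, the table above collapses into a small dispatch helper (the action labels are illustrative, not part of the API):

```python
def next_action(status: str) -> str:
    """Map an execution status to the follow-up action from the table above."""
    actions = {
        "pending": "poll",
        "running": "poll",
        "completed": "extract_results",
        "error": "check_error_and_maybe_retry",
    }
    if status not in actions:
        raise ValueError(f"Unknown status: {status!r}")
    return actions[status]
```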

# Error Handling Patterns for CrewAI Enterprise API
Robust error handling strategies for production deployments.
## Contents
- [Error Types](#error-types) - HTTP status codes and execution errors
- [Retry Strategies](#retry-strategies) - Exponential backoff, selective retry
- [Rate Limit Handling](#rate-limit-handling) - Detect limits, adaptive concurrency
- [Circuit Breaker Pattern](#circuit-breaker-pattern) - Prevent cascading failures
- [Error Aggregation](#error-aggregation-and-reporting) - Collect and summarize batch errors
- [Timeout Handling](#timeout-handling) - Separate execution and poll timeouts
- [Resilient Batch Execution](#complete-error-resilient-batch-execution) - Full pattern with auto-retry
## Error Types
### HTTP Status Errors
| Status Code | Meaning | Action |
|-------------|---------|--------|
| 400 | Bad Request | Check input format, fix and retry |
| 401 | Unauthorized | Token invalid/expired, refresh token |
| 403 | Forbidden | No access to this crew |
| 404 | Not Found | Crew URL or kickoff_id doesn't exist |
| 429 | Rate Limited | Implement backoff, reduce concurrency |
| 500 | Server Error | Retry with exponential backoff |
| 502/503 | Service Unavailable | Wait and retry |
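The retry column of this table reduces to a single predicate; this sketch mirrors the `RETRYABLE_STATUS_CODES` set used in the retry examples below (504 is included as the usual gateway-timeout companion):

```python
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

def is_retryable(status_code: int) -> bool:
    """True when the table above recommends waiting and retrying."""
    return status_code in RETRYABLE_STATUS_CODES
```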
### Execution Errors
```json
{
"status": "error",
"error_message": "Failed to complete task due to invalid input."
}
```
Common causes:
- Invalid inputs for the crew's expected schema
- Agent task failures
- LLM API errors (rate limits, timeouts)
- Tool execution failures
## Retry Strategies
### Exponential Backoff
```python
import asyncio
import random
async def retry_with_backoff(
func,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True
):
"""
Retry with exponential backoff and optional jitter.
Args:
func: Async function to retry
max_retries: Maximum retry attempts
base_delay: Initial delay in seconds
max_delay: Maximum delay cap
jitter: Add randomness to prevent thundering herd
"""
for attempt in range(max_retries + 1):
try:
return await func()
except Exception as e:
if attempt == max_retries:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
if jitter:
delay = delay * (0.5 + random.random())
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
await asyncio.sleep(delay)
```
### Selective Retry
```python
from httpx import HTTPStatusError
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}
async def selective_retry(func, max_retries: int = 3):
"""Only retry on specific error types."""
for attempt in range(max_retries + 1):
try:
return await func()
except HTTPStatusError as e:
if e.response.status_code not in RETRYABLE_STATUS_CODES:
raise # Don't retry 400, 401, 403, 404
if attempt == max_retries:
raise
await asyncio.sleep(2 ** attempt)
except (asyncio.TimeoutError, ConnectionError) as e:
if attempt == max_retries:
raise
await asyncio.sleep(2 ** attempt)
```
## Rate Limit Handling
### Detect and Respect Rate Limits
```python
import asyncio
from datetime import datetime, timedelta
class RateLimitHandler:
"""Handle 429 responses with retry-after headers."""
def __init__(self):
self.blocked_until: datetime | None = None
async def wait_if_blocked(self):
"""Wait if currently rate limited."""
if self.blocked_until and datetime.now() < self.blocked_until:
wait_time = (self.blocked_until - datetime.now()).total_seconds()
print(f"Rate limited. Waiting {wait_time:.1f}s")
await asyncio.sleep(wait_time)
def handle_response(self, response):
"""Check response for rate limit headers."""
if response.status_code == 429:
retry_after = response.headers.get("Retry-After", "60")
try:
seconds = int(retry_after)
except ValueError:
seconds = 60
self.blocked_until = datetime.now() + timedelta(seconds=seconds)
raise RateLimitError(f"Rate limited for {seconds}s")
class RateLimitError(Exception):
pass
```
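Note that the handler above assumes `Retry-After` carries an integer number of seconds; RFC 7231 also allows an HTTP-date. A hedged stdlib-only helper covering both forms:

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def parse_retry_after(value: str, default: float = 60.0) -> float:
    """Seconds to wait from a Retry-After header: delta-seconds or HTTP-date."""
    try:
        return max(0.0, float(value))  # e.g. "120"
    except ValueError:
        pass
    try:
        when = parsedate_to_datetime(value)  # e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
    except (TypeError, ValueError):
        return default
```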
### Adaptive Concurrency
```python
class AdaptiveSemaphore:
"""
Automatically reduce concurrency on rate limits.
"""
def __init__(self, initial: int = 10, minimum: int = 1):
self.current = initial
self.minimum = minimum
self.semaphore = asyncio.Semaphore(initial)
self.lock = asyncio.Lock()
self._rate_limit_count = 0
async def acquire(self):
await self.semaphore.acquire()
def release(self):
self.semaphore.release()
async def reduce_on_rate_limit(self):
"""Call when hitting rate limit."""
async with self.lock:
self._rate_limit_count += 1
if self._rate_limit_count >= 3 and self.current > self.minimum:
new_value = max(self.current // 2, self.minimum)
print(f"Reducing concurrency: {self.current} -> {new_value}")
self.current = new_value
self.semaphore = asyncio.Semaphore(new_value)
self._rate_limit_count = 0
```
## Circuit Breaker Pattern
Prevent cascading failures by stopping requests when error rate is too high.
```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if recovered
@dataclass
class CircuitBreaker:
"""
Circuit breaker to prevent cascading failures.
Usage:
breaker = CircuitBreaker(failure_threshold=5, recovery_time=30)
async def make_request():
if not breaker.allow_request():
raise CircuitOpenError("Circuit is open")
try:
result = await client.kickoff(inputs)
breaker.record_success()
return result
except Exception as e:
breaker.record_failure()
raise
"""
failure_threshold: int = 5
recovery_time: float = 30.0 # seconds
state: CircuitState = CircuitState.CLOSED
failure_count: int = 0
last_failure_time: datetime | None = None
def allow_request(self) -> bool:
"""Check if request is allowed."""
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
if self._should_attempt_recovery():
self.state = CircuitState.HALF_OPEN
return True
return False
# HALF_OPEN: allow one request to test
return True
def record_success(self):
"""Record successful request."""
self.failure_count = 0
self.state = CircuitState.CLOSED
def record_failure(self):
"""Record failed request."""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"Circuit opened after {self.failure_count} failures")
def _should_attempt_recovery(self) -> bool:
if self.last_failure_time is None:
return True
elapsed = (datetime.now() - self.last_failure_time).total_seconds()
return elapsed >= self.recovery_time
class CircuitOpenError(Exception):
pass
```
## Error Aggregation and Reporting
```python
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class ErrorAggregator:
"""Collect and summarize errors from batch executions."""
errors: list[tuple[dict, str]] = field(default_factory=list)
error_counts: dict[str, int] = field(default_factory=lambda: defaultdict(int))
def add_error(self, inputs: dict, error_message: str):
"""Record an error."""
self.errors.append((inputs, error_message))
# Categorize error
category = self._categorize(error_message)
self.error_counts[category] += 1
def _categorize(self, error_message: str) -> str:
"""Categorize error for aggregation."""
lower = error_message.lower()
if "rate limit" in lower or "429" in lower:
return "rate_limit"
elif "timeout" in lower:
return "timeout"
elif "401" in lower or "unauthorized" in lower:
return "auth"
elif "invalid input" in lower:
return "invalid_input"
elif "500" in lower or "server error" in lower:
return "server_error"
return "other"
def get_summary(self) -> dict:
"""Get error summary."""
return {
"total_errors": len(self.errors),
"by_category": dict(self.error_counts),
"sample_errors": self.errors[:5] # First 5 for inspection
}
def get_retryable_inputs(self) -> list[dict]:
"""Get inputs that failed with retryable errors."""
retryable_categories = {"rate_limit", "timeout", "server_error"}
return [
inputs for inputs, error in self.errors
if self._categorize(error) in retryable_categories
]
# Usage
aggregator = ErrorAggregator()
for result in batch_results:
if result.error:
aggregator.add_error(result.inputs, result.error)
summary = aggregator.get_summary()
print(f"Total errors: {summary['total_errors']}")
print(f"By category: {summary['by_category']}")
# Retry failed inputs
retryable = aggregator.get_retryable_inputs()
if retryable:
print(f"Retrying {len(retryable)} failed executions...")
retry_results = await client.kickoff_batch(retryable, max_concurrent=5)
```
## Timeout Handling
```python
async def kickoff_with_timeout(
client,
inputs: dict,
execution_timeout: float = 600.0,
poll_timeout: float = 10.0
) -> ExecutionResult:
"""
Kickoff with separate timeouts for execution and polling.
Args:
execution_timeout: Max total time for the crew to complete
poll_timeout: Timeout for each status poll request
"""
start_time = datetime.now()
async with httpx.AsyncClient(timeout=30.0) as http_client:
# Kickoff (short timeout)
resp = await http_client.post(
f"{client.base_url}/kickoff",
headers=client._headers,
json={"inputs": inputs}
)
kickoff_id = resp.json()["kickoff_id"]
# Poll with timeout
while True:
elapsed = (datetime.now() - start_time).total_seconds()
if elapsed > execution_timeout:
return ExecutionResult(
kickoff_id=kickoff_id,
inputs=inputs,
status=ExecutionStatus.ERROR,
error=f"Execution timeout after {elapsed:.0f}s"
)
try:
async with asyncio.timeout(poll_timeout):
resp = await http_client.get(
f"{client.base_url}/{kickoff_id}/status",
headers=client._headers
)
except asyncio.TimeoutError:
continue # Retry poll
data = resp.json()
if data["status"] in ("completed", "error"):
                return ExecutionResult(
                    kickoff_id=kickoff_id,
                    inputs=inputs,
                    status=ExecutionStatus(data["status"]),
                    result=data.get("result"),
                    error=data.get("error_message"),
                    started_at=start_time,
                    completed_at=datetime.now()
                )
await asyncio.sleep(2)
```
## Complete Error-Resilient Batch Execution
```python
async def resilient_batch_execution(
client: CrewAIClient,
inputs_list: list[dict],
max_concurrent: int = 10,
max_total_retries: int = 3
) -> tuple[list[ExecutionResult], list[ExecutionResult]]:
"""
Execute batch with automatic retry of failed executions.
Returns:
Tuple of (successful_results, final_failed_results)
"""
all_successful = []
remaining = inputs_list.copy()
for retry_round in range(max_total_retries + 1):
if not remaining:
break
if retry_round > 0:
print(f"Retry round {retry_round}: {len(remaining)} executions")
# Reduce concurrency on retries
current_concurrent = max(max_concurrent // (2 ** retry_round), 1)
else:
current_concurrent = max_concurrent
results = await client.kickoff_batch(
remaining,
max_concurrent=current_concurrent
)
# Separate successful and failed
successful = [r for r in results if r.is_success]
failed = [r for r in results if not r.is_success]
all_successful.extend(successful)
# Get retryable failures
aggregator = ErrorAggregator()
for r in failed:
aggregator.add_error(r.inputs, r.error or "Unknown error")
remaining = aggregator.get_retryable_inputs()
if not remaining:
# All remaining failures are non-retryable
return all_successful, failed
await asyncio.sleep(5 * (retry_round + 1)) # Increasing wait between rounds
# Return final state
final_failed = [
ExecutionResult(
kickoff_id="",
inputs=inputs,
status=ExecutionStatus.ERROR,
error="Max retries exceeded"
)
for inputs in remaining
]
return all_successful, final_failed
```

# CrewAI Enterprise Python Client
Complete Python client implementation for interacting with deployed CrewAI crews and flows.
## Contents
- [Full-Featured Client Class](#full-featured-client-class) - Production-ready `CrewAIClient` with sync/async methods
- [Usage Examples](#usage-examples) - Basic, async, structured output, error handling
- [Environment Configuration](#environment-configuration) - Config from environment variables
- [Rate-Limited Semaphore](#rate-limited-semaphore) - Combine concurrency with rate limiting
- [Quick Start Examples](#quick-start-examples) - Minimal code to get started
- [Dependencies](#dependencies) - Required packages
## Full-Featured Client Class
```python
"""
CrewAI Enterprise API Client
Production-ready client for interacting with deployed crews and flows.
Supports synchronous, async, and batch operations with semaphore control.
"""
import asyncio
import httpx
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, TypeVar
from enum import Enum
from pydantic import BaseModel
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
T = TypeVar("T", bound=BaseModel)
class ExecutionStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
ERROR = "error"
@dataclass
class ExecutionResult:
"""Result of a single crew execution."""
kickoff_id: str
inputs: dict
status: ExecutionStatus
result: dict | None = None
error: str | None = None
started_at: datetime | None = None
completed_at: datetime | None = None
progress: dict | None = None
@property
def duration_seconds(self) -> float | None:
if self.started_at and self.completed_at:
return (self.completed_at - self.started_at).total_seconds()
return None
@property
def is_success(self) -> bool:
return self.status == ExecutionStatus.COMPLETED and self.result is not None
@dataclass
class CrewAIClient:
"""
Client for CrewAI Enterprise API.
Usage:
client = CrewAIClient(
base_url="https://your-crew.crewai.com",
token="YOUR_TOKEN"
)
# Sync usage
result = client.kickoff_sync({"topic": "AI"})
# Async usage
result = await client.kickoff({"topic": "AI"})
# Batch with semaphore
results = await client.kickoff_batch(inputs_list, max_concurrent=10)
"""
base_url: str
token: str
timeout: float = 600.0
poll_interval: float = 2.0
max_retries: int = 3
_headers: dict = field(init=False)
def __post_init__(self):
self.base_url = self.base_url.rstrip("/")
self._headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
# =========================================================================
# Input Discovery
# =========================================================================
async def get_inputs(self) -> dict:
"""Discover required inputs for the crew."""
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.get(
f"{self.base_url}/inputs",
headers=self._headers
)
resp.raise_for_status()
return resp.json()
def get_inputs_sync(self) -> dict:
"""Synchronous version of get_inputs."""
import requests
resp = requests.get(
f"{self.base_url}/inputs",
headers=self._headers,
timeout=30
)
resp.raise_for_status()
return resp.json()
# =========================================================================
# Single Execution
# =========================================================================
async def kickoff(
self,
inputs: dict,
wait: bool = True,
webhook_config: dict | None = None
) -> ExecutionResult:
"""
Start a crew execution.
Args:
inputs: Input parameters for the crew
wait: If True, poll until completion
webhook_config: Optional HITL webhook configuration
Returns:
ExecutionResult with status and results
"""
started_at = datetime.now()
async with httpx.AsyncClient(timeout=self.timeout) as client:
# Build request body
body: dict[str, Any] = {"inputs": inputs}
if webhook_config:
body["humanInputWebhook"] = webhook_config
# Kickoff
resp = await client.post(
f"{self.base_url}/kickoff",
headers=self._headers,
json=body
)
resp.raise_for_status()
kickoff_id = resp.json()["kickoff_id"]
if not wait:
return ExecutionResult(
kickoff_id=kickoff_id,
inputs=inputs,
status=ExecutionStatus.RUNNING,
started_at=started_at
)
# Poll for completion
return await self._poll_until_complete(
client, kickoff_id, inputs, started_at
)
def kickoff_sync(
self,
inputs: dict,
wait: bool = True,
webhook_config: dict | None = None
) -> ExecutionResult:
"""Synchronous version of kickoff."""
return asyncio.run(self.kickoff(inputs, wait, webhook_config))
async def get_status(self, kickoff_id: str) -> ExecutionResult:
"""Get the current status of an execution."""
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.get(
f"{self.base_url}/{kickoff_id}/status",
headers=self._headers
)
resp.raise_for_status()
data = resp.json()
return ExecutionResult(
kickoff_id=kickoff_id,
inputs={},
status=ExecutionStatus(data["status"]),
result=data.get("result"),
error=data.get("error_message"),
progress=data.get("progress")
)
# =========================================================================
# Batch Execution with Semaphore
# =========================================================================
async def kickoff_batch(
self,
inputs_list: list[dict],
max_concurrent: int = 10,
on_progress: Callable[[int, int, ExecutionResult], None] | None = None,
on_error: Callable[[ExecutionResult], bool] | None = None
) -> list[ExecutionResult]:
"""
Execute multiple crews with semaphore-controlled concurrency.
Args:
inputs_list: List of input dictionaries
max_concurrent: Maximum concurrent executions
on_progress: Callback(completed, total, result) for progress
on_error: Callback(result) -> bool, return True to continue
Returns:
List of ExecutionResult objects
"""
semaphore = asyncio.Semaphore(max_concurrent)
results: list[ExecutionResult] = []
completed_count = 0
total = len(inputs_list)
lock = asyncio.Lock()
async def execute_with_semaphore(inputs: dict) -> ExecutionResult:
nonlocal completed_count
async with semaphore:
result = await self._execute_single_with_retry(inputs)
async with lock:
completed_count += 1
results.append(result)
if on_progress:
on_progress(completed_count, total, result)
if result.error and on_error:
should_continue = on_error(result)
if not should_continue:
raise asyncio.CancelledError("Stopped by on_error callback")
logger.info(
f"[{completed_count}/{total}] "
f"{'OK' if result.is_success else 'ERR'} "
f"id={result.kickoff_id}"
)
return result
tasks = [execute_with_semaphore(inputs) for inputs in inputs_list]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
logger.warning("Batch execution cancelled")
return results
def kickoff_batch_sync(
self,
inputs_list: list[dict],
max_concurrent: int = 10,
on_progress: Callable[[int, int, ExecutionResult], None] | None = None
) -> list[ExecutionResult]:
"""Synchronous version of kickoff_batch."""
return asyncio.run(
self.kickoff_batch(inputs_list, max_concurrent, on_progress)
)
# =========================================================================
# Structured Output Support
# =========================================================================
async def kickoff_typed(
self,
inputs: dict,
output_model: type[T]
) -> T | None:
"""
Execute and parse result into a Pydantic model.
Args:
inputs: Input parameters
output_model: Pydantic model class for parsing
Returns:
Parsed model instance or None if failed
"""
result = await self.kickoff(inputs, wait=True)
if result.is_success and result.result:
output = result.result.get("output", "")
# Try to parse JSON from output
import json
try:
data = json.loads(output)
return output_model.model_validate(data)
            except Exception:  # json.JSONDecodeError or pydantic ValidationError
return None
return None
# =========================================================================
# Private Methods
# =========================================================================
async def _execute_single_with_retry(self, inputs: dict) -> ExecutionResult:
"""Execute single crew with retry logic."""
started_at = datetime.now()
for attempt in range(self.max_retries):
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
# Kickoff
resp = await client.post(
f"{self.base_url}/kickoff",
headers=self._headers,
json={"inputs": inputs}
)
resp.raise_for_status()
kickoff_id = resp.json()["kickoff_id"]
# Poll
return await self._poll_until_complete(
client, kickoff_id, inputs, started_at
)
except httpx.HTTPStatusError as e:
if attempt == self.max_retries - 1:
return ExecutionResult(
kickoff_id="",
inputs=inputs,
status=ExecutionStatus.ERROR,
error=f"HTTP {e.response.status_code}: {str(e)}",
started_at=started_at,
completed_at=datetime.now()
)
await asyncio.sleep(2 ** attempt)
except Exception as e:
if attempt == self.max_retries - 1:
return ExecutionResult(
kickoff_id="",
inputs=inputs,
status=ExecutionStatus.ERROR,
error=str(e),
started_at=started_at,
completed_at=datetime.now()
)
await asyncio.sleep(2 ** attempt)
# Should never reach here
return ExecutionResult(
kickoff_id="",
inputs=inputs,
status=ExecutionStatus.ERROR,
error="Max retries exceeded",
started_at=started_at,
completed_at=datetime.now()
)
async def _poll_until_complete(
self,
client: httpx.AsyncClient,
kickoff_id: str,
inputs: dict,
started_at: datetime
) -> ExecutionResult:
"""Poll status until completion or error."""
while True:
resp = await client.get(
f"{self.base_url}/{kickoff_id}/status",
headers=self._headers
)
resp.raise_for_status()
data = resp.json()
status = ExecutionStatus(data["status"])
if status == ExecutionStatus.COMPLETED:
return ExecutionResult(
kickoff_id=kickoff_id,
inputs=inputs,
status=status,
result=data.get("result"),
started_at=started_at,
completed_at=datetime.now()
)
if status == ExecutionStatus.ERROR:
return ExecutionResult(
kickoff_id=kickoff_id,
inputs=inputs,
status=status,
error=data.get("error_message", "Unknown error"),
started_at=started_at,
completed_at=datetime.now()
)
await asyncio.sleep(self.poll_interval)
```
## Usage Examples
### Basic Usage
```python
# Initialize client
client = CrewAIClient(
base_url="https://your-crew.crewai.com",
token="YOUR_TOKEN"
)
# Discover inputs
inputs_schema = client.get_inputs_sync()
print(f"Required inputs: {inputs_schema}")
# Single execution (sync)
result = client.kickoff_sync({"topic": "AI Research"})
if result.is_success:
print(f"Output: {result.result['output']}")
else:
print(f"Error: {result.error}")
```
### Async with Progress Tracking
```python
import asyncio
async def main():
    client = CrewAIClient(
        base_url="https://your-crew.crewai.com",
        token="YOUR_TOKEN"
    )
# 100 executions
inputs_list = [{"topic": f"Topic {i}"} for i in range(100)]
def on_progress(completed, total, result):
pct = completed / total * 100
status = "OK" if result.is_success else "ERR"
print(f"[{completed}/{total}] {pct:.0f}% - {status}")
results = await client.kickoff_batch(
inputs_list,
max_concurrent=10,
on_progress=on_progress
)
# Summary
success = sum(1 for r in results if r.is_success)
print(f"\nCompleted: {success}/{len(results)}")
asyncio.run(main())
```
### With Structured Output
```python
from pydantic import BaseModel
class ResearchOutput(BaseModel):
summary: str
key_findings: list[str]
confidence: float
async def main():
client = CrewAIClient(...)
output = await client.kickoff_typed(
inputs={"topic": "Quantum Computing"},
output_model=ResearchOutput
)
if output:
print(f"Summary: {output.summary}")
print(f"Findings: {output.key_findings}")
```
### Error Handling with Early Stop
```python
async def main():
client = CrewAIClient(...)
error_count = 0
max_errors = 5
def on_error(result):
nonlocal error_count
error_count += 1
print(f"Error {error_count}: {result.error}")
# Stop if too many errors
return error_count < max_errors
results = await client.kickoff_batch(
inputs_list,
max_concurrent=10,
on_error=on_error
)
```
## Environment Configuration
```python
import os
from dataclasses import dataclass
@dataclass
class Config:
base_url: str = os.getenv("CREWAI_BASE_URL", "")
token: str = os.getenv("CREWAI_TOKEN", "")
max_concurrent: int = int(os.getenv("CREWAI_MAX_CONCURRENT", "10"))
timeout: float = float(os.getenv("CREWAI_TIMEOUT", "600"))
config = Config()
client = CrewAIClient(
base_url=config.base_url,
token=config.token,
timeout=config.timeout
)
```
## Rate-Limited Semaphore
For APIs with rate limits, combine concurrency control with rate limiting:
```python
import asyncio
from collections import deque
from time import time
class RateLimitedSemaphore:
"""
Semaphore with rate limiting: max N concurrent requests AND max M requests per second.
Usage:
limiter = RateLimitedSemaphore(max_concurrent=10, max_per_second=5.0)
async def make_request():
async with limiter:
return await client.kickoff(inputs)
"""
def __init__(self, max_concurrent: int, max_per_second: float):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.max_per_second = max_per_second
self.request_times: deque = deque()
self.lock = asyncio.Lock()
async def __aenter__(self):
await self.semaphore.acquire()
async with self.lock:
now = time()
# Remove timestamps older than 1 second
while self.request_times and now - self.request_times[0] > 1.0:
self.request_times.popleft()
# If at rate limit, wait
if len(self.request_times) >= self.max_per_second:
sleep_time = 1.0 - (now - self.request_times[0])
if sleep_time > 0:
await asyncio.sleep(sleep_time)
self.request_times.append(time())
async def __aexit__(self, *args):
self.semaphore.release()
# Usage with CrewAIClient
async def batch_with_rate_limit(client: CrewAIClient, inputs_list: list[dict]):
"""Execute batch with both concurrency and rate limiting."""
limiter = RateLimitedSemaphore(max_concurrent=10, max_per_second=5.0)
async def single_execution(inputs: dict) -> ExecutionResult:
async with limiter:
return await client.kickoff(inputs, wait=True)
tasks = [single_execution(inputs) for inputs in inputs_list]
return await asyncio.gather(*tasks)
```
## Quick Start Examples
### Minimal Example
```python
# Simplest possible usage
client = CrewAIClient(
base_url="https://your-crew.crewai.com",
token="YOUR_TOKEN"
)
result = client.kickoff_sync({"topic": "AI Research"})
print(result.result["output"] if result.is_success else result.error)
```
### Batch Processing 100 Items
```python
import asyncio
async def process_batch():
client = CrewAIClient(
base_url="https://your-crew.crewai.com",
token="YOUR_TOKEN"
)
inputs = [{"topic": f"Topic {i}"} for i in range(100)]
results = await client.kickoff_batch(
inputs,
max_concurrent=10,
on_progress=lambda done, total, r: print(f"{done}/{total}")
)
success = sum(1 for r in results if r.is_success)
print(f"Success: {success}/{len(results)}")
asyncio.run(process_batch())
```
## Dependencies
```toml
# pyproject.toml
[project]
dependencies = [
"httpx>=0.25.0",
"pydantic>=2.0.0",
]
```

---
name: crewai-tool-creator
description: |
Guide for creating custom tools for CrewAI agents following best practices from both CrewAI documentation
and Anthropic's agent tool design principles. Use this skill when: (1) Creating new custom tools for agents,
(2) Designing tool input schemas with Pydantic, (3) Writing effective tool descriptions and error messages,
(4) Implementing caching and async tools, (5) Optimizing tools for context efficiency and token consumption.
Always use BaseTool class inheritance for full control over validation, error handling, and behavior.
---
# CrewAI Tool Creator Guide
**Core Principle: Design tools that reduce agent cognitive load while enabling clear, distinct actions.**
Tools are how agents interact with the world. Well-designed tools make agents more effective; poorly designed tools cause confusion, hallucinations, and wasted tokens.
## BaseTool Class Structure
Always use the `BaseTool` class for full control over input validation, error handling, and state:
```python
import json
from typing import Type
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class SearchContactsInput(BaseModel):
"""Input schema for contact search."""
query: str = Field(..., description="Name or email to search for (partial match)")
limit: int = Field(default=10, ge=1, le=100, description="Max results (1-100)")
class SearchContactsTool(BaseTool):
name: str = "search_contacts"
description: str = """
Search for contacts by name or email. Returns matching contacts with
their name, email, and role. Use this instead of listing all contacts.
"""
args_schema: Type[BaseModel] = SearchContactsInput
def _run(self, query: str, limit: int = 10) -> str:
try:
results = self.db.search_contacts(query, limit)
return json.dumps(results)
except DatabaseError as e:
return f"Search failed: {e}. Try a different query or check connection."
```
## Critical Design Principles
### 1. Context Efficiency
Agents have limited context. Design tools that consolidate operations:
```python
# BAD: Forces multiple calls, wastes context
class ListContactsTool(BaseTool):
name: str = "list_contacts"
description: str = "List all contacts in the database."
def _run(self) -> str:
return json.dumps(db.get_all()) # Could return thousands
# GOOD: Search reduces context consumption
class SearchContactsTool(BaseTool):
name: str = "search_contacts"
description: str = "Search contacts by name/email. Max 25 results per query."
args_schema: Type[BaseModel] = SearchInput
def _run(self, query: str, limit: int = 25) -> str:
return json.dumps(db.search(query, limit))
```
### 2. Clear Purpose and Naming
Each tool should have one clear purpose with an unambiguous name:
```python
# BAD: Vague, what does "process" mean?
name: str = "data_tool"
# GOOD: Clear action and target
name: str = "search_customer_orders"
# Use namespacing for related tools
name: str = "crm_search_contacts"
name: str = "crm_create_contact"
name: str = "crm_update_contact"
```
### 3. Semantic Parameter Names
Use descriptive names, not cryptic identifiers:
```python
# BAD: Ambiguous
class BadInput(BaseModel):
user: str # User what? ID? Name? Email?
id: str # ID of what?
# GOOD: Unambiguous
class GoodInput(BaseModel):
user_email: str = Field(..., description="Email address of the user")
order_id: str = Field(..., description="Order ID (format: ORD-XXXXX)")
```
### 4. Meaningful Response Design
Return what agents need, exclude what they don't:
```python
# BAD: Dumps everything including useless metadata
def _run(self, order_id: str) -> str:
order = db.get_order(order_id)
return json.dumps(order.__dict__) # Includes uuid, created_at_unix, internal_flags...
# GOOD: Curated, relevant fields
def _run(self, order_id: str) -> str:
order = db.get_order(order_id)
return json.dumps({
"order_id": order.display_id,
"customer": order.customer_name,
"items": [{"name": i.name, "qty": i.qty} for i in order.items],
"status": order.status,
"total": f"${order.total:.2f}"
})
```
### 5. Actionable Error Messages
Replace stack traces with guidance:
```python
# BAD: Unhelpful
def _run(self, date: str) -> str:
try:
parsed = datetime.fromisoformat(date)
except ValueError:
return "Error: Invalid date format"
# GOOD: Guides correction
def _run(self, date: str) -> str:
try:
parsed = datetime.fromisoformat(date)
except ValueError:
return (
f"Invalid date format: '{date}'. "
f"Use ISO 8601 format: YYYY-MM-DD (e.g., 2024-03-15)"
)
```
## Tool Description Best Practices
Write descriptions as you would for a new team member:
```python
description: str = """
Search for orders by customer email or order ID.
When to use:
- Finding a specific customer's order history
- Looking up order status by ID
- Checking recent orders for a customer
Returns: Order details including status, items, and total.
Does NOT return: Payment details or internal notes.
Tip: For bulk order analysis, use 'export_orders' instead.
"""
```
## Pydantic Input Schema Patterns
### Required vs Optional Fields
```python
class AnalyzeInput(BaseModel):
# Required: no default value
data_source: str = Field(..., description="Data source identifier")
# Optional with default
include_historical: bool = Field(
default=False,
description="Include historical data (slower)"
)
# Optional, can be None
date_filter: Optional[str] = Field(
default=None,
description="Filter by date (ISO format) or None for all"
)
```
### Validation Constraints
```python
from pydantic import BaseModel, Field, field_validator
class PaginatedInput(BaseModel):
query: str = Field(..., min_length=2, max_length=200)
page: int = Field(default=1, ge=1, description="Page number (starts at 1)")
per_page: int = Field(default=25, ge=1, le=100, description="Results per page")
# Complex validation
@field_validator('query')
@classmethod
def validate_query(cls, v: str) -> str:
if v.strip() != v:
raise ValueError("Query cannot have leading/trailing whitespace")
return v
```
### Enum for Controlled Values
```python
from enum import Enum
class SortOrder(str, Enum):
ASC = "asc"
DESC = "desc"
class SortableInput(BaseModel):
query: str = Field(...)
sort_by: str = Field(default="created_at", description="Field to sort by")
sort_order: SortOrder = Field(default=SortOrder.DESC)
```
## Response Format Flexibility
Let agents request the detail level they need:
```python
class AnalyzeDataInput(BaseModel):
data_source: str = Field(..., description="Data source identifier")
response_format: str = Field(
default="concise",
description="'concise' for summary only, 'detailed' for full breakdown"
)
class AnalyzeDataTool(BaseTool):
name: str = "analyze_data"
description: str = "Analyze data source. Use response_format='concise' unless full details needed."
args_schema: Type[BaseModel] = AnalyzeDataInput
def _run(self, data_source: str, response_format: str = "concise") -> str:
analysis = self.analyze(data_source)
if response_format == "concise":
return json.dumps({
"summary": analysis.summary,
"key_metrics": analysis.top_3_metrics
}) # ~70 tokens
else:
return json.dumps({
"summary": analysis.summary,
"all_metrics": analysis.all_metrics,
"data_points": analysis.raw_data,
"methodology": analysis.methodology
}) # ~300 tokens
```
## Tool with Constructor Dependencies
Inject dependencies through `__init__`:
```python
class DatabaseSearchTool(BaseTool):
name: str = "db_search"
description: str = "Search the database"
args_schema: Type[BaseModel] = SearchInput
def __init__(self, db_connection, cache_client=None):
super().__init__()
self.db = db_connection
self.cache = cache_client
def _run(self, query: str) -> str:
if self.cache:
cached = self.cache.get(query)
if cached:
return cached
results = self.db.search(query)
if self.cache:
self.cache.set(query, results)
return results
```
## Tool Caching
**When to use:** Expensive API calls, idempotent operations, repeated queries with same results.
Use the `cache_function` attribute to control caching. It receives `(arguments: dict, result)` and returns `bool`.
```python
from typing import Callable, Any
class ExpensiveSearchTool(BaseTool):
name: str = "expensive_search"
description: str = "Search with caching for repeated queries"
args_schema: Type[BaseModel] = SearchInput
# Cache all successful results
cache_function: Callable[[dict, Any], bool] = lambda args, result: (
not str(result).startswith("Error:") # Don't cache errors
)
def _run(self, query: str) -> str:
return expensive_api.search(query)
```
**When NOT to cache:**
- User-specific data that changes frequently
- Time-sensitive information
- Error responses (agent should retry)
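These exclusions can be encoded directly in a `cache_function`. A minimal standalone sketch (the `user_id` and `realtime` argument names are hypothetical, not part of any CrewAI API):

```python
from typing import Any

def skip_volatile_results(args: dict, result: Any) -> bool:
    """Cache only stable, successful results."""
    if isinstance(result, str) and result.startswith("Error:"):
        return False  # let the agent retry errors
    if args.get("user_id"):
        return False  # user-specific data may change between calls
    if args.get("realtime"):
        return False  # time-sensitive data goes stale quickly
    return True

print(skip_volatile_results({"query": "docs"}, '{"results": []}'))   # True
print(skip_volatile_results({"query": "q", "user_id": "U1"}, "{}"))  # False
print(skip_volatile_results({"query": "q"}, "Error: timeout"))       # False
```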
See [references/tool-patterns.md](references/tool-patterns.md#tool-caching) for conditional caching patterns.
## Async Execution
**Important:** Tool `_run()` methods are **synchronous**. Async execution happens at the crew/task level, not the tool level.
**When you need parallel operations within a tool**, use `ThreadPoolExecutor`:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
class MultiSourceTool(BaseTool):
def _run(self, query: str) -> str:
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(src.search, query): src for src in self.sources}
results = {futures[f]: f.result() for f in as_completed(futures)}
return json.dumps(results)
```
**For async crew execution**, use `akickoff()` or `kickoff_async()` at the flow level.
See [references/tool-patterns.md](references/tool-patterns.md#async-execution-patterns) for async patterns.
## Complete Production Example
```python
from typing import Type, Optional
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import json
class CustomerSearchInput(BaseModel):
"""Input schema for customer search tool."""
query: str = Field(
...,
min_length=2,
description="Search by name, email, or phone (min 2 chars)"
)
status: Optional[str] = Field(
default=None,
description="Filter by status: 'active', 'inactive', or None for all"
)
limit: int = Field(
default=10,
ge=1,
le=50,
description="Results per page (1-50, default 10)"
)
class CustomerSearchTool(BaseTool):
name: str = "crm_search_customers"
description: str = """
Search for customers by name, email, or phone number.
Use cases:
- Find a specific customer's profile
- Look up customers by partial name/email
- Filter active vs inactive customers
Returns: Customer name, email, status, and last order date.
For full customer details, use 'crm_get_customer' with the customer_id.
"""
args_schema: Type[BaseModel] = CustomerSearchInput
def __init__(self, db_connection):
super().__init__()
self.db = db_connection
def _run(self, query: str, status: Optional[str] = None, limit: int = 10) -> str:
# Validate status if provided
valid_statuses = {"active", "inactive", None}
if status and status not in valid_statuses:
return (
f"Invalid status: '{status}'. "
f"Use 'active', 'inactive', or omit for all customers."
)
try:
results = self.db.search_customers(
query=query,
status=status,
limit=limit
)
if not results:
return f"No customers found matching '{query}'" + (
f" with status '{status}'" if status else ""
) + ". Try a broader search term."
# Return curated, agent-friendly format
return json.dumps({
"count": len(results),
"customers": [
{
"customer_id": c.id,
"name": c.full_name,
"email": c.email,
"status": c.status,
"last_order": c.last_order_date.isoformat() if c.last_order_date else None
}
for c in results
]
})
except DatabaseError:
return "Search failed: Database unavailable. Retry in a moment or contact support."
```
## Assigning Tools to Agents
```python
from crewai import Agent
# Initialize tools with dependencies
search_tool = CustomerSearchTool(db_connection=db)
order_tool = OrderLookupTool(db_connection=db)
# Assign to agent
support_agent = Agent(
role="Customer Support Specialist",
goal="Quickly resolve customer inquiries",
backstory="Expert at navigating customer systems",
tools=[search_tool, order_tool],
verbose=True
)
```
## Reference Files
- [references/tool-patterns.md](references/tool-patterns.md) - Advanced patterns: MCP integration, tool composition, testing
- [references/design-principles.md](references/design-principles.md) - Anthropic's complete agent tool design principles
## Sources
- [CrewAI Custom Tools Documentation](https://docs.crewai.com/en/learn/create-custom-tools)
- [Anthropic: Writing Tools for Agents](https://www.anthropic.com/engineering/writing-tools-for-agents)


@@ -1,404 +0,0 @@
# Agent Tool Design Principles
Based on [Anthropic's Engineering Guide: Writing Tools for Agents](https://www.anthropic.com/engineering/writing-tools-for-agents)
## Core Philosophy
> "LLM agents have limited 'context'...whereas computer memory is cheap and abundant"
Agents operate under fundamentally different constraints than traditional software. Every token consumed reduces their ability to reason. Design tools that maximize value per token.
## Principle 1: Context Efficiency
### The Problem
Agents have limited context windows. Every tool response consumes tokens that could be used for reasoning.
### The Solution
Build tools that consolidate operations and return only what's needed.
```python
# ANTI-PATTERN: List everything
class ListAllContactsTool(BaseTool):
name: str = "list_contacts"
def _run(self) -> str:
# Returns 10,000 contacts = 500,000 tokens wasted
return json.dumps(db.get_all_contacts())
# PATTERN: Search with limits
class SearchContactsTool(BaseTool):
name: str = "search_contacts"
description: str = "Search contacts. Use specific queries to find who you need."
def _run(self, query: str, limit: int = 25) -> str:
# Returns max 25 results = ~2,000 tokens
return json.dumps(db.search(query, limit=limit))
```
### Multi-Step Consolidation
Combine related operations that are typically used together:
```python
# ANTI-PATTERN: Requires 3 tool calls
# 1. get_user(id) -> user
# 2. get_user_orders(user_id) -> orders
# 3. get_order_details(order_id) -> details
# PATTERN: Single call with context
class UserContextTool(BaseTool):
name: str = "get_user_context"
description: str = """
Get user profile with recent activity.
Returns user info, last 5 orders, and active support tickets.
"""
def _run(self, user_id: str) -> str:
user = db.get_user(user_id)
orders = db.get_recent_orders(user_id, limit=5)
tickets = db.get_open_tickets(user_id)
return json.dumps({
"user": {"name": user.name, "email": user.email, "tier": user.tier},
"recent_orders": [{"id": o.id, "date": o.date, "status": o.status} for o in orders],
"open_tickets": len(tickets)
})
```
## Principle 2: Clear, Distinct Purpose
### Tool Selection
Agents must choose between available tools. Overlapping purposes cause confusion.
```python
# ANTI-PATTERN: Overlapping tools
tools = [
DataTool(), # "Process data"
AnalyzerTool(), # "Analyze data"
ProcessorTool(), # "Handle data processing"
]
# Agent: "Which one do I use?"
# PATTERN: Distinct purposes
tools = [
DataValidationTool(), # "Validate data format and completeness"
DataTransformTool(), # "Convert data between formats (CSV, JSON, XML)"
DataAnalysisTool(), # "Calculate statistics and identify patterns"
]
```
### Namespacing
Group related tools with consistent prefixes:
```python
# Service-based namespacing
name: str = "asana_search_tasks"
name: str = "asana_create_task"
name: str = "asana_update_task"
# Or resource-based
name: str = "search_asana_tasks"
name: str = "create_asana_task"
name: str = "update_asana_task"
```
## Principle 3: Semantic Clarity
### Parameter Naming
Agents handle natural language better than cryptic identifiers:
```python
# ANTI-PATTERN: Ambiguous parameters
class BadInput(BaseModel):
user: str # ID? Email? Name?
id: str # Of what?
type: str # What types exist?
# PATTERN: Self-documenting parameters
class GoodInput(BaseModel):
user_email: str = Field(..., description="User's email address")
project_id: str = Field(..., description="Project ID (format: PRJ-XXXXX)")
task_type: str = Field(
...,
description="Type of task: 'bug', 'feature', or 'improvement'"
)
```
### ID Resolution
Resolve UUIDs to semantic identifiers:
```python
# ANTI-PATTERN: Returns raw UUIDs
def _run(self, query: str) -> str:
return json.dumps([{
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"parent_id": "f1e2d3c4-b5a6-0987-dcba-654321098765"
}])
# PATTERN: Semantic identifiers
def _run(self, query: str) -> str:
return json.dumps([{
"project_id": "PRJ-001",
"project_name": "Website Redesign",
"parent_project": "Marketing Initiatives"
}])
```
## Principle 4: Meaningful Response Design
### Curated Fields
Return what enables action, exclude noise:
```python
# ANTI-PATTERN: Dump everything
def _run(self, order_id: str) -> str:
order = db.get_order(order_id)
return json.dumps({
"uuid": order.uuid,
"created_at_unix": order.created_at_unix,
"updated_at_unix": order.updated_at_unix,
"internal_flags": order.internal_flags,
"mime_type": order.mime_type,
"256px_image_url": order.thumbnail_url,
"customer_name": order.customer_name,
"status": order.status,
# ... 50 more fields
})
# PATTERN: Agent-relevant fields only
def _run(self, order_id: str) -> str:
order = db.get_order(order_id)
return json.dumps({
"order_id": order.display_id,
"customer": order.customer_name,
"status": order.status,
"items": [{"name": i.name, "qty": i.qty} for i in order.items],
"total": f"${order.total:.2f}",
"can_modify": order.status in ["pending", "processing"]
})
```
### Response Format Flexibility
Let agents request appropriate detail levels:
```python
class AnalysisInput(BaseModel):
target: str
response_format: str = Field(
default="concise",
description="'concise' (~50 tokens) or 'detailed' (~500 tokens)"
)
def _run(self, target: str, response_format: str = "concise") -> str:
analysis = self.analyze(target)
if response_format == "concise":
return json.dumps({
"summary": analysis.summary,
"score": analysis.score
})
else:
return json.dumps({
"summary": analysis.summary,
"score": analysis.score,
"breakdown": analysis.category_scores,
"evidence": analysis.supporting_data,
"methodology": analysis.methodology_notes
})
```
## Principle 5: Actionable Error Messages
### Replace Tracebacks with Guidance
```python
# ANTI-PATTERN: Opaque errors
def _run(self, date: str) -> str:
try:
dt = datetime.fromisoformat(date)
except:
return "ValueError: Invalid isoformat string"
# PATTERN: Corrective guidance
def _run(self, date: str) -> str:
try:
dt = datetime.fromisoformat(date)
except ValueError:
return (
f"Invalid date format: '{date}'. "
f"Expected ISO 8601: YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS. "
f"Example: 2024-03-15 or 2024-03-15T14:30:00"
)
```
### Suggest Alternatives
```python
def _run(self, status: str) -> str:
valid_statuses = ["pending", "active", "completed", "cancelled"]
if status not in valid_statuses:
# Find close matches
close = [s for s in valid_statuses if status.lower() in s]
suggestion = f" Did you mean '{close[0]}'?" if close else ""
return f"Invalid status: '{status}'.{suggestion} Valid options: {valid_statuses}"
```
### Guide Efficient Behavior
```python
def _run(self, query: str, limit: int = 25) -> str:
results = db.search(query, limit=limit)
if len(results) == limit:
return json.dumps({
"results": results,
"note": f"Returned max {limit} results. Refine query for better matches."
})
return json.dumps({"results": results})
```
## Principle 6: Effective Descriptions
### Write for a New Team Member
```python
description: str = """
Search for customer support tickets by keyword or status.
When to use:
- Finding tickets about a specific issue
- Checking status of customer-reported problems
- Looking up tickets by customer email
Parameters:
- query: Search term (searches subject and body)
- status: Filter by 'open', 'pending', 'resolved', or 'all'
- assigned_to: Filter by agent email (optional)
Returns: Ticket ID, subject, status, customer email, last update.
For full ticket details including conversation history, use 'get_ticket_details'.
Note: Only returns last 30 days by default. Use date_range for older tickets.
"""
```
### Clarify Relationships
```python
description: str = """
Create a new task in a project.
Requires:
- project_id: Get from 'search_projects' or 'list_my_projects'
- assignee_id: Get from 'search_team_members' (optional)
Creates task and returns task_id for use with:
- 'add_task_comment': Add notes or updates
- 'update_task_status': Change status
- 'add_task_attachment': Attach files
"""
```
## Principle 7: Truncation and Limits
### Default Limits
```python
DEFAULT_LIMIT = 25
MAX_RESPONSE_TOKENS = 25000
def _run(self, query: str, limit: int = DEFAULT_LIMIT) -> str:
results = db.search(query, limit=min(limit, 100))
response = json.dumps(results)
# Enforce token limit
if len(response) > MAX_RESPONSE_TOKENS * 4: # ~4 chars per token
return json.dumps({
"results": results[:10],
"truncated": True,
"total_available": len(results),
"suggestion": "Refine your query for more specific results"
})
return response
```
### Pagination Guidance
```python
def _run(self, query: str, page: int = 1) -> str:
per_page = 25
total, results = db.search_paginated(query, page, per_page)
return json.dumps({
"results": results,
"page": page,
"total_pages": (total + per_page - 1) // per_page,
"hint": "Use 'page' parameter for more results" if total > per_page else None
})
```
## Principle 8: Evaluation-Driven Development
### Build Real-World Evaluations
Test tools with realistic, multi-step scenarios:
```python
# Evaluation scenarios
EVAL_SCENARIOS = [
{
"name": "customer_lookup_flow",
"prompt": "Find John Smith's last order and check if it shipped",
"expected_tools": ["search_customers", "get_order_details"],
"success_criteria": lambda result: "shipped" in result.lower() or "pending" in result.lower()
},
{
"name": "refund_processing",
"prompt": "Process a refund for order ORD-12345",
"expected_tools": ["get_order_details", "process_refund"],
"success_criteria": lambda result: "refund" in result.lower() and "processed" in result.lower()
}
]
```
### Metrics to Track
- **Accuracy**: Did the agent complete the task correctly?
- **Tool calls**: How many calls were needed? (fewer is better)
- **Token consumption**: Total tokens used
- **Error rate**: How often did tools return errors?
- **Recovery rate**: Did the agent recover from errors?
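One way to aggregate these metrics is a small summary helper. A sketch assuming you log one record per evaluation scenario (the `EvalRun` structure is hypothetical, not a CrewAI type):

```python
from dataclasses import dataclass

@dataclass
class EvalRun:
    passed: bool       # accuracy: did the agent complete the task?
    tool_calls: int    # fewer is better
    tokens_used: int   # total token consumption
    tool_errors: int   # how many tool calls returned errors
    recovered: bool    # did the agent finish despite tool errors?

def summarize(runs: list[EvalRun]) -> dict:
    n = len(runs)
    errored = [r for r in runs if r.tool_errors > 0]
    return {
        "accuracy": sum(r.passed for r in runs) / n,
        "avg_tool_calls": sum(r.tool_calls for r in runs) / n,
        "avg_tokens": sum(r.tokens_used for r in runs) / n,
        "error_rate": len(errored) / n,
        "recovery_rate": (
            sum(r.recovered for r in errored) / len(errored) if errored else 1.0
        ),
    }

runs = [
    EvalRun(True, 2, 1200, 0, True),
    EvalRun(True, 4, 2100, 1, True),
    EvalRun(False, 6, 3000, 2, False),
]
print(summarize(runs))
```

Tracking these numbers across tool-description revisions shows whether a change actually reduced confusion or just moved it.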
### Iterate with Agent Feedback
After evaluations, let agents analyze their own transcripts:
```
Analyze this tool usage transcript. Identify:
1. Where did I get confused about which tool to use?
2. Which tool descriptions were unclear?
3. What information was missing from tool responses?
4. Where did I make unnecessary tool calls?
```
## Summary Checklist
Before deploying a tool, verify:
- [ ] **Context Efficient**: Returns only necessary data
- [ ] **Clear Purpose**: Distinct from other tools, unambiguous name
- [ ] **Semantic Names**: Parameters are self-documenting
- [ ] **Meaningful Responses**: Curated fields, no noise
- [ ] **Actionable Errors**: Guide correction, suggest alternatives
- [ ] **Good Description**: Written for a new team member
- [ ] **Reasonable Limits**: Default pagination, token limits
- [ ] **Tested**: Evaluated with realistic multi-step scenarios


@@ -1,600 +0,0 @@
# Advanced Tool Patterns
## Tool Composition
Create tools that work together as a cohesive toolkit:
```python
from typing import Optional, Type
import json
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
# Shared connection manager
class DatabaseToolMixin:
"""Mixin for tools that need database access."""
def __init__(self, db_connection):
super().__init__()
self.db = db_connection
# Tool 1: Search
class ProjectSearchInput(BaseModel):
query: str = Field(..., min_length=2)
status: str = Field(default="all")
class ProjectSearchTool(DatabaseToolMixin, BaseTool):
name: str = "project_search"
description: str = """
Search for projects by name or description.
Returns project_id, name, status, and owner.
Use project_id with 'project_get_details' for full info.
"""
args_schema: Type[BaseModel] = ProjectSearchInput
def _run(self, query: str, status: str = "all") -> str:
results = self.db.search_projects(query, status)
return json.dumps([{
"project_id": p.id,
"name": p.name,
"status": p.status,
"owner": p.owner_name
} for p in results])
# Tool 2: Get Details (uses project_id from search)
class ProjectDetailsInput(BaseModel):
project_id: str = Field(..., description="Project ID from search results")
class ProjectDetailsTool(DatabaseToolMixin, BaseTool):
name: str = "project_get_details"
description: str = """
Get full project details by project_id.
Use after 'project_search' to get comprehensive information.
"""
args_schema: Type[BaseModel] = ProjectDetailsInput
def _run(self, project_id: str) -> str:
project = self.db.get_project(project_id)
if not project:
return f"Project '{project_id}' not found. Use 'project_search' to find valid IDs."
return json.dumps({
"project_id": project.id,
"name": project.name,
"description": project.description,
"status": project.status,
"owner": project.owner_name,
"team": [m.name for m in project.team],
"milestones": [{
"name": m.name,
"due": m.due_date.isoformat(),
"complete": m.is_complete
} for m in project.milestones]
})
# Tool 3: Update (uses project_id)
class ProjectUpdateInput(BaseModel):
project_id: str = Field(...)
    status: Optional[str] = Field(default=None)
    description: Optional[str] = Field(default=None)
class ProjectUpdateTool(DatabaseToolMixin, BaseTool):
name: str = "project_update"
description: str = """
Update project status or description.
Requires project_id from search. At least one field must be provided.
"""
args_schema: Type[BaseModel] = ProjectUpdateInput
    def _run(self, project_id: str, status: Optional[str] = None, description: Optional[str] = None) -> str:
if not status and not description:
return "No updates provided. Specify 'status' and/or 'description'."
updates = {}
if status:
valid = ["active", "paused", "completed", "cancelled"]
if status not in valid:
return f"Invalid status. Choose from: {valid}"
updates["status"] = status
if description:
updates["description"] = description
self.db.update_project(project_id, **updates)
return f"Project {project_id} updated successfully."
```
## Stateful Tools
Tools that maintain state across calls:
```python
from datetime import datetime
class ConversationMemoryTool(BaseTool):
name: str = "conversation_memory"
description: str = """
Store and retrieve conversation context.
Actions: 'store' to save, 'recall' to retrieve, 'clear' to reset.
"""
args_schema: Type[BaseModel] = MemoryInput
def __init__(self):
super().__init__()
self._memory: dict[str, list] = {}
def _run(self, action: str, key: str, value: str = None) -> str:
if action == "store":
if key not in self._memory:
self._memory[key] = []
self._memory[key].append({
"value": value,
"timestamp": datetime.now().isoformat()
})
return f"Stored under '{key}'"
elif action == "recall":
items = self._memory.get(key, [])
if not items:
return f"No memory found for '{key}'"
return json.dumps(items)
elif action == "clear":
if key in self._memory:
del self._memory[key]
return f"Cleared '{key}'"
return f"Nothing to clear for '{key}'"
return f"Unknown action: {action}. Use 'store', 'recall', or 'clear'."
```
## Retry and Circuit Breaker Patterns
```python
from datetime import datetime, timedelta
import time
class ResilientAPITool(BaseTool):
name: str = "api_call"
description: str = "Make API calls with automatic retry"
args_schema: Type[BaseModel] = APIInput
def __init__(self, api_client, max_retries: int = 3):
super().__init__()
self.api = api_client
self.max_retries = max_retries
self._failures = 0
self._circuit_open_until = None
def _run(self, endpoint: str, params: dict = None) -> str:
# Circuit breaker check
if self._circuit_open_until:
if datetime.now() < self._circuit_open_until:
return "Service temporarily unavailable. Try again in 60 seconds."
self._circuit_open_until = None
self._failures = 0
last_error = None
for attempt in range(self.max_retries):
try:
result = self.api.call(endpoint, params)
self._failures = 0 # Reset on success
return json.dumps(result)
except APIError as e:
last_error = e
if attempt < self.max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
# Track failures for circuit breaker
self._failures += 1
if self._failures >= 5:
self._circuit_open_until = datetime.now() + timedelta(seconds=60)
return f"API call failed after {self.max_retries} attempts: {last_error}"
```
## Paginated Results Pattern
```python
class PaginatedSearchInput(BaseModel):
query: str = Field(...)
page: int = Field(default=1, ge=1)
per_page: int = Field(default=25, ge=1, le=100)
class PaginatedSearchTool(BaseTool):
name: str = "paginated_search"
description: str = """
Search with pagination. Returns results and pagination info.
Use 'page' parameter to navigate through results.
"""
args_schema: Type[BaseModel] = PaginatedSearchInput
def _run(self, query: str, page: int = 1, per_page: int = 25) -> str:
total, results = self.db.search_paginated(query, page, per_page)
total_pages = (total + per_page - 1) // per_page
return json.dumps({
"results": results,
"pagination": {
"current_page": page,
"per_page": per_page,
"total_results": total,
"total_pages": total_pages,
"has_next": page < total_pages,
"has_prev": page > 1
}
})
```
## File Operation Tools
```python
from pathlib import Path
class FileReadInput(BaseModel):
file_path: str = Field(..., description="Path relative to workspace root")
encoding: str = Field(default="utf-8")
class SafeFileReadTool(BaseTool):
name: str = "read_file"
description: str = """
Read file contents from the workspace.
Only files within the workspace directory can be accessed.
"""
args_schema: Type[BaseModel] = FileReadInput
def __init__(self, workspace_root: str):
super().__init__()
self.workspace = Path(workspace_root).resolve()
def _run(self, file_path: str, encoding: str = "utf-8") -> str:
        # Security: prevent path traversal. A str.startswith() check can be
        # spoofed by sibling paths like /workspace-evil; is_relative_to (Python 3.9+) is exact.
        target = (self.workspace / file_path).resolve()
        if not target.is_relative_to(self.workspace):
            return "Error: Access denied. Path must be within workspace."
if not target.exists():
return f"File not found: {file_path}"
if not target.is_file():
return f"Not a file: {file_path}"
# Size limit
if target.stat().st_size > 1_000_000: # 1MB
return "File too large (>1MB). Use 'read_file_chunk' for large files."
try:
return target.read_text(encoding=encoding)
except UnicodeDecodeError:
            return f"Cannot read file with encoding '{encoding}'. Try 'latin-1' or 'utf-16'."
```
## Tool Testing Patterns
```python
import pytest
from unittest.mock import Mock, patch
class TestCustomerSearchTool:
"""Test suite for CustomerSearchTool."""
@pytest.fixture
def mock_db(self):
db = Mock()
db.search_customers.return_value = [
Mock(id="C001", full_name="John Doe", email="john@example.com",
status="active", last_order_date=None)
]
return db
@pytest.fixture
def tool(self, mock_db):
return CustomerSearchTool(db_connection=mock_db)
def test_basic_search(self, tool):
result = tool._run(query="john")
data = json.loads(result)
assert data["count"] == 1
assert data["customers"][0]["name"] == "John Doe"
def test_no_results(self, tool, mock_db):
mock_db.search_customers.return_value = []
result = tool._run(query="nonexistent")
assert "No customers found" in result
def test_invalid_status(self, tool):
result = tool._run(query="john", status="invalid")
assert "Invalid status" in result
def test_db_error_handling(self, tool, mock_db):
mock_db.search_customers.side_effect = DatabaseError("Connection lost")
result = tool._run(query="john")
assert "Database unavailable" in result
```
## Structured Tool Output with Pydantic
```python
class AnalysisOutput(BaseModel):
"""Structured output for analysis tool."""
summary: str
confidence: float = Field(ge=0, le=1)
key_findings: list[str]
recommendations: list[str]
class AnalysisTool(BaseTool):
name: str = "analyze_data"
description: str = "Analyze data and return structured findings"
args_schema: Type[BaseModel] = AnalysisInput
def _run(self, data_source: str) -> str:
raw_analysis = self.analyzer.analyze(data_source)
# Structure the output
output = AnalysisOutput(
summary=raw_analysis.summary,
confidence=raw_analysis.confidence_score,
key_findings=raw_analysis.findings[:5],
recommendations=raw_analysis.recommendations[:3]
)
return output.model_dump_json()
```
## Tool Initialization in Crews
```python
# In your crew file
from crewai import Agent, Crew, Task
from crewai.project import CrewBase, agent, crew, task
from ..tools.customer_tools import CustomerSearchTool, CustomerDetailsTool
from ..tools.order_tools import OrderSearchTool
@CrewBase
class SupportCrew:
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
def __init__(self, db_connection):
super().__init__()
# Initialize tools with shared dependencies
self.customer_search = CustomerSearchTool(db_connection)
self.customer_details = CustomerDetailsTool(db_connection)
self.order_search = OrderSearchTool(db_connection)
@agent
def support_agent(self) -> Agent:
return Agent(
config=self.agents_config["support_agent"], # type: ignore[index]
tools=[
self.customer_search,
self.customer_details,
self.order_search
],
verbose=True,
)
@task
def resolve_inquiry(self) -> Task:
return Task(config=self.tasks_config["resolve_inquiry"]) # type: ignore[index]
@crew
def crew(self) -> Crew:
return Crew(agents=self.agents, tasks=self.tasks)
```
## Tool Caching
CrewAI caches tool results to avoid redundant calls. Use `cache_function` for fine-grained control.
### Cache Function Signature
```python
def cache_function(arguments: dict, result: Any) -> bool:
"""
Determines whether to cache a result.
Args:
arguments: Dict of arguments passed to _run()
result: The return value from _run()
Returns:
True to cache, False to skip caching
"""
return True
```
### Always Cache (Default Behavior)
```python
from typing import Callable, Any
class ExpensiveAPITool(BaseTool):
name: str = "expensive_api"
description: str = "Call expensive API with caching"
args_schema: Type[BaseModel] = APIInput
# Cache all results
cache_function: Callable[[dict, Any], bool] = lambda args, result: True
def _run(self, query: str) -> str:
return expensive_api.call(query)
```
### Conditional Caching - Only Cache Successes
```python
def _should_cache(args: dict, result: Any) -> bool:
    """Don't cache error responses.
    Defined at module level so it is called as (arguments, result);
    a regular method would also receive `self` and fail."""
    if isinstance(result, str):
        return not result.startswith("Error:")
    return True
class DataFetchTool(BaseTool):
    name: str = "fetch_data"
    description: str = "Fetch data, only cache successful results"
    args_schema: Type[BaseModel] = FetchInput
    cache_function: Callable[[dict, Any], bool] = _should_cache
def _run(self, data_id: str) -> str:
try:
data = api.fetch(data_id)
return json.dumps(data)
except APIError as e:
return f"Error: {e}" # Won't be cached, agent can retry
```
### Conditional Caching - Based on Result Value
```python
class CalculationTool(BaseTool):
name: str = "calculate"
description: str = "Calculate with selective caching"
args_schema: Type[BaseModel] = CalcInput
# Only cache positive results
cache_function: Callable[[dict, Any], bool] = lambda args, result: (
isinstance(result, (int, float)) and result > 0
)
def _run(self, a: int, b: int) -> int:
return a * b
```
### Conditional Caching - Based on Arguments
```python
def _cache_strategy(args: dict, result: Any) -> bool:
    """Cache only broad searches, not user-specific ones.
    Module-level so it is called as (arguments, result), without `self`."""
    # Don't cache user-specific searches (may change frequently)
    if args.get("user_id"):
        return False
    # Cache general searches
    return True
class SearchTool(BaseTool):
    name: str = "search"
    description: str = "Search with argument-based caching"
    args_schema: Type[BaseModel] = SearchInput
    cache_function: Callable[[dict, Any], bool] = _cache_strategy
    def _run(self, query: str, user_id: Optional[str] = None) -> str:
return db.search(query, user_id=user_id)
```
## Async Execution Patterns
**Important:** CrewAI tool `_run()` methods are synchronous. Async execution happens at the crew/task level.
### Tools Are Synchronous
```python
import requests
class FetchURLTool(BaseTool):
name: str = "fetch_url"
description: str = "Fetch content from URL"
args_schema: Type[BaseModel] = URLInput
def _run(self, url: str, timeout: int = 30) -> str:
"""Use synchronous HTTP client in _run()."""
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
return response.text
except requests.Timeout:
return f"Timeout after {timeout}s. Try increasing timeout."
except requests.RequestException as e:
return f"Request failed: {e}"
```
### Parallel External Calls Within a Tool
When a tool needs multiple independent external calls, use `ThreadPoolExecutor`:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
class MultiSourceSearchTool(BaseTool):
name: str = "multi_source_search"
description: str = "Search multiple sources in parallel"
args_schema: Type[BaseModel] = SearchInput
def __init__(self, sources: list):
super().__init__()
self.sources = sources
def _run(self, query: str) -> str:
with ThreadPoolExecutor(max_workers=len(self.sources)) as executor:
futures = {
executor.submit(src.search, query): src.name
for src in self.sources
}
results = {}
for future in as_completed(futures):
source_name = futures[future]
try:
results[source_name] = future.result(timeout=10)
except Exception as e:
results[source_name] = f"Error: {e}"
return json.dumps(results)
```
### Async at Crew Level
Async execution is configured when kicking off crews:
```python
import asyncio
from crewai.flow.flow import Flow, listen
class ResearchFlow(Flow[ResearchState]):
@listen(classify_topic)
async def run_research_crew(self):
crew = ResearchCrew().crew()
# Option 1: Native async (preferred for high concurrency)
result = await crew.akickoff(inputs={"topic": self.state.topic})
        # Option 2: Thread-based async (alternative to Option 1 -- use one or the other)
        # result = await crew.kickoff_async(inputs={"topic": self.state.topic})
self.state.research = result.raw
return result
@listen(run_research_crew)
async def run_parallel_crews(self):
# Run multiple crews concurrently
crews = [
AnalysisCrew().crew(),
SummaryCrew().crew(),
ValidationCrew().crew()
]
results = await asyncio.gather(*[
crew.akickoff(inputs={"data": self.state.research})
for crew in crews
])
return results
```
### Async Task Configuration
Mark tasks for async execution within a crew:
```python
# config/tasks.yaml
research_task:
description: "Research the topic thoroughly"
expected_output: "Comprehensive research findings"
agent: researcher
async_execution: true # This task runs asynchronously
analysis_task:
description: "Analyze research findings"
expected_output: "Analysis report"
agent: analyst
context:
- research_task # Waits for research_task to complete
```

View File

@@ -1,419 +0,0 @@
---
name: software-architect
description: |
Guide for writing clean, maintainable code following SOLID principles.
Use this skill when: (1) Writing a function that does multiple things (validate, save, notify),
(2) Adding if/elif chains for new feature variations, (3) Creating classes with inheritance,
(4) A class/function requires dependencies it doesn't fully use, (5) Code is hard to test
because it creates its own dependencies, (6) Refactoring for better structure.
Apply these principles to ensure code is easy to understand, modify, and test.
---
# Clean Code with SOLID Principles
**Core Philosophy: Write code that is easy to understand, change, and test.**
SOLID is an acronym for five principles that help you write better code. Apply these every time you write functions or classes.
## Quick Checklist Before Writing Code
| Question to Ask | If Yes, You're Good | If No, Refactor |
|-----------------|---------------------|-----------------|
| Does this function/class do ONE thing? | ✓ | Split it up |
| Can I add features without changing existing code? | ✓ | Use abstractions |
| Can I replace this with a similar component? | ✓ | Fix the contract |
| Am I only using what I need? | ✓ | Create smaller interfaces |
| Do I depend on abstractions, not specifics? | ✓ | Inject dependencies |
---
## S - Single Responsibility Principle
**"A function or class should do one thing and do it well."**
### Why It Matters
- Easier to understand (one purpose = one mental model)
- Easier to test (test one thing at a time)
- Easier to change (change one thing without breaking others)
### Bad Example
```python
def process_user_registration(email: str, password: str) -> dict:
# Validates email
if "@" not in email:
raise ValueError("Invalid email")
# Validates password
if len(password) < 8:
raise ValueError("Password too short")
# Creates user in database
user_id = database.insert("users", {"email": email, "password": hash(password)})
# Sends welcome email
smtp.send(email, "Welcome!", "Thanks for joining!")
# Logs the registration
logger.info(f"New user registered: {email}")
return {"user_id": user_id, "email": email}
```
**Problem**: This function does 5 different things. If you need to change how emails are sent, you're touching the same code that handles validation and database operations.
### Good Example
```python
def validate_email(email: str) -> bool:
"""Check if email format is valid."""
return "@" in email and "." in email
def validate_password(password: str) -> bool:
"""Check if password meets requirements."""
return len(password) >= 8
def create_user(email: str, password: str) -> str:
"""Create user in database and return user ID."""
return database.insert("users", {"email": email, "password": hash(password)})
def send_welcome_email(email: str) -> None:
"""Send welcome email to new user."""
smtp.send(email, "Welcome!", "Thanks for joining!")
def register_user(email: str, password: str) -> dict:
"""Orchestrate the user registration process."""
if not validate_email(email):
raise ValueError("Invalid email")
if not validate_password(password):
raise ValueError("Password too short")
user_id = create_user(email, password)
send_welcome_email(email)
return {"user_id": user_id, "email": email}
```
**Benefits**:
- Each function is easy to understand
- You can test `validate_email` without a database
- You can change email sending without touching validation
---
## O - Open/Closed Principle
**"Code should be open for extension but closed for modification."**
### Why It Matters
- Add new features without changing existing code
- Reduces risk of breaking things that already work
- Makes your code more flexible
### Bad Example
```python
def calculate_discount(customer_type: str, amount: float) -> float:
"""Calculate discount based on customer type."""
if customer_type == "regular":
return amount * 0.05
elif customer_type == "premium":
return amount * 0.10
elif customer_type == "vip":
return amount * 0.20
else:
return 0.0
# Problem: To add a new customer type, you MUST modify this function
# What if you forget a case? What if this function is used everywhere?
```
### Good Example
```python
# Define discount strategies
DISCOUNT_RATES = {
"regular": 0.05,
"premium": 0.10,
"vip": 0.20,
}
def calculate_discount(customer_type: str, amount: float) -> float:
"""Calculate discount based on customer type."""
rate = DISCOUNT_RATES.get(customer_type, 0.0)
return amount * rate
# To add a new customer type, just add to the dictionary:
# DISCOUNT_RATES["enterprise"] = 0.25
# No need to modify the function!
```
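If the rates themselves need different logic (thresholds, tiers), the same dictionary can hold strategy functions instead of flat numbers — a sketch with illustrative names:

```python
from typing import Callable

# Registry mapping customer type to a discount strategy
DISCOUNT_STRATEGIES: dict[str, Callable[[float], float]] = {
    "regular": lambda amount: amount * 0.05,
    "premium": lambda amount: amount * 0.10,
    "vip": lambda amount: amount * 0.20,
}

def calculate_discount(customer_type: str, amount: float) -> float:
    """Look up the strategy; default to no discount."""
    strategy = DISCOUNT_STRATEGIES.get(customer_type, lambda amount: 0.0)
    return strategy(amount)

# Extend without modifying calculate_discount -- even with non-linear rules:
DISCOUNT_STRATEGIES["bulk"] = lambda amount: amount * 0.15 if amount > 1000 else amount * 0.05
```

Registering new behavior keeps `calculate_discount` closed to modification while staying open to arbitrary new rules.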
---
## L - Liskov Substitution Principle
**"If you replace a parent with a child, things should still work."**
### Why It Matters
- Ensures your code is truly reusable
- Prevents unexpected bugs when using inheritance
- Makes your class hierarchies trustworthy
### Bad Example
```python
class Bird:
def fly(self) -> str:
return "Flying high!"
class Penguin(Bird):
def fly(self) -> str:
raise Exception("Penguins can't fly!") # BREAKS the contract!
def make_bird_fly(bird: Bird) -> str:
return bird.fly()
# This will crash unexpectedly:
penguin = Penguin()
make_bird_fly(penguin) # Exception: Penguins can't fly!
```
**Problem**: `Penguin` inherits from `Bird` but can't fulfill the `fly()` contract. Code expecting a `Bird` will break.
### Good Example
```python
class Bird:
def move(self) -> str:
return "Moving"
class FlyingBird(Bird):
def fly(self) -> str:
return "Flying high!"
class SwimmingBird(Bird):
def swim(self) -> str:
return "Swimming!"
class Eagle(FlyingBird):
def fly(self) -> str:
return "Soaring through the sky!"
class Penguin(SwimmingBird):
def swim(self) -> str:
return "Swimming gracefully!"
# Now each bird type can be used correctly:
def make_bird_fly(bird: FlyingBird) -> str:
return bird.fly()
def make_bird_swim(bird: SwimmingBird) -> str:
return bird.swim()
eagle = Eagle()
make_bird_fly(eagle) # Works!
penguin = Penguin()
make_bird_swim(penguin) # Works!
```
### Simple Rule
If your child class needs to throw an exception or return `None` for a method that the parent defines, you probably have the wrong inheritance structure.
---
## I - Interface Segregation Principle
**"Don't force code to depend on things it doesn't use."**
### Why It Matters
- Keeps your code focused and lean
- Reduces unnecessary dependencies
- Makes testing easier
### Bad Example
```python
class Worker:
def work(self) -> str:
pass
def eat(self) -> str:
pass
def sleep(self) -> str:
pass
class Robot(Worker):
def work(self) -> str:
return "Working..."
def eat(self) -> str:
raise Exception("Robots don't eat!") # Forced to implement this!
def sleep(self) -> str:
raise Exception("Robots don't sleep!") # Forced to implement this!
```
**Problem**: `Robot` is forced to implement `eat()` and `sleep()` even though it doesn't need them.
### Good Example
```python
class Workable:
def work(self) -> str:
pass
class Eatable:
def eat(self) -> str:
pass
class Sleepable:
def sleep(self) -> str:
pass
class Human(Workable, Eatable, Sleepable):
def work(self) -> str:
return "Working..."
def eat(self) -> str:
return "Eating lunch..."
def sleep(self) -> str:
return "Sleeping..."
class Robot(Workable): # Only implements what it needs!
def work(self) -> str:
return "Working 24/7..."
```
### Practical Application: Function Parameters
```python
# Bad: Function takes more than it needs
def send_notification(user: User) -> None:
# Only uses user.email, but requires entire User object
email_service.send(user.email, "Hello!")
# Good: Function takes only what it needs
def send_notification(email: str) -> None:
email_service.send(email, "Hello!")
# Now you can call it without having a full User object:
send_notification("user@example.com")
```
---
## D - Dependency Inversion Principle
**"Depend on abstractions, not concrete implementations."**
### Why It Matters
- Makes code flexible and swappable
- Makes testing much easier (use fakes/mocks)
- Reduces coupling between components
### Bad Example
```python
class EmailService:
def send(self, to: str, message: str) -> None:
# Sends email via SMTP
smtp_server.send(to, message)
class UserRegistration:
def __init__(self):
self.email_service = EmailService() # HARD-CODED dependency!
def register(self, email: str, password: str) -> None:
# Create user...
user_id = create_user(email, password)
# Send welcome email
self.email_service.send(email, "Welcome!")
# Problem: Can't test without actually sending emails!
# Problem: Can't switch to a different email provider easily
```
### Good Example
```python
from abc import ABC, abstractmethod
# 1. Define what you need (abstraction)
class NotificationService(ABC):
@abstractmethod
def send(self, to: str, message: str) -> None:
pass
# 2. Create implementations
class EmailNotificationService(NotificationService):
def send(self, to: str, message: str) -> None:
smtp_server.send(to, message)
class SMSNotificationService(NotificationService):
def send(self, to: str, message: str) -> None:
sms_gateway.send(to, message)
# 3. Depend on the abstraction, not the implementation
class UserRegistration:
def __init__(self, notification_service: NotificationService):
self.notification_service = notification_service # INJECTED!
def register(self, email: str, password: str) -> None:
user_id = create_user(email, password)
self.notification_service.send(email, "Welcome!")
# Usage - you choose which implementation to use:
email_service = EmailNotificationService()
registration = UserRegistration(email_service)
# For testing - use a fake:
class FakeNotificationService(NotificationService):
def __init__(self):
self.sent_messages = []
def send(self, to: str, message: str) -> None:
self.sent_messages.append((to, message))
# Test without sending real emails:
fake_service = FakeNotificationService()
registration = UserRegistration(fake_service)
registration.register("test@example.com", "password123")
assert len(fake_service.sent_messages) == 1
```
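The same inversion also works without an ABC: `typing.Protocol` (Python 3.8+) accepts any class with a matching `send()` method, no inheritance required. A minimal sketch reusing the illustrative names:

```python
from typing import Protocol

class NotificationService(Protocol):
    def send(self, to: str, message: str) -> None: ...

# Any class with a matching send() satisfies the protocol -- no base class needed
class ConsoleNotificationService:
    def __init__(self):
        self.sent = []
    def send(self, to: str, message: str) -> None:
        self.sent.append((to, message))

class UserRegistration:
    def __init__(self, notification_service: NotificationService):
        self.notification_service = notification_service
    def register(self, email: str) -> None:
        self.notification_service.send(email, "Welcome!")

service = ConsoleNotificationService()
UserRegistration(service).register("test@example.com")
```

Protocols are useful when you can't (or don't want to) make third-party classes inherit from your abstraction.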
---
## Quick Function Guidelines
| Guideline | Recommendation |
|-----------|----------------|
| **Length** | Keep functions under 20 lines |
| **Arguments** | Prefer 0-3 arguments |
| **Naming** | Use verb + noun: `calculate_total`, `send_email` |
| **One thing** | Each function does exactly one thing |
| **No surprises** | Function does what its name says, nothing more |
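Put together, the guidelines look like this (a minimal sketch with illustrative names):

```python
def calculate_total(prices: list[float], tax_rate: float) -> float:
    """One job: sum prices and apply tax. Verb + noun name, two arguments."""
    subtotal = sum(prices)
    return subtotal * (1 + tax_rate)

def format_receipt_line(item: str, price: float) -> str:
    """One job: render a single receipt line."""
    return f"{item}: ${price:.2f}"
```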
---
## Summary: SOLID at a Glance
| Principle | In Simple Terms | Quick Test |
|-----------|----------------|------------|
| **S**ingle Responsibility | One function = one job | Can you describe it without saying "and"? |
| **O**pen/Closed | Add features, don't modify | Can you extend without editing? |
| **L**iskov Substitution | Children honor parent's promises | Does every child work where parent works? |
| **I**nterface Segregation | Don't force unused dependencies | Is everything you require actually used? |
| **D**ependency Inversion | Depend on abstractions | Can you swap implementations easily? |
---
## When to Apply
- **Always apply S**: Every function should do one thing
- **Apply O when**: You find yourself adding `if/elif` chains for new cases
- **Apply L when**: Using inheritance
- **Apply I when**: Your interfaces have methods some implementers don't need
- **Apply D when**: You want testable code or need flexibility

View File

@@ -1,333 +0,0 @@
---
name: streamlit
description: |
Guide for building Streamlit web applications in Python. Use this skill when:
- Creating interactive data apps, dashboards, or web UIs with Python
- Working with st.* functions for widgets, layouts, charts, or data display
- Implementing caching (@st.cache_data, @st.cache_resource) or session state
- Building multipage Streamlit apps with st.navigation
- Configuring Streamlit themes, secrets, or database connections
---
# Streamlit Development Guide
Streamlit turns Python scripts into interactive web apps. The framework reruns your entire script from top to bottom whenever a user interacts with a widget or the source code changes.
## Core Data Flow
```python
import streamlit as st
# Script runs top-to-bottom on every interaction
x = st.slider('Value', 0, 100, 50) # Widget interaction triggers rerun
st.write(f"Result: {x * 2}")
```
**Key Principle**: Every widget interaction causes a full script rerun. Use caching and session state to preserve expensive computations and user data.
## Essential Patterns
### Display Data
```python
import streamlit as st
import pandas as pd
# Magic: standalone variables auto-render
df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
df # Automatically displayed
# Explicit display
st.write("Swiss Army knife for any data type")
st.dataframe(df.style.highlight_max(axis=0)) # Interactive
st.table(df) # Static
```
### Widgets with Keys
```python
import streamlit as st
# Access widget values via session_state
st.text_input("Name", key="user_name")
st.number_input("Age", key="user_age")
# Use values anywhere
if st.session_state.user_name:
st.write(f"Hello {st.session_state.user_name}!")
```
### Layout
```python
import streamlit as st
import pandas as pd
data = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})  # sample data for the tabs below
# Sidebar
with st.sidebar:
option = st.selectbox("Choose", ["A", "B", "C"])
threshold = st.slider("Threshold", 0, 100)
# Columns
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Metric 1", 42)
with col2:
st.metric("Metric 2", 84)
with col3:
st.metric("Metric 3", 126)
# Tabs
tab1, tab2 = st.tabs(["Chart", "Data"])
with tab1:
st.line_chart(data)
with tab2:
st.dataframe(data)
```
### Caching (Critical for Performance)
```python
import streamlit as st
import pandas as pd
# Cache data computations (returns copy each call)
@st.cache_data
def load_data(path: str) -> pd.DataFrame:
return pd.read_csv(path)
# Cache resources (returns same object, shared across sessions)
@st.cache_resource
def load_model(name: str):
return SomeLargeModel(name)
# Usage
df = load_data("data.csv") # Cached after first call
model = load_model("gpt") # Shared across all users
```
**Rule of Thumb**:
- `@st.cache_data` → DataFrames, dicts, lists, strings (serializable data)
- `@st.cache_resource` → ML models, database connections (global resources)
### Session State
```python
import streamlit as st
# Initialize state (only runs once per session)
if "counter" not in st.session_state:
st.session_state.counter = 0
st.session_state.history = []
# Modify state
if st.button("Increment"):
st.session_state.counter += 1
st.session_state.history.append(st.session_state.counter)
st.write(f"Count: {st.session_state.counter}")
```
### Database Connections
```python
import streamlit as st
# Automatic connection management with caching
conn = st.connection("my_database")
df = conn.query("SELECT * FROM users WHERE active = true")
st.dataframe(df)
```
Configure in `.streamlit/secrets.toml`:
```toml
[connections.my_database]
type = "sql"
dialect = "postgresql"
host = "localhost"
port = 5432
database = "mydb"
username = "user"
password = "pass"
```
### Multipage Apps
```python
# streamlit_app.py (entry point)
import streamlit as st
# Define pages
home = st.Page("pages/home.py", title="Home", icon="🏠")
dashboard = st.Page("pages/dashboard.py", title="Dashboard", icon="📊")
settings = st.Page("pages/settings.py", title="Settings", icon="⚙️")
# Create navigation
pg = st.navigation([home, dashboard, settings])
pg.run()
```
Project structure:
```
my_app/
├── streamlit_app.py # Entry point with navigation
├── pages/
│ ├── home.py
│ ├── dashboard.py
│ └── settings.py
└── .streamlit/
├── config.toml # Theme configuration
└── secrets.toml # Database credentials (gitignored)
```
## Charts and Visualization
```python
import streamlit as st
import pandas as pd
import numpy as np
# Assume df is a DataFrame containing the columns referenced below
# Built-in charts
st.line_chart(df)
st.bar_chart(df)
st.area_chart(df)
st.scatter_chart(df, x="col1", y="col2", color="category")
# Maps
st.map(df) # Requires 'lat' and 'lon' columns
# External libraries
import plotly.express as px
fig = px.scatter(df, x="x", y="y", color="category")
st.plotly_chart(fig)
import altair as alt
chart = alt.Chart(df).mark_circle().encode(x='x', y='y')
st.altair_chart(chart)
```
## Forms and Callbacks
```python
import streamlit as st
# Forms batch inputs (single rerun on submit)
with st.form("my_form"):
name = st.text_input("Name")
email = st.text_input("Email")
submitted = st.form_submit_button("Submit")
if submitted:
st.success(f"Registered {name} with {email}")
# Callbacks execute before script reruns
def on_change():
st.session_state.processed = process(st.session_state.input_value)
st.text_input("Input", key="input_value", on_change=on_change)
```
## Progress and Status
```python
import streamlit as st
import time
# Progress bar
progress = st.progress(0)
for i in range(100):
progress.progress(i + 1)
time.sleep(0.01)
# Status messages
st.success("Operation completed!")
st.error("Something went wrong")
st.warning("Check your input")
st.info("Processing...")
# Spinner
with st.spinner("Loading..."):
time.sleep(2)
st.success("Done!")
# Empty placeholder for dynamic updates
placeholder = st.empty()
placeholder.text("Waiting...")
# Later...
placeholder.text("Updated!")
```
## File Handling
```python
import streamlit as st
import pandas as pd
# File upload
uploaded = st.file_uploader("Choose a file", type=["csv", "xlsx"])
if uploaded:
df = pd.read_csv(uploaded)
st.dataframe(df)
    # File download (df comes from the upload above)
    csv = df.to_csv(index=False)
    st.download_button(
        label="Download CSV",
        data=csv,
        file_name="data.csv",
        mime="text/csv"
    )
```
## Theming
Configure in `.streamlit/config.toml`:
```toml
[theme]
primaryColor = "#FF4B4B"
backgroundColor = "#FFFFFF"
secondaryBackgroundColor = "#F0F2F6"
textColor = "#262730"
font = "sans serif"
```
## Static Files
For direct URL access to files:
```
my_app/
├── static/
│ └── logo.png # Accessible at /app/static/logo.png
└── streamlit_app.py
```
Enable in config:
```toml
[server]
enableStaticServing = true
```
## Running Apps
```bash
# Development
streamlit run app.py
# With arguments
streamlit run app.py -- --data-path ./data
# Configuration
streamlit run app.py --server.port 8080 --server.headless true
```
## Best Practices
1. **Minimize reruns**: Use `@st.cache_data` for expensive operations
2. **Preserve state**: Use `st.session_state` for user data that should persist
3. **Batch inputs**: Use `st.form` when multiple inputs should submit together
4. **Structure large apps**: Use multipage navigation with `st.Page` and `st.navigation`
5. **Secure secrets**: Store credentials in `.streamlit/secrets.toml`, never in code
6. **Responsive layout**: Use `st.columns` and `st.sidebar` for organized UIs
## Reference Files
- [references/widgets-catalog.md](references/widgets-catalog.md) - Complete widget reference: text inputs, selection, numeric, date/time, buttons, callbacks
- [references/layout-patterns.md](references/layout-patterns.md) - Advanced layouts: columns, containers, tabs, dialogs, dashboard examples
- [references/state-patterns.md](references/state-patterns.md) - State patterns: forms, multi-step wizards, authentication, cache vs session state

View File

@@ -1,222 +0,0 @@
# Streamlit Layout Patterns
Advanced layout techniques for building professional Streamlit apps.
## Contents
- [Column Layouts](#column-layouts)
- [Container Patterns](#container-patterns)
- [Expander](#expander)
- [Tabs](#tabs)
- [Sidebar](#sidebar)
- [Empty Placeholders](#empty-placeholders)
- [Popover](#popover)
- [Dialog (Modal)](#dialog-modal)
- [Page Configuration](#page-configuration)
- [Dashboard Layout Example](#dashboard-layout-example)
## Column Layouts
```python
import streamlit as st
# Equal columns
col1, col2, col3 = st.columns(3)
# Weighted columns
left, right = st.columns([2, 1]) # 2:1 ratio
# With gap control
cols = st.columns(3, gap="large") # small, medium, large
# Nested columns
outer1, outer2 = st.columns(2)
with outer1:
inner1, inner2 = st.columns(2)
with inner1:
st.metric("A", 100)
with inner2:
st.metric("B", 200)
```
## Container Patterns
```python
import streamlit as st
# Basic container
with st.container():
st.write("Grouped content")
# Container with border
with st.container(border=True):
st.write("Boxed content")
# Fixed height with scrolling
with st.container(height=300):
for i in range(50):
st.write(f"Line {i}")
```
## Expander
```python
import streamlit as st
# Basic expander
with st.expander("See details"):
st.write("Hidden content here")
# Start expanded
with st.expander("FAQ", expanded=True):
st.write("Answer to question")
```
## Tabs
```python
import streamlit as st
tab1, tab2, tab3 = st.tabs(["Data", "Chart", "Settings"])
with tab1:
st.dataframe(df)
with tab2:
st.line_chart(df)
with tab3:
st.slider("Option", 0, 100)
```
## Sidebar
```python
import streamlit as st
# Using 'with' syntax
with st.sidebar:
st.title("Controls")
option = st.selectbox("Filter", ["All", "Active", "Archived"])
# Using object notation
st.sidebar.title("Navigation")
page = st.sidebar.radio("Go to", ["Home", "Data", "About"])
```
## Empty Placeholders
```python
import streamlit as st
import time
# Dynamic content updates
placeholder = st.empty()
# Replace content
placeholder.text("Loading...")
time.sleep(1)
placeholder.text("Still loading...")
time.sleep(1)
placeholder.success("Done!")
# Clear content
placeholder.empty()
# Use as container
with placeholder.container():
st.write("Multiple elements")
st.write("In a placeholder")
```
## Popover
```python
import streamlit as st
with st.popover("Settings"):
st.checkbox("Enable notifications")
st.slider("Volume", 0, 100)
```
## Dialog (Modal)
```python
import streamlit as st
@st.dialog("Confirm Delete")
def confirm_delete(item_name):
st.write(f"Are you sure you want to delete {item_name}?")
col1, col2 = st.columns(2)
if col1.button("Cancel"):
st.rerun()
if col2.button("Delete", type="primary"):
# Perform deletion
st.session_state.deleted = item_name
st.rerun()
if st.button("Delete Item"):
confirm_delete("My Document")
```
## Page Configuration
```python
import streamlit as st
# Must be first Streamlit command
st.set_page_config(
page_title="My App",
page_icon="🎯",
layout="wide", # or "centered"
initial_sidebar_state="expanded", # or "collapsed", "auto"
menu_items={
'Get Help': 'https://example.com/help',
'Report a bug': "https://example.com/bug",
'About': "# My App\nBuilt with Streamlit"
}
)
```
## Dashboard Layout Example
```python
import streamlit as st
import pandas as pd
st.set_page_config(layout="wide")
# Sample data for the chart and table below
trend_data = pd.DataFrame({"sales": [10, 20, 15, 30]})
top_items = pd.DataFrame({"item": ["A", "B", "C"], "count": [42, 37, 21]})
# Header row
st.title("Dashboard")
# Metrics row
m1, m2, m3, m4 = st.columns(4)
m1.metric("Users", "1,234", "+12%")
m2.metric("Revenue", "$12.3K", "+8%")
m3.metric("Orders", "456", "-3%")
m4.metric("Rating", "4.8", "+0.2")
st.divider()
# Main content with sidebar
with st.sidebar:
date_range = st.date_input("Date Range", [])
category = st.multiselect("Category", ["A", "B", "C"])
# Two-column content
chart_col, data_col = st.columns([2, 1])
with chart_col:
st.subheader("Trend")
st.line_chart(trend_data)
with data_col:
st.subheader("Top Items")
st.dataframe(top_items, hide_index=True)
# Tabbed details
tab1, tab2 = st.tabs(["Details", "Settings"])
with tab1:
st.write("Detailed view")
with tab2:
st.write("Configuration options")
```

View File

@@ -1,240 +0,0 @@
# Streamlit State Management Patterns
Best practices for managing state in Streamlit applications.
## Contents
- [Session State Basics](#session-state-basics)
- [Widget-State Binding](#widget-state-binding)
- [Callback Pattern](#callback-pattern)
- [Form State Management](#form-state-management)
- [Multi-Step Wizard](#multi-step-wizard)
- [Data Loading with State](#data-loading-with-state)
- [Authentication State](#authentication-state)
- [Cache vs Session State](#cache-vs-session-state)
## Session State Basics
```python
import streamlit as st
# Check and initialize
if "initialized" not in st.session_state:
st.session_state.initialized = True
st.session_state.user = None
st.session_state.data = []
st.session_state.settings = {"theme": "light"}
# Access patterns
value = st.session_state.key # Attribute access
value = st.session_state["key"] # Dict access
value = st.session_state.get("key", default) # Safe access
# Update patterns
st.session_state.key = new_value
st.session_state["key"] = new_value
```
## Widget-State Binding
```python
import streamlit as st
# Widgets with keys auto-sync to session_state
st.text_input("Name", key="name")
st.number_input("Age", key="age")
# Values persist across reruns
st.write(f"Name: {st.session_state.name}, Age: {st.session_state.age}")
# Update widget value programmatically
if st.button("Clear"):
st.session_state.name = ""
st.session_state.age = 0
```
## Callback Pattern
```python
import streamlit as st
# Callbacks run BEFORE the main script
def update_total():
st.session_state.total = (
st.session_state.quantity * st.session_state.price
)
st.number_input("Quantity", key="quantity", on_change=update_total)
st.number_input("Price", key="price", on_change=update_total)
# Total is always up-to-date
if "total" in st.session_state:
st.metric("Total", f"${st.session_state.total:.2f}")
```
## Form State Management
```python
import streamlit as st
# Initialize form data
if "form_data" not in st.session_state:
st.session_state.form_data = {
"name": "",
"email": "",
"submitted": False
}
with st.form("registration"):
name = st.text_input("Name", value=st.session_state.form_data["name"])
email = st.text_input("Email", value=st.session_state.form_data["email"])
if st.form_submit_button("Submit"):
st.session_state.form_data = {
"name": name,
"email": email,
"submitted": True
}
if st.session_state.form_data["submitted"]:
st.success(f"Registered: {st.session_state.form_data['name']}")
```
## Multi-Step Wizard
```python
import streamlit as st
# Track wizard state
if "step" not in st.session_state:
st.session_state.step = 1
st.session_state.wizard_data = {}
def save_and_next(field, widget_key):
    # Callbacks run BEFORE the rerun, so the widget value is still available.
    # Saving inside `if st.button(...)` would never run here: after the click,
    # the rerun renders the NEXT step's branch, so this step's body is skipped.
    st.session_state.wizard_data[field] = st.session_state[widget_key]
    st.session_state.step += 1
def prev_step():
    st.session_state.step -= 1
# Step 1
if st.session_state.step == 1:
    st.header("Step 1: Basic Info")
    st.text_input("Name", key="wizard_name")
    st.button("Next", on_click=save_and_next, args=("name", "wizard_name"))
# Step 2
elif st.session_state.step == 2:
    st.header("Step 2: Details")
    st.text_area("Details", key="wizard_details")
    col1, col2 = st.columns(2)
    col1.button("Back", on_click=prev_step)
    col2.button("Next", on_click=save_and_next, args=("details", "wizard_details"))
# Step 3
elif st.session_state.step == 3:
st.header("Step 3: Confirm")
st.write(st.session_state.wizard_data)
st.button("Back", on_click=prev_step)
if st.button("Submit"):
# Process data
st.session_state.step = 1
st.session_state.wizard_data = {}
```
## Data Loading with State
```python
import streamlit as st
import pandas as pd
# Preserve loaded data across reruns
if "data" not in st.session_state:
st.session_state.data = None
st.session_state.data_loaded = False
uploaded = st.file_uploader("Upload CSV")
if uploaded and not st.session_state.data_loaded:
st.session_state.data = pd.read_csv(uploaded)
st.session_state.data_loaded = True
if st.session_state.data is not None:
# Data persists even after file uploader clears
st.dataframe(st.session_state.data)
# Filter without reloading
col = st.selectbox("Filter column", st.session_state.data.columns)
value = st.text_input("Filter value")
if value:
filtered = st.session_state.data[
st.session_state.data[col].astype(str).str.contains(value)
]
st.dataframe(filtered)
```
## Authentication State
```python
import streamlit as st
# Auth state
if "authenticated" not in st.session_state:
st.session_state.authenticated = False
st.session_state.user = None
def login(username, password):
# Validate credentials
    if username == "admin" and password == "secret":  # demo only -- never hard-code real credentials
st.session_state.authenticated = True
st.session_state.user = {"username": username, "role": "admin"}
return True
return False
def logout():
st.session_state.authenticated = False
st.session_state.user = None
# Login form
if not st.session_state.authenticated:
st.title("Login")
with st.form("login"):
username = st.text_input("Username")
password = st.text_input("Password", type="password")
if st.form_submit_button("Login"):
if not login(username, password):
st.error("Invalid credentials")
else:
# Protected content
st.title(f"Welcome, {st.session_state.user['username']}")
st.button("Logout", on_click=logout)
```
## Cache vs Session State
```python
import streamlit as st
# CACHE: Shared across all users, tied to function inputs
@st.cache_data
def load_global_config():
"""Same result for everyone"""
return load_from_database()
# SESSION STATE: Per-user, per-session
if "user_preferences" not in st.session_state:
st.session_state.user_preferences = {}
# Use cache for:
# - Expensive computations
# - Data that doesn't change per user
# - API calls with same parameters
# Use session state for:
# - User-specific data
# - Form inputs
# - Navigation state
# - Shopping carts, selections
```

View File

@@ -1,193 +0,0 @@
# Streamlit Widgets Catalog
Complete reference for all Streamlit input widgets and their usage patterns.
## Contents
- [Text Input Widgets](#text-input-widgets)
- [Selection Widgets](#selection-widgets)
- [Numeric Widgets](#numeric-widgets)
- [Date and Time](#date-and-time)
- [Media and Files](#media-and-files)
- [Buttons and Actions](#buttons-and-actions)
- [Widget Keys and Session State](#widget-keys-and-session-state)
- [Widget Callbacks](#widget-callbacks)
- [Disabled and Label Visibility](#disabled-and-label-visibility)
- [Help Text](#help-text)
## Text Input Widgets
```python
import streamlit as st
# Single-line text
name = st.text_input("Name", value="", placeholder="Enter name...")
# Multi-line text
bio = st.text_area("Bio", height=150)
# Number input
age = st.number_input("Age", min_value=0, max_value=120, value=25, step=1)
# Password (masked)
password = st.text_input("Password", type="password")
```
## Selection Widgets
```python
import streamlit as st
# Dropdown
option = st.selectbox("Choose one", ["A", "B", "C"], index=0)
# Multi-select
options = st.multiselect("Choose many", ["A", "B", "C"], default=["A"])
# Radio buttons
choice = st.radio("Pick one", ["Option 1", "Option 2"], horizontal=True)
# Checkbox
agree = st.checkbox("I agree", value=False)
# Toggle
enabled = st.toggle("Enable feature")
```
## Numeric Widgets
```python
import streamlit as st
# Slider (single value)
value = st.slider("Value", min_value=0, max_value=100, value=50)
# Range slider
low, high = st.slider("Range", 0, 100, (25, 75))
# Select slider (discrete values)
size = st.select_slider("Size", options=["S", "M", "L", "XL"])
```
## Date and Time
```python
import streamlit as st
from datetime import date, time, datetime
# Date picker
d = st.date_input("Date", value=date.today())
# Date range
start, end = st.date_input("Date range", value=(date(2024, 1, 1), date.today()))
# Time picker
t = st.time_input("Time", value=time(12, 0))
```
## Media and Files
```python
import streamlit as st
# File uploader
file = st.file_uploader("Upload", type=["csv", "xlsx", "pdf"])
files = st.file_uploader("Upload many", accept_multiple_files=True)
# Camera input
photo = st.camera_input("Take a photo")
# Color picker
color = st.color_picker("Pick color", "#FF0000")
```
## Buttons and Actions
```python
import streamlit as st
# Standard button
if st.button("Click me", type="primary"):
st.write("Clicked!")
# Download button
st.download_button(
label="Download",
data=csv_data,
file_name="data.csv",
mime="text/csv"
)
# Link button
st.link_button("Go to docs", "https://docs.streamlit.io")
# Form submit button (only inside forms)
with st.form("form"):
st.text_input("Name")
st.form_submit_button("Submit")
```
## Widget Keys and Session State
Every widget can have a `key` parameter linking it to session state:
```python
import streamlit as st
# Widget with key
st.text_input("Name", key="user_name")
# Access via session_state
if st.session_state.user_name:
st.write(f"Hello, {st.session_state.user_name}")
# Programmatically set widget value
if st.button("Reset"):
st.session_state.user_name = ""
```
## Widget Callbacks
Execute code when widgets change:
```python
import streamlit as st
def on_name_change():
# Runs BEFORE the rest of the script
st.session_state.greeting = f"Hello, {st.session_state.name}!"
st.text_input("Name", key="name", on_change=on_name_change)
# For buttons
def on_click():
st.session_state.counter += 1
st.button("Increment", on_click=on_click)
```
## Disabled and Label Visibility
```python
import streamlit as st
# Disable widget
st.text_input("Locked", disabled=True)
# Hide label (for custom layouts)
st.text_input("Hidden label", label_visibility="hidden")
# Collapse label
st.text_input("Collapsed", label_visibility="collapsed")
```
## Help Text
```python
import streamlit as st
st.text_input(
"API Key",
help="Find your API key in the settings page"
)
```