# CrewAI Development Rules
|
|
Comprehensive best practices for developing with the CrewAI library, covering code organization, performance, security, testing, and common patterns. Based on actual CrewAI codebase analysis for accuracy.
|
|
|
|
## General Best Practices:
|
|
- Leverage structured responses from LLM calls using Pydantic BaseModel for output validation.
|
|
- Use the @CrewBase decorator pattern with @agent, @task, and @crew decorators for proper organization.
|
|
- Regularly validate outputs from agents and tasks using built-in guardrails or custom validation.
|
|
- Use UV for dependency management (CrewAI's standard) with pyproject.toml configuration.
|
|
- Python version requirements: >=3.10 and <3.14 (i.e., 3.10 through 3.13, as per CrewAI's pyproject.toml).
|
|
- Prefer declarative YAML configuration for agents and tasks over hardcoded definitions.
|
|
|
|
## Code Organization and Structure:
|
|
- **Standard CrewAI Project Structure** (from CLI templates):
|
|
- `project_name/` (Root directory)
|
|
- `.env` (Environment variables - never commit API keys)
|
|
- `pyproject.toml` (UV-based dependency management)
|
|
- `knowledge/` (Knowledge base files)
|
|
- `src/project_name/`
|
|
- `__init__.py`
|
|
- `main.py` (Entry point)
|
|
- `crew.py` (Crew orchestration with @CrewBase decorator)
|
|
- `config/`
|
|
- `agents.yaml` (Agent definitions)
|
|
- `tasks.yaml` (Task definitions)
|
|
- `tools/`
|
|
- `custom_tool.py` (Custom agent tools)
|
|
- `__init__.py`
|
|
- **File Naming Conventions**:
|
|
- Use descriptive, lowercase names with underscores (e.g., `research_agent.py`).
|
|
- Pydantic models: singular names (e.g., `article_summary.py` with class `ArticleSummary`).
|
|
- Tests: mirror source file name with `_test` suffix (e.g., `crew_test.py`).
|
|
- **CrewAI Class Architecture**:
|
|
- Use @CrewBase decorator for main crew class.
|
|
- Define agents with @agent decorator returning Agent instances.
|
|
- Define tasks with @task decorator returning Task instances.
|
|
- Define crew orchestration with @crew decorator returning Crew instance.
|
|
- Access configuration via `self.agents_config` and `self.tasks_config`.
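A minimal sketch of this class architecture (names are illustrative; the YAML files follow the structure shown in the configuration section below):

```python
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task


@CrewBase
class ProjectNameCrew:
    """Crew wired from config/agents.yaml and config/tasks.yaml."""

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @agent
    def researcher(self) -> Agent:
        # Loaded from the 'researcher' entry in agents.yaml
        return Agent(config=self.agents_config["researcher"], verbose=True)

    @task
    def research_task(self) -> Task:
        # Loaded from the 'research_task' entry in tasks.yaml
        return Task(config=self.tasks_config["research_task"])

    @crew
    def crew(self) -> Crew:
        # Agents and tasks registered by the decorators above
        return Crew(agents=self.agents, tasks=self.tasks, process=Process.sequential)
```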
|
|
|
|
## Memory System Patterns:
|
|
- **Memory Types** (all supported by CrewAI):
|
|
- Short-term memory: ChromaDB with RAG for current context
|
|
- Long-term memory: SQLite for task results across sessions
|
|
- Entity memory: RAG to track entities (people, places, concepts)
|
|
- External memory: Mem0 integration for advanced memory features
|
|
- **Memory Configuration**:
|
|
- Enable basic memory: `Crew(..., memory=True)` (see the sketch at the end of this section)
|
|
- Custom storage location: Set `CREWAI_STORAGE_DIR` environment variable
|
|
- Memory is stored in platform-specific directories via `appdirs` by default
|
|
- **Memory Usage**:
|
|
- Memory is automatically managed by agents during task execution
|
|
- Access via agent's memory attribute for custom implementations
|
|
- Use metadata for categorizing and filtering memory entries
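A short sketch of the memory configuration above (the storage path and topic are illustrative):

```python
import os

from crewai import Agent, Crew, Process, Task

# Optional: redirect memory storage (ChromaDB + SQLite) to a project-local folder.
# By default CrewAI uses a platform-specific location chosen via appdirs.
os.environ["CREWAI_STORAGE_DIR"] = "./crew_storage"

researcher = Agent(
    role="Researcher",
    goal="Collect concise facts about a topic",
    backstory="A careful analyst.",
)

research = Task(
    description="Summarize the key facts about {topic}.",
    expected_output="A short bullet-point summary.",
    agent=researcher,
)

crew = Crew(
    agents=[researcher],
    tasks=[research],
    process=Process.sequential,
    memory=True,  # enables short-term, long-term and entity memory
)

result = crew.kickoff(inputs={"topic": "vector databases"})
```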
|
|
|
|
## Pydantic Integration Patterns:
|
|
- **Structured Outputs**:
|
|
- Use `output_pydantic` in Task definitions for structured results (illustrated in the sketch at the end of this section)
|
|
- Use `output_json` for JSON dictionary outputs
|
|
- Cannot use both output_pydantic and output_json simultaneously
|
|
- **Task Output Handling**:
|
|
- TaskOutput contains raw, pydantic, and json_dict attributes
|
|
- CrewOutput aggregates all task outputs with token usage metrics
|
|
- Use model_validate_json for Pydantic model validation
|
|
- **Custom Models**:
|
|
- Inherit from BaseModel for all data structures
|
|
- Use Field descriptions for LLM understanding
|
|
- Implement model_validator for custom validation logic
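A minimal sketch of `output_pydantic` and the resulting output objects (model and names are illustrative):

```python
from crewai import Agent, Crew, Task
from pydantic import BaseModel, Field


class ArticleSummary(BaseModel):
    title: str = Field(description="Title of the article")
    key_points: list[str] = Field(description="Three to five key takeaways")


writer = Agent(
    role="Summarizer",
    goal="Produce structured article summaries",
    backstory="An editor who writes tight, factual summaries.",
)

summarize = Task(
    description="Summarize the article at {url}.",
    expected_output="A title plus 3-5 key points.",
    agent=writer,
    output_pydantic=ArticleSummary,  # or output_json, but not both
)

crew = Crew(agents=[writer], tasks=[summarize])
result = crew.kickoff(inputs={"url": "https://example.com/post"})

summary = result.tasks_output[0].pydantic  # validated ArticleSummary instance
raw_text = result.raw                      # raw string of the final task
print(result.token_usage)                  # aggregated token usage metrics
```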
|
|
|
|
## YAML Configuration Best Practices:
|
|
- **agents.yaml Structure**:
|
|
```yaml
|
|
agent_name:
  role: "Clear, specific role description"
  goal: "Specific goal statement"
  backstory: "Detailed background for context"
  # Optional: tools, llm, memory, etc.
|
|
```
|
|
- **tasks.yaml Structure**:
|
|
```yaml
|
|
task_name:
  description: "Detailed task description with context"
  expected_output: "Clear output format specification"
  agent: agent_name  # Reference to agent in agents.yaml
  # Optional: context, tools, output_file, etc.
|
|
```
|
|
- **Configuration Access**:
|
|
- Use `self.agents_config['agent_name']` in @agent methods
|
|
- Use `self.tasks_config['task_name']` in @task methods
|
|
- Support for dynamic configuration via placeholders like {topic}
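Placeholder values are supplied at kickoff time, as in this sketch (it assumes a `ResearchCrew` built with `@CrewBase`, like the complete example later in this document):

```python
# agents.yaml / tasks.yaml may reference inputs with placeholders, e.g.:
#   goal: "Uncover cutting-edge developments in {topic}"
#   description: "Research {topic} and summarize the findings"

# The placeholders are interpolated from the kickoff inputs:
result = ResearchCrew().crew().kickoff(inputs={"topic": "AI agent frameworks"})
print(result.raw)
```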
|
|
|
|
## Tools and Integration Patterns:
|
|
- **Custom Tools**:
|
|
- Inherit from BaseTool for custom tool implementation
|
|
- Use the @tool decorator for simple tool definitions (see the example at the end of this section)
|
|
- Implement proper error handling and input validation
|
|
- **Tool Integration**:
|
|
- Add tools to agents via tools parameter in Agent constructor
|
|
- Tools are automatically inherited by tasks from their assigned agents
|
|
- Use structured tool outputs for better LLM understanding
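A minimal sketch of both tool styles (the tool logic is illustrative):

```python
from crewai.tools import BaseTool, tool


# Function-based tool via the @tool decorator
@tool("Word Counter")
def word_counter(text: str) -> str:
    """Count the number of words in a piece of text."""
    return f"The text contains {len(text.split())} words."


# Class-based tool via BaseTool, with room for validation and error handling
class UppercaseTool(BaseTool):
    name: str = "uppercase_tool"
    description: str = "Convert the given text to uppercase."

    def _run(self, text: str) -> str:
        if not isinstance(text, str):
            return "Error: 'text' must be a string"
        return text.upper()


# Tools are attached to agents; tasks inherit them from their assigned agent:
# Agent(role=..., goal=..., backstory=..., tools=[word_counter, UppercaseTool()])
```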
|
|
|
|
## Performance Considerations:
|
|
- **LLM Optimization**:
|
|
- Use task context to pass information between sequential tasks (shown in the sketch at the end of this section)
|
|
- Implement output caching to avoid redundant LLM calls
|
|
- Configure appropriate LLM models per agent for cost/performance balance
|
|
- **Memory Management**:
|
|
- Be mindful of memory storage growth in long-running applications
|
|
- Use score_threshold in memory search to filter relevant results
|
|
- Implement periodic memory cleanup if needed
|
|
- **Async Operations**:
|
|
- Tasks run synchronously by default (`execute_sync`); set `async_execution=True` only on tasks that can safely run concurrently
|
|
- Consider async patterns for I/O-bound operations in custom tools
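A small sketch of these ideas: task context for sequential data flow, per-agent model choice, and tool-result caching (the model names are illustrative):

```python
from crewai import Agent, Crew, Process, Task

# Cheaper model for routine collection, stronger model for synthesis
collector = Agent(
    role="Collector",
    goal="Gather raw notes on {topic}",
    backstory="A fast, thorough gatherer.",
    llm="gpt-4o-mini",  # illustrative model name
    cache=True,         # reuse cached tool results for identical calls
)
writer = Agent(
    role="Writer",
    goal="Turn notes into a short brief",
    backstory="A concise technical writer.",
    llm="gpt-4o",       # illustrative model name
)

collect = Task(
    description="Collect notes on {topic}.",
    expected_output="Bullet-point notes.",
    agent=collector,
)
brief = Task(
    description="Write a one-page brief from the collected notes.",
    expected_output="A one-page brief.",
    agent=writer,
    context=[collect],  # the output of 'collect' is injected into this task's prompt
)

crew = Crew(agents=[collector, writer], tasks=[collect, brief], process=Process.sequential)
```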
|
|
|
|
## Security Best Practices:
|
|
- **API Key Management**:
|
|
- Always use .env files for API keys and sensitive configuration
|
|
- Never commit API keys to version control
|
|
- Use environment variables in production deployments
|
|
- **Input Validation**:
|
|
- Validate all inputs using Pydantic models where possible
|
|
- Implement guardrails for task output validation
|
|
- Use field_validator for custom validation logic (see the validation sketch at the end of this section)
|
|
- **Tool Security**:
|
|
- Implement proper access controls in custom tools
|
|
- Validate tool inputs and outputs
|
|
- Follow principle of least privilege for tool permissions
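A short sketch of validated inputs and environment-based key handling (field names and limits are illustrative):

```python
import os

from pydantic import BaseModel, Field, field_validator


class ResearchRequest(BaseModel):
    """Validated input passed to crew.kickoff(inputs=...)."""

    topic: str = Field(min_length=3, max_length=200, description="Research topic")
    max_sources: int = Field(default=5, ge=1, le=20)

    @field_validator("topic")
    @classmethod
    def no_control_characters(cls, value: str) -> str:
        if any(ord(ch) < 32 for ch in value):
            raise ValueError("Topic contains control characters")
        return value.strip()


# API keys come from the environment (.env in development), never from source code
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY is not set")

request = ResearchRequest(topic="LLM agent security")
# crew.kickoff(inputs=request.model_dump())
```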
|
|
|
|
## Testing Approaches:
|
|
- **Unit Testing**:
|
|
- Test individual agents, tasks, and tools in isolation
|
|
- Use mocking for external dependencies (LLMs, APIs)
|
|
- Test configuration loading and validation
|
|
- **Integration Testing**:
|
|
- Test crew execution end-to-end with realistic scenarios
|
|
- Verify memory persistence across crew runs
|
|
- Test tool integration and data flow between tasks
|
|
- **Test Organization**:
|
|
- Follow CrewAI's test structure: separate test files for each component
|
|
- Use pytest fixtures for common test setup
|
|
- Mock LLM responses for consistent, fast tests
|
|
|
|
## Common CrewAI Patterns and Anti-patterns:
|
|
- **Recommended Patterns**:
|
|
- Use sequential Process for dependent tasks, hierarchical for manager delegation
|
|
- Implement task context for data flow between tasks
|
|
- Use output_file for persistent task results
|
|
- Leverage crew callbacks with @before_kickoff and @after_kickoff decorators
|
|
- **Anti-patterns to Avoid**:
|
|
- Don't hardcode agent configurations in Python code (use YAML)
|
|
- Don't create circular task dependencies
|
|
- Don't ignore task execution failures without proper error handling
|
|
- Don't overload single agents with too many diverse tools
|
|
- **Error Handling**:
|
|
- Implement task-level guardrails for output validation
|
|
- Use try/except blocks in custom tools
|
|
- Set appropriate max_retries for tasks prone to failures
|
|
- Log errors with sufficient context for debugging
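A compact sketch of a task-level guardrail combined with retries (the validation rule is illustrative):

```python
from crewai import Agent, Task


def require_sources(output) -> tuple[bool, str]:
    """Guardrail: reject outputs that do not cite at least one source URL."""
    if "http" not in output.raw:
        return False, "Add at least one source URL to the answer."
    return True, output.raw


analyst = Agent(
    role="Analyst",
    goal="Answer questions with cited sources",
    backstory="An evidence-driven analyst.",
)

answer_task = Task(
    description="Answer the question: {question}. Cite your sources.",
    expected_output="A short answer followed by source URLs.",
    agent=analyst,
    guardrail=require_sources,  # validated after each attempt
    max_retries=3,              # re-run with guardrail feedback on failure
)
```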
|
|
|
|
## Development Workflow:
|
|
- **CrewAI CLI and UV Commands**:
|
|
- `crewai create crew <name>` - Create new crew project
|
|
- `crewai install` - Install dependencies via UV
|
|
- `crewai run` - Execute the crew
|
|
- `uv sync` - Sync dependencies
|
|
- `uv add <package>` - Add new dependencies
|
|
- **Project Setup**:
|
|
- Use CrewAI CLI for project scaffolding
|
|
- Follow the standard directory structure
|
|
- Configure agents and tasks in YAML before implementing crew logic
|
|
- **Development Tools**:
|
|
- Use UV for dependency management (CrewAI standard)
|
|
- Configure pre-commit hooks for code quality
|
|
- Use pytest for testing with CrewAI's testing patterns
|
|
|
|
## Deployment and Production:
|
|
- **Environment Configuration**:
|
|
- Set CREWAI_STORAGE_DIR for controlled memory storage location
|
|
- Use proper logging configuration for production monitoring
|
|
- Configure appropriate LLM providers and rate limits
|
|
- **Containerization**:
|
|
- Include knowledge and config directories in Docker images
|
|
- Mount memory storage as persistent volumes if needed
|
|
- Set proper environment variables for API keys and configuration
|
|
- **Monitoring**:
|
|
- Monitor token usage via CrewOutput.token_usage
|
|
- Track task execution times and success rates
|
|
- Implement health checks for long-running crew services
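A tiny sketch of reading these metrics from a finished run (the `crew` object is assumed to exist):

```python
import time

start = time.time()
result = crew.kickoff(inputs={"topic": "observability"})

print(f"Duration: {time.time() - start:.1f}s")
print(f"Token usage: {result.token_usage}")  # aggregated usage metrics
for task_output in result.tasks_output:      # per-task results
    print(task_output.description, "->", len(task_output.raw), "chars")
```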
|
|
|
|
## CrewAI Flow Patterns and Best Practices
|
|
|
|
### Flow Architecture and Structure
|
|
- **Use Flow class** for complex multi-step workflows that go beyond simple crew orchestration
|
|
- **Combine Flows with Crews** to create sophisticated AI automation pipelines
|
|
- **Leverage state management** to share data between flow methods
|
|
- **Event-driven design** allows for dynamic and responsive workflow execution
|
|
|
|
### Flow Decorators and Control Flow
|
|
- **@start()**: Mark entry points for flow execution (can have multiple start methods)
|
|
- **@listen()**: Create method dependencies and execution chains
|
|
- **@router()**: Implement conditional branching based on method outputs
|
|
- **or_()** and **and_()**: Combine multiple trigger conditions for complex workflows
|
|
|
|
### Flow State Management Patterns
|
|
```python
|
|
from typing import List

from pydantic import BaseModel

from crewai.flow.flow import Flow, start


# Structured state with Pydantic (recommended for complex workflows)
class WorkflowState(BaseModel):
    task_results: List[str] = []
    current_step: str = "initialize"
    user_preferences: dict = {}
    completion_status: bool = False


class MyFlow(Flow[WorkflowState]):
    @start()
    def initialize(self):
        self.state.current_step = "processing"
        # State automatically gets a unique UUID in self.state.id


# Unstructured state (good for simple workflows)
class SimpleFlow(Flow):
    @start()
    def begin(self):
        self.state["counter"] = 0
        self.state["results"] = []
        # Auto-generated ID available in self.state["id"]
|
|
```
|
|
|
|
### Flow Method Patterns
|
|
```python
|
|
# Basic sequential flow
|
|
@start()
|
|
def step_one(self):
|
|
return "data from step one"
|
|
|
|
@listen(step_one)
|
|
def step_two(self, data_from_step_one):
|
|
return f"processed: {data_from_step_one}"
|
|
|
|
# Parallel execution with convergence
|
|
@start()
|
|
def task_a(self):
|
|
return "result_a"
|
|
|
|
@start()
|
|
def task_b(self):
|
|
return "result_b"
|
|
|
|
@listen(and_(task_a, task_b))
|
|
def combine_results(self):
|
|
# Waits for both task_a AND task_b to complete
|
|
return f"combined: {self.state}"
|
|
|
|
# Conditional routing
|
|
@router(step_one)
|
|
def decision_point(self):
|
|
if some_condition:
|
|
return "success_path"
|
|
return "failure_path"
|
|
|
|
@listen("success_path")
|
|
def handle_success(self):
|
|
# Handle success case
|
|
pass
|
|
|
|
@listen("failure_path")
|
|
def handle_failure(self):
|
|
# Handle failure case
|
|
pass
|
|
|
|
# OR condition listening
|
|
@listen(or_(task_a, task_b))
|
|
def process_any_result(self, result):
|
|
# Triggers when EITHER task_a OR task_b completes
|
|
return f"got result: {result}"
|
|
```
|
|
|
|
### Flow Persistence Patterns
|
|
```python
|
|
# Class-level persistence (all methods persisted)
@persist(verbose=True)
class PersistentFlow(Flow[MyState]):
    @start()
    def initialize(self):
        self.state.counter += 1


# Method-level persistence (selective)
class SelectiveFlow(Flow):
    @persist
    @start()
    def critical_step(self):
        # Only this method's state is persisted
        self.state["important_data"] = "value"

    @start()
    def temporary_step(self):
        # This method's state is not persisted
        pass
|
|
```
|
|
|
|
### Flow Execution Patterns
|
|
```python
|
|
# Synchronous execution
|
|
flow = MyFlow()
|
|
result = flow.kickoff()
|
|
final_state = flow.state
|
|
|
|
# Asynchronous execution
|
|
async def run_async_flow():
|
|
flow = MyFlow()
|
|
result = await flow.kickoff_async()
|
|
return result
|
|
|
|
# Flow with input parameters
|
|
flow = MyFlow()
|
|
result = flow.kickoff(inputs={"user_id": "123", "task": "research"})
|
|
|
|
# Flow plotting and visualization
|
|
flow.plot("workflow_diagram") # Generates HTML visualization
|
|
```
|
|
|
|
### Advanced Flow Patterns
|
|
```python
|
|
# Cyclic/Loop patterns
|
|
class CyclicFlow(Flow):
|
|
max_iterations = 5
|
|
current_iteration = 0
|
|
|
|
@start("loop")
|
|
def process_iteration(self):
|
|
if self.current_iteration >= self.max_iterations:
|
|
return
|
|
# Process current iteration
|
|
self.current_iteration += 1
|
|
|
|
@router(process_iteration)
|
|
def check_continue(self):
|
|
if self.current_iteration < self.max_iterations:
|
|
return "loop" # Continue cycling
|
|
return "complete"
|
|
|
|
@listen("complete")
|
|
def finalize(self):
|
|
# Final processing
|
|
pass
|
|
|
|
# Complex multi-router pattern
|
|
@router(analyze_data)
|
|
def primary_router(self):
|
|
# Returns multiple possible paths based on analysis
|
|
if self.state.confidence > 0.8:
|
|
return "high_confidence"
|
|
elif self.state.errors_found:
|
|
return "error_handling"
|
|
return "manual_review"
|
|
|
|
@router("high_confidence")
|
|
def secondary_router(self):
|
|
# Further routing based on high confidence results
|
|
return "automated_processing"
|
|
|
|
# Exception handling in flows
|
|
@start()
|
|
def risky_operation(self):
|
|
try:
|
|
# Some operation that might fail
|
|
result = dangerous_function()
|
|
self.state["success"] = True
|
|
return result
|
|
except Exception as e:
|
|
self.state["error"] = str(e)
|
|
self.state["success"] = False
|
|
return None
|
|
|
|
@listen(risky_operation)
|
|
def handle_result(self, result):
|
|
if self.state.get("success", False):
|
|
# Handle success case
|
|
pass
|
|
else:
|
|
# Handle error case
|
|
error = self.state.get("error")
|
|
# Implement error recovery logic
|
|
```
|
|
|
|
### Flow Integration with Crews
|
|
```python
|
|
# Combining Flows with Crews for complex workflows
|
|
class CrewOrchestrationFlow(Flow[WorkflowState]):
|
|
@start()
|
|
def research_phase(self):
|
|
research_crew = ResearchCrew()
|
|
result = research_crew.crew().kickoff(inputs={"topic": self.state.research_topic})
|
|
self.state.research_results = result.raw
|
|
return result
|
|
|
|
@listen(research_phase)
|
|
def analysis_phase(self, research_results):
|
|
analysis_crew = AnalysisCrew()
|
|
result = analysis_crew.crew().kickoff(inputs={
|
|
"data": self.state.research_results,
|
|
"focus": self.state.analysis_focus
|
|
})
|
|
self.state.analysis_results = result.raw
|
|
return result
|
|
|
|
@router(analysis_phase)
|
|
def decide_next_action(self):
|
|
if self.state.analysis_results.confidence > 0.7:
|
|
return "generate_report"
|
|
return "additional_research"
|
|
|
|
@listen("generate_report")
|
|
def final_report(self):
|
|
reporting_crew = ReportingCrew()
|
|
return reporting_crew.crew().kickoff(inputs={
|
|
"research": self.state.research_results,
|
|
"analysis": self.state.analysis_results
|
|
})
|
|
```
|
|
|
|
### Flow Best Practices
|
|
- **State Management**: Use structured state (Pydantic) for complex workflows, unstructured for simple ones
|
|
- **Method Design**: Keep flow methods focused and single-purpose
|
|
- **Error Handling**: Implement proper exception handling and error recovery paths
|
|
- **State Persistence**: Use @persist for critical workflows that need recovery capability
|
|
- **Flow Visualization**: Use flow.plot() to understand and debug complex workflow structures
|
|
- **Async Support**: Leverage async methods for I/O-bound operations within flows
|
|
- **Resource Management**: Be mindful of state size and memory usage in long-running flows
|
|
- **Testing Flows**: Test individual methods and overall flow execution patterns
|
|
- **Event Monitoring**: Use CrewAI event system to monitor flow execution and performance
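A small sketch of testing a flow method directly and via a full kickoff (it assumes the `SimpleFlow` from the state-management example above and pytest as the runner):

```python
def test_begin_initializes_state():
    flow = SimpleFlow()
    flow.begin()  # call the @start method directly
    assert flow.state["counter"] == 0
    assert flow.state["results"] == []


def test_kickoff_runs_start_methods():
    flow = SimpleFlow()
    flow.kickoff()  # executes @start methods and their listeners
    assert flow.state["counter"] == 0
    assert "id" in flow.state  # auto-generated state id
```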
|
|
|
|
### Flow Anti-patterns to Avoid
|
|
- **Don't create overly complex flows** with too many branches and conditions
|
|
- **Don't store large objects** in state that could cause memory issues
|
|
- **Don't ignore error handling** in flow methods
|
|
- **Don't create circular dependencies** between flow methods
|
|
- **Don't mix synchronous and asynchronous** patterns inconsistently
|
|
- **Don't overuse routers** when simple linear flow would suffice
|
|
- **Don't forget to handle edge cases** in router logic
|
|
|
|
## CrewAI Version Compatibility:
|
|
- Stay updated with CrewAI releases for new features and bug fixes
|
|
- Test crew functionality when upgrading CrewAI versions
|
|
- Use version constraints in pyproject.toml (e.g., "crewai[tools]>=0.140.0,<1.0.0")
|
|
- Monitor deprecation warnings for future compatibility
|
|
|
|
## Code Examples and Implementation Patterns
|
|
|
|
### Complete Crew Implementation Example:
|
|
```python
|
|
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task, before_kickoff, after_kickoff
from crewai_tools import SerperDevTool, FileReadTool
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List
from pydantic import BaseModel, Field


class ResearchOutput(BaseModel):
    title: str = Field(description="Research topic title")
    summary: str = Field(description="Executive summary")
    key_findings: List[str] = Field(description="Key research findings")
    recommendations: List[str] = Field(description="Actionable recommendations")
    sources: List[str] = Field(description="Source URLs and references")
    confidence_score: float = Field(description="Confidence in findings (0-1)")


@CrewBase
class ResearchCrew():
    """Advanced research crew with structured outputs and validation"""

    agents: List[BaseAgent]
    tasks: List[Task]

    @before_kickoff
    def setup_environment(self, inputs):
        """Initialize environment before crew execution"""
        print("🚀 Setting up research environment...")
        # Validate API keys, create directories, etc.
        return inputs  # before_kickoff hooks receive the kickoff inputs and must return them

    @after_kickoff
    def cleanup_and_report(self, output):
        """Handle post-execution tasks"""
        print(f"✅ Research completed. Generated {len(output.tasks_output)} task outputs")
        print(f"📊 Token usage: {output.token_usage}")
        return output  # after_kickoff hooks receive the crew output and should return it

    @agent
    def researcher(self) -> Agent:
        return Agent(
            config=self.agents_config['researcher'],
            tools=[SerperDevTool()],
            verbose=True,
            memory=True,
            max_iter=15,
            max_execution_time=1800
        )

    @agent
    def analyst(self) -> Agent:
        return Agent(
            config=self.agents_config['analyst'],
            tools=[FileReadTool()],
            verbose=True,
            memory=True
        )

    @task
    def research_task(self) -> Task:
        return Task(
            config=self.tasks_config['research_task'],
            agent=self.researcher(),
            output_pydantic=ResearchOutput
        )

    @task
    def validation_task(self) -> Task:
        return Task(
            config=self.tasks_config['validation_task'],
            agent=self.analyst(),
            context=[self.research_task()],
            guardrail=self.validate_research_quality,
            max_retries=3
        )

    def validate_research_quality(self, output) -> tuple[bool, str]:
        """Custom guardrail to ensure research quality"""
        content = output.raw
        if len(content) < 500:
            return False, "Research output too brief. Need more detailed analysis."
        if not any(keyword in content.lower() for keyword in ['conclusion', 'finding', 'result']):
            return False, "Missing key analytical elements."
        return True, content

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            memory=True,
            verbose=True,
            max_rpm=100
        )
|
|
```
|
|
|
|
### Custom Tool Implementation with Error Handling:
|
|
```python
|
|
import os
import time
from typing import Any, List, Optional, Type

import requests
from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt, wait_exponential

from crewai.tools import BaseTool


class SearchInput(BaseModel):
    query: str = Field(description="Search query")
    max_results: int = Field(default=10, description="Maximum results to return")
    timeout: int = Field(default=30, description="Request timeout in seconds")


class RobustSearchTool(BaseTool):
    name: str = "robust_search"
    description: str = "Perform web search with retry logic and error handling"
    args_schema: Type[BaseModel] = SearchInput

    # BaseTool is a Pydantic model, so custom attributes must be declared as fields
    api_key: Optional[str] = None
    rate_limit_delay: float = 1.0
    last_request_time: float = 0.0

    def __init__(self, api_key: Optional[str] = None, **kwargs):
        super().__init__(**kwargs)
        self.api_key = api_key or os.getenv("SEARCH_API_KEY")

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def _run(self, query: str, max_results: int = 10, timeout: int = 30) -> str:
        """Execute search with retry logic"""
        try:
            # Rate limiting
            time_since_last = time.time() - self.last_request_time
            if time_since_last < self.rate_limit_delay:
                time.sleep(self.rate_limit_delay - time_since_last)

            # Input validation
            if not query or len(query.strip()) == 0:
                return "Error: Empty search query provided"

            if len(query) > 500:
                return "Error: Search query too long (max 500 characters)"

            # Perform search
            results = self._perform_search(query, max_results, timeout)
            self.last_request_time = time.time()

            return self._format_results(results)

        except requests.exceptions.Timeout:
            return f"Search timed out after {timeout} seconds"
        except requests.exceptions.RequestException as e:
            return f"Search failed due to network error: {str(e)}"
        except Exception as e:
            return f"Unexpected error during search: {str(e)}"

    def _perform_search(self, query: str, max_results: int, timeout: int) -> List[dict]:
        """Implement actual search logic here"""
        # Your search API implementation
        pass

    def _format_results(self, results: List[dict]) -> str:
        """Format search results for LLM consumption"""
        if not results:
            return "No results found for the given query."

        formatted = "Search Results:\n\n"
        for i, result in enumerate(results[:10], 1):
            formatted += f"{i}. {result.get('title', 'No title')}\n"
            formatted += f" URL: {result.get('url', 'No URL')}\n"
            formatted += f" Summary: {result.get('snippet', 'No summary')}\n\n"

        return formatted
|
|
```
|
|
|
|
### Advanced Memory Management:
|
|
```python
|
|
import os
|
|
from crewai.memory import ExternalMemory, ShortTermMemory, LongTermMemory
|
|
from crewai.memory.storage.mem0_storage import Mem0Storage
|
|
|
|
class AdvancedMemoryManager:
|
|
"""Enhanced memory management for CrewAI applications"""
|
|
|
|
def __init__(self, crew, config: dict = None):
|
|
self.crew = crew
|
|
self.config = config or {}
|
|
self.setup_memory_systems()
|
|
|
|
def setup_memory_systems(self):
|
|
"""Configure multiple memory systems"""
|
|
# Short-term memory for current session
|
|
self.short_term = ShortTermMemory()
|
|
|
|
# Long-term memory for cross-session persistence
|
|
self.long_term = LongTermMemory()
|
|
|
|
# External memory with Mem0 (if configured)
|
|
if self.config.get('use_external_memory'):
|
|
self.external = ExternalMemory.create_storage(
|
|
crew=self.crew,
|
|
embedder_config={
|
|
"provider": "mem0",
|
|
"config": {
|
|
"api_key": os.getenv("MEM0_API_KEY"),
|
|
"user_id": self.config.get('user_id', 'default')
|
|
}
|
|
}
|
|
)
|
|
|
|
def save_with_context(self, content: str, memory_type: str = "short_term",
|
|
metadata: dict = None, agent: str = None):
|
|
"""Save content with enhanced metadata"""
|
|
enhanced_metadata = {
|
|
"timestamp": time.time(),
|
|
"session_id": self.config.get('session_id'),
|
|
"crew_type": self.crew.__class__.__name__,
|
|
**(metadata or {})
|
|
}
|
|
|
|
if memory_type == "short_term":
|
|
self.short_term.save(content, enhanced_metadata, agent)
|
|
elif memory_type == "long_term":
|
|
self.long_term.save(content, enhanced_metadata, agent)
|
|
elif memory_type == "external" and hasattr(self, 'external'):
|
|
self.external.save(content, enhanced_metadata, agent)
|
|
|
|
def search_across_memories(self, query: str, limit: int = 5) -> dict:
|
|
"""Search across all memory systems"""
|
|
results = {
|
|
"short_term": [],
|
|
"long_term": [],
|
|
"external": []
|
|
}
|
|
|
|
# Search short-term memory
|
|
results["short_term"] = self.short_term.search(query, limit=limit)
|
|
|
|
# Search long-term memory
|
|
results["long_term"] = self.long_term.search(query, limit=limit)
|
|
|
|
# Search external memory (if available)
|
|
if hasattr(self, 'external'):
|
|
results["external"] = self.external.search(query, limit=limit)
|
|
|
|
return results
|
|
|
|
def cleanup_old_memories(self, days_threshold: int = 30):
|
|
"""Clean up old memories based on age"""
|
|
cutoff_time = time.time() - (days_threshold * 24 * 60 * 60)
|
|
|
|
# Implement cleanup logic based on timestamps in metadata
|
|
# This would vary based on your specific storage implementation
|
|
pass
|
|
```
|
|
|
|
### Production Monitoring and Metrics:
|
|
```python
|
|
import time
|
|
import logging
|
|
import json
|
|
from datetime import datetime
|
|
from typing import Dict, Any, List, Optional
|
|
from dataclasses import dataclass, asdict
|
|
|
|
@dataclass
|
|
class TaskMetrics:
|
|
task_name: str
|
|
agent_name: str
|
|
start_time: float
|
|
end_time: float
|
|
duration: float
|
|
tokens_used: int
|
|
success: bool
|
|
error_message: Optional[str] = None
|
|
memory_usage_mb: Optional[float] = None
|
|
|
|
class CrewMonitor:
|
|
"""Comprehensive monitoring for CrewAI applications"""
|
|
|
|
def __init__(self, crew_name: str, log_level: str = "INFO"):
|
|
self.crew_name = crew_name
|
|
self.metrics: List[TaskMetrics] = []
|
|
self.session_start = time.time()
|
|
|
|
# Setup logging
|
|
logging.basicConfig(
|
|
level=getattr(logging, log_level),
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(f'crew_{crew_name}_{datetime.now().strftime("%Y%m%d")}.log'),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
self.logger = logging.getLogger(f"CrewAI.{crew_name}")
|
|
|
|
def start_task_monitoring(self, task_name: str, agent_name: str) -> dict:
|
|
"""Start monitoring a task execution"""
|
|
context = {
|
|
"task_name": task_name,
|
|
"agent_name": agent_name,
|
|
"start_time": time.time()
|
|
}
|
|
|
|
self.logger.info(f"Task started: {task_name} by {agent_name}")
|
|
return context
|
|
|
|
def end_task_monitoring(self, context: dict, success: bool = True,
|
|
tokens_used: int = 0, error: str = None):
|
|
"""End monitoring and record metrics"""
|
|
end_time = time.time()
|
|
duration = end_time - context["start_time"]
|
|
|
|
# Get memory usage (if psutil is available)
|
|
memory_usage = None
|
|
try:
|
|
import psutil
|
|
process = psutil.Process()
|
|
memory_usage = process.memory_info().rss / 1024 / 1024 # MB
|
|
except ImportError:
|
|
pass
|
|
|
|
metrics = TaskMetrics(
|
|
task_name=context["task_name"],
|
|
agent_name=context["agent_name"],
|
|
start_time=context["start_time"],
|
|
end_time=end_time,
|
|
duration=duration,
|
|
tokens_used=tokens_used,
|
|
success=success,
|
|
error_message=error,
|
|
memory_usage_mb=memory_usage
|
|
)
|
|
|
|
self.metrics.append(metrics)
|
|
|
|
# Log the completion
|
|
status = "SUCCESS" if success else "FAILED"
|
|
self.logger.info(f"Task {status}: {context['task_name']} "
|
|
f"(Duration: {duration:.2f}s, Tokens: {tokens_used})")
|
|
|
|
if error:
|
|
self.logger.error(f"Task error: {error}")
|
|
|
|
def get_performance_summary(self) -> Dict[str, Any]:
|
|
"""Generate comprehensive performance summary"""
|
|
if not self.metrics:
|
|
return {"message": "No metrics recorded yet"}
|
|
|
|
successful_tasks = [m for m in self.metrics if m.success]
|
|
failed_tasks = [m for m in self.metrics if not m.success]
|
|
|
|
total_duration = sum(m.duration for m in self.metrics)
|
|
total_tokens = sum(m.tokens_used for m in self.metrics)
|
|
avg_duration = total_duration / len(self.metrics)
|
|
|
|
return {
|
|
"crew_name": self.crew_name,
|
|
"session_duration": time.time() - self.session_start,
|
|
"total_tasks": len(self.metrics),
|
|
"successful_tasks": len(successful_tasks),
|
|
"failed_tasks": len(failed_tasks),
|
|
"success_rate": len(successful_tasks) / len(self.metrics),
|
|
"total_duration": total_duration,
|
|
"average_task_duration": avg_duration,
|
|
"total_tokens_used": total_tokens,
|
|
"average_tokens_per_task": total_tokens / len(self.metrics) if self.metrics else 0,
|
|
"slowest_task": max(self.metrics, key=lambda x: x.duration).task_name if self.metrics else None,
|
|
"most_token_intensive": max(self.metrics, key=lambda x: x.tokens_used).task_name if self.metrics else None,
|
|
"common_errors": self._get_common_errors()
|
|
}
|
|
|
|
def _get_common_errors(self) -> Dict[str, int]:
|
|
"""Get frequency of common errors"""
|
|
error_counts = {}
|
|
for metric in self.metrics:
|
|
if metric.error_message:
|
|
error_counts[metric.error_message] = error_counts.get(metric.error_message, 0) + 1
|
|
return dict(sorted(error_counts.items(), key=lambda x: x[1], reverse=True))
|
|
|
|
def export_metrics(self, filename: str = None) -> str:
|
|
"""Export metrics to JSON file"""
|
|
if not filename:
|
|
filename = f"crew_metrics_{self.crew_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
|
|
|
export_data = {
|
|
"summary": self.get_performance_summary(),
|
|
"detailed_metrics": [asdict(m) for m in self.metrics]
|
|
}
|
|
|
|
with open(filename, 'w') as f:
|
|
json.dump(export_data, f, indent=2, default=str)
|
|
|
|
self.logger.info(f"Metrics exported to {filename}")
|
|
return filename
|
|
|
|
# Usage in crew implementation
|
|
monitor = CrewMonitor("research_crew")
|
|
|
|
@task
|
|
def monitored_research_task(self) -> Task:
|
|
def task_callback(task_output):
|
|
# This would be called after task completion
|
|
context = getattr(task_output, '_monitor_context', {})
|
|
if context:
|
|
tokens = getattr(task_output, 'token_usage', {}).get('total', 0)
|
|
monitor.end_task_monitoring(context, success=True, tokens_used=tokens)
|
|
|
|
# Start monitoring would be called before task execution
|
|
# This is a simplified example - in practice you'd integrate this into the task execution flow
|
|
|
|
return Task(
|
|
config=self.tasks_config['research_task'],
|
|
agent=self.researcher(),
|
|
callback=task_callback
|
|
)
|
|
```
|
|
|
|
### Error Handling and Recovery Patterns:
|
|
```python
|
|
from enum import Enum
|
|
from typing import Optional, Callable, Any, Dict, List
|
|
import logging
import time
import traceback
|
|
|
|
class ErrorSeverity(Enum):
|
|
LOW = "low"
|
|
MEDIUM = "medium"
|
|
HIGH = "high"
|
|
CRITICAL = "critical"
|
|
|
|
class CrewError(Exception):
|
|
"""Base exception for CrewAI applications"""
|
|
def __init__(self, message: str, severity: ErrorSeverity = ErrorSeverity.MEDIUM,
|
|
context: dict = None):
|
|
super().__init__(message)
|
|
self.severity = severity
|
|
self.context = context or {}
|
|
self.timestamp = time.time()
|
|
|
|
class TaskExecutionError(CrewError):
|
|
"""Raised when task execution fails"""
|
|
pass
|
|
|
|
class ValidationError(CrewError):
|
|
"""Raised when validation fails"""
|
|
pass
|
|
|
|
class ConfigurationError(CrewError):
|
|
"""Raised when configuration is invalid"""
|
|
pass
|
|
|
|
class ErrorHandler:
|
|
"""Centralized error handling for CrewAI applications"""
|
|
|
|
def __init__(self, crew_name: str):
|
|
self.crew_name = crew_name
|
|
self.error_log: List[CrewError] = []
|
|
self.recovery_strategies: Dict[type, Callable] = {}
|
|
|
|
def register_recovery_strategy(self, error_type: type, strategy: Callable):
|
|
"""Register a recovery strategy for specific error types"""
|
|
self.recovery_strategies[error_type] = strategy
|
|
|
|
def handle_error(self, error: Exception, context: dict = None) -> Any:
|
|
"""Handle errors with appropriate recovery strategies"""
|
|
|
|
# Convert to CrewError if needed
|
|
if not isinstance(error, CrewError):
|
|
crew_error = CrewError(
|
|
message=str(error),
|
|
severity=ErrorSeverity.MEDIUM,
|
|
context=context or {}
|
|
)
|
|
else:
|
|
crew_error = error
|
|
|
|
# Log the error
|
|
self.error_log.append(crew_error)
|
|
self._log_error(crew_error)
|
|
|
|
# Apply recovery strategy if available
|
|
error_type = type(error)
|
|
if error_type in self.recovery_strategies:
|
|
try:
|
|
return self.recovery_strategies[error_type](error, context)
|
|
except Exception as recovery_error:
|
|
self._log_error(CrewError(
|
|
f"Recovery strategy failed: {str(recovery_error)}",
|
|
ErrorSeverity.HIGH,
|
|
{"original_error": str(error), "recovery_error": str(recovery_error)}
|
|
))
|
|
|
|
# If critical, re-raise
|
|
if crew_error.severity == ErrorSeverity.CRITICAL:
|
|
raise crew_error
|
|
|
|
return None
|
|
|
|
def _log_error(self, error: CrewError):
|
|
"""Log error with appropriate level based on severity"""
|
|
logger = logging.getLogger(f"CrewAI.{self.crew_name}.ErrorHandler")
|
|
|
|
error_msg = f"[{error.severity.value.upper()}] {error}"
|
|
if error.context:
|
|
error_msg += f" | Context: {error.context}"
|
|
|
|
if error.severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL]:
|
|
logger.error(error_msg)
|
|
logger.error(f"Stack trace: {traceback.format_exc()}")
|
|
elif error.severity == ErrorSeverity.MEDIUM:
|
|
logger.warning(error_msg)
|
|
else:
|
|
logger.info(error_msg)
|
|
|
|
def get_error_summary(self) -> Dict[str, Any]:
|
|
"""Get summary of errors encountered"""
|
|
if not self.error_log:
|
|
return {"total_errors": 0}
|
|
|
|
severity_counts = {}
|
|
for error in self.error_log:
|
|
severity_counts[error.severity.value] = severity_counts.get(error.severity.value, 0) + 1
|
|
|
|
return {
|
|
"total_errors": len(self.error_log),
|
|
"severity_breakdown": severity_counts,
|
|
"recent_errors": [str(e) for e in self.error_log[-5:]], # Last 5 errors
|
|
"most_recent_error": str(self.error_log[-1]) if self.error_log else None
|
|
}
|
|
|
|
# Example usage in crew
|
|
error_handler = ErrorHandler("research_crew")
|
|
|
|
# Register recovery strategies
|
|
def retry_with_simpler_model(error, context):
|
|
"""Recovery strategy: retry with a simpler model"""
|
|
if "rate limit" in str(error).lower():
|
|
time.sleep(60) # Wait and retry
|
|
return "RETRY"
|
|
elif "model overloaded" in str(error).lower():
|
|
# Switch to simpler model and retry
|
|
return "RETRY_WITH_SIMPLE_MODEL"
|
|
return None
|
|
|
|
error_handler.register_recovery_strategy(TaskExecutionError, retry_with_simpler_model)
|
|
|
|
@task
|
|
def robust_task(self) -> Task:
|
|
def execute_with_error_handling(task_func):
|
|
def wrapper(*args, **kwargs):
|
|
try:
|
|
return task_func(*args, **kwargs)
|
|
except Exception as e:
|
|
result = error_handler.handle_error(e, {"task": "research_task"})
|
|
if result == "RETRY":
|
|
# Implement retry logic
|
|
pass
|
|
elif result == "RETRY_WITH_SIMPLE_MODEL":
|
|
# Switch model and retry
|
|
pass
|
|
else:
|
|
# Use fallback response
|
|
return "Task failed, using fallback response"
|
|
return wrapper
|
|
|
|
return Task(
|
|
config=self.tasks_config['research_task'],
|
|
agent=self.researcher()
|
|
)
|
|
```
|
|
|
|
### Environment and Configuration Management:
|
|
```python
|
|
import os
|
|
from enum import Enum
|
|
from typing import Optional, Dict, Any
|
|
from pydantic import Field, validator
from pydantic_settings import BaseSettings  # Pydantic v2 moved BaseSettings to the pydantic-settings package
|
|
|
|
class Environment(str, Enum):
|
|
DEVELOPMENT = "development"
|
|
TESTING = "testing"
|
|
STAGING = "staging"
|
|
PRODUCTION = "production"
|
|
|
|
class CrewAISettings(BaseSettings):
|
|
"""Comprehensive settings management for CrewAI applications"""
|
|
|
|
# Environment
|
|
environment: Environment = Field(default=Environment.DEVELOPMENT)
|
|
debug: bool = Field(default=True)
|
|
|
|
# API Keys (loaded from environment)
|
|
openai_api_key: Optional[str] = Field(default=None, env="OPENAI_API_KEY")
|
|
anthropic_api_key: Optional[str] = Field(default=None, env="ANTHROPIC_API_KEY")
|
|
serper_api_key: Optional[str] = Field(default=None, env="SERPER_API_KEY")
|
|
mem0_api_key: Optional[str] = Field(default=None, env="MEM0_API_KEY")
|
|
|
|
# CrewAI Configuration
|
|
crew_max_rpm: int = Field(default=100)
|
|
crew_max_execution_time: int = Field(default=3600) # 1 hour
|
|
default_llm_model: str = Field(default="gpt-4")
|
|
fallback_llm_model: str = Field(default="gpt-3.5-turbo")
|
|
|
|
# Memory and Storage
|
|
crewai_storage_dir: str = Field(default="./storage", env="CREWAI_STORAGE_DIR")
|
|
memory_enabled: bool = Field(default=True)
|
|
memory_cleanup_interval: int = Field(default=86400) # 24 hours in seconds
|
|
|
|
# Performance
|
|
enable_caching: bool = Field(default=True)
|
|
max_retries: int = Field(default=3)
|
|
retry_delay: float = Field(default=1.0)
|
|
|
|
# Monitoring
|
|
enable_monitoring: bool = Field(default=True)
|
|
log_level: str = Field(default="INFO")
|
|
metrics_export_interval: int = Field(default=3600) # 1 hour
|
|
|
|
# Security
|
|
input_sanitization: bool = Field(default=True)
|
|
max_input_length: int = Field(default=10000)
|
|
allowed_file_types: list = Field(default=["txt", "md", "pdf", "docx"])
|
|
|
|
@validator('environment', pre=True)
|
|
def set_debug_based_on_env(cls, v):
|
|
return v
|
|
|
|
@validator('debug')
|
|
def set_debug_from_env(cls, v, values):
|
|
env = values.get('environment')
|
|
if env == Environment.PRODUCTION:
|
|
return False
|
|
return v
|
|
|
|
@validator('openai_api_key')
|
|
def validate_openai_key(cls, v):
|
|
if not v:
|
|
raise ValueError("OPENAI_API_KEY is required")
|
|
if not v.startswith('sk-'):
|
|
raise ValueError("Invalid OpenAI API key format")
|
|
return v
|
|
|
|
@property
|
|
def is_production(self) -> bool:
|
|
return self.environment == Environment.PRODUCTION
|
|
|
|
@property
|
|
def is_development(self) -> bool:
|
|
return self.environment == Environment.DEVELOPMENT
|
|
|
|
def get_llm_config(self) -> Dict[str, Any]:
|
|
"""Get LLM configuration based on environment"""
|
|
config = {
|
|
"model": self.default_llm_model,
|
|
"temperature": 0.1 if self.is_production else 0.3,
|
|
"max_tokens": 4000 if self.is_production else 2000,
|
|
"timeout": 60
|
|
}
|
|
|
|
if self.is_development:
|
|
config["model"] = self.fallback_llm_model
|
|
|
|
return config
|
|
|
|
def get_memory_config(self) -> Dict[str, Any]:
|
|
"""Get memory configuration"""
|
|
return {
|
|
"enabled": self.memory_enabled,
|
|
"storage_dir": self.crewai_storage_dir,
|
|
"cleanup_interval": self.memory_cleanup_interval,
|
|
"provider": "mem0" if self.mem0_api_key and self.is_production else "local"
|
|
}
|
|
|
|
class Config:
|
|
env_file = ".env"
|
|
env_file_encoding = 'utf-8'
|
|
case_sensitive = False
|
|
|
|
# Global settings instance
|
|
settings = CrewAISettings()
|
|
|
|
# Usage in crew
|
|
@CrewBase
|
|
class ConfigurableCrew():
|
|
"""Crew that uses centralized configuration"""
|
|
|
|
def __init__(self):
|
|
self.settings = settings
|
|
self.validate_configuration()
|
|
|
|
def validate_configuration(self):
|
|
"""Validate configuration before crew execution"""
|
|
required_keys = [self.settings.openai_api_key]
|
|
if not all(required_keys):
|
|
raise ConfigurationError("Missing required API keys")
|
|
|
|
if not os.path.exists(self.settings.crewai_storage_dir):
|
|
os.makedirs(self.settings.crewai_storage_dir, exist_ok=True)
|
|
|
|
@agent
|
|
def adaptive_agent(self) -> Agent:
|
|
"""Agent that adapts to configuration"""
|
|
llm_config = self.settings.get_llm_config()
|
|
|
|
return Agent(
|
|
config=self.agents_config['researcher'],
|
|
llm=llm_config["model"],
|
|
max_iter=15 if self.settings.is_production else 10,
|
|
max_execution_time=self.settings.crew_max_execution_time,
|
|
verbose=self.settings.debug
|
|
)
|
|
```
|
|
|
|
### Comprehensive Testing Framework:
|
|
```python
|
|
import os
import pytest
import requests
|
|
import asyncio
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
from crewai import Agent, Task, Crew
|
|
from crewai.tasks.task_output import TaskOutput
|
|
|
|
class CrewAITestFramework:
|
|
"""Comprehensive testing framework for CrewAI applications"""
|
|
|
|
@staticmethod
|
|
def create_mock_agent(role: str = "test_agent", tools: list = None) -> Mock:
|
|
"""Create a mock agent for testing"""
|
|
mock_agent = Mock(spec=Agent)
|
|
mock_agent.role = role
|
|
mock_agent.goal = f"Test goal for {role}"
|
|
mock_agent.backstory = f"Test backstory for {role}"
|
|
mock_agent.tools = tools or []
|
|
mock_agent.llm = "gpt-3.5-turbo"
|
|
mock_agent.verbose = False
|
|
return mock_agent
|
|
|
|
@staticmethod
|
|
def create_mock_task_output(content: str, success: bool = True,
|
|
tokens: int = 100) -> TaskOutput:
|
|
"""Create a mock task output for testing"""
|
|
return TaskOutput(
|
|
description="Test task",
|
|
raw=content,
|
|
agent="test_agent",
|
|
pydantic=None,
|
|
json_dict=None
|
|
)
|
|
|
|
@staticmethod
|
|
def create_test_crew(agents: list = None, tasks: list = None) -> Crew:
|
|
"""Create a test crew with mock components"""
|
|
test_agents = agents or [CrewAITestFramework.create_mock_agent()]
|
|
test_tasks = tasks or []
|
|
|
|
return Crew(
|
|
agents=test_agents,
|
|
tasks=test_tasks,
|
|
verbose=False
|
|
)
|
|
|
|
# Example test cases
|
|
class TestResearchCrew:
|
|
"""Test cases for research crew functionality"""
|
|
|
|
def setup_method(self):
|
|
"""Setup test environment"""
|
|
self.framework = CrewAITestFramework()
|
|
self.mock_serper = Mock()
|
|
|
|
@patch('crewai_tools.SerperDevTool')
|
|
def test_agent_creation(self, mock_serper_tool):
|
|
"""Test agent creation with proper configuration"""
|
|
mock_serper_tool.return_value = self.mock_serper
|
|
|
|
crew = ResearchCrew()
|
|
researcher = crew.researcher()
|
|
|
|
assert researcher.role == "Senior Research Analyst"
|
|
assert len(researcher.tools) > 0
|
|
assert researcher.verbose is True
|
|
|
|
def test_task_validation(self):
|
|
"""Test task validation logic"""
|
|
crew = ResearchCrew()
|
|
|
|
# Test valid output
|
|
valid_output = self.framework.create_mock_task_output(
|
|
"This is a comprehensive research summary with conclusions and findings."
|
|
)
|
|
is_valid, message = crew.validate_research_quality(valid_output)
|
|
assert is_valid is True
|
|
|
|
# Test invalid output (too short)
|
|
invalid_output = self.framework.create_mock_task_output("Too short")
|
|
is_valid, message = crew.validate_research_quality(invalid_output)
|
|
assert is_valid is False
|
|
assert "brief" in message.lower()
|
|
|
|
@patch('requests.get')
|
|
def test_tool_error_handling(self, mock_requests):
|
|
"""Test tool error handling and recovery"""
|
|
# Simulate network error
|
|
mock_requests.side_effect = requests.exceptions.RequestException("Network error")
|
|
|
|
tool = RobustSearchTool()
|
|
result = tool._run("test query")
|
|
|
|
assert "network error" in result.lower()
|
|
assert "failed" in result.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_crew_execution_flow(self):
|
|
"""Test complete crew execution with mocked dependencies"""
|
|
with patch.object(Agent, 'execute_task') as mock_execute:
|
|
mock_execute.return_value = self.framework.create_mock_task_output(
|
|
"Research completed successfully with findings and recommendations."
|
|
)
|
|
|
|
crew = ResearchCrew()
|
|
result = crew.crew().kickoff(inputs={"topic": "AI testing"})
|
|
|
|
assert result is not None
|
|
assert "successfully" in result.raw.lower()
|
|
|
|
def test_memory_integration(self):
|
|
"""Test memory system integration"""
|
|
crew = ResearchCrew()
|
|
memory_manager = AdvancedMemoryManager(crew)
|
|
|
|
# Test saving to memory
|
|
test_content = "Important research finding about AI"
|
|
memory_manager.save_with_context(
|
|
content=test_content,
|
|
memory_type="short_term",
|
|
metadata={"importance": "high"},
|
|
agent="researcher"
|
|
)
|
|
|
|
# Test searching memory
|
|
results = memory_manager.search_across_memories("AI research")
|
|
assert "short_term" in results
|
|
|
|
def test_error_handling_workflow(self):
|
|
"""Test error handling and recovery mechanisms"""
|
|
error_handler = ErrorHandler("test_crew")
|
|
|
|
# Test error registration and handling
|
|
test_error = TaskExecutionError("Test task failed", ErrorSeverity.MEDIUM)
|
|
result = error_handler.handle_error(test_error)
|
|
|
|
assert len(error_handler.error_log) == 1
|
|
assert error_handler.error_log[0].severity == ErrorSeverity.MEDIUM
|
|
|
|
def test_configuration_validation(self):
|
|
"""Test configuration validation"""
|
|
# Test with missing API key
|
|
with patch.dict(os.environ, {}, clear=True):
|
|
with pytest.raises(ValueError):
|
|
settings = CrewAISettings()
|
|
|
|
# Test with valid configuration
|
|
with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-key"}):
|
|
settings = CrewAISettings()
|
|
assert settings.openai_api_key == "sk-test-key"
|
|
|
|
@pytest.mark.integration
|
|
def test_end_to_end_workflow(self):
|
|
"""Integration test for complete workflow"""
|
|
# This would test the entire crew workflow with real components
|
|
# Use sparingly and with proper API key management
|
|
pass
|
|
|
|
# Performance testing
|
|
class TestCrewPerformance:
|
|
"""Performance tests for CrewAI applications"""
|
|
|
|
def test_memory_usage(self):
|
|
"""Test memory usage during crew execution"""
|
|
import psutil
|
|
import gc
|
|
|
|
process = psutil.Process()
|
|
initial_memory = process.memory_info().rss
|
|
|
|
# Create and run crew multiple times
|
|
for i in range(10):
|
|
crew = ResearchCrew()
|
|
# Simulate crew execution
|
|
del crew
|
|
gc.collect()
|
|
|
|
final_memory = process.memory_info().rss
|
|
memory_increase = final_memory - initial_memory
|
|
|
|
# Assert memory increase is reasonable (less than 100MB)
|
|
assert memory_increase < 100 * 1024 * 1024
|
|
|
|
def test_concurrent_execution(self):
|
|
"""Test concurrent crew execution"""
|
|
import concurrent.futures
|
|
|
|
def run_crew(crew_id):
|
|
crew = ResearchCrew()
|
|
# Simulate execution
|
|
return f"crew_{crew_id}_completed"
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
|
|
futures = [executor.submit(run_crew, i) for i in range(5)]
|
|
results = [future.result() for future in futures]
|
|
|
|
assert len(results) == 5
|
|
assert all("completed" in result for result in results)
|
|
|
|
# Run tests with coverage
|
|
# pytest --cov=src --cov-report=html --cov-report=term tests/
|
|
```
|
|
|
|
## Troubleshooting Common Issues
|
|
|
|
### Memory and Performance Issues:
|
|
- **Large memory usage**: Implement memory cleanup, use score thresholds, monitor ChromaDB size
|
|
- **Slow LLM responses**: Optimize prompts, use appropriate model sizes, implement caching
|
|
- **High token costs**: Implement output caching, use context efficiently, set token limits
|
|
- **Memory leaks**: Properly dispose of crew instances, monitor memory usage, use garbage collection
|
|
|
|
### Configuration and Setup Issues:
|
|
- **YAML parsing errors**: Validate YAML syntax, check indentation, use YAML linters
|
|
- **Missing environment variables**: Use .env.example, validate at startup, provide clear error messages
|
|
- **Tool import failures**: Ensure proper tool installation, check import paths, verify dependencies
|
|
- **API key issues**: Validate key format, check permissions, implement key rotation
|
|
|
|
### Storage and Persistence Issues:
|
|
- **Permission errors**: Check CREWAI_STORAGE_DIR permissions, ensure write access
|
|
- **Database locks**: Ensure single crew instance access, implement proper connection handling
|
|
- **Storage growth**: Implement cleanup strategies, monitor disk usage, archive old data
|
|
- **ChromaDB issues**: Check vector database health, validate embeddings, handle corrupted indices
|
|
|
|
## Local Development and Testing
|
|
|
|
### Development Best Practices:
|
|
- Validate all API keys and credentials in .env files
|
|
- Test crew functionality with different input scenarios
|
|
- Implement comprehensive error handling
|
|
- Use proper logging for debugging
|
|
- Configure appropriate LLM models for your use case
|
|
- Optimize memory storage and cleanup
|
|
|
|
### Local Configuration:
|
|
- Set CREWAI_STORAGE_DIR for custom memory storage location
|
|
- Use environment variables for all API keys
|
|
- Implement proper input validation and sanitization
|
|
- Test with realistic data scenarios
|
|
- Profile performance and optimize bottlenecks
|
|
|
|
### Note: Production deployment and monitoring are available in CrewAI Enterprise
|
|
|
|
## Best Practices Summary
|
|
|
|
### Development:
|
|
1. Always use .env files for sensitive configuration
|
|
2. Implement comprehensive error handling and logging
|
|
3. Use structured outputs with Pydantic for reliability
|
|
4. Test crew functionality with different input scenarios
|
|
5. Follow CrewAI patterns and conventions consistently
|
|
6. Use UV for dependency management as per CrewAI standards
|
|
7. Implement proper validation for all inputs and outputs
|
|
8. Optimize performance for your specific use cases
|
|
|
|
### Security:
|
|
1. Never commit API keys or sensitive data to version control
|
|
2. Implement input validation and sanitization
|
|
3. Use proper authentication and authorization
|
|
4. Follow principle of least privilege for tool access
|
|
5. Implement rate limiting and abuse prevention
|
|
6. Monitor for security threats and anomalies
|
|
7. Keep dependencies updated and secure
|
|
8. Implement audit logging for sensitive operations
|
|
|
|
### Performance:
|
|
1. Optimize LLM calls and implement caching where appropriate
|
|
2. Use appropriate model sizes for different tasks
|
|
3. Implement efficient memory management and cleanup
|
|
4. Monitor token usage and implement cost controls
|
|
5. Use async patterns for I/O-bound operations
|
|
6. Implement proper connection pooling and resource management
|
|
7. Profile and optimize critical paths
|
|
8. Plan for horizontal scaling when needed
|