mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-03 13:18:29 +00:00
Compare commits
6 Commits
lorenze/ne
...
gl/chore/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b2d5633c1 | ||
|
|
f071966951 | ||
|
|
318310bb7a | ||
|
|
34a03f882c | ||
|
|
a0fcc0c8d1 | ||
|
|
748c25451c |
286
.cursorrules
286
.cursorrules
@@ -260,7 +260,7 @@ def handle_success(self):
|
||||
# Handle success case
|
||||
pass
|
||||
|
||||
@listen("failure_path")
|
||||
@listen("failure_path")
|
||||
def handle_failure(self):
|
||||
# Handle failure case
|
||||
pass
|
||||
@@ -288,7 +288,7 @@ class SelectiveFlow(Flow):
|
||||
def critical_step(self):
|
||||
# Only this method's state is persisted
|
||||
self.state["important_data"] = "value"
|
||||
|
||||
|
||||
@start()
|
||||
def temporary_step(self):
|
||||
# This method's state is not persisted
|
||||
@@ -322,20 +322,20 @@ flow.plot("workflow_diagram") # Generates HTML visualization
|
||||
class CyclicFlow(Flow):
|
||||
max_iterations = 5
|
||||
current_iteration = 0
|
||||
|
||||
|
||||
@start("loop")
|
||||
def process_iteration(self):
|
||||
if self.current_iteration >= self.max_iterations:
|
||||
return
|
||||
# Process current iteration
|
||||
self.current_iteration += 1
|
||||
|
||||
|
||||
@router(process_iteration)
|
||||
def check_continue(self):
|
||||
if self.current_iteration < self.max_iterations:
|
||||
return "loop" # Continue cycling
|
||||
return "complete"
|
||||
|
||||
|
||||
@listen("complete")
|
||||
def finalize(self):
|
||||
# Final processing
|
||||
@@ -369,7 +369,7 @@ def risky_operation(self):
|
||||
self.state["success"] = False
|
||||
return None
|
||||
|
||||
@listen(risky_operation)
|
||||
@listen(risky_operation)
|
||||
def handle_result(self, result):
|
||||
if self.state.get("success", False):
|
||||
# Handle success case
|
||||
@@ -390,7 +390,7 @@ class CrewOrchestrationFlow(Flow[WorkflowState]):
|
||||
result = research_crew.crew().kickoff(inputs={"topic": self.state.research_topic})
|
||||
self.state.research_results = result.raw
|
||||
return result
|
||||
|
||||
|
||||
@listen(research_phase)
|
||||
def analysis_phase(self, research_results):
|
||||
analysis_crew = AnalysisCrew()
|
||||
@@ -400,13 +400,13 @@ class CrewOrchestrationFlow(Flow[WorkflowState]):
|
||||
})
|
||||
self.state.analysis_results = result.raw
|
||||
return result
|
||||
|
||||
|
||||
@router(analysis_phase)
|
||||
def decide_next_action(self):
|
||||
if self.state.analysis_results.confidence > 0.7:
|
||||
return "generate_report"
|
||||
return "additional_research"
|
||||
|
||||
|
||||
@listen("generate_report")
|
||||
def final_report(self):
|
||||
reporting_crew = ReportingCrew()
|
||||
@@ -439,7 +439,7 @@ class CrewOrchestrationFlow(Flow[WorkflowState]):
|
||||
## CrewAI Version Compatibility:
|
||||
- Stay updated with CrewAI releases for new features and bug fixes
|
||||
- Test crew functionality when upgrading CrewAI versions
|
||||
- Use version constraints in pyproject.toml (e.g., "crewai[tools]>=0.134.0,<1.0.0")
|
||||
- Use version constraints in pyproject.toml (e.g., "crewai[tools]>=0.140.0,<1.0.0")
|
||||
- Monitor deprecation warnings for future compatibility
|
||||
|
||||
## Code Examples and Implementation Patterns
|
||||
@@ -464,22 +464,22 @@ class ResearchOutput(BaseModel):
|
||||
@CrewBase
|
||||
class ResearchCrew():
|
||||
"""Advanced research crew with structured outputs and validation"""
|
||||
|
||||
|
||||
agents: List[BaseAgent]
|
||||
tasks: List[Task]
|
||||
|
||||
|
||||
@before_kickoff
|
||||
def setup_environment(self):
|
||||
"""Initialize environment before crew execution"""
|
||||
print("🚀 Setting up research environment...")
|
||||
# Validate API keys, create directories, etc.
|
||||
|
||||
|
||||
@after_kickoff
|
||||
def cleanup_and_report(self, output):
|
||||
"""Handle post-execution tasks"""
|
||||
print(f"✅ Research completed. Generated {len(output.tasks_output)} task outputs")
|
||||
print(f"📊 Token usage: {output.token_usage}")
|
||||
|
||||
|
||||
@agent
|
||||
def researcher(self) -> Agent:
|
||||
return Agent(
|
||||
@@ -490,7 +490,7 @@ class ResearchCrew():
|
||||
max_iter=15,
|
||||
max_execution_time=1800
|
||||
)
|
||||
|
||||
|
||||
@agent
|
||||
def analyst(self) -> Agent:
|
||||
return Agent(
|
||||
@@ -499,7 +499,7 @@ class ResearchCrew():
|
||||
verbose=True,
|
||||
memory=True
|
||||
)
|
||||
|
||||
|
||||
@task
|
||||
def research_task(self) -> Task:
|
||||
return Task(
|
||||
@@ -507,7 +507,7 @@ class ResearchCrew():
|
||||
agent=self.researcher(),
|
||||
output_pydantic=ResearchOutput
|
||||
)
|
||||
|
||||
|
||||
@task
|
||||
def validation_task(self) -> Task:
|
||||
return Task(
|
||||
@@ -517,7 +517,7 @@ class ResearchCrew():
|
||||
guardrail=self.validate_research_quality,
|
||||
max_retries=3
|
||||
)
|
||||
|
||||
|
||||
def validate_research_quality(self, output) -> tuple[bool, str]:
|
||||
"""Custom guardrail to ensure research quality"""
|
||||
content = output.raw
|
||||
@@ -526,7 +526,7 @@ class ResearchCrew():
|
||||
if not any(keyword in content.lower() for keyword in ['conclusion', 'finding', 'result']):
|
||||
return False, "Missing key analytical elements."
|
||||
return True, content
|
||||
|
||||
|
||||
@crew
|
||||
def crew(self) -> Crew:
|
||||
return Crew(
|
||||
@@ -557,13 +557,13 @@ class RobustSearchTool(BaseTool):
|
||||
name: str = "robust_search"
|
||||
description: str = "Perform web search with retry logic and error handling"
|
||||
args_schema: Type[BaseModel] = SearchInput
|
||||
|
||||
|
||||
def __init__(self, api_key: Optional[str] = None, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.api_key = api_key or os.getenv("SEARCH_API_KEY")
|
||||
self.rate_limit_delay = 1.0
|
||||
self.last_request_time = 0
|
||||
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(3),
|
||||
wait=wait_exponential(multiplier=1, min=4, max=10)
|
||||
@@ -575,43 +575,43 @@ class RobustSearchTool(BaseTool):
|
||||
time_since_last = time.time() - self.last_request_time
|
||||
if time_since_last < self.rate_limit_delay:
|
||||
time.sleep(self.rate_limit_delay - time_since_last)
|
||||
|
||||
|
||||
# Input validation
|
||||
if not query or len(query.strip()) == 0:
|
||||
return "Error: Empty search query provided"
|
||||
|
||||
|
||||
if len(query) > 500:
|
||||
return "Error: Search query too long (max 500 characters)"
|
||||
|
||||
|
||||
# Perform search
|
||||
results = self._perform_search(query, max_results, timeout)
|
||||
self.last_request_time = time.time()
|
||||
|
||||
|
||||
return self._format_results(results)
|
||||
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
return f"Search timed out after {timeout} seconds"
|
||||
except requests.exceptions.RequestException as e:
|
||||
return f"Search failed due to network error: {str(e)}"
|
||||
except Exception as e:
|
||||
return f"Unexpected error during search: {str(e)}"
|
||||
|
||||
|
||||
def _perform_search(self, query: str, max_results: int, timeout: int) -> List[dict]:
|
||||
"""Implement actual search logic here"""
|
||||
# Your search API implementation
|
||||
pass
|
||||
|
||||
|
||||
def _format_results(self, results: List[dict]) -> str:
|
||||
"""Format search results for LLM consumption"""
|
||||
if not results:
|
||||
return "No results found for the given query."
|
||||
|
||||
|
||||
formatted = "Search Results:\n\n"
|
||||
for i, result in enumerate(results[:10], 1):
|
||||
formatted += f"{i}. {result.get('title', 'No title')}\n"
|
||||
formatted += f" URL: {result.get('url', 'No URL')}\n"
|
||||
formatted += f" Summary: {result.get('snippet', 'No summary')}\n\n"
|
||||
|
||||
|
||||
return formatted
|
||||
```
|
||||
|
||||
@@ -623,20 +623,20 @@ from crewai.memory.storage.mem0_storage import Mem0Storage
|
||||
|
||||
class AdvancedMemoryManager:
|
||||
"""Enhanced memory management for CrewAI applications"""
|
||||
|
||||
|
||||
def __init__(self, crew, config: dict = None):
|
||||
self.crew = crew
|
||||
self.config = config or {}
|
||||
self.setup_memory_systems()
|
||||
|
||||
|
||||
def setup_memory_systems(self):
|
||||
"""Configure multiple memory systems"""
|
||||
# Short-term memory for current session
|
||||
self.short_term = ShortTermMemory()
|
||||
|
||||
|
||||
# Long-term memory for cross-session persistence
|
||||
self.long_term = LongTermMemory()
|
||||
|
||||
|
||||
# External memory with Mem0 (if configured)
|
||||
if self.config.get('use_external_memory'):
|
||||
self.external = ExternalMemory.create_storage(
|
||||
@@ -649,8 +649,8 @@ class AdvancedMemoryManager:
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
def save_with_context(self, content: str, memory_type: str = "short_term",
|
||||
|
||||
def save_with_context(self, content: str, memory_type: str = "short_term",
|
||||
metadata: dict = None, agent: str = None):
|
||||
"""Save content with enhanced metadata"""
|
||||
enhanced_metadata = {
|
||||
@@ -659,14 +659,14 @@ class AdvancedMemoryManager:
|
||||
"crew_type": self.crew.__class__.__name__,
|
||||
**(metadata or {})
|
||||
}
|
||||
|
||||
|
||||
if memory_type == "short_term":
|
||||
self.short_term.save(content, enhanced_metadata, agent)
|
||||
elif memory_type == "long_term":
|
||||
self.long_term.save(content, enhanced_metadata, agent)
|
||||
elif memory_type == "external" and hasattr(self, 'external'):
|
||||
self.external.save(content, enhanced_metadata, agent)
|
||||
|
||||
|
||||
def search_across_memories(self, query: str, limit: int = 5) -> dict:
|
||||
"""Search across all memory systems"""
|
||||
results = {
|
||||
@@ -674,23 +674,23 @@ class AdvancedMemoryManager:
|
||||
"long_term": [],
|
||||
"external": []
|
||||
}
|
||||
|
||||
|
||||
# Search short-term memory
|
||||
results["short_term"] = self.short_term.search(query, limit=limit)
|
||||
|
||||
|
||||
# Search long-term memory
|
||||
results["long_term"] = self.long_term.search(query, limit=limit)
|
||||
|
||||
|
||||
# Search external memory (if available)
|
||||
if hasattr(self, 'external'):
|
||||
results["external"] = self.external.search(query, limit=limit)
|
||||
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def cleanup_old_memories(self, days_threshold: int = 30):
|
||||
"""Clean up old memories based on age"""
|
||||
cutoff_time = time.time() - (days_threshold * 24 * 60 * 60)
|
||||
|
||||
|
||||
# Implement cleanup logic based on timestamps in metadata
|
||||
# This would vary based on your specific storage implementation
|
||||
pass
|
||||
@@ -719,12 +719,12 @@ class TaskMetrics:
|
||||
|
||||
class CrewMonitor:
|
||||
"""Comprehensive monitoring for CrewAI applications"""
|
||||
|
||||
|
||||
def __init__(self, crew_name: str, log_level: str = "INFO"):
|
||||
self.crew_name = crew_name
|
||||
self.metrics: List[TaskMetrics] = []
|
||||
self.session_start = time.time()
|
||||
|
||||
|
||||
# Setup logging
|
||||
logging.basicConfig(
|
||||
level=getattr(logging, log_level),
|
||||
@@ -735,7 +735,7 @@ class CrewMonitor:
|
||||
]
|
||||
)
|
||||
self.logger = logging.getLogger(f"CrewAI.{crew_name}")
|
||||
|
||||
|
||||
def start_task_monitoring(self, task_name: str, agent_name: str) -> dict:
|
||||
"""Start monitoring a task execution"""
|
||||
context = {
|
||||
@@ -743,16 +743,16 @@ class CrewMonitor:
|
||||
"agent_name": agent_name,
|
||||
"start_time": time.time()
|
||||
}
|
||||
|
||||
|
||||
self.logger.info(f"Task started: {task_name} by {agent_name}")
|
||||
return context
|
||||
|
||||
def end_task_monitoring(self, context: dict, success: bool = True,
|
||||
|
||||
def end_task_monitoring(self, context: dict, success: bool = True,
|
||||
tokens_used: int = 0, error: str = None):
|
||||
"""End monitoring and record metrics"""
|
||||
end_time = time.time()
|
||||
duration = end_time - context["start_time"]
|
||||
|
||||
|
||||
# Get memory usage (if psutil is available)
|
||||
memory_usage = None
|
||||
try:
|
||||
@@ -761,7 +761,7 @@ class CrewMonitor:
|
||||
memory_usage = process.memory_info().rss / 1024 / 1024 # MB
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
metrics = TaskMetrics(
|
||||
task_name=context["task_name"],
|
||||
agent_name=context["agent_name"],
|
||||
@@ -773,29 +773,29 @@ class CrewMonitor:
|
||||
error_message=error,
|
||||
memory_usage_mb=memory_usage
|
||||
)
|
||||
|
||||
|
||||
self.metrics.append(metrics)
|
||||
|
||||
|
||||
# Log the completion
|
||||
status = "SUCCESS" if success else "FAILED"
|
||||
self.logger.info(f"Task {status}: {context['task_name']} "
|
||||
f"(Duration: {duration:.2f}s, Tokens: {tokens_used})")
|
||||
|
||||
|
||||
if error:
|
||||
self.logger.error(f"Task error: {error}")
|
||||
|
||||
|
||||
def get_performance_summary(self) -> Dict[str, Any]:
|
||||
"""Generate comprehensive performance summary"""
|
||||
if not self.metrics:
|
||||
return {"message": "No metrics recorded yet"}
|
||||
|
||||
|
||||
successful_tasks = [m for m in self.metrics if m.success]
|
||||
failed_tasks = [m for m in self.metrics if not m.success]
|
||||
|
||||
|
||||
total_duration = sum(m.duration for m in self.metrics)
|
||||
total_tokens = sum(m.tokens_used for m in self.metrics)
|
||||
avg_duration = total_duration / len(self.metrics)
|
||||
|
||||
|
||||
return {
|
||||
"crew_name": self.crew_name,
|
||||
"session_duration": time.time() - self.session_start,
|
||||
@@ -811,7 +811,7 @@ class CrewMonitor:
|
||||
"most_token_intensive": max(self.metrics, key=lambda x: x.tokens_used).task_name if self.metrics else None,
|
||||
"common_errors": self._get_common_errors()
|
||||
}
|
||||
|
||||
|
||||
def _get_common_errors(self) -> Dict[str, int]:
|
||||
"""Get frequency of common errors"""
|
||||
error_counts = {}
|
||||
@@ -819,20 +819,20 @@ class CrewMonitor:
|
||||
if metric.error_message:
|
||||
error_counts[metric.error_message] = error_counts.get(metric.error_message, 0) + 1
|
||||
return dict(sorted(error_counts.items(), key=lambda x: x[1], reverse=True))
|
||||
|
||||
|
||||
def export_metrics(self, filename: str = None) -> str:
|
||||
"""Export metrics to JSON file"""
|
||||
if not filename:
|
||||
filename = f"crew_metrics_{self.crew_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||||
|
||||
|
||||
export_data = {
|
||||
"summary": self.get_performance_summary(),
|
||||
"detailed_metrics": [asdict(m) for m in self.metrics]
|
||||
}
|
||||
|
||||
|
||||
with open(filename, 'w') as f:
|
||||
json.dump(export_data, f, indent=2, default=str)
|
||||
|
||||
|
||||
self.logger.info(f"Metrics exported to {filename}")
|
||||
return filename
|
||||
|
||||
@@ -847,10 +847,10 @@ def monitored_research_task(self) -> Task:
|
||||
if context:
|
||||
tokens = getattr(task_output, 'token_usage', {}).get('total', 0)
|
||||
monitor.end_task_monitoring(context, success=True, tokens_used=tokens)
|
||||
|
||||
|
||||
# Start monitoring would be called before task execution
|
||||
# This is a simplified example - in practice you'd integrate this into the task execution flow
|
||||
|
||||
|
||||
return Task(
|
||||
config=self.tasks_config['research_task'],
|
||||
agent=self.researcher(),
|
||||
@@ -872,7 +872,7 @@ class ErrorSeverity(Enum):
|
||||
|
||||
class CrewError(Exception):
|
||||
"""Base exception for CrewAI applications"""
|
||||
def __init__(self, message: str, severity: ErrorSeverity = ErrorSeverity.MEDIUM,
|
||||
def __init__(self, message: str, severity: ErrorSeverity = ErrorSeverity.MEDIUM,
|
||||
context: dict = None):
|
||||
super().__init__(message)
|
||||
self.severity = severity
|
||||
@@ -893,19 +893,19 @@ class ConfigurationError(CrewError):
|
||||
|
||||
class ErrorHandler:
|
||||
"""Centralized error handling for CrewAI applications"""
|
||||
|
||||
|
||||
def __init__(self, crew_name: str):
|
||||
self.crew_name = crew_name
|
||||
self.error_log: List[CrewError] = []
|
||||
self.recovery_strategies: Dict[type, Callable] = {}
|
||||
|
||||
|
||||
def register_recovery_strategy(self, error_type: type, strategy: Callable):
|
||||
"""Register a recovery strategy for specific error types"""
|
||||
self.recovery_strategies[error_type] = strategy
|
||||
|
||||
|
||||
def handle_error(self, error: Exception, context: dict = None) -> Any:
|
||||
"""Handle errors with appropriate recovery strategies"""
|
||||
|
||||
|
||||
# Convert to CrewError if needed
|
||||
if not isinstance(error, CrewError):
|
||||
crew_error = CrewError(
|
||||
@@ -915,11 +915,11 @@ class ErrorHandler:
|
||||
)
|
||||
else:
|
||||
crew_error = error
|
||||
|
||||
|
||||
# Log the error
|
||||
self.error_log.append(crew_error)
|
||||
self._log_error(crew_error)
|
||||
|
||||
|
||||
# Apply recovery strategy if available
|
||||
error_type = type(error)
|
||||
if error_type in self.recovery_strategies:
|
||||
@@ -931,21 +931,21 @@ class ErrorHandler:
|
||||
ErrorSeverity.HIGH,
|
||||
{"original_error": str(error), "recovery_error": str(recovery_error)}
|
||||
))
|
||||
|
||||
|
||||
# If critical, re-raise
|
||||
if crew_error.severity == ErrorSeverity.CRITICAL:
|
||||
raise crew_error
|
||||
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _log_error(self, error: CrewError):
|
||||
"""Log error with appropriate level based on severity"""
|
||||
logger = logging.getLogger(f"CrewAI.{self.crew_name}.ErrorHandler")
|
||||
|
||||
|
||||
error_msg = f"[{error.severity.value.upper()}] {error}"
|
||||
if error.context:
|
||||
error_msg += f" | Context: {error.context}"
|
||||
|
||||
|
||||
if error.severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL]:
|
||||
logger.error(error_msg)
|
||||
logger.error(f"Stack trace: {traceback.format_exc()}")
|
||||
@@ -953,16 +953,16 @@ class ErrorHandler:
|
||||
logger.warning(error_msg)
|
||||
else:
|
||||
logger.info(error_msg)
|
||||
|
||||
|
||||
def get_error_summary(self) -> Dict[str, Any]:
|
||||
"""Get summary of errors encountered"""
|
||||
if not self.error_log:
|
||||
return {"total_errors": 0}
|
||||
|
||||
|
||||
severity_counts = {}
|
||||
for error in self.error_log:
|
||||
severity_counts[error.severity.value] = severity_counts.get(error.severity.value, 0) + 1
|
||||
|
||||
|
||||
return {
|
||||
"total_errors": len(self.error_log),
|
||||
"severity_breakdown": severity_counts,
|
||||
@@ -1004,7 +1004,7 @@ def robust_task(self) -> Task:
|
||||
# Use fallback response
|
||||
return "Task failed, using fallback response"
|
||||
return wrapper
|
||||
|
||||
|
||||
return Task(
|
||||
config=self.tasks_config['research_task'],
|
||||
agent=self.researcher()
|
||||
@@ -1020,60 +1020,60 @@ from pydantic import BaseSettings, Field, validator
|
||||
|
||||
class Environment(str, Enum):
|
||||
DEVELOPMENT = "development"
|
||||
TESTING = "testing"
|
||||
TESTING = "testing"
|
||||
STAGING = "staging"
|
||||
PRODUCTION = "production"
|
||||
|
||||
class CrewAISettings(BaseSettings):
|
||||
"""Comprehensive settings management for CrewAI applications"""
|
||||
|
||||
|
||||
# Environment
|
||||
environment: Environment = Field(default=Environment.DEVELOPMENT)
|
||||
debug: bool = Field(default=True)
|
||||
|
||||
|
||||
# API Keys (loaded from environment)
|
||||
openai_api_key: Optional[str] = Field(default=None, env="OPENAI_API_KEY")
|
||||
anthropic_api_key: Optional[str] = Field(default=None, env="ANTHROPIC_API_KEY")
|
||||
serper_api_key: Optional[str] = Field(default=None, env="SERPER_API_KEY")
|
||||
mem0_api_key: Optional[str] = Field(default=None, env="MEM0_API_KEY")
|
||||
|
||||
|
||||
# CrewAI Configuration
|
||||
crew_max_rpm: int = Field(default=100)
|
||||
crew_max_execution_time: int = Field(default=3600) # 1 hour
|
||||
default_llm_model: str = Field(default="gpt-4")
|
||||
fallback_llm_model: str = Field(default="gpt-3.5-turbo")
|
||||
|
||||
|
||||
# Memory and Storage
|
||||
crewai_storage_dir: str = Field(default="./storage", env="CREWAI_STORAGE_DIR")
|
||||
memory_enabled: bool = Field(default=True)
|
||||
memory_cleanup_interval: int = Field(default=86400) # 24 hours in seconds
|
||||
|
||||
|
||||
# Performance
|
||||
enable_caching: bool = Field(default=True)
|
||||
max_retries: int = Field(default=3)
|
||||
retry_delay: float = Field(default=1.0)
|
||||
|
||||
|
||||
# Monitoring
|
||||
enable_monitoring: bool = Field(default=True)
|
||||
log_level: str = Field(default="INFO")
|
||||
metrics_export_interval: int = Field(default=3600) # 1 hour
|
||||
|
||||
|
||||
# Security
|
||||
input_sanitization: bool = Field(default=True)
|
||||
max_input_length: int = Field(default=10000)
|
||||
allowed_file_types: list = Field(default=["txt", "md", "pdf", "docx"])
|
||||
|
||||
|
||||
@validator('environment', pre=True)
|
||||
def set_debug_based_on_env(cls, v):
|
||||
return v
|
||||
|
||||
|
||||
@validator('debug')
|
||||
def set_debug_from_env(cls, v, values):
|
||||
env = values.get('environment')
|
||||
if env == Environment.PRODUCTION:
|
||||
return False
|
||||
return v
|
||||
|
||||
|
||||
@validator('openai_api_key')
|
||||
def validate_openai_key(cls, v):
|
||||
if not v:
|
||||
@@ -1081,15 +1081,15 @@ class CrewAISettings(BaseSettings):
|
||||
if not v.startswith('sk-'):
|
||||
raise ValueError("Invalid OpenAI API key format")
|
||||
return v
|
||||
|
||||
|
||||
@property
|
||||
def is_production(self) -> bool:
|
||||
return self.environment == Environment.PRODUCTION
|
||||
|
||||
|
||||
@property
|
||||
def is_development(self) -> bool:
|
||||
return self.environment == Environment.DEVELOPMENT
|
||||
|
||||
|
||||
def get_llm_config(self) -> Dict[str, Any]:
|
||||
"""Get LLM configuration based on environment"""
|
||||
config = {
|
||||
@@ -1098,12 +1098,12 @@ class CrewAISettings(BaseSettings):
|
||||
"max_tokens": 4000 if self.is_production else 2000,
|
||||
"timeout": 60
|
||||
}
|
||||
|
||||
|
||||
if self.is_development:
|
||||
config["model"] = self.fallback_llm_model
|
||||
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def get_memory_config(self) -> Dict[str, Any]:
|
||||
"""Get memory configuration"""
|
||||
return {
|
||||
@@ -1112,7 +1112,7 @@ class CrewAISettings(BaseSettings):
|
||||
"cleanup_interval": self.memory_cleanup_interval,
|
||||
"provider": "mem0" if self.mem0_api_key and self.is_production else "local"
|
||||
}
|
||||
|
||||
|
||||
class Config:
|
||||
env_file = ".env"
|
||||
env_file_encoding = 'utf-8'
|
||||
@@ -1125,25 +1125,25 @@ settings = CrewAISettings()
|
||||
@CrewBase
|
||||
class ConfigurableCrew():
|
||||
"""Crew that uses centralized configuration"""
|
||||
|
||||
|
||||
def __init__(self):
|
||||
self.settings = settings
|
||||
self.validate_configuration()
|
||||
|
||||
|
||||
def validate_configuration(self):
|
||||
"""Validate configuration before crew execution"""
|
||||
required_keys = [self.settings.openai_api_key]
|
||||
if not all(required_keys):
|
||||
raise ConfigurationError("Missing required API keys")
|
||||
|
||||
|
||||
if not os.path.exists(self.settings.crewai_storage_dir):
|
||||
os.makedirs(self.settings.crewai_storage_dir, exist_ok=True)
|
||||
|
||||
|
||||
@agent
|
||||
def adaptive_agent(self) -> Agent:
|
||||
"""Agent that adapts to configuration"""
|
||||
llm_config = self.settings.get_llm_config()
|
||||
|
||||
|
||||
return Agent(
|
||||
config=self.agents_config['researcher'],
|
||||
llm=llm_config["model"],
|
||||
@@ -1163,7 +1163,7 @@ from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
class CrewAITestFramework:
|
||||
"""Comprehensive testing framework for CrewAI applications"""
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create_mock_agent(role: str = "test_agent", tools: list = None) -> Mock:
|
||||
"""Create a mock agent for testing"""
|
||||
@@ -1175,9 +1175,9 @@ class CrewAITestFramework:
|
||||
mock_agent.llm = "gpt-3.5-turbo"
|
||||
mock_agent.verbose = False
|
||||
return mock_agent
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create_mock_task_output(content: str, success: bool = True,
|
||||
def create_mock_task_output(content: str, success: bool = True,
|
||||
tokens: int = 100) -> TaskOutput:
|
||||
"""Create a mock task output for testing"""
|
||||
return TaskOutput(
|
||||
@@ -1187,13 +1187,13 @@ class CrewAITestFramework:
|
||||
pydantic=None,
|
||||
json_dict=None
|
||||
)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def create_test_crew(agents: list = None, tasks: list = None) -> Crew:
|
||||
"""Create a test crew with mock components"""
|
||||
test_agents = agents or [CrewAITestFramework.create_mock_agent()]
|
||||
test_tasks = tasks or []
|
||||
|
||||
|
||||
return Crew(
|
||||
agents=test_agents,
|
||||
tasks=test_tasks,
|
||||
@@ -1203,53 +1203,53 @@ class CrewAITestFramework:
|
||||
# Example test cases
|
||||
class TestResearchCrew:
|
||||
"""Test cases for research crew functionality"""
|
||||
|
||||
|
||||
def setup_method(self):
|
||||
"""Setup test environment"""
|
||||
self.framework = CrewAITestFramework()
|
||||
self.mock_serper = Mock()
|
||||
|
||||
|
||||
@patch('crewai_tools.SerperDevTool')
|
||||
def test_agent_creation(self, mock_serper_tool):
|
||||
"""Test agent creation with proper configuration"""
|
||||
mock_serper_tool.return_value = self.mock_serper
|
||||
|
||||
|
||||
crew = ResearchCrew()
|
||||
researcher = crew.researcher()
|
||||
|
||||
|
||||
assert researcher.role == "Senior Research Analyst"
|
||||
assert len(researcher.tools) > 0
|
||||
assert researcher.verbose is True
|
||||
|
||||
|
||||
def test_task_validation(self):
|
||||
"""Test task validation logic"""
|
||||
crew = ResearchCrew()
|
||||
|
||||
|
||||
# Test valid output
|
||||
valid_output = self.framework.create_mock_task_output(
|
||||
"This is a comprehensive research summary with conclusions and findings."
|
||||
)
|
||||
is_valid, message = crew.validate_research_quality(valid_output)
|
||||
assert is_valid is True
|
||||
|
||||
|
||||
# Test invalid output (too short)
|
||||
invalid_output = self.framework.create_mock_task_output("Too short")
|
||||
is_valid, message = crew.validate_research_quality(invalid_output)
|
||||
assert is_valid is False
|
||||
assert "brief" in message.lower()
|
||||
|
||||
|
||||
@patch('requests.get')
|
||||
def test_tool_error_handling(self, mock_requests):
|
||||
"""Test tool error handling and recovery"""
|
||||
# Simulate network error
|
||||
mock_requests.side_effect = requests.exceptions.RequestException("Network error")
|
||||
|
||||
|
||||
tool = RobustSearchTool()
|
||||
result = tool._run("test query")
|
||||
|
||||
|
||||
assert "network error" in result.lower()
|
||||
assert "failed" in result.lower()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_crew_execution_flow(self):
|
||||
"""Test complete crew execution with mocked dependencies"""
|
||||
@@ -1257,18 +1257,18 @@ class TestResearchCrew:
|
||||
mock_execute.return_value = self.framework.create_mock_task_output(
|
||||
"Research completed successfully with findings and recommendations."
|
||||
)
|
||||
|
||||
|
||||
crew = ResearchCrew()
|
||||
result = crew.crew().kickoff(inputs={"topic": "AI testing"})
|
||||
|
||||
|
||||
assert result is not None
|
||||
assert "successfully" in result.raw.lower()
|
||||
|
||||
|
||||
def test_memory_integration(self):
|
||||
"""Test memory system integration"""
|
||||
crew = ResearchCrew()
|
||||
memory_manager = AdvancedMemoryManager(crew)
|
||||
|
||||
|
||||
# Test saving to memory
|
||||
test_content = "Important research finding about AI"
|
||||
memory_manager.save_with_context(
|
||||
@@ -1277,34 +1277,34 @@ class TestResearchCrew:
|
||||
metadata={"importance": "high"},
|
||||
agent="researcher"
|
||||
)
|
||||
|
||||
|
||||
# Test searching memory
|
||||
results = memory_manager.search_across_memories("AI research")
|
||||
assert "short_term" in results
|
||||
|
||||
|
||||
def test_error_handling_workflow(self):
|
||||
"""Test error handling and recovery mechanisms"""
|
||||
error_handler = ErrorHandler("test_crew")
|
||||
|
||||
|
||||
# Test error registration and handling
|
||||
test_error = TaskExecutionError("Test task failed", ErrorSeverity.MEDIUM)
|
||||
result = error_handler.handle_error(test_error)
|
||||
|
||||
|
||||
assert len(error_handler.error_log) == 1
|
||||
assert error_handler.error_log[0].severity == ErrorSeverity.MEDIUM
|
||||
|
||||
|
||||
def test_configuration_validation(self):
|
||||
"""Test configuration validation"""
|
||||
# Test with missing API key
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
with pytest.raises(ValueError):
|
||||
settings = CrewAISettings()
|
||||
|
||||
|
||||
# Test with valid configuration
|
||||
with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-key"}):
|
||||
settings = CrewAISettings()
|
||||
assert settings.openai_api_key == "sk-test-key"
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_end_to_end_workflow(self):
|
||||
"""Integration test for complete workflow"""
|
||||
@@ -1315,41 +1315,41 @@ class TestResearchCrew:
|
||||
# Performance testing
|
||||
class TestCrewPerformance:
|
||||
"""Performance tests for CrewAI applications"""
|
||||
|
||||
|
||||
def test_memory_usage(self):
|
||||
"""Test memory usage during crew execution"""
|
||||
import psutil
|
||||
import gc
|
||||
|
||||
|
||||
process = psutil.Process()
|
||||
initial_memory = process.memory_info().rss
|
||||
|
||||
|
||||
# Create and run crew multiple times
|
||||
for i in range(10):
|
||||
crew = ResearchCrew()
|
||||
# Simulate crew execution
|
||||
del crew
|
||||
gc.collect()
|
||||
|
||||
|
||||
final_memory = process.memory_info().rss
|
||||
memory_increase = final_memory - initial_memory
|
||||
|
||||
|
||||
# Assert memory increase is reasonable (less than 100MB)
|
||||
assert memory_increase < 100 * 1024 * 1024
|
||||
|
||||
|
||||
def test_concurrent_execution(self):
|
||||
"""Test concurrent crew execution"""
|
||||
import concurrent.futures
|
||||
|
||||
|
||||
def run_crew(crew_id):
|
||||
crew = ResearchCrew()
|
||||
# Simulate execution
|
||||
return f"crew_{crew_id}_completed"
|
||||
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
|
||||
futures = [executor.submit(run_crew, i) for i in range(5)]
|
||||
results = [future.result() for future in futures]
|
||||
|
||||
|
||||
assert len(results) == 5
|
||||
assert all("completed" in result for result in results)
|
||||
|
||||
@@ -1400,7 +1400,7 @@ class TestCrewPerformance:
|
||||
|
||||
### Development:
|
||||
1. Always use .env files for sensitive configuration
|
||||
2. Implement comprehensive error handling and logging
|
||||
2. Implement comprehensive error handling and logging
|
||||
3. Use structured outputs with Pydantic for reliability
|
||||
4. Test crew functionality with different input scenarios
|
||||
5. Follow CrewAI patterns and conventions consistently
|
||||
@@ -1426,4 +1426,4 @@ class TestCrewPerformance:
|
||||
5. Use async patterns for I/O-bound operations
|
||||
6. Implement proper connection pooling and resource management
|
||||
7. Profile and optimize critical paths
|
||||
8. Plan for horizontal scaling when needed
|
||||
8. Plan for horizontal scaling when needed
|
||||
|
||||
20
.github/workflows/tests.yml
vendored
20
.github/workflows/tests.yml
vendored
@@ -7,14 +7,18 @@ permissions:
|
||||
|
||||
env:
|
||||
OPENAI_API_KEY: fake-api-key
|
||||
PYTHONUNBUFFERED: 1
|
||||
|
||||
jobs:
|
||||
tests:
|
||||
name: tests (${{ matrix.python-version }})
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 15
|
||||
strategy:
|
||||
fail-fast: true
|
||||
matrix:
|
||||
python-version: ['3.10', '3.11', '3.12', '3.13']
|
||||
group: [1, 2, 3, 4, 5, 6, 7, 8]
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
@@ -23,6 +27,9 @@ jobs:
|
||||
uses: astral-sh/setup-uv@v3
|
||||
with:
|
||||
enable-cache: true
|
||||
cache-dependency-glob: |
|
||||
**/pyproject.toml
|
||||
**/uv.lock
|
||||
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
run: uv python install ${{ matrix.python-version }}
|
||||
@@ -30,5 +37,14 @@ jobs:
|
||||
- name: Install the project
|
||||
run: uv sync --dev --all-extras
|
||||
|
||||
- name: Run tests
|
||||
run: uv run pytest --block-network --timeout=60 -vv
|
||||
- name: Run tests (group ${{ matrix.group }} of 8)
|
||||
run: |
|
||||
uv run pytest \
|
||||
--block-network \
|
||||
--timeout=30 \
|
||||
-vv \
|
||||
--splits 8 \
|
||||
--group ${{ matrix.group }} \
|
||||
--durations=10 \
|
||||
-n auto \
|
||||
--maxfail=3
|
||||
|
||||
@@ -5,3 +5,7 @@ repos:
|
||||
- id: ruff
|
||||
args: ["--fix"]
|
||||
- id: ruff-format
|
||||
- repo: https://github.com/commitizen-tools/commitizen
|
||||
rev: v3.13.0
|
||||
hooks:
|
||||
- id: commitizen
|
||||
|
||||
@@ -94,7 +94,7 @@
|
||||
"pages": [
|
||||
"en/guides/advanced/customizing-prompts",
|
||||
"en/guides/advanced/fingerprinting"
|
||||
|
||||
|
||||
]
|
||||
}
|
||||
]
|
||||
@@ -296,7 +296,8 @@
|
||||
"en/enterprise/features/webhook-streaming",
|
||||
"en/enterprise/features/traces",
|
||||
"en/enterprise/features/hallucination-guardrail",
|
||||
"en/enterprise/features/integrations"
|
||||
"en/enterprise/features/integrations",
|
||||
"en/enterprise/features/agent-repositories"
|
||||
]
|
||||
},
|
||||
{
|
||||
@@ -373,7 +374,7 @@
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
]
|
||||
},
|
||||
{
|
||||
@@ -730,7 +731,7 @@
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
]
|
||||
}
|
||||
]
|
||||
@@ -774,7 +775,7 @@
|
||||
"destination": "/en/introduction"
|
||||
},
|
||||
{
|
||||
"source": "/installation",
|
||||
"source": "/installation",
|
||||
"destination": "/en/installation"
|
||||
},
|
||||
{
|
||||
|
||||
@@ -526,6 +526,103 @@ agent = Agent(
|
||||
The context window management feature works automatically in the background. You don't need to call any special functions - just set `respect_context_window` to your preferred behavior and CrewAI handles the rest!
|
||||
</Note>
|
||||
|
||||
## Direct Agent Interaction with `kickoff()`
|
||||
|
||||
Agents can be used directly without going through a task or crew workflow using the `kickoff()` method. This provides a simpler way to interact with an agent when you don't need the full crew orchestration capabilities.
|
||||
|
||||
### How `kickoff()` Works
|
||||
|
||||
The `kickoff()` method allows you to send messages directly to an agent and get a response, similar to how you would interact with an LLM but with all the agent's capabilities (tools, reasoning, etc.).
|
||||
|
||||
```python Code
|
||||
from crewai import Agent
|
||||
from crewai_tools import SerperDevTool
|
||||
|
||||
# Create an agent
|
||||
researcher = Agent(
|
||||
role="AI Technology Researcher",
|
||||
goal="Research the latest AI developments",
|
||||
tools=[SerperDevTool()],
|
||||
verbose=True
|
||||
)
|
||||
|
||||
# Use kickoff() to interact directly with the agent
|
||||
result = researcher.kickoff("What are the latest developments in language models?")
|
||||
|
||||
# Access the raw response
|
||||
print(result.raw)
|
||||
```
|
||||
|
||||
### Parameters and Return Values
|
||||
|
||||
| Parameter | Type | Description |
|
||||
| :---------------- | :---------------------------------- | :------------------------------------------------------------------------ |
|
||||
| `messages` | `Union[str, List[Dict[str, str]]]` | Either a string query or a list of message dictionaries with role/content |
|
||||
| `response_format` | `Optional[Type[Any]]` | Optional Pydantic model for structured output |
|
||||
|
||||
The method returns a `LiteAgentOutput` object with the following properties:
|
||||
|
||||
- `raw`: String containing the raw output text
|
||||
- `pydantic`: Parsed Pydantic model (if a `response_format` was provided)
|
||||
- `agent_role`: Role of the agent that produced the output
|
||||
- `usage_metrics`: Token usage metrics for the execution
|
||||
|
||||
### Structured Output
|
||||
|
||||
You can get structured output by providing a Pydantic model as the `response_format`:
|
||||
|
||||
```python Code
|
||||
from pydantic import BaseModel
|
||||
from typing import List
|
||||
|
||||
class ResearchFindings(BaseModel):
|
||||
main_points: List[str]
|
||||
key_technologies: List[str]
|
||||
future_predictions: str
|
||||
|
||||
# Get structured output
|
||||
result = researcher.kickoff(
|
||||
"Summarize the latest developments in AI for 2025",
|
||||
response_format=ResearchFindings
|
||||
)
|
||||
|
||||
# Access structured data
|
||||
print(result.pydantic.main_points)
|
||||
print(result.pydantic.future_predictions)
|
||||
```
|
||||
|
||||
### Multiple Messages
|
||||
|
||||
You can also provide a conversation history as a list of message dictionaries:
|
||||
|
||||
```python Code
|
||||
messages = [
|
||||
{"role": "user", "content": "I need information about large language models"},
|
||||
{"role": "assistant", "content": "I'd be happy to help with that! What specifically would you like to know?"},
|
||||
{"role": "user", "content": "What are the latest developments in 2025?"}
|
||||
]
|
||||
|
||||
result = researcher.kickoff(messages)
|
||||
```
|
||||
|
||||
### Async Support
|
||||
|
||||
An asynchronous version is available via `kickoff_async()` with the same parameters:
|
||||
|
||||
```python Code
|
||||
import asyncio
|
||||
|
||||
async def main():
|
||||
result = await researcher.kickoff_async("What are the latest developments in AI?")
|
||||
print(result.raw)
|
||||
|
||||
asyncio.run(main())
|
||||
```
|
||||
|
||||
<Note>
|
||||
The `kickoff()` method uses a `LiteAgent` internally, which provides a simpler execution flow while preserving all of the agent's configuration (role, goal, backstory, tools, etc.).
|
||||
</Note>
|
||||
|
||||
## Important Considerations and Best Practices
|
||||
|
||||
### Security and Code Execution
|
||||
|
||||
155
docs/en/enterprise/features/agent-repositories.mdx
Normal file
155
docs/en/enterprise/features/agent-repositories.mdx
Normal file
@@ -0,0 +1,155 @@
|
||||
---
|
||||
title: 'Agent Repositories'
|
||||
description: 'Learn how to use Agent Repositories to share and reuse your agents across teams and projects'
|
||||
icon: 'database'
|
||||
---
|
||||
|
||||
Agent Repositories allow enterprise users to store, share, and reuse agent definitions across teams and projects. This feature enables organizations to maintain a centralized library of standardized agents, promoting consistency and reducing duplication of effort.
|
||||
|
||||
## Benefits of Agent Repositories
|
||||
|
||||
- **Standardization**: Maintain consistent agent definitions across your organization
|
||||
- **Reusability**: Create an agent once and use it in multiple crews and projects
|
||||
- **Governance**: Implement organization-wide policies for agent configurations
|
||||
- **Collaboration**: Enable teams to share and build upon each other's work
|
||||
|
||||
## Using Agent Repositories
|
||||
|
||||
### Prerequisites
|
||||
|
||||
1. You must have an account at CrewAI, try the [free plan](https://app.crewai.com).
|
||||
2. You need to be authenticated using the CrewAI CLI.
|
||||
3. If you have more than one organization, make sure you are switched to the correct organization using the CLI command:
|
||||
|
||||
```bash
|
||||
crewai org switch <org_id>
|
||||
```
|
||||
|
||||
### Creating and Managing Agents in Repositories
|
||||
|
||||
To create and manage agents in repositories,Enterprise Dashboard.
|
||||
|
||||
### Loading Agents from Repositories
|
||||
|
||||
You can load agents from repositories in your code using the `from_repository` parameter:
|
||||
|
||||
```python
|
||||
from crewai import Agent
|
||||
|
||||
# Create an agent by loading it from a repository
|
||||
# The agent is loaded with all its predefined configurations
|
||||
researcher = Agent(
|
||||
from_repository="market-research-agent"
|
||||
)
|
||||
|
||||
```
|
||||
|
||||
### Overriding Repository Settings
|
||||
|
||||
You can override specific settings from the repository by providing them in the configuration:
|
||||
|
||||
```python
|
||||
researcher = Agent(
|
||||
from_repository="market-research-agent",
|
||||
goal="Research the latest trends in AI development", # Override the repository goal
|
||||
verbose=True # Add a setting not in the repository
|
||||
)
|
||||
```
|
||||
|
||||
### Example: Creating a Crew with Repository Agents
|
||||
|
||||
```python
|
||||
from crewai import Crew, Agent, Task
|
||||
|
||||
# Load agents from repositories
|
||||
researcher = Agent(
|
||||
from_repository="market-research-agent"
|
||||
)
|
||||
|
||||
writer = Agent(
|
||||
from_repository="content-writer-agent"
|
||||
)
|
||||
|
||||
# Create tasks
|
||||
research_task = Task(
|
||||
description="Research the latest trends in AI",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
writing_task = Task(
|
||||
description="Write a comprehensive report based on the research",
|
||||
agent=writer
|
||||
)
|
||||
|
||||
# Create the crew
|
||||
crew = Crew(
|
||||
agents=[researcher, writer],
|
||||
tasks=[research_task, writing_task],
|
||||
verbose=True
|
||||
)
|
||||
|
||||
# Run the crew
|
||||
result = crew.kickoff()
|
||||
```
|
||||
|
||||
### Example: Using `kickoff()` with Repository Agents
|
||||
|
||||
You can also use repository agents directly with the `kickoff()` method for simpler interactions:
|
||||
|
||||
```python
|
||||
from crewai import Agent
|
||||
from pydantic import BaseModel
|
||||
from typing import List
|
||||
|
||||
# Define a structured output format
|
||||
class MarketAnalysis(BaseModel):
|
||||
key_trends: List[str]
|
||||
opportunities: List[str]
|
||||
recommendation: str
|
||||
|
||||
# Load an agent from repository
|
||||
analyst = Agent(
|
||||
from_repository="market-analyst-agent",
|
||||
verbose=True
|
||||
)
|
||||
|
||||
# Get a free-form response
|
||||
result = analyst.kickoff("Analyze the AI market in 2025")
|
||||
print(result.raw) # Access the raw response
|
||||
|
||||
# Get structured output
|
||||
structured_result = analyst.kickoff(
|
||||
"Provide a structured analysis of the AI market in 2025",
|
||||
response_format=MarketAnalysis
|
||||
)
|
||||
|
||||
# Access structured data
|
||||
print(f"Key Trends: {structured_result.pydantic.key_trends}")
|
||||
print(f"Recommendation: {structured_result.pydantic.recommendation}")
|
||||
```
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. **Naming Convention**: Use clear, descriptive names for your repository agents
|
||||
2. **Documentation**: Include comprehensive descriptions for each agent
|
||||
3. **Tool Management**: Ensure that tools referenced by repository agents are available in your environment
|
||||
4. **Access Control**: Manage permissions to ensure only authorized team members can modify repository agents
|
||||
|
||||
## Organization Management
|
||||
|
||||
To switch between organizations or see your current organization, use the CrewAI CLI:
|
||||
|
||||
```bash
|
||||
# View current organization
|
||||
crewai org current
|
||||
|
||||
# Switch to a different organization
|
||||
crewai org switch <org_id>
|
||||
|
||||
# List all available organizations
|
||||
crewai org list
|
||||
```
|
||||
|
||||
<Note>
|
||||
When loading agents from repositories, you must be authenticated and switched to the correct organization. If you receive errors, check your authentication status and organization settings using the CLI commands above.
|
||||
</Note>
|
||||
@@ -33,7 +33,7 @@ dependencies = [
|
||||
"click>=8.1.7",
|
||||
"appdirs>=1.4.4",
|
||||
"jsonref>=1.1.0",
|
||||
"json-repair>=0.25.2",
|
||||
"json-repair==0.25.2",
|
||||
"uv>=0.4.25",
|
||||
"tomli-w>=1.1.0",
|
||||
"tomli>=2.0.2",
|
||||
@@ -47,11 +47,11 @@ Documentation = "https://docs.crewai.com"
|
||||
Repository = "https://github.com/crewAIInc/crewAI"
|
||||
|
||||
[project.optional-dependencies]
|
||||
tools = ["crewai-tools~=0.48.0"]
|
||||
tools = ["crewai-tools~=0.49.0"]
|
||||
embeddings = [
|
||||
"tiktoken~=0.8.0"
|
||||
]
|
||||
agentops = ["agentops>=0.3.0"]
|
||||
agentops = ["agentops==0.3.18"]
|
||||
pdfplumber = [
|
||||
"pdfplumber>=0.11.4",
|
||||
]
|
||||
@@ -83,6 +83,8 @@ dev-dependencies = [
|
||||
"pytest-recording>=0.13.2",
|
||||
"pytest-randomly>=3.16.0",
|
||||
"pytest-timeout>=2.3.1",
|
||||
"pytest-xdist>=3.6.1",
|
||||
"pytest-split>=0.9.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
@@ -123,3 +125,15 @@ path = "src/crewai/__init__.py"
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
exclude = [
|
||||
"docs/**",
|
||||
"docs/",
|
||||
]
|
||||
|
||||
[tool.hatch.build.targets.sdist]
|
||||
exclude = [
|
||||
"docs/**",
|
||||
"docs/",
|
||||
]
|
||||
|
||||
@@ -28,19 +28,19 @@ _telemetry_submitted = False
|
||||
def _track_install():
|
||||
"""Track package installation/first-use via Scarf analytics."""
|
||||
global _telemetry_submitted
|
||||
|
||||
|
||||
if _telemetry_submitted or Telemetry._is_telemetry_disabled():
|
||||
return
|
||||
|
||||
|
||||
try:
|
||||
pixel_url = "https://api.scarf.sh/v2/packages/CrewAI/crewai/docs/00f2dad1-8334-4a39-934e-003b2e1146db"
|
||||
|
||||
|
||||
req = urllib.request.Request(pixel_url)
|
||||
req.add_header('User-Agent', f'CrewAI-Python/{__version__}')
|
||||
|
||||
|
||||
with urllib.request.urlopen(req, timeout=2): # nosec B310
|
||||
_telemetry_submitted = True
|
||||
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -54,7 +54,7 @@ def _track_install_async():
|
||||
|
||||
_track_install_async()
|
||||
|
||||
__version__ = "0.134.0"
|
||||
__version__ = "0.140.0"
|
||||
__all__ = [
|
||||
"Agent",
|
||||
"Crew",
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]>=0.134.0,<1.0.0"
|
||||
"crewai[tools]>=0.140.0,<1.0.0"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]>=0.134.0,<1.0.0",
|
||||
"crewai[tools]>=0.140.0,<1.0.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]>=0.134.0"
|
||||
"crewai[tools]>=0.140.0"
|
||||
]
|
||||
|
||||
[tool.crewai]
|
||||
|
||||
@@ -18,6 +18,11 @@ from typing import (
|
||||
cast,
|
||||
)
|
||||
|
||||
from opentelemetry import baggage
|
||||
from opentelemetry.context import attach, detach
|
||||
|
||||
from crewai.utilities.crew.models import CrewContext
|
||||
|
||||
from pydantic import (
|
||||
UUID4,
|
||||
BaseModel,
|
||||
@@ -616,6 +621,11 @@ class Crew(FlowTrackable, BaseModel):
|
||||
self,
|
||||
inputs: Optional[Dict[str, Any]] = None,
|
||||
) -> CrewOutput:
|
||||
ctx = baggage.set_baggage(
|
||||
"crew_context", CrewContext(id=str(self.id), key=self.key)
|
||||
)
|
||||
token = attach(ctx)
|
||||
|
||||
try:
|
||||
for before_callback in self.before_kickoff_callbacks:
|
||||
if inputs is None:
|
||||
@@ -676,6 +686,8 @@ class Crew(FlowTrackable, BaseModel):
|
||||
CrewKickoffFailedEvent(error=str(e), crew_name=self.name or "crew"),
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
detach(token)
|
||||
|
||||
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
|
||||
"""Executes the Crew's workflow for each input in the list and aggregates results."""
|
||||
|
||||
1
src/crewai/utilities/crew/__init__.py
Normal file
1
src/crewai/utilities/crew/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Crew-specific utilities."""
|
||||
16
src/crewai/utilities/crew/crew_context.py
Normal file
16
src/crewai/utilities/crew/crew_context.py
Normal file
@@ -0,0 +1,16 @@
|
||||
"""Context management utilities for tracking crew and task execution context using OpenTelemetry baggage."""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from opentelemetry import baggage
|
||||
|
||||
from crewai.utilities.crew.models import CrewContext
|
||||
|
||||
|
||||
def get_crew_context() -> Optional[CrewContext]:
|
||||
"""Get the current crew context from OpenTelemetry baggage.
|
||||
|
||||
Returns:
|
||||
CrewContext instance containing crew context information, or None if no context is set
|
||||
"""
|
||||
return baggage.get_baggage("crew_context")
|
||||
16
src/crewai/utilities/crew/models.py
Normal file
16
src/crewai/utilities/crew/models.py
Normal file
@@ -0,0 +1,16 @@
|
||||
"""Models for crew-related data structures."""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class CrewContext(BaseModel):
|
||||
"""Model representing crew context information."""
|
||||
|
||||
id: Optional[str] = Field(
|
||||
default=None, description="Unique identifier for the crew"
|
||||
)
|
||||
key: Optional[str] = Field(
|
||||
default=None, description="Optional crew key/name for identification"
|
||||
)
|
||||
@@ -1,3 +1,4 @@
|
||||
from inspect import getsource
|
||||
from typing import Any, Callable, Optional, Union
|
||||
|
||||
from crewai.utilities.events.base_events import BaseEvent
|
||||
@@ -16,23 +17,26 @@ class LLMGuardrailStartedEvent(BaseEvent):
|
||||
retry_count: int
|
||||
|
||||
def __init__(self, **data):
|
||||
from inspect import getsource
|
||||
|
||||
from crewai.tasks.llm_guardrail import LLMGuardrail
|
||||
from crewai.tasks.hallucination_guardrail import HallucinationGuardrail
|
||||
|
||||
super().__init__(**data)
|
||||
|
||||
if isinstance(self.guardrail, LLMGuardrail) or isinstance(
|
||||
self.guardrail, HallucinationGuardrail
|
||||
):
|
||||
if isinstance(self.guardrail, (LLMGuardrail, HallucinationGuardrail)):
|
||||
self.guardrail = self.guardrail.description.strip()
|
||||
elif isinstance(self.guardrail, Callable):
|
||||
self.guardrail = getsource(self.guardrail).strip()
|
||||
|
||||
|
||||
class LLMGuardrailCompletedEvent(BaseEvent):
|
||||
"""Event emitted when a guardrail task completes"""
|
||||
"""Event emitted when a guardrail task completes
|
||||
|
||||
Attributes:
|
||||
success: Whether the guardrail validation passed
|
||||
result: The validation result
|
||||
error: Error message if validation failed
|
||||
retry_count: The number of times the guardrail has been retried
|
||||
"""
|
||||
|
||||
type: str = "llm_guardrail_completed"
|
||||
success: bool
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -52,6 +52,7 @@ from crewai.utilities.events.memory_events import (
|
||||
MemoryRetrievalCompletedEvent,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ceo():
|
||||
return Agent(
|
||||
@@ -935,12 +936,27 @@ def test_cache_hitting_between_agents(researcher, writer, ceo):
|
||||
read.return_value = "12"
|
||||
crew.kickoff()
|
||||
assert read.call_count == 2, "read was not called exactly twice"
|
||||
# Check if read was called with the expected arguments
|
||||
expected_calls = [
|
||||
call(tool="multiplier", input={"first_number": 2, "second_number": 6}),
|
||||
call(tool="multiplier", input={"first_number": 2, "second_number": 6}),
|
||||
|
||||
# Filter the mock calls to only include the ones with 'tool' and 'input' keywords
|
||||
cache_calls = [
|
||||
call
|
||||
for call in read.call_args_list
|
||||
if len(call.kwargs) == 2
|
||||
and "tool" in call.kwargs
|
||||
and "input" in call.kwargs
|
||||
]
|
||||
read.assert_has_calls(expected_calls, any_order=False)
|
||||
|
||||
# Check if we have the expected number of cache calls
|
||||
assert len(cache_calls) == 2, f"Expected 2 cache calls, got {len(cache_calls)}"
|
||||
|
||||
# Check if both calls were made with the expected arguments
|
||||
expected_call = call(
|
||||
tool="multiplier", input={"first_number": 2, "second_number": 6}
|
||||
)
|
||||
assert cache_calls[0] == expected_call, f"First call mismatch: {cache_calls[0]}"
|
||||
assert (
|
||||
cache_calls[1] == expected_call
|
||||
), f"Second call mismatch: {cache_calls[1]}"
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
@@ -1797,7 +1813,7 @@ def test_hierarchical_kickoff_usage_metrics_include_manager(researcher):
|
||||
agent=researcher, # *regular* agent
|
||||
)
|
||||
|
||||
# ── 2. Stub out each agent’s _token_process.get_summary() ───────────────────
|
||||
# ── 2. Stub out each agent's _token_process.get_summary() ───────────────────
|
||||
researcher_metrics = UsageMetrics(
|
||||
total_tokens=120, prompt_tokens=80, completion_tokens=40, successful_requests=2
|
||||
)
|
||||
@@ -1821,7 +1837,7 @@ def test_hierarchical_kickoff_usage_metrics_include_manager(researcher):
|
||||
process=Process.hierarchical,
|
||||
)
|
||||
|
||||
# We don’t care about LLM output here; patch execute_sync to avoid network
|
||||
# We don't care about LLM output here; patch execute_sync to avoid network
|
||||
with patch.object(
|
||||
Task,
|
||||
"execute_sync",
|
||||
@@ -2489,17 +2505,19 @@ def test_using_contextual_memory():
|
||||
memory=True,
|
||||
)
|
||||
|
||||
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
|
||||
with patch.object(
|
||||
ContextualMemory, "build_context_for_task", return_value=""
|
||||
) as contextual_mem:
|
||||
crew.kickoff()
|
||||
contextual_mem.assert_called_once()
|
||||
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_memory_events_are_emitted():
|
||||
events = defaultdict(list)
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(MemorySaveStartedEvent)
|
||||
def handle_memory_save_started(source, event):
|
||||
events["MemorySaveStartedEvent"].append(event)
|
||||
@@ -2562,6 +2580,7 @@ def test_memory_events_are_emitted():
|
||||
assert len(events["MemoryRetrievalStartedEvent"]) == 1
|
||||
assert len(events["MemoryRetrievalCompletedEvent"]) == 1
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_using_contextual_memory_with_long_term_memory():
|
||||
from unittest.mock import patch
|
||||
@@ -2585,7 +2604,9 @@ def test_using_contextual_memory_with_long_term_memory():
|
||||
long_term_memory=LongTermMemory(),
|
||||
)
|
||||
|
||||
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
|
||||
with patch.object(
|
||||
ContextualMemory, "build_context_for_task", return_value=""
|
||||
) as contextual_mem:
|
||||
crew.kickoff()
|
||||
contextual_mem.assert_called_once()
|
||||
assert crew.memory is False
|
||||
@@ -2686,7 +2707,9 @@ def test_using_contextual_memory_with_short_term_memory():
|
||||
short_term_memory=ShortTermMemory(),
|
||||
)
|
||||
|
||||
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
|
||||
with patch.object(
|
||||
ContextualMemory, "build_context_for_task", return_value=""
|
||||
) as contextual_mem:
|
||||
crew.kickoff()
|
||||
contextual_mem.assert_called_once()
|
||||
assert crew.memory is False
|
||||
@@ -2715,7 +2738,9 @@ def test_disabled_memory_using_contextual_memory():
|
||||
memory=False,
|
||||
)
|
||||
|
||||
with patch.object(ContextualMemory, "build_context_for_task", return_value="") as contextual_mem:
|
||||
with patch.object(
|
||||
ContextualMemory, "build_context_for_task", return_value=""
|
||||
) as contextual_mem:
|
||||
crew.kickoff()
|
||||
contextual_mem.assert_not_called()
|
||||
|
||||
|
||||
226
tests/test_crew_thread_safety.py
Normal file
226
tests/test_crew_thread_safety.py
Normal file
@@ -0,0 +1,226 @@
|
||||
import asyncio
|
||||
import threading
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Dict, Any, Callable
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.utilities.crew.crew_context import get_crew_context
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def simple_agent_factory():
|
||||
def create_agent(name: str) -> Agent:
|
||||
return Agent(
|
||||
role=f"{name} Agent",
|
||||
goal=f"Complete {name} task",
|
||||
backstory=f"I am agent for {name}",
|
||||
)
|
||||
|
||||
return create_agent
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def simple_task_factory():
|
||||
def create_task(name: str, callback: Callable = None) -> Task:
|
||||
return Task(
|
||||
description=f"Task for {name}", expected_output="Done", callback=callback
|
||||
)
|
||||
|
||||
return create_task
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def crew_factory(simple_agent_factory, simple_task_factory):
|
||||
def create_crew(name: str, task_callback: Callable = None) -> Crew:
|
||||
agent = simple_agent_factory(name)
|
||||
task = simple_task_factory(name, callback=task_callback)
|
||||
task.agent = agent
|
||||
|
||||
return Crew(agents=[agent], tasks=[task], verbose=False)
|
||||
|
||||
return create_crew
|
||||
|
||||
|
||||
class TestCrewThreadSafety:
|
||||
@patch("crewai.Agent.execute_task")
|
||||
def test_parallel_crews_thread_safety(self, mock_execute_task, crew_factory):
|
||||
mock_execute_task.return_value = "Task completed"
|
||||
num_crews = 5
|
||||
|
||||
def run_crew_with_context_check(crew_id: str) -> Dict[str, Any]:
|
||||
results = {"crew_id": crew_id, "contexts": []}
|
||||
|
||||
def check_context_task(output):
|
||||
context = get_crew_context()
|
||||
results["contexts"].append(
|
||||
{
|
||||
"stage": "task_callback",
|
||||
"crew_id": context.id if context else None,
|
||||
"crew_key": context.key if context else None,
|
||||
"thread": threading.current_thread().name,
|
||||
}
|
||||
)
|
||||
return output
|
||||
|
||||
context_before = get_crew_context()
|
||||
results["contexts"].append(
|
||||
{
|
||||
"stage": "before_kickoff",
|
||||
"crew_id": context_before.id if context_before else None,
|
||||
"thread": threading.current_thread().name,
|
||||
}
|
||||
)
|
||||
|
||||
crew = crew_factory(crew_id, task_callback=check_context_task)
|
||||
output = crew.kickoff()
|
||||
|
||||
context_after = get_crew_context()
|
||||
results["contexts"].append(
|
||||
{
|
||||
"stage": "after_kickoff",
|
||||
"crew_id": context_after.id if context_after else None,
|
||||
"thread": threading.current_thread().name,
|
||||
}
|
||||
)
|
||||
|
||||
results["crew_uuid"] = str(crew.id)
|
||||
results["output"] = output.raw
|
||||
|
||||
return results
|
||||
|
||||
with ThreadPoolExecutor(max_workers=num_crews) as executor:
|
||||
futures = []
|
||||
for i in range(num_crews):
|
||||
future = executor.submit(run_crew_with_context_check, f"crew_{i}")
|
||||
futures.append(future)
|
||||
|
||||
results = [f.result() for f in futures]
|
||||
|
||||
for result in results:
|
||||
crew_uuid = result["crew_uuid"]
|
||||
|
||||
before_ctx = next(
|
||||
ctx for ctx in result["contexts"] if ctx["stage"] == "before_kickoff"
|
||||
)
|
||||
assert (
|
||||
before_ctx["crew_id"] is None
|
||||
), f"Context should be None before kickoff for {result['crew_id']}"
|
||||
|
||||
task_ctx = next(
|
||||
ctx for ctx in result["contexts"] if ctx["stage"] == "task_callback"
|
||||
)
|
||||
assert (
|
||||
task_ctx["crew_id"] == crew_uuid
|
||||
), f"Context mismatch during task for {result['crew_id']}"
|
||||
|
||||
after_ctx = next(
|
||||
ctx for ctx in result["contexts"] if ctx["stage"] == "after_kickoff"
|
||||
)
|
||||
assert (
|
||||
after_ctx["crew_id"] is None
|
||||
), f"Context should be None after kickoff for {result['crew_id']}"
|
||||
|
||||
thread_name = before_ctx["thread"]
|
||||
assert (
|
||||
"ThreadPoolExecutor" in thread_name
|
||||
), f"Should run in thread pool for {result['crew_id']}"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@patch("crewai.Agent.execute_task")
|
||||
async def test_async_crews_thread_safety(self, mock_execute_task, crew_factory):
|
||||
mock_execute_task.return_value = "Task completed"
|
||||
num_crews = 5
|
||||
|
||||
async def run_crew_async(crew_id: str) -> Dict[str, Any]:
|
||||
task_context = {"crew_id": crew_id, "context": None}
|
||||
|
||||
def capture_context(output):
|
||||
ctx = get_crew_context()
|
||||
task_context["context"] = {
|
||||
"crew_id": ctx.id if ctx else None,
|
||||
"crew_key": ctx.key if ctx else None,
|
||||
}
|
||||
return output
|
||||
|
||||
crew = crew_factory(crew_id, task_callback=capture_context)
|
||||
output = await crew.kickoff_async()
|
||||
|
||||
return {
|
||||
"crew_id": crew_id,
|
||||
"crew_uuid": str(crew.id),
|
||||
"output": output.raw,
|
||||
"task_context": task_context,
|
||||
}
|
||||
|
||||
tasks = [run_crew_async(f"async_crew_{i}") for i in range(num_crews)]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
for result in results:
|
||||
crew_uuid = result["crew_uuid"]
|
||||
task_ctx = result["task_context"]["context"]
|
||||
|
||||
assert (
|
||||
task_ctx is not None
|
||||
), f"Context should exist during task for {result['crew_id']}"
|
||||
assert (
|
||||
task_ctx["crew_id"] == crew_uuid
|
||||
), f"Context mismatch for {result['crew_id']}"
|
||||
|
||||
@patch("crewai.Agent.execute_task")
|
||||
def test_concurrent_kickoff_for_each(self, mock_execute_task, crew_factory):
|
||||
mock_execute_task.return_value = "Task completed"
|
||||
contexts_captured = []
|
||||
|
||||
def capture_context(output):
|
||||
ctx = get_crew_context()
|
||||
contexts_captured.append(
|
||||
{
|
||||
"context_id": ctx.id if ctx else None,
|
||||
"thread": threading.current_thread().name,
|
||||
}
|
||||
)
|
||||
return output
|
||||
|
||||
crew = crew_factory("for_each_test", task_callback=capture_context)
|
||||
inputs = [{"item": f"input_{i}"} for i in range(3)]
|
||||
|
||||
results = crew.kickoff_for_each(inputs=inputs)
|
||||
|
||||
assert len(results) == len(inputs)
|
||||
assert len(contexts_captured) == len(inputs)
|
||||
|
||||
context_ids = [ctx["context_id"] for ctx in contexts_captured]
|
||||
assert len(set(context_ids)) == len(
|
||||
inputs
|
||||
), "Each execution should have unique context"
|
||||
|
||||
@patch("crewai.Agent.execute_task")
|
||||
def test_no_context_leakage_between_crews(self, mock_execute_task, crew_factory):
|
||||
mock_execute_task.return_value = "Task completed"
|
||||
contexts = []
|
||||
|
||||
def check_context(output):
|
||||
ctx = get_crew_context()
|
||||
contexts.append(
|
||||
{
|
||||
"context_id": ctx.id if ctx else None,
|
||||
"context_key": ctx.key if ctx else None,
|
||||
}
|
||||
)
|
||||
return output
|
||||
|
||||
def run_crew(name: str):
|
||||
crew = crew_factory(name, task_callback=check_context)
|
||||
crew.kickoff()
|
||||
return str(crew.id)
|
||||
|
||||
crew1_id = run_crew("First")
|
||||
crew2_id = run_crew("Second")
|
||||
|
||||
assert len(contexts) == 2
|
||||
assert contexts[0]["context_id"] == crew1_id
|
||||
assert contexts[1]["context_id"] == crew2_id
|
||||
assert contexts[0]["context_id"] != contexts[1]["context_id"]
|
||||
0
tests/utilities/crew/__init__.py
Normal file
0
tests/utilities/crew/__init__.py
Normal file
88
tests/utilities/crew/test_crew_context.py
Normal file
88
tests/utilities/crew/test_crew_context.py
Normal file
@@ -0,0 +1,88 @@
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from opentelemetry import baggage
|
||||
from opentelemetry.context import attach, detach
|
||||
|
||||
from crewai.utilities.crew.crew_context import get_crew_context
|
||||
from crewai.utilities.crew.models import CrewContext
|
||||
|
||||
|
||||
def test_crew_context_creation():
|
||||
crew_id = str(uuid.uuid4())
|
||||
context = CrewContext(id=crew_id, key="test-crew")
|
||||
assert context.id == crew_id
|
||||
assert context.key == "test-crew"
|
||||
|
||||
|
||||
def test_get_crew_context_with_baggage():
|
||||
crew_id = str(uuid.uuid4())
|
||||
assert get_crew_context() is None
|
||||
|
||||
crew_ctx = CrewContext(id=crew_id, key="test-key")
|
||||
ctx = baggage.set_baggage("crew_context", crew_ctx)
|
||||
token = attach(ctx)
|
||||
|
||||
try:
|
||||
context = get_crew_context()
|
||||
assert context is not None
|
||||
assert context.id == crew_id
|
||||
assert context.key == "test-key"
|
||||
finally:
|
||||
detach(token)
|
||||
|
||||
assert get_crew_context() is None
|
||||
|
||||
|
||||
def test_get_crew_context_empty():
|
||||
assert get_crew_context() is None
|
||||
|
||||
|
||||
def test_baggage_nested_contexts():
|
||||
crew_id1 = str(uuid.uuid4())
|
||||
crew_id2 = str(uuid.uuid4())
|
||||
|
||||
crew_ctx1 = CrewContext(id=crew_id1, key="outer")
|
||||
ctx1 = baggage.set_baggage("crew_context", crew_ctx1)
|
||||
token1 = attach(ctx1)
|
||||
|
||||
try:
|
||||
outer_context = get_crew_context()
|
||||
assert outer_context.id == crew_id1
|
||||
assert outer_context.key == "outer"
|
||||
|
||||
crew_ctx2 = CrewContext(id=crew_id2, key="inner")
|
||||
ctx2 = baggage.set_baggage("crew_context", crew_ctx2)
|
||||
token2 = attach(ctx2)
|
||||
|
||||
try:
|
||||
inner_context = get_crew_context()
|
||||
assert inner_context.id == crew_id2
|
||||
assert inner_context.key == "inner"
|
||||
finally:
|
||||
detach(token2)
|
||||
|
||||
restored_context = get_crew_context()
|
||||
assert restored_context.id == crew_id1
|
||||
assert restored_context.key == "outer"
|
||||
finally:
|
||||
detach(token1)
|
||||
|
||||
assert get_crew_context() is None
|
||||
|
||||
|
||||
def test_baggage_exception_handling():
|
||||
crew_id = str(uuid.uuid4())
|
||||
|
||||
crew_ctx = CrewContext(id=crew_id, key="test")
|
||||
ctx = baggage.set_baggage("crew_context", crew_ctx)
|
||||
token = attach(ctx)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
try:
|
||||
assert get_crew_context() is not None
|
||||
raise ValueError("Test exception")
|
||||
finally:
|
||||
detach(token)
|
||||
|
||||
assert get_crew_context() is None
|
||||
Reference in New Issue
Block a user