Compare commits


2 Commits

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| Brandon Hancock (bhancock_ai) | 8ae073c681 | Merge branch 'main' into brandon/fix-google-docs | 2025-02-07 16:52:40 -05:00 |
| Brandon Hancock | 9a34d9c01e | clean up google docs | 2025-02-07 16:47:05 -05:00 |
14 changed files with 154 additions and 825 deletions

View File

@@ -1,18 +1,10 @@
<div align="center">
![Logo of CrewAI](./docs/crewai_logo.png)
![Logo of CrewAI, two people rowing on a boat](./docs/crewai_logo.png)
# **CrewAI**
**CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
**CrewAI Enterprise**
Want to plan, build (+ no code), deploy, monitor and iterate on your agents? [CrewAI Enterprise](https://www.crewai.com/enterprise). Designed for complex, real-world applications, our enterprise solution offers:
- **Seamless Integrations**
- **Scalable & Secure Deployment**
- **Actionable Insights**
- **24/7 Support**
🤖 **CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
<h3>
@@ -400,7 +392,7 @@ class AdvancedAnalysisFlow(Flow[MarketState]):
goal="Gather and validate supporting market data",
backstory="You excel at finding and correlating multiple data sources"
)
analysis_task = Task(
description="Analyze {sector} sector data for the past {timeframe}",
expected_output="Detailed market analysis with confidence score",
@@ -411,7 +403,7 @@ class AdvancedAnalysisFlow(Flow[MarketState]):
expected_output="Corroborating evidence and potential contradictions",
agent=researcher
)
# Demonstrate crew autonomy
analysis_crew = Crew(
agents=[analyst, researcher],

View File

@@ -58,107 +58,41 @@ my_crew = Crew(
### Example: Use Custom Memory Instances, e.g. FAISS as the VectorDB
```python Code
from crewai import Crew, Process
from crewai.memory import LongTermMemory, ShortTermMemory, EntityMemory
from crewai.memory.storage import LTMSQLiteStorage, RAGStorage
from typing import List, Optional
from crewai import Crew, Agent, Task, Process
# Assemble your crew with memory capabilities
my_crew: Crew = Crew(
agents = [...],
tasks = [...],
process = Process.sequential,
memory = True,
# Long-term memory for persistent storage across sessions
long_term_memory = LongTermMemory(
my_crew = Crew(
agents=[...],
tasks=[...],
process="Process.sequential",
memory=True,
long_term_memory=EnhanceLongTermMemory(
storage=LTMSQLiteStorage(
db_path="/my_crew1/long_term_memory_storage.db"
db_path="/my_data_dir/my_crew1/long_term_memory_storage.db"
)
),
# Short-term memory for current context using RAG
short_term_memory = ShortTermMemory(
storage = RAGStorage(
embedder_config={
"provider": "openai",
"config": {
"model": 'text-embedding-3-small'
}
},
type="short_term",
path="/my_crew1/"
)
short_term_memory=EnhanceShortTermMemory(
storage=CustomRAGStorage(
crew_name="my_crew",
storage_type="short_term",
data_dir="//my_data_dir",
model=embedder["model"],
dimension=embedder["dimension"],
),
),
# Entity memory for tracking key information about entities
entity_memory = EntityMemory(
storage=RAGStorage(
embedder_config={
"provider": "openai",
"config": {
"model": 'text-embedding-3-small'
}
},
type="short_term",
path="/my_crew1/"
)
entity_memory=EnhanceEntityMemory(
storage=CustomRAGStorage(
crew_name="my_crew",
storage_type="entities",
data_dir="//my_data_dir",
model=embedder["model"],
dimension=embedder["dimension"],
),
),
verbose=True,
)
```
## Security Considerations
When configuring memory storage:
- Use environment variables for storage paths (e.g., `CREWAI_STORAGE_DIR`)
- Never hardcode sensitive information like database credentials
- Consider access permissions for storage directories
- Use relative paths when possible to maintain portability
Example using environment variables:
```python
import os
from crewai import Crew
from crewai.memory import LongTermMemory
from crewai.memory.storage import LTMSQLiteStorage
# Configure storage path using environment variable
storage_path = os.getenv("CREWAI_STORAGE_DIR", "./storage")
crew = Crew(
memory=True,
long_term_memory=LongTermMemory(
storage=LTMSQLiteStorage(
db_path="{storage_path}/memory.db".format(storage_path=storage_path)
)
)
)
```
## Configuration Examples
### Basic Memory Configuration
```python
from crewai import Crew
from crewai.memory import LongTermMemory
# Simple memory configuration
crew = Crew(memory=True) # Uses default storage locations
```
### Custom Storage Configuration
```python
from crewai import Crew
from crewai.memory import LongTermMemory
from crewai.memory.storage import LTMSQLiteStorage
# Configure custom storage paths
crew = Crew(
memory=True,
long_term_memory=LongTermMemory(
storage=LTMSQLiteStorage(db_path="./memory.db")
)
)
```
## Integrating Mem0 for Enhanced User Memory
[Mem0](https://mem0.ai/) is a self-improving memory layer for LLM applications, enabling personalized AI experiences.
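A minimal sketch of that integration, assuming the `memory_config` provider interface described in the Mem0 section of this page; the API key, `user_id`, and agent/task placeholders are illustrative, not prescriptive:
```python
import os

from crewai import Crew, Process

# Mem0 reads its API key from the environment (placeholder value shown)
os.environ["MEM0_API_KEY"] = "your-mem0-api-key"

my_crew = Crew(
    agents=[...],   # your agents
    tasks=[...],    # your tasks
    process=Process.sequential,
    memory=True,
    # Route user memory through Mem0 instead of the default storage
    memory_config={
        "provider": "mem0",
        "config": {"user_id": "john"},
    },
    verbose=True,
)
```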

View File

@@ -8,9 +8,9 @@ icon: file-pen
## Description
The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files with cross-platform compatibility (Windows, Linux, macOS).
The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files.
It is particularly useful in scenarios such as generating reports, saving logs, creating configuration files, and more.
This tool handles path differences across operating systems, supports UTF-8 encoding, and automatically creates directories if they don't exist, making it easier to organize your output reliably across different platforms.
This tool supports creating new directories if they don't exist, making it easier to organize your output.
## Installation
@@ -43,8 +43,6 @@ print(result)
## Conclusion
By integrating the `FileWriterTool` into your crews, the agents can reliably write content to files across different operating systems.
This tool is essential for tasks that require saving output data, creating structured file systems, and handling cross-platform file operations.
It's particularly recommended for Windows users who may encounter file writing issues with standard Python file operations.
By adhering to the setup and usage guidelines provided, incorporating this tool into projects is straightforward and ensures consistent file writing behavior across all platforms.
By integrating the `FileWriterTool` into your crews, the agents can execute the process of writing content to files and creating directories.
This tool is essential for tasks that require saving output data, creating structured file systems, and more. By adhering to the setup and usage guidelines provided,
incorporating this tool into projects is straightforward and efficient.
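For reference, a short usage sketch; the `_run` signature and argument names here are assumed to match the usage example elided from this diff, and the file name, content, and directory values are illustrative:
```python
from crewai_tools import FileWriterTool

# Initialize the tool
file_writer_tool = FileWriterTool()

# Write content to a file inside a directory, creating the directory if needed
result = file_writer_tool._run(
    filename="ai.txt",
    content="AI is a very broad field.",
    directory="data",
)
print(result)
```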

View File

@@ -16,6 +16,7 @@ from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.task import Task
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import Tool
from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
@@ -145,7 +146,7 @@ class Agent(BaseAgent):
def _set_knowledge(self):
try:
if self.knowledge_sources:
full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)")
full_pattern = re.compile(r'[^a-zA-Z0-9\-_\r\n]|(\.\.)')
knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}"
if isinstance(self.knowledge_sources, list) and all(
isinstance(k, BaseKnowledgeSource) for k in self.knowledge_sources

View File

@@ -3,6 +3,11 @@ import subprocess
import click
from crewai.cli.utils import get_crew
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
def reset_memories_command(

View File

@@ -1,12 +1,13 @@
import asyncio
import json
import re
import sys
import uuid
import warnings
from concurrent.futures import Future
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from pydantic import (
UUID4,
@@ -37,7 +38,7 @@ from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool, Tool
from crewai.tools.base_tool import Tool
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -179,7 +180,7 @@ class Crew(BaseModel):
default=None,
description="Maximum number of requests per minute for the crew execution to be respected.",
)
prompt_file: Optional[str] = Field(
prompt_file: str = Field(
default=None,
description="Path to the prompt json file to be used for the crew.",
)
@@ -380,22 +381,6 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def validate_must_have_non_conditional_task(self) -> "Crew":
"""Ensure that a crew has at least one non-conditional task."""
if not self.tasks:
return self
non_conditional_count = sum(
1 for task in self.tasks if not isinstance(task, ConditionalTask)
)
if non_conditional_count == 0:
raise PydanticCustomError(
"only_conditional_tasks",
"Crew must include at least one non-conditional task",
{},
)
return self
@model_validator(mode="after")
def validate_first_task(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
@@ -456,7 +441,6 @@ class Crew(BaseModel):
return self
@property
def key(self) -> str:
source = [agent.key for agent in self.agents] + [
@@ -473,16 +457,7 @@ class Crew(BaseModel):
"missing_keys_in_config", "Config should have 'agents' and 'tasks'.", {}
)
# Get process from config with proper type handling
process_value = self.config.get("process")
if process_value is not None:
if not isinstance(process_value, Process):
try:
process_value = Process(process_value)
except ValueError:
raise ValueError(f"Invalid process value: {process_value}")
self.process = process_value
self.process = self.config.get("process", self.process)
self.agents = [Agent(**agent) for agent in self.config["agents"]]
self.tasks = [self._create_task(task) for task in self.config["tasks"]]
@@ -758,12 +733,8 @@ class Crew(BaseModel):
)
# Determine which tools to use - task tools take precedence over agent tools
initial_tools: List[BaseTool] = []
if task.tools:
initial_tools = list(task.tools)
elif agent_to_use.tools:
initial_tools = list(agent_to_use.tools)
tools_for_task = self._prepare_tools(agent_to_use, task, initial_tools)
tools_for_task = task.tools or agent_to_use.tools or []
tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
self._log_task_start(task, agent_to_use.role)
@@ -772,19 +743,16 @@ class Crew(BaseModel):
task, task_outputs, futures, task_index, was_replayed
)
if skipped_task_output:
task_outputs.append(skipped_task_output)
continue
if task.async_execution:
context = self._get_context(
task, [last_sync_output] if last_sync_output else []
)
# Convert Sequence to List for execute_async
tools_list = list(tools_for_task) if tools_for_task else None
future = task.execute_async(
agent=agent_to_use,
context=context,
tools=tools_list,
tools=tools_for_task,
)
futures.append((task, future, task_index))
else:
@@ -793,14 +761,12 @@ class Crew(BaseModel):
futures.clear()
context = self._get_context(task, task_outputs)
# Convert Sequence to List for execute_sync
tools_list = list(tools_for_task) if tools_for_task else None
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
tools=tools_list,
tools=tools_for_task,
)
task_outputs.append(task_output)
task_outputs = [task_output]
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
@@ -821,7 +787,7 @@ class Crew(BaseModel):
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
previous_output = task_outputs[-1] if task_outputs else None
previous_output = task_outputs[task_index - 1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",
@@ -836,37 +802,27 @@ class Crew(BaseModel):
return None
def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Prepare tools for the agent.
Args:
agent: The agent to prepare tools for
task: The task being executed
tools: Initial set of tools
Returns:
Updated sequence of tools with additional capabilities based on agent configuration
"""
self, agent: BaseAgent, task: Task, tools: List[Tool]
) -> List[Tool]:
# Add delegation tools if agent allows delegation
if getattr(agent, "allow_delegation", False):
if agent.allow_delegation:
if self.process == Process.hierarchical:
if self.manager_agent:
tools = self._update_manager_tools(task, tools) # type: ignore[arg-type]
tools = self._update_manager_tools(task, tools)
else:
raise ValueError(
"Manager agent is required for hierarchical process."
)
elif agent:
tools = self._add_delegation_tools(task, tools) # type: ignore[arg-type]
elif agent and agent.allow_delegation:
tools = self._add_delegation_tools(task, tools)
# Add code execution tools if agent allows code execution
if getattr(agent, "allow_code_execution", False):
tools = self._add_code_execution_tools(agent, tools) # type: ignore[arg-type]
if agent.allow_code_execution:
tools = self._add_code_execution_tools(agent, tools)
if agent and getattr(agent, "multimodal", False):
tools = self._add_multimodal_tools(agent, tools) # type: ignore[arg-type]
if agent and agent.multimodal:
tools = self._add_multimodal_tools(agent, tools)
return tools
@@ -876,17 +832,9 @@ class Crew(BaseModel):
return task.agent
def _merge_tools(
self, existing_tools: Sequence[BaseTool], new_tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name.
Args:
existing_tools: Current sequence of tools
new_tools: New tools to merge in
Returns:
Updated sequence of tools with duplicates removed
"""
self, existing_tools: List[Tool], new_tools: List[Tool]
) -> List[Tool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name."""
if not new_tools:
return existing_tools
@@ -902,67 +850,24 @@ class Crew(BaseModel):
return tools
def _inject_delegation_tools(
self, tools: Sequence[BaseTool], task_agent: BaseAgent, agents: List[BaseAgent]
) -> Sequence[BaseTool]:
"""Add delegation tools for the agent.
Args:
tools: Current set of tools
task_agent: Agent that will use the tools
agents: List of agents that can be delegated to
Returns:
Updated sequence of tools with delegation capabilities
"""
delegation_tools = task_agent.get_delegation_tools(agents) # type: ignore[attr-defined]
self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]
):
delegation_tools = task_agent.get_delegation_tools(agents)
return self._merge_tools(tools, delegation_tools)
def _add_multimodal_tools(
self, agent: BaseAgent, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Add multimodal tools for the agent.
Args:
agent: Agent that will use the tools
tools: Current set of tools
Returns:
Updated sequence of tools with multimodal capabilities
"""
multimodal_tools = agent.get_multimodal_tools() # type: ignore[attr-defined]
def _add_multimodal_tools(self, agent: BaseAgent, tools: List[Tool]):
multimodal_tools = agent.get_multimodal_tools()
return self._merge_tools(tools, multimodal_tools)
def _add_code_execution_tools(
self, agent: BaseAgent, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Add code execution tools for the agent.
Args:
agent: Agent that will use the tools
tools: Current set of tools
Returns:
Updated sequence of tools with code execution capabilities
"""
code_tools = agent.get_code_execution_tools() # type: ignore[attr-defined]
def _add_code_execution_tools(self, agent: BaseAgent, tools: List[Tool]):
code_tools = agent.get_code_execution_tools()
return self._merge_tools(tools, code_tools)
def _add_delegation_tools(
self, task: Task, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Add delegation tools for the task's agent.
Args:
task: Task being executed
tools: Current set of tools
Returns:
Updated sequence of tools with delegation capabilities
"""
def _add_delegation_tools(self, task: Task, tools: List[Tool]):
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools:
tools = [] # type: ignore[assignment]
tools = []
tools = self._inject_delegation_tools(
tools, task.agent, agents_for_delegation
)
@@ -974,18 +879,7 @@ class Crew(BaseModel):
task_name=task.name, task=task.description, agent=role, status="started"
)
def _update_manager_tools(
self, task: Task, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Update tools for manager agent.
Args:
task: Task being executed
tools: Current set of tools
Returns:
Updated sequence of tools with manager capabilities
"""
def _update_manager_tools(self, task: Task, tools: List[Tool]):
if self.manager_agent:
if task.agent:
tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
@@ -1015,15 +909,11 @@ class Crew(BaseModel):
)
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
if not task_outputs:
raise ValueError("No task outputs available to create crew output.")
# Filter out empty outputs and get the last valid one as the main output
valid_outputs = [t for t in task_outputs if t.raw]
if not valid_outputs:
raise ValueError("No valid task outputs available to create crew output.")
final_task_output = valid_outputs[-1]
if len(task_outputs) != 1:
raise ValueError(
"Something went wrong. Kickoff should return only one task output."
)
final_task_output = task_outputs[0]
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
@@ -1032,7 +922,7 @@ class Crew(BaseModel):
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
tasks_output=task_outputs,
tasks_output=[task.output for task in self.tasks if task.output],
token_usage=token_usage,
)
@@ -1237,42 +1127,19 @@ class Crew(BaseModel):
def test(
self,
n_iterations: int,
llm: Optional[Union[str, LLM]] = None,
openai_model_name: Optional[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations.
Args:
n_iterations: Number of test iterations to run
llm: LLM instance or model name to use for evaluation
openai_model_name: (Deprecated) OpenAI model name to use for evaluation
inputs: Optional dictionary of inputs for the crew
Raises:
ValueError: If inputs is not a dictionary or if LLM configuration is invalid
TypeError: If n_iterations is not a positive integer
"""
if n_iterations < 1:
raise TypeError("n_iterations must be a positive integer")
if inputs is not None and not isinstance(inputs, dict):
raise ValueError("inputs must be a dictionary")
# Validate LLM configuration
if isinstance(llm, str) and not llm.strip():
raise ValueError("LLM model name cannot be empty")
test_llm: Union[str, LLM, None] = llm if llm is not None else openai_model_name
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
test_crew = self.copy()
self._test_execution_span = test_crew._telemetry.test_execution_span(
test_crew,
n_iterations,
inputs,
test_llm,
)
evaluator = CrewEvaluator(test_crew, test_llm)
openai_model_name, # type: ignore[arg-type]
) # type: ignore[arg-type]
evaluator = CrewEvaluator(test_crew, openai_model_name) # type: ignore[arg-type]
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)

View File

@@ -164,7 +164,6 @@ class LLM:
self.context_window_size = 0
self.reasoning_effort = reasoning_effort
self.additional_params = kwargs
self.is_anthropic = self._is_anthropic_model(model)
litellm.drop_params = True
@@ -179,62 +178,42 @@ class LLM:
self.set_callbacks(callbacks)
self.set_env_callbacks()
def _is_anthropic_model(self, model: str) -> bool:
"""Determine if the model is from Anthropic provider.
Args:
model: The model identifier string.
Returns:
bool: True if the model is from Anthropic, False otherwise.
"""
ANTHROPIC_PREFIXES = ('anthropic/', 'claude-', 'claude/')
return any(prefix in model.lower() for prefix in ANTHROPIC_PREFIXES)
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
"""High-level LLM call method.
Args:
messages: Input messages for the LLM.
Can be a string or list of message dictionaries.
If string, it will be converted to a single user message.
If list, each dict must have 'role' and 'content' keys.
tools: Optional list of tool schemas for function calling.
Each tool should define its name, description, and parameters.
callbacks: Optional list of callback functions to be executed
during and after the LLM call.
available_functions: Optional dict mapping function names to callables
that can be invoked by the LLM.
) -> str:
"""
High-level llm call method that:
1) Accepts either a string or a list of messages
2) Converts string input to the required message format
3) Calls litellm.completion
4) Handles function/tool calls if any
5) Returns the final text response or tool result
Parameters:
- messages (Union[str, List[Dict[str, str]]]): The input messages for the LLM.
- If a string is provided, it will be converted into a message list with a single entry.
- If a list of dictionaries is provided, each dictionary should have 'role' and 'content' keys.
- tools (Optional[List[dict]]): A list of tool schemas for function calling.
- callbacks (Optional[List[Any]]): A list of callback functions to be executed.
- available_functions (Optional[Dict[str, Any]]): A dictionary mapping function names to actual Python functions.
Returns:
Union[str, Any]: Either a text response from the LLM (str) or
the result of a tool function call (Any).
Raises:
TypeError: If messages format is invalid
ValueError: If response format is not supported
LLMContextLengthExceededException: If input exceeds model's context limit
- str: The final text response from the LLM or the result of a tool function call.
Examples:
# Example 1: Simple string input
>>> response = llm.call("Return the name of a random city.")
>>> print(response)
"Paris"
# Example 2: Message list with system and user messages
>>> messages = [
... {"role": "system", "content": "You are a geography expert"},
... {"role": "user", "content": "What is France's capital?"}
... ]
>>> response = llm.call(messages)
>>> print(response)
"The capital of France is Paris."
---------
# Example 1: Using a string input
response = llm.call("Return the name of a random city in the world.")
print(response)
# Example 2: Using a list of messages
messages = [{"role": "user", "content": "What is the capital of France?"}]
response = llm.call(messages)
print(response)
"""
# Validate parameters before proceeding with the call.
self._validate_call_params()
@@ -254,13 +233,10 @@ class LLM:
self.set_callbacks(callbacks)
try:
# --- 1) Format messages according to provider requirements
formatted_messages = self._format_messages_for_provider(messages)
# --- 2) Prepare the parameters for the completion call
# --- 1) Prepare the parameters for the completion call
params = {
"model": self.model,
"messages": formatted_messages,
"messages": messages,
"timeout": self.timeout,
"temperature": self.temperature,
"top_p": self.top_p,
@@ -348,38 +324,6 @@ class LLM:
logging.error(f"LiteLLM call failed: {str(e)}")
raise
def _format_messages_for_provider(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Format messages according to provider requirements.
Args:
messages: List of message dictionaries with 'role' and 'content' keys.
Can be empty or None.
Returns:
List of formatted messages according to provider requirements.
For Anthropic models, ensures first message has 'user' role.
Raises:
TypeError: If messages is None or contains invalid message format.
"""
if messages is None:
raise TypeError("Messages cannot be None")
# Validate message format first
for msg in messages:
if not isinstance(msg, dict) or "role" not in msg or "content" not in msg:
raise TypeError("Invalid message format. Each message must be a dict with 'role' and 'content' keys")
if not self.is_anthropic:
return messages
# Anthropic requires messages to start with 'user' role
if not messages or messages[0]["role"] == "system":
# If first message is system or empty, add a placeholder user message
return [{"role": "user", "content": "."}, *messages]
return messages
def _get_custom_llm_provider(self) -> str:
"""
Derives the custom_llm_provider from the model string.

View File

@@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Optional
from pydantic import PrivateAttr

View File

@@ -1,7 +1,9 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
from crewai.memory.storage.rag_storage import RAGStorage
class Memory(BaseModel):
"""

View File

@@ -674,32 +674,19 @@ class Task(BaseModel):
return OutputFormat.PYDANTIC
return OutputFormat.RAW
def _save_file(self, result: Union[Dict, str, Any]) -> None:
def _save_file(self, result: Any) -> None:
"""Save task output to a file.
Note:
For cross-platform file writing, especially on Windows, consider using FileWriterTool
from the crewai_tools package:
pip install 'crewai[tools]'
from crewai_tools import FileWriterTool
Args:
result: The result to save to the file. Can be a dict or any stringifiable object.
Raises:
ValueError: If output_file is not set
RuntimeError: If there is an error writing to the file. For cross-platform
compatibility, especially on Windows, use FileWriterTool from crewai_tools
package.
RuntimeError: If there is an error writing to the file
"""
if self.output_file is None:
raise ValueError("output_file is not set.")
FILEWRITER_RECOMMENDATION = (
"For cross-platform file writing, especially on Windows, "
"use FileWriterTool from crewai_tools package."
)
try:
resolved_path = Path(self.output_file).expanduser().resolve()
directory = resolved_path.parent
@@ -715,12 +702,7 @@ class Task(BaseModel):
else:
file.write(str(result))
except (OSError, IOError) as e:
raise RuntimeError(
"\n".join([
f"Failed to save output file: {e}",
FILEWRITER_RECOMMENDATION
])
)
raise RuntimeError(f"Failed to save output file: {e}")
return None
def __repr__(self):

View File

@@ -1,5 +1,4 @@
from collections import defaultdict
from typing import Union
from pydantic import BaseModel, Field
from rich.box import HEAVY_EDGE
@@ -7,7 +6,6 @@ from rich.console import Console
from rich.table import Table
from crewai.agent import Agent
from crewai.llm import LLM
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
@@ -34,27 +32,9 @@ class CrewEvaluator:
run_execution_times: defaultdict = defaultdict(list)
iteration: int = 0
def __init__(self, crew, llm: Union[str, LLM, None] = None):
"""Initialize the CrewEvaluator.
Args:
crew: The crew to evaluate
llm: LLM instance or model name to use for evaluation
Raises:
ValueError: If LLM model name is empty or invalid
RuntimeError: If evaluator agent initialization fails
"""
def __init__(self, crew, openai_model_name: str):
self.crew = crew
if isinstance(llm, str) and not llm.strip():
raise ValueError("LLM model name cannot be empty")
try:
self._llm = llm if isinstance(llm, LLM) else LLM(model=llm) if llm else None
except Exception as e:
raise RuntimeError(f"Failed to initialize LLM: {str(e)}")
self.openai_model_name = openai_model_name
self._telemetry = Telemetry()
self._setup_for_evaluating()
@@ -71,7 +51,7 @@ class CrewEvaluator:
),
backstory="Evaluator agent for crew evaluation with precise capabilities to evaluate the performance of the agents in the crew based on the tasks they have performed",
verbose=False,
llm=self._llm,
llm=self.openai_model_name,
)
def _evaluation_task(
@@ -201,7 +181,7 @@ class CrewEvaluator:
self.crew,
evaluation_result.pydantic.quality,
current_task.execution_duration,
self._llm.model if self._llm else "default",
self.openai_model_name,
)
self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
self.run_execution_times[self.iteration].append(

View File

@@ -15,7 +15,6 @@ from crewai.agents.cache import CacheHandler
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.process import Process
from crewai.project import crew
@@ -25,9 +24,6 @@ from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import Logger
from crewai.utilities.evaluators.crew_evaluator_handler import (
TaskEvaluationPydanticOutput,
)
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
@@ -53,39 +49,6 @@ writer = Agent(
)
def test_crew_with_only_conditional_tasks_raises_error():
"""Test that creating a crew with only conditional tasks raises an error."""
def condition_func(task_output: TaskOutput) -> bool:
return True
conditional1 = ConditionalTask(
description="Conditional task 1",
expected_output="Output 1",
agent=researcher,
condition=condition_func,
)
conditional2 = ConditionalTask(
description="Conditional task 2",
expected_output="Output 2",
agent=researcher,
condition=condition_func,
)
conditional3 = ConditionalTask(
description="Conditional task 3",
expected_output="Output 3",
agent=researcher,
condition=condition_func,
)
with pytest.raises(
pydantic_core._pydantic_core.ValidationError,
match="Crew must include at least one non-conditional task",
):
Crew(
agents=[researcher],
tasks=[conditional1, conditional2, conditional3],
)
def test_crew_config_conditional_requirement():
with pytest.raises(ValueError):
Crew(process=Process.sequential)
@@ -2097,195 +2060,6 @@ def test_tools_with_custom_caching():
assert result.raw == "3"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_task_uses_last_output():
"""Test that conditional tasks use the last task output for condition evaluation."""
task1 = Task(
description="First task",
expected_output="First output",
agent=researcher,
)
def condition_fails(task_output: TaskOutput) -> bool:
# This condition will never be met
return "never matches" in task_output.raw.lower()
def condition_succeeds(task_output: TaskOutput) -> bool:
# This condition will match first task's output
return "first success" in task_output.raw.lower()
conditional_task1 = ConditionalTask(
description="Second task - conditional that fails condition",
expected_output="Second output",
agent=researcher,
condition=condition_fails,
)
conditional_task2 = ConditionalTask(
description="Third task - conditional that succeeds using first task output",
expected_output="Third output",
agent=writer,
condition=condition_succeeds,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, conditional_task1, conditional_task2],
)
# Mock outputs for tasks
mock_first = TaskOutput(
description="First task output",
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Second task output",
raw="", # Empty output since condition fails
agent=researcher.role,
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_third]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify outputs collection
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "First success output" # First task succeeded
assert result.tasks_output[1].raw == "" # Second task skipped (condition failed)
assert result.tasks_output[2].raw == "Third task executed" # Third task used first task's output
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_tasks_result_collection():
"""Test that task outputs are properly collected based on execution status."""
task1 = Task(
description="Normal task that always executes",
expected_output="First output",
agent=researcher,
)
def condition_never_met(task_output: TaskOutput) -> bool:
return "never matches" in task_output.raw.lower()
def condition_always_met(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
task2 = ConditionalTask(
description="Conditional task that never executes",
expected_output="Second output",
agent=researcher,
condition=condition_never_met,
)
task3 = ConditionalTask(
description="Conditional task that always executes",
expected_output="Third output",
agent=writer,
condition=condition_always_met,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock outputs for different execution paths
mock_success = TaskOutput(
description="Success output",
raw="Success output", # Triggers third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Skipped output",
raw="", # Empty output for skipped task
agent=researcher.role,
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_success, mock_conditional]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify task output collection
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "Success output" # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert result.tasks_output[2].raw == "Conditional task executed" # Third task executed
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():
"""Test that having multiple conditional tasks in sequence works correctly."""
task1 = Task(
description="Initial research task",
expected_output="Research output",
agent=researcher,
)
def condition1(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
def condition2(task_output: TaskOutput) -> bool:
return "proceed" in task_output.raw.lower()
task2 = ConditionalTask(
description="First conditional task",
expected_output="Conditional output 1",
agent=writer,
condition=condition1,
)
task3 = ConditionalTask(
description="Second conditional task",
expected_output="Conditional output 2",
agent=writer,
condition=condition2,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock different task outputs to test conditional logic
mock_success = TaskOutput(
description="Mock success",
raw="Success and proceed output",
agent=researcher.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
# Verify all tasks were executed (no IndexError)
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch
@@ -3309,95 +3083,39 @@ def test_conditional_should_execute():
@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch.object(Crew, "copy")
@mock.patch.object(Crew, "kickoff")
def test_crew_test_with_custom_llm(mock_kickoff, mock_copy, mock_evaluator):
"""Test that Crew.test() works with a custom LLM implementation."""
task = Task(description="Test task", expected_output="Test output", agent=researcher)
crew = Crew(agents=[researcher], tasks=[task])
mock_copy.return_value = crew
mock_evaluator.return_value = mock.MagicMock()
llm = LLM(model="gpt-4")
crew.test(n_iterations=1, llm=llm)
# Verify CrewEvaluator was called with the LLM instance
mock_evaluator.assert_called_once()
args = mock_evaluator.call_args[0]
assert args[1] == llm
@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch.object(Crew, "copy")
@mock.patch.object(Crew, "kickoff")
def test_crew_test_backward_compatibility(mock_kickoff, mock_copy, mock_evaluator):
"""Test that Crew.test() maintains backward compatibility with openai_model_name."""
task = Task(description="Test task", expected_output="Test output", agent=researcher)
crew = Crew(agents=[researcher], tasks=[task])
mock_copy.return_value = crew
mock_evaluator.return_value = mock.MagicMock()
crew.test(n_iterations=1, openai_model_name="gpt-4")
# Verify CrewEvaluator was called with the model name
mock_evaluator.assert_called_once()
args = mock_evaluator.call_args[0]
assert args[1] == "gpt-4"
@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch.object(Crew, "copy")
@mock.patch.object(Crew, "kickoff")
def test_crew_test_with_invalid_inputs(mock_kickoff, mock_copy, mock_evaluator):
"""Test that Crew.test() validates inputs properly."""
task = Task(description="Test task", expected_output="Test output", agent=researcher)
crew = Crew(agents=[researcher], tasks=[task])
mock_copy.return_value = crew
with pytest.raises(TypeError):
crew.test(n_iterations=0) # Invalid iterations
with pytest.raises(ValueError):
crew.test(n_iterations=1, inputs="invalid") # Invalid inputs type
with pytest.raises(ValueError):
crew.test(n_iterations=1, llm="") # Empty LLM name
@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch.object(Crew, "copy")
@mock.patch.object(Crew, "kickoff")
def test_crew_test_concurrent_execution(mock_kickoff, mock_copy, mock_evaluator):
"""Test that Crew.test() handles concurrent execution properly."""
task = Task(description="Test task", expected_output="Test output", agent=researcher)
crew = Crew(agents=[researcher], tasks=[task])
mock_copy.return_value = crew
mock_evaluator.return_value = mock.MagicMock()
n_iterations = 3
crew.test(n_iterations=n_iterations)
assert mock_evaluator.return_value.set_iteration.call_count == n_iterations
@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch.object(Crew, "copy")
@mock.patch.object(Crew, "kickoff")
def test_crew_testing_function(mock_kickoff, mock_copy, mock_evaluator):
"""Test that Crew.test() works with basic functionality."""
@mock.patch("crewai.crew.Crew.copy")
@mock.patch("crewai.crew.Crew.kickoff")
def test_crew_testing_function(kickoff_mock, copy_mock, crew_evaluator):
task = Task(
description="Test task",
expected_output="Test output",
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
expected_output="5 bullet points with a paragraph for each idea.",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
mock_copy.return_value = crew
mock_evaluator.return_value = mock.MagicMock()
crew.test(n_iterations=1)
# Verify CrewEvaluator was called with None as llm (default behavior)
mock_evaluator.assert_called_once()
args = mock_evaluator.call_args[0]
assert args[1] is None
# Verify kickoff was called
mock_kickoff.assert_called_once()
crew = Crew(
agents=[researcher],
tasks=[task],
)
# Create a mock for the copied crew
copy_mock.return_value = crew
n_iterations = 2
crew.test(n_iterations, openai_model_name="gpt-4o-mini", inputs={"topic": "AI"})
# Ensure kickoff is called on the copied crew
kickoff_mock.assert_has_calls(
[mock.call(inputs={"topic": "AI"}), mock.call(inputs={"topic": "AI"})]
)
crew_evaluator.assert_has_calls(
[
mock.call(crew, "gpt-4o-mini"),
mock.call().set_iteration(1),
mock.call().set_iteration(2),
mock.call().print_crew_evaluation_result(),
]
)
@pytest.mark.vcr(filter_headers=["authorization"])

View File

@@ -286,79 +286,6 @@ def test_o3_mini_reasoning_effort_medium():
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.fixture
def anthropic_llm():
"""Fixture providing an Anthropic LLM instance."""
return LLM(model="anthropic/claude-3-sonnet")
@pytest.fixture
def system_message():
"""Fixture providing a system message."""
return {"role": "system", "content": "test"}
@pytest.fixture
def user_message():
"""Fixture providing a user message."""
return {"role": "user", "content": "test"}
def test_anthropic_message_formatting_edge_cases(anthropic_llm):
"""Test edge cases for Anthropic message formatting."""
# Test None messages
with pytest.raises(TypeError, match="Messages cannot be None"):
anthropic_llm._format_messages_for_provider(None)
# Test empty message list
formatted = anthropic_llm._format_messages_for_provider([])
assert len(formatted) == 1
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
# Test invalid message format
with pytest.raises(TypeError, match="Invalid message format"):
anthropic_llm._format_messages_for_provider([{"invalid": "message"}])
def test_anthropic_model_detection():
"""Test Anthropic model detection with various formats."""
models = [
("anthropic/claude-3", True),
("claude-instant", True),
("claude/v1", True),
("gpt-4", False),
("", False),
("anthropomorphic", False), # Should not match partial words
]
for model, expected in models:
llm = LLM(model=model)
assert llm.is_anthropic == expected, f"Failed for model: {model}"
def test_anthropic_message_formatting(anthropic_llm, system_message, user_message):
"""Test Anthropic message formatting with fixtures."""
# Test when first message is system
formatted = anthropic_llm._format_messages_for_provider([system_message])
assert len(formatted) == 2
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
assert formatted[1] == system_message
# Test when first message is already user
formatted = anthropic_llm._format_messages_for_provider([user_message])
assert len(formatted) == 1
assert formatted[0] == user_message
# Test with empty message list
formatted = anthropic_llm._format_messages_for_provider([])
assert len(formatted) == 1
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
# Test with non-Anthropic model (should not modify messages)
non_anthropic_llm = LLM(model="gpt-4")
formatted = non_anthropic_llm._format_messages_for_provider([system_message])
assert len(formatted) == 1
assert formatted[0] == system_message
def test_deepseek_r1_with_open_router():
if not os.getenv("OPEN_ROUTER_API_KEY"):
pytest.skip("OPEN_ROUTER_API_KEY not set; skipping test.")

View File

@@ -4,7 +4,6 @@ import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.llm import LLM
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.evaluators.crew_evaluator_handler import (
@@ -141,23 +140,3 @@ class InternalCrewEvaluator:
execute().pydantic = TaskEvaluationPydanticOutput(quality=9.5)
crew_planner.evaluate(task_output)
assert crew_planner.tasks_scores[0] == [9.5]
def test_crew_evaluator_with_llm_instance(self):
"""Test that CrewEvaluator works with an LLM instance."""
agent = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")
task = Task(description="Task 1", expected_output="Output 1", agent=agent)
crew = Crew(agents=[agent], tasks=[task])
llm = LLM(model="gpt-4")
evaluator = CrewEvaluator(crew, llm)
assert evaluator._llm == llm
def test_crew_evaluator_with_model_name(self):
"""Test that CrewEvaluator works with a model name string."""
agent = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")
task = Task(description="Task 1", expected_output="Output 1", agent=agent)
crew = Crew(agents=[agent], tasks=[task])
evaluator = CrewEvaluator(crew, "gpt-4")
assert isinstance(evaluator._llm, LLM)
assert evaluator._llm.model == "gpt-4"