diff --git a/docs/concepts/memory.mdx b/docs/concepts/memory.mdx
index 33df47b82..8db1fe33a 100644
--- a/docs/concepts/memory.mdx
+++ b/docs/concepts/memory.mdx
@@ -58,41 +58,107 @@ my_crew = Crew(
 ### Example: Use Custom Memory Instances e.g. FAISS as the VectorDB
 
 ```python Code
-from crewai import Crew, Agent, Task, Process
+from crewai import Crew, Process
+from crewai.memory import LongTermMemory, ShortTermMemory, EntityMemory
+from crewai.memory.storage import LTMSQLiteStorage, RAGStorage
 
 # Assemble your crew with memory capabilities
-my_crew = Crew(
-    agents=[...],
-    tasks=[...],
-    process="Process.sequential",
-    memory=True,
-    long_term_memory=EnhanceLongTermMemory(
+my_crew: Crew = Crew(
+    agents=[...],
+    tasks=[...],
+    process=Process.sequential,
+    memory=True,
+    # Long-term memory for persistent storage across sessions
+    long_term_memory=LongTermMemory(
         storage=LTMSQLiteStorage(
-            db_path="/my_data_dir/my_crew1/long_term_memory_storage.db"
+            db_path="/my_crew1/long_term_memory_storage.db"
         )
     ),
-    short_term_memory=EnhanceShortTermMemory(
-        storage=CustomRAGStorage(
-            crew_name="my_crew",
-            storage_type="short_term",
-            data_dir="//my_data_dir",
-            model=embedder["model"],
-            dimension=embedder["dimension"],
-        ),
+    # Short-term memory for current context using RAG
+    short_term_memory=ShortTermMemory(
+        storage=RAGStorage(
+            embedder_config={
+                "provider": "openai",
+                "config": {
+                    "model": 'text-embedding-3-small'
+                }
+            },
+            type="short_term",
+            path="/my_crew1/"
+        )
     ),
-    entity_memory=EnhanceEntityMemory(
-        storage=CustomRAGStorage(
-            crew_name="my_crew",
-            storage_type="entities",
-            data_dir="//my_data_dir",
-            model=embedder["model"],
-            dimension=embedder["dimension"],
-        ),
+    # Entity memory for tracking key information about entities
+    entity_memory=EntityMemory(
+        storage=RAGStorage(
+            embedder_config={
+                "provider": "openai",
+                "config": {
+                    "model": 'text-embedding-3-small'
+                }
+            },
+            type="entities",
+            path="/my_crew1/"
+        )
     ),
     verbose=True,
 )
 ```
 
+## Security Considerations
+
+When configuring memory storage:
+- Use environment variables for storage paths (e.g., `CREWAI_STORAGE_DIR`)
+- Never hardcode sensitive information like database credentials
+- Consider access permissions for storage directories
+- Use relative paths when possible to maintain portability
+
+Example using environment variables:
+```python
+import os
+from crewai import Crew
+from crewai.memory import LongTermMemory
+from crewai.memory.storage import LTMSQLiteStorage
+
+# Configure the storage path using an environment variable
+storage_path = os.getenv("CREWAI_STORAGE_DIR", "./storage")
+crew = Crew(
+    agents=[...],
+    tasks=[...],
+    memory=True,
+    long_term_memory=LongTermMemory(
+        storage=LTMSQLiteStorage(
+            db_path=f"{storage_path}/memory.db"
+        )
+    )
+)
+```
+
+## Configuration Examples
+
+### Basic Memory Configuration
+```python
+from crewai import Crew
+
+# Simple memory configuration
+crew = Crew(agents=[...], tasks=[...], memory=True)  # Uses default storage locations
+```
+
+### Custom Storage Configuration
+```python
+from crewai import Crew
+from crewai.memory import LongTermMemory
+from crewai.memory.storage import LTMSQLiteStorage
+
+# Configure custom storage paths
+crew = Crew(
+    agents=[...],
+    tasks=[...],
+    memory=True,
+    long_term_memory=LongTermMemory(
+        storage=LTMSQLiteStorage(db_path="./memory.db")
+    )
+)
+```
+
 ## Integrating Mem0 for Enhanced User Memory
 
 [Mem0](https://mem0.ai/) is a self-improving memory layer for LLM applications, enabling personalized
AI experiences.

diff --git a/docs/tools/filewritetool.mdx b/docs/tools/filewritetool.mdx
index f5dffb2ad..5e00801b7 100644
--- a/docs/tools/filewritetool.mdx
+++ b/docs/tools/filewritetool.mdx
@@ -8,9 +8,9 @@ icon: file-pen
 
 ## Description
 
-The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files.
+The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files with cross-platform compatibility (Windows, Linux, macOS).
 It is particularly useful in scenarios such as generating reports, saving logs, creating configuration files, and more.
-This tool supports creating new directories if they don't exist, making it easier to organize your output.
+This tool handles path differences across operating systems, supports UTF-8 encoding, and automatically creates directories if they don't exist, making it easier to organize your output reliably across different platforms.
 
 ## Installation
@@ -43,6 +43,8 @@ print(result)
 
 ## Conclusion
 
-By integrating the `FileWriterTool` into your crews, the agents can execute the process of writing content to files and creating directories.
-This tool is essential for tasks that require saving output data, creating structured file systems, and more. By adhering to the setup and usage guidelines provided,
-incorporating this tool into projects is straightforward and efficient.
\ No newline at end of file
+By integrating the `FileWriterTool` into your crews, the agents can reliably write content to files across different operating systems.
+This tool is essential for tasks that require saving output data, creating structured file systems, and handling cross-platform file operations.
+It's particularly recommended for Windows users who may encounter file writing issues with standard Python file operations.
+
+By adhering to the setup and usage guidelines provided, incorporating this tool into projects is straightforward and ensures consistent file writing behavior across all platforms.

diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index 95821043f..498a8d15e 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -17,4 +17,1230 @@ from pydantic import (
     model_validator,
 )
 
-# Rest of crew.py content...
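The memory example above is headed "e.g. FAISS as the VectorDB" but only demonstrates the built-in `RAGStorage`. Below is a minimal sketch of what a FAISS-backed storage could look like; it assumes the memory classes only need a duck-typed object exposing `save` / `search` / `reset` (mirroring how `RAGStorage` is passed in), and the embedding function is a deterministic placeholder so the snippet stays self-contained.

```python
from typing import Any, Dict, List, Optional

import faiss  # assumed dependency: faiss-cpu
import numpy as np


class FAISSStorage:
    """Illustrative FAISS-backed storage exposing save/search/reset (assumed interface)."""

    def __init__(self, dimension: int = 1536):
        self.dimension = dimension
        self.index = faiss.IndexFlatL2(dimension)  # exact L2 search over raw vectors
        self.texts: List[str] = []
        self.metadata: List[Dict[str, Any]] = []

    def _embed(self, text: str) -> np.ndarray:
        # Placeholder embedding: a deterministic pseudo-random vector per text.
        # Replace with a real embedder (OpenAI, sentence-transformers, ...).
        rng = np.random.default_rng(abs(hash(text)) % (2**32))
        return rng.random((1, self.dimension), dtype=np.float32)

    def save(self, value: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        self.index.add(self._embed(value))
        self.texts.append(value)
        self.metadata.append(metadata or {})

    def search(self, query: str, limit: int = 3, **kwargs: Any) -> List[Dict[str, Any]]:
        if not self.texts:
            return []
        _, indices = self.index.search(self._embed(query), min(limit, len(self.texts)))
        return [
            {"context": self.texts[i], "metadata": self.metadata[i]}
            for i in indices[0]
            if i != -1
        ]

    def reset(self) -> None:
        self.index.reset()
        self.texts.clear()
        self.metadata.clear()
```

In principle such an object could be dropped in where the example passes `RAGStorage` (for instance `ShortTermMemory(storage=FAISSStorage())`), provided the installed version does not enforce a stricter storage type.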
+from crewai.agent import Agent +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.agents.cache import CacheHandler +from crewai.crews.crew_output import CrewOutput +from crewai.knowledge.knowledge import Knowledge +from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource +from crewai.llm import LLM +from crewai.memory.entity.entity_memory import EntityMemory +from crewai.memory.long_term.long_term_memory import LongTermMemory +from crewai.memory.short_term.short_term_memory import ShortTermMemory +from crewai.memory.user.user_memory import UserMemory +from crewai.process import Process +from crewai.task import Task +from crewai.tasks.conditional_task import ConditionalTask +from crewai.tasks.task_output import TaskOutput +from crewai.telemetry import Telemetry +from crewai.tools.agent_tools.agent_tools import AgentTools +from crewai.tools.base_tool import Tool +from crewai.types.usage_metrics import UsageMetrics +from crewai.utilities import I18N, FileHandler, Logger, RPMController +from crewai.utilities.constants import TRAINING_DATA_FILE +from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator +from crewai.utilities.evaluators.task_evaluator import TaskEvaluator +from crewai.utilities.formatter import ( + aggregate_raw_outputs_from_task_outputs, + aggregate_raw_outputs_from_tasks, +) +from crewai.utilities.llm_utils import create_llm +from crewai.utilities.planning_handler import CrewPlanner +from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler +from crewai.utilities.training_handler import CrewTrainingHandler + +try: + import agentops # type: ignore +except ImportError: + agentops = None + + +warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd") + + +class Crew(BaseModel): + """ + Represents a group of agents, defining how they should collaborate and the tasks they should perform. + + Attributes: + tasks: List of tasks assigned to the crew. + agents: List of agents part of this crew. + manager_llm: The language model that will run manager agent. + manager_agent: Custom agent that will be used as manager. + memory: Whether the crew should use memory to store memories of it's execution. + memory_config: Configuration for the memory to be used for the crew. + cache: Whether the crew should use a cache to store the results of the tools execution. + function_calling_llm: The language model that will run the tool calling for all the agents. + process: The process flow that the crew will follow (e.g., sequential, hierarchical). + verbose: Indicates the verbosity level for logging during execution. + config: Configuration settings for the crew. + max_rpm: Maximum number of requests per minute for the crew execution to be respected. + prompt_file: Path to the prompt json file to be used for the crew. + id: A unique identifier for the crew instance. + task_callback: Callback to be executed after each task for every agents execution. + step_callback: Callback to be executed after each step for every agents execution. + share_crew: Whether you want to share the complete crew information and execution with crewAI to make the library better, and allow us to train models. + planning: Plan the crew execution and add the plan to the crew. + chat_llm: The language model used for orchestrating chat interactions with the crew. 
+ """ + + __hash__ = object.__hash__ # type: ignore + _execution_span: Any = PrivateAttr() + _rpm_controller: RPMController = PrivateAttr() + _logger: Logger = PrivateAttr() + _file_handler: FileHandler = PrivateAttr() + _cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler()) + _short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr() + _long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr() + _entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr() + _user_memory: Optional[InstanceOf[UserMemory]] = PrivateAttr() + _train: Optional[bool] = PrivateAttr(default=False) + _train_iteration: Optional[int] = PrivateAttr() + _inputs: Optional[Dict[str, Any]] = PrivateAttr(default=None) + _logging_color: str = PrivateAttr( + default="bold_purple", + ) + _task_output_handler: TaskOutputStorageHandler = PrivateAttr( + default_factory=TaskOutputStorageHandler + ) + + name: Optional[str] = Field(default=None) + cache: bool = Field(default=True) + tasks: List[Task] = Field(default_factory=list) + agents: List[BaseAgent] = Field(default_factory=list) + process: Process = Field(default=Process.sequential) + verbose: bool = Field(default=False) + memory: bool = Field( + default=False, + description="Whether the crew should use memory to store memories of it's execution", + ) + memory_config: Optional[Dict[str, Any]] = Field( + default=None, + description="Configuration for the memory to be used for the crew.", + ) + short_term_memory: Optional[InstanceOf[ShortTermMemory]] = Field( + default=None, + description="An Instance of the ShortTermMemory to be used by the Crew", + ) + long_term_memory: Optional[InstanceOf[LongTermMemory]] = Field( + default=None, + description="An Instance of the LongTermMemory to be used by the Crew", + ) + entity_memory: Optional[InstanceOf[EntityMemory]] = Field( + default=None, + description="An Instance of the EntityMemory to be used by the Crew", + ) + user_memory: Optional[InstanceOf[UserMemory]] = Field( + default=None, + description="An instance of the UserMemory to be used by the Crew to store/fetch memories of a specific user.", + ) + embedder: Optional[dict] = Field( + default=None, + description="Configuration for the embedder to be used for the crew.", + ) + usage_metrics: Optional[UsageMetrics] = Field( + default=None, + description="Metrics for the LLM usage during all tasks execution.", + ) + manager_llm: Optional[Any] = Field( + description="Language model that will run the agent.", default=None + ) + manager_agent: Optional[BaseAgent] = Field( + description="Custom agent that will be used as manager.", default=None + ) + function_calling_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field( + description="Language model that will run the agent.", default=None + ) + config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None) + id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True) + share_crew: Optional[bool] = Field(default=False) + step_callback: Optional[Any] = Field( + default=None, + description="Callback to be executed after each step for all agents execution.", + ) + task_callback: Optional[Any] = Field( + default=None, + description="Callback to be executed after each task for all agents execution.", + ) + before_kickoff_callbacks: List[ + Callable[[Optional[Dict[str, Any]]], Optional[Dict[str, Any]]] + ] = Field( + default_factory=list, + description="List of callbacks to be executed before crew kickoff. 
It may be used to adjust inputs before the crew is executed.", + ) + after_kickoff_callbacks: List[Callable[[CrewOutput], CrewOutput]] = Field( + default_factory=list, + description="List of callbacks to be executed after crew kickoff. It may be used to adjust the output of the crew.", + ) + max_rpm: Optional[int] = Field( + default=None, + description="Maximum number of requests per minute for the crew execution to be respected.", + ) + prompt_file: str = Field( + default=None, + description="Path to the prompt json file to be used for the crew.", + ) + output_log_file: Optional[Union[bool, str]] = Field( + default=None, + description="Path to the log file to be saved", + ) + planning: Optional[bool] = Field( + default=False, + description="Plan the crew execution and add the plan to the crew.", + ) + planning_llm: Optional[Any] = Field( + default=None, + description="Language model that will run the AgentPlanner if planning is True.", + ) + task_execution_output_json_files: Optional[List[str]] = Field( + default=None, + description="List of file paths for task execution JSON files.", + ) + execution_logs: List[Dict[str, Any]] = Field( + default=[], + description="List of execution logs for tasks", + ) + knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field( + default=None, + description="Knowledge sources for the crew. Add knowledge sources to the knowledge object.", + ) + chat_llm: Optional[Any] = Field( + default=None, + description="LLM used to handle chatting with the crew.", + ) + knowledge: Optional[Knowledge] = Field( + default=None, + description="Knowledge for the crew.", + ) + + @field_validator("id", mode="before") + @classmethod + def _deny_user_set_id(cls, v: Optional[UUID4]) -> None: + """Prevent manual setting of the 'id' field by users.""" + if v: + raise PydanticCustomError( + "may_not_set_field", "The 'id' field cannot be set by the user.", {} + ) + + @field_validator("config", mode="before") + @classmethod + def check_config_type( + cls, v: Union[Json, Dict[str, Any]] + ) -> Union[Json, Dict[str, Any]]: + """Validates that the config is a valid type. + Args: + v: The config to be validated. + Returns: + The config if it is valid. 
+ """ + + # TODO: Improve typing + return json.loads(v) if isinstance(v, Json) else v # type: ignore + + @model_validator(mode="after") + def set_private_attrs(self) -> "Crew": + """Set private attributes.""" + self._cache_handler = CacheHandler() + self._logger = Logger(verbose=self.verbose) + if self.output_log_file: + self._file_handler = FileHandler(self.output_log_file) + self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger) + if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM): + self.function_calling_llm = create_llm(self.function_calling_llm) + + self._telemetry = Telemetry() + self._telemetry.set_tracer() + return self + + @model_validator(mode="after") + def create_crew_memory(self) -> "Crew": + """Set private attributes.""" + if self.memory: + self._long_term_memory = ( + self.long_term_memory if self.long_term_memory else LongTermMemory() + ) + self._short_term_memory = ( + self.short_term_memory + if self.short_term_memory + else ShortTermMemory( + crew=self, + embedder_config=self.embedder, + ) + ) + self._entity_memory = ( + self.entity_memory + if self.entity_memory + else EntityMemory(crew=self, embedder_config=self.embedder) + ) + if hasattr(self, "memory_config") and self.memory_config is not None: + self._user_memory = ( + self.user_memory if self.user_memory else UserMemory(crew=self) + ) + else: + self._user_memory = None + return self + + @model_validator(mode="after") + def create_crew_knowledge(self) -> "Crew": + """Create the knowledge for the crew.""" + if self.knowledge_sources: + try: + if isinstance(self.knowledge_sources, list) and all( + isinstance(k, BaseKnowledgeSource) for k in self.knowledge_sources + ): + self.knowledge = Knowledge( + sources=self.knowledge_sources, + embedder=self.embedder, + collection_name="crew", + ) + + except Exception as e: + self._logger.log( + "warning", f"Failed to init knowledge: {e}", color="yellow" + ) + return self + + @model_validator(mode="after") + def check_manager_llm(self): + """Validates that the language model is set when using hierarchical process.""" + if self.process == Process.hierarchical: + if not self.manager_llm and not self.manager_agent: + raise PydanticCustomError( + "missing_manager_llm_or_manager_agent", + "Attribute `manager_llm` or `manager_agent` is required when using hierarchical process.", + {}, + ) + + if (self.manager_agent is not None) and ( + self.agents.count(self.manager_agent) > 0 + ): + raise PydanticCustomError( + "manager_agent_in_agents", + "Manager agent should not be included in agents list.", + {}, + ) + + return self + + @model_validator(mode="after") + def check_config(self): + """Validates that the crew is properly configured with agents and tasks.""" + if not self.config and not self.tasks and not self.agents: + raise PydanticCustomError( + "missing_keys", + "Either 'agents' and 'tasks' need to be set or 'config'.", + {}, + ) + + if self.config: + self._setup_from_config() + + if self.agents: + for agent in self.agents: + if self.cache: + agent.set_cache_handler(self._cache_handler) + if self.max_rpm: + agent.set_rpm_controller(self._rpm_controller) + return self + + @model_validator(mode="after") + def validate_tasks(self): + if self.process == Process.sequential: + for task in self.tasks: + if task.agent is None: + raise PydanticCustomError( + "missing_agent_in_task", + f"Sequential process error: Agent is missing in the task with the following description: {task.description}", # type: ignore # Argument of type "str" cannot 
be assigned to parameter "message_template" of type "LiteralString" + {}, + ) + + return self + + @model_validator(mode="after") + def validate_end_with_at_most_one_async_task(self): + """Validates that the crew ends with at most one asynchronous task.""" + final_async_task_count = 0 + + # Traverse tasks backward + for task in reversed(self.tasks): + if task.async_execution: + final_async_task_count += 1 + else: + break # Stop traversing as soon as a non-async task is encountered + + if final_async_task_count > 1: + raise PydanticCustomError( + "async_task_count", + "The crew must end with at most one asynchronous task.", + {}, + ) + + return self + + @model_validator(mode="after") + def validate_must_have_non_conditional_task(self) -> "Crew": + """Ensure that a crew has at least one non-conditional task.""" + if not self.tasks: + return self + non_conditional_count = sum( + 1 for task in self.tasks if not isinstance(task, ConditionalTask) + ) + if non_conditional_count == 0: + raise PydanticCustomError( + "only_conditional_tasks", + "Crew must include at least one non-conditional task", + {}, + ) + return self + + @model_validator(mode="after") + def validate_first_task(self) -> "Crew": + """Ensure the first task is not a ConditionalTask.""" + if self.tasks and isinstance(self.tasks[0], ConditionalTask): + raise PydanticCustomError( + "invalid_first_task", + "The first task cannot be a ConditionalTask.", + {}, + ) + return self + + @model_validator(mode="after") + def validate_async_tasks_not_async(self) -> "Crew": + """Ensure that ConditionalTask is not async.""" + for task in self.tasks: + if task.async_execution and isinstance(task, ConditionalTask): + raise PydanticCustomError( + "invalid_async_conditional_task", + f"Conditional Task: {task.description} , cannot be executed asynchronously.", # type: ignore # Argument of type "str" cannot be assigned to parameter "message_template" of type "LiteralString" + {}, + ) + return self + + @model_validator(mode="after") + def validate_async_task_cannot_include_sequential_async_tasks_in_context(self): + """ + Validates that if a task is set to be executed asynchronously, + it cannot include other asynchronous tasks in its context unless + separated by a synchronous task. + """ + for i, task in enumerate(self.tasks): + if task.async_execution and task.context: + for context_task in task.context: + if context_task.async_execution: + for j in range(i - 1, -1, -1): + if self.tasks[j] == context_task: + raise ValueError( + f"Task '{task.description}' is asynchronous and cannot include other sequential asynchronous tasks in its context." + ) + if not self.tasks[j].async_execution: + break + return self + + @model_validator(mode="after") + def validate_context_no_future_tasks(self): + """Validates that a task's context does not include future tasks.""" + task_indices = {id(task): i for i, task in enumerate(self.tasks)} + + for task in self.tasks: + if task.context: + for context_task in task.context: + if id(context_task) not in task_indices: + continue # Skip context tasks not in the main tasks list + if task_indices[id(context_task)] > task_indices[id(task)]: + raise ValueError( + f"Task '{task.description}' has a context dependency on a future task '{context_task.description}', which is not allowed." 
+ ) + return self + + + + @property + def key(self) -> str: + source = [agent.key for agent in self.agents] + [ + task.key for task in self.tasks + ] + return md5("|".join(source).encode(), usedforsecurity=False).hexdigest() + + def _setup_from_config(self): + assert self.config is not None, "Config should not be None." + + """Initializes agents and tasks from the provided config.""" + if not self.config.get("agents") or not self.config.get("tasks"): + raise PydanticCustomError( + "missing_keys_in_config", "Config should have 'agents' and 'tasks'.", {} + ) + + self.process = self.config.get("process", self.process) + self.agents = [Agent(**agent) for agent in self.config["agents"]] + self.tasks = [self._create_task(task) for task in self.config["tasks"]] + + def _create_task(self, task_config: Dict[str, Any]) -> Task: + """Creates a task instance from its configuration. + + Args: + task_config: The configuration of the task. + + Returns: + A task instance. + """ + task_agent = next( + agt for agt in self.agents if agt.role == task_config["agent"] + ) + del task_config["agent"] + return Task(**task_config, agent=task_agent) + + def _setup_for_training(self, filename: str) -> None: + """Sets up the crew for training.""" + self._train = True + + for task in self.tasks: + task.human_input = True + + for agent in self.agents: + agent.allow_delegation = False + + CrewTrainingHandler(TRAINING_DATA_FILE).initialize_file() + CrewTrainingHandler(filename).initialize_file() + + def train( + self, n_iterations: int, filename: str, inputs: Optional[Dict[str, Any]] = {} + ) -> None: + """Trains the crew for a given number of iterations.""" + train_crew = self.copy() + train_crew._setup_for_training(filename) + + try: + for n_iteration in range(n_iterations): + train_crew._train_iteration = n_iteration + train_crew.kickoff(inputs=inputs) + + training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load() + + for agent in train_crew.agents: + if training_data.get(str(agent.id)): + result = TaskEvaluator(agent).evaluate_training_data( + training_data=training_data, agent_id=str(agent.id) + ) + CrewTrainingHandler(filename).save_trained_data( + agent_id=str(agent.role), trained_data=result.model_dump() + ) + except Exception as e: + self._logger.log("error", f"Training failed: {e}", color="red") + CrewTrainingHandler(TRAINING_DATA_FILE).clear() + CrewTrainingHandler(filename).clear() + raise + + def kickoff( + self, + inputs: Optional[Dict[str, Any]] = None, + ) -> CrewOutput: + for before_callback in self.before_kickoff_callbacks: + if inputs is None: + inputs = {} + inputs = before_callback(inputs) + + """Starts the crew to work on its assigned tasks.""" + self._execution_span = self._telemetry.crew_execution_span(self, inputs) + self._task_output_handler.reset() + self._logging_color = "bold_purple" + + if inputs is not None: + self._inputs = inputs + self._interpolate_inputs(inputs) + self._set_tasks_callbacks() + + i18n = I18N(prompt_file=self.prompt_file) + + for agent in self.agents: + agent.i18n = i18n + # type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]" + agent.crew = self # type: ignore[attr-defined] + # TODO: Create an AgentFunctionCalling protocol for future refactoring + if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm" + agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm" + 
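Since `kickoff()` above threads its `inputs` through every entry in `before_kickoff_callbacks` and the resulting `CrewOutput` through `after_kickoff_callbacks`, a hedged usage sketch follows (agent, task, and topic values are placeholders, and the final kickoff call is left commented out because it needs a configured LLM):

```python
from typing import Any, Dict, Optional

from crewai import Agent, Crew, Task
from crewai.crews.crew_output import CrewOutput


def add_default_topic(inputs: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    # Runs before kickoff; the (possibly empty) inputs dict is passed through
    # each callback and the returned dict is what gets interpolated into tasks.
    inputs = inputs or {}
    inputs.setdefault("topic", "AI memory systems")
    return inputs


def annotate_output(output: CrewOutput) -> CrewOutput:
    # Runs after kickoff; each callback receives and returns the CrewOutput.
    output.raw = f"[reviewed]\n{output.raw}"
    return output


researcher = Agent(
    role="Researcher",
    goal="Research {topic}",
    backstory="Experienced analyst.",
)
task = Task(
    description="Summarize the latest findings on {topic}.",
    expected_output="A short summary.",
    agent=researcher,
)

crew = Crew(
    agents=[researcher],
    tasks=[task],
    before_kickoff_callbacks=[add_default_topic],
    after_kickoff_callbacks=[annotate_output],
)
# result = crew.kickoff()  # requires a configured LLM / API key
```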
+ if not agent.step_callback: # type: ignore # "BaseAgent" has no attribute "step_callback" + agent.step_callback = self.step_callback # type: ignore # "BaseAgent" has no attribute "step_callback" + + agent.create_agent_executor() + + if self.planning: + self._handle_crew_planning() + + metrics: List[UsageMetrics] = [] + + if self.process == Process.sequential: + result = self._run_sequential_process() + elif self.process == Process.hierarchical: + result = self._run_hierarchical_process() + else: + raise NotImplementedError( + f"The process '{self.process}' is not implemented yet." + ) + + for after_callback in self.after_kickoff_callbacks: + result = after_callback(result) + + metrics += [agent._token_process.get_summary() for agent in self.agents] + + self.usage_metrics = UsageMetrics() + for metric in metrics: + self.usage_metrics.add_usage_metrics(metric) + + return result + + def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]: + """Executes the Crew's workflow for each input in the list and aggregates results.""" + results: List[CrewOutput] = [] + + # Initialize the parent crew's usage metrics + total_usage_metrics = UsageMetrics() + + for input_data in inputs: + crew = self.copy() + + output = crew.kickoff(inputs=input_data) + + if crew.usage_metrics: + total_usage_metrics.add_usage_metrics(crew.usage_metrics) + + results.append(output) + + self.usage_metrics = total_usage_metrics + self._task_output_handler.reset() + return results + + async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> CrewOutput: + """Asynchronous kickoff method to start the crew execution.""" + return await asyncio.to_thread(self.kickoff, inputs) + + async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[CrewOutput]: + crew_copies = [self.copy() for _ in inputs] + + async def run_crew(crew, input_data): + return await crew.kickoff_async(inputs=input_data) + + tasks = [ + asyncio.create_task(run_crew(crew_copies[i], inputs[i])) + for i in range(len(inputs)) + ] + + results = await asyncio.gather(*tasks) + + total_usage_metrics = UsageMetrics() + for crew in crew_copies: + if crew.usage_metrics: + total_usage_metrics.add_usage_metrics(crew.usage_metrics) + + self.usage_metrics = total_usage_metrics + self._task_output_handler.reset() + return results + + def _handle_crew_planning(self): + """Handles the Crew planning.""" + self._logger.log("info", "Planning the crew execution") + result = CrewPlanner( + tasks=self.tasks, planning_agent_llm=self.planning_llm + )._handle_crew_planning() + + for task, step_plan in zip(self.tasks, result.list_of_plans_per_task): + task.description += step_plan.plan + + def _store_execution_log( + self, + task: Task, + output: TaskOutput, + task_index: int, + was_replayed: bool = False, + ): + if self._inputs: + inputs = self._inputs + else: + inputs = {} + + log = { + "task": task, + "output": { + "description": output.description, + "summary": output.summary, + "raw": output.raw, + "pydantic": output.pydantic, + "json_dict": output.json_dict, + "output_format": output.output_format, + "agent": output.agent, + }, + "task_index": task_index, + "inputs": inputs, + "was_replayed": was_replayed, + } + self._task_output_handler.update(task_index, log) + + def _run_sequential_process(self) -> CrewOutput: + """Executes tasks sequentially and returns the final output.""" + return self._execute_tasks(self.tasks) + + def _run_hierarchical_process(self) -> CrewOutput: + """Creates and assigns a manager agent to make sure the crew 
completes the tasks.""" + self._create_manager_agent() + return self._execute_tasks(self.tasks) + + def _create_manager_agent(self): + i18n = I18N(prompt_file=self.prompt_file) + if self.manager_agent is not None: + self.manager_agent.allow_delegation = True + manager = self.manager_agent + if manager.tools is not None and len(manager.tools) > 0: + self._logger.log( + "warning", "Manager agent should not have tools", color="orange" + ) + manager.tools = [] + raise Exception("Manager agent should not have tools") + else: + self.manager_llm = create_llm(self.manager_llm) + manager = Agent( + role=i18n.retrieve("hierarchical_manager_agent", "role"), + goal=i18n.retrieve("hierarchical_manager_agent", "goal"), + backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"), + tools=AgentTools(agents=self.agents).tools(), + allow_delegation=True, + llm=self.manager_llm, + verbose=self.verbose, + ) + self.manager_agent = manager + manager.crew = self + + def _execute_tasks( + self, + tasks: List[Task], + start_index: Optional[int] = 0, + was_replayed: bool = False, + ) -> CrewOutput: + """Executes tasks sequentially and returns the final output. + + Args: + tasks (List[Task]): List of tasks to execute + manager (Optional[BaseAgent], optional): Manager agent to use for delegation. Defaults to None. + + Returns: + CrewOutput: Final output of the crew + """ + + task_outputs: List[TaskOutput] = [] + futures: List[Tuple[Task, Future[TaskOutput], int]] = [] + last_sync_output: Optional[TaskOutput] = None + + for task_index, task in enumerate(tasks): + if start_index is not None and task_index < start_index: + if task.output: + if task.async_execution: + task_outputs.append(task.output) + else: + task_outputs = [task.output] + last_sync_output = task.output + continue + + agent_to_use = self._get_agent_to_use(task) + if agent_to_use is None: + raise ValueError( + f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided." 
+ ) + + # Determine which tools to use - task tools take precedence over agent tools + tools_for_task = task.tools or agent_to_use.tools or [] + tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task) + + self._log_task_start(task, agent_to_use.role) + + if isinstance(task, ConditionalTask): + skipped_task_output = self._handle_conditional_task( + task, task_outputs, futures, task_index, was_replayed + ) + if skipped_task_output: + task_outputs.append(skipped_task_output) + continue + + if task.async_execution: + context = self._get_context( + task, [last_sync_output] if last_sync_output else [] + ) + future = task.execute_async( + agent=agent_to_use, + context=context, + tools=tools_for_task, + ) + futures.append((task, future, task_index)) + else: + if futures: + task_outputs = self._process_async_tasks(futures, was_replayed) + futures.clear() + + context = self._get_context(task, task_outputs) + task_output = task.execute_sync( + agent=agent_to_use, + context=context, + tools=tools_for_task, + ) + task_outputs.append(task_output) + self._process_task_result(task, task_output) + self._store_execution_log(task, task_output, task_index, was_replayed) + + if futures: + task_outputs = self._process_async_tasks(futures, was_replayed) + + return self._create_crew_output(task_outputs) + + def _handle_conditional_task( + self, + task: ConditionalTask, + task_outputs: List[TaskOutput], + futures: List[Tuple[Task, Future[TaskOutput], int]], + task_index: int, + was_replayed: bool, + ) -> Optional[TaskOutput]: + if futures: + task_outputs = self._process_async_tasks(futures, was_replayed) + futures.clear() + + previous_output = task_outputs[-1] if task_outputs else None + if previous_output is not None and not task.should_execute(previous_output): + self._logger.log( + "debug", + f"Skipping conditional task: {task.description}", + color="yellow", + ) + skipped_task_output = task.get_skipped_task_output() + + if not was_replayed: + self._store_execution_log(task, skipped_task_output, task_index) + return skipped_task_output + return None + + def _prepare_tools( + self, agent: BaseAgent, task: Task, tools: List[Tool] + ) -> List[Tool]: + # Add delegation tools if agent allows delegation + if agent.allow_delegation: + if self.process == Process.hierarchical: + if self.manager_agent: + tools = self._update_manager_tools(task, tools) + else: + raise ValueError( + "Manager agent is required for hierarchical process." 
+ ) + + elif agent and agent.allow_delegation: + tools = self._add_delegation_tools(task, tools) + + # Add code execution tools if agent allows code execution + if agent.allow_code_execution: + tools = self._add_code_execution_tools(agent, tools) + + if agent and agent.multimodal: + tools = self._add_multimodal_tools(agent, tools) + + return tools + + def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]: + if self.process == Process.hierarchical: + return self.manager_agent + return task.agent + + def _merge_tools( + self, existing_tools: List[Tool], new_tools: List[Tool] + ) -> List[Tool]: + """Merge new tools into existing tools list, avoiding duplicates by tool name.""" + if not new_tools: + return existing_tools + + # Create mapping of tool names to new tools + new_tool_map = {tool.name: tool for tool in new_tools} + + # Remove any existing tools that will be replaced + tools = [tool for tool in existing_tools if tool.name not in new_tool_map] + + # Add all new tools + tools.extend(new_tools) + + return tools + + def _inject_delegation_tools( + self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent] + ): + delegation_tools = task_agent.get_delegation_tools(agents) + return self._merge_tools(tools, delegation_tools) + + def _add_multimodal_tools(self, agent: BaseAgent, tools: List[Tool]): + multimodal_tools = agent.get_multimodal_tools() + return self._merge_tools(tools, multimodal_tools) + + def _add_code_execution_tools(self, agent: BaseAgent, tools: List[Tool]): + code_tools = agent.get_code_execution_tools() + return self._merge_tools(tools, code_tools) + + def _add_delegation_tools(self, task: Task, tools: List[Tool]): + agents_for_delegation = [agent for agent in self.agents if agent != task.agent] + if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent: + if not tools: + tools = [] + tools = self._inject_delegation_tools( + tools, task.agent, agents_for_delegation + ) + return tools + + def _log_task_start(self, task: Task, role: str = "None"): + if self.output_log_file: + self._file_handler.log( + task_name=task.name, task=task.description, agent=role, status="started" + ) + + def _update_manager_tools(self, task: Task, tools: List[Tool]): + if self.manager_agent: + if task.agent: + tools = self._inject_delegation_tools(tools, task.agent, [task.agent]) + else: + tools = self._inject_delegation_tools( + tools, self.manager_agent, self.agents + ) + return tools + + def _get_context(self, task: Task, task_outputs: List[TaskOutput]): + context = ( + aggregate_raw_outputs_from_tasks(task.context) + if task.context + else aggregate_raw_outputs_from_task_outputs(task_outputs) + ) + return context + + def _process_task_result(self, task: Task, output: TaskOutput) -> None: + role = task.agent.role if task.agent is not None else "None" + if self.output_log_file: + self._file_handler.log( + task_name=task.name, + task=task.description, + agent=role, + status="completed", + output=output.raw, + ) + + def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput: + if not task_outputs: + raise ValueError("No task outputs available to create crew output.") + + # Filter out empty outputs and get the last valid one as the main output + valid_outputs = [t for t in task_outputs if t.raw] + if not valid_outputs: + raise ValueError("No valid task outputs available to create crew output.") + final_task_output = valid_outputs[-1] + + final_string_output = final_task_output.raw + self._finish_execution(final_string_output) + token_usage = 
self.calculate_usage_metrics() + + return CrewOutput( + raw=final_task_output.raw, + pydantic=final_task_output.pydantic, + json_dict=final_task_output.json_dict, + tasks_output=task_outputs, + token_usage=token_usage, + ) + + def _process_async_tasks( + self, + futures: List[Tuple[Task, Future[TaskOutput], int]], + was_replayed: bool = False, + ) -> List[TaskOutput]: + task_outputs: List[TaskOutput] = [] + for future_task, future, task_index in futures: + task_output = future.result() + task_outputs.append(task_output) + self._process_task_result(future_task, task_output) + self._store_execution_log( + future_task, task_output, task_index, was_replayed + ) + return task_outputs + + def _find_task_index( + self, task_id: str, stored_outputs: List[Any] + ) -> Optional[int]: + return next( + ( + index + for (index, d) in enumerate(stored_outputs) + if d["task_id"] == str(task_id) + ), + None, + ) + + def replay( + self, task_id: str, inputs: Optional[Dict[str, Any]] = None + ) -> CrewOutput: + stored_outputs = self._task_output_handler.load() + if not stored_outputs: + raise ValueError(f"Task with id {task_id} not found in the crew's tasks.") + + start_index = self._find_task_index(task_id, stored_outputs) + + if start_index is None: + raise ValueError(f"Task with id {task_id} not found in the crew's tasks.") + + replay_inputs = ( + inputs if inputs is not None else stored_outputs[start_index]["inputs"] + ) + self._inputs = replay_inputs + + if replay_inputs: + self._interpolate_inputs(replay_inputs) + + if self.process == Process.hierarchical: + self._create_manager_agent() + + for i in range(start_index): + stored_output = stored_outputs[i][ + "output" + ] # for adding context to the task + task_output = TaskOutput( + description=stored_output["description"], + agent=stored_output["agent"], + raw=stored_output["raw"], + pydantic=stored_output["pydantic"], + json_dict=stored_output["json_dict"], + output_format=stored_output["output_format"], + ) + self.tasks[i].output = task_output + + self._logging_color = "bold_blue" + result = self._execute_tasks(self.tasks, start_index, True) + return result + + def query_knowledge(self, query: List[str]) -> Union[List[Dict[str, Any]], None]: + if self.knowledge: + return self.knowledge.query(query) + return None + + def fetch_inputs(self) -> Set[str]: + """ + Gathers placeholders (e.g., {something}) referenced in tasks or agents. + Scans each task's 'description' + 'expected_output', and each agent's + 'role', 'goal', and 'backstory'. + + Returns a set of all discovered placeholder names. + """ + placeholder_pattern = re.compile(r"\{(.+?)\}") + required_inputs: Set[str] = set() + + # Scan tasks for inputs + for task in self.tasks: + # description and expected_output might contain e.g. {topic}, {user_name}, etc. + text = f"{task.description or ''} {task.expected_output or ''}" + required_inputs.update(placeholder_pattern.findall(text)) + + # Scan agents for inputs + for agent in self.agents: + # role, goal, backstory might have placeholders like {role_detail}, etc. 
+ text = f"{agent.role or ''} {agent.goal or ''} {agent.backstory or ''}" + required_inputs.update(placeholder_pattern.findall(text)) + + return required_inputs + + def copy(self): + """Create a deep copy of the Crew.""" + + exclude = { + "id", + "_rpm_controller", + "_logger", + "_execution_span", + "_file_handler", + "_cache_handler", + "_short_term_memory", + "_long_term_memory", + "_entity_memory", + "_telemetry", + "agents", + "tasks", + "knowledge_sources", + "knowledge", + } + + cloned_agents = [agent.copy() for agent in self.agents] + + task_mapping = {} + + cloned_tasks = [] + existing_knowledge_sources = shallow_copy(self.knowledge_sources) + existing_knowledge = shallow_copy(self.knowledge) + + for task in self.tasks: + cloned_task = task.copy(cloned_agents, task_mapping) + cloned_tasks.append(cloned_task) + task_mapping[task.key] = cloned_task + + for cloned_task, original_task in zip(cloned_tasks, self.tasks): + if original_task.context: + cloned_context = [ + task_mapping[context_task.key] + for context_task in original_task.context + ] + cloned_task.context = cloned_context + + copied_data = self.model_dump(exclude=exclude) + copied_data = {k: v for k, v in copied_data.items() if v is not None} + + copied_data.pop("agents", None) + copied_data.pop("tasks", None) + + copied_crew = Crew( + **copied_data, + agents=cloned_agents, + tasks=cloned_tasks, + knowledge_sources=existing_knowledge_sources, + knowledge=existing_knowledge, + ) + + return copied_crew + + def _set_tasks_callbacks(self) -> None: + """Sets callback for every task suing task_callback""" + for task in self.tasks: + if not task.callback: + task.callback = self.task_callback + + def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None: + """Interpolates the inputs in the tasks and agents.""" + [ + task.interpolate_inputs_and_add_conversation_history( + # type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None) + inputs + ) + for task in self.tasks + ] + # type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None) + for agent in self.agents: + agent.interpolate_inputs(inputs) + + def _finish_execution(self, final_string_output: str) -> None: + if self.max_rpm: + self._rpm_controller.stop_rpm_counter() + if agentops: + agentops.end_session( + end_state="Success", + end_state_reason="Finished Execution", + is_auto_end=True, + ) + self._telemetry.end_crew(self, final_string_output) + + def calculate_usage_metrics(self) -> UsageMetrics: + """Calculates and returns the usage metrics.""" + total_usage_metrics = UsageMetrics() + for agent in self.agents: + if hasattr(agent, "_token_process"): + token_sum = agent._token_process.get_summary() + total_usage_metrics.add_usage_metrics(token_sum) + if self.manager_agent and hasattr(self.manager_agent, "_token_process"): + token_sum = self.manager_agent._token_process.get_summary() + total_usage_metrics.add_usage_metrics(token_sum) + self.usage_metrics = total_usage_metrics + return total_usage_metrics + + def test( + self, + n_iterations: int, + openai_model_name: Optional[str] = None, + inputs: Optional[Dict[str, Any]] = None, + ) -> None: + """Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures.""" + test_crew = self.copy() + + self._test_execution_span = test_crew._telemetry.test_execution_span( + test_crew, + n_iterations, + inputs, + openai_model_name, # type: ignore[arg-type] + ) # type: ignore[arg-type] + evaluator = 
CrewEvaluator(test_crew, openai_model_name) # type: ignore[arg-type] + + for i in range(1, n_iterations + 1): + evaluator.set_iteration(i) + test_crew.kickoff(inputs=inputs) + + evaluator.print_crew_evaluation_result() + + def __repr__(self): + return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})" + + def reset_memories(self, command_type: str) -> None: + """Reset specific or all memories for the crew. + + Args: + command_type: Type of memory to reset. + Valid options: 'long', 'short', 'entity', 'knowledge', + 'kickoff_outputs', or 'all' + + Raises: + ValueError: If an invalid command type is provided. + RuntimeError: If memory reset operation fails. + """ + VALID_TYPES = frozenset( + ["long", "short", "entity", "knowledge", "kickoff_outputs", "all"] + ) + + if command_type not in VALID_TYPES: + raise ValueError( + f"Invalid command type. Must be one of: {', '.join(sorted(VALID_TYPES))}" + ) + + try: + if command_type == "all": + self._reset_all_memories() + else: + self._reset_specific_memory(command_type) + + self._logger.log("info", f"{command_type} memory has been reset") + + except Exception as e: + error_msg = f"Failed to reset {command_type} memory: {str(e)}" + self._logger.log("error", error_msg) + raise RuntimeError(error_msg) from e + + def _reset_all_memories(self) -> None: + """Reset all available memory systems.""" + memory_systems = [ + ("short term", self._short_term_memory), + ("entity", self._entity_memory), + ("long term", self._long_term_memory), + ("task output", self._task_output_handler), + ("knowledge", self.knowledge), + ] + + for name, system in memory_systems: + if system is not None: + try: + system.reset() + except Exception as e: + raise RuntimeError(f"Failed to reset {name} memory") from e + + def _reset_specific_memory(self, memory_type: str) -> None: + """Reset a specific memory system. + + Args: + memory_type: Type of memory to reset + + Raises: + RuntimeError: If the specified memory system fails to reset + """ + reset_functions = { + "long": (self._long_term_memory, "long term"), + "short": (self._short_term_memory, "short term"), + "entity": (self._entity_memory, "entity"), + "knowledge": (self.knowledge, "knowledge"), + "kickoff_outputs": (self._task_output_handler, "task output"), + } + + memory_system, name = reset_functions[memory_type] + if memory_system is None: + raise RuntimeError(f"{name} memory system is not initialized") + + try: + memory_system.reset() + except Exception as e: + raise RuntimeError(f"Failed to reset {name} memory") from e diff --git a/src/crewai/llm.py b/src/crewai/llm.py index d6be4b588..ada5c9bf3 100644 --- a/src/crewai/llm.py +++ b/src/crewai/llm.py @@ -164,6 +164,7 @@ class LLM: self.context_window_size = 0 self.reasoning_effort = reasoning_effort self.additional_params = kwargs + self.is_anthropic = self._is_anthropic_model(model) litellm.drop_params = True @@ -178,42 +179,62 @@ class LLM: self.set_callbacks(callbacks) self.set_env_callbacks() + def _is_anthropic_model(self, model: str) -> bool: + """Determine if the model is from Anthropic provider. + + Args: + model: The model identifier string. + + Returns: + bool: True if the model is from Anthropic, False otherwise. 
+ """ + ANTHROPIC_PREFIXES = ('anthropic/', 'claude-', 'claude/') + return any(prefix in model.lower() for prefix in ANTHROPIC_PREFIXES) + def call( self, messages: Union[str, List[Dict[str, str]]], tools: Optional[List[dict]] = None, callbacks: Optional[List[Any]] = None, available_functions: Optional[Dict[str, Any]] = None, - ) -> str: - """ - High-level llm call method that: - 1) Accepts either a string or a list of messages - 2) Converts string input to the required message format - 3) Calls litellm.completion - 4) Handles function/tool calls if any - 5) Returns the final text response or tool result - - Parameters: - - messages (Union[str, List[Dict[str, str]]]): The input messages for the LLM. - - If a string is provided, it will be converted into a message list with a single entry. - - If a list of dictionaries is provided, each dictionary should have 'role' and 'content' keys. - - tools (Optional[List[dict]]): A list of tool schemas for function calling. - - callbacks (Optional[List[Any]]): A list of callback functions to be executed. - - available_functions (Optional[Dict[str, Any]]): A dictionary mapping function names to actual Python functions. - + ) -> Union[str, Any]: + """High-level LLM call method. + + Args: + messages: Input messages for the LLM. + Can be a string or list of message dictionaries. + If string, it will be converted to a single user message. + If list, each dict must have 'role' and 'content' keys. + tools: Optional list of tool schemas for function calling. + Each tool should define its name, description, and parameters. + callbacks: Optional list of callback functions to be executed + during and after the LLM call. + available_functions: Optional dict mapping function names to callables + that can be invoked by the LLM. + Returns: - - str: The final text response from the LLM or the result of a tool function call. - + Union[str, Any]: Either a text response from the LLM (str) or + the result of a tool function call (Any). + + Raises: + TypeError: If messages format is invalid + ValueError: If response format is not supported + LLMContextLengthExceededException: If input exceeds model's context limit + Examples: - --------- - # Example 1: Using a string input - response = llm.call("Return the name of a random city in the world.") - print(response) - - # Example 2: Using a list of messages - messages = [{"role": "user", "content": "What is the capital of France?"}] - response = llm.call(messages) - print(response) + # Example 1: Simple string input + >>> response = llm.call("Return the name of a random city.") + >>> print(response) + "Paris" + + # Example 2: Message list with system and user messages + >>> messages = [ + ... {"role": "system", "content": "You are a geography expert"}, + ... {"role": "user", "content": "What is France's capital?"} + ... ] + >>> response = llm.call(messages) + >>> print(response) + "The capital of France is Paris." """ # Validate parameters before proceeding with the call. 
self._validate_call_params() @@ -233,10 +254,13 @@ class LLM: self.set_callbacks(callbacks) try: - # --- 1) Prepare the parameters for the completion call + # --- 1) Format messages according to provider requirements + formatted_messages = self._format_messages_for_provider(messages) + + # --- 2) Prepare the parameters for the completion call params = { "model": self.model, - "messages": messages, + "messages": formatted_messages, "timeout": self.timeout, "temperature": self.temperature, "top_p": self.top_p, @@ -324,6 +348,38 @@ class LLM: logging.error(f"LiteLLM call failed: {str(e)}") raise + def _format_messages_for_provider(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]: + """Format messages according to provider requirements. + + Args: + messages: List of message dictionaries with 'role' and 'content' keys. + Can be empty or None. + + Returns: + List of formatted messages according to provider requirements. + For Anthropic models, ensures first message has 'user' role. + + Raises: + TypeError: If messages is None or contains invalid message format. + """ + if messages is None: + raise TypeError("Messages cannot be None") + + # Validate message format first + for msg in messages: + if not isinstance(msg, dict) or "role" not in msg or "content" not in msg: + raise TypeError("Invalid message format. Each message must be a dict with 'role' and 'content' keys") + + if not self.is_anthropic: + return messages + + # Anthropic requires messages to start with 'user' role + if not messages or messages[0]["role"] == "system": + # If first message is system or empty, add a placeholder user message + return [{"role": "user", "content": "."}, *messages] + + return messages + def _get_custom_llm_provider(self) -> str: """ Derives the custom_llm_provider from the model string. diff --git a/src/crewai/task.py b/src/crewai/task.py index 7efb2c97c..4220d53fb 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -695,19 +695,32 @@ class Task(BaseModel): return OutputFormat.PYDANTIC return OutputFormat.RAW - def _save_file(self, result: Any) -> None: + def _save_file(self, result: Union[Dict, str, Any]) -> None: """Save task output to a file. + Note: + For cross-platform file writing, especially on Windows, consider using FileWriterTool + from the crewai_tools package: + pip install 'crewai[tools]' + from crewai_tools import FileWriterTool + Args: result: The result to save to the file. Can be a dict or any stringifiable object. Raises: ValueError: If output_file is not set - RuntimeError: If there is an error writing to the file + RuntimeError: If there is an error writing to the file. For cross-platform + compatibility, especially on Windows, use FileWriterTool from crewai_tools + package. """ if self.output_file is None: raise ValueError("output_file is not set.") + FILEWRITER_RECOMMENDATION = ( + "For cross-platform file writing, especially on Windows, " + "use FileWriterTool from crewai_tools package." 
+ ) + try: resolved_path = Path(self.output_file).expanduser().resolve() directory = resolved_path.parent @@ -723,7 +736,12 @@ class Task(BaseModel): else: file.write(str(result)) except (OSError, IOError) as e: - raise RuntimeError(f"Failed to save output file: {e}") + raise RuntimeError( + "\n".join([ + f"Failed to save output file: {e}", + FILEWRITER_RECOMMENDATION + ]) + ) return None def __repr__(self): diff --git a/tests/crew_test.py b/tests/crew_test.py index 4812ab93f..e69c71315 100644 --- a/tests/crew_test.py +++ b/tests/crew_test.py @@ -49,6 +49,39 @@ writer = Agent( ) +def test_crew_with_only_conditional_tasks_raises_error(): + """Test that creating a crew with only conditional tasks raises an error.""" + def condition_func(task_output: TaskOutput) -> bool: + return True + + conditional1 = ConditionalTask( + description="Conditional task 1", + expected_output="Output 1", + agent=researcher, + condition=condition_func, + ) + conditional2 = ConditionalTask( + description="Conditional task 2", + expected_output="Output 2", + agent=researcher, + condition=condition_func, + ) + conditional3 = ConditionalTask( + description="Conditional task 3", + expected_output="Output 3", + agent=researcher, + condition=condition_func, + ) + + with pytest.raises( + pydantic_core._pydantic_core.ValidationError, + match="Crew must include at least one non-conditional task", + ): + Crew( + agents=[researcher], + tasks=[conditional1, conditional2, conditional3], + ) + def test_crew_config_conditional_requirement(): with pytest.raises(ValueError): Crew(process=Process.sequential) @@ -2060,6 +2093,195 @@ def test_tools_with_custom_caching(): assert result.raw == "3" +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_conditional_task_uses_last_output(): + """Test that conditional tasks use the last task output for condition evaluation.""" + task1 = Task( + description="First task", + expected_output="First output", + agent=researcher, + ) + def condition_fails(task_output: TaskOutput) -> bool: + # This condition will never be met + return "never matches" in task_output.raw.lower() + + def condition_succeeds(task_output: TaskOutput) -> bool: + # This condition will match first task's output + return "first success" in task_output.raw.lower() + + conditional_task1 = ConditionalTask( + description="Second task - conditional that fails condition", + expected_output="Second output", + agent=researcher, + condition=condition_fails, + ) + + conditional_task2 = ConditionalTask( + description="Third task - conditional that succeeds using first task output", + expected_output="Third output", + agent=writer, + condition=condition_succeeds, + ) + + crew = Crew( + agents=[researcher, writer], + tasks=[task1, conditional_task1, conditional_task2], + ) + + # Mock outputs for tasks + mock_first = TaskOutput( + description="First task output", + raw="First success output", # Will be used by third task's condition + agent=researcher.role, + ) + mock_skipped = TaskOutput( + description="Second task output", + raw="", # Empty output since condition fails + agent=researcher.role, + ) + mock_third = TaskOutput( + description="Third task output", + raw="Third task executed", # Output when condition succeeds using first task output + agent=writer.role, + ) + + # Set up mocks for task execution and conditional logic + with patch.object(ConditionalTask, "should_execute") as mock_should_execute: + # First conditional fails, second succeeds + mock_should_execute.side_effect = [False, True] + + with patch.object(Task, 
"execute_sync") as mock_execute: + mock_execute.side_effect = [mock_first, mock_third] + result = crew.kickoff() + + # Verify execution behavior + assert mock_execute.call_count == 2 # Only first and third tasks execute + assert mock_should_execute.call_count == 2 # Both conditionals checked + + # Verify outputs collection + assert len(result.tasks_output) == 3 + assert result.tasks_output[0].raw == "First success output" # First task succeeded + assert result.tasks_output[1].raw == "" # Second task skipped (condition failed) + assert result.tasks_output[2].raw == "Third task executed" # Third task used first task's output + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_conditional_tasks_result_collection(): + """Test that task outputs are properly collected based on execution status.""" + task1 = Task( + description="Normal task that always executes", + expected_output="First output", + agent=researcher, + ) + + def condition_never_met(task_output: TaskOutput) -> bool: + return "never matches" in task_output.raw.lower() + + def condition_always_met(task_output: TaskOutput) -> bool: + return "success" in task_output.raw.lower() + + task2 = ConditionalTask( + description="Conditional task that never executes", + expected_output="Second output", + agent=researcher, + condition=condition_never_met, + ) + + task3 = ConditionalTask( + description="Conditional task that always executes", + expected_output="Third output", + agent=writer, + condition=condition_always_met, + ) + + crew = Crew( + agents=[researcher, writer], + tasks=[task1, task2, task3], + ) + + # Mock outputs for different execution paths + mock_success = TaskOutput( + description="Success output", + raw="Success output", # Triggers third task's condition + agent=researcher.role, + ) + mock_skipped = TaskOutput( + description="Skipped output", + raw="", # Empty output for skipped task + agent=researcher.role, + ) + mock_conditional = TaskOutput( + description="Conditional output", + raw="Conditional task executed", + agent=writer.role, + ) + + # Set up mocks for task execution and conditional logic + with patch.object(ConditionalTask, "should_execute") as mock_should_execute: + # First conditional fails, second succeeds + mock_should_execute.side_effect = [False, True] + + with patch.object(Task, "execute_sync") as mock_execute: + mock_execute.side_effect = [mock_success, mock_conditional] + result = crew.kickoff() + + # Verify execution behavior + assert mock_execute.call_count == 2 # Only first and third tasks execute + assert mock_should_execute.call_count == 2 # Both conditionals checked + + # Verify task output collection + assert len(result.tasks_output) == 3 + assert result.tasks_output[0].raw == "Success output" # Normal task executed + assert result.tasks_output[1].raw == "" # Second task skipped + assert result.tasks_output[2].raw == "Conditional task executed" # Third task executed + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_multiple_conditional_tasks(): + """Test that having multiple conditional tasks in sequence works correctly.""" + task1 = Task( + description="Initial research task", + expected_output="Research output", + agent=researcher, + ) + + def condition1(task_output: TaskOutput) -> bool: + return "success" in task_output.raw.lower() + + def condition2(task_output: TaskOutput) -> bool: + return "proceed" in task_output.raw.lower() + + task2 = ConditionalTask( + description="First conditional task", + expected_output="Conditional output 1", + agent=writer, + 
condition=condition1, + ) + + task3 = ConditionalTask( + description="Second conditional task", + expected_output="Conditional output 2", + agent=writer, + condition=condition2, + ) + + crew = Crew( + agents=[researcher, writer], + tasks=[task1, task2, task3], + ) + + # Mock different task outputs to test conditional logic + mock_success = TaskOutput( + description="Mock success", + raw="Success and proceed output", + agent=researcher.role, + ) + + # Set up mocks for task execution + with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute: + result = crew.kickoff() + # Verify all tasks were executed (no IndexError) + assert mock_execute.call_count == 3 + assert len(result.tasks_output) == 3 + @pytest.mark.vcr(filter_headers=["authorization"]) def test_using_contextual_memory(): from unittest.mock import patch diff --git a/tests/llm_test.py b/tests/llm_test.py index d64639dca..2e5faf774 100644 --- a/tests/llm_test.py +++ b/tests/llm_test.py @@ -286,6 +286,79 @@ def test_o3_mini_reasoning_effort_medium(): @pytest.mark.vcr(filter_headers=["authorization"]) +@pytest.fixture +def anthropic_llm(): + """Fixture providing an Anthropic LLM instance.""" + return LLM(model="anthropic/claude-3-sonnet") + +@pytest.fixture +def system_message(): + """Fixture providing a system message.""" + return {"role": "system", "content": "test"} + +@pytest.fixture +def user_message(): + """Fixture providing a user message.""" + return {"role": "user", "content": "test"} + +def test_anthropic_message_formatting_edge_cases(anthropic_llm): + """Test edge cases for Anthropic message formatting.""" + # Test None messages + with pytest.raises(TypeError, match="Messages cannot be None"): + anthropic_llm._format_messages_for_provider(None) + + # Test empty message list + formatted = anthropic_llm._format_messages_for_provider([]) + assert len(formatted) == 1 + assert formatted[0]["role"] == "user" + assert formatted[0]["content"] == "." + + # Test invalid message format + with pytest.raises(TypeError, match="Invalid message format"): + anthropic_llm._format_messages_for_provider([{"invalid": "message"}]) + +def test_anthropic_model_detection(): + """Test Anthropic model detection with various formats.""" + models = [ + ("anthropic/claude-3", True), + ("claude-instant", True), + ("claude/v1", True), + ("gpt-4", False), + ("", False), + ("anthropomorphic", False), # Should not match partial words + ] + + for model, expected in models: + llm = LLM(model=model) + assert llm.is_anthropic == expected, f"Failed for model: {model}" + +def test_anthropic_message_formatting(anthropic_llm, system_message, user_message): + """Test Anthropic message formatting with fixtures.""" + # Test when first message is system + formatted = anthropic_llm._format_messages_for_provider([system_message]) + assert len(formatted) == 2 + assert formatted[0]["role"] == "user" + assert formatted[0]["content"] == "." + assert formatted[1] == system_message + + # Test when first message is already user + formatted = anthropic_llm._format_messages_for_provider([user_message]) + assert len(formatted) == 1 + assert formatted[0] == user_message + + # Test with empty message list + formatted = anthropic_llm._format_messages_for_provider([]) + assert len(formatted) == 1 + assert formatted[0]["role"] == "user" + assert formatted[0]["content"] == "." 
+ + # Test with non-Anthropic model (should not modify messages) + non_anthropic_llm = LLM(model="gpt-4") + formatted = non_anthropic_llm._format_messages_for_provider([system_message]) + assert len(formatted) == 1 + assert formatted[0] == system_message + + def test_deepseek_r1_with_open_router(): if not os.getenv("OPEN_ROUTER_API_KEY"): pytest.skip("OPEN_ROUTER_API_KEY not set; skipping test.")
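As a closing illustration of the Anthropic-aware formatting added in `llm.py` and exercised by the tests above, a short self-contained sketch (the model names are examples only; no API request is made, since only the formatting helper is called):

```python
from crewai import LLM

# Constructing the LLM does not call any provider; it only records settings.
llm = LLM(model="anthropic/claude-3-sonnet")

messages = [{"role": "system", "content": "You are a geography expert"}]

# For Anthropic-style models, a conversation that would otherwise start with a
# system message gets a placeholder "user" turn prepended.
formatted = llm._format_messages_for_provider(messages)
assert formatted[0] == {"role": "user", "content": "."}
assert formatted[1]["role"] == "system"

# Non-Anthropic models are passed through unchanged.
assert LLM(model="gpt-4o")._format_messages_for_provider(messages) == messages
```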