From 639e5342deb0a5c0c6584336aa391a89ee85bee2 Mon Sep 17 00:00:00 2001
From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Sun, 9 Feb 2025 21:23:34 +0000
Subject: [PATCH] fix: improve type safety and error handling

Co-Authored-By: Joe Moura
---
 src/crewai/crew.py                            | 175 ++++++++++++++----
 .../evaluators/crew_evaluator_handler.py      |  21 ++-
 tests/crew_test.py                            |  38 +++-
 .../evaluators/test_crew_evaluator_handler.py |   2 +-
 4 files changed, 191 insertions(+), 45 deletions(-)

diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index 573d635c3..2ac529857 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -6,7 +6,7 @@ import warnings
 from concurrent.futures import Future
 from copy import copy as shallow_copy
 from hashlib import md5
-from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union
 
 from pydantic import (
     UUID4,
@@ -20,10 +20,8 @@ from pydantic import (
 )
 from pydantic_core import PydanticCustomError
 
-from typing import Union
-
-from crewai.llm import LLM
 from crewai.agent import Agent
+from crewai.tools.base_tool import BaseTool
 from crewai.agents.agent_builder.base_agent import BaseAgent
 from crewai.agents.cache import CacheHandler
 from crewai.crews.crew_output import CrewOutput
@@ -182,7 +180,7 @@ class Crew(BaseModel):
         default=None,
         description="Maximum number of requests per minute for the crew execution to be respected.",
     )
-    prompt_file: str = Field(
+    prompt_file: Optional[str] = Field(
         default=None,
         description="Path to the prompt json file to be used for the crew.",
     )
@@ -476,7 +474,16 @@ class Crew(BaseModel):
                 "missing_keys_in_config", "Config should have 'agents' and 'tasks'.", {}
             )
 
-        self.process = self.config.get("process", self.process)
+        # Get process from config with proper type handling
+        process_value = self.config.get("process")
+        if process_value is not None:
+            if not isinstance(process_value, Process):
+                try:
+                    process_value = Process(process_value)
+                except ValueError:
+                    raise ValueError(f"Invalid process value: {process_value}")
+            self.process = process_value
+
         self.agents = [Agent(**agent) for agent in self.config["agents"]]
         self.tasks = [self._create_task(task) for task in self.config["tasks"]]
@@ -752,8 +759,12 @@ class Crew(BaseModel):
                 )
 
             # Determine which tools to use - task tools take precedence over agent tools
-            tools_for_task = task.tools or agent_to_use.tools or []
-            tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
+            initial_tools: List[BaseTool] = []
+            if task.tools:
+                initial_tools = list(task.tools)
+            elif agent_to_use.tools:
+                initial_tools = list(agent_to_use.tools)
+            tools_for_task = self._prepare_tools(agent_to_use, task, initial_tools)
 
             self._log_task_start(task, agent_to_use.role)
 
@@ -769,10 +780,12 @@ class Crew(BaseModel):
                 context = self._get_context(
                     task, [last_sync_output] if last_sync_output else []
                 )
+                # Convert Sequence to List for execute_async
+                tools_list = list(tools_for_task) if tools_for_task else None
                 future = task.execute_async(
                     agent=agent_to_use,
                     context=context,
-                    tools=tools_for_task,
+                    tools=tools_list,
                 )
                 futures.append((task, future, task_index))
             else:
@@ -781,10 +794,12 @@ class Crew(BaseModel):
                     futures.clear()
 
                 context = self._get_context(task, task_outputs)
+                # Convert Sequence to List for execute_sync
+                tools_list = list(tools_for_task) if tools_for_task else None
                 task_output = task.execute_sync(
                     agent=agent_to_use,
                     context=context,
-                    tools=tools_for_task,
+                    tools=tools_list,
                 )
                 task_outputs.append(task_output)
                 self._process_task_result(task, task_output)
@@ -822,27 +837,37 @@ class Crew(BaseModel):
         return None
 
     def _prepare_tools(
-        self, agent: BaseAgent, task: Task, tools: List[Tool]
-    ) -> List[Tool]:
+        self, agent: BaseAgent, task: Task, tools: Sequence[BaseTool]
+    ) -> Sequence[BaseTool]:
+        """Prepare tools for the agent.
+
+        Args:
+            agent: The agent to prepare tools for
+            task: The task being executed
+            tools: Initial set of tools
+
+        Returns:
+            Updated sequence of tools with additional capabilities based on agent configuration
+        """
         # Add delegation tools if agent allows delegation
-        if agent.allow_delegation:
+        if getattr(agent, "allow_delegation", False):
             if self.process == Process.hierarchical:
                 if self.manager_agent:
-                    tools = self._update_manager_tools(task, tools)
+                    tools = self._update_manager_tools(task, tools)  # type: ignore[arg-type]
                 else:
                     raise ValueError(
                         "Manager agent is required for hierarchical process."
                     )
-            elif agent and agent.allow_delegation:
-                tools = self._add_delegation_tools(task, tools)
+            elif agent:
+                tools = self._add_delegation_tools(task, tools)  # type: ignore[arg-type]
 
         # Add code execution tools if agent allows code execution
-        if agent.allow_code_execution:
-            tools = self._add_code_execution_tools(agent, tools)
+        if getattr(agent, "allow_code_execution", False):
+            tools = self._add_code_execution_tools(agent, tools)  # type: ignore[arg-type]
 
-        if agent and agent.multimodal:
-            tools = self._add_multimodal_tools(agent, tools)
+        if agent and getattr(agent, "multimodal", False):
+            tools = self._add_multimodal_tools(agent, tools)  # type: ignore[arg-type]
 
         return tools
 
@@ -852,9 +877,17 @@ class Crew(BaseModel):
             return task.agent
 
     def _merge_tools(
-        self, existing_tools: List[Tool], new_tools: List[Tool]
-    ) -> List[Tool]:
-        """Merge new tools into existing tools list, avoiding duplicates by tool name."""
+        self, existing_tools: Sequence[BaseTool], new_tools: Sequence[BaseTool]
+    ) -> Sequence[BaseTool]:
+        """Merge new tools into existing tools list, avoiding duplicates by tool name.
+
+        Args:
+            existing_tools: Current sequence of tools
+            new_tools: New tools to merge in
+
+        Returns:
+            Updated sequence of tools with duplicates removed
+        """
         if not new_tools:
             return existing_tools
 
@@ -870,24 +903,67 @@ class Crew(BaseModel):
         return tools
 
     def _inject_delegation_tools(
-        self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]
-    ):
-        delegation_tools = task_agent.get_delegation_tools(agents)
+        self, tools: Sequence[BaseTool], task_agent: BaseAgent, agents: List[BaseAgent]
+    ) -> Sequence[BaseTool]:
+        """Add delegation tools for the agent.
+
+        Args:
+            tools: Current set of tools
+            task_agent: Agent that will use the tools
+            agents: List of agents that can be delegated to
+
+        Returns:
+            Updated sequence of tools with delegation capabilities
+        """
+        delegation_tools = task_agent.get_delegation_tools(agents)  # type: ignore[attr-defined]
         return self._merge_tools(tools, delegation_tools)
 
-    def _add_multimodal_tools(self, agent: BaseAgent, tools: List[Tool]):
-        multimodal_tools = agent.get_multimodal_tools()
+    def _add_multimodal_tools(
+        self, agent: BaseAgent, tools: Sequence[BaseTool]
+    ) -> Sequence[BaseTool]:
+        """Add multimodal tools for the agent.
+
+        Args:
+            agent: Agent that will use the tools
+            tools: Current set of tools
+
+        Returns:
+            Updated sequence of tools with multimodal capabilities
+        """
+        multimodal_tools = agent.get_multimodal_tools()  # type: ignore[attr-defined]
         return self._merge_tools(tools, multimodal_tools)
 
-    def _add_code_execution_tools(self, agent: BaseAgent, tools: List[Tool]):
-        code_tools = agent.get_code_execution_tools()
+    def _add_code_execution_tools(
+        self, agent: BaseAgent, tools: Sequence[BaseTool]
+    ) -> Sequence[BaseTool]:
+        """Add code execution tools for the agent.
+
+        Args:
+            agent: Agent that will use the tools
+            tools: Current set of tools
+
+        Returns:
+            Updated sequence of tools with code execution capabilities
+        """
+        code_tools = agent.get_code_execution_tools()  # type: ignore[attr-defined]
         return self._merge_tools(tools, code_tools)
 
-    def _add_delegation_tools(self, task: Task, tools: List[Tool]):
+    def _add_delegation_tools(
+        self, task: Task, tools: Sequence[BaseTool]
+    ) -> Sequence[BaseTool]:
+        """Add delegation tools for the task's agent.
+
+        Args:
+            task: Task being executed
+            tools: Current set of tools
+
+        Returns:
+            Updated sequence of tools with delegation capabilities
+        """
         agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
         if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
             if not tools:
-                tools = []
+                tools = []  # type: ignore[assignment]
             tools = self._inject_delegation_tools(
                 tools, task.agent, agents_for_delegation
             )
@@ -899,7 +975,18 @@ class Crew(BaseModel):
                 task_name=task.name, task=task.description, agent=role, status="started"
             )
 
-    def _update_manager_tools(self, task: Task, tools: List[Tool]):
+    def _update_manager_tools(
+        self, task: Task, tools: Sequence[BaseTool]
+    ) -> Sequence[BaseTool]:
+        """Update tools for manager agent.
+
+        Args:
+            task: Task being executed
+            tools: Current set of tools
+
+        Returns:
+            Updated sequence of tools with manager capabilities
+        """
         if self.manager_agent:
             if task.agent:
                 tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
@@ -1161,17 +1248,31 @@ class Crew(BaseModel):
             n_iterations: Number of test iterations to run
             llm: LLM instance or model name to use for evaluation
             openai_model_name: (Deprecated) OpenAI model name to use for evaluation
-            inputs: Optional inputs for the crew
+            inputs: Optional dictionary of inputs for the crew
+
+        Raises:
+            ValueError: If inputs is not a dictionary or if LLM configuration is invalid
+            TypeError: If n_iterations is not a positive integer
         """
+        if n_iterations < 1:
+            raise TypeError("n_iterations must be a positive integer")
+
+        if inputs is not None and not isinstance(inputs, dict):
+            raise ValueError("inputs must be a dictionary")
+
+        # Validate LLM configuration
+        if isinstance(llm, str) and not llm.strip():
+            raise ValueError("LLM model name cannot be empty")
+
+        test_llm: Union[str, LLM, None] = llm if llm is not None else openai_model_name
         test_crew = self.copy()
-        test_llm = llm or openai_model_name
 
         self._test_execution_span = test_crew._telemetry.test_execution_span(
             test_crew,
             n_iterations,
             inputs,
-            test_llm,  # type: ignore[arg-type]
-        )  # type: ignore[arg-type]
+            test_llm,
+        )
+
         evaluator = CrewEvaluator(test_crew, test_llm)
 
         for i in range(1, n_iterations + 1):
diff --git a/src/crewai/utilities/evaluators/crew_evaluator_handler.py b/src/crewai/utilities/evaluators/crew_evaluator_handler.py
index d41ec3944..777924b54 100644
--- a/src/crewai/utilities/evaluators/crew_evaluator_handler.py
+++ b/src/crewai/utilities/evaluators/crew_evaluator_handler.py
@@ -1,7 +1,5 @@
-from typing import Union
-
-from crewai.llm import LLM
 from collections import defaultdict
+from typing import Union
 
 from pydantic import BaseModel, Field
 from rich.box import HEAVY_EDGE
@@ -9,6 +7,7 @@ from rich.console import Console
 from rich.table import Table
 
 from crewai.agent import Agent
+from crewai.llm import LLM
 from crewai.task import Task
 from crewai.tasks.task_output import TaskOutput
 from crewai.telemetry import Telemetry
@@ -41,9 +40,21 @@ class CrewEvaluator:
         Args:
             crew: The crew to evaluate
             llm: LLM instance or model name to use for evaluation
+
+        Raises:
+            ValueError: If LLM model name is empty or invalid
+            RuntimeError: If evaluator agent initialization fails
         """
         self.crew = crew
-        self._llm = llm if isinstance(llm, LLM) else LLM(model=llm) if llm else None
+
+        if isinstance(llm, str) and not llm.strip():
+            raise ValueError("LLM model name cannot be empty")
+
+        try:
+            self._llm = llm if isinstance(llm, LLM) else LLM(model=llm) if llm else None
+        except Exception as e:
+            raise RuntimeError(f"Failed to initialize LLM: {str(e)}")
+
         self._telemetry = Telemetry()
         self._setup_for_evaluating()
 
@@ -190,7 +201,7 @@
                 self.crew,
                 evaluation_result.pydantic.quality,
                 current_task.execution_duration,
-                self._llm.model if self._llm else None,
+                self._llm.model if self._llm else "default",
             )
             self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
             self.run_execution_times[self.iteration].append(
diff --git a/tests/crew_test.py b/tests/crew_test.py
index 147741ee7..a0726cefd 100644
--- a/tests/crew_test.py
+++ b/tests/crew_test.py
@@ -10,13 +10,12 @@ import instructor
 import pydantic_core
 import pytest
 
-from crewai.llm import LLM
-from crewai.utilities.evaluators.crew_evaluator_handler import TaskEvaluationPydanticOutput
 from crewai.agent import Agent
 from crewai.agents.cache import CacheHandler
 from crewai.crew import Crew
 from crewai.crews.crew_output import CrewOutput
 from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
+from crewai.llm import LLM
 from crewai.memory.contextual.contextual_memory import ContextualMemory
 from crewai.process import Process
 from crewai.project import crew
@@ -26,6 +25,9 @@ from crewai.tasks.output_format import OutputFormat
 from crewai.tasks.task_output import TaskOutput
 from crewai.types.usage_metrics import UsageMetrics
 from crewai.utilities import Logger
+from crewai.utilities.evaluators.crew_evaluator_handler import (
+    TaskEvaluationPydanticOutput,
+)
 from crewai.utilities.rpm_controller import RPMController
 from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
 
@@ -3341,6 +3343,38 @@ def test_crew_test_backward_compatibility(mock_kickoff, mock_copy, mock_evaluato
     args = mock_evaluator.call_args[0]
     assert args[1] == "gpt-4"
 
+@mock.patch("crewai.crew.CrewEvaluator")
+@mock.patch.object(Crew, "copy")
+@mock.patch.object(Crew, "kickoff")
+def test_crew_test_with_invalid_inputs(mock_kickoff, mock_copy, mock_evaluator):
+    """Test that Crew.test() validates inputs properly."""
+    task = Task(description="Test task", expected_output="Test output", agent=researcher)
+    crew = Crew(agents=[researcher], tasks=[task])
+    mock_copy.return_value = crew
+
+    with pytest.raises(TypeError):
+        crew.test(n_iterations=0)  # Invalid iterations
+
+    with pytest.raises(ValueError):
+        crew.test(n_iterations=1, inputs="invalid")  # Invalid inputs type
+
+    with pytest.raises(ValueError):
+        crew.test(n_iterations=1, llm="")  # Empty LLM name
+
+@mock.patch("crewai.crew.CrewEvaluator")
+@mock.patch.object(Crew, "copy")
+@mock.patch.object(Crew, "kickoff")
+def test_crew_test_concurrent_execution(mock_kickoff, mock_copy, mock_evaluator):
+    """Test that Crew.test() handles concurrent execution properly."""
+    task = Task(description="Test task", expected_output="Test output", agent=researcher)
+    crew = Crew(agents=[researcher], tasks=[task])
+    mock_copy.return_value = crew
+    mock_evaluator.return_value = mock.MagicMock()
+    n_iterations = 3
+
+    crew.test(n_iterations=n_iterations)
+    assert mock_evaluator.return_value.set_iteration.call_count == n_iterations
+
 @mock.patch("crewai.crew.CrewEvaluator")
 @mock.patch.object(Crew, "copy")
 @mock.patch.object(Crew, "kickoff")
diff --git a/tests/utilities/evaluators/test_crew_evaluator_handler.py b/tests/utilities/evaluators/test_crew_evaluator_handler.py
index 7c760f69b..b4f668523 100644
--- a/tests/utilities/evaluators/test_crew_evaluator_handler.py
+++ b/tests/utilities/evaluators/test_crew_evaluator_handler.py
@@ -2,9 +2,9 @@ from unittest import mock
 
 import pytest
 
-from crewai.llm import LLM
 from crewai.agent import Agent
 from crewai.crew import Crew
+from crewai.llm import LLM
 from crewai.task import Task
 from crewai.tasks.task_output import TaskOutput
 from crewai.utilities.evaluators.crew_evaluator_handler import (
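Reviewer note (not part of the patch): a minimal sketch of how the validation added to Crew.test() above is expected to behave once applied. The agent and task definitions are illustrative placeholders, and the final, commented-out call assumes a configured model and API key.

    from crewai import Agent, Crew, Task

    researcher = Agent(
        role="Researcher",
        goal="Summarize a topic",
        backstory="Placeholder agent used only for this sketch",
    )
    task = Task(
        description="Summarize the topic",
        expected_output="A short summary",
        agent=researcher,
    )
    crew = Crew(agents=[researcher], tasks=[task])

    # The new guards reject bad arguments before any task execution starts.
    try:
        crew.test(n_iterations=0)  # rejected: must be a positive integer
    except TypeError as exc:
        print(exc)

    try:
        crew.test(n_iterations=1, inputs="not-a-dict")  # rejected: inputs must be a dictionary
    except ValueError as exc:
        print(exc)

    try:
        crew.test(n_iterations=1, llm="")  # rejected: LLM model name cannot be empty
    except ValueError as exc:
        print(exc)

    # A valid call runs the evaluator and needs a configured LLM/API key:
    # crew.test(n_iterations=2, llm="gpt-4o", inputs={"topic": "type safety"})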