fix: improve type safety and error handling

Co-Authored-By: Joe Moura <joao@crewai.com>
Author: Devin AI
Date: 2025-02-09 21:23:34 +00:00
parent 257780ff6a
commit 639e5342de
4 changed files with 191 additions and 45 deletions

View File

@@ -6,7 +6,7 @@ import warnings
from concurrent.futures import Future
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union
from pydantic import (
UUID4,
@@ -20,10 +20,8 @@ from pydantic import (
)
from pydantic_core import PydanticCustomError
from typing import Union
from crewai.llm import LLM
from crewai.agent import Agent
from crewai.tools.base_tool import BaseTool
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache import CacheHandler
from crewai.crews.crew_output import CrewOutput
@@ -182,7 +180,7 @@ class Crew(BaseModel):
default=None,
description="Maximum number of requests per minute for the crew execution to be respected.",
)
prompt_file: str = Field(
prompt_file: Optional[str] = Field(
default=None,
description="Path to the prompt json file to be used for the crew.",
)
@@ -476,7 +474,16 @@ class Crew(BaseModel):
"missing_keys_in_config", "Config should have 'agents' and 'tasks'.", {}
)
self.process = self.config.get("process", self.process)
# Get process from config with proper type handling
process_value = self.config.get("process")
if process_value is not None:
if not isinstance(process_value, Process):
try:
process_value = Process(process_value)
except ValueError:
raise ValueError(f"Invalid process value: {process_value}")
self.process = process_value
self.agents = [Agent(**agent) for agent in self.config["agents"]]
self.tasks = [self._create_task(task) for task in self.config["tasks"]]
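As a standalone illustration of the coercion pattern introduced above, a minimal sketch with a stand-in Process enum (the member names mirror crewai's sequential/hierarchical processes, but this is not the library definition):

from enum import Enum

class Process(str, Enum):
    # Stand-in for crewai.process.Process; values chosen for the example.
    sequential = "sequential"
    hierarchical = "hierarchical"

def coerce_process(value):
    """Accept a Process member or its string value; reject anything else."""
    if isinstance(value, Process):
        return value
    try:
        return Process(value)
    except ValueError:
        raise ValueError(f"Invalid process value: {value}")

assert coerce_process("sequential") is Process.sequential
assert coerce_process(Process.hierarchical) is Process.hierarchical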
@@ -752,8 +759,12 @@ class Crew(BaseModel):
)
# Determine which tools to use - task tools take precedence over agent tools
tools_for_task = task.tools or agent_to_use.tools or []
tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
initial_tools: List[BaseTool] = []
if task.tools:
initial_tools = list(task.tools)
elif agent_to_use.tools:
initial_tools = list(agent_to_use.tools)
tools_for_task = self._prepare_tools(agent_to_use, task, initial_tools)
self._log_task_start(task, agent_to_use.role)
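The precedence rule above (task tools first, then agent tools, then an empty list, always copied into a fresh list) can be sketched without crewai at all; the tool values here are plain strings for brevity:

from typing import List, Optional, Sequence

def select_tools(task_tools: Optional[Sequence[str]],
                 agent_tools: Optional[Sequence[str]]) -> List[str]:
    """Task-level tools override agent-level tools; copying avoids mutating
    the task's or agent's own tool list later on."""
    if task_tools:
        return list(task_tools)
    if agent_tools:
        return list(agent_tools)
    return []

assert select_tools(["search"], ["calculator"]) == ["search"]
assert select_tools(None, ["calculator"]) == ["calculator"]
assert select_tools(None, None) == []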
@@ -769,10 +780,12 @@ class Crew(BaseModel):
context = self._get_context(
task, [last_sync_output] if last_sync_output else []
)
# Convert Sequence to List for execute_async
tools_list = list(tools_for_task) if tools_for_task else None
future = task.execute_async(
agent=agent_to_use,
context=context,
tools=tools_for_task,
tools=tools_list,
)
futures.append((task, future, task_index))
else:
@@ -781,10 +794,12 @@ class Crew(BaseModel):
futures.clear()
context = self._get_context(task, task_outputs)
# Convert Sequence to List for execute_sync
tools_list = list(tools_for_task) if tools_for_task else None
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
tools=tools_for_task,
tools=tools_list,
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
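Both call sites above convert the Sequence to a concrete list (or None) because the execute methods are typed to take Optional[List]; a minimal sketch of that adapter with a placeholder execute function:

from typing import List, Optional, Sequence

def execute(tools: Optional[List[str]]) -> int:
    # Placeholder for task.execute_sync / execute_async, which expect Optional[List].
    return len(tools) if tools else 0

def run_with(tools_for_task: Sequence[str]) -> int:
    # An empty sequence becomes None, matching the "no tools" convention above.
    tools_list = list(tools_for_task) if tools_for_task else None
    return execute(tools_list)

assert run_with(("search", "calculator")) == 2
assert run_with(()) == 0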
@@ -822,27 +837,37 @@ class Crew(BaseModel):
return None
def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: List[Tool]
) -> List[Tool]:
self, agent: BaseAgent, task: Task, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Prepare tools for the agent.
Args:
agent: The agent to prepare tools for
task: The task being executed
tools: Initial set of tools
Returns:
Updated sequence of tools with additional capabilities based on agent configuration
"""
# Add delegation tools if agent allows delegation
if agent.allow_delegation:
if getattr(agent, "allow_delegation", False):
if self.process == Process.hierarchical:
if self.manager_agent:
tools = self._update_manager_tools(task, tools)
tools = self._update_manager_tools(task, tools) # type: ignore[arg-type]
else:
raise ValueError(
"Manager agent is required for hierarchical process."
)
elif agent and agent.allow_delegation:
tools = self._add_delegation_tools(task, tools)
elif agent:
tools = self._add_delegation_tools(task, tools) # type: ignore[arg-type]
# Add code execution tools if agent allows code execution
if agent.allow_code_execution:
tools = self._add_code_execution_tools(agent, tools)
if getattr(agent, "allow_code_execution", False):
tools = self._add_code_execution_tools(agent, tools) # type: ignore[arg-type]
if agent and agent.multimodal:
tools = self._add_multimodal_tools(agent, tools)
if agent and getattr(agent, "multimodal", False):
tools = self._add_multimodal_tools(agent, tools) # type: ignore[arg-type]
return tools
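The getattr(..., False) guards let _prepare_tools tolerate agent objects that do not define every capability flag; a self-contained sketch of the pattern (the flag names mirror the diff, the agent classes are hypothetical):

class MinimalAgent:
    """Agent-like object that defines none of the optional capability flags."""

class FullAgent:
    allow_delegation = True
    allow_code_execution = True
    multimodal = True

def capabilities(agent) -> list:
    caps = []
    if getattr(agent, "allow_delegation", False):
        caps.append("delegation")
    if getattr(agent, "allow_code_execution", False):
        caps.append("code_execution")
    if getattr(agent, "multimodal", False):
        caps.append("multimodal")
    return caps

assert capabilities(MinimalAgent()) == []
assert capabilities(FullAgent()) == ["delegation", "code_execution", "multimodal"]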
@@ -852,9 +877,17 @@ class Crew(BaseModel):
return task.agent
def _merge_tools(
self, existing_tools: List[Tool], new_tools: List[Tool]
) -> List[Tool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name."""
self, existing_tools: Sequence[BaseTool], new_tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name.
Args:
existing_tools: Current sequence of tools
new_tools: New tools to merge in
Returns:
Updated sequence of tools with duplicates removed
"""
if not new_tools:
return existing_tools
@@ -870,24 +903,67 @@ class Crew(BaseModel):
return tools
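The body of _merge_tools is truncated in the hunk above; one way to implement the dedupe-by-name behavior its docstring describes, with a lightweight stand-in for BaseTool:

from dataclasses import dataclass
from typing import List, Sequence

@dataclass
class Tool:
    # Lightweight stand-in for crewai's BaseTool; only the name matters here.
    name: str

def merge_tools(existing: Sequence[Tool], new: Sequence[Tool]) -> List[Tool]:
    """Drop existing tools shadowed by a same-named new tool, then append the
    new tools, so each name appears exactly once."""
    if not new:
        return list(existing)
    new_names = {tool.name for tool in new}
    merged = [tool for tool in existing if tool.name not in new_names]
    merged.extend(new)
    return merged

merged = merge_tools([Tool("search"), Tool("calc")], [Tool("calc"), Tool("browse")])
assert [tool.name for tool in merged] == ["search", "calc", "browse"]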
def _inject_delegation_tools(
self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]
):
delegation_tools = task_agent.get_delegation_tools(agents)
self, tools: Sequence[BaseTool], task_agent: BaseAgent, agents: List[BaseAgent]
) -> Sequence[BaseTool]:
"""Add delegation tools for the agent.
Args:
tools: Current set of tools
task_agent: Agent that will use the tools
agents: List of agents that can be delegated to
Returns:
Updated sequence of tools with delegation capabilities
"""
delegation_tools = task_agent.get_delegation_tools(agents) # type: ignore[attr-defined]
return self._merge_tools(tools, delegation_tools)
def _add_multimodal_tools(self, agent: BaseAgent, tools: List[Tool]):
multimodal_tools = agent.get_multimodal_tools()
def _add_multimodal_tools(
self, agent: BaseAgent, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Add multimodal tools for the agent.
Args:
agent: Agent that will use the tools
tools: Current set of tools
Returns:
Updated sequence of tools with multimodal capabilities
"""
multimodal_tools = agent.get_multimodal_tools() # type: ignore[attr-defined]
return self._merge_tools(tools, multimodal_tools)
def _add_code_execution_tools(self, agent: BaseAgent, tools: List[Tool]):
code_tools = agent.get_code_execution_tools()
def _add_code_execution_tools(
self, agent: BaseAgent, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Add code execution tools for the agent.
Args:
agent: Agent that will use the tools
tools: Current set of tools
Returns:
Updated sequence of tools with code execution capabilities
"""
code_tools = agent.get_code_execution_tools() # type: ignore[attr-defined]
return self._merge_tools(tools, code_tools)
def _add_delegation_tools(self, task: Task, tools: List[Tool]):
def _add_delegation_tools(
self, task: Task, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Add delegation tools for the task's agent.
Args:
task: Task being executed
tools: Current set of tools
Returns:
Updated sequence of tools with delegation capabilities
"""
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools:
tools = []
tools = [] # type: ignore[assignment]
tools = self._inject_delegation_tools(
tools, task.agent, agents_for_delegation
)
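The guard in _add_delegation_tools reduces to: a task's agent may delegate only when other agents exist, and never to itself. A standalone sketch with agents reduced to role strings:

from typing import List, Optional

def delegation_targets(all_agents: List[str], task_agent: Optional[str]) -> List[str]:
    """Agents the task's agent may delegate to: everyone except itself."""
    if task_agent is None or len(all_agents) < 2:
        return []
    return [agent for agent in all_agents if agent != task_agent]

assert delegation_targets(["researcher", "writer"], "researcher") == ["writer"]
assert delegation_targets(["researcher"], "researcher") == []
assert delegation_targets(["researcher", "writer"], None) == []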
@@ -899,7 +975,18 @@ class Crew(BaseModel):
task_name=task.name, task=task.description, agent=role, status="started"
)
def _update_manager_tools(self, task: Task, tools: List[Tool]):
def _update_manager_tools(
self, task: Task, tools: Sequence[BaseTool]
) -> Sequence[BaseTool]:
"""Update tools for manager agent.
Args:
task: Task being executed
tools: Current set of tools
Returns:
Updated sequence of tools with manager capabilities
"""
if self.manager_agent:
if task.agent:
tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
@@ -1161,17 +1248,31 @@ class Crew(BaseModel):
n_iterations: Number of test iterations to run
llm: LLM instance or model name to use for evaluation
openai_model_name: (Deprecated) OpenAI model name to use for evaluation
inputs: Optional inputs for the crew
inputs: Optional dictionary of inputs for the crew
Raises:
ValueError: If inputs is not a dictionary or if LLM configuration is invalid
TypeError: If n_iterations is not a positive integer
"""
if n_iterations < 1:
raise TypeError("n_iterations must be a positive integer")
if inputs is not None and not isinstance(inputs, dict):
raise ValueError("inputs must be a dictionary")
# Validate LLM configuration
if isinstance(llm, str) and not llm.strip():
raise ValueError("LLM model name cannot be empty")
test_llm: Union[str, LLM, None] = llm if llm is not None else openai_model_name
test_crew = self.copy()
test_llm = llm or openai_model_name
self._test_execution_span = test_crew._telemetry.test_execution_span(
test_crew,
n_iterations,
inputs,
test_llm, # type: ignore[arg-type]
) # type: ignore[arg-type]
test_llm,
)
evaluator = CrewEvaluator(test_crew, test_llm)
for i in range(1, n_iterations + 1):
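The new preamble in test() amounts to three independent guards; a compressed, standalone sketch of just that validation (the function name and the TypeError/ValueError split follow this diff, not a public crewai API):

from typing import Any, Optional

def validate_test_args(n_iterations: int,
                       llm: Optional[Any] = None,
                       inputs: Optional[Any] = None) -> None:
    if n_iterations < 1:
        raise TypeError("n_iterations must be a positive integer")
    if inputs is not None and not isinstance(inputs, dict):
        raise ValueError("inputs must be a dictionary")
    if isinstance(llm, str) and not llm.strip():
        raise ValueError("LLM model name cannot be empty")

validate_test_args(3, llm="gpt-4", inputs={"topic": "AI"})  # passes silently
for bad_call in (lambda: validate_test_args(0),
                 lambda: validate_test_args(1, inputs="oops"),
                 lambda: validate_test_args(1, llm="   ")):
    try:
        bad_call()
    except (TypeError, ValueError) as exc:
        print(type(exc).__name__, exc)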

View File

@@ -1,7 +1,5 @@
from typing import Union
from crewai.llm import LLM
from collections import defaultdict
from typing import Union
from pydantic import BaseModel, Field
from rich.box import HEAVY_EDGE
@@ -9,6 +7,7 @@ from rich.console import Console
from rich.table import Table
from crewai.agent import Agent
from crewai.llm import LLM
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
@@ -41,9 +40,21 @@ class CrewEvaluator:
Args:
crew: The crew to evaluate
llm: LLM instance or model name to use for evaluation
Raises:
ValueError: If LLM model name is empty or invalid
RuntimeError: If evaluator agent initialization fails
"""
self.crew = crew
self._llm = llm if isinstance(llm, LLM) else LLM(model=llm) if llm else None
if isinstance(llm, str) and not llm.strip():
raise ValueError("LLM model name cannot be empty")
try:
self._llm = llm if isinstance(llm, LLM) else LLM(model=llm) if llm else None
except Exception as e:
raise RuntimeError(f"Failed to initialize LLM: {str(e)}")
self._telemetry = Telemetry()
self._setup_for_evaluating()
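The constructor change validates the raw argument first, then wraps the fallible construction so callers see a uniform RuntimeError; a minimal sketch with a placeholder LLM class (chaining with raise ... from e would additionally preserve the original traceback):

from typing import Optional, Union

class LLM:
    # Placeholder for crewai.llm.LLM; only stores the model name.
    def __init__(self, model: str):
        self.model = model

def build_llm(llm: Union[str, LLM, None]) -> Optional[LLM]:
    if isinstance(llm, str) and not llm.strip():
        raise ValueError("LLM model name cannot be empty")
    try:
        return llm if isinstance(llm, LLM) else LLM(model=llm) if llm else None
    except Exception as exc:
        raise RuntimeError(f"Failed to initialize LLM: {exc}")

assert build_llm(None) is None
assert build_llm("gpt-4").model == "gpt-4"
assert isinstance(build_llm(LLM("gpt-4o")), LLM)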
@@ -190,7 +201,7 @@ class CrewEvaluator:
self.crew,
evaluation_result.pydantic.quality,
current_task.execution_duration,
self._llm.model if self._llm else None,
self._llm.model if self._llm else "default",
)
self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
self.run_execution_times[self.iteration].append(

View File

@@ -10,13 +10,12 @@ import instructor
import pydantic_core
import pytest
from crewai.llm import LLM
from crewai.utilities.evaluators.crew_evaluator_handler import TaskEvaluationPydanticOutput
from crewai.agent import Agent
from crewai.agents.cache import CacheHandler
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.process import Process
from crewai.project import crew
@@ -26,6 +25,9 @@ from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import Logger
from crewai.utilities.evaluators.crew_evaluator_handler import (
TaskEvaluationPydanticOutput,
)
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
@@ -3341,6 +3343,38 @@ def test_crew_test_backward_compatibility(mock_kickoff, mock_copy, mock_evaluato
args = mock_evaluator.call_args[0]
assert args[1] == "gpt-4"
@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch.object(Crew, "copy")
@mock.patch.object(Crew, "kickoff")
def test_crew_test_with_invalid_inputs(mock_kickoff, mock_copy, mock_evaluator):
"""Test that Crew.test() validates inputs properly."""
task = Task(description="Test task", expected_output="Test output", agent=researcher)
crew = Crew(agents=[researcher], tasks=[task])
mock_copy.return_value = crew
with pytest.raises(TypeError):
crew.test(n_iterations=0) # Invalid iterations
with pytest.raises(ValueError):
crew.test(n_iterations=1, inputs="invalid") # Invalid inputs type
with pytest.raises(ValueError):
crew.test(n_iterations=1, llm="") # Empty LLM name
@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch.object(Crew, "copy")
@mock.patch.object(Crew, "kickoff")
def test_crew_test_concurrent_execution(mock_kickoff, mock_copy, mock_evaluator):
"""Test that Crew.test() handles concurrent execution properly."""
task = Task(description="Test task", expected_output="Test output", agent=researcher)
crew = Crew(agents=[researcher], tasks=[task])
mock_copy.return_value = crew
mock_evaluator.return_value = mock.MagicMock()
n_iterations = 3
crew.test(n_iterations=n_iterations)
assert mock_evaluator.return_value.set_iteration.call_count == n_iterations
@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch.object(Crew, "copy")
@mock.patch.object(Crew, "kickoff")

View File

@@ -2,9 +2,9 @@ from unittest import mock
import pytest
from crewai.llm import LLM
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.llm import LLM
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.evaluators.crew_evaluator_handler import (