Compare commits

...

5 Commits

Author SHA1 Message Date
Devin AI
bf2b0c5864 fix: Replace remaining Type imports with built-in type annotations
Co-Authored-By: João <joao@crewai.com>
2025-09-04 02:23:58 +00:00
Devin AI
3257d2757f fix: Complete deprecated typing imports replacement
- Replace typing.Type with type in all utility files
- Replace typing.Dict with dict in remaining files
- Replace typing.List with list in remaining files
- Fix all undefined name errors from deprecated imports
- Ensure compatibility with Python 3.10-3.13 type checking

Co-Authored-By: João <joao@crewai.com>
2025-09-04 02:17:59 +00:00
Devin AI
045da4f030 fix: Replace remaining deprecated typing imports with built-in types
- Replace List[...] with list[...] in crew.py method signatures
- Replace Dict[str, Any] with dict[str, Any] in crew.py and task.py
- Fix all undefined name errors from deprecated typing imports
- Maintain backward compatibility while modernizing type hints

Co-Authored-By: João <joao@crewai.com>
2025-09-04 02:11:50 +00:00
Devin AI
3619d4dc50 fix: Replace deprecated typing imports with built-in types
- Replace Dict, List, Set, Tuple with dict, list, set, tuple throughout codebase
- Add missing type annotations to crew_events.py methods
- Add proper type annotations to test_crew_cancellation.py
- Use type: ignore[method-assign] comments for mock assignments
- Maintain backward compatibility while modernizing type hints

This resolves lint and type-checker failures in CI while preserving
the cancellation functionality.

Co-Authored-By: João <joao@crewai.com>
2025-09-04 02:07:02 +00:00
Devin AI
3a54cc859a feat: Add external termination/cancellation support for running crews
- Add threading.Event-based cancellation mechanism to Crew class
- Implement cancel(), is_cancelled(), and _reset_cancellation() methods
- Add cancellation checks in _execute_tasks() main execution loop
- Handle cancellation in async task processing (_process_async_tasks)
- Create CrewKickoffCancelledEvent following existing event patterns
- Add comprehensive tests for cancellation functionality
- Support graceful shutdown allowing current task to complete
- Ensure thread-safe cancellation across sequential and hierarchical processes

Fixes #3445

Co-Authored-By: João <joao@crewai.com>
2025-09-04 01:52:42 +00:00
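The cancellation API introduced in 3a54cc859a is meant to be driven from outside the running crew. A minimal usage sketch, assuming a crew configured elsewhere (the agents and tasks below are placeholders):

import threading

from crewai import Crew

crew = Crew(agents=[...], tasks=[...])  # placeholder configuration

worker = threading.Thread(target=crew.kickoff)
worker.start()

# From the controlling thread: request a graceful stop. Per the commit
# message, the in-flight task completes before remaining tasks are skipped.
crew.cancel()
worker.join()
assert crew.is_cancelled()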
111 changed files with 969 additions and 720 deletions

View File

@@ -4,12 +4,9 @@ import time
from typing import (
Any,
Callable,
Dict,
List,
Literal,
Optional,
Sequence,
Tuple,
Type,
Union,
)
@@ -151,7 +148,7 @@ class Agent(BaseAgent):
default=None,
description="Maximum number of reasoning attempts before executing the task. If None, will try until ready.",
)
embedder: Optional[Dict[str, Any]] = Field(
embedder: Optional[dict[str, Any]] = Field(
default=None,
description="Embedder configuration for the agent.",
)
@@ -171,7 +168,7 @@ class Agent(BaseAgent):
default=None,
description="The Agent's role to be used from your repository.",
)
guardrail: Optional[Union[Callable[[Any], Tuple[bool, Any]], str]] = Field(
guardrail: Optional[Union[Callable[[Any], tuple[bool, Any]], str]] = Field(
default=None,
description="Function or string description of a guardrail to validate agent output",
)
@@ -208,7 +205,7 @@ class Agent(BaseAgent):
self.cache_handler = CacheHandler()
self.set_cache_handler(self.cache_handler)
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
def set_knowledge(self, crew_embedder: Optional[dict[str, Any]] = None):
try:
if self.embedder is None and crew_embedder:
self.embedder = crew_embedder
@@ -245,7 +242,7 @@ class Agent(BaseAgent):
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
) -> str:
"""Execute a task with the agent.
@@ -554,14 +551,14 @@ class Agent(BaseAgent):
)["output"]
def create_agent_executor(
self, tools: Optional[List[BaseTool]] = None, task=None
self, tools: Optional[list[BaseTool]] = None, task=None
) -> None:
"""Create an agent executor for the agent.
Returns:
An instance of the CrewAgentExecutor class.
"""
raw_tools: List[BaseTool] = tools or self.tools or []
raw_tools: list[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
prompt = Prompts(
@@ -603,7 +600,7 @@ class Agent(BaseAgent):
callbacks=[TokenCalcHandler(self._token_process)],
)
def get_delegation_tools(self, agents: List[BaseAgent]):
def get_delegation_tools(self, agents: list[BaseAgent]):
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()
return tools
@@ -654,7 +651,7 @@ class Agent(BaseAgent):
)
return task_prompt
def _render_text_description(self, tools: List[Any]) -> str:
def _render_text_description(self, tools: list[Any]) -> str:
"""Render the tool name and description in plain text.
Output will be in the format of:
@@ -796,7 +793,7 @@ class Agent(BaseAgent):
def kickoff(
self,
messages: Union[str, List[Dict[str, str]]],
messages: Union[str, list[dict[str, str]]],
response_format: Optional[Type[Any]] = None,
) -> LiteAgentOutput:
"""
@@ -836,7 +833,7 @@ class Agent(BaseAgent):
async def kickoff_async(
self,
messages: Union[str, List[Dict[str, str]]],
messages: Union[str, list[dict[str, str]]],
response_format: Optional[Type[Any]] = None,
) -> LiteAgentOutput:
"""

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from pydantic import PrivateAttr
@@ -16,16 +16,16 @@ class BaseAgentAdapter(BaseAgent, ABC):
"""
adapted_structured_output: bool = False
_agent_config: Optional[Dict[str, Any]] = PrivateAttr(default=None)
_agent_config: Optional[dict[str, Any]] = PrivateAttr(default=None)
model_config = {"arbitrary_types_allowed": True}
def __init__(self, agent_config: Optional[Dict[str, Any]] = None, **kwargs: Any):
def __init__(self, agent_config: Optional[dict[str, Any]] = None, **kwargs: Any):
super().__init__(adapted_agent=True, **kwargs)
self._agent_config = agent_config
@abstractmethod
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
def configure_tools(self, tools: Optional[list[BaseTool]] = None) -> None:
"""Configure and adapt tools for the specific agent implementation.
Args:

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from typing import Any, Optional
from crewai.tools.base_tool import BaseTool
@@ -12,15 +12,15 @@ class BaseToolAdapter(ABC):
different frameworks and platforms.
"""
original_tools: List[BaseTool]
converted_tools: List[Any]
original_tools: list[BaseTool]
converted_tools: list[Any]
def __init__(self, tools: Optional[List[BaseTool]] = None):
def __init__(self, tools: Optional[list[BaseTool]] = None):
self.original_tools = tools or []
self.converted_tools = []
@abstractmethod
def configure_tools(self, tools: List[BaseTool]) -> None:
def configure_tools(self, tools: list[BaseTool]) -> None:
"""Configure and convert tools for the specific implementation.
Args:
@@ -28,7 +28,7 @@ class BaseToolAdapter(ABC):
"""
pass
def tools(self) -> List[Any]:
def tools(self) -> list[Any]:
"""Return all converted tools."""
return self.converted_tools

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from pydantic import Field, PrivateAttr
@@ -51,10 +51,10 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
role: str,
goal: str,
backstory: str,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
llm: Any = None,
max_iterations: int = 10,
agent_config: Optional[Dict[str, Any]] = None,
agent_config: Optional[dict[str, Any]] = None,
**kwargs,
):
"""Initialize the LangGraph agent adapter."""
@@ -81,7 +81,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
try:
self._memory = MemorySaver()
converted_tools: List[Any] = self._tool_adapter.tools()
converted_tools: list[Any] = self._tool_adapter.tools()
if self._agent_config:
self._graph = create_react_agent(
model=self.llm,
@@ -124,7 +124,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
) -> str:
"""Execute a task using the LangGraph workflow."""
self.create_agent_executor(tools)
@@ -197,11 +197,11 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
)
raise
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
def create_agent_executor(self, tools: Optional[list[BaseTool]] = None) -> None:
"""Configure the LangGraph agent for execution."""
self.configure_tools(tools)
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
def configure_tools(self, tools: Optional[list[BaseTool]] = None) -> None:
"""Configure tools for the LangGraph agent."""
if tools:
all_tools = list(self.tools or []) + list(tools or [])
@@ -209,7 +209,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
available_tools = self._tool_adapter.tools()
self._graph.tools = available_tools
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
"""Implement delegation tools support for LangGraph."""
agent_tools = AgentTools(agents=agents)
return agent_tools.tools()

View File

@@ -1,5 +1,5 @@
import inspect
from typing import Any, List, Optional
from typing import Any, Optional
from crewai.agents.agent_adapters.base_tool_adapter import BaseToolAdapter
from crewai.tools.base_tool import BaseTool
@@ -8,11 +8,11 @@ from crewai.tools.base_tool import BaseTool
class LangGraphToolAdapter(BaseToolAdapter):
"""Adapts CrewAI tools to LangGraph agent tool compatible format"""
def __init__(self, tools: Optional[List[BaseTool]] = None):
def __init__(self, tools: Optional[list[BaseTool]] = None):
self.original_tools = tools or []
self.converted_tools = []
def configure_tools(self, tools: List[BaseTool]) -> None:
def configure_tools(self, tools: list[BaseTool]) -> None:
"""
Configure and convert CrewAI tools to LangGraph-compatible format.
LangGraph expects tools in langchain_core.tools format.
@@ -57,5 +57,5 @@ class LangGraphToolAdapter(BaseToolAdapter):
self.converted_tools = converted_tools
def tools(self) -> List[Any]:
def tools(self) -> list[Any]:
return self.converted_tools or []

View File

@@ -1,4 +1,4 @@
from typing import Any, List, Optional
from typing import Any, Optional
from pydantic import Field, PrivateAttr
@@ -44,7 +44,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
def __init__(
self,
model: str = "gpt-4o-mini",
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
agent_config: Optional[dict] = None,
**kwargs,
):
@@ -85,7 +85,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
) -> str:
"""Execute a task using the OpenAI Assistant"""
self._converter_adapter.configure_structured_output(task)
@@ -131,7 +131,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
)
raise
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
def create_agent_executor(self, tools: Optional[list[BaseTool]] = None) -> None:
"""
Configure the OpenAI agent for execution.
While OpenAI handles execution differently through Runner,
@@ -152,7 +152,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
self.agent_executor = Runner
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
def configure_tools(self, tools: Optional[list[BaseTool]] = None) -> None:
"""Configure tools for the OpenAI Assistant"""
if tools:
self._tool_adapter.configure_tools(tools)
@@ -163,7 +163,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
"""Process OpenAI Assistant execution result converting any structured output to a string"""
return self._converter_adapter.post_process_result(result.final_output)
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
"""Implement delegation tools support"""
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()

View File

@@ -1,5 +1,5 @@
import inspect
from typing import Any, List, Optional
from typing import Any, Optional
from agents import FunctionTool, Tool
@@ -10,10 +10,10 @@ from crewai.tools import BaseTool
class OpenAIAgentToolAdapter(BaseToolAdapter):
"""Adapter for OpenAI Assistant tools"""
def __init__(self, tools: Optional[List[BaseTool]] = None):
def __init__(self, tools: Optional[list[BaseTool]] = None):
self.original_tools = tools or []
def configure_tools(self, tools: List[BaseTool]) -> None:
def configure_tools(self, tools: list[BaseTool]) -> None:
"""Configure tools for the OpenAI Assistant"""
if self.original_tools:
all_tools = tools + self.original_tools
@@ -23,8 +23,8 @@ class OpenAIAgentToolAdapter(BaseToolAdapter):
self.converted_tools = self._convert_tools_to_openai_format(all_tools)
def _convert_tools_to_openai_format(
self, tools: Optional[List[BaseTool]]
) -> List[Tool]:
self, tools: Optional[list[BaseTool]]
) -> list[Tool]:
"""Convert CrewAI tools to OpenAI Assistant tool format"""
if not tools:
return []

View File

@@ -2,7 +2,7 @@ import uuid
from abc import ABC, abstractmethod
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, TypeVar
from typing import Any, Callable, Optional, TypeVar
from pydantic import (
UUID4,
@@ -40,11 +40,11 @@ class BaseAgent(ABC, BaseModel):
goal (str): Objective of the agent.
backstory (str): Backstory of the agent.
cache (bool): Whether the agent should use a cache for tool usage.
config (Optional[Dict[str, Any]]): Configuration for the agent.
config (Optional[dict[str, Any]]): Configuration for the agent.
verbose (bool): Verbose mode for the Agent Execution.
max_rpm (Optional[int]): Maximum number of requests per minute for the agent execution.
allow_delegation (bool): Allow delegation of tasks to agents.
tools (Optional[List[Any]]): Tools at the agent's disposal.
tools (Optional[list[Any]]): Tools at the agent's disposal.
max_iter (int): Maximum iterations for an agent to execute a task.
agent_executor (InstanceOf): An instance of the CrewAgentExecutor class.
llm (Any): Language model that will run the agent.
@@ -59,15 +59,15 @@ class BaseAgent(ABC, BaseModel):
Methods:
execute_task(task: Any, context: Optional[str] = None, tools: Optional[List[BaseTool]] = None) -> str:
execute_task(task: Any, context: Optional[str] = None, tools: Optional[list[BaseTool]] = None) -> str:
Abstract method to execute a task.
create_agent_executor(tools=None) -> None:
Abstract method to create an agent executor.
get_delegation_tools(agents: List["BaseAgent"]):
get_delegation_tools(agents: list["BaseAgent"]):
Abstract method to set the agent's task tools for handling delegation and asking questions of other agents in the crew.
get_output_converter(llm, model, instructions):
Abstract method to get the converter class for the agent to create json/pydantic outputs.
interpolate_inputs(inputs: Dict[str, Any]) -> None:
interpolate_inputs(inputs: dict[str, Any]) -> None:
Interpolate inputs into the agent description and backstory.
set_cache_handler(cache_handler: CacheHandler) -> None:
Set the cache handler for the agent.
@@ -91,7 +91,7 @@ class BaseAgent(ABC, BaseModel):
role: str = Field(description="Role of the agent")
goal: str = Field(description="Objective of the agent")
backstory: str = Field(description="Backstory of the agent")
config: Optional[Dict[str, Any]] = Field(
config: Optional[dict[str, Any]] = Field(
description="Configuration for the agent", default=None, exclude=True
)
cache: bool = Field(
@@ -108,7 +108,7 @@ class BaseAgent(ABC, BaseModel):
default=False,
description="Enable agent to delegate and ask questions among each other.",
)
tools: Optional[List[BaseTool]] = Field(
tools: Optional[list[BaseTool]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
max_iter: int = Field(
@@ -129,7 +129,7 @@ class BaseAgent(ABC, BaseModel):
default_factory=ToolsHandler,
description="An instance of the ToolsHandler class.",
)
tools_results: List[Dict[str, Any]] = Field(
tools_results: list[dict[str, Any]] = Field(
default=[], description="Results of the tools used by the agent."
)
max_tokens: Optional[int] = Field(
@@ -138,7 +138,7 @@ class BaseAgent(ABC, BaseModel):
knowledge: Optional[Knowledge] = Field(
default=None, description="Knowledge for the agent."
)
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
knowledge_sources: Optional[list[BaseKnowledgeSource]] = Field(
default=None,
description="Knowledge sources for the agent.",
)
@@ -150,7 +150,7 @@ class BaseAgent(ABC, BaseModel):
default_factory=SecurityConfig,
description="Security configuration for the agent, including fingerprinting.",
)
callbacks: List[Callable] = Field(
callbacks: list[Callable] = Field(
default=[], description="Callbacks to be used for the agent"
)
adapted_agent: bool = Field(
@@ -168,7 +168,7 @@ class BaseAgent(ABC, BaseModel):
@field_validator("tools")
@classmethod
def validate_tools(cls, tools: List[Any]) -> List[BaseTool]:
def validate_tools(cls, tools: list[Any]) -> list[BaseTool]:
"""Validate and process the tools provided to the agent.
This method ensures that each tool is either an instance of BaseTool
@@ -253,7 +253,7 @@ class BaseAgent(ABC, BaseModel):
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
) -> str:
pass
@@ -262,7 +262,7 @@ class BaseAgent(ABC, BaseModel):
pass
@abstractmethod
def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]:
def get_delegation_tools(self, agents: list["BaseAgent"]) -> list[BaseTool]:
"""Set the task tools that init BaseAgenTools class."""
pass
@@ -320,7 +320,7 @@ class BaseAgent(ABC, BaseModel):
return copied_agent
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
def interpolate_inputs(self, inputs: dict[str, Any]) -> None:
"""Interpolate inputs into the agent description and backstory."""
if self._original_role is None:
self._original_role = self.role
@@ -362,5 +362,5 @@ class BaseAgent(ABC, BaseModel):
self._rpm_controller = rpm_controller
self.create_agent_executor()
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
def set_knowledge(self, crew_embedder: Optional[dict[str, Any]] = None):
pass

View File

@@ -1,5 +1,5 @@
import time
from typing import TYPE_CHECKING, Dict, List
from typing import TYPE_CHECKING
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
@@ -21,7 +21,7 @@ class CrewAgentExecutorMixin:
task: "Task"
iterations: int
max_iter: int
messages: List[Dict[str, str]]
messages: list[dict[str, str]]
_i18n: I18N
_printer: Printer = Printer()

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional
from typing import Any, Optional
from pydantic import BaseModel, PrivateAttr
@@ -6,7 +6,7 @@ from pydantic import BaseModel, PrivateAttr
class CacheHandler(BaseModel):
"""Callback handler for tool usage."""
_cache: Dict[str, Any] = PrivateAttr(default_factory=dict)
_cache: dict[str, Any] = PrivateAttr(default_factory=dict)
def add(self, tool, input, output):
self._cache[f"{tool}-{input}"] = output

View File

@@ -1,4 +1,4 @@
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Callable, Optional, Union
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
@@ -48,17 +48,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
agent: BaseAgent,
prompt: dict[str, str],
max_iter: int,
tools: List[CrewStructuredTool],
tools: list[CrewStructuredTool],
tools_names: str,
stop_words: List[str],
stop_words: list[str],
tools_description: str,
tools_handler: ToolsHandler,
step_callback: Any = None,
original_tools: List[Any] | None = None,
original_tools: list[Any] | None = None,
function_calling_llm: Any = None,
respect_context_window: bool = False,
request_within_rpm_limit: Optional[Callable[[], bool]] = None,
callbacks: List[Any] | None = None,
callbacks: list[Any] | None = None,
):
self._i18n: I18N = I18N()
self.llm: BaseLLM = llm
@@ -81,10 +81,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.respect_context_window = respect_context_window
self.request_within_rpm_limit = request_within_rpm_limit
self.ask_for_human_input = False
self.messages: List[Dict[str, str]] = []
self.messages: list[dict[str, str]] = []
self.iterations = 0
self.log_error_after = 3
self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = {
self.tool_name_to_tool_map: dict[str, Union[CrewStructuredTool, BaseTool]] = {
tool.name: tool for tool in self.tools
}
existing_stop = self.llm.stop or []
@@ -96,7 +96,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
)
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
def invoke(self, inputs: dict[str, str]) -> dict[str, Any]:
if "system" in self.prompt:
system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
@@ -371,7 +371,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
training_data[agent_id] = agent_training_data
training_handler.save(training_data)
def _format_prompt(self, prompt: str, inputs: Dict[str, str]) -> str:
def _format_prompt(self, prompt: str, inputs: dict[str, str]) -> str:
prompt = prompt.replace("{input}", inputs["input"])
prompt = prompt.replace("{tool_names}", inputs["tool_names"])
prompt = prompt.replace("{tools}", inputs["tools"])

View File

@@ -1,6 +1,6 @@
import time
import webbrowser
from typing import Any, Dict, Optional
from typing import Any, Optional
import requests
from rich.console import Console
@@ -70,7 +70,7 @@ class AuthenticationCommand:
return self._poll_for_token(device_code_data)
def _get_device_code(self) -> Dict[str, Any]:
def _get_device_code(self) -> dict[str, Any]:
"""Get the device code to authenticate the user."""
device_code_payload = {
@@ -86,13 +86,13 @@ class AuthenticationCommand:
response.raise_for_status()
return response.json()
def _display_auth_instructions(self, device_code_data: Dict[str, str]) -> None:
def _display_auth_instructions(self, device_code_data: dict[str, str]) -> None:
"""Display the authentication instructions to the user."""
console.print("1. Navigate to: ", device_code_data["verification_uri_complete"])
console.print("2. Enter the following code: ", device_code_data["user_code"])
webbrowser.open(device_code_data["verification_uri_complete"])
def _poll_for_token(self, device_code_data: Dict[str, Any]) -> None:
def _poll_for_token(self, device_code_data: dict[str, Any]) -> None:
"""Polls the server for the token until it is received, or max attempts are reached."""
token_payload = {
@@ -135,7 +135,7 @@ class AuthenticationCommand:
"Timeout: Failed to get the token. Please try again.", style="bold red"
)
def _validate_and_save_token(self, token_data: Dict[str, Any]) -> None:
def _validate_and_save_token(self, token_data: dict[str, Any]) -> None:
"""Validates the JWT token and saves the token to the token manager."""
jwt_token = token_data["access_token"]

View File

@@ -5,7 +5,7 @@ import sys
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from typing import Any, Optional
import click
import tomli
@@ -157,7 +157,7 @@ def build_system_message(crew_chat_inputs: ChatInputs) -> str:
)
def create_tool_function(crew: Crew, messages: List[Dict[str, str]]) -> Any:
def create_tool_function(crew: Crew, messages: list[dict[str, str]]) -> Any:
"""Creates a wrapper function for running the crew tool with messages."""
def run_crew_tool_with_messages(**kwargs):
@@ -221,9 +221,9 @@ def get_user_input() -> str:
def handle_user_input(
user_input: str,
chat_llm: LLM,
messages: List[Dict[str, str]],
crew_tool_schema: Dict[str, Any],
available_functions: Dict[str, Any],
messages: list[dict[str, str]],
crew_tool_schema: dict[str, Any],
available_functions: dict[str, Any],
) -> None:
if user_input.strip().lower() == "exit":
click.echo("Exiting chat. Goodbye!")
@@ -281,13 +281,13 @@ def generate_crew_tool_schema(crew_inputs: ChatInputs) -> dict:
}
def run_crew_tool(crew: Crew, messages: List[Dict[str, str]], **kwargs):
def run_crew_tool(crew: Crew, messages: list[dict[str, str]], **kwargs):
"""
Runs the crew using crew.kickoff(inputs=kwargs) and returns the output.
Args:
crew (Crew): The crew instance to run.
messages (List[Dict[str, str]]): The chat messages up to this point.
messages (list[dict[str, str]]): The chat messages up to this point.
**kwargs: The inputs collected from the user.
Returns:
@@ -314,12 +314,12 @@ def run_crew_tool(crew: Crew, messages: List[Dict[str, str]], **kwargs):
sys.exit(1)
def load_crew_and_name() -> Tuple[Crew, str]:
def load_crew_and_name() -> tuple[Crew, str]:
"""
Loads the crew by importing the crew class from the user's project.
Returns:
Tuple[Crew, str]: A tuple containing the Crew instance and the name of the crew.
tuple[Crew, str]: A tuple containing the Crew instance and the name of the crew.
"""
# Get the current working directory
cwd = Path.cwd()
@@ -395,7 +395,7 @@ def generate_crew_chat_inputs(crew: Crew, crew_name: str, chat_llm) -> ChatInput
)
def fetch_required_inputs(crew: Crew) -> Set[str]:
def fetch_required_inputs(crew: Crew) -> set[str]:
"""
Extracts placeholders from the crew's tasks and agents.
@@ -403,10 +403,10 @@ def fetch_required_inputs(crew: Crew) -> Set[str]:
crew (Crew): The crew object.
Returns:
Set[str]: A set of placeholder names.
set[str]: A set of placeholder names.
"""
placeholder_pattern = re.compile(r"\{(.+?)\}")
required_inputs: Set[str] = set()
required_inputs: set[str] = set()
# Scan tasks
for task in crew.tasks:

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from rich.console import Console
@@ -32,12 +32,12 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
style="bold red",
)
def _display_deployment_info(self, json_response: Dict[str, Any]) -> None:
def _display_deployment_info(self, json_response: dict[str, Any]) -> None:
"""
Display deployment information.
Args:
json_response (Dict[str, Any]): The deployment information to display.
json_response (dict[str, Any]): The deployment information to display.
"""
console.print("Deploying the crew...\n", style="bold blue")
for key, value in json_response.items():
@@ -47,12 +47,12 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
console.print(" or")
console.print(f"crewai deploy status --uuid \"{json_response['uuid']}\"")
def _display_logs(self, log_messages: List[Dict[str, Any]]) -> None:
def _display_logs(self, log_messages: list[dict[str, Any]]) -> None:
"""
Display log messages.
Args:
log_messages (List[Dict[str, Any]]): The log messages to display.
log_messages (list[dict[str, Any]]): The log messages to display.
"""
for log_message in log_messages:
console.print(
@@ -110,13 +110,13 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
self._display_creation_success(response.json())
def _confirm_input(
self, env_vars: Dict[str, str], remote_repo_url: str, confirm: bool
self, env_vars: dict[str, str], remote_repo_url: str, confirm: bool
) -> None:
"""
Confirm input parameters with the user.
Args:
env_vars (Dict[str, str]): Environment variables.
env_vars (dict[str, str]): Environment variables.
remote_repo_url (str): Remote repository URL.
confirm (bool): Whether to confirm input.
"""
@@ -128,18 +128,18 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
def _create_payload(
self,
env_vars: Dict[str, str],
env_vars: dict[str, str],
remote_repo_url: str,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""
Create the payload for crew creation.
Args:
remote_repo_url (str): Remote repository URL.
env_vars (Dict[str, str]): Environment variables.
env_vars (dict[str, str]): Environment variables.
Returns:
Dict[str, Any]: The payload for crew creation.
dict[str, Any]: The payload for crew creation.
"""
return {
"deploy": {
@@ -149,12 +149,12 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
}
}
def _display_creation_success(self, json_response: Dict[str, Any]) -> None:
def _display_creation_success(self, json_response: dict[str, Any]) -> None:
"""
Display success message after crew creation.
Args:
json_response (Dict[str, Any]): The response containing crew information.
json_response (dict[str, Any]): The response containing crew information.
"""
console.print("Deployment created successfully!\n", style="bold green")
console.print(
@@ -179,12 +179,12 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
else:
self._display_no_crews_message()
def _display_crews(self, crews_data: List[Dict[str, Any]]) -> None:
def _display_crews(self, crews_data: list[dict[str, Any]]) -> None:
"""
Display the list of crews.
Args:
crews_data (List[Dict[str, Any]]): List of crew data to display.
crews_data (list[dict[str, Any]]): List of crew data to display.
"""
for crew_data in crews_data:
console.print(
@@ -217,12 +217,12 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
self._validate_response(response)
self._display_crew_status(response.json())
def _display_crew_status(self, status_data: Dict[str, str]) -> None:
def _display_crew_status(self, status_data: dict[str, str]) -> None:
"""
Display the status of a crew.
Args:
status_data (Dict[str, str]): The status data to display.
status_data (dict[str, str]): The status data to display.
"""
console.print(f"Name:\t {status_data['name']}")
console.print(f"Status:\t {status_data['status']}")

View File

@@ -1,5 +1,5 @@
import requests
from typing import Dict, Any
from typing import Any
from rich.console import Console
from requests.exceptions import RequestException, JSONDecodeError
@@ -32,7 +32,7 @@ class EnterpriseConfigureCommand(BaseCommand):
console.print(f"❌ Failed to configure Enterprise settings: {str(e)}", style="bold red")
raise SystemExit(1)
def _fetch_oauth_config(self, enterprise_url: str) -> Dict[str, Any]:
def _fetch_oauth_config(self, enterprise_url: str) -> dict[str, Any]:
oauth_endpoint = f"{enterprise_url}/auth/parameters"
try:
@@ -64,7 +64,7 @@ class EnterpriseConfigureCommand(BaseCommand):
except Exception as e:
raise ValueError(f"Error fetching OAuth2 configuration: {str(e)}")
def _update_oauth_settings(self, enterprise_url: str, oauth_config: Dict[str, Any]) -> None:
def _update_oauth_settings(self, enterprise_url: str, oauth_config: dict[str, Any]) -> None:
try:
config_mapping = {
'enterprise_base_url': enterprise_url,

View File

@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Optional
from urllib.parse import urljoin
import requests
@@ -58,7 +58,7 @@ class PlusAPI:
version: str,
description: Optional[str],
encoded_file: str,
available_exports: Optional[List[str]] = None,
available_exports: Optional[list[str]] = None,
):
params = {
"handle": handle,

View File

@@ -1,6 +1,6 @@
import subprocess
from enum import Enum
from typing import List, Optional
from typing import Optional
import click
from packaging import version

View File

@@ -1,7 +1,10 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai import Task
# If you want to run a snippet of code before or after the crew starts,
# you can use the @before_kickoff and @after_kickoff decorators
# https://docs.crewai.com/concepts/crews#example-crew-class-with-decorators
@@ -10,8 +13,8 @@ from typing import List
class {{crew_name}}():
"""{{crew_name}} crew"""
agents: List[BaseAgent]
tasks: List[Task]
agents: list["BaseAgent"]
tasks: list["Task"]
# Learn more about YAML configuration files here:
# Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
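The template change above moves the BaseAgent and Task imports behind TYPE_CHECKING, which in turn requires quoting the annotations. The pattern in isolation:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by static type checkers: no runtime import cost and no
    # risk of circular imports.
    from crewai import Task

class Example:
    tasks: list["Task"]  # quoted, so it is resolved only by the checker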

View File

@@ -1,7 +1,6 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List
# If you want to run a snippet of code before or after the crew starts,
# you can use the @before_kickoff and @after_kickoff decorators
@@ -12,8 +11,8 @@ from typing import List
class PoemCrew:
"""Poem Crew"""
agents: List[BaseAgent]
tasks: List[Task]
agents: list[BaseAgent]
tasks: list[Task]
# Learn more about YAML configuration files here:
# Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended

View File

@@ -5,7 +5,7 @@ import sys
from functools import reduce
from inspect import getmro, isclass, isfunction, ismethod
from pathlib import Path
from typing import Any, Dict, List, get_type_hints
from typing import Any, get_type_hints
import click
import tomli
@@ -77,7 +77,7 @@ def get_project_description(
def _get_project_attribute(
pyproject_path: str, keys: List[str], require: bool
pyproject_path: str, keys: list[str], require: bool
) -> Any | None:
"""Get an attribute from the pyproject.toml file."""
attribute = None
@@ -117,7 +117,7 @@ def _get_project_attribute(
return attribute
def _get_nested_value(data: Dict[str, Any], keys: List[str]) -> Any:
def _get_nested_value(data: dict[str, Any], keys: list[str]) -> Any:
return reduce(dict.__getitem__, keys, data)

View File

@@ -1,6 +1,7 @@
import asyncio
import json
import re
import threading
import uuid
import warnings
from concurrent.futures import Future
@@ -9,11 +10,7 @@ from hashlib import md5
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Set,
Tuple,
Union,
cast,
)
@@ -60,6 +57,7 @@ from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.events.types.crew_events import (
CrewKickoffCancelledEvent,
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
@@ -130,18 +128,19 @@ class Crew(FlowTrackable, BaseModel):
_external_memory: Optional[InstanceOf[ExternalMemory]] = PrivateAttr()
_train: Optional[bool] = PrivateAttr(default=False)
_train_iteration: Optional[int] = PrivateAttr()
_inputs: Optional[Dict[str, Any]] = PrivateAttr(default=None)
_inputs: Optional[dict[str, Any]] = PrivateAttr(default=None)
_logging_color: str = PrivateAttr(
default="bold_purple",
)
_task_output_handler: TaskOutputStorageHandler = PrivateAttr(
default_factory=TaskOutputStorageHandler
)
_cancellation_event: threading.Event = PrivateAttr(default_factory=threading.Event)
name: Optional[str] = Field(default="crew")
cache: bool = Field(default=True)
tasks: List[Task] = Field(default_factory=list)
agents: List[BaseAgent] = Field(default_factory=list)
tasks: list[Task] = Field(default_factory=list)
agents: list[BaseAgent] = Field(default_factory=list)
process: Process = Field(default=Process.sequential)
verbose: bool = Field(default=False)
memory: bool = Field(
@@ -181,7 +180,7 @@ class Crew(FlowTrackable, BaseModel):
function_calling_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
description="Language model that will run the agent.", default=None
)
config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None)
config: Optional[Union[Json, dict[str, Any]]] = Field(default=None)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
share_crew: Optional[bool] = Field(default=False)
step_callback: Optional[Any] = Field(
@@ -192,13 +191,13 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Callback to be executed after each task for all agents execution.",
)
before_kickoff_callbacks: List[
Callable[[Optional[Dict[str, Any]]], Optional[Dict[str, Any]]]
before_kickoff_callbacks: list[
Callable[[Optional[dict[str, Any]]], Optional[dict[str, Any]]]
] = Field(
default_factory=list,
description="List of callbacks to be executed before crew kickoff. It may be used to adjust inputs before the crew is executed.",
)
after_kickoff_callbacks: List[Callable[[CrewOutput], CrewOutput]] = Field(
after_kickoff_callbacks: list[Callable[[CrewOutput], CrewOutput]] = Field(
default_factory=list,
description="List of callbacks to be executed after crew kickoff. It may be used to adjust the output of the crew.",
)
@@ -222,15 +221,15 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Language model that will run the AgentPlanner if planning is True.",
)
task_execution_output_json_files: Optional[List[str]] = Field(
task_execution_output_json_files: Optional[list[str]] = Field(
default=None,
description="List of file paths for task execution JSON files.",
)
execution_logs: List[Dict[str, Any]] = Field(
execution_logs: list[dict[str, Any]] = Field(
default=[],
description="List of execution logs for tasks",
)
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
knowledge_sources: Optional[list[BaseKnowledgeSource]] = Field(
default=None,
description="Knowledge sources for the crew. Add knowledge sources to the knowledge object.",
)
@@ -267,8 +266,8 @@ class Crew(FlowTrackable, BaseModel):
@field_validator("config", mode="before")
@classmethod
def check_config_type(
cls, v: Union[Json, Dict[str, Any]]
) -> Union[Json, Dict[str, Any]]:
cls, v: Union[Json, dict[str, Any]]
) -> Union[Json, dict[str, Any]]:
"""Validates that the config is a valid type.
Args:
v: The config to be validated.
@@ -502,7 +501,7 @@ class Crew(FlowTrackable, BaseModel):
@property
def key(self) -> str:
source: List[str] = [agent.key for agent in self.agents] + [
source: list[str] = [agent.key for agent in self.agents] + [
task.key for task in self.tasks
]
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
@@ -530,7 +529,7 @@ class Crew(FlowTrackable, BaseModel):
self.agents = [Agent(**agent) for agent in self.config["agents"]]
self.tasks = [self._create_task(task) for task in self.config["tasks"]]
def _create_task(self, task_config: Dict[str, Any]) -> Task:
def _create_task(self, task_config: dict[str, Any]) -> Task:
"""Creates a task instance from its configuration.
Args:
@@ -559,7 +558,7 @@ class Crew(FlowTrackable, BaseModel):
CrewTrainingHandler(filename).initialize_file()
def train(
self, n_iterations: int, filename: str, inputs: Optional[Dict[str, Any]] = None
self, n_iterations: int, filename: str, inputs: Optional[dict[str, Any]] = None
) -> None:
"""Trains the crew for a given number of iterations."""
inputs = inputs or {}
@@ -611,8 +610,10 @@ class Crew(FlowTrackable, BaseModel):
def kickoff(
self,
inputs: Optional[Dict[str, Any]] = None,
inputs: Optional[dict[str, Any]] = None,
) -> CrewOutput:
self._reset_cancellation()
ctx = baggage.set_baggage(
"crew_context", CrewContext(id=str(self.id), key=self.key)
)
@@ -682,9 +683,9 @@ class Crew(FlowTrackable, BaseModel):
finally:
detach(token)
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
def kickoff_for_each(self, inputs: list[dict[str, Any]]) -> list[CrewOutput]:
"""Executes the Crew's workflow for each input in the list and aggregates results."""
results: List[CrewOutput] = []
results: list[CrewOutput] = []
# Initialize the parent crew's usage metrics
total_usage_metrics = UsageMetrics()
@@ -704,13 +705,13 @@ class Crew(FlowTrackable, BaseModel):
return results
async def kickoff_async(
self, inputs: Optional[Dict[str, Any]] = None
self, inputs: Optional[dict[str, Any]] = None
) -> CrewOutput:
"""Asynchronous kickoff method to start the crew execution."""
inputs = inputs or {}
return await asyncio.to_thread(self.kickoff, inputs)
async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[CrewOutput]:
async def kickoff_for_each_async(self, inputs: list[dict]) -> list[CrewOutput]:
crew_copies = [self.copy() for _ in inputs]
async def run_crew(crew, input_data):
@@ -807,25 +808,37 @@ class Crew(FlowTrackable, BaseModel):
def _execute_tasks(
self,
tasks: List[Task],
tasks: list[Task],
start_index: Optional[int] = 0,
was_replayed: bool = False,
) -> CrewOutput:
"""Executes tasks sequentially and returns the final output.
Args:
tasks (List[Task]): List of tasks to execute
tasks (list[Task]): List of tasks to execute
manager (Optional[BaseAgent], optional): Manager agent to use for delegation. Defaults to None.
Returns:
CrewOutput: Final output of the crew
"""
task_outputs: List[TaskOutput] = []
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
task_outputs: list[TaskOutput] = []
futures: list[tuple[Task, Future[TaskOutput], int]] = []
last_sync_output: Optional[TaskOutput] = None
for task_index, task in enumerate(tasks):
if self.is_cancelled():
self._logger.log("info", f"Crew execution cancelled after {task_index} tasks", color="yellow")
crewai_event_bus.emit(
self,
CrewKickoffCancelledEvent(
crew_name=self.name,
completed_tasks=task_index,
total_tasks=len(tasks),
),
)
return self._create_crew_output(task_outputs)
if start_index is not None and task_index < start_index:
if task.output:
if task.async_execution:
@@ -847,7 +860,7 @@ class Crew(FlowTrackable, BaseModel):
tools_for_task = self._prepare_tools(
agent_to_use,
task,
cast(Union[List[Tool], List[BaseTool]], tools_for_task),
cast(Union[list[Tool], list[BaseTool]], tools_for_task),
)
self._log_task_start(task, agent_to_use.role)
@@ -867,7 +880,7 @@ class Crew(FlowTrackable, BaseModel):
future = task.execute_async(
agent=agent_to_use,
context=context,
tools=cast(List[BaseTool], tools_for_task),
tools=cast(list[BaseTool], tools_for_task),
)
futures.append((task, future, task_index))
else:
@@ -879,7 +892,7 @@ class Crew(FlowTrackable, BaseModel):
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
tools=cast(List[BaseTool], tools_for_task),
tools=cast(list[BaseTool], tools_for_task),
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
@@ -893,8 +906,8 @@ class Crew(FlowTrackable, BaseModel):
def _handle_conditional_task(
self,
task: ConditionalTask,
task_outputs: List[TaskOutput],
futures: List[Tuple[Task, Future[TaskOutput], int]],
task_outputs: list[TaskOutput],
futures: list[tuple[Task, Future[TaskOutput], int]],
task_index: int,
was_replayed: bool,
) -> Optional[TaskOutput]:
@@ -917,8 +930,8 @@ class Crew(FlowTrackable, BaseModel):
return None
def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, agent: BaseAgent, task: Task, tools: Union[list[Tool], list[BaseTool]]
) -> list[BaseTool]:
# Add delegation tools if agent allows delegation
if hasattr(agent, "allow_delegation") and getattr(
agent, "allow_delegation", False
@@ -947,8 +960,8 @@ class Crew(FlowTrackable, BaseModel):
):
tools = self._add_multimodal_tools(agent, tools)
# Return a List[BaseTool] which is compatible with both Task.execute_sync and Task.execute_async
return cast(List[BaseTool], tools)
# Return a list[BaseTool] which is compatible with both Task.execute_sync and Task.execute_async
return cast(list[BaseTool], tools)
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
if self.process == Process.hierarchical:
@@ -957,12 +970,12 @@ class Crew(FlowTrackable, BaseModel):
def _merge_tools(
self,
existing_tools: Union[List[Tool], List[BaseTool]],
new_tools: Union[List[Tool], List[BaseTool]],
) -> List[BaseTool]:
existing_tools: Union[list[Tool], list[BaseTool]],
new_tools: Union[list[Tool], list[BaseTool]],
) -> list[BaseTool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name."""
if not new_tools:
return cast(List[BaseTool], existing_tools)
return cast(list[BaseTool], existing_tools)
# Create mapping of tool names to new tools
new_tool_map = {tool.name: tool for tool in new_tools}
@@ -973,41 +986,41 @@ class Crew(FlowTrackable, BaseModel):
# Add all new tools
tools.extend(new_tools)
return cast(List[BaseTool], tools)
return cast(list[BaseTool], tools)
def _inject_delegation_tools(
self,
tools: Union[List[Tool], List[BaseTool]],
tools: Union[list[Tool], list[BaseTool]],
task_agent: BaseAgent,
agents: List[BaseAgent],
) -> List[BaseTool]:
agents: list[BaseAgent],
) -> list[BaseTool]:
if hasattr(task_agent, "get_delegation_tools"):
delegation_tools = task_agent.get_delegation_tools(agents)
# Cast delegation_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], delegation_tools))
return cast(List[BaseTool], tools)
return self._merge_tools(tools, cast(list[BaseTool], delegation_tools))
return cast(list[BaseTool], tools)
def _add_multimodal_tools(
self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, agent: BaseAgent, tools: Union[list[Tool], list[BaseTool]]
) -> list[BaseTool]:
if hasattr(agent, "get_multimodal_tools"):
multimodal_tools = agent.get_multimodal_tools()
# Cast multimodal_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], multimodal_tools))
return cast(List[BaseTool], tools)
return self._merge_tools(tools, cast(list[BaseTool], multimodal_tools))
return cast(list[BaseTool], tools)
def _add_code_execution_tools(
self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, agent: BaseAgent, tools: Union[list[Tool], list[BaseTool]]
) -> list[BaseTool]:
if hasattr(agent, "get_code_execution_tools"):
code_tools = agent.get_code_execution_tools()
# Cast code_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], code_tools))
return cast(List[BaseTool], tools)
return self._merge_tools(tools, cast(list[BaseTool], code_tools))
return cast(list[BaseTool], tools)
def _add_delegation_tools(
self, task: Task, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, task: Task, tools: Union[list[Tool], list[BaseTool]]
) -> list[BaseTool]:
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools:
@@ -1015,7 +1028,7 @@ class Crew(FlowTrackable, BaseModel):
tools = self._inject_delegation_tools(
tools, task.agent, agents_for_delegation
)
return cast(List[BaseTool], tools)
return cast(list[BaseTool], tools)
def _log_task_start(self, task: Task, role: str = "None"):
if self.output_log_file:
@@ -1024,8 +1037,8 @@ class Crew(FlowTrackable, BaseModel):
)
def _update_manager_tools(
self, task: Task, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, task: Task, tools: Union[list[Tool], list[BaseTool]]
) -> list[BaseTool]:
if self.manager_agent:
if task.agent:
tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
@@ -1033,9 +1046,9 @@ class Crew(FlowTrackable, BaseModel):
tools = self._inject_delegation_tools(
tools, self.manager_agent, self.agents
)
return cast(List[BaseTool], tools)
return cast(list[BaseTool], tools)
def _get_context(self, task: Task, task_outputs: List[TaskOutput]) -> str:
def _get_context(self, task: Task, task_outputs: list[TaskOutput]) -> str:
if not task.context:
return ""
@@ -1057,7 +1070,7 @@ class Crew(FlowTrackable, BaseModel):
output=output.raw,
)
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
def _create_crew_output(self, task_outputs: list[TaskOutput]) -> CrewOutput:
if not task_outputs:
raise ValueError("No task outputs available to create crew output.")
@@ -1088,11 +1101,15 @@ class Crew(FlowTrackable, BaseModel):
def _process_async_tasks(
self,
futures: List[Tuple[Task, Future[TaskOutput], int]],
futures: list[tuple[Task, Future[TaskOutput], int]],
was_replayed: bool = False,
) -> List[TaskOutput]:
task_outputs: List[TaskOutput] = []
) -> list[TaskOutput]:
task_outputs: list[TaskOutput] = []
for future_task, future, task_index in futures:
if self.is_cancelled():
future.cancel()
continue
task_output = future.result()
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
@@ -1102,7 +1119,7 @@ class Crew(FlowTrackable, BaseModel):
return task_outputs
def _find_task_index(
self, task_id: str, stored_outputs: List[Any]
self, task_id: str, stored_outputs: list[Any]
) -> Optional[int]:
return next(
(
@@ -1114,7 +1131,7 @@ class Crew(FlowTrackable, BaseModel):
)
def replay(
self, task_id: str, inputs: Optional[Dict[str, Any]] = None
self, task_id: str, inputs: Optional[dict[str, Any]] = None
) -> CrewOutput:
stored_outputs = self._task_output_handler.load()
if not stored_outputs:
@@ -1155,15 +1172,15 @@ class Crew(FlowTrackable, BaseModel):
return result
def query_knowledge(
self, query: List[str], results_limit: int = 3, score_threshold: float = 0.35
) -> Union[List[Dict[str, Any]], None]:
self, query: list[str], results_limit: int = 3, score_threshold: float = 0.35
) -> Union[list[dict[str, Any]], None]:
if self.knowledge:
return self.knowledge.query(
query, results_limit=results_limit, score_threshold=score_threshold
)
return None
def fetch_inputs(self) -> Set[str]:
def fetch_inputs(self) -> set[str]:
"""
Gathers placeholders (e.g., {something}) referenced in tasks or agents.
Scans each task's 'description' + 'expected_output', and each agent's
@@ -1172,7 +1189,7 @@ class Crew(FlowTrackable, BaseModel):
Returns a set of all discovered placeholder names.
"""
placeholder_pattern = re.compile(r"\{(.+?)\}")
required_inputs: Set[str] = set()
required_inputs: set[str] = set()
# Scan tasks for inputs
for task in self.tasks:
@@ -1274,7 +1291,7 @@ class Crew(FlowTrackable, BaseModel):
if not task.callback:
task.callback = self.task_callback
def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
def _interpolate_inputs(self, inputs: dict[str, Any]) -> None:
"""Interpolates the inputs in the tasks and agents."""
[
task.interpolate_inputs_and_add_conversation_history(
@@ -1308,7 +1325,7 @@ class Crew(FlowTrackable, BaseModel):
self,
n_iterations: int,
eval_llm: Union[str, InstanceOf[BaseLLM]],
inputs: Optional[Dict[str, Any]] = None,
inputs: Optional[dict[str, Any]] = None,
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
try:
@@ -1506,7 +1523,7 @@ class Crew(FlowTrackable, BaseModel):
},
}
def reset_knowledge(self, knowledges: List[Knowledge]) -> None:
def reset_knowledge(self, knowledges: list[Knowledge]) -> None:
"""Reset crew and agent knowledge storage."""
for ks in knowledges:
ks.reset()
@@ -1525,3 +1542,16 @@ class Crew(FlowTrackable, BaseModel):
and able_to_inject
):
self.tasks[0].allow_crewai_trigger_context = True
def cancel(self) -> None:
"""Cancel the crew execution. This will stop the crew after the current task completes."""
self._cancellation_event.set()
self._logger.log("info", "Crew cancellation requested", color="yellow")
def is_cancelled(self) -> bool:
"""Check if the crew execution has been cancelled."""
return self._cancellation_event.is_set()
def _reset_cancellation(self) -> None:
"""Reset the cancellation state for reuse of the crew instance."""
self._cancellation_event.clear()
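The three methods above wrap a threading.Event that _execute_tasks polls between tasks, which is what makes cancellation thread-safe and graceful. The same pattern in a standalone sketch:

import threading

class Runner:
    """Standalone sketch of the cancellation pattern used by Crew above."""

    def __init__(self) -> None:
        self._cancel = threading.Event()

    def cancel(self) -> None:
        self._cancel.set()  # safe to call from any thread

    def run(self, tasks) -> list:
        done = []
        for task in tasks:
            if self._cancel.is_set():
                break  # graceful stop between units of work
            done.append(task())  # the in-flight task always completes
        return done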

View File

@@ -1,5 +1,5 @@
import json
from typing import Any, Dict, Optional
from typing import Any, Optional
from pydantic import BaseModel, Field
@@ -15,7 +15,7 @@ class CrewOutput(BaseModel):
pydantic: Optional[BaseModel] = Field(
description="Pydantic output of Crew", default=None
)
json_dict: Optional[Dict[str, Any]] = Field(
json_dict: Optional[dict[str, Any]] = Field(
description="JSON dict output of Crew", default=None
)
tasks_output: list[TaskOutput] = Field(
@@ -32,7 +32,7 @@ class CrewOutput(BaseModel):
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
def to_dict(self) -> dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""
output_dict = {}
if self.json_dict:

View File

@@ -1,5 +1,5 @@
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from typing import Any, Optional
from pydantic import BaseModel, Field
from crewai.utilities.serialization import to_serializable
@@ -14,7 +14,7 @@ class BaseEvent(BaseModel):
source_type: Optional[str] = (
None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
)
fingerprint_metadata: Optional[Dict[str, Any]] = None # Any relevant metadata
fingerprint_metadata: Optional[dict[str, Any]] = None # Any relevant metadata
def to_json(self, exclude: set[str] | None = None):
"""
@@ -28,13 +28,13 @@ class BaseEvent(BaseModel):
"""
return to_serializable(self, exclude=exclude)
def _set_task_params(self, data: Dict[str, Any]):
def _set_task_params(self, data: dict[str, Any]):
if "from_task" in data and (task := data["from_task"]):
self.task_id = task.id
self.task_name = task.name or task.description
self.from_task = None
def _set_agent_params(self, data: Dict[str, Any]):
def _set_agent_params(self, data: dict[str, Any]):
task = data.get("from_task", None)
agent = task.agent if task else data.get("from_agent", None)

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
import threading
from contextlib import contextmanager
from typing import Any, Callable, Dict, List, Type, TypeVar, cast
from typing import Any, Callable, TypeVar, cast
from blinker import Signal
@@ -32,10 +32,10 @@ class CrewAIEventsBus:
def _initialize(self) -> None:
"""Initialize the event bus internal state"""
self._signal = Signal("crewai_event_bus")
self._handlers: Dict[Type[BaseEvent], List[Callable]] = {}
self._handlers: dict[type[BaseEvent], list[Callable]] = {}
def on(
self, event_type: Type[EventT]
self, event_type: type[EventT]
) -> Callable[[Callable[[Any, EventT], None]], Callable[[Any, EventT], None]]:
"""
Decorator to register an event handler for a specific event type.
@@ -82,7 +82,7 @@ class CrewAIEventsBus:
self._signal.send(source, event=event)
def register_handler(
self, event_type: Type[EventTypes], handler: Callable[[Any, EventTypes], None]
self, event_type: type[EventTypes], handler: Callable[[Any, EventTypes], None]
) -> None:
"""Register an event handler for a specific event type"""
if event_type not in self._handlers:
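Given the on decorator above, subscribing to the new cancellation event might look like the following sketch; the import path for the bus singleton is an assumption, and the handler body is illustrative:

from crewai.events.types.crew_events import CrewKickoffCancelledEvent

# The location of the shared bus instance is assumed here.
from crewai.events.event_bus import crewai_event_bus

@crewai_event_bus.on(CrewKickoffCancelledEvent)
def handle_cancelled(source, event: CrewKickoffCancelledEvent) -> None:
    # Fields match the emit() call in crew.py above.
    print(f"{event.crew_name}: stopped after "
          f"{event.completed_tasks}/{event.total_tasks} tasks")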

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
from io import StringIO
from typing import Any, Dict
from typing import Any
from pydantic import Field, PrivateAttr
from crewai.llm import LLM
@@ -80,7 +80,7 @@ class EventListener(BaseEventListener):
_instance = None
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
logger = Logger(verbose=True, default_color=EMITTER_COLOR)
execution_spans: Dict[Task, Any] = Field(default_factory=dict)
execution_spans: dict[Task, Any] = Field(default_factory=dict)
next_chunk = 0
text_stream = StringIO()
knowledge_retrieval_in_progress = False

View File

@@ -1,6 +1,6 @@
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional
from typing import Any, Optional
from dataclasses import dataclass, field
from crewai.utilities.constants import CREWAI_BASE_URL
@@ -23,11 +23,11 @@ class TraceBatch:
version: str = field(default_factory=get_crewai_version)
batch_id: str = field(default_factory=lambda: str(uuid.uuid4()))
user_context: Dict[str, str] = field(default_factory=dict)
execution_metadata: Dict[str, Any] = field(default_factory=dict)
events: List[TraceEvent] = field(default_factory=list)
user_context: dict[str, str] = field(default_factory=dict)
execution_metadata: dict[str, Any] = field(default_factory=dict)
events: list[TraceEvent] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
def to_dict(self) -> dict[str, Any]:
return {
"version": self.version,
"batch_id": self.batch_id,
@@ -43,8 +43,8 @@ class TraceBatchManager:
is_current_batch_ephemeral: bool = False
trace_batch_id: Optional[str] = None
current_batch: Optional[TraceBatch] = None
event_buffer: List[TraceEvent] = []
execution_start_times: Dict[str, datetime] = {}
event_buffer: list[TraceEvent] = []
execution_start_times: dict[str, datetime] = {}
batch_owner_type: Optional[str] = None
batch_owner_id: Optional[str] = None
@@ -58,8 +58,8 @@ class TraceBatchManager:
def initialize_batch(
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
user_context: dict[str, str],
execution_metadata: dict[str, Any],
use_ephemeral: bool = False,
) -> TraceBatch:
"""Initialize a new trace batch"""
@@ -76,8 +76,8 @@ class TraceBatchManager:
def _initialize_backend_batch(
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
user_context: dict[str, str],
execution_metadata: dict[str, Any],
use_ephemeral: bool = False,
):
"""Send batch initialization to backend"""

View File

@@ -1,7 +1,7 @@
import os
import uuid
from typing import Dict, Any, Optional
from typing import Any, Optional
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.types.agent_events import (
@@ -112,7 +112,7 @@ class TraceCollectionListener(BaseEventListener):
except AuthError:
return False
def _get_user_context(self) -> Dict[str, str]:
def _get_user_context(self) -> dict[str, str]:
"""Extract user context for tracing"""
return {
"user_id": os.getenv("CREWAI_USER_ID", "anonymous"),
@@ -325,7 +325,7 @@ class TraceCollectionListener(BaseEventListener):
self._initialize_batch(user_context, execution_metadata)
def _initialize_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
self, user_context: dict[str, str], execution_metadata: dict[str, Any]
):
"""Initialize trace batch if ephemeral"""
if not self._check_authenticated():
@@ -371,7 +371,7 @@ class TraceCollectionListener(BaseEventListener):
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return self._safe_serialize_to_dict(event)
@@ -429,7 +429,7 @@ class TraceCollectionListener(BaseEventListener):
# TODO: move to utils
def _safe_serialize_to_dict(
self, obj, exclude: set[str] | None = None
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Safely serialize an object to a dictionary for event data."""
try:
serialized = to_serializable(obj, exclude)

View File

@@ -1,6 +1,6 @@
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Dict, Any
from typing import Any
import uuid
@@ -13,7 +13,7 @@ class TraceEvent:
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
type: str = ""
event_data: Dict[str, Any] = field(default_factory=dict)
event_data: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
def to_dict(self) -> dict[str, Any]:
return asdict(self)
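A quick round-trip sketch of the modernized dataclass, assuming the remaining fields all carry defaults and the import path mirrors this module's location:

from crewai.events.listeners.tracing.trace_events import TraceEvent  # path assumed

event = TraceEvent(type="llm_call_started", event_data={"model": "gpt-4o"})
payload = event.to_dict()  # asdict() also serializes the auto-generated timestamp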

View File

@@ -2,7 +2,7 @@
from __future__ import annotations
from typing import Any, Dict, List, Optional, Sequence, Union
from typing import Any, Optional, Sequence, Union
from pydantic import model_validator
@@ -89,9 +89,9 @@ class AgentExecutionErrorEvent(BaseEvent):
class LiteAgentExecutionStartedEvent(BaseEvent):
"""Event emitted when a LiteAgent starts executing"""
agent_info: Dict[str, Any]
agent_info: dict[str, Any]
tools: Optional[Sequence[Union[BaseTool, CrewStructuredTool]]]
messages: Union[str, List[Dict[str, str]]]
messages: Union[str, list[dict[str, str]]]
type: str = "lite_agent_execution_started"
model_config = {"arbitrary_types_allowed": True}
@@ -100,7 +100,7 @@ class LiteAgentExecutionStartedEvent(BaseEvent):
class LiteAgentExecutionCompletedEvent(BaseEvent):
"""Event emitted when a LiteAgent completes execution"""
agent_info: Dict[str, Any]
agent_info: dict[str, Any]
output: str
type: str = "lite_agent_execution_completed"
@@ -108,7 +108,7 @@ class LiteAgentExecutionCompletedEvent(BaseEvent):
class LiteAgentExecutionErrorEvent(BaseEvent):
"""Event emitted when a LiteAgent encounters an error during execution"""
agent_info: Dict[str, Any]
agent_info: dict[str, Any]
error: str
type: str = "lite_agent_execution_error"

View File

@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from typing import TYPE_CHECKING, Any, Optional, Union
from crewai.events.base_events import BaseEvent
@@ -14,7 +14,7 @@ class CrewBaseEvent(BaseEvent):
crew_name: Optional[str]
crew: Optional[Crew] = None
def __init__(self, **data):
def __init__(self, **data: Any) -> None:
super().__init__(**data)
self.set_crew_fingerprint()
@@ -28,7 +28,7 @@ class CrewBaseEvent(BaseEvent):
):
self.fingerprint_metadata = self.crew.fingerprint.metadata
def to_json(self, exclude: set[str] | None = None):
def to_json(self, exclude: set[str] | None = None) -> dict[str, Any]:
if exclude is None:
exclude = set()
exclude.add("crew")
@@ -38,7 +38,7 @@ class CrewBaseEvent(BaseEvent):
class CrewKickoffStartedEvent(CrewBaseEvent):
"""Event emitted when a crew starts execution"""
inputs: Optional[Dict[str, Any]]
inputs: Optional[dict[str, Any]]
type: str = "crew_kickoff_started"
@@ -62,7 +62,7 @@ class CrewTrainStartedEvent(CrewBaseEvent):
n_iterations: int
filename: str
inputs: Optional[Dict[str, Any]]
inputs: Optional[dict[str, Any]]
type: str = "crew_train_started"
@@ -86,7 +86,7 @@ class CrewTestStartedEvent(CrewBaseEvent):
n_iterations: int
eval_llm: Optional[Union[str, Any]]
inputs: Optional[Dict[str, Any]]
inputs: Optional[dict[str, Any]]
type: str = "crew_test_started"
@@ -110,3 +110,12 @@ class CrewTestResultEvent(CrewBaseEvent):
execution_duration: float
model: str
type: str = "crew_test_result"
class CrewKickoffCancelledEvent(CrewBaseEvent):
"""Event emitted when a crew execution is cancelled"""
reason: str = "External cancellation requested"
completed_tasks: int = 0
total_tasks: int = 0
type: str = "crew_kickoff_cancelled"

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional, Union
from typing import Any, Optional, Union
from pydantic import BaseModel, ConfigDict
@@ -16,7 +16,7 @@ class FlowStartedEvent(FlowEvent):
"""Event emitted when a flow starts execution"""
flow_name: str
inputs: Optional[Dict[str, Any]] = None
inputs: Optional[dict[str, Any]] = None
type: str = "flow_started"
@@ -32,8 +32,8 @@ class MethodExecutionStartedEvent(FlowEvent):
flow_name: str
method_name: str
state: Union[Dict[str, Any], BaseModel]
params: Optional[Dict[str, Any]] = None
state: Union[dict[str, Any], BaseModel]
params: Optional[dict[str, Any]] = None
type: str = "method_execution_started"
@@ -43,7 +43,7 @@ class MethodExecutionFinishedEvent(FlowEvent):
flow_name: str
method_name: str
result: Any = None
state: Union[Dict[str, Any], BaseModel]
state: Union[dict[str, Any], BaseModel]
type: str = "method_execution_finished"

View File

@@ -1,5 +1,5 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from typing import Any, Optional, Union
from pydantic import BaseModel
@@ -39,10 +39,10 @@ class LLMCallStartedEvent(LLMEventBase):
type: str = "llm_call_started"
model: Optional[str] = None
messages: Optional[Union[str, List[Dict[str, Any]]]] = None
tools: Optional[List[dict[str, Any]]] = None
callbacks: Optional[List[Any]] = None
available_functions: Optional[Dict[str, Any]] = None
messages: Optional[Union[str, list[dict[str, Any]]]] = None
tools: Optional[list[dict[str, Any]]] = None
callbacks: Optional[list[Any]] = None
available_functions: Optional[dict[str, Any]] = None
class LLMCallCompletedEvent(LLMEventBase):

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional
from typing import Any, Optional
from crewai.events.base_events import BaseEvent
@@ -55,7 +55,7 @@ class MemorySaveStartedEvent(MemoryBaseEvent):
type: str = "memory_save_started"
value: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
metadata: Optional[dict[str, Any]] = None
agent_role: Optional[str] = None
@@ -64,7 +64,7 @@ class MemorySaveCompletedEvent(MemoryBaseEvent):
type: str = "memory_save_completed"
value: str
metadata: Optional[Dict[str, Any]] = None
metadata: Optional[dict[str, Any]] = None
agent_role: Optional[str] = None
save_time_ms: float
@@ -74,7 +74,7 @@ class MemorySaveFailedEvent(MemoryBaseEvent):
type: str = "memory_save_failed"
value: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
metadata: Optional[dict[str, Any]] = None
agent_role: Optional[str] = None
error: str

View File

@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, Callable, Dict, Optional
from typing import Any, Callable, Optional
from crewai.events.base_events import BaseEvent
@@ -11,7 +11,7 @@ class ToolUsageEvent(BaseEvent):
agent_role: Optional[str] = None
agent_id: Optional[str] = None
tool_name: str
tool_args: Dict[str, Any] | str
tool_args: dict[str, Any] | str
tool_class: Optional[str] = None
run_attempts: int | None = None
delegations: int | None = None
@@ -81,7 +81,7 @@ class ToolExecutionErrorEvent(BaseEvent):
error: Any
type: str = "tool_execution_error"
tool_name: str
tool_args: Dict[str, Any]
tool_args: dict[str, Any]
tool_class: Callable
agent: Optional[Any] = None

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional
from typing import Any, Optional
from rich.console import Console
from rich.panel import Panel
@@ -16,7 +16,7 @@ class ConsoleFormatter:
current_flow_tree: Optional[Tree] = None
current_method_branch: Optional[Tree] = None
current_lite_agent_branch: Optional[Tree] = None
tool_usage_counts: Dict[str, int] = {}
tool_usage_counts: dict[str, int] = {}
current_reasoning_branch: Optional[Tree] = None # Track reasoning status
_live_paused: bool = False
current_llm_tool_tree: Optional[Tree] = None
@@ -45,7 +45,7 @@ class ConsoleFormatter:
title: str,
name: str,
status_style: str = "blue",
tool_args: Dict[str, Any] | str = "",
tool_args: dict[str, Any] | str = "",
**fields,
) -> Text:
"""Create standardized status content with consistent formatting."""
@@ -480,7 +480,7 @@ class ConsoleFormatter:
def handle_llm_tool_usage_started(
self,
tool_name: str,
tool_args: Dict[str, Any] | str,
tool_args: dict[str, Any] | str,
):
# Create status content for the tool usage
content = self.create_status_content(
@@ -523,7 +523,7 @@ class ConsoleFormatter:
agent_branch: Optional[Tree],
tool_name: str,
crew_tree: Optional[Tree],
tool_args: Dict[str, Any] | str = "",
tool_args: dict[str, Any] | str = "",
) -> Optional[Tree]:
"""Handle tool usage started event."""
if not self.verbose:
@@ -938,7 +938,7 @@ class ConsoleFormatter:
lite_agent_branch: Optional[Tree],
lite_agent_role: str,
status: str = "completed",
**fields: Dict[str, Any],
**fields: dict[str, Any],
) -> None:
"""Update lite agent status in the tree."""
if not self.verbose or lite_agent_branch is None:
@@ -981,7 +981,7 @@ class ConsoleFormatter:
lite_agent_role: str,
status: str = "started",
error: Any = None,
**fields: Dict[str, Any],
**fields: dict[str, Any],
) -> None:
"""Handle lite agent execution events with consistent formatting."""
if not self.verbose:

View File

@@ -1,7 +1,7 @@
import abc
import enum
from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from pydantic import BaseModel, Field
@@ -57,7 +57,7 @@ class BaseEvaluator(abc.ABC):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
execution_trace: dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
@@ -67,7 +67,7 @@ class BaseEvaluator(abc.ABC):
class AgentEvaluationResult(BaseModel):
agent_id: str = Field(description="ID of the evaluated agent")
task_id: str = Field(description="ID of the task that was executed")
metrics: Dict[MetricCategory, EvaluationScore] = Field(
metrics: dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Evaluation scores for each metric category"
)
@@ -97,11 +97,11 @@ class AgentAggregatedEvaluationResult(BaseModel):
default=AggregationStrategy.SIMPLE_AVERAGE,
description="Strategy used for aggregation"
)
metrics: Dict[MetricCategory, EvaluationScore] = Field(
metrics: dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Aggregated metrics across all tasks"
)
task_results: List[str] = Field(
task_results: list[str] = Field(
default_factory=list,
description="IDs of tasks included in this aggregation"
)
@@ -122,4 +122,4 @@ class AgentAggregatedEvaluationResult(BaseModel):
detailed_feedback = "\n ".join(score.feedback.split('\n'))
result += f" {detailed_feedback}\n"
return result
return result
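Against the modernized abstract signature, a custom evaluator stays small; a toy sketch, assuming evaluate() is the only abstract member and using the EvaluationScore fields visible elsewhere in this diff (score, feedback):

from typing import Any

from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore

class NonEmptyOutputEvaluator(BaseEvaluator):
    # Toy metric: reward any non-empty output; real evaluators consult an LLM judge.
    def evaluate(
        self,
        agent: Agent,
        execution_trace: dict[str, Any],
        final_output: Any,
        task: Task | None = None,
    ) -> EvaluationScore:
        score = 10.0 if final_output else 0.0
        return EvaluationScore(score=score, feedback="non-empty output check")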

View File

@@ -1,5 +1,5 @@
from collections import defaultdict
from typing import Dict, Any, List
from typing import Any
from rich.table import Table
from rich.box import HEAVY_EDGE, ROUNDED
from collections.abc import Sequence
@@ -19,7 +19,7 @@ class EvaluationDisplayFormatter:
self.console_formatter = ConsoleFormatter()
def display_evaluation_with_feedback(
self, iterations_results: Dict[int, Dict[str, List[Any]]]
self, iterations_results: dict[int, dict[str, list[Any]]]
):
if not iterations_results:
self.console_formatter.print(
@@ -99,7 +99,7 @@ class EvaluationDisplayFormatter:
def display_summary_results(
self,
iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]],
iterations_results: dict[int, dict[str, list[AgentAggregatedEvaluationResult]]],
):
if not iterations_results:
self.console_formatter.print(
@@ -304,8 +304,8 @@ class EvaluationDisplayFormatter:
self,
agent_role: str,
metric: str,
feedbacks: List[str],
scores: List[float | None],
feedbacks: list[str],
scores: list[float | None],
strategy: AggregationStrategy,
) -> str:
if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks):

View File

@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, Dict, Optional
from typing import Any, Optional
from collections.abc import Sequence
@@ -253,7 +253,7 @@ class EvaluationTraceCallback(BaseEventListener):
if hasattr(self, "current_llm_call"):
self.current_llm_call = {}
def get_trace(self, agent_id: str, task_id: str) -> Optional[Dict[str, Any]]:
def get_trace(self, agent_id: str, task_id: str) -> Optional[dict[str, Any]]:
trace_key = f"{agent_id}_{task_id}"
return self.traces.get(trace_key)

View File

@@ -1,4 +1,4 @@
from typing import Dict, Any
from typing import Any
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
@@ -23,7 +23,7 @@ class ExperimentResultsDisplay:
self.console.print(table)
def comparison_summary(self, comparison: Dict[str, Any], baseline_timestamp: str):
def comparison_summary(self, comparison: dict[str, Any], baseline_timestamp: str):
self.console.print(Panel(f"[bold]Comparison with baseline run from {baseline_timestamp}[/bold]",
expand=False))

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict
from typing import Any
from crewai.agent import Agent
from crewai.task import Task
@@ -14,7 +14,7 @@ class GoalAlignmentEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
execution_trace: dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:

View File

@@ -9,7 +9,7 @@ This module provides evaluator implementations for:
import logging
import re
from enum import Enum
from typing import Any, Dict, List, Tuple
from typing import Any
import numpy as np
from collections.abc import Sequence
@@ -36,7 +36,7 @@ class ReasoningEfficiencyEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
execution_trace: dict[str, Any],
final_output: TaskOutput | str,
task: Task | None = None,
) -> EvaluationScore:
@@ -183,7 +183,7 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
raw_response=response
)
def _detect_loops(self, llm_calls: List[Dict]) -> Tuple[bool, List[Dict]]:
def _detect_loops(self, llm_calls: list[dict]) -> tuple[bool, list[dict]]:
loop_details = []
messages = []
@@ -227,7 +227,7 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
return intersection / union if union > 0 else 0.0
def _analyze_reasoning_patterns(self, llm_calls: List[Dict]) -> Dict[str, Any]:
def _analyze_reasoning_patterns(self, llm_calls: list[dict]) -> dict[str, Any]:
call_lengths = []
response_times = []
@@ -331,7 +331,7 @@ Identify any inefficient reasoning patterns and provide specific suggestions for
return np.mean(indicators) if indicators else 0.0
def _get_call_samples(self, llm_calls: List[Dict]) -> str:
def _get_call_samples(self, llm_calls: list[dict]) -> str:
samples = []
if len(llm_calls) <= 6:

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict
from typing import Any
from crewai.agent import Agent
from crewai.task import Task
@@ -14,7 +14,7 @@ class SemanticQualityEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
execution_trace: dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
@@ -65,4 +65,4 @@ Evaluate the semantic quality and reasoning of this output.
score=None,
feedback=f"Failed to parse evaluation. Raw response: {response}",
raw_response=response
)
)

View File

@@ -1,5 +1,5 @@
import json
from typing import Dict, Any
from typing import Any
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
@@ -16,7 +16,7 @@ class ToolSelectionEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
execution_trace: dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
@@ -132,7 +132,7 @@ class ParameterExtractionEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
execution_trace: dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
@@ -274,7 +274,7 @@ class ToolInvocationEvaluator(BaseEvaluator):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
execution_trace: dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:

View File

@@ -5,12 +5,8 @@ import logging
from typing import (
Any,
Callable,
Dict,
Generic,
List,
Optional,
Set,
Type,
TypeVar,
Union,
cast,
@@ -57,14 +53,14 @@ class FlowState(BaseModel):
# Type variables with explicit bounds
T = TypeVar(
"T", bound=Union[Dict[str, Any], BaseModel]
"T", bound=Union[dict[str, Any], BaseModel]
) # Generic flow state type parameter
StateT = TypeVar(
"StateT", bound=Union[Dict[str, Any], BaseModel]
"StateT", bound=Union[dict[str, Any], BaseModel]
) # State validation type parameter
def ensure_state_type(state: Any, expected_type: Type[StateT]) -> StateT:
def ensure_state_type(state: Any, expected_type: type[StateT]) -> StateT:
"""Ensure state matches expected type with proper validation.
Args:
@@ -436,19 +432,19 @@ class FlowMeta(type):
class Flow(Generic[T], metaclass=FlowMeta):
"""Base class for all flows.
Type parameter T must be either Dict[str, Any] or a subclass of BaseModel."""
Type parameter T must be either dict[str, Any] or a subclass of BaseModel."""
_printer = Printer()
_start_methods: List[str] = []
_listeners: Dict[str, tuple[str, List[str]]] = {}
_routers: Set[str] = set()
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
_start_methods: list[str] = []
_listeners: dict[str, tuple[str, list[str]]] = {}
_routers: set[str] = set()
_router_paths: dict[str, list[str]] = {}
initial_state: Union[type[T], T, None] = None
name: Optional[str] = None
tracing: Optional[bool] = False
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
def __class_getitem__(cls: type["Flow"], item: type[T]) -> type["Flow"]:
class _FlowGeneric(cls): # type: ignore
_initial_state_T = item # type: ignore
@@ -468,11 +464,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
**kwargs: Additional state values to initialize or override
"""
# Initialize basic instance attributes
self._methods: Dict[str, Callable] = {}
self._method_execution_counts: Dict[str, int] = {}
self._pending_and_listeners: Dict[str, Set[str]] = {}
self._method_outputs: List[Any] = [] # List to store all method outputs
self._completed_methods: Set[str] = set() # Track completed methods for reload
self._methods: dict[str, Callable] = {}
self._method_execution_counts: dict[str, int] = {}
self._pending_and_listeners: dict[str, set[str]] = {}
self._method_outputs: list[Any] = [] # List to store all method outputs
self._completed_methods: set[str] = set() # Track completed methods for reload
self._persistence: Optional[FlowPersistence] = persistence
self._is_execution_resuming: bool = False
@@ -600,7 +596,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return self._state
@property
def method_outputs(self) -> List[Any]:
def method_outputs(self) -> list[Any]:
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
@@ -637,7 +633,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
except (AttributeError, TypeError):
return "" # Safely handle any unexpected attribute access issues
def _initialize_state(self, inputs: Dict[str, Any]) -> None:
def _initialize_state(self, inputs: dict[str, Any]) -> None:
"""Initialize or update flow state with new inputs.
Args:
@@ -691,7 +687,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
else:
raise TypeError("State must be a BaseModel instance or a dictionary.")
def _restore_state(self, stored_state: Dict[str, Any]) -> None:
def _restore_state(self, stored_state: dict[str, Any]) -> None:
"""Restore flow state from persistence.
Args:
@@ -783,7 +779,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
elif hasattr(self._state, field_name):
object.__setattr__(self._state, field_name, value)
def _apply_state_updates(self, updates: Dict[str, Any]) -> None:
def _apply_state_updates(self, updates: dict[str, Any]) -> None:
"""Apply multiple state updates efficiently."""
if isinstance(self._state, dict):
self._state.update(updates)
@@ -792,7 +788,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
if hasattr(self._state, key):
object.__setattr__(self._state, key, value)
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
def kickoff(self, inputs: Optional[dict[str, Any]] = None) -> Any:
"""
Start the flow execution in a synchronous context.
@@ -805,7 +801,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return asyncio.run(run_flow())
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
async def kickoff_async(self, inputs: Optional[dict[str, Any]] = None) -> Any:
"""
Start the flow execution asynchronously.
@@ -1109,7 +1105,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
def _find_triggered_methods(
self, trigger_method: str, router_only: bool
) -> List[str]:
) -> list[str]:
"""
Finds all methods that should be triggered based on conditions.
@@ -1126,7 +1122,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns
-------
List[str]
list[str]
Names of methods that should be triggered.
Notes
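With the type parameter bound now expressed in built-in generics, a dict-typed flow needs no typing imports beyond Any; a minimal sketch, assuming the start/listen decorators live alongside Flow in crewai.flow.flow:

from typing import Any

from crewai.flow.flow import Flow, listen, start  # decorator location assumed

class GreetFlow(Flow[dict[str, Any]]):
    @start()
    def begin(self) -> str:
        self.state["greeting"] = "hello"
        return self.state["greeting"]

    @listen(begin)
    def shout(self, greeting: str) -> str:
        return greeting.upper()

result = GreetFlow().kickoff()  # runs begin, then shout; result == "HELLO"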

View File

@@ -7,7 +7,7 @@ traversal attacks and ensure paths remain within allowed boundaries.
import os
from pathlib import Path
from typing import List, Union
from typing import Union
def safe_path_join(*parts: str, root: Union[str, Path, None] = None) -> str:
@@ -101,7 +101,7 @@ def validate_path_exists(path: Union[str, Path], file_type: str = "file") -> str
raise ValueError(f"Invalid path: {str(e)}")
def list_files(directory: Union[str, Path], pattern: str = "*") -> List[str]:
def list_files(directory: Union[str, Path], pattern: str = "*") -> list[str]:
"""
Safely list files in a directory matching a pattern.
@@ -114,7 +114,7 @@ def list_files(directory: Union[str, Path], pattern: str = "*") -> List[str]:
Returns
-------
List[str]
list[str]
List of matching file paths.
Raises
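Call sites are unchanged; only the return annotation moved to the built-in. A short usage sketch against the signatures above (the paths are illustrative):

# from crewai.utilities.paths import safe_path_join, list_files  # module path assumed

reports: list[str] = list_files("/var/app/data", pattern="*.csv")
joined = safe_path_join("reports", "2025", "q3.csv", root="/var/app/data")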

View File

@@ -4,7 +4,7 @@ CrewAI Flow Persistence.
This module provides interfaces and implementations for persisting flow states.
"""
from typing import Any, Dict, TypeVar, Union
from typing import Any, TypeVar, Union
from pydantic import BaseModel
@@ -14,5 +14,5 @@ from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
__all__ = ["FlowPersistence", "persist", "SQLiteFlowPersistence"]
StateType = TypeVar('StateType', bound=Union[Dict[str, Any], BaseModel])
DictStateType = Dict[str, Any]
StateType = TypeVar('StateType', bound=Union[dict[str, Any], BaseModel])
DictStateType = dict[str, Any]

View File

@@ -1,7 +1,7 @@
"""Base class for flow state persistence."""
import abc
from typing import Any, Dict, Optional, Union
from typing import Any, Optional, Union
from pydantic import BaseModel
@@ -29,7 +29,7 @@ class FlowPersistence(abc.ABC):
self,
flow_uuid: str,
method_name: str,
state_data: Union[Dict[str, Any], BaseModel]
state_data: Union[dict[str, Any], BaseModel]
) -> None:
"""Persist the flow state after method completion.
@@ -41,7 +41,7 @@ class FlowPersistence(abc.ABC):
pass
@abc.abstractmethod
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
def load_state(self, flow_uuid: str) -> Optional[dict[str, Any]]:
"""Load the most recent state for a given flow UUID.
Args:

View File

@@ -6,7 +6,7 @@ import json
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any, Optional, Union
from pydantic import BaseModel
@@ -70,7 +70,7 @@ class SQLiteFlowPersistence(FlowPersistence):
self,
flow_uuid: str,
method_name: str,
state_data: Union[Dict[str, Any], BaseModel],
state_data: Union[dict[str, Any], BaseModel],
) -> None:
"""Save the current flow state to SQLite.
@@ -107,7 +107,7 @@ class SQLiteFlowPersistence(FlowPersistence):
),
)
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
def load_state(self, flow_uuid: str) -> Optional[dict[str, Any]]:
"""Load the most recent state for a given flow UUID.
Args:
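Both dict states and BaseModel states satisfy the widened Union; a round-trip sketch, assuming the SQLiteFlowPersistence constructor arguments default sensibly:

from crewai.flow.persistence.sqlite import SQLiteFlowPersistence

store = SQLiteFlowPersistence()
store.save_state(
    flow_uuid="demo-123",
    method_name="begin",
    state_data={"greeting": "hello"},  # a BaseModel instance would also type-check
)
restored = store.load_state("demo-123")  # -> dict[str, Any] or None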

View File

@@ -17,10 +17,10 @@ import ast
import inspect
import textwrap
from collections import defaultdict, deque
from typing import Any, Deque, Dict, List, Optional, Set, Union
from typing import Any, Deque, Optional, Union
def get_possible_return_constants(function: Any) -> Optional[List[str]]:
def get_possible_return_constants(function: Any) -> Optional[list[str]]:
try:
source = inspect.getsource(function)
except OSError:
@@ -94,7 +94,7 @@ def get_possible_return_constants(function: Any) -> Optional[List[str]]:
return list(return_values) if return_values else None
def calculate_node_levels(flow: Any) -> Dict[str, int]:
def calculate_node_levels(flow: Any) -> dict[str, int]:
"""
Calculate the hierarchical level of each node in the flow.
@@ -108,7 +108,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
Returns
-------
Dict[str, int]
dict[str, int]
Dictionary mapping method names to their hierarchical levels.
Notes
@@ -118,10 +118,10 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
- Handles both OR and AND conditions for listeners
- Processes router paths separately
"""
levels: Dict[str, int] = {}
levels: dict[str, int] = {}
queue: Deque[str] = deque()
visited: Set[str] = set()
pending_and_listeners: Dict[str, Set[str]] = {}
visited: set[str] = set()
pending_and_listeners: dict[str, set[str]] = {}
# Make all start methods at level 0
for method_name, method in flow._methods.items():
@@ -172,7 +172,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
return levels
def count_outgoing_edges(flow: Any) -> Dict[str, int]:
def count_outgoing_edges(flow: Any) -> dict[str, int]:
"""
Count the number of outgoing edges for each method in the flow.
@@ -183,7 +183,7 @@ def count_outgoing_edges(flow: Any) -> Dict[str, int]:
Returns
-------
Dict[str, int]
dict[str, int]
Dictionary mapping method names to their outgoing edge count.
"""
counts = {}
@@ -197,7 +197,7 @@ def count_outgoing_edges(flow: Any) -> Dict[str, int]:
return counts
def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
def build_ancestor_dict(flow: Any) -> dict[str, set[str]]:
"""
Build a dictionary mapping each node to its ancestor nodes.
@@ -208,11 +208,11 @@ def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
Returns
-------
Dict[str, Set[str]]
dict[str, set[str]]
Dictionary mapping each node to a set of its ancestor nodes.
"""
ancestors: Dict[str, Set[str]] = {node: set() for node in flow._methods}
visited: Set[str] = set()
ancestors: dict[str, set[str]] = {node: set() for node in flow._methods}
visited: set[str] = set()
for node in flow._methods:
if node not in visited:
dfs_ancestors(node, ancestors, visited, flow)
@@ -220,7 +220,7 @@ def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
def dfs_ancestors(
node: str, ancestors: Dict[str, Set[str]], visited: Set[str], flow: Any
node: str, ancestors: dict[str, set[str]], visited: set[str], flow: Any
) -> None:
"""
Perform depth-first search to build ancestor relationships.
@@ -229,9 +229,9 @@ def dfs_ancestors(
----------
node : str
Current node being processed.
ancestors : Dict[str, Set[str]]
ancestors : dict[str, set[str]]
Dictionary tracking ancestor relationships.
visited : Set[str]
visited : set[str]
Set of already visited nodes.
flow : Any
The flow instance being analyzed.
@@ -265,7 +265,7 @@ def dfs_ancestors(
def is_ancestor(
node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]
node: str, ancestor_candidate: str, ancestors: dict[str, set[str]]
) -> bool:
"""
Check if one node is an ancestor of another.
@@ -287,7 +287,7 @@ def is_ancestor(
return ancestor_candidate in ancestors.get(node, set())
def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
def build_parent_children_dict(flow: Any) -> dict[str, list[str]]:
"""
Build a dictionary mapping parent nodes to their children.
@@ -298,7 +298,7 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
Returns
-------
Dict[str, List[str]]
dict[str, list[str]]
Dictionary mapping parent method names to lists of their child method names.
Notes
@@ -307,7 +307,7 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
- Maps router methods to their paths and listeners
- Children lists are sorted for consistent ordering
"""
parent_children: Dict[str, List[str]] = {}
parent_children: dict[str, list[str]] = {}
# Map listeners to their trigger methods
for listener_name, (_, trigger_methods) in flow._listeners.items():
@@ -332,7 +332,7 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
def get_child_index(
parent: str, child: str, parent_children: Dict[str, List[str]]
parent: str, child: str, parent_children: dict[str, list[str]]
) -> int:
"""
Get the index of a child node in its parent's sorted children list.
@@ -343,7 +343,7 @@ def get_child_index(
The parent node name.
child : str
The child node name to find the index for.
parent_children : Dict[str, List[str]]
parent_children : dict[str, list[str]]
Dictionary mapping parents to their children lists.
Returns

View File

@@ -17,7 +17,7 @@ Example
import ast
import inspect
from typing import Any, Dict, List, Tuple, Union
from typing import Any, Union
from .utils import (
build_ancestor_dict,
@@ -73,8 +73,8 @@ def method_calls_crew(method: Any) -> bool:
def add_nodes_to_network(
net: Any,
flow: Any,
node_positions: Dict[str, Tuple[float, float]],
node_styles: Dict[str, Dict[str, Any]]
node_positions: dict[str, tuple[float, float]],
node_styles: dict[str, dict[str, Any]]
) -> None:
"""
Add nodes to the network visualization with appropriate styling.
@@ -85,9 +85,9 @@ def add_nodes_to_network(
The pyvis Network instance to add nodes to.
flow : Any
The flow instance containing method information.
node_positions : Dict[str, Tuple[float, float]]
node_positions : dict[str, tuple[float, float]]
Dictionary mapping node names to their (x, y) positions.
node_styles : Dict[str, Dict[str, Any]]
node_styles : dict[str, dict[str, Any]]
Dictionary containing style configurations for different node types.
Notes
@@ -138,10 +138,10 @@ def add_nodes_to_network(
def compute_positions(
flow: Any,
node_levels: Dict[str, int],
node_levels: dict[str, int],
y_spacing: float = 150,
x_spacing: float = 300
) -> Dict[str, Tuple[float, float]]:
) -> dict[str, tuple[float, float]]:
"""
Compute the (x, y) positions for each node in the flow graph.
@@ -149,7 +149,7 @@ def compute_positions(
----------
flow : Any
The flow instance to compute positions for.
node_levels : Dict[str, int]
node_levels : dict[str, int]
Dictionary mapping node names to their hierarchical levels.
y_spacing : float, optional
Vertical spacing between levels, by default 150.
@@ -158,11 +158,11 @@ def compute_positions(
Returns
-------
Dict[str, Tuple[float, float]]
dict[str, tuple[float, float]]
Dictionary mapping node names to their (x, y) coordinates.
"""
level_nodes: Dict[int, List[str]] = {}
node_positions: Dict[str, Tuple[float, float]] = {}
level_nodes: dict[int, list[str]] = {}
node_positions: dict[str, tuple[float, float]] = {}
for method_name, level in node_levels.items():
level_nodes.setdefault(level, []).append(method_name)
@@ -180,10 +180,10 @@ def compute_positions(
def add_edges(
net: Any,
flow: Any,
node_positions: Dict[str, Tuple[float, float]],
colors: Dict[str, str]
node_positions: dict[str, tuple[float, float]],
colors: dict[str, str]
) -> None:
edge_smooth: Dict[str, Union[str, float]] = {"type": "continuous"} # Default value
edge_smooth: dict[str, Union[str, float]] = {"type": "continuous"} # Default value
"""
Add edges to the network visualization with appropriate styling.
@@ -193,9 +193,9 @@ def add_edges(
The pyvis Network instance to add edges to.
flow : Any
The flow instance containing edge information.
node_positions : Dict[str, Tuple[float, float]]
node_positions : dict[str, tuple[float, float]]
Dictionary mapping node names to their positions.
colors : Dict[str, str]
colors : dict[str, str]
Dictionary mapping edge types to their colors.
Notes

View File

@@ -1,5 +1,5 @@
import os
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field
@@ -13,22 +13,22 @@ class Knowledge(BaseModel):
"""
Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
Args:
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
sources: list[BaseKnowledgeSource] = Field(default_factory=list)
storage: Optional[KnowledgeStorage] = Field(default=None)
embedder: Optional[Dict[str, Any]] = None
embedder: Optional[dict[str, Any]] = None
"""
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
sources: list[BaseKnowledgeSource] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: Optional[KnowledgeStorage] = Field(default=None)
embedder: Optional[Dict[str, Any]] = None
embedder: Optional[dict[str, Any]] = None
collection_name: Optional[str] = None
def __init__(
self,
collection_name: str,
sources: List[BaseKnowledgeSource],
embedder: Optional[Dict[str, Any]] = None,
sources: list[BaseKnowledgeSource],
embedder: Optional[dict[str, Any]] = None,
storage: Optional[KnowledgeStorage] = None,
**data,
):
@@ -43,8 +43,8 @@ class Knowledge(BaseModel):
self.storage.initialize_knowledge_storage()
def query(
self, query: List[str], results_limit: int = 3, score_threshold: float = 0.35
) -> List[Dict[str, Any]]:
self, query: list[str], results_limit: int = 3, score_threshold: float = 0.35
) -> list[dict[str, Any]]:
"""
Query across all knowledge sources to find the most relevant information.
Returns the top_k most relevant chunks.
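A usage sketch of the modernized constructor and query, assuming the import paths mirror this diff's module layout and that StringKnowledgeSource takes its text as content:

from typing import Any

from crewai.knowledge.knowledge import Knowledge  # path assumed
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource  # path assumed

kb = Knowledge(
    collection_name="docs",
    sources=[StringKnowledgeSource(content="CrewAI supports crews and flows.")],
)
hits: list[dict[str, Any]] = kb.query(["what does crewai support?"], results_limit=2)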

View File

@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Union
from typing import Optional, Union
from pydantic import Field, field_validator
@@ -14,16 +14,16 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
"""Base class for knowledge sources that load content from files."""
_logger: Logger = Logger(verbose=True)
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
file_path: Optional[Union[Path, list[Path], str, list[str]]] = Field(
default=None,
description="[Deprecated] The path to the file. Use file_paths instead.",
)
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
file_paths: Optional[Union[Path, list[Path], str, list[str]]] = Field(
default_factory=list, description="The path to the file"
)
content: Dict[Path, str] = Field(init=False, default_factory=dict)
content: dict[Path, str] = Field(init=False, default_factory=dict)
storage: Optional[KnowledgeStorage] = Field(default=None)
safe_file_paths: List[Path] = Field(default_factory=list)
safe_file_paths: list[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
def validate_file_path(cls, v, info):
@@ -46,7 +46,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
self.content = self.load_content()
@abstractmethod
def load_content(self) -> Dict[Path, str]:
def load_content(self) -> dict[Path, str]:
"""Load and preprocess file content. Should be overridden by subclasses. Assume that the file path is relative to the project root in the knowledge directory."""
pass
@@ -78,7 +78,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
"""Convert a path to a Path object."""
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
def _process_file_paths(self) -> List[Path]:
def _process_file_paths(self) -> list[Path]:
"""Convert file_path to a list of Path objects."""
if hasattr(self, "file_path") and self.file_path is not None:
@@ -93,7 +93,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
raise ValueError("Your source must be provided with a file_paths: []")
# Convert single path to list
path_list: List[Union[Path, str]] = (
path_list: list[Union[Path, str]] = (
[self.file_paths]
if isinstance(self.file_paths, (str, Path))
else list(self.file_paths)

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from typing import Any, Optional
import numpy as np
from pydantic import BaseModel, ConfigDict, Field
@@ -12,12 +12,12 @@ class BaseKnowledgeSource(BaseModel, ABC):
chunk_size: int = 4000
chunk_overlap: int = 200
chunks: List[str] = Field(default_factory=list)
chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
chunks: list[str] = Field(default_factory=list)
chunk_embeddings: list[np.ndarray] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: Optional[KnowledgeStorage] = Field(default=None)
metadata: Dict[str, Any] = Field(default_factory=dict) # Currently unused
metadata: dict[str, Any] = Field(default_factory=dict) # Currently unused
collection_name: Optional[str] = Field(default=None)
@abstractmethod
@@ -30,11 +30,11 @@ class BaseKnowledgeSource(BaseModel, ABC):
"""Process content, chunk it, compute embeddings, and save them."""
pass
def get_embeddings(self) -> List[np.ndarray]:
def get_embeddings(self) -> list[np.ndarray]:
"""Return the list of embeddings for the chunks."""
return self.chunk_embeddings
def _chunk_text(self, text: str) -> List[str]:
def _chunk_text(self, text: str) -> list[str]:
"""Utility method to split text into chunks."""
return [
text[i : i + self.chunk_size]
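The excerpt cuts off before the range step, but the conventional stride for this pattern is chunk_size - chunk_overlap, so consecutive windows re-read the last chunk_overlap characters (200 by default here). A standalone sketch of that arithmetic on toy numbers, assuming that stride:

def chunk_text(text: str, chunk_size: int = 10, chunk_overlap: int = 3) -> list[str]:
    # step = 7: each chunk starts 7 characters after the previous, overlapping by 3
    step = chunk_size - chunk_overlap
    return [text[i : i + chunk_size] for i in range(0, len(text), step)]

chunk_text("abcdefghijklmnopqrst")
# -> ['abcdefghij', 'hijklmnopq', 'opqrst']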

View File

@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Iterator, List, Optional, Union
from typing import Iterator, Optional, Union
from urllib.parse import urlparse
try:
@@ -35,11 +35,11 @@ class CrewDoclingSource(BaseKnowledgeSource):
_logger: Logger = Logger(verbose=True)
file_path: Optional[List[Union[Path, str]]] = Field(default=None)
file_paths: List[Union[Path, str]] = Field(default_factory=list)
chunks: List[str] = Field(default_factory=list)
safe_file_paths: List[Union[Path, str]] = Field(default_factory=list)
content: List["DoclingDocument"] = Field(default_factory=list)
file_path: Optional[list[Union[Path, str]]] = Field(default=None)
file_paths: list[Union[Path, str]] = Field(default_factory=list)
chunks: list[str] = Field(default_factory=list)
safe_file_paths: list[Union[Path, str]] = Field(default_factory=list)
content: list["DoclingDocument"] = Field(default_factory=list)
document_converter: "DocumentConverter" = Field(
default_factory=lambda: DocumentConverter(
allowed_formats=[
@@ -66,7 +66,7 @@ class CrewDoclingSource(BaseKnowledgeSource):
self.safe_file_paths = self.validate_content()
self.content = self._load_content()
def _load_content(self) -> List["DoclingDocument"]:
def _load_content(self) -> list["DoclingDocument"]:
try:
return self._convert_source_to_docling_documents()
except ConversionError as e:
@@ -88,7 +88,7 @@ class CrewDoclingSource(BaseKnowledgeSource):
self.chunks.extend(list(new_chunks_iterable))
self._save_documents()
def _convert_source_to_docling_documents(self) -> List["DoclingDocument"]:
def _convert_source_to_docling_documents(self) -> list["DoclingDocument"]:
conv_results_iter = self.document_converter.convert_all(self.safe_file_paths)
return [result.document for result in conv_results_iter]
@@ -97,8 +97,8 @@ class CrewDoclingSource(BaseKnowledgeSource):
for chunk in chunker.chunk(doc):
yield chunk.text
def validate_content(self) -> List[Union[Path, str]]:
processed_paths: List[Union[Path, str]] = []
def validate_content(self) -> list[Union[Path, str]]:
processed_paths: list[Union[Path, str]] = []
for path in self.file_paths:
if isinstance(path, str):
if path.startswith(("http://", "https://")):

View File

@@ -1,6 +1,5 @@
import csv
from pathlib import Path
from typing import Dict, List
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
@@ -8,7 +7,7 @@ from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledge
class CSVKnowledgeSource(BaseFileKnowledgeSource):
"""A knowledge source that stores and queries CSV file content using embeddings."""
def load_content(self) -> Dict[Path, str]:
def load_content(self) -> dict[Path, str]:
"""Load and preprocess CSV file content."""
content_dict = {}
for file_path in self.safe_file_paths:
@@ -32,7 +31,7 @@ class CSVKnowledgeSource(BaseFileKnowledgeSource):
self.chunks.extend(new_chunks)
self._save_documents()
def _chunk_text(self, text: str) -> List[str]:
def _chunk_text(self, text: str) -> list[str]:
"""Utility method to split text into chunks."""
return [
text[i : i + self.chunk_size]

View File

@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Union
from typing import Iterator, Optional, Union
from urllib.parse import urlparse
from pydantic import Field, field_validator
@@ -16,16 +16,16 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
_logger: Logger = Logger(verbose=True)
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
file_path: Optional[Union[Path, list[Path], str, list[str]]] = Field(
default=None,
description="[Deprecated] The path to the file. Use file_paths instead.",
)
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
file_paths: Optional[Union[Path, list[Path], str, list[str]]] = Field(
default_factory=list, description="The path to the file"
)
chunks: List[str] = Field(default_factory=list)
content: Dict[Path, Dict[str, str]] = Field(default_factory=dict)
safe_file_paths: List[Path] = Field(default_factory=list)
chunks: list[str] = Field(default_factory=list)
content: dict[Path, dict[str, str]] = Field(default_factory=dict)
safe_file_paths: list[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
def validate_file_path(cls, v, info):
@@ -41,7 +41,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
raise ValueError("Either file_path or file_paths must be provided")
return v
def _process_file_paths(self) -> List[Path]:
def _process_file_paths(self) -> list[Path]:
"""Convert file_path to a list of Path objects."""
if hasattr(self, "file_path") and self.file_path is not None:
@@ -56,7 +56,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
raise ValueError("Your source must be provided with a file_paths: []")
# Convert single path to list
path_list: List[Union[Path, str]] = (
path_list: list[Union[Path, str]] = (
[self.file_paths]
if isinstance(self.file_paths, (str, Path))
else list(self.file_paths)
@@ -100,13 +100,13 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
self.validate_content()
self.content = self._load_content()
def _load_content(self) -> Dict[Path, Dict[str, str]]:
def _load_content(self) -> dict[Path, dict[str, str]]:
"""Load and preprocess Excel file content from multiple sheets.
Each sheet's content is converted to CSV format and stored.
Returns:
Dict[Path, Dict[str, str]]: A mapping of file paths to their respective sheet contents.
dict[Path, dict[str, str]]: A mapping of file paths to their respective sheet contents.
Raises:
ImportError: If required dependencies are missing.
@@ -161,7 +161,7 @@ class ExcelKnowledgeSource(BaseKnowledgeSource):
self.chunks.extend(new_chunks)
self._save_documents()
def _chunk_text(self, text: str) -> List[str]:
def _chunk_text(self, text: str) -> list[str]:
"""Utility method to split text into chunks."""
return [
text[i : i + self.chunk_size]

View File

@@ -1,6 +1,6 @@
import json
from pathlib import Path
from typing import Any, Dict, List
from typing import Any
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
@@ -8,9 +8,9 @@ from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledge
class JSONKnowledgeSource(BaseFileKnowledgeSource):
"""A knowledge source that stores and queries JSON file content using embeddings."""
def load_content(self) -> Dict[Path, str]:
def load_content(self) -> dict[Path, str]:
"""Load and preprocess JSON file content."""
content: Dict[Path, str] = {}
content: dict[Path, str] = {}
for path in self.safe_file_paths:
path = self.convert_to_path(path)
with open(path, "r", encoding="utf-8") as json_file:
@@ -44,7 +44,7 @@ class JSONKnowledgeSource(BaseFileKnowledgeSource):
self.chunks.extend(new_chunks)
self._save_documents()
def _chunk_text(self, text: str) -> List[str]:
def _chunk_text(self, text: str) -> list[str]:
"""Utility method to split text into chunks."""
return [
text[i : i + self.chunk_size]

View File

@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Dict, List
from typing import TYPE_CHECKING
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
@@ -7,7 +7,7 @@ from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledge
class PDFKnowledgeSource(BaseFileKnowledgeSource):
"""A knowledge source that stores and queries PDF file content using embeddings."""
def load_content(self) -> Dict[Path, str]:
def load_content(self) -> dict[Path, str]:
"""Load and preprocess PDF file content."""
pdfplumber = self._import_pdfplumber()
@@ -45,7 +45,7 @@ class PDFKnowledgeSource(BaseFileKnowledgeSource):
self.chunks.extend(new_chunks)
self._save_documents()
def _chunk_text(self, text: str) -> List[str]:
def _chunk_text(self, text: str) -> list[str]:
"""Utility method to split text into chunks."""
return [
text[i : i + self.chunk_size]

View File

@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Optional
from pydantic import Field
@@ -26,7 +26,7 @@ class StringKnowledgeSource(BaseKnowledgeSource):
self.chunks.extend(new_chunks)
self._save_documents()
def _chunk_text(self, text: str) -> List[str]:
def _chunk_text(self, text: str) -> list[str]:
"""Utility method to split text into chunks."""
return [
text[i : i + self.chunk_size]

View File

@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Dict, List
from typing import TYPE_CHECKING
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
@@ -7,7 +7,7 @@ from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledge
class TextFileKnowledgeSource(BaseFileKnowledgeSource):
"""A knowledge source that stores and queries text file content using embeddings."""
def load_content(self) -> Dict[Path, str]:
def load_content(self) -> dict[Path, str]:
"""Load and preprocess text file content."""
content = {}
for path in self.safe_file_paths:
@@ -26,7 +26,7 @@ class TextFileKnowledgeSource(BaseFileKnowledgeSource):
self.chunks.extend(new_chunks)
self._save_documents()
def _chunk_text(self, text: str) -> List[str]:
def _chunk_text(self, text: str) -> list[str]:
"""Utility method to split text into chunks."""
return [
text[i : i + self.chunk_size]

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from typing import Any, Optional
class BaseKnowledgeStorage(ABC):
@@ -8,17 +8,17 @@ class BaseKnowledgeStorage(ABC):
@abstractmethod
def search(
self,
query: List[str],
query: list[str],
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
) -> list[dict[str, Any]]:
"""Search for documents in the knowledge base."""
pass
@abstractmethod
def save(
self, documents: List[str], metadata: Dict[str, Any] | List[Dict[str, Any]]
self, documents: list[str], metadata: dict[str, Any] | list[dict[str, Any]]
) -> None:
"""Save documents to the knowledge base."""
pass

View File

@@ -2,7 +2,7 @@ import hashlib
import logging
import os
import shutil
from typing import Any, Dict, List, Optional, Union
from typing import Any, Optional, Union
import chromadb
import chromadb.errors
@@ -33,7 +33,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
def __init__(
self,
embedder: Optional[Dict[str, Any]] = None,
embedder: Optional[dict[str, Any]] = None,
collection_name: Optional[str] = None,
):
self.collection_name = collection_name
@@ -41,11 +41,11 @@ class KnowledgeStorage(BaseKnowledgeStorage):
def search(
self,
query: List[str],
query: list[str],
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
) -> list[dict[str, Any]]:
with suppress_logging(
"chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR
):
@@ -113,8 +113,8 @@ class KnowledgeStorage(BaseKnowledgeStorage):
def save(
self,
documents: List[str],
metadata: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
documents: list[str],
metadata: Optional[Union[dict[str, Any], list[dict[str, Any]]]] = None,
):
if not self.collection:
raise Exception("Collection not initialized")
@@ -179,11 +179,11 @@ class KnowledgeStorage(BaseKnowledgeStorage):
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
def _set_embedder_config(self, embedder: Optional[Dict[str, Any]] = None) -> None:
def _set_embedder_config(self, embedder: Optional[dict[str, Any]] = None) -> None:
"""Set the embedding configuration for the knowledge storage.
Args:
embedder_config (Optional[Dict[str, Any]]): Configuration dictionary for the embedder.
embedder_config (Optional[dict[str, Any]]): Configuration dictionary for the embedder.
If None or empty, defaults to the default embedding function.
"""
self.embedder = (
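End to end, the storage API reads the same with built-in generics; a sketch with an assumed embedder config shape:

from typing import Any

from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage  # path assumed

storage = KnowledgeStorage(
    embedder={"provider": "openai", "config": {"model": "text-embedding-3-small"}},  # keys assumed
    collection_name="docs",
)
storage.initialize_knowledge_storage()  # required before save(), per the collection check above
storage.save(documents=["CrewAI supports flows."], metadata={"source": "readme"})
results: list[dict[str, Any]] = storage.search(["flows"], limit=3, score_threshold=0.35)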

View File

@@ -1,7 +1,7 @@
from typing import Any, Dict, List
from typing import Any
def extract_knowledge_context(knowledge_snippets: List[Dict[str, Any]]) -> str:
def extract_knowledge_context(knowledge_snippets: list[dict[str, Any]]) -> str:
"""Extract knowledge from the task prompt."""
valid_snippets = [
result["context"]

View File

@@ -9,8 +9,6 @@ from contextlib import contextmanager
from typing import (
Any,
DefaultDict,
Dict,
List,
Literal,
Optional,
Type,
@@ -297,12 +295,12 @@ class LLM(BaseLLM):
temperature: Optional[float] = None,
top_p: Optional[float] = None,
n: Optional[int] = None,
stop: Optional[Union[str, List[str]]] = None,
stop: Optional[Union[str, list[str]]] = None,
max_completion_tokens: Optional[int] = None,
max_tokens: Optional[int] = None,
presence_penalty: Optional[float] = None,
frequency_penalty: Optional[float] = None,
logit_bias: Optional[Dict[int, float]] = None,
logit_bias: Optional[dict[int, float]] = None,
response_format: Optional[Type[BaseModel]] = None,
seed: Optional[int] = None,
logprobs: Optional[int] = None,
@@ -311,7 +309,7 @@ class LLM(BaseLLM):
api_base: Optional[str] = None,
api_version: Optional[str] = None,
api_key: Optional[str] = None,
callbacks: List[Any] | None = None,
callbacks: list[Any] | None = None,
reasoning_effort: Optional[Literal["none", "low", "medium", "high"]] = None,
stream: bool = False,
**kwargs,
@@ -343,9 +341,9 @@ class LLM(BaseLLM):
litellm.drop_params = True
# Normalize self.stop to always be a List[str]
# Normalize self.stop to always be a list[str]
if stop is None:
self.stop: List[str] = []
self.stop: list[str] = []
elif isinstance(stop, str):
self.stop = [stop]
else:
@@ -368,9 +366,9 @@ class LLM(BaseLLM):
def _prepare_completion_params(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
) -> Dict[str, Any]:
messages: Union[str, list[dict[str, str]]],
tools: Optional[list[dict]] = None,
) -> dict[str, Any]:
"""Prepare parameters for the completion call.
Args:
@@ -380,7 +378,7 @@ class LLM(BaseLLM):
available_functions: Optional dict of available functions
Returns:
Dict[str, Any]: Parameters for the completion call
dict[str, Any]: Parameters for the completion call
"""
# --- 1) Format messages according to provider requirements
if isinstance(messages, str):
@@ -419,9 +417,9 @@ class LLM(BaseLLM):
def _handle_streaming_response(
self,
params: Dict[str, Any],
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
params: dict[str, Any],
callbacks: Optional[list[Any]] = None,
available_functions: Optional[dict[str, Any]] = None,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
) -> str:
@@ -447,7 +445,7 @@ class LLM(BaseLLM):
usage_info = None
tool_calls = None
accumulated_tool_args: DefaultDict[int, AccumulatedToolArgs] = defaultdict(
accumulated_tool_args: defaultdict[int, AccumulatedToolArgs] = defaultdict(
AccumulatedToolArgs
)
@@ -699,9 +697,9 @@ class LLM(BaseLLM):
def _handle_streaming_tool_calls(
self,
tool_calls: List[ChatCompletionDeltaToolCall],
accumulated_tool_args: DefaultDict[int, AccumulatedToolArgs],
available_functions: Optional[Dict[str, Any]] = None,
tool_calls: list[ChatCompletionDeltaToolCall],
accumulated_tool_args: defaultdict[int, AccumulatedToolArgs],
available_functions: Optional[dict[str, Any]] = None,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
) -> None | str:
@@ -744,8 +742,8 @@ class LLM(BaseLLM):
def _handle_streaming_callbacks(
self,
callbacks: Optional[List[Any]],
usage_info: Optional[Dict[str, Any]],
callbacks: Optional[list[Any]],
usage_info: Optional[dict[str, Any]],
last_chunk: Optional[Any],
) -> None:
"""Handle callbacks with usage info for streaming responses.
@@ -786,9 +784,9 @@ class LLM(BaseLLM):
def _handle_non_streaming_response(
self,
params: Dict[str, Any],
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
params: dict[str, Any],
callbacks: Optional[list[Any]] = None,
available_functions: Optional[dict[str, Any]] = None,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
) -> str | Any:
@@ -868,8 +866,8 @@ class LLM(BaseLLM):
def _handle_tool_call(
self,
tool_calls: List[Any],
available_functions: Optional[Dict[str, Any]] = None,
tool_calls: list[Any],
available_functions: Optional[dict[str, Any]] = None,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
) -> Optional[str]:
@@ -958,10 +956,10 @@ class LLM(BaseLLM):
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
messages: Union[str, list[dict[str, str]]],
tools: Optional[list[dict]] = None,
callbacks: Optional[list[Any]] = None,
available_functions: Optional[dict[str, Any]] = None,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
) -> Union[str, Any]:
@@ -1105,8 +1103,8 @@ class LLM(BaseLLM):
)
def _format_messages_for_provider(
self, messages: List[Dict[str, str]]
) -> List[Dict[str, str]]:
self, messages: list[dict[str, str]]
) -> list[dict[str, str]]:
"""Format messages according to provider requirements.
Args:
@@ -1247,7 +1245,7 @@ class LLM(BaseLLM):
self.context_window_size = int(value * CONTEXT_WINDOW_USAGE_RATIO)
return self.context_window_size
def set_callbacks(self, callbacks: List[Any]):
def set_callbacks(self, callbacks: list[Any]):
"""
Attempt to keep a single set of callbacks in litellm by removing old
duplicates and adding new ones.

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from typing import Any, Optional, Union
class BaseLLM(ABC):
@@ -21,7 +21,7 @@ class BaseLLM(ABC):
model: str
temperature: Optional[float] = None
stop: Optional[List[str]] = None
stop: Optional[list[str]] = None
def __init__(
self,
@@ -43,10 +43,10 @@ class BaseLLM(ABC):
@abstractmethod
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
messages: Union[str, list[dict[str, str]]],
tools: Optional[list[dict]] = None,
callbacks: Optional[list[Any]] = None,
available_functions: Optional[dict[str, Any]] = None,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
) -> Union[str, Any]:
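A custom provider now subclasses with only built-in generics in its signature; a toy echo implementation, assuming call() is the sole abstract method and that the base constructor accepts model:

from typing import Any, Optional, Union

# from crewai.llms.base_llm import BaseLLM  # import path assumed

class EchoLLM(BaseLLM):
    # Toy provider: returns the last user message instead of hitting a real API.
    def call(
        self,
        messages: Union[str, list[dict[str, str]]],
        tools: Optional[list[dict]] = None,
        callbacks: Optional[list[Any]] = None,
        available_functions: Optional[dict[str, Any]] = None,
        from_task: Optional[Any] = None,
        from_agent: Optional[Any] = None,
    ) -> Union[str, Any]:
        if isinstance(messages, str):
            return messages
        return messages[-1]["content"]

llm = EchoLLM(model="echo-1")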

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional, Union
from typing import Any, Optional, Union
import aisuite as ai
@@ -12,10 +12,10 @@ class AISuiteLLM(BaseLLM):
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
messages: Union[str, list[dict[str, str]]],
tools: Optional[list[dict]] = None,
callbacks: Optional[list[Any]] = None,
available_functions: Optional[dict[str, Any]] = None,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
) -> Union[str, Any]:
@@ -26,9 +26,9 @@ class AISuiteLLM(BaseLLM):
def _prepare_completion_params(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
) -> Dict[str, Any]:
messages: Union[str, list[dict[str, str]]],
tools: Optional[list[dict]] = None,
) -> dict[str, Any]:
return {
"model": self.model,
"messages": messages,

View File

@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Optional
import time
from crewai.memory.external.external_memory_item import ExternalMemoryItem
@@ -23,19 +23,19 @@ class ExternalMemory(Memory):
super().__init__(storage=storage, **data)
@staticmethod
def _configure_mem0(crew: Any, config: Dict[str, Any]) -> "Mem0Storage":
def _configure_mem0(crew: Any, config: dict[str, Any]) -> "Mem0Storage":
from crewai.memory.storage.mem0_storage import Mem0Storage
return Mem0Storage(type="external", crew=crew, config=config)
@staticmethod
def external_supported_storages() -> Dict[str, Any]:
def external_supported_storages() -> dict[str, Any]:
return {
"mem0": ExternalMemory._configure_mem0,
}
@staticmethod
def create_storage(crew: Any, embedder_config: Optional[Dict[str, Any]]) -> Storage:
def create_storage(crew: Any, embedder_config: Optional[dict[str, Any]]) -> Storage:
if not embedder_config:
raise ValueError("embedder_config is required")
@@ -52,7 +52,7 @@ class ExternalMemory(Memory):
def save(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
metadata: Optional[dict[str, Any]] = None,
) -> None:
"""Saves a value into the external storage."""
crewai_event_bus.emit(
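The external_supported_storages mapping above is a plain dispatch table from provider name to configurator. A standalone sketch of the pattern — the "provider" key layout is an assumption, since the hunk truncates before the lookup:

from typing import Any, Callable


def _configure_mem0(crew: Any, config: dict[str, Any]) -> Any:
    ...  # the real code returns a Mem0Storage here


SUPPORTED_STORAGES: dict[str, Callable[[Any, dict[str, Any]], Any]] = {
    "mem0": _configure_mem0,
}


def create_storage(crew: Any, embedder_config: dict[str, Any] | None) -> Any:
    if not embedder_config:
        raise ValueError("embedder_config is required")
    provider = embedder_config.get("provider")
    if provider not in SUPPORTED_STORAGES:
        raise ValueError(f"unsupported external storage: {provider!r}")
    return SUPPORTED_STORAGES[provider](crew, embedder_config)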

View File

@@ -1,11 +1,11 @@
from typing import Any, Dict, Optional
from typing import Any, Optional
class ExternalMemoryItem:
def __init__(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
metadata: Optional[dict[str, Any]] = None,
agent: Optional[str] = None,
):
self.value = value

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List
from typing import Any
import time
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
@@ -84,7 +84,7 @@ class LongTermMemory(Memory):
self,
task: str,
latest_n: int = 3,
) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
) -> list[dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional, Union
from typing import Any, Optional, Union
class LongTermMemoryItem:
@@ -9,7 +9,7 @@ class LongTermMemoryItem:
expected_output: str,
datetime: str,
quality: Optional[Union[int, float]] = None,
metadata: Optional[Dict[str, Any]] = None,
metadata: Optional[dict[str, Any]] = None,
):
self.task = task
self.agent = agent

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from typing import Any, Optional, TYPE_CHECKING
from pydantic import BaseModel
@@ -12,7 +12,7 @@ class Memory(BaseModel):
Base class for memory, now supporting agent tags and generic metadata.
"""
embedder_config: Optional[Dict[str, Any]] = None
embedder_config: Optional[dict[str, Any]] = None
crew: Optional[Any] = None
storage: Any
@@ -45,7 +45,7 @@ class Memory(BaseModel):
def save(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
metadata: Optional[dict[str, Any]] = None,
) -> None:
metadata = metadata or {}
@@ -56,7 +56,7 @@ class Memory(BaseModel):
query: str,
limit: int = 3,
score_threshold: float = 0.35,
) -> List[Any]:
) -> list[Any]:
return self.storage.search(
query=query, limit=limit, score_threshold=score_threshold
)

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional
from typing import Any, Optional
import time
from pydantic import PrivateAttr
@@ -56,7 +56,7 @@ class ShortTermMemory(Memory):
def save(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
metadata: Optional[dict[str, Any]] = None,
) -> None:
crewai_event_bus.emit(
self,

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional
from typing import Any, Optional
class ShortTermMemoryItem:
@@ -6,7 +6,7 @@ class ShortTermMemoryItem:
self,
data: Any,
agent: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
metadata: Optional[dict[str, Any]] = None,
):
self.data = data
self.agent = agent

View File

@@ -1,15 +1,15 @@
from typing import Any, Dict, List
from typing import Any
class Storage:
"""Abstract base class defining the storage interface"""
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
def save(self, value: Any, metadata: dict[str, Any]) -> None:
pass
def search(
self, query: str, limit: int, score_threshold: float
) -> Dict[str, Any] | List[Any]:
) -> dict[str, Any] | list[Any]:
return {}
def reset(self) -> None:

View File

@@ -2,7 +2,7 @@ import json
import logging
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from crewai.task import Task
from crewai.utilities import Printer
@@ -62,10 +62,10 @@ class KickoffTaskOutputsSQLiteStorage:
def add(
self,
task: Task,
output: Dict[str, Any],
output: dict[str, Any],
task_index: int,
was_replayed: bool = False,
inputs: Dict[str, Any] | None = None,
inputs: dict[str, Any] | None = None,
) -> None:
"""Add a new task output record to the database.
@@ -153,7 +153,7 @@ class KickoffTaskOutputsSQLiteStorage:
logger.error(error_msg)
raise DatabaseOperationError(error_msg, e)
def load(self) -> List[Dict[str, Any]]:
def load(self) -> list[dict[str, Any]]:
"""Load all task output records from the database.
Returns:

View File

@@ -1,7 +1,7 @@
import json
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from typing import Any, Optional, Union
from crewai.utilities import Printer
from crewai.utilities.paths import db_storage_path
@@ -53,7 +53,7 @@ class LTMSQLiteStorage:
def save(
self,
task_description: str,
metadata: Dict[str, Any],
metadata: dict[str, Any],
datetime: str,
score: Union[int, float],
) -> None:
@@ -77,7 +77,7 @@ class LTMSQLiteStorage:
def load(
self, task_description: str, latest_n: int
) -> Optional[List[Dict[str, Any]]]:
) -> Optional[list[dict[str, Any]]]:
"""Queries the LTM table by task description with error handling."""
try:
with sqlite3.connect(self.db_path) as conn:

View File

@@ -1,5 +1,5 @@
import os
from typing import Any, Dict, List
from typing import Any
from collections import defaultdict
from mem0 import Memory, MemoryClient
from crewai.utilities.chromadb import sanitize_collection_name
@@ -86,7 +86,7 @@ class Mem0Storage(Storage):
return filter
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
def save(self, value: Any, metadata: dict[str, Any]) -> None:
user_id = self.config.get("user_id", "")
assistant_message = [{"role" : "assistant","content" : value}]
@@ -121,7 +121,7 @@ class Mem0Storage(Storage):
self.memory.add(assistant_message, **params)
def search(self, query: str, limit: int = 3, score_threshold: float = 0.35) -> List[Any]:
def search(self, query: str, limit: int = 3, score_threshold: float = 0.35) -> list[Any]:
params = {
"query": query,
"limit": limit,

View File

@@ -3,7 +3,7 @@ import os
import shutil
import uuid
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from chromadb.api import ClientAPI
from crewai.rag.storage.base_rag_storage import BaseRAGStorage
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
@@ -85,7 +85,7 @@ class RAGStorage(BaseRAGStorage):
return f"{base_path}/{file_name}"
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
def save(self, value: Any, metadata: dict[str, Any]) -> None:
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()
try:
@@ -99,7 +99,7 @@ class RAGStorage(BaseRAGStorage):
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Any]:
) -> list[Any]:
if not hasattr(self, "app"):
self._initialize_app()
@@ -125,7 +125,7 @@ class RAGStorage(BaseRAGStorage):
logging.error(f"Error during {self.type} search: {str(e)}")
return []
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> None: # type: ignore
def _generate_embedding(self, text: str, metadata: dict[str, Any]) -> None: # type: ignore
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()

View File

@@ -1,7 +1,7 @@
import inspect
import logging
from pathlib import Path
from typing import Any, Callable, Dict, TypeVar, cast, List
from typing import Any, Callable, TypeVar, cast
from crewai.tools import BaseTool
import yaml
@@ -86,7 +86,7 @@ def CrewBase(cls: T) -> T:
import types
return types.MethodType(_close_mcp_server, self)
def get_mcp_tools(self, *tool_names: str) -> List[BaseTool]:
def get_mcp_tools(self, *tool_names: str) -> list[BaseTool]:
if not self.mcp_server_params:
return []
@@ -154,8 +154,8 @@ def CrewBase(cls: T) -> T:
}
def _filter_functions(
self, functions: Dict[str, Callable], attribute: str
) -> Dict[str, Callable]:
self, functions: dict[str, Callable], attribute: str
) -> dict[str, Callable]:
return {
name: func
for name, func in functions.items()
@@ -184,11 +184,11 @@ def CrewBase(cls: T) -> T:
def _map_agent_variables(
self,
agent_name: str,
agent_info: Dict[str, Any],
llms: Dict[str, Callable],
tool_functions: Dict[str, Callable],
cache_handler_functions: Dict[str, Callable],
callbacks: Dict[str, Callable],
agent_info: dict[str, Any],
llms: dict[str, Callable],
tool_functions: dict[str, Callable],
cache_handler_functions: dict[str, Callable],
callbacks: dict[str, Callable],
) -> None:
if llm := agent_info.get("llm"):
try:
@@ -245,13 +245,13 @@ def CrewBase(cls: T) -> T:
def _map_task_variables(
self,
task_name: str,
task_info: Dict[str, Any],
agents: Dict[str, Callable],
tasks: Dict[str, Callable],
output_json_functions: Dict[str, Callable],
tool_functions: Dict[str, Callable],
callback_functions: Dict[str, Callable],
output_pydantic_functions: Dict[str, Callable],
task_info: dict[str, Any],
agents: dict[str, Callable],
tasks: dict[str, Callable],
output_json_functions: dict[str, Callable],
tool_functions: dict[str, Callable],
callback_functions: dict[str, Callable],
output_pydantic_functions: dict[str, Callable],
) -> None:
if context_list := task_info.get("context"):
self.tasks_config[task_name]["context"] = [
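_filter_functions above relies on marker attributes that decorators such as @agent and @task set on the decorated methods. A standalone sketch of that mechanism — the marker name is_agent is illustrative, not crewai's exact attribute:

from typing import Any, Callable


def agent(func: Callable) -> Callable:
    func.is_agent = True  # marker attribute checked later via hasattr
    return func


def filter_functions(
    functions: dict[str, Callable], attribute: str
) -> dict[str, Callable]:
    return {name: f for name, f in functions.items() if hasattr(f, attribute)}


class MyCrew:
    @agent
    def researcher(self) -> Any:
        ...


methods = {n: getattr(MyCrew, n) for n in dir(MyCrew) if callable(getattr(MyCrew, n))}
print(filter_functions(methods, "is_agent"))  # {'researcher': <function ...>}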

View File

@@ -1,5 +1,5 @@
import os
from typing import Any, Dict, Optional, cast
from typing import Any, Optional, cast
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
@@ -23,7 +23,7 @@ class EmbeddingConfigurator:
def configure_embedder(
self,
embedder_config: Optional[Dict[str, Any]] = None,
embedder_config: Optional[dict[str, Any]] = None,
) -> EmbeddingFunction:
"""Configures and returns an embedding function based on the provided config."""
if embedder_config is None:
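For reference, the embedder_config passed to configure_embedder is a plain dict[str, Any]. The shape below (a provider key plus a nested config) follows crewai's documented convention, with the model name as an illustrative value:

from typing import Any

embedder_config: dict[str, Any] = {
    "provider": "openai",
    "config": {"model": "text-embedding-3-small"},
}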

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from typing import Any, Optional
class BaseRAGStorage(ABC):
@@ -13,7 +13,7 @@ class BaseRAGStorage(ABC):
self,
type: str,
allow_reset: bool = True,
embedder_config: Optional[Dict[str, Any]] = None,
embedder_config: Optional[dict[str, Any]] = None,
crew: Any = None,
):
self.type = type
@@ -35,7 +35,7 @@ class BaseRAGStorage(ABC):
pass
@abstractmethod
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
def save(self, value: Any, metadata: dict[str, Any]) -> None:
"""Save a value with metadata to the storage."""
pass
@@ -46,7 +46,7 @@ class BaseRAGStorage(ABC):
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Any]:
) -> list[Any]:
"""Search for entries in the storage."""
pass
@@ -57,7 +57,7 @@ class BaseRAGStorage(ABC):
@abstractmethod
def _generate_embedding(
self, text: str, metadata: Optional[Dict[str, Any]] = None
self, text: str, metadata: Optional[dict[str, Any]] = None
) -> Any:
"""Generate an embedding for the given text and metadata."""
pass
@@ -67,7 +67,7 @@ class BaseRAGStorage(ABC):
"""Initialize the vector db."""
pass
def setup_config(self, config: Dict[str, Any]):
def setup_config(self, config: dict[str, Any]):
"""Setup the config of the storage."""
pass

View File

@@ -7,7 +7,7 @@ for CrewAI agents. These identifiers are used for tracking, auditing, and securi
import uuid
from datetime import datetime
from typing import Any, Dict, Optional
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field, field_validator
@@ -23,12 +23,12 @@ class Fingerprint(BaseModel):
Attributes:
uuid_str (str): String representation of the UUID for this fingerprint, auto-generated
created_at (datetime): When this fingerprint was created, auto-generated
metadata (Dict[str, Any]): Additional metadata associated with this fingerprint
metadata (dict[str, Any]): Additional metadata associated with this fingerprint
"""
uuid_str: str = Field(default_factory=lambda: str(uuid.uuid4()), description="String representation of the UUID")
created_at: datetime = Field(default_factory=datetime.now, description="When this fingerprint was created")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata for this fingerprint")
metadata: dict[str, Any] = Field(default_factory=dict, description="Additional metadata for this fingerprint")
model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -100,14 +100,14 @@ class Fingerprint(BaseModel):
return str(uuid.uuid5(CREW_AI_NAMESPACE, seed))
@classmethod
def generate(cls, seed: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> 'Fingerprint':
def generate(cls, seed: Optional[str] = None, metadata: Optional[dict[str, Any]] = None) -> 'Fingerprint':
"""
Static factory method to create a new Fingerprint.
Args:
seed (Optional[str]): A string to use as seed for the UUID generation.
If None, a random UUID is generated.
metadata (Optional[Dict[str, Any]]): Additional metadata to store with the fingerprint.
metadata (Optional[dict[str, Any]]): Additional metadata to store with the fingerprint.
Returns:
Fingerprint: A new Fingerprint instance
@@ -132,12 +132,12 @@ class Fingerprint(BaseModel):
"""Hash of the fingerprint (based on UUID)."""
return hash(self.uuid_str)
def to_dict(self) -> Dict[str, Any]:
def to_dict(self) -> dict[str, Any]:
"""
Convert the fingerprint to a dictionary representation.
Returns:
Dict[str, Any]: Dictionary representation of the fingerprint
dict[str, Any]: Dictionary representation of the fingerprint
"""
return {
"uuid_str": self.uuid_str,
@@ -146,12 +146,12 @@ class Fingerprint(BaseModel):
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Fingerprint':
def from_dict(cls, data: dict[str, Any]) -> 'Fingerprint':
"""
Create a Fingerprint from a dictionary representation.
Args:
data (Dict[str, Any]): Dictionary representation of a fingerprint
data (dict[str, Any]): Dictionary representation of a fingerprint
Returns:
Fingerprint: A new Fingerprint instance

View File

@@ -10,7 +10,7 @@ The SecurityConfig class is the primary interface for managing security settings
in CrewAI applications.
"""
from typing import Any, Dict, Optional
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field, model_validator
@@ -84,12 +84,12 @@ class SecurityConfig(BaseModel):
values['fingerprint'] = Fingerprint.generate(seed=values['fingerprint'])
return values
def to_dict(self) -> Dict[str, Any]:
def to_dict(self) -> dict[str, Any]:
"""
Convert the security config to a dictionary.
Returns:
Dict[str, Any]: Dictionary representation of the security config
dict[str, Any]: Dictionary representation of the security config
"""
result = {
"fingerprint": self.fingerprint.to_dict()
@@ -97,12 +97,12 @@ class SecurityConfig(BaseModel):
return result
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'SecurityConfig':
def from_dict(cls, data: dict[str, Any]) -> 'SecurityConfig':
"""
Create a SecurityConfig from a dictionary.
Args:
data (Dict[str, Any]): Dictionary representation of a security config
data (dict[str, Any]): Dictionary representation of a security config
Returns:
SecurityConfig: A new SecurityConfig instance

View File

@@ -13,12 +13,7 @@ from typing import (
Any,
Callable,
ClassVar,
Dict,
List,
Optional,
Set,
Tuple,
Type,
Union,
get_args,
get_origin,
@@ -91,7 +86,7 @@ class Task(BaseModel):
expected_output: str = Field(
description="Clear definition of expected output for the task."
)
config: Optional[Dict[str, Any]] = Field(
config: Optional[dict[str, Any]] = Field(
description="Configuration for the agent",
default=None,
)
@@ -101,7 +96,7 @@ class Task(BaseModel):
agent: Optional[BaseAgent] = Field(
description="Agent responsible for execution the task.", default=None
)
context: Union[List["Task"], None, _NotSpecified] = Field(
context: Union[list["Task"], None, _NotSpecified] = Field(
description="Other tasks that will have their output used as context for this task.",
default=NOT_SPECIFIED,
)
@@ -109,11 +104,11 @@ class Task(BaseModel):
description="Whether the task should be executed asynchronously or not.",
default=False,
)
output_json: Optional[Type[BaseModel]] = Field(
output_json: Optional[type[BaseModel]] = Field(
description="A Pydantic model to be used to create a JSON output.",
default=None,
)
output_pydantic: Optional[Type[BaseModel]] = Field(
output_pydantic: Optional[type[BaseModel]] = Field(
description="A Pydantic model to be used to create a Pydantic output.",
default=None,
)
@@ -128,7 +123,7 @@ class Task(BaseModel):
output: Optional[TaskOutput] = Field(
description="Task output, it's final result after being executed", default=None
)
tools: Optional[List[BaseTool]] = Field(
tools: Optional[list[BaseTool]] = Field(
default_factory=list,
description="Tools the agent is limited to use for this task.",
)
@@ -149,12 +144,12 @@ class Task(BaseModel):
description="Whether the task should instruct the agent to return the final answer formatted in Markdown",
default=False,
)
converter_cls: Optional[Type[Converter]] = Field(
converter_cls: Optional[type[Converter]] = Field(
description="A converter class used to export structured output",
default=None,
)
processed_by_agents: Set[str] = Field(default_factory=set)
guardrail: Optional[Union[Callable[[TaskOutput], Tuple[bool, Any]], str]] = Field(
processed_by_agents: set[str] = Field(default_factory=set)
guardrail: Optional[Union[Callable[[TaskOutput], tuple[bool, Any]], str]] = Field(
default=None,
description="Function or string description of a guardrail to validate task output before proceeding to next task",
)
@@ -189,7 +184,7 @@ class Task(BaseModel):
While type hints provide static checking, this validator ensures runtime safety by:
1. Verifying the function accepts exactly one parameter (the TaskOutput)
2. Checking return type annotations match Tuple[bool, Any] if present
2. Checking return type annotations match tuple[bool, Any] if present
3. Providing clear, immediate error messages for debugging
This runtime validation is crucial because:
@@ -205,7 +200,7 @@ class Task(BaseModel):
Raises:
ValueError: If the function signature is invalid or return annotation
doesn't match Tuple[bool, Any]
doesn't match tuple[bool, Any]
"""
if v is not None and callable(v):
sig = inspect.signature(v)
@@ -229,11 +224,11 @@ class Task(BaseModel):
return_annotation_args[1] is Any
or return_annotation_args[1] is str
or return_annotation_args[1] is TaskOutput
or return_annotation_args[1] == Union[str, TaskOutput]
or return_annotation_args[1] == (str | TaskOutput)
)
):
raise ValueError(
"If return type is annotated, it must be Tuple[bool, Any]"
"If return type is annotated, it must be tuple[bool, Any]"
)
return v
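A guardrail that passes the runtime checks described above: exactly one parameter and a tuple[bool, Any] return annotation. The minimum-length rule is illustrative, and TaskOutput.raw is assumed to hold the plain-text result:

from typing import Any

from crewai.tasks.task_output import TaskOutput


def min_length_guardrail(output: TaskOutput) -> tuple[bool, Any]:
    # One parameter plus a tuple[bool, Any] annotation, so the validator
    # above accepts this callable at construction time.
    if len(output.raw) < 50:  # .raw assumed to carry the raw task result
        return False, "Output too short; expected at least 50 characters."
    return True, output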
@@ -375,7 +370,7 @@ class Task(BaseModel):
self,
agent: Optional[BaseAgent] = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
) -> TaskOutput:
"""Execute the task synchronously."""
return self._execute_core(agent, context, tools)
@@ -398,7 +393,7 @@ class Task(BaseModel):
self,
agent: BaseAgent | None = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
) -> Future[TaskOutput]:
"""Execute the task asynchronously."""
future: Future[TaskOutput] = Future()
@@ -413,7 +408,7 @@ class Task(BaseModel):
self,
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
tools: Optional[list[Any]],
future: Future[TaskOutput],
) -> None:
"""Execute the task asynchronously with context handling."""
@@ -424,7 +419,7 @@ class Task(BaseModel):
self,
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
tools: Optional[list[Any]],
) -> TaskOutput:
"""Run the core execution logic of the task."""
try:
@@ -604,7 +599,7 @@ Follow these guidelines:
return "\n".join(tasks_slices)
def interpolate_inputs_and_add_conversation_history(
self, inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]]
self, inputs: dict[str, Union[str, int, float, dict[str, Any], list[Any]]]
) -> None:
"""Interpolate inputs into the task description, expected output, and output file path.
Add conversation history if present.
@@ -688,7 +683,7 @@ Follow these guidelines:
self.delegations += 1
def copy(
self, agents: List["BaseAgent"], task_mapping: Dict[str, "Task"]
self, agents: list["BaseAgent"], task_mapping: dict[str, "Task"]
) -> "Task":
"""Creates a deep copy of the Task while preserving its original class type.
@@ -732,9 +727,9 @@ Follow these guidelines:
def _export_output(
self, result: str
) -> Tuple[Optional[BaseModel], Optional[Dict[str, Any]]]:
) -> tuple[Optional[BaseModel], Optional[dict[str, Any]]]:
pydantic_output: Optional[BaseModel] = None
json_output: Optional[Dict[str, Any]] = None
json_output: Optional[dict[str, Any]] = None
if self.output_pydantic or self.output_json:
model_output = convert_to_model(
@@ -764,7 +759,7 @@ Follow these guidelines:
return OutputFormat.PYDANTIC
return OutputFormat.RAW
def _save_file(self, result: Union[Dict, str, Any]) -> None:
def _save_file(self, result: Union[dict, str, Any]) -> None:
"""Save task output to a file.
Note:

View File

@@ -6,7 +6,7 @@ Classes:
HallucinationGuardrail: Placeholder guardrail that validates task outputs.
"""
from typing import Any, Optional, Tuple
from typing import Any, Optional
from crewai.llm import LLM
from crewai.tasks.task_output import TaskOutput
@@ -75,7 +75,7 @@ class HallucinationGuardrail:
"""Generate a description of this guardrail for event logging."""
return "HallucinationGuardrail (no-op)"
def __call__(self, task_output: TaskOutput) -> Tuple[bool, Any]:
def __call__(self, task_output: TaskOutput) -> tuple[bool, Any]:
"""Validate a task output against hallucination criteria.
In the open-source version, this method always reports the output as valid.

View File

@@ -1,4 +1,4 @@
from typing import Any, Tuple
from typing import Any
from pydantic import BaseModel, Field
@@ -65,14 +65,14 @@ class LLMGuardrail:
return result
def __call__(self, task_output: TaskOutput) -> Tuple[bool, Any]:
def __call__(self, task_output: TaskOutput) -> tuple[bool, Any]:
"""Validates the output of a task based on specified criteria.
Args:
task_output (TaskOutput): The output to be validated.
Returns:
Tuple[bool, Any]: A tuple containing:
tuple[bool, Any]: A tuple containing:
- bool: True if validation passed, False otherwise
- Any: The validation result or error message
"""

View File

@@ -1,5 +1,5 @@
import json
from typing import Any, Dict, Optional
from typing import Any, Optional
from pydantic import BaseModel, Field, model_validator
@@ -19,7 +19,7 @@ class TaskOutput(BaseModel):
pydantic: Optional[BaseModel] = Field(
description="Pydantic output of task", default=None
)
json_dict: Optional[Dict[str, Any]] = Field(
json_dict: Optional[dict[str, Any]] = Field(
description="JSON dictionary of task", default=None
)
agent: str = Field(description="Agent that executed the task")
@@ -47,7 +47,7 @@ class TaskOutput(BaseModel):
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
def to_dict(self) -> dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""
output_dict = {}
if self.json_dict:

View File

@@ -1,4 +1,4 @@
from typing import Dict, Optional, Union
from typing import Optional, Union
from pydantic import BaseModel, Field

View File

@@ -1,7 +1,7 @@
import asyncio
from abc import ABC, abstractmethod
from inspect import signature
from typing import Any, Callable, Type, get_args, get_origin, Optional, List
from typing import Any, Callable, get_args, get_origin, Optional
from pydantic import (
BaseModel,
@@ -32,9 +32,9 @@ class BaseTool(BaseModel, ABC):
"""The unique name of the tool that clearly communicates its purpose."""
description: str
"""Used to tell the model how/when/why to use the tool."""
env_vars: List[EnvVar] = []
env_vars: list[EnvVar] = []
"""List of environment variables used by the tool."""
args_schema: Type[PydanticBaseModel] = Field(
args_schema: type[PydanticBaseModel] = Field(
default_factory=_ArgsSchemaPlaceholder, validate_default=True
)
"""The schema for the arguments that the tool accepts."""
@@ -52,8 +52,8 @@ class BaseTool(BaseModel, ABC):
@field_validator("args_schema", mode="before")
@classmethod
def _default_args_schema(
cls, v: Type[PydanticBaseModel]
) -> Type[PydanticBaseModel]:
cls, v: type[PydanticBaseModel]
) -> type[PydanticBaseModel]:
if not isinstance(v, cls._ArgsSchemaPlaceholder):
return v
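A minimal tool showing the modernized args_schema: type[PydanticBaseModel] field in use; _run as the execution hook follows crewai's BaseTool convention:

from pydantic import BaseModel, Field

from crewai.tools import BaseTool


class AddArgs(BaseModel):
    a: int = Field(..., description="First addend")
    b: int = Field(..., description="Second addend")


class AddTool(BaseTool):
    name: str = "add"
    description: str = "Add two integers and return the sum."
    args_schema: type[BaseModel] = AddArgs

    def _run(self, a: int, b: int) -> int:
        return a + b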

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, Optional
from typing import Any, Optional
from pydantic import BaseModel, Field
from pydantic import BaseModel as PydanticBaseModel
@@ -7,7 +7,7 @@ from pydantic import Field as PydanticField
class ToolCalling(BaseModel):
tool_name: str = Field(..., description="The name of the tool to be called.")
arguments: Optional[Dict[str, Any]] = Field(
arguments: Optional[dict[str, Any]] = Field(
..., description="A dictionary of arguments to be passed to the tool."
)
@@ -16,6 +16,6 @@ class InstructorToolCalling(PydanticBaseModel):
tool_name: str = PydanticField(
..., description="The name of the tool to be called."
)
arguments: Optional[Dict[str, Any]] = PydanticField(
arguments: Optional[dict[str, Any]] = PydanticField(
..., description="A dictionary of arguments to be passed to the tool."
)

View File

@@ -5,7 +5,7 @@ import time
from difflib import SequenceMatcher
from json import JSONDecodeError
from textwrap import dedent
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Optional, Union
import json5
from json_repair import repair_json
@@ -69,12 +69,12 @@ class ToolUsage:
def __init__(
self,
tools_handler: Optional[ToolsHandler],
tools: List[CrewStructuredTool],
tools: list[CrewStructuredTool],
task: Optional[Task],
function_calling_llm: Any,
agent: Optional[Union["BaseAgent", "LiteAgent"]] = None,
action: Any = None,
fingerprint_context: Optional[Dict[str, str]] = None,
fingerprint_context: Optional[dict[str, str]] = None,
) -> None:
self._i18n: I18N = agent.i18n if agent else I18N()
self._printer: Printer = Printer()
@@ -393,7 +393,7 @@ class ToolUsage:
return tool
if self.task:
self.task.increment_tools_errors()
tool_selection_data: Dict[str, Any] = {
tool_selection_data: dict[str, Any] = {
"agent_key": getattr(self.agent, "key", None) if self.agent else None,
"agent_role": getattr(self.agent, "role", None) if self.agent else None,
"tool_name": tool_name,
@@ -510,7 +510,7 @@ class ToolUsage:
)
return self._tool_calling(tool_string)
def _validate_tool_input(self, tool_input: Optional[str]) -> Dict[str, Any]:
def _validate_tool_input(self, tool_input: Optional[str]) -> dict[str, Any]:
if tool_input is None:
return {}
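The json5 and json_repair imports at the top of this file suggest the validation strategy: a tolerant parse first, a structural repair pass as fallback. A simplified sketch, not the actual _validate_tool_input body (which the hunk truncates):

from typing import Any

import json5
from json_repair import repair_json


def validate_tool_input(tool_input: str | None) -> dict[str, Any]:
    if tool_input is None:
        return {}
    try:
        parsed = json5.loads(tool_input)  # tolerates single quotes, trailing commas
    except ValueError:
        parsed = json5.loads(repair_json(tool_input))  # last-resort repair pass
    return parsed if isinstance(parsed, dict) else {}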

View File

@@ -1,4 +1,4 @@
from typing import List
from typing import TYPE_CHECKING
from pydantic import BaseModel, Field
@@ -35,6 +35,6 @@ class ChatInputs(BaseModel):
crew_description: str = Field(
..., description="A description of the crew's purpose"
)
inputs: List[ChatInputField] = Field(
inputs: list[ChatInputField] = Field(
default_factory=list, description="A list of input fields for the crew"
)

View File

@@ -1,4 +1,4 @@
from typing import List, Dict, TypedDict
from typing import TypedDict
class HITLResumeInfo(TypedDict, total=False):
@@ -9,7 +9,7 @@ class HITLResumeInfo(TypedDict, total=False):
task_key: str
task_output: str
human_feedback: str
previous_messages: List[Dict[str, str]]
previous_messages: list[dict[str, str]]
class CrewInputsWithHITL(TypedDict, total=False):

View File

@@ -1,6 +1,6 @@
import json
import re
from typing import Any, Callable, Dict, List, Optional, Sequence, Union
from typing import Any, Callable, Optional, Sequence, Union
from crewai.agents.constants import FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE
from crewai.agents.parser import (
@@ -26,7 +26,7 @@ from crewai.cli.config import Settings
console = Console()
def parse_tools(tools: List[BaseTool]) -> List[CrewStructuredTool]:
def parse_tools(tools: list[BaseTool]) -> list[CrewStructuredTool]:
"""Parse tools to be used for the task."""
tools_list = []
@@ -69,9 +69,9 @@ def handle_max_iterations_exceeded(
formatted_answer: Union[AgentAction, AgentFinish, None],
printer: Printer,
i18n: I18N,
messages: List[Dict[str, str]],
messages: list[dict[str, str]],
llm: Union[LLM, BaseLLM],
callbacks: List[Any],
callbacks: list[Any],
) -> Union[AgentAction, AgentFinish]:
"""
Handles the case when the maximum number of iterations is exceeded.
@@ -115,7 +115,7 @@ def handle_max_iterations_exceeded(
return formatted_answer
def format_message_for_llm(prompt: str, role: str = "user") -> Dict[str, str]:
def format_message_for_llm(prompt: str, role: str = "user") -> dict[str, str]:
prompt = prompt.rstrip()
return {"role": role, "content": prompt}
@@ -143,8 +143,8 @@ def enforce_rpm_limit(
def get_llm_response(
llm: Union[LLM, BaseLLM],
messages: List[Dict[str, str]],
callbacks: List[Any],
messages: list[dict[str, str]],
callbacks: list[Any],
printer: Printer,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
@@ -187,7 +187,7 @@ def process_llm_response(
def handle_agent_action_core(
formatted_answer: AgentAction,
tool_result: ToolResult,
messages: Optional[List[Dict[str, str]]] = None,
messages: Optional[list[dict[str, str]]] = None,
step_callback: Optional[Callable] = None,
show_logs: Optional[Callable] = None,
) -> Union[AgentAction, AgentFinish]:
@@ -246,7 +246,7 @@ def handle_unknown_error(printer: Any, exception: Exception) -> None:
def handle_output_parser_exception(
e: OutputParserException,
messages: List[Dict[str, str]],
messages: list[dict[str, str]],
iterations: int,
log_error_after: int = 3,
printer: Optional[Any] = None,
@@ -298,9 +298,9 @@ def is_context_length_exceeded(exception: Exception) -> bool:
def handle_context_length(
respect_context_window: bool,
printer: Any,
messages: List[Dict[str, str]],
messages: list[dict[str, str]],
llm: Any,
callbacks: List[Any],
callbacks: list[Any],
i18n: Any,
) -> None:
"""Handle context length exceeded by either summarizing or raising an error.
@@ -330,9 +330,9 @@ def handle_context_length(
def summarize_messages(
messages: List[Dict[str, str]],
messages: list[dict[str, str]],
llm: Any,
callbacks: List[Any],
callbacks: list[Any],
i18n: Any,
) -> None:
"""Summarize messages to fit within context window.
@@ -458,8 +458,8 @@ def _print_current_organization():
)
def load_agent_from_repository(from_repository: str) -> Dict[str, Any]:
attributes: Dict[str, Any] = {}
def load_agent_from_repository(from_repository: str) -> dict[str, Any]:
attributes: dict[str, Any] = {}
if from_repository:
import importlib
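A condensed sketch of the strategy behind handle_context_length/summarize_messages above: collapse the transcript into a single turn, mutating the message list in place. The real implementation chunks the transcript and routes through callbacks and i18n; this assumes a bare llm.call that accepts a string:

from typing import Any


def summarize_in_place(messages: list[dict[str, str]], llm: Any) -> None:
    transcript = "\n".join(f"{m['role']}: {m['content']}" for m in messages)
    summary = llm.call(
        f"Summarize this conversation, preserving key facts and decisions:\n{transcript}"
    )
    messages.clear()
    messages.append({"role": "user", "content": f"Conversation summary: {summary}"})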

View File

@@ -1,20 +1,20 @@
from typing import Any, Dict, Type
from typing import Any
from pydantic import BaseModel
def process_config(
values: Dict[str, Any], model_class: Type[BaseModel]
) -> Dict[str, Any]:
values: dict[str, Any], model_class: type[BaseModel]
) -> dict[str, Any]:
"""
Process the config dictionary and update the values accordingly.
Args:
values (Dict[str, Any]): The dictionary of values to update.
model_class (Type[BaseModel]): The Pydantic model class to reference for field validation.
values (dict[str, Any]): The dictionary of values to update.
model_class (type[BaseModel]): The Pydantic model class to reference for field validation.
Returns:
Dict[str, Any]: The updated values dictionary.
dict[str, Any]: The updated values dictionary.
"""
config = values.get("config", {})
if not config:
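The hunk truncates before the merge logic; a plausible sketch under the assumption that config values only fill fields the caller did not set explicitly (pydantic v2's model_fields used to validate keys):

from typing import Any

from pydantic import BaseModel


def process_config(
    values: dict[str, Any], model_class: type[BaseModel]
) -> dict[str, Any]:
    config = values.get("config") or {}
    for key, value in config.items():
        if key in model_class.model_fields and values.get(key) is None:
            values[key] = value
    return values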

View File

@@ -1,6 +1,6 @@
import json
import re
from typing import Any, Optional, Type, Union, get_args, get_origin
from typing import Any, Optional, Union, get_args, get_origin
from pydantic import BaseModel, ValidationError
@@ -116,10 +116,10 @@ class Converter(OutputConverter):
def convert_to_model(
result: str,
output_pydantic: Optional[Type[BaseModel]],
output_json: Optional[Type[BaseModel]],
output_pydantic: Optional[type[BaseModel]],
output_json: Optional[type[BaseModel]],
agent: Any,
converter_cls: Optional[Type[Converter]] = None,
converter_cls: Optional[type[Converter]] = None,
) -> Union[dict, BaseModel, str]:
model = output_pydantic or output_json
if model is None:
@@ -146,7 +146,7 @@ def convert_to_model(
def validate_model(
result: str, model: Type[BaseModel], is_json_output: bool
result: str, model: type[BaseModel], is_json_output: bool
) -> Union[dict, BaseModel]:
exported_result = model.model_validate_json(result)
if is_json_output:
@@ -156,10 +156,10 @@ def validate_model(
def handle_partial_json(
result: str,
model: Type[BaseModel],
model: type[BaseModel],
is_json_output: bool,
agent: Any,
converter_cls: Optional[Type[Converter]] = None,
converter_cls: Optional[type[Converter]] = None,
) -> Union[dict, BaseModel, str]:
match = re.search(r"({.*})", result, re.DOTALL)
if match:
@@ -185,10 +185,10 @@ def handle_partial_json(
def convert_with_instructions(
result: str,
model: Type[BaseModel],
model: type[BaseModel],
is_json_output: bool,
agent: Any,
converter_cls: Optional[Type[Converter]] = None,
converter_cls: Optional[type[Converter]] = None,
) -> Union[dict, BaseModel, str]:
llm = agent.function_calling_llm or agent.llm
instructions = get_conversion_instructions(model, llm)
@@ -214,7 +214,7 @@ def convert_with_instructions(
return exported_result
def get_conversion_instructions(model: Type[BaseModel], llm: Any) -> str:
def get_conversion_instructions(model: type[BaseModel], llm: Any) -> str:
instructions = "Please convert the following text into valid JSON."
if llm and not isinstance(llm, str) and llm.supports_function_calling():
model_schema = PydanticSchemaParser(model=model).get_schema()
@@ -233,7 +233,7 @@ def get_conversion_instructions(model: Type[BaseModel], llm: Any) -> str:
def create_converter(
agent: Optional[Any] = None,
converter_cls: Optional[Type[Converter]] = None,
converter_cls: Optional[type[Converter]] = None,
*args,
**kwargs,
) -> Converter:
@@ -253,7 +253,7 @@ def create_converter(
return converter
def generate_model_description(model: Type[BaseModel]) -> str:
def generate_model_description(model: type[BaseModel]) -> str:
"""
Generate a string description of a Pydantic model's fields and their types.
@@ -275,11 +275,11 @@ def generate_model_description(model: Type[BaseModel]) -> str:
else:
return f"Optional[Union[{', '.join(describe_field(arg) for arg in non_none_args)}]]"
elif origin is list:
return f"List[{describe_field(args[0])}]"
return f"list[{describe_field(args[0])}]"
elif origin is dict:
key_type = describe_field(args[0])
value_type = describe_field(args[1])
return f"Dict[{key_type}, {value_type}]"
return f"dict[{key_type}, {value_type}]"
elif isinstance(field_type, type) and issubclass(field_type, BaseModel):
return generate_model_description(field_type)
elif hasattr(field_type, "__name__"):
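The recursion above bottoms out in get_origin/get_args. A trimmed, runnable version of the same idea, with Optional/Union and nested-model handling omitted for brevity:

from typing import Any, get_args, get_origin


def describe_field(field_type: Any) -> str:
    origin, args = get_origin(field_type), get_args(field_type)
    if origin is list:
        return f"list[{describe_field(args[0])}]"
    if origin is dict:
        return f"dict[{describe_field(args[0])}, {describe_field(args[1])}]"
    return getattr(field_type, "__name__", str(field_type))


print(describe_field(dict[str, list[int]]))  # dict[str, list[int]]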

View File

@@ -1,4 +1,4 @@
from typing import List
from typing import TYPE_CHECKING
from pydantic import BaseModel, Field
@@ -13,23 +13,23 @@ class Entity(BaseModel):
name: str = Field(description="The name of the entity.")
type: str = Field(description="The type of the entity.")
description: str = Field(description="Description of the entity.")
relationships: List[str] = Field(description="Relationships of the entity.")
relationships: list[str] = Field(description="Relationships of the entity.")
class TaskEvaluation(BaseModel):
suggestions: List[str] = Field(
suggestions: list[str] = Field(
description="Suggestions to improve future similar tasks."
)
quality: float = Field(
description="A score from 0 to 10 evaluating on completion, quality, and overall performance, all taking into account the task description, expected output, and the result of the task."
)
entities: List[Entity] = Field(
entities: list[Entity] = Field(
description="Entities extracted from the task output."
)
class TrainingTaskEvaluation(BaseModel):
suggestions: List[str] = Field(
suggestions: list[str] = Field(
description="List of clear, actionable instructions derived from the Human Feedbacks to enhance the Agent's performance. Analyze the differences between Initial Outputs and Improved Outputs to generate specific action items for future tasks. Ensure all key and specific points from the human feedback are incorporated into these instructions."
)
quality: float = Field(

View File

@@ -3,7 +3,7 @@
import warnings
from abc import ABC
from collections.abc import Callable
from typing import Any, Type, TypeVar
from typing import Any, TypeVar
from typing_extensions import deprecated
import crewai.events as new_events
@@ -32,7 +32,7 @@ class crewai_event_bus: # noqa: N801
@classmethod
def on(
cls, event_type: Type[EventT]
cls, event_type: type[EventT]
) -> Callable[[Callable[[Any, EventT], None]], Callable[[Any, EventT], None]]:
"""Delegate to the actual event bus instance."""
return new_events.crewai_event_bus.on(event_type)
@@ -44,7 +44,7 @@ class crewai_event_bus: # noqa: N801
@classmethod
def register_handler(
cls, event_type: Type[EventTypes], handler: Callable[[Any, EventTypes], None]
cls, event_type: type[EventTypes], handler: Callable[[Any, EventTypes], None]
) -> None:
"""Delegate to the actual event bus instance."""
return new_events.crewai_event_bus.register_handler(event_type, handler)
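Usage of the delegating on classmethod above mirrors the new-style bus directly; the event class name and its export location are assumptions in this sketch:

from typing import Any

import crewai.events as new_events  # same import the shim above uses

# Event class name and export location are assumed for illustration.
from crewai.events import CrewKickoffStartedEvent


@new_events.crewai_event_bus.on(CrewKickoffStartedEvent)
def on_kickoff(source: Any, event: CrewKickoffStartedEvent) -> None:
    print(f"crew kickoff started from {source!r}")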

View File

@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, List, Union
from typing import TYPE_CHECKING, Union
from crewai.utilities.constants import _NotSpecified
if TYPE_CHECKING:
@@ -6,7 +6,7 @@ if TYPE_CHECKING:
from crewai.tasks.task_output import TaskOutput
def aggregate_raw_outputs_from_task_outputs(task_outputs: List["TaskOutput"]) -> str:
def aggregate_raw_outputs_from_task_outputs(task_outputs: list["TaskOutput"]) -> str:
"""Generate string context from the task outputs."""
dividers = "\n\n----------\n\n"
@@ -15,7 +15,7 @@ def aggregate_raw_outputs_from_task_outputs(task_outputs: List["TaskOutput"]) ->
return context
def aggregate_raw_outputs_from_tasks(tasks: Union[List["Task"], _NotSpecified]) -> str:
def aggregate_raw_outputs_from_tasks(tasks: Union[list["Task"], _NotSpecified]) -> str:
"""Generate string context from the tasks."""
task_outputs = (

View File

@@ -1,4 +1,4 @@
from typing import Any, Callable, Optional, Tuple, Union
from typing import Any, Callable, Optional, Union
from pydantic import BaseModel, field_validator
@@ -36,7 +36,7 @@ class GuardrailResult(BaseModel):
return v
@classmethod
def from_tuple(cls, result: Tuple[bool, Union[Any, str]]) -> "GuardrailResult":
def from_tuple(cls, result: tuple[bool, Union[Any, str]]) -> "GuardrailResult":
"""Create a GuardrailResult from a validation tuple.
Args:

Some files were not shown because too many files have changed in this diff.