Merge branch 'main' into Branch_2260

This commit is contained in:
Lucas Gomide
2025-03-26 14:24:03 -03:00
committed by GitHub
57 changed files with 4261 additions and 753 deletions

View File

@@ -5,6 +5,7 @@ from crewai.crew import Crew
from crewai.flow.flow import Flow
from crewai.knowledge.knowledge import Knowledge
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.process import Process
from crewai.task import Task
@@ -21,6 +22,7 @@ __all__ = [
"Process",
"Task",
"LLM",
"BaseLLM",
"Flow",
"Knowledge",
]

View File

@@ -11,7 +11,7 @@ from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
from crewai.llm import LLM
from crewai.llm import BaseLLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.security import Fingerprint
from crewai.task import Task
@@ -71,10 +71,10 @@ class Agent(BaseAgent):
default=True,
description="Use system prompt for the agent.",
)
llm: Union[str, InstanceOf[LLM], Any] = Field(
llm: Union[str, InstanceOf[BaseLLM], Any] = Field(
description="Language model that will run the agent.", default=None
)
function_calling_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
function_calling_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
description="Language model that will run the agent.", default=None
)
system_template: Optional[str] = Field(
@@ -118,7 +118,9 @@ class Agent(BaseAgent):
self.agent_ops_agent_name = self.role
self.llm = create_llm(self.llm)
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
if self.function_calling_llm and not isinstance(
self.function_calling_llm, BaseLLM
):
self.function_calling_llm = create_llm(self.function_calling_llm)
if not self.agent_executor:
@@ -140,15 +142,13 @@ class Agent(BaseAgent):
self.embedder = crew_embedder
if self.knowledge_sources:
full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)")
knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}"
if isinstance(self.knowledge_sources, list) and all(
isinstance(k, BaseKnowledgeSource) for k in self.knowledge_sources
):
self.knowledge = Knowledge(
sources=self.knowledge_sources,
embedder=self.embedder,
collection_name=knowledge_agent_name,
collection_name=self.role,
storage=self.knowledge_storage or None,
)
except (TypeError, ValueError) as e:

View File

@@ -25,6 +25,7 @@ from crewai.tools.base_tool import BaseTool, Tool
from crewai.utilities import I18N, Logger, RPMController
from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter
from crewai.utilities.string_utils import interpolate_only
T = TypeVar("T", bound="BaseAgent")
@@ -333,9 +334,15 @@ class BaseAgent(ABC, BaseModel):
self._original_backstory = self.backstory
if inputs:
self.role = self._original_role.format(**inputs)
self.goal = self._original_goal.format(**inputs)
self.backstory = self._original_backstory.format(**inputs)
self.role = interpolate_only(
input_string=self._original_role, inputs=inputs
)
self.goal = interpolate_only(
input_string=self._original_goal, inputs=inputs
)
self.backstory = interpolate_only(
input_string=self._original_backstory, inputs=inputs
)
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
"""Set the cache handler for the agent.

View File

@@ -13,7 +13,7 @@ from crewai.agents.parser import (
OutputParserException,
)
from crewai.agents.tools_handler import ToolsHandler
from crewai.llm import LLM
from crewai.llm import BaseLLM
from crewai.tools.base_tool import BaseTool
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N, Printer
@@ -61,7 +61,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
callbacks: List[Any] = [],
):
self._i18n: I18N = I18N()
self.llm: LLM = llm
self.llm: BaseLLM = llm
self.task = task
self.agent = agent
self.crew = crew
@@ -87,8 +87,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.tool_name_to_tool_map: Dict[str, BaseTool] = {
tool.name: tool for tool in self.tools
}
self.stop = stop_words
self.llm.stop = list(set(self.llm.stop + self.stop))
existing_stop = self.llm.stop or []
self.llm.stop = list(
set(
existing_stop + self.stop
if isinstance(existing_stop, list)
else self.stop
)
)
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
if "system" in self.prompt:

View File

@@ -124,9 +124,9 @@ class CrewAgentParser:
)
def _extract_thought(self, text: str) -> str:
thought_index = text.find("\n\nAction")
thought_index = text.find("\nAction")
if thought_index == -1:
thought_index = text.find("\n\nFinal Answer")
thought_index = text.find("\nFinal Answer")
if thought_index == -1:
return ""
thought = text[:thought_index].strip()
@@ -136,7 +136,7 @@ class CrewAgentParser:
def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters."""
return re.sub(r"^\s*\*+\s*|\s*\*+\s*$", "", text).strip()
return text.strip().strip("*").strip()
def _safe_repair_json(self, tool_input: str) -> str:
UNABLE_TO_REPAIR_JSON_RESULTS = ['""', "{}"]

View File

@@ -14,7 +14,7 @@ from packaging import version
from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version
from crewai.crew import Crew
from crewai.llm import LLM
from crewai.llm import LLM, BaseLLM
from crewai.types.crew_chat import ChatInputField, ChatInputs
from crewai.utilities.llm_utils import create_llm
@@ -116,7 +116,7 @@ def show_loading(event: threading.Event):
print()
def initialize_chat_llm(crew: Crew) -> Optional[LLM]:
def initialize_chat_llm(crew: Crew) -> Optional[LLM | BaseLLM]:
"""Initializes the chat LLM and handles exceptions."""
try:
return create_llm(crew.chat_llm)

View File

@@ -1,4 +1,5 @@
import subprocess
from functools import lru_cache
class Repository:
@@ -35,6 +36,7 @@ class Repository:
encoding="utf-8",
).strip()
@lru_cache(maxsize=None)
def is_git_repo(self) -> bool:
"""Check if the current directory is a git repository."""
try:

View File

@@ -6,7 +6,7 @@ import warnings
from concurrent.futures import Future
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, cast
from pydantic import (
UUID4,
@@ -26,7 +26,7 @@ from crewai.agents.cache import CacheHandler
from crewai.crews.crew_output import CrewOutput
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.llm import LLM
from crewai.llm import LLM, BaseLLM
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
@@ -37,7 +37,7 @@ from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import Tool
from crewai.tools.base_tool import BaseTool, Tool
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -153,7 +153,7 @@ class Crew(BaseModel):
default=None,
description="Metrics for the LLM usage during all tasks execution.",
)
manager_llm: Optional[Any] = Field(
manager_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
description="Language model that will run the agent.", default=None
)
manager_agent: Optional[BaseAgent] = Field(
@@ -187,7 +187,7 @@ class Crew(BaseModel):
default=None,
description="Maximum number of requests per minute for the crew execution to be respected.",
)
prompt_file: str = Field(
prompt_file: Optional[str] = Field(
default=None,
description="Path to the prompt json file to be used for the crew.",
)
@@ -199,7 +199,7 @@ class Crew(BaseModel):
default=False,
description="Plan the crew execution and add the plan to the crew.",
)
planning_llm: Optional[Any] = Field(
planning_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
default=None,
description="Language model that will run the AgentPlanner if planning is True.",
)
@@ -215,7 +215,7 @@ class Crew(BaseModel):
default=None,
description="Knowledge sources for the crew. Add knowledge sources to the knowledge object.",
)
chat_llm: Optional[Any] = Field(
chat_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
default=None,
description="LLM used to handle chatting with the crew.",
)
@@ -489,7 +489,7 @@ class Crew(BaseModel):
task.key for task in self.tasks
]
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
@property
def fingerprint(self) -> Fingerprint:
"""
@@ -819,7 +819,12 @@ class Crew(BaseModel):
# Determine which tools to use - task tools take precedence over agent tools
tools_for_task = task.tools or agent_to_use.tools or []
tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
# Prepare tools and ensure they're compatible with task execution
tools_for_task = self._prepare_tools(
agent_to_use,
task,
cast(Union[List[Tool], List[BaseTool]], tools_for_task),
)
self._log_task_start(task, agent_to_use.role)
@@ -838,7 +843,7 @@ class Crew(BaseModel):
future = task.execute_async(
agent=agent_to_use,
context=context,
tools=tools_for_task,
tools=cast(List[BaseTool], tools_for_task),
)
futures.append((task, future, task_index))
else:
@@ -850,7 +855,7 @@ class Crew(BaseModel):
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
tools=tools_for_task,
tools=cast(List[BaseTool], tools_for_task),
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
@@ -888,10 +893,12 @@ class Crew(BaseModel):
return None
def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: List[Tool]
) -> List[Tool]:
self, agent: BaseAgent, task: Task, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
# Add delegation tools if agent allows delegation
if agent.allow_delegation:
if hasattr(agent, "allow_delegation") and getattr(
agent, "allow_delegation", False
):
if self.process == Process.hierarchical:
if self.manager_agent:
tools = self._update_manager_tools(task, tools)
@@ -900,17 +907,24 @@ class Crew(BaseModel):
"Manager agent is required for hierarchical process."
)
elif agent and agent.allow_delegation:
elif agent:
tools = self._add_delegation_tools(task, tools)
# Add code execution tools if agent allows code execution
if agent.allow_code_execution:
if hasattr(agent, "allow_code_execution") and getattr(
agent, "allow_code_execution", False
):
tools = self._add_code_execution_tools(agent, tools)
if agent and agent.multimodal:
if (
agent
and hasattr(agent, "multimodal")
and getattr(agent, "multimodal", False)
):
tools = self._add_multimodal_tools(agent, tools)
return tools
# Return a List[BaseTool] which is compatible with both Task.execute_sync and Task.execute_async
return cast(List[BaseTool], tools)
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
if self.process == Process.hierarchical:
@@ -918,11 +932,13 @@ class Crew(BaseModel):
return task.agent
def _merge_tools(
self, existing_tools: List[Tool], new_tools: List[Tool]
) -> List[Tool]:
self,
existing_tools: Union[List[Tool], List[BaseTool]],
new_tools: Union[List[Tool], List[BaseTool]],
) -> List[BaseTool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name."""
if not new_tools:
return existing_tools
return cast(List[BaseTool], existing_tools)
# Create mapping of tool names to new tools
new_tool_map = {tool.name: tool for tool in new_tools}
@@ -933,23 +949,41 @@ class Crew(BaseModel):
# Add all new tools
tools.extend(new_tools)
return tools
return cast(List[BaseTool], tools)
def _inject_delegation_tools(
self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]
):
delegation_tools = task_agent.get_delegation_tools(agents)
return self._merge_tools(tools, delegation_tools)
self,
tools: Union[List[Tool], List[BaseTool]],
task_agent: BaseAgent,
agents: List[BaseAgent],
) -> List[BaseTool]:
if hasattr(task_agent, "get_delegation_tools"):
delegation_tools = task_agent.get_delegation_tools(agents)
# Cast delegation_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], delegation_tools))
return cast(List[BaseTool], tools)
def _add_multimodal_tools(self, agent: BaseAgent, tools: List[Tool]):
multimodal_tools = agent.get_multimodal_tools()
return self._merge_tools(tools, multimodal_tools)
def _add_multimodal_tools(
self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
if hasattr(agent, "get_multimodal_tools"):
multimodal_tools = agent.get_multimodal_tools()
# Cast multimodal_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], multimodal_tools))
return cast(List[BaseTool], tools)
def _add_code_execution_tools(self, agent: BaseAgent, tools: List[Tool]):
code_tools = agent.get_code_execution_tools()
return self._merge_tools(tools, code_tools)
def _add_code_execution_tools(
self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
if hasattr(agent, "get_code_execution_tools"):
code_tools = agent.get_code_execution_tools()
# Cast code_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], code_tools))
return cast(List[BaseTool], tools)
def _add_delegation_tools(self, task: Task, tools: List[Tool]):
def _add_delegation_tools(
self, task: Task, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools:
@@ -957,7 +991,7 @@ class Crew(BaseModel):
tools = self._inject_delegation_tools(
tools, task.agent, agents_for_delegation
)
return tools
return cast(List[BaseTool], tools)
def _log_task_start(self, task: Task, role: str = "None"):
if self.output_log_file:
@@ -965,7 +999,9 @@ class Crew(BaseModel):
task_name=task.name, task=task.description, agent=role, status="started"
)
def _update_manager_tools(self, task: Task, tools: List[Tool]):
def _update_manager_tools(
self, task: Task, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
if self.manager_agent:
if task.agent:
tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
@@ -973,7 +1009,7 @@ class Crew(BaseModel):
tools = self._inject_delegation_tools(
tools, self.manager_agent, self.agents
)
return tools
return cast(List[BaseTool], tools)
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
context = (
@@ -1229,13 +1265,14 @@ class Crew(BaseModel):
def test(
self,
n_iterations: int,
eval_llm: Union[str, InstanceOf[LLM]],
eval_llm: Union[str, InstanceOf[BaseLLM]],
inputs: Optional[Dict[str, Any]] = None,
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
try:
eval_llm = create_llm(eval_llm)
if not eval_llm:
# Create LLM instance and ensure it's of type LLM for CrewEvaluator
llm_instance = create_llm(eval_llm)
if not llm_instance:
raise ValueError("Failed to create LLM instance.")
crewai_event_bus.emit(
@@ -1243,12 +1280,12 @@ class Crew(BaseModel):
CrewTestStartedEvent(
crew_name=self.name or "crew",
n_iterations=n_iterations,
eval_llm=eval_llm,
eval_llm=llm_instance,
inputs=inputs,
),
)
test_crew = self.copy()
evaluator = CrewEvaluator(test_crew, eval_llm) # type: ignore[arg-type]
evaluator = CrewEvaluator(test_crew, llm_instance)
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)

View File

@@ -1,4 +1,5 @@
import json
import uuid
from datetime import date, datetime
from typing import Any, Dict, List, Union
@@ -32,7 +33,7 @@ def export_state(flow: Flow) -> dict[str, Serializable]:
def to_serializable(
obj: Any, max_depth: int = 5, _current_depth: int = 0
obj: Any, exclude: set[str] | None = None, max_depth: int = 5, _current_depth: int = 0
) -> Serializable:
"""Converts a Python object into a JSON-compatible representation.
@@ -42,6 +43,7 @@ def to_serializable(
Args:
obj (Any): Object to transform.
exclude (set[str], optional): Set of keys to exclude from the result.
max_depth (int, optional): Maximum recursion depth. Defaults to 5.
Returns:
@@ -50,21 +52,39 @@ def to_serializable(
if _current_depth >= max_depth:
return repr(obj)
if exclude is None:
exclude = set()
if isinstance(obj, (str, int, float, bool, type(None))):
return obj
elif isinstance(obj, uuid.UUID):
return str(obj)
elif isinstance(obj, (date, datetime)):
return obj.isoformat()
elif isinstance(obj, (list, tuple, set)):
return [to_serializable(item, max_depth, _current_depth + 1) for item in obj]
return [
to_serializable(
item, max_depth=max_depth, _current_depth=_current_depth + 1
)
for item in obj
]
elif isinstance(obj, dict):
return {
_to_serializable_key(key): to_serializable(
value, max_depth, _current_depth + 1
obj=value,
exclude=exclude,
max_depth=max_depth,
_current_depth=_current_depth + 1,
)
for key, value in obj.items()
if key not in exclude
}
elif isinstance(obj, BaseModel):
return to_serializable(obj.model_dump(), max_depth, _current_depth + 1)
return to_serializable(
obj=obj.model_dump(exclude=exclude),
max_depth=max_depth,
_current_depth=_current_depth + 1,
)
else:
return repr(obj)

View File

@@ -14,6 +14,7 @@ from chromadb.config import Settings
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
from crewai.utilities.paths import db_storage_path
@@ -99,7 +100,8 @@ class KnowledgeStorage(BaseKnowledgeStorage):
)
if self.app:
self.collection = self.app.get_or_create_collection(
name=collection_name, embedding_function=self.embedder
name=sanitize_collection_name(collection_name),
embedding_function=self.embedder,
)
else:
raise Exception("Vector Database Client not initialized")

View File

@@ -40,6 +40,7 @@ with warnings.catch_warnings():
from litellm.utils import supports_response_schema
from crewai.llms.base_llm import BaseLLM
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
@@ -114,6 +115,60 @@ LLM_CONTEXT_WINDOW_SIZES = {
"Llama-3.2-11B-Vision-Instruct": 16384,
"Meta-Llama-3.2-3B-Instruct": 4096,
"Meta-Llama-3.2-1B-Instruct": 16384,
# bedrock
"us.amazon.nova-pro-v1:0": 300000,
"us.amazon.nova-micro-v1:0": 128000,
"us.amazon.nova-lite-v1:0": 300000,
"us.anthropic.claude-3-5-sonnet-20240620-v1:0": 200000,
"us.anthropic.claude-3-5-haiku-20241022-v1:0": 200000,
"us.anthropic.claude-3-5-sonnet-20241022-v2:0": 200000,
"us.anthropic.claude-3-7-sonnet-20250219-v1:0": 200000,
"us.anthropic.claude-3-sonnet-20240229-v1:0": 200000,
"us.anthropic.claude-3-opus-20240229-v1:0": 200000,
"us.anthropic.claude-3-haiku-20240307-v1:0": 200000,
"us.meta.llama3-2-11b-instruct-v1:0": 128000,
"us.meta.llama3-2-3b-instruct-v1:0": 131000,
"us.meta.llama3-2-90b-instruct-v1:0": 128000,
"us.meta.llama3-2-1b-instruct-v1:0": 131000,
"us.meta.llama3-1-8b-instruct-v1:0": 128000,
"us.meta.llama3-1-70b-instruct-v1:0": 128000,
"us.meta.llama3-3-70b-instruct-v1:0": 128000,
"us.meta.llama3-1-405b-instruct-v1:0": 128000,
"eu.anthropic.claude-3-5-sonnet-20240620-v1:0": 200000,
"eu.anthropic.claude-3-sonnet-20240229-v1:0": 200000,
"eu.anthropic.claude-3-haiku-20240307-v1:0": 200000,
"eu.meta.llama3-2-3b-instruct-v1:0": 131000,
"eu.meta.llama3-2-1b-instruct-v1:0": 131000,
"apac.anthropic.claude-3-5-sonnet-20240620-v1:0": 200000,
"apac.anthropic.claude-3-5-sonnet-20241022-v2:0": 200000,
"apac.anthropic.claude-3-sonnet-20240229-v1:0": 200000,
"apac.anthropic.claude-3-haiku-20240307-v1:0": 200000,
"amazon.nova-pro-v1:0": 300000,
"amazon.nova-micro-v1:0": 128000,
"amazon.nova-lite-v1:0": 300000,
"anthropic.claude-3-5-sonnet-20240620-v1:0": 200000,
"anthropic.claude-3-5-haiku-20241022-v1:0": 200000,
"anthropic.claude-3-5-sonnet-20241022-v2:0": 200000,
"anthropic.claude-3-7-sonnet-20250219-v1:0": 200000,
"anthropic.claude-3-sonnet-20240229-v1:0": 200000,
"anthropic.claude-3-opus-20240229-v1:0": 200000,
"anthropic.claude-3-haiku-20240307-v1:0": 200000,
"anthropic.claude-v2:1": 200000,
"anthropic.claude-v2": 100000,
"anthropic.claude-instant-v1": 100000,
"meta.llama3-1-405b-instruct-v1:0": 128000,
"meta.llama3-1-70b-instruct-v1:0": 128000,
"meta.llama3-1-8b-instruct-v1:0": 128000,
"meta.llama3-70b-instruct-v1:0": 8000,
"meta.llama3-8b-instruct-v1:0": 8000,
"amazon.titan-text-lite-v1": 4000,
"amazon.titan-text-express-v1": 8000,
"cohere.command-text-v14": 4000,
"ai21.j2-mid-v1": 8191,
"ai21.j2-ultra-v1": 8191,
"ai21.jamba-instruct-v1:0": 256000,
"mistral.mistral-7b-instruct-v0:2": 32000,
"mistral.mixtral-8x7b-instruct-v0:1": 32000,
# mistral
"mistral-tiny": 32768,
"mistral-small-latest": 32768,
@@ -164,7 +219,7 @@ class StreamingChoices(TypedDict):
finish_reason: Optional[str]
class LLM:
class LLM(BaseLLM):
def __init__(
self,
model: str,

View File

@@ -0,0 +1,91 @@
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional, Union
class BaseLLM(ABC):
"""Abstract base class for LLM implementations.
This class defines the interface that all LLM implementations must follow.
Users can extend this class to create custom LLM implementations that don't
rely on litellm's authentication mechanism.
Custom LLM implementations should handle error cases gracefully, including
timeouts, authentication failures, and malformed responses. They should also
implement proper validation for input parameters and provide clear error
messages when things go wrong.
Attributes:
stop (list): A list of stop sequences that the LLM should use to stop generation.
This is used by the CrewAgentExecutor and other components.
"""
model: str
temperature: Optional[float] = None
stop: Optional[List[str]] = None
def __init__(
self,
model: str,
temperature: Optional[float] = None,
):
"""Initialize the BaseLLM with default attributes.
This constructor sets default values for attributes that are expected
by the CrewAgentExecutor and other components.
All custom LLM implementations should call super().__init__() to ensure
that these default attributes are properly initialized.
"""
self.model = model
self.temperature = temperature
self.stop = []
@abstractmethod
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
"""Call the LLM with the given messages.
Args:
messages: Input messages for the LLM.
Can be a string or list of message dictionaries.
If string, it will be converted to a single user message.
If list, each dict must have 'role' and 'content' keys.
tools: Optional list of tool schemas for function calling.
Each tool should define its name, description, and parameters.
callbacks: Optional list of callback functions to be executed
during and after the LLM call.
available_functions: Optional dict mapping function names to callables
that can be invoked by the LLM.
Returns:
Either a text response from the LLM (str) or
the result of a tool function call (Any).
Raises:
ValueError: If the messages format is invalid.
TimeoutError: If the LLM request times out.
RuntimeError: If the LLM request fails for other reasons.
"""
pass
def supports_stop_words(self) -> bool:
"""Check if the LLM supports stop words.
Returns:
bool: True if the LLM supports stop words, False otherwise.
"""
return True # Default implementation assumes support for stop words
def get_context_window_size(self) -> int:
"""Get the context window size for the LLM.
Returns:
int: The number of tokens/characters the model can handle.
"""
# Default implementation - subclasses should override with model-specific values
return 4096

38
src/crewai/llms/third_party/ai_suite.py vendored Normal file
View File

@@ -0,0 +1,38 @@
from typing import Any, Dict, List, Optional, Union
import aisuite as ai
from crewai.llms.base_llm import BaseLLM
class AISuiteLLM(BaseLLM):
def __init__(self, model: str, temperature: Optional[float] = None, **kwargs):
super().__init__(model, temperature, **kwargs)
self.client = ai.Client()
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> Union[str, Any]:
completion_params = self._prepare_completion_params(messages, tools)
response = self.client.chat.completions.create(**completion_params)
return response.choices[0].message.content
def _prepare_completion_params(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
) -> Dict[str, Any]:
return {
"model": self.model,
"messages": messages,
"temperature": self.temperature,
"tools": tools,
}
def supports_function_calling(self) -> bool:
return False

View File

@@ -1,7 +1,7 @@
import os
from typing import Any, Dict, List
from mem0 import MemoryClient
from mem0 import Memory, MemoryClient
from crewai.memory.storage.interface import Storage
@@ -32,13 +32,16 @@ class Mem0Storage(Storage):
mem0_org_id = config.get("org_id")
mem0_project_id = config.get("project_id")
# Initialize MemoryClient with available parameters
if mem0_org_id and mem0_project_id:
self.memory = MemoryClient(
api_key=mem0_api_key, org_id=mem0_org_id, project_id=mem0_project_id
)
# Initialize MemoryClient or Memory based on the presence of the mem0_api_key
if mem0_api_key:
if mem0_org_id and mem0_project_id:
self.memory = MemoryClient(
api_key=mem0_api_key, org_id=mem0_org_id, project_id=mem0_project_id
)
else:
self.memory = MemoryClient(api_key=mem0_api_key)
else:
self.memory = MemoryClient(api_key=mem0_api_key)
self.memory = Memory() # Fallback to Memory if no Mem0 API key is provided
def _sanitize_role(self, role: str) -> str:
"""

View File

@@ -2,6 +2,7 @@ import datetime
import inspect
import json
import logging
import re
import threading
import uuid
from concurrent.futures import Future
@@ -49,6 +50,7 @@ from crewai.utilities.events import (
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import interpolate_only
class Task(BaseModel):
@@ -507,7 +509,9 @@ class Task(BaseModel):
return
try:
self.description = self._original_description.format(**inputs)
self.description = interpolate_only(
input_string=self._original_description, inputs=inputs
)
except KeyError as e:
raise ValueError(
f"Missing required template variable '{e.args[0]}' in description"
@@ -516,7 +520,7 @@ class Task(BaseModel):
raise ValueError(f"Error interpolating description: {str(e)}") from e
try:
self.expected_output = self.interpolate_only(
self.expected_output = interpolate_only(
input_string=self._original_expected_output, inputs=inputs
)
except (KeyError, ValueError) as e:
@@ -524,7 +528,7 @@ class Task(BaseModel):
if self.output_file is not None:
try:
self.output_file = self.interpolate_only(
self.output_file = interpolate_only(
input_string=self._original_output_file, inputs=inputs
)
except (KeyError, ValueError) as e:
@@ -555,72 +559,6 @@ class Task(BaseModel):
f"\n\n{conversation_instruction}\n\n{conversation_history}"
)
def interpolate_only(
self,
input_string: Optional[str],
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]],
) -> str:
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
Args:
input_string: The string containing template variables to interpolate.
Can be None or empty, in which case an empty string is returned.
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, floats, and dicts/lists
containing only these types and other nested dicts/lists.
Returns:
The interpolated string with all template variables replaced with their values.
Empty string if input_string is None or empty.
Raises:
ValueError: If a value contains unsupported types
"""
# Validation function for recursive type checking
def validate_type(value: Any) -> None:
if value is None:
return
if isinstance(value, (str, int, float, bool)):
return
if isinstance(value, (dict, list)):
for item in value.values() if isinstance(value, dict) else value:
validate_type(item)
return
raise ValueError(
f"Unsupported type {type(value).__name__} in inputs. "
"Only str, int, float, bool, dict, and list are allowed."
)
# Validate all input values
for key, value in inputs.items():
try:
validate_type(value)
except ValueError as e:
raise ValueError(f"Invalid value for key '{key}': {str(e)}") from e
if input_string is None or not input_string:
return ""
if "{" not in input_string and "}" not in input_string:
return input_string
if not inputs:
raise ValueError(
"Inputs dictionary cannot be empty when interpolating variables"
)
try:
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
for key in inputs.keys():
escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}")
return escaped_string.format(**inputs)
except KeyError as e:
raise KeyError(
f"Template variable '{e.args[0]}' not found in inputs dictionary"
) from e
except ValueError as e:
raise ValueError(f"Error during string interpolation: {str(e)}") from e
def increment_tools_errors(self) -> None:
"""Increment the tools errors counter."""
self.tools_errors += 1
@@ -634,7 +572,15 @@ class Task(BaseModel):
def copy(
self, agents: List["BaseAgent"], task_mapping: Dict[str, "Task"]
) -> "Task":
"""Create a deep copy of the Task."""
"""Creates a deep copy of the Task while preserving its original class type.
Args:
agents: List of agents available for the task.
task_mapping: Dictionary mapping task IDs to Task instances.
Returns:
A copy of the task with the same class type as the original.
"""
exclude = {
"id",
"agent",
@@ -657,7 +603,7 @@ class Task(BaseModel):
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
cloned_tools = copy(self.tools) if self.tools else []
copied_task = Task(
copied_task = self.__class__(
**copied_data,
context=cloned_context,
agent=cloned_agent,

View File

@@ -281,8 +281,16 @@ class Telemetry:
return self._safe_telemetry_operation(operation)
def task_ended(self, span: Span, task: Task, crew: Crew):
"""Records task execution in a crew."""
"""Records the completion of a task execution in a crew.
Args:
span (Span): The OpenTelemetry span tracking the task execution
task (Task): The task that was completed
crew (Crew): The crew context in which the task was executed
Note:
If share_crew is enabled, this will also record the task output
"""
def operation():
if crew.share_crew:
self._add_attribute(
@@ -297,8 +305,13 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int):
"""Records the repeated usage 'error' of a tool by an agent."""
"""Records when a tool is used repeatedly, which might indicate an issue.
Args:
llm (Any): The language model being used
tool_name (str): Name of the tool being repeatedly used
attempts (int): Number of attempts made with this tool
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Repeated Usage")
@@ -317,8 +330,13 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def tool_usage(self, llm: Any, tool_name: str, attempts: int):
"""Records the usage of a tool by an agent."""
"""Records the usage of a tool by an agent.
Args:
llm (Any): The language model being used
tool_name (str): Name of the tool being used
attempts (int): Number of attempts made with this tool
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage")
@@ -337,8 +355,11 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def tool_usage_error(self, llm: Any):
"""Records the usage of a tool by an agent."""
"""Records when a tool usage results in an error.
Args:
llm (Any): The language model being used when the error occurred
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage Error")
@@ -357,6 +378,14 @@ class Telemetry:
def individual_test_result_span(
self, crew: Crew, quality: float, exec_time: int, model_name: str
):
"""Records individual test results for a crew execution.
Args:
crew (Crew): The crew being tested
quality (float): Quality score of the execution
exec_time (int): Execution time in seconds
model_name (str): Name of the model used
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Individual Test Result")
@@ -383,6 +412,14 @@ class Telemetry:
inputs: dict[str, Any] | None,
model_name: str,
):
"""Records the execution of a test suite for a crew.
Args:
crew (Crew): The crew being tested
iterations (int): Number of test iterations
inputs (dict[str, Any] | None): Input parameters for the test
model_name (str): Name of the model used in testing
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Test Execution")
@@ -408,6 +445,7 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def deploy_signup_error_span(self):
"""Records when an error occurs during the deployment signup process."""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Deploy Signup Error")
@@ -417,6 +455,11 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def start_deployment_span(self, uuid: Optional[str] = None):
"""Records the start of a deployment process.
Args:
uuid (Optional[str]): Unique identifier for the deployment
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Start Deployment")
@@ -428,6 +471,7 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def create_crew_deployment_span(self):
"""Records the creation of a new crew deployment."""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Create Crew Deployment")
@@ -437,6 +481,12 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def get_crew_logs_span(self, uuid: Optional[str], log_type: str = "deployment"):
"""Records the retrieval of crew logs.
Args:
uuid (Optional[str]): Unique identifier for the crew
log_type (str, optional): Type of logs being retrieved. Defaults to "deployment".
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Get Crew Logs")
@@ -449,6 +499,11 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def remove_crew_span(self, uuid: Optional[str] = None):
"""Records the removal of a crew.
Args:
uuid (Optional[str]): Unique identifier for the crew being removed
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Remove Crew")
@@ -574,6 +629,11 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def flow_creation_span(self, flow_name: str):
"""Records the creation of a new flow.
Args:
flow_name (str): Name of the flow being created
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Creation")
@@ -584,6 +644,12 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def flow_plotting_span(self, flow_name: str, node_names: list[str]):
"""Records flow visualization/plotting activity.
Args:
flow_name (str): Name of the flow being plotted
node_names (list[str]): List of node names in the flow
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Plotting")
@@ -595,6 +661,12 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def flow_execution_span(self, flow_name: str, node_names: list[str]):
"""Records the execution of a flow.
Args:
flow_name (str): Name of the flow being executed
node_names (list[str]): List of nodes being executed in the flow
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Execution")

View File

@@ -455,7 +455,7 @@ class ToolUsage:
# Attempt 4: Repair JSON
try:
repaired_input = repair_json(tool_input)
repaired_input = repair_json(tool_input, skip_json_loads=True)
self._printer.print(
content=f"Repaired JSON: {repaired_input}", color="blue"
)

View File

@@ -0,0 +1,62 @@
import re
from typing import Optional
MIN_COLLECTION_LENGTH = 3
MAX_COLLECTION_LENGTH = 63
DEFAULT_COLLECTION = "default_collection"
# Compiled regex patterns for better performance
INVALID_CHARS_PATTERN = re.compile(r"[^a-zA-Z0-9_-]")
IPV4_PATTERN = re.compile(r"^(\d{1,3}\.){3}\d{1,3}$")
def is_ipv4_pattern(name: str) -> bool:
"""
Check if a string matches an IPv4 address pattern.
Args:
name: The string to check
Returns:
True if the string matches an IPv4 pattern, False otherwise
"""
return bool(IPV4_PATTERN.match(name))
def sanitize_collection_name(name: Optional[str]) -> str:
"""
Sanitize a collection name to meet ChromaDB requirements:
1. 3-63 characters long
2. Starts and ends with alphanumeric character
3. Contains only alphanumeric characters, underscores, or hyphens
4. No consecutive periods
5. Not a valid IPv4 address
Args:
name: The original collection name to sanitize
Returns:
A sanitized collection name that meets ChromaDB requirements
"""
if not name:
return DEFAULT_COLLECTION
if is_ipv4_pattern(name):
name = f"ip_{name}"
sanitized = INVALID_CHARS_PATTERN.sub("_", name)
if not sanitized[0].isalnum():
sanitized = "a" + sanitized
if not sanitized[-1].isalnum():
sanitized = sanitized[:-1] + "z"
if len(sanitized) < MIN_COLLECTION_LENGTH:
sanitized = sanitized + "x" * (MIN_COLLECTION_LENGTH - len(sanitized))
if len(sanitized) > MAX_COLLECTION_LENGTH:
sanitized = sanitized[:MAX_COLLECTION_LENGTH]
if not sanitized[-1].isalnum():
sanitized = sanitized[:-1] + "z"
return sanitized

View File

@@ -6,7 +6,7 @@ from rich.console import Console
from rich.table import Table
from crewai.agent import Agent
from crewai.llm import LLM
from crewai.llm import BaseLLM
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
@@ -24,7 +24,7 @@ class CrewEvaluator:
Attributes:
crew (Crew): The crew of agents to evaluate.
eval_llm (LLM): Language model instance to use for evaluations
eval_llm (BaseLLM): Language model instance to use for evaluations
tasks_scores (defaultdict): A dictionary to store the scores of the agents for each task.
iteration (int): The current iteration of the evaluation.
"""
@@ -33,7 +33,7 @@ class CrewEvaluator:
run_execution_times: defaultdict = defaultdict(list)
iteration: int = 0
def __init__(self, crew, eval_llm: InstanceOf[LLM]):
def __init__(self, crew, eval_llm: InstanceOf[BaseLLM]):
self.crew = crew
self.llm = eval_llm
self._telemetry = Telemetry()

View File

@@ -507,9 +507,10 @@ class ConsoleFormatter:
# Remove the thinking status node when complete
if "Thinking" in str(tool_branch.label):
agent_branch.children.remove(tool_branch)
self.print(crew_tree)
self.print()
if tool_branch in agent_branch.children:
agent_branch.children.remove(tool_branch)
self.print(crew_tree)
self.print()
def handle_llm_call_failed(
self, tool_branch: Optional[Tree], error: str, crew_tree: Optional[Tree]
@@ -587,6 +588,7 @@ class ConsoleFormatter:
for child in flow_tree.children:
if "Running tests" in str(child.label):
child.label = Text("✅ Tests completed successfully", style="green")
break
self.print(flow_tree)
self.print()

View File

@@ -1,10 +1,12 @@
from typing import List
import re
from typing import TYPE_CHECKING, List
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
if TYPE_CHECKING:
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
def aggregate_raw_outputs_from_task_outputs(task_outputs: List[TaskOutput]) -> str:
def aggregate_raw_outputs_from_task_outputs(task_outputs: List["TaskOutput"]) -> str:
"""Generate string context from the task outputs."""
dividers = "\n\n----------\n\n"
@@ -13,7 +15,7 @@ def aggregate_raw_outputs_from_task_outputs(task_outputs: List[TaskOutput]) -> s
return context
def aggregate_raw_outputs_from_tasks(tasks: List[Task]) -> str:
def aggregate_raw_outputs_from_tasks(tasks: List["Task"]) -> str:
"""Generate string context from the tasks."""
task_outputs = [task.output for task in tasks if task.output is not None]

View File

@@ -2,28 +2,28 @@ import os
from typing import Any, Dict, List, Optional, Union
from crewai.cli.constants import DEFAULT_LLM_MODEL, ENV_VARS, LITELLM_PARAMS
from crewai.llm import LLM
from crewai.llm import LLM, BaseLLM
def create_llm(
llm_value: Union[str, LLM, Any, None] = None,
) -> Optional[LLM]:
) -> Optional[LLM | BaseLLM]:
"""
Creates or returns an LLM instance based on the given llm_value.
Args:
llm_value (str | LLM | Any | None):
llm_value (str | BaseLLM | Any | None):
- str: The model name (e.g., "gpt-4").
- LLM: Already instantiated LLM, returned as-is.
- BaseLLM: Already instantiated BaseLLM (including LLM), returned as-is.
- Any: Attempt to extract known attributes like model_name, temperature, etc.
- None: Use environment-based or fallback default model.
Returns:
An LLM instance if successful, or None if something fails.
A BaseLLM instance if successful, or None if something fails.
"""
# 1) If llm_value is already an LLM object, return it directly
if isinstance(llm_value, LLM):
# 1) If llm_value is already a BaseLLM or LLM object, return it directly
if isinstance(llm_value, LLM) or isinstance(llm_value, BaseLLM):
return llm_value
# 2) If llm_value is a string (model name)

View File

@@ -96,6 +96,10 @@ class CrewPlanner:
tasks_summary = []
for idx, task in enumerate(self.tasks):
knowledge_list = self._get_agent_knowledge(task)
agent_tools = (
f"[{', '.join(str(tool) for tool in task.agent.tools)}]" if task.agent and task.agent.tools else '"agent has no tools"',
f',\n "agent_knowledge": "[\\"{knowledge_list[0]}\\"]"' if knowledge_list and str(knowledge_list) != "None" else ""
)
task_summary = f"""
Task Number {idx + 1} - {task.description}
"task_description": {task.description}
@@ -103,10 +107,7 @@ class CrewPlanner:
"agent": {task.agent.role if task.agent else "None"}
"agent_goal": {task.agent.goal if task.agent else "None"}
"task_tools": {task.tools}
"agent_tools": %s%s""" % (
f"[{', '.join(str(tool) for tool in task.agent.tools)}]" if task.agent and task.agent.tools else '"agent has no tools"',
f',\n "agent_knowledge": "[\\"{knowledge_list[0]}\\"]"' if knowledge_list and str(knowledge_list) != "None" else ""
)
"agent_tools": {"".join(agent_tools)}"""
tasks_summary.append(task_summary)
return " ".join(tasks_summary)

View File

@@ -0,0 +1,82 @@
import re
from typing import Any, Dict, List, Optional, Union
def interpolate_only(
input_string: Optional[str],
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]],
) -> str:
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
Only interpolates placeholders that follow the pattern {variable_name} where
variable_name starts with a letter/underscore and contains only letters, numbers, and underscores.
Args:
input_string: The string containing template variables to interpolate.
Can be None or empty, in which case an empty string is returned.
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, floats, and dicts/lists
containing only these types and other nested dicts/lists.
Returns:
The interpolated string with all template variables replaced with their values.
Empty string if input_string is None or empty.
Raises:
ValueError: If a value contains unsupported types or a template variable is missing
"""
# Validation function for recursive type checking
def validate_type(value: Any) -> None:
if value is None:
return
if isinstance(value, (str, int, float, bool)):
return
if isinstance(value, (dict, list)):
for item in value.values() if isinstance(value, dict) else value:
validate_type(item)
return
raise ValueError(
f"Unsupported type {type(value).__name__} in inputs. "
"Only str, int, float, bool, dict, and list are allowed."
)
# Validate all input values
for key, value in inputs.items():
try:
validate_type(value)
except ValueError as e:
raise ValueError(f"Invalid value for key '{key}': {str(e)}") from e
if input_string is None or not input_string:
return ""
if "{" not in input_string and "}" not in input_string:
return input_string
if not inputs:
raise ValueError(
"Inputs dictionary cannot be empty when interpolating variables"
)
# The regex pattern to find valid variable placeholders
# Matches {variable_name} where variable_name starts with a letter/underscore
# and contains only letters, numbers, and underscores
pattern = r"\{([A-Za-z_][A-Za-z0-9_]*)\}"
# Find all matching variables in the input string
variables = re.findall(pattern, input_string)
result = input_string
# Check if all variables exist in inputs
missing_vars = [var for var in variables if var not in inputs]
if missing_vars:
raise KeyError(
f"Template variable '{missing_vars[0]}' not found in inputs dictionary"
)
# Replace each variable with its value
for var in variables:
if var in inputs:
placeholder = "{" + var + "}"
value = str(inputs[var])
result = result.replace(placeholder, value)
return result