fix: make task required in CrewAgentExecutor and fix all type annotations

- Make task parameter required in CrewAgentExecutor.__init__
- Update Agent.create_agent_executor to require task parameter
- Handle cases where crew can be None (standalone agent usage)
- Update base class signatures to match
- Remove unnecessary create_agent_executor calls during setup
- Add missing type annotations in base_agent_executor_mixin
- Fix all type errors in base_agent.py using Self return type
- Add assert for agent_executor before use
- Fix crew access checks to handle None case
This commit is contained in:
Greyson LaLonde
2025-09-04 22:13:46 -04:00
parent 2faa13ddcb
commit 843801f554
9 changed files with 228 additions and 169 deletions

View File

@@ -8,7 +8,15 @@ from typing import (
Optional,
)
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
from pydantic import (
BeforeValidator,
Field,
InstanceOf,
PrivateAttr,
computed_field,
field_validator,
model_validator,
)
from typing_extensions import Self
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -51,7 +59,7 @@ from crewai.utilities.agent_utils import (
)
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.llm_utils import create_default_llm, create_llm
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -82,6 +90,8 @@ class Agent(BaseAgent):
"""
_times_executed: int = PrivateAttr(default=0)
_llm: BaseLLM = PrivateAttr()
_function_calling_llm: BaseLLM | None = PrivateAttr(default=None)
max_execution_time: Optional[int] = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
@@ -96,10 +106,11 @@ class Agent(BaseAgent):
default=True,
description="Use system prompt for the agent.",
)
llm: str | InstanceOf[BaseLLM] | Any = Field(
description="Language model that will run the agent.", default=None
llm: str | InstanceOf[BaseLLM] | None = Field(
description="Language model that will run the agent.",
default_factory=create_default_llm,
)
function_calling_llm: Optional[str | InstanceOf[BaseLLM] | Any] = Field(
function_calling_llm: str | InstanceOf[BaseLLM] | None = Field(
description="Language model that will run the agent.", default=None
)
system_template: Optional[str] = Field(
@@ -181,15 +192,30 @@ class Agent(BaseAgent):
return load_agent_from_repository(from_repository) | v
return v
@field_validator("function_calling_llm", mode="after")
@classmethod
def validate_function_calling_llm(cls, v: Any) -> BaseLLM | None:
if not v or isinstance(v, BaseLLM):
return v
return create_llm(v)
@model_validator(mode="after")
def post_init_setup(self) -> Self:
self.agent_ops_agent_name = self.role
self.llm = create_llm(self.llm)
if self.function_calling_llm and not isinstance(
self.function_calling_llm, BaseLLM
):
self.function_calling_llm = create_llm(self.function_calling_llm)
# Validate and set the private LLM attributes
if isinstance(self.llm, BaseLLM):
self._llm = self.llm
elif self.llm is None:
self._llm = create_default_llm()
else:
self._llm = create_llm(self.llm)
if self.function_calling_llm:
if isinstance(self.function_calling_llm, BaseLLM):
self._function_calling_llm = self.function_calling_llm
else:
self._function_calling_llm = create_llm(self.function_calling_llm)
if not self.agent_executor:
self._setup_agent_executor()
@@ -413,7 +439,7 @@ class Agent(BaseAgent):
)
tools = tools or self.tools or []
self.create_agent_executor(tools=tools, task=task)
self.create_agent_executor(task=task, tools=tools)
if self.crew and self.crew._train:
task_prompt = self._training_handler(task_prompt=task_prompt)
@@ -540,6 +566,9 @@ class Agent(BaseAgent):
Returns:
The output of the agent.
"""
assert self.agent_executor is not None, (
"Agent executor must be created before execution"
)
return self.agent_executor.invoke(
{
"input": task_prompt,
@@ -550,12 +579,13 @@ class Agent(BaseAgent):
)["output"]
def create_agent_executor(
self, tools: Optional[list[BaseTool]] = None, task: Optional[Task] = None
self, task: Task, tools: Optional[list[BaseTool]] = None
) -> None:
"""Create an agent executor for the agent.
Returns:
An instance of the CrewAgentExecutor class.
Args:
task: Task to execute.
tools: Optional list of tools to use.
"""
raw_tools: list[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
@@ -578,7 +608,7 @@ class Agent(BaseAgent):
)
self.agent_executor = CrewAgentExecutor(
llm=self.llm,
llm=self._llm,
task=task,
agent=self,
crew=self.crew,
@@ -591,12 +621,12 @@ class Agent(BaseAgent):
tools_names=get_tool_names(parsed_tools),
tools_description=render_text_description_and_args(parsed_tools),
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
function_calling_llm=self._function_calling_llm,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=(
self._rpm_controller.check_or_wait if self._rpm_controller else None
),
callbacks=[TokenCalcHandler(self._token_process)],
litellm_callbacks=[TokenCalcHandler(self._token_process)],
)
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
@@ -751,22 +781,8 @@ class Agent(BaseAgent):
task_prompt=task_prompt
)
rewriter_prompt = self.i18n.slice("knowledge_search_query_system_prompt")
if not isinstance(self.llm, BaseLLM):
self._logger.log(
"warning",
f"Knowledge search query failed: LLM for agent '{self.role}' is not an instance of BaseLLM",
)
crewai_event_bus.emit(
self,
event=KnowledgeQueryFailedEvent(
agent=self,
error="LLM is not compatible with knowledge search queries",
),
)
return None
try:
rewritten_query = self.llm.call(
rewritten_query = self._llm.call(
[
{
"role": "system",
@@ -818,7 +834,7 @@ class Agent(BaseAgent):
role=self.role,
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
llm=self._llm,
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
@@ -856,7 +872,7 @@ class Agent(BaseAgent):
role=self.role,
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
llm=self._llm,
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from pydantic import Field, PrivateAttr
@@ -10,16 +10,16 @@ from crewai.agents.agent_adapters.langgraph.structured_output_converter import (
LangGraphConverterAdapter,
)
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool
from crewai.utilities import Logger
from crewai.utilities.converter import Converter
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool
from crewai.utilities import Logger
from crewai.utilities.converter import Converter
try:
from langgraph.checkpoint.memory import MemorySaver
@@ -51,10 +51,10 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
role: str,
goal: str,
backstory: str,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
llm: Any = None,
max_iterations: int = 10,
agent_config: Optional[Dict[str, Any]] = None,
agent_config: Optional[dict[str, Any]] = None,
**kwargs,
):
"""Initialize the LangGraph agent adapter."""
@@ -81,7 +81,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
try:
self._memory = MemorySaver()
converted_tools: List[Any] = self._tool_adapter.tools()
converted_tools: list[Any] = self._tool_adapter.tools()
if self._agent_config:
self._graph = create_react_agent(
model=self.llm,
@@ -111,7 +111,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
"""Build a system prompt for the LangGraph agent."""
base_prompt = f"""
You are {self.role}.
Your goal is: {self.goal}
Your backstory: {self.backstory}
@@ -124,10 +124,10 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
) -> str:
"""Execute a task using the LangGraph workflow."""
self.create_agent_executor(tools)
self.create_agent_executor(task, tools)
self.configure_structured_output(task)
@@ -197,11 +197,13 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
)
raise
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
def create_agent_executor(
self, task=None, tools: Optional[list[BaseTool]] = None
) -> None:
"""Configure the LangGraph agent for execution."""
self.configure_tools(tools)
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
def configure_tools(self, tools: Optional[list[BaseTool]] = None) -> None:
"""Configure tools for the LangGraph agent."""
if tools:
all_tools = list(self.tools or []) + list(tools or [])
@@ -209,7 +211,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
available_tools = self._tool_adapter.tools()
self._graph.tools = available_tools
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
"""Implement delegation tools support for LangGraph."""
agent_tools = AgentTools(agents=agents)
return agent_tools.tools()

View File

@@ -1,4 +1,4 @@
from typing import Any, List, Optional
from typing import Any, Optional
from pydantic import Field, PrivateAttr
@@ -7,15 +7,15 @@ from crewai.agents.agent_adapters.openai_agents.structured_output_converter impo
OpenAIConverterAdapter,
)
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Logger
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Logger
try:
from agents import Agent as OpenAIAgent # type: ignore
@@ -44,7 +44,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
def __init__(
self,
model: str = "gpt-4o-mini",
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
agent_config: Optional[dict] = None,
**kwargs,
):
@@ -72,7 +72,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
"""Build a system prompt for the OpenAI agent."""
base_prompt = f"""
You are {self.role}.
Your goal is: {self.goal}
Your backstory: {self.backstory}
@@ -85,11 +85,11 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
) -> str:
"""Execute a task using the OpenAI Assistant"""
self._converter_adapter.configure_structured_output(task)
self.create_agent_executor(tools)
self.create_agent_executor(task, tools)
if self.verbose:
enable_verbose_stdout_logging()
@@ -131,7 +131,9 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
)
raise
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
def create_agent_executor(
self, task=None, tools: Optional[list[BaseTool]] = None
) -> None:
"""
Configure the OpenAI agent for execution.
While OpenAI handles execution differently through Runner,
@@ -152,7 +154,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
self.agent_executor = Runner
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
def configure_tools(self, tools: Optional[list[BaseTool]] = None) -> None:
"""Configure tools for the OpenAI Assistant"""
if tools:
self._tool_adapter.configure_tools(tools)
@@ -163,7 +165,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
"""Process OpenAI Assistant execution result converting any structured output to a string"""
return self._converter_adapter.post_process_result(result.final_output)
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
"""Implement delegation tools support"""
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()

View File

@@ -1,8 +1,9 @@
import uuid
from abc import ABC, abstractmethod
from collections.abc import Callable
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, TypeVar
from typing import Any, Optional, TypeVar
from pydantic import (
UUID4,
@@ -14,6 +15,7 @@ from pydantic import (
model_validator,
)
from pydantic_core import PydanticCustomError
from typing_extensions import Self
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.agents.cache.cache_handler import CacheHandler
@@ -61,7 +63,7 @@ class BaseAgent(ABC, BaseModel):
Methods:
execute_task(task: Any, context: Optional[str] = None, tools: Optional[List[BaseTool]] = None) -> str:
Abstract method to execute a task.
create_agent_executor(tools=None) -> None:
create_agent_executor(task, tools=None) -> None:
Abstract method to create an agent executor.
get_delegation_tools(agents: List["BaseAgent"]):
Abstract method to set the agents task tools for handling delegation and question asking to other agents in crew.
@@ -79,7 +81,7 @@ class BaseAgent(ABC, BaseModel):
Set private attributes.
"""
__hash__ = object.__hash__ # type: ignore
__hash__ = object.__hash__
_logger: Logger = PrivateAttr(default_factory=lambda: Logger(verbose=False))
_rpm_controller: Optional[RPMController] = PrivateAttr(default=None)
_request_within_rpm_limit: Any = PrivateAttr(default=None)
@@ -91,7 +93,7 @@ class BaseAgent(ABC, BaseModel):
role: str = Field(description="Role of the agent")
goal: str = Field(description="Objective of the agent")
backstory: str = Field(description="Backstory of the agent")
config: Optional[Dict[str, Any]] = Field(
config: Optional[dict[str, Any]] = Field(
description="Configuration for the agent", default=None, exclude=True
)
cache: bool = Field(
@@ -108,14 +110,14 @@ class BaseAgent(ABC, BaseModel):
default=False,
description="Enable agent to delegate and ask questions among each other.",
)
tools: Optional[List[BaseTool]] = Field(
tools: Optional[list[BaseTool]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
max_iter: int = Field(
default=25, description="Maximum iterations for an agent to execute a task"
)
agent_executor: InstanceOf = Field(
default=None, description="An instance of the CrewAgentExecutor class."
agent_executor: Optional[Any] = Field(
default=None, description="An instance of the agent executor class."
)
llm: Any = Field(
default=None, description="Language model that will run the agent."
@@ -129,7 +131,7 @@ class BaseAgent(ABC, BaseModel):
default_factory=ToolsHandler,
description="An instance of the ToolsHandler class.",
)
tools_results: List[Dict[str, Any]] = Field(
tools_results: list[dict[str, Any]] = Field(
default=[], description="Results of the tools used by the agent."
)
max_tokens: Optional[int] = Field(
@@ -138,7 +140,7 @@ class BaseAgent(ABC, BaseModel):
knowledge: Optional[Knowledge] = Field(
default=None, description="Knowledge for the agent."
)
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
knowledge_sources: Optional[list[BaseKnowledgeSource]] = Field(
default=None,
description="Knowledge sources for the agent.",
)
@@ -150,7 +152,7 @@ class BaseAgent(ABC, BaseModel):
default_factory=SecurityConfig,
description="Security configuration for the agent, including fingerprinting.",
)
callbacks: List[Callable] = Field(
callbacks: list[Callable[..., Any]] = Field(
default=[], description="Callbacks to be used for the agent"
)
adapted_agent: bool = Field(
@@ -163,12 +165,12 @@ class BaseAgent(ABC, BaseModel):
@model_validator(mode="before")
@classmethod
def process_model_config(cls, values):
def process_model_config(cls, values: Any) -> Any:
return process_config(values, cls)
@field_validator("tools")
@classmethod
def validate_tools(cls, tools: List[Any]) -> List[BaseTool]:
def validate_tools(cls, tools: list[Any]) -> list[BaseTool]:
"""Validate and process the tools provided to the agent.
This method ensures that each tool is either an instance of BaseTool
@@ -196,7 +198,7 @@ class BaseAgent(ABC, BaseModel):
return processed_tools
@model_validator(mode="after")
def validate_and_set_attributes(self):
def validate_and_set_attributes(self) -> Self:
# Validate required fields
for field in ["role", "goal", "backstory"]:
if getattr(self, field) is None:
@@ -228,7 +230,7 @@ class BaseAgent(ABC, BaseModel):
)
@model_validator(mode="after")
def set_private_attrs(self):
def set_private_attrs(self) -> Self:
"""Set private attributes."""
self._logger = Logger(verbose=self.verbose)
if self.max_rpm and not self._rpm_controller:
@@ -240,7 +242,7 @@ class BaseAgent(ABC, BaseModel):
return self
@property
def key(self):
def key(self) -> str:
source = [
self._original_role or self.role,
self._original_goal or self.goal,
@@ -253,16 +255,18 @@ class BaseAgent(ABC, BaseModel):
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
) -> str:
pass
@abstractmethod
def create_agent_executor(self, tools=None) -> None:
def create_agent_executor(
self, task: Any, tools: Optional[list[BaseTool]] = None
) -> None:
pass
@abstractmethod
def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]:
def get_delegation_tools(self, agents: list["BaseAgent"]) -> list[BaseTool]:
"""Set the task tools that init BaseAgenTools class."""
pass
@@ -320,7 +324,7 @@ class BaseAgent(ABC, BaseModel):
return copied_agent
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
def interpolate_inputs(self, inputs: dict[str, Any]) -> None:
"""Interpolate inputs into the agent description and backstory."""
if self._original_role is None:
self._original_role = self.role
@@ -350,7 +354,7 @@ class BaseAgent(ABC, BaseModel):
if self.cache:
self.cache_handler = cache_handler
self.tools_handler.cache = cache_handler
self.create_agent_executor()
# Executor will be created when a task is executed
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
@@ -360,7 +364,7 @@ class BaseAgent(ABC, BaseModel):
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.create_agent_executor()
# Executor will be created when a task is executed
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
def set_knowledge(self, crew_embedder: Optional[dict[str, Any]] = None) -> None:
pass

View File

@@ -1,31 +1,32 @@
import time
from typing import TYPE_CHECKING, Dict, List
from typing import TYPE_CHECKING
from crewai.events.event_listener import event_listener
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.utilities import I18N
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.printer import Printer
from crewai.events.event_listener import event_listener
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.parser import AgentFinish
from crewai.crew import Crew
from crewai.task import Task
class CrewAgentExecutorMixin:
crew: "Crew"
crew: "Crew | None"
agent: "BaseAgent"
task: "Task"
iterations: int
max_iter: int
messages: List[Dict[str, str]]
messages: list[dict[str, str]]
_i18n: I18N
_printer: Printer = Printer()
def _create_short_term_memory(self, output) -> None:
def _create_short_term_memory(self, output: "AgentFinish") -> None:
"""Create and save a short-term memory item if conditions are met."""
if (
self.crew
@@ -35,7 +36,8 @@ class CrewAgentExecutorMixin:
):
try:
if (
hasattr(self.crew, "_short_term_memory")
self.crew
and hasattr(self.crew, "_short_term_memory")
and self.crew._short_term_memory
):
self.crew._short_term_memory.save(
@@ -48,7 +50,7 @@ class CrewAgentExecutorMixin:
print(f"Failed to add to short term memory: {e}")
pass
def _create_external_memory(self, output) -> None:
def _create_external_memory(self, output: "AgentFinish") -> None:
"""Create and save a external-term memory item if conditions are met."""
if (
self.crew
@@ -69,7 +71,7 @@ class CrewAgentExecutorMixin:
print(f"Failed to add to external memory: {e}")
pass
def _create_long_term_memory(self, output) -> None:
def _create_long_term_memory(self, output: "AgentFinish") -> None:
"""Create and save long-term and entity memory items based on evaluation."""
if (
self.crew

View File

@@ -60,7 +60,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self,
llm: BaseLLM,
task: Task,
crew: Crew,
crew: Crew | None,
agent: BaseAgent,
prompt: dict[str, str],
max_iter: int,
@@ -74,14 +74,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
function_calling_llm: BaseLLM | None = None,
respect_context_window: bool = False,
request_within_rpm_limit: Callable[[], bool] | None = None,
callbacks: list[Callable[..., Any]] | None = None,
litellm_callbacks: list[Any] | None = None,
) -> None:
"""Initialize executor.
Args:
llm: Language model instance.
task: Task to execute.
crew: Crew instance.
crew: Optional Crew instance.
agent: Agent to execute.
prompt: Prompt templates.
max_iter: Maximum iterations.
@@ -95,19 +95,19 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
function_calling_llm: Optional function calling LLM.
respect_context_window: Respect context limits.
request_within_rpm_limit: RPM limit check function.
callbacks: Optional callbacks list.
litellm_callbacks: Optional litellm callbacks list.
"""
self._i18n: I18N = I18N()
self.llm = llm
self.task = task
self.agent = agent
self.crew = crew
self.crew: Crew | None = crew
self.prompt = prompt
self.tools = tools
self.tools_names = tools_names
self.stop = stop_words
self.max_iter = max_iter
self.callbacks = callbacks or []
self.litellm_callbacks = litellm_callbacks or []
self._printer: Printer = Printer()
self.tools_handler = tools_handler
self.original_tools = original_tools or []
@@ -195,7 +195,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
callbacks=self.litellm_callbacks,
)
enforce_rpm_limit(self.request_within_rpm_limit)
@@ -203,7 +203,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
callbacks=self.litellm_callbacks,
printer=self._printer,
from_task=self.task,
)
@@ -259,7 +259,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
callbacks=self.litellm_callbacks,
i18n=self._i18n,
)
continue
@@ -334,7 +334,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
AgentLogsStartedEvent(
agent_role=self.agent.role,
task_description=self.task.description,
verbose=self.agent.verbose or self.crew.verbose,
verbose=self.agent.verbose
or (self.crew.verbose if self.crew else False),
),
)
@@ -349,7 +350,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
AgentLogsExecutionEvent(
agent_role=self.agent.role,
formatted_answer=formatted_answer,
verbose=self.agent.verbose or self.crew.verbose,
verbose=self.agent.verbose
or (self.crew.verbose if self.crew else False),
),
)
@@ -440,7 +442,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
True if in training mode.
"""
return bool(self.crew._train)
return bool(self.crew and self.crew._train)
def _handle_training_feedback(
self, initial_answer: AgentFinish, feedback: str

View File

@@ -650,7 +650,7 @@ class Crew(FlowTrackable, BaseModel):
if not agent.step_callback: # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.step_callback = self.step_callback # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.create_agent_executor()
# Agent executor will be created when tasks are executed
if self.planning:
self._handle_crew_planning()

View File

@@ -1,78 +1,105 @@
import os
from typing import Any, Dict, List, Optional, Union
from typing import Any, Protocol, TypedDict, runtime_checkable
from typing_extensions import Required
from crewai.cli.constants import DEFAULT_LLM_MODEL, ENV_VARS, LITELLM_PARAMS
from crewai.llm import LLM, BaseLLM
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
class LLMParams(TypedDict, total=False):
"""TypedDict defining LLM parameters we extract from LLMLike objects."""
model: Required[str]
temperature: float
max_tokens: int
logprobs: int
timeout: float
api_key: str
base_url: str
api_base: str
@runtime_checkable
class LLMLike(Protocol):
"""Protocol for objects that can be converted to an LLM instance."""
model: str | None
model_name: str | None
deployment_name: str | None
temperature: float | None
max_tokens: int | None
logprobs: int | None
timeout: float | None
api_key: str | None
base_url: str | None
api_base: str | None
def create_default_llm() -> LLM:
"""Creates a default LLM instance using environment variables or fallback defaults.
Returns:
A default LLM instance configured from environment or using defaults.
Raises:
ValueError: If LLM creation fails.
"""
result = _llm_via_environment_or_fallback()
if result is None:
raise ValueError("Failed to create default LLM instance")
return result
def create_llm(
llm_value: Union[str, LLM, Any, None] = None,
) -> Optional[LLM | BaseLLM]:
llm_value: str | BaseLLM | LLMLike,
) -> BaseLLM:
"""
Creates or returns an LLM instance based on the given llm_value.
Args:
llm_value (str | BaseLLM | Any | None):
llm_value (str | BaseLLM | LLMLike):
- str: The model name (e.g., "gpt-4").
- BaseLLM: Already instantiated BaseLLM (including LLM), returned as-is.
- Any: Attempt to extract known attributes like model_name, temperature, etc.
- None: Use environment-based or fallback default model.
- LLMLike: Object with LLM-compatible attributes (model_name, temperature, etc.)
Returns:
A BaseLLM instance if successful, or None if something fails.
"""
A BaseLLM instance.
# 1) If llm_value is already a BaseLLM or LLM object, return it directly
if isinstance(llm_value, LLM) or isinstance(llm_value, BaseLLM):
Raises:
ValueError: If LLM creation fails.
"""
if isinstance(llm_value, BaseLLM):
return llm_value
# 2) If llm_value is a string (model name)
if isinstance(llm_value, str):
try:
created_llm = LLM(model=llm_value)
return created_llm
except Exception as e:
print(f"Failed to instantiate LLM with model='{llm_value}': {e}")
return None
return LLM(model=llm_value)
# 3) If llm_value is None, parse environment variables or use default
if llm_value is None:
return _llm_via_environment_or_fallback()
# 4) Otherwise, attempt to extract relevant attributes from an unknown object
try:
# Extract attributes with explicit types
model = (
obj_attrs = set(dir(llm_value))
llm_kwargs = {
param: getattr(llm_value, param)
for param in LLMParams.__annotations__
if param != "model"
and param in obj_attrs
and getattr(llm_value, param) is not None
}
llm_kwargs["model"] = (
getattr(llm_value, "model", None)
or getattr(llm_value, "model_name", None)
or getattr(llm_value, "deployment_name", None)
or str(llm_value)
)
temperature: Optional[float] = getattr(llm_value, "temperature", None)
max_tokens: Optional[int] = getattr(llm_value, "max_tokens", None)
logprobs: Optional[int] = getattr(llm_value, "logprobs", None)
timeout: Optional[float] = getattr(llm_value, "timeout", None)
api_key: Optional[str] = getattr(llm_value, "api_key", None)
base_url: Optional[str] = getattr(llm_value, "base_url", None)
api_base: Optional[str] = getattr(llm_value, "api_base", None)
created_llm = LLM(
model=model,
temperature=temperature,
max_tokens=max_tokens,
logprobs=logprobs,
timeout=timeout,
api_key=api_key,
base_url=base_url,
api_base=api_base,
)
return created_llm
return LLM(**llm_kwargs)
except Exception as e:
print(f"Error instantiating LLM from unknown object type: {e}")
return None
raise ValueError(f"Error instantiating LLM from object: {e}")
def _llm_via_environment_or_fallback() -> Optional[LLM]:
def _llm_via_environment_or_fallback() -> LLM | None:
"""
Helper function: if llm_value is None, we load environment variables or fallback default model.
"""
@@ -85,24 +112,24 @@ def _llm_via_environment_or_fallback() -> Optional[LLM]:
# Initialize parameters with correct types
model: str = model_name
temperature: Optional[float] = None
max_tokens: Optional[int] = None
max_completion_tokens: Optional[int] = None
logprobs: Optional[int] = None
timeout: Optional[float] = None
api_key: Optional[str] = None
base_url: Optional[str] = None
api_version: Optional[str] = None
presence_penalty: Optional[float] = None
frequency_penalty: Optional[float] = None
top_p: Optional[float] = None
n: Optional[int] = None
stop: Optional[Union[str, List[str]]] = None
logit_bias: Optional[Dict[int, float]] = None
response_format: Optional[Dict[str, Any]] = None
seed: Optional[int] = None
top_logprobs: Optional[int] = None
callbacks: List[Any] = []
temperature: float | None = None
max_tokens: int | None = None
max_completion_tokens: int | None = None
logprobs: int | None = None
timeout: float | None = None
api_key: str | None = None
base_url: str | None = None
api_version: str | None = None
presence_penalty: float | None = None
frequency_penalty: float | None = None
top_p: float | None = None
n: int | None = None
stop: str | list[str] | None = None
logit_bias: dict[int, float] | None = None
response_format: dict[str, Any] | None = None
seed: int | None = None
top_logprobs: int | None = None
callbacks: list[Any] = []
# Optional base URL from env
base_url = (
@@ -120,7 +147,7 @@ def _llm_via_environment_or_fallback() -> Optional[LLM]:
base_url = api_base
# Initialize llm_params dictionary
llm_params: Dict[str, Any] = {
llm_params: dict[str, Any] = {
"model": model,
"temperature": temperature,
"max_tokens": max_tokens,

View File

@@ -1116,7 +1116,9 @@ def test_not_using_system_prompt():
use_system_prompt=False,
)
agent.create_agent_executor()
# Create a dummy task for testing
task = Task(description="Test task", expected_output="Test output", agent=agent)
agent.create_agent_executor(task)
assert not agent.agent_executor.prompt.get("user")
assert not agent.agent_executor.prompt.get("system")
@@ -1128,7 +1130,9 @@ def test_using_system_prompt():
backstory="I am the master of {role}",
)
agent.create_agent_executor()
# Create a dummy task for testing
task = Task(description="Test task", expected_output="Test output", agent=agent)
agent.create_agent_executor(task)
assert agent.agent_executor.prompt.get("user")
assert agent.agent_executor.prompt.get("system")