From 6dc9f462f934600ddbd8666b81d2ed8bc6a4595d Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Sat, 4 Apr 2026 01:07:45 +0800 Subject: [PATCH] feat: mid-task checkpoint resume and executor refactor --- lib/crewai/src/crewai/__init__.py | 6 ++-- lib/crewai/src/crewai/agent/core.py | 7 ++-- .../crewai/agents/agent_builder/base_agent.py | 32 +++++++++++++++---- ...ecutor_mixin.py => base_agent_executor.py} | 23 +++++++++---- .../src/crewai/agents/crew_agent_executor.py | 24 ++++++++------ .../src/crewai/agents/planner_observer.py | 4 +-- lib/crewai/src/crewai/agents/step_executor.py | 4 +-- lib/crewai/src/crewai/crew.py | 20 ++++++++---- lib/crewai/src/crewai/crews/utils.py | 13 ++++++-- .../src/crewai/experimental/agent_executor.py | 24 +++++++------- lib/crewai/src/crewai/llm.py | 20 ++++++------ lib/crewai/src/crewai/llms/base_llm.py | 22 ++++++------- lib/crewai/src/crewai/task.py | 15 +++++++-- .../src/crewai/utilities/agent_utils.py | 12 +++---- .../tests/memory/test_memory_root_scope.py | 24 +++++++------- .../tests/memory/test_unified_memory.py | 8 ++--- .../test_google_vertex_memory_integration.py | 4 +-- 17 files changed, 160 insertions(+), 102 deletions(-) rename lib/crewai/src/crewai/agents/agent_builder/{base_agent_executor_mixin.py => base_agent_executor.py} (74%) diff --git a/lib/crewai/src/crewai/__init__.py b/lib/crewai/src/crewai/__init__.py index 24d9fd085..d98fdb30f 100644 --- a/lib/crewai/src/crewai/__init__.py +++ b/lib/crewai/src/crewai/__init__.py @@ -98,8 +98,8 @@ def __getattr__(name: str) -> Any: try: from crewai.agents.agent_builder.base_agent import BaseAgent as _BaseAgent - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin as _CrewAgentExecutorMixin, + from crewai.agents.agent_builder.base_agent_executor import ( + BaseAgentExecutor as _BaseAgentExecutor, ) from crewai.agents.tools_handler import ToolsHandler as _ToolsHandler from crewai.experimental.agent_executor import AgentExecutor as _AgentExecutor @@ -117,7 +117,7 @@ try: "Flow": Flow, "BaseLLM": BaseLLM, "Task": Task, - "CrewAgentExecutorMixin": _CrewAgentExecutorMixin, + "BaseAgentExecutor": _BaseAgentExecutor, "ExecutionContext": ExecutionContext, "StandardPromptResult": _StandardPromptResult, "SystemPromptResult": _SystemPromptResult, diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 214e55227..66554c59d 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -194,12 +194,12 @@ class Agent(BaseAgent): llm: Annotated[ str | BaseLLM | None, BeforeValidator(_validate_llm_ref), - PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"), + PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"), ] = Field(description="Language model that will run the agent.", default=None) function_calling_llm: Annotated[ str | BaseLLM | None, BeforeValidator(_validate_llm_ref), - PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"), + PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"), ] = Field(description="Language model that will run the agent.", default=None) system_template: str | None = Field( default=None, description="System format for the agent." @@ -1056,7 +1056,8 @@ class Agent(BaseAgent): if self.agent_executor is None: raise RuntimeError("Agent executor is not initialized.") - self.agent_executor.task = task + if task is not None: + self.agent_executor.task = task self.agent_executor.tools = tools self.agent_executor.original_tools = raw_tools self.agent_executor.prompt = prompt diff --git a/lib/crewai/src/crewai/agents/agent_builder/base_agent.py b/lib/crewai/src/crewai/agents/agent_builder/base_agent.py index 4e8187892..5cbe3ccbf 100644 --- a/lib/crewai/src/crewai/agents/agent_builder/base_agent.py +++ b/lib/crewai/src/crewai/agents/agent_builder/base_agent.py @@ -15,6 +15,7 @@ from pydantic import ( BeforeValidator, Field, PrivateAttr, + SerializeAsAny, field_validator, model_validator, ) @@ -23,7 +24,7 @@ from pydantic_core import PydanticCustomError from typing_extensions import Self from crewai.agent.internal.meta import AgentMeta -from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin +from crewai.agents.agent_builder.base_agent_executor import BaseAgentExecutor from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess from crewai.agents.cache.cache_handler import CacheHandler from crewai.agents.tools_handler import ToolsHandler @@ -63,6 +64,10 @@ def _serialize_crew_ref(value: Any) -> str | None: def _validate_llm_ref(value: Any) -> Any: + if isinstance(value, dict): + from crewai.llm import LLM + + return LLM(**value) return value @@ -74,12 +79,21 @@ def _resolve_agent(value: Any, info: Any) -> Any: return Agent.model_validate(value, context=getattr(info, "context", None)) -def _serialize_llm_ref(value: Any) -> str | None: +def _validate_executor_ref(value: Any) -> Any: + if isinstance(value, dict): + from crewai.agents.crew_agent_executor import CrewAgentExecutor + + return CrewAgentExecutor.model_validate(value) + return value + + +def _serialize_llm_ref(value: Any) -> dict[str, Any] | None: if value is None: return None if isinstance(value, str): - return value - return getattr(value, "model", str(value)) + return {"model": value} + result: dict[str, Any] = value.model_dump() + return result _SLUG_RE: Final[re.Pattern[str]] = re.compile( @@ -196,13 +210,19 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta): max_iter: int = Field( default=25, description="Maximum iterations for an agent to execute a task" ) - agent_executor: CrewAgentExecutorMixin | None = Field( + agent_executor: SerializeAsAny[BaseAgentExecutor] | None = Field( default=None, description="An instance of the CrewAgentExecutor class." ) + + @field_validator("agent_executor", mode="before") + @classmethod + def _validate_agent_executor(cls, v: Any) -> Any: + return _validate_executor_ref(v) + llm: Annotated[ str | BaseLLM | None, BeforeValidator(_validate_llm_ref), - PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"), + PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"), ] = Field(default=None, description="Language model that will run the agent.") crew: Annotated[ Crew | str | None, diff --git a/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py b/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor.py similarity index 74% rename from lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py rename to lib/crewai/src/crewai/agents/agent_builder/base_agent_executor.py index 224bcb776..ee076dfe0 100644 --- a/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py +++ b/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import TYPE_CHECKING from pydantic import BaseModel, Field, PrivateAttr @@ -8,18 +8,27 @@ from crewai.agents.parser import AgentFinish from crewai.memory.utils import sanitize_scope_name from crewai.utilities.printer import Printer from crewai.utilities.string_utils import sanitize_tool_name +from crewai.utilities.types import LLMMessage -class CrewAgentExecutorMixin(BaseModel): +if TYPE_CHECKING: + from crewai.agents.agent_builder.base_agent import BaseAgent + from crewai.crew import Crew + from crewai.task import Task + from crewai.utilities.i18n import I18N + + +class BaseAgentExecutor(BaseModel): model_config = {"arbitrary_types_allowed": True} - crew: Any = Field(default=None, exclude=True) - agent: Any = Field(default=None, exclude=True) - task: Any = Field(default=None, exclude=True) + crew: Crew = Field(default=None, exclude=True) # type: ignore[assignment] + agent: BaseAgent = Field(default=None, exclude=True) # type: ignore[assignment] + task: Task = Field(default=None, exclude=True) # type: ignore[assignment] iterations: int = Field(default=0) max_iter: int = Field(default=25) - messages: list[Any] = Field(default_factory=list) - _i18n: Any = PrivateAttr(default=None) + messages: list[LLMMessage] = Field(default_factory=list) + _resuming: bool = PrivateAttr(default=False) + _i18n: I18N = PrivateAttr(default=None) # type: ignore[assignment] _printer: Printer = PrivateAttr(default_factory=Printer) def _save_to_memory(self, output: AgentFinish) -> None: diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index 7fdf8e49e..02d2e0e40 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -25,7 +25,7 @@ from pydantic import ( from pydantic.functional_serializers import PlainSerializer from crewai.agents.agent_builder.base_agent import _serialize_llm_ref, _validate_llm_ref -from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin +from crewai.agents.agent_builder.base_agent_executor import BaseAgentExecutor from crewai.agents.parser import ( AgentAction, AgentFinish, @@ -88,7 +88,7 @@ if TYPE_CHECKING: from crewai.utilities.types import LLMMessage -class CrewAgentExecutor(CrewAgentExecutorMixin): +class CrewAgentExecutor(BaseAgentExecutor): """Executor for crew agents. Manages the execution lifecycle of an agent including prompt formatting, @@ -98,7 +98,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): llm: Annotated[ BaseLLM | str | None, BeforeValidator(_validate_llm_ref), - PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"), + PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"), ] = Field(default=None) prompt: SystemPromptResult | StandardPromptResult | None = Field(default=None) tools: list[CrewStructuredTool] = Field(default_factory=list) @@ -113,7 +113,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): function_calling_llm: Annotated[ BaseLLM | str | None, BeforeValidator(_validate_llm_ref), - PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"), + PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"), ] = Field(default=None) respect_context_window: bool = Field(default=False) request_within_rpm_limit: SerializableCallable | None = Field( @@ -194,9 +194,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): Returns: Dictionary with agent output. """ - self._setup_messages(inputs) - - self._inject_multimodal_files(inputs) + if self._resuming: + self._resuming = False + else: + self._setup_messages(inputs) + self._inject_multimodal_files(inputs) self._show_start_logs() @@ -1098,9 +1100,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): Returns: Dictionary with agent output. """ - self._setup_messages(inputs) - - await self._ainject_multimodal_files(inputs) + if self._resuming: + self._resuming = False + else: + self._setup_messages(inputs) + await self._ainject_multimodal_files(inputs) self._show_start_logs() diff --git a/lib/crewai/src/crewai/agents/planner_observer.py b/lib/crewai/src/crewai/agents/planner_observer.py index 8be1c7368..16d1a747e 100644 --- a/lib/crewai/src/crewai/agents/planner_observer.py +++ b/lib/crewai/src/crewai/agents/planner_observer.py @@ -30,7 +30,7 @@ from crewai.utilities.types import LLMMessage if TYPE_CHECKING: - from crewai.agent import Agent + from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.task import Task logger = logging.getLogger(__name__) @@ -56,7 +56,7 @@ class PlannerObserver: def __init__( self, - agent: Agent, + agent: BaseAgent, task: Task | None = None, kickoff_input: str = "", ) -> None: diff --git a/lib/crewai/src/crewai/agents/step_executor.py b/lib/crewai/src/crewai/agents/step_executor.py index dad13afa2..29836497c 100644 --- a/lib/crewai/src/crewai/agents/step_executor.py +++ b/lib/crewai/src/crewai/agents/step_executor.py @@ -48,7 +48,7 @@ from crewai.utilities.types import LLMMessage if TYPE_CHECKING: - from crewai.agent import Agent + from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.tools_handler import ToolsHandler from crewai.crew import Crew from crewai.llms.base_llm import BaseLLM @@ -88,7 +88,7 @@ class StepExecutor: self, llm: BaseLLM, tools: list[CrewStructuredTool], - agent: Agent, + agent: BaseAgent, original_tools: list[BaseTool] | None = None, tools_handler: ToolsHandler | None = None, task: Task | None = None, diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index 37273f800..4f0162314 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -234,7 +234,7 @@ class Crew(FlowTrackable, BaseModel): manager_llm: Annotated[ str | BaseLLM | None, BeforeValidator(_validate_llm_ref), - PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"), + PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"), ] = Field(description="Language model that will run the agent.", default=None) manager_agent: Annotated[ BaseAgent | None, @@ -243,7 +243,7 @@ class Crew(FlowTrackable, BaseModel): function_calling_llm: Annotated[ str | LLM | None, BeforeValidator(_validate_llm_ref), - PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"), + PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"), ] = Field(description="Language model that will run the agent.", default=None) config: Json[dict[str, Any]] | dict[str, Any] | None = Field(default=None) id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True) @@ -296,7 +296,7 @@ class Crew(FlowTrackable, BaseModel): planning_llm: Annotated[ str | BaseLLM | None, BeforeValidator(_validate_llm_ref), - PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"), + PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"), ] = Field( default=None, description=( @@ -321,7 +321,7 @@ class Crew(FlowTrackable, BaseModel): chat_llm: Annotated[ str | BaseLLM | None, BeforeValidator(_validate_llm_ref), - PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"), + PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"), ] = Field( default=None, description="LLM used to handle chatting with the crew.", @@ -386,15 +386,21 @@ class Crew(FlowTrackable, BaseModel): def _restore_runtime(self) -> None: """Re-create runtime objects after restoring from a checkpoint.""" for agent in self.agents: - if isinstance(agent.llm, str): - agent.llm = create_llm(agent.llm) agent.crew = self - agent.agent_executor = None + executor = agent.agent_executor + if executor and executor.messages: + executor.crew = self + executor.agent = agent + executor._resuming = True + else: + agent.agent_executor = None for task in self.tasks: if task.agent is not None: for agent in self.agents: if agent.role == task.agent.role: task.agent = agent + if agent.agent_executor is not None and task.output is None: + agent.agent_executor.task = task break if self.checkpoint_inputs is not None: self._inputs = self.checkpoint_inputs diff --git a/lib/crewai/src/crewai/crews/utils.py b/lib/crewai/src/crewai/crews/utils.py index b149d3447..a047f508a 100644 --- a/lib/crewai/src/crewai/crews/utils.py +++ b/lib/crewai/src/crewai/crews/utils.py @@ -183,7 +183,9 @@ def prepare_task_execution( tools_for_task, ) - crew._log_task_start(task, agent_to_use.role) + executor = agent_to_use.agent_executor + if not (executor and executor._resuming): + crew._log_task_start(task, agent_to_use.role) return ( TaskExecutionData(agent=agent_to_use, tools=tools_for_task), @@ -275,7 +277,11 @@ def prepare_kickoff( """ from crewai.events.base_events import reset_emission_counter from crewai.events.event_bus import crewai_event_bus - from crewai.events.event_context import get_current_parent_id, reset_last_event_id + from crewai.events.event_context import ( + get_current_parent_id, + push_event_scope, + reset_last_event_id, + ) from crewai.events.types.crew_events import CrewKickoffStartedEvent resuming = crew._kickoff_event_id is not None @@ -298,7 +304,8 @@ def prepare_kickoff( normalized = {} normalized = before_callback(normalized) - if resuming: + if resuming and crew._kickoff_event_id: + push_event_scope(crew._kickoff_event_id, "crew_kickoff_started") if crew.verbose: from crewai.events.utils.console_formatter import ConsoleFormatter diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 4981d5e1c..c54f18f2e 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -21,7 +21,7 @@ from rich.console import Console from rich.text import Text from typing_extensions import Self -from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin +from crewai.agents.agent_builder.base_agent_executor import BaseAgentExecutor from crewai.agents.parser import ( AgentAction, AgentFinish, @@ -152,7 +152,7 @@ class AgentExecutorState(BaseModel): ) -class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type: ignore[pydantic-unexpected] +class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): # type: ignore[pydantic-unexpected] """Agent Executor for both standalone agents and crew-bound agents. _skip_auto_memory prevents Flow from eagerly allocating a Memory @@ -160,7 +160,7 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type: Inherits from: - Flow[AgentExecutorState]: Provides flow orchestration capabilities - - CrewAgentExecutorMixin: Provides memory methods (short/long/external term) + - BaseAgentExecutor: Provides memory methods (short/long/external term) This executor can operate in two modes: - Standalone mode: When crew and task are None (used by Agent.kickoff()) @@ -297,12 +297,12 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type: from crewai.utilities.reasoning_handler import AgentReasoning if self.task: - planning_handler = AgentReasoning(agent=self.agent, task=self.task) + planning_handler = AgentReasoning(agent=self.agent, task=self.task) # type: ignore[arg-type] else: # For kickoff() path - use input text directly, no Task needed input_text = getattr(self, "_kickoff_input", "") planning_handler = AgentReasoning( - agent=self.agent, + agent=self.agent, # type: ignore[arg-type] description=input_text or "Complete the requested task", expected_output="Complete the task successfully", ) @@ -387,28 +387,28 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type: step failures reliably trigger replanning rather than being silently ignored. """ - config = self.agent.planning_config + config = self.agent.planning_config # type: ignore[attr-defined] if config is not None: return str(config.reasoning_effort) return "medium" def _get_max_replans(self) -> int: """Get max replans from planning config or default to 3.""" - config = self.agent.planning_config + config = self.agent.planning_config # type: ignore[attr-defined] if config is not None: return int(config.max_replans) return 3 def _get_max_step_iterations(self) -> int: """Get max step iterations from planning config or default to 15.""" - config = self.agent.planning_config + config = self.agent.planning_config # type: ignore[attr-defined] if config is not None: return int(config.max_step_iterations) return 15 def _get_step_timeout(self) -> int | None: """Get per-step timeout from planning config or default to None.""" - config = self.agent.planning_config + config = self.agent.planning_config # type: ignore[attr-defined] if config is not None: return int(config.step_timeout) if config.step_timeout is not None else None return None @@ -2358,11 +2358,11 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type: from crewai.utilities.reasoning_handler import AgentReasoning if self.task: - planning_handler = AgentReasoning(agent=self.agent, task=self.task) + planning_handler = AgentReasoning(agent=self.agent, task=self.task) # type: ignore[arg-type] else: input_text = getattr(self, "_kickoff_input", "") planning_handler = AgentReasoning( - agent=self.agent, + agent=self.agent, # type: ignore[arg-type] description=input_text or "Complete the requested task", expected_output="Complete the task successfully", ) @@ -2379,7 +2379,7 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type: # description is a read-only property — recreate with enhanced text input_text = getattr(self, "_kickoff_input", "") planning_handler = AgentReasoning( - agent=self.agent, + agent=self.agent, # type: ignore[arg-type] description=enhanced_description or input_text or "Complete the requested task", diff --git a/lib/crewai/src/crewai/llm.py b/lib/crewai/src/crewai/llm.py index c294d6a84..57079f63e 100644 --- a/lib/crewai/src/crewai/llm.py +++ b/lib/crewai/src/crewai/llm.py @@ -66,7 +66,7 @@ except ImportError: if TYPE_CHECKING: - from crewai.agent.core import Agent + from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.task import Task from crewai.tools.base_tool import BaseTool from crewai.utilities.types import LLMMessage @@ -735,7 +735,7 @@ class LLM(BaseLLM): callbacks: list[Any] | None = None, available_functions: dict[str, Any] | None = None, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, response_model: type[BaseModel] | None = None, ) -> Any: """Handle a streaming response from the LLM. @@ -1048,7 +1048,7 @@ class LLM(BaseLLM): accumulated_tool_args: defaultdict[int, AccumulatedToolArgs], available_functions: dict[str, Any] | None = None, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, response_id: str | None = None, ) -> Any: for tool_call in tool_calls: @@ -1137,7 +1137,7 @@ class LLM(BaseLLM): callbacks: list[Any] | None = None, available_functions: dict[str, Any] | None = None, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, response_model: type[BaseModel] | None = None, ) -> str | Any: """Handle a non-streaming response from the LLM. @@ -1289,7 +1289,7 @@ class LLM(BaseLLM): callbacks: list[Any] | None = None, available_functions: dict[str, Any] | None = None, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, response_model: type[BaseModel] | None = None, ) -> str | Any: """Handle an async non-streaming response from the LLM. @@ -1430,7 +1430,7 @@ class LLM(BaseLLM): callbacks: list[Any] | None = None, available_functions: dict[str, Any] | None = None, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, response_model: type[BaseModel] | None = None, ) -> Any: """Handle an async streaming response from the LLM. @@ -1606,7 +1606,7 @@ class LLM(BaseLLM): tool_calls: list[Any], available_functions: dict[str, Any] | None = None, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, ) -> Any: """Handle a tool call from the LLM. @@ -1702,7 +1702,7 @@ class LLM(BaseLLM): callbacks: list[Any] | None = None, available_functions: dict[str, Any] | None = None, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, response_model: type[BaseModel] | None = None, ) -> str | Any: """High-level LLM call method. @@ -1852,7 +1852,7 @@ class LLM(BaseLLM): callbacks: list[Any] | None = None, available_functions: dict[str, Any] | None = None, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, response_model: type[BaseModel] | None = None, ) -> str | Any: """Async high-level LLM call method. @@ -2001,7 +2001,7 @@ class LLM(BaseLLM): response: Any, call_type: LLMCallType, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, messages: str | list[LLMMessage] | None = None, usage: dict[str, Any] | None = None, ) -> None: diff --git a/lib/crewai/src/crewai/llms/base_llm.py b/lib/crewai/src/crewai/llms/base_llm.py index a0bf7c56a..9f00d1db8 100644 --- a/lib/crewai/src/crewai/llms/base_llm.py +++ b/lib/crewai/src/crewai/llms/base_llm.py @@ -53,7 +53,7 @@ except ImportError: if TYPE_CHECKING: - from crewai.agent.core import Agent + from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.task import Task from crewai.tools.base_tool import BaseTool from crewai.utilities.types import LLMMessage @@ -240,7 +240,7 @@ class BaseLLM(BaseModel, ABC): callbacks: list[Any] | None = None, available_functions: dict[str, Any] | None = None, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, response_model: type[BaseModel] | None = None, ) -> str | Any: """Call the LLM with the given messages. @@ -277,7 +277,7 @@ class BaseLLM(BaseModel, ABC): callbacks: list[Any] | None = None, available_functions: dict[str, Any] | None = None, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, response_model: type[BaseModel] | None = None, ) -> str | Any: """Call the LLM with the given messages. @@ -434,7 +434,7 @@ class BaseLLM(BaseModel, ABC): callbacks: list[Any] | None = None, available_functions: dict[str, Any] | None = None, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, ) -> None: """Emit LLM call started event.""" from crewai.utilities.serialization import to_serializable @@ -458,7 +458,7 @@ class BaseLLM(BaseModel, ABC): response: Any, call_type: LLMCallType, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, messages: str | list[LLMMessage] | None = None, usage: dict[str, Any] | None = None, ) -> None: @@ -483,7 +483,7 @@ class BaseLLM(BaseModel, ABC): self, error: str, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, ) -> None: """Emit LLM call failed event.""" crewai_event_bus.emit( @@ -501,7 +501,7 @@ class BaseLLM(BaseModel, ABC): self, chunk: str, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, tool_call: dict[str, Any] | None = None, call_type: LLMCallType | None = None, response_id: str | None = None, @@ -533,7 +533,7 @@ class BaseLLM(BaseModel, ABC): self, chunk: str, from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, response_id: str | None = None, ) -> None: """Emit thinking/reasoning chunk event from a thinking model. @@ -561,7 +561,7 @@ class BaseLLM(BaseModel, ABC): function_args: dict[str, Any], available_functions: dict[str, Any], from_task: Task | None = None, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, ) -> str | None: """Handle tool execution with proper event emission. @@ -827,7 +827,7 @@ class BaseLLM(BaseModel, ABC): def _invoke_before_llm_call_hooks( self, messages: list[LLMMessage], - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, ) -> bool: """Invoke before_llm_call hooks for direct LLM calls (no agent context). @@ -896,7 +896,7 @@ class BaseLLM(BaseModel, ABC): self, messages: list[LLMMessage], response: str, - from_agent: Agent | None = None, + from_agent: BaseAgent | None = None, ) -> str: """Invoke after_llm_call hooks for direct LLM calls (no agent context). diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index 7cd0bdca5..eff5a3b60 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -39,6 +39,7 @@ from crewai.agents.agent_builder.base_agent import BaseAgent, _resolve_agent from crewai.context import reset_current_task_id, set_current_task_id from crewai.core.providers.content_processor import process_content from crewai.events.event_bus import crewai_event_bus +from crewai.events.event_context import push_event_scope from crewai.events.types.task_events import ( TaskCompletedEvent, TaskFailedEvent, @@ -598,7 +599,12 @@ class Task(BaseModel): tools = tools or self.tools or [] self.processed_by_agents.add(agent.role) - crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self)) + if agent.agent_executor and agent.agent_executor._resuming: + push_event_scope(str(self.id), "task_started") + else: + crewai_event_bus.emit( + self, TaskStartedEvent(context=context, task=self) + ) result = await agent.aexecute_task( task=self, context=context, @@ -717,7 +723,12 @@ class Task(BaseModel): tools = tools or self.tools or [] self.processed_by_agents.add(agent.role) - crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self)) + if agent.agent_executor and agent.agent_executor._resuming: + push_event_scope(str(self.id), "task_started") + else: + crewai_event_bus.emit( + self, TaskStartedEvent(context=context, task=self) + ) result = agent.execute_task( task=self, context=context, diff --git a/lib/crewai/src/crewai/utilities/agent_utils.py b/lib/crewai/src/crewai/utilities/agent_utils.py index c1a341c39..09c570fac 100644 --- a/lib/crewai/src/crewai/utilities/agent_utils.py +++ b/lib/crewai/src/crewai/utilities/agent_utils.py @@ -40,7 +40,7 @@ from crewai.utilities.types import LLMMessage if TYPE_CHECKING: - from crewai.agent import Agent + from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.crew_agent_executor import CrewAgentExecutor from crewai.agents.tools_handler import ToolsHandler from crewai.experimental.agent_executor import AgentExecutor @@ -431,7 +431,7 @@ def get_llm_response( tools: list[dict[str, Any]] | None = None, available_functions: dict[str, Callable[..., Any]] | None = None, from_task: Task | None = None, - from_agent: Agent | LiteAgent | None = None, + from_agent: BaseAgent | None = None, response_model: type[BaseModel] | None = None, executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None = None, verbose: bool = True, @@ -468,7 +468,7 @@ def get_llm_response( callbacks=callbacks, available_functions=available_functions, from_task=from_task, - from_agent=from_agent, # type: ignore[arg-type] + from_agent=from_agent, response_model=response_model, ) except Exception as e: @@ -487,7 +487,7 @@ async def aget_llm_response( tools: list[dict[str, Any]] | None = None, available_functions: dict[str, Callable[..., Any]] | None = None, from_task: Task | None = None, - from_agent: Agent | LiteAgent | None = None, + from_agent: BaseAgent | None = None, response_model: type[BaseModel] | None = None, executor_context: CrewAgentExecutor | AgentExecutor | None = None, verbose: bool = True, @@ -524,7 +524,7 @@ async def aget_llm_response( callbacks=callbacks, available_functions=available_functions, from_task=from_task, - from_agent=from_agent, # type: ignore[arg-type] + from_agent=from_agent, response_model=response_model, ) except Exception as e: @@ -1363,7 +1363,7 @@ def execute_single_native_tool_call( original_tools: list[BaseTool], structured_tools: list[CrewStructuredTool] | None, tools_handler: ToolsHandler | None, - agent: Agent | None, + agent: BaseAgent | None, task: Task | None, crew: Any | None, event_source: Any, diff --git a/lib/crewai/tests/memory/test_memory_root_scope.py b/lib/crewai/tests/memory/test_memory_root_scope.py index 0647fa93b..9fd5ffb70 100644 --- a/lib/crewai/tests/memory/test_memory_root_scope.py +++ b/lib/crewai/tests/memory/test_memory_root_scope.py @@ -523,8 +523,8 @@ class TestAgentScopeExtension: def test_agent_save_extends_crew_root_scope(self) -> None: """Agent._save_to_memory extends crew's root_scope with agent info.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, + from crewai.agents.agent_builder.base_agent_executor import ( + BaseAgentExecutor, ) from crewai.agents.parser import AgentFinish from crewai.utilities.printer import Printer @@ -543,7 +543,7 @@ class TestAgentScopeExtension: mock_task.description = "Research task" mock_task.expected_output = "Report" - executor = CrewAgentExecutorMixin( + executor = BaseAgentExecutor( crew=None, agent=mock_agent, task=mock_task, @@ -557,8 +557,8 @@ class TestAgentScopeExtension: def test_agent_save_sanitizes_role(self) -> None: """Agent role with special chars is sanitized for scope path.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, + from crewai.agents.agent_builder.base_agent_executor import ( + BaseAgentExecutor, ) from crewai.agents.parser import AgentFinish from crewai.utilities.printer import Printer @@ -577,7 +577,7 @@ class TestAgentScopeExtension: mock_task.description = "Task" mock_task.expected_output = "Output" - executor = CrewAgentExecutorMixin( + executor = BaseAgentExecutor( crew=None, agent=mock_agent, task=mock_task, @@ -1047,8 +1047,8 @@ class TestAgentExecutorBackwardCompat: def test_agent_executor_no_root_scope_when_memory_has_none(self) -> None: """Agent executor doesn't inject root_scope when memory has none.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, + from crewai.agents.agent_builder.base_agent_executor import ( + BaseAgentExecutor, ) from crewai.agents.parser import AgentFinish from crewai.utilities.printer import Printer @@ -1067,7 +1067,7 @@ class TestAgentExecutorBackwardCompat: mock_task.description = "Task" mock_task.expected_output = "Output" - executor = CrewAgentExecutorMixin( + executor = BaseAgentExecutor( crew=None, agent=mock_agent, task=mock_task, @@ -1082,8 +1082,8 @@ class TestAgentExecutorBackwardCompat: def test_agent_executor_extends_root_scope_when_memory_has_one(self) -> None: """Agent executor extends root_scope when memory has one.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, + from crewai.agents.agent_builder.base_agent_executor import ( + BaseAgentExecutor, ) from crewai.agents.parser import AgentFinish from crewai.utilities.printer import Printer @@ -1102,7 +1102,7 @@ class TestAgentExecutorBackwardCompat: mock_task.description = "Task" mock_task.expected_output = "Output" - executor = CrewAgentExecutorMixin( + executor = BaseAgentExecutor( crew=None, agent=mock_agent, task=mock_task, diff --git a/lib/crewai/tests/memory/test_unified_memory.py b/lib/crewai/tests/memory/test_unified_memory.py index cb6fca8bb..e6b48333a 100644 --- a/lib/crewai/tests/memory/test_unified_memory.py +++ b/lib/crewai/tests/memory/test_unified_memory.py @@ -315,7 +315,7 @@ def test_memory_extract_memories_empty_content_returns_empty_list(tmp_path: Path def test_executor_save_to_memory_calls_extract_then_remember_per_item() -> None: """_save_to_memory calls memory.extract_memories(raw) then memory.remember(m) for each.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin + from crewai.agents.agent_builder.base_agent_executor import BaseAgentExecutor from crewai.agents.parser import AgentFinish mock_memory = MagicMock() @@ -331,7 +331,7 @@ def test_executor_save_to_memory_calls_extract_then_remember_per_item() -> None: mock_task.description = "Do research" mock_task.expected_output = "A report" - executor = CrewAgentExecutorMixin( + executor = BaseAgentExecutor( crew=None, agent=mock_agent, task=mock_task, @@ -349,7 +349,7 @@ def test_executor_save_to_memory_calls_extract_then_remember_per_item() -> None: def test_executor_save_to_memory_skips_delegation_output() -> None: """_save_to_memory does nothing when output contains delegate action.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin + from crewai.agents.agent_builder.base_agent_executor import BaseAgentExecutor from crewai.agents.parser import AgentFinish from crewai.utilities.string_utils import sanitize_tool_name @@ -362,7 +362,7 @@ def test_executor_save_to_memory_skips_delegation_output() -> None: delegate_text = f"Action: {sanitize_tool_name('Delegate work to coworker')}" full_text = delegate_text + " rest" - executor = CrewAgentExecutorMixin( + executor = BaseAgentExecutor( crew=None, agent=mock_agent, task=mock_task, diff --git a/lib/crewai/tests/rag/embeddings/test_google_vertex_memory_integration.py b/lib/crewai/tests/rag/embeddings/test_google_vertex_memory_integration.py index 149320adf..28ea84304 100644 --- a/lib/crewai/tests/rag/embeddings/test_google_vertex_memory_integration.py +++ b/lib/crewai/tests/rag/embeddings/test_google_vertex_memory_integration.py @@ -102,7 +102,7 @@ def test_crew_memory_with_google_vertex_embedder( # Mock _save_to_memory during kickoff so it doesn't make embedding API calls # that VCR can't replay (GCP metadata auth, embedding endpoints). with patch( - "crewai.agents.agent_builder.base_agent_executor_mixin.CrewAgentExecutorMixin._save_to_memory" + "crewai.agents.agent_builder.base_agent_executor.BaseAgentExecutor._save_to_memory" ): result = crew.kickoff() @@ -163,7 +163,7 @@ def test_crew_memory_with_google_vertex_project_id(simple_agent, simple_task) -> assert crew._memory is memory with patch( - "crewai.agents.agent_builder.base_agent_executor_mixin.CrewAgentExecutorMixin._save_to_memory" + "crewai.agents.agent_builder.base_agent_executor.BaseAgentExecutor._save_to_memory" ): result = crew.kickoff()