feat: mid-task checkpoint resume and executor refactor

This commit is contained in:
Greyson LaLonde
2026-04-04 01:07:45 +08:00
parent 5ace0bfe4a
commit 6dc9f462f9
17 changed files with 160 additions and 102 deletions

View File

@@ -98,8 +98,8 @@ def __getattr__(name: str) -> Any:
try:
from crewai.agents.agent_builder.base_agent import BaseAgent as _BaseAgent
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin as _CrewAgentExecutorMixin,
from crewai.agents.agent_builder.base_agent_executor import (
BaseAgentExecutor as _BaseAgentExecutor,
)
from crewai.agents.tools_handler import ToolsHandler as _ToolsHandler
from crewai.experimental.agent_executor import AgentExecutor as _AgentExecutor
@@ -117,7 +117,7 @@ try:
"Flow": Flow,
"BaseLLM": BaseLLM,
"Task": Task,
"CrewAgentExecutorMixin": _CrewAgentExecutorMixin,
"BaseAgentExecutor": _BaseAgentExecutor,
"ExecutionContext": ExecutionContext,
"StandardPromptResult": _StandardPromptResult,
"SystemPromptResult": _SystemPromptResult,

View File

@@ -194,12 +194,12 @@ class Agent(BaseAgent):
llm: Annotated[
str | BaseLLM | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(description="Language model that will run the agent.", default=None)
function_calling_llm: Annotated[
str | BaseLLM | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(description="Language model that will run the agent.", default=None)
system_template: str | None = Field(
default=None, description="System format for the agent."
@@ -1056,7 +1056,8 @@ class Agent(BaseAgent):
if self.agent_executor is None:
raise RuntimeError("Agent executor is not initialized.")
self.agent_executor.task = task
if task is not None:
self.agent_executor.task = task
self.agent_executor.tools = tools
self.agent_executor.original_tools = raw_tools
self.agent_executor.prompt = prompt

View File

@@ -15,6 +15,7 @@ from pydantic import (
BeforeValidator,
Field,
PrivateAttr,
SerializeAsAny,
field_validator,
model_validator,
)
@@ -23,7 +24,7 @@ from pydantic_core import PydanticCustomError
from typing_extensions import Self
from crewai.agent.internal.meta import AgentMeta
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.agent_builder.base_agent_executor import BaseAgentExecutor
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.tools_handler import ToolsHandler
@@ -63,6 +64,10 @@ def _serialize_crew_ref(value: Any) -> str | None:
def _validate_llm_ref(value: Any) -> Any:
if isinstance(value, dict):
from crewai.llm import LLM
return LLM(**value)
return value
@@ -74,12 +79,21 @@ def _resolve_agent(value: Any, info: Any) -> Any:
return Agent.model_validate(value, context=getattr(info, "context", None))
def _serialize_llm_ref(value: Any) -> str | None:
def _validate_executor_ref(value: Any) -> Any:
if isinstance(value, dict):
from crewai.agents.crew_agent_executor import CrewAgentExecutor
return CrewAgentExecutor.model_validate(value)
return value
def _serialize_llm_ref(value: Any) -> dict[str, Any] | None:
if value is None:
return None
if isinstance(value, str):
return value
return getattr(value, "model", str(value))
return {"model": value}
result: dict[str, Any] = value.model_dump()
return result
_SLUG_RE: Final[re.Pattern[str]] = re.compile(
@@ -196,13 +210,19 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
max_iter: int = Field(
default=25, description="Maximum iterations for an agent to execute a task"
)
agent_executor: CrewAgentExecutorMixin | None = Field(
agent_executor: SerializeAsAny[BaseAgentExecutor] | None = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
@field_validator("agent_executor", mode="before")
@classmethod
def _validate_agent_executor(cls, v: Any) -> Any:
    """Rebuild a serialized executor before pydantic field validation.

    Delegates to ``_validate_executor_ref``: dict payloads (from a
    checkpoint) become executor instances; other values pass through.
    """
    return _validate_executor_ref(v)
llm: Annotated[
str | BaseLLM | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(default=None, description="Language model that will run the agent.")
crew: Annotated[
Crew | str | None,

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from typing import Any
from typing import TYPE_CHECKING
from pydantic import BaseModel, Field, PrivateAttr
@@ -8,18 +8,27 @@ from crewai.agents.parser import AgentFinish
from crewai.memory.utils import sanitize_scope_name
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.utilities.types import LLMMessage
class CrewAgentExecutorMixin(BaseModel):
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.crew import Crew
from crewai.task import Task
from crewai.utilities.i18n import I18N
class BaseAgentExecutor(BaseModel):
model_config = {"arbitrary_types_allowed": True}
crew: Any = Field(default=None, exclude=True)
agent: Any = Field(default=None, exclude=True)
task: Any = Field(default=None, exclude=True)
crew: Crew = Field(default=None, exclude=True) # type: ignore[assignment]
agent: BaseAgent = Field(default=None, exclude=True) # type: ignore[assignment]
task: Task = Field(default=None, exclude=True) # type: ignore[assignment]
iterations: int = Field(default=0)
max_iter: int = Field(default=25)
messages: list[Any] = Field(default_factory=list)
_i18n: Any = PrivateAttr(default=None)
messages: list[LLMMessage] = Field(default_factory=list)
_resuming: bool = PrivateAttr(default=False)
_i18n: I18N = PrivateAttr(default=None) # type: ignore[assignment]
_printer: Printer = PrivateAttr(default_factory=Printer)
def _save_to_memory(self, output: AgentFinish) -> None:

View File

@@ -25,7 +25,7 @@ from pydantic import (
from pydantic.functional_serializers import PlainSerializer
from crewai.agents.agent_builder.base_agent import _serialize_llm_ref, _validate_llm_ref
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.agent_builder.base_agent_executor import BaseAgentExecutor
from crewai.agents.parser import (
AgentAction,
AgentFinish,
@@ -88,7 +88,7 @@ if TYPE_CHECKING:
from crewai.utilities.types import LLMMessage
class CrewAgentExecutor(CrewAgentExecutorMixin):
class CrewAgentExecutor(BaseAgentExecutor):
"""Executor for crew agents.
Manages the execution lifecycle of an agent including prompt formatting,
@@ -98,7 +98,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
llm: Annotated[
BaseLLM | str | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(default=None)
prompt: SystemPromptResult | StandardPromptResult | None = Field(default=None)
tools: list[CrewStructuredTool] = Field(default_factory=list)
@@ -113,7 +113,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
function_calling_llm: Annotated[
BaseLLM | str | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(default=None)
respect_context_window: bool = Field(default=False)
request_within_rpm_limit: SerializableCallable | None = Field(
@@ -194,9 +194,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
Dictionary with agent output.
"""
self._setup_messages(inputs)
self._inject_multimodal_files(inputs)
if self._resuming:
self._resuming = False
else:
self._setup_messages(inputs)
self._inject_multimodal_files(inputs)
self._show_start_logs()
@@ -1098,9 +1100,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
Dictionary with agent output.
"""
self._setup_messages(inputs)
await self._ainject_multimodal_files(inputs)
if self._resuming:
self._resuming = False
else:
self._setup_messages(inputs)
await self._ainject_multimodal_files(inputs)
self._show_start_logs()

View File

@@ -30,7 +30,7 @@ from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
logger = logging.getLogger(__name__)
@@ -56,7 +56,7 @@ class PlannerObserver:
def __init__(
self,
agent: Agent,
agent: BaseAgent,
task: Task | None = None,
kickoff_input: str = "",
) -> None:

View File

@@ -48,7 +48,7 @@ from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.tools_handler import ToolsHandler
from crewai.crew import Crew
from crewai.llms.base_llm import BaseLLM
@@ -88,7 +88,7 @@ class StepExecutor:
self,
llm: BaseLLM,
tools: list[CrewStructuredTool],
agent: Agent,
agent: BaseAgent,
original_tools: list[BaseTool] | None = None,
tools_handler: ToolsHandler | None = None,
task: Task | None = None,

View File

@@ -234,7 +234,7 @@ class Crew(FlowTrackable, BaseModel):
manager_llm: Annotated[
str | BaseLLM | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(description="Language model that will run the agent.", default=None)
manager_agent: Annotated[
BaseAgent | None,
@@ -243,7 +243,7 @@ class Crew(FlowTrackable, BaseModel):
function_calling_llm: Annotated[
str | LLM | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(description="Language model that will run the agent.", default=None)
config: Json[dict[str, Any]] | dict[str, Any] | None = Field(default=None)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
@@ -296,7 +296,7 @@ class Crew(FlowTrackable, BaseModel):
planning_llm: Annotated[
str | BaseLLM | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(
default=None,
description=(
@@ -321,7 +321,7 @@ class Crew(FlowTrackable, BaseModel):
chat_llm: Annotated[
str | BaseLLM | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=str | None, when_used="json"),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(
default=None,
description="LLM used to handle chatting with the crew.",
@@ -386,15 +386,21 @@ class Crew(FlowTrackable, BaseModel):
def _restore_runtime(self) -> None:
"""Re-create runtime objects after restoring from a checkpoint."""
for agent in self.agents:
if isinstance(agent.llm, str):
agent.llm = create_llm(agent.llm)
agent.crew = self
agent.agent_executor = None
executor = agent.agent_executor
if executor and executor.messages:
executor.crew = self
executor.agent = agent
executor._resuming = True
else:
agent.agent_executor = None
for task in self.tasks:
if task.agent is not None:
for agent in self.agents:
if agent.role == task.agent.role:
task.agent = agent
if agent.agent_executor is not None and task.output is None:
agent.agent_executor.task = task
break
if self.checkpoint_inputs is not None:
self._inputs = self.checkpoint_inputs

View File

@@ -183,7 +183,9 @@ def prepare_task_execution(
tools_for_task,
)
crew._log_task_start(task, agent_to_use.role)
executor = agent_to_use.agent_executor
if not (executor and executor._resuming):
crew._log_task_start(task, agent_to_use.role)
return (
TaskExecutionData(agent=agent_to_use, tools=tools_for_task),
@@ -275,7 +277,11 @@ def prepare_kickoff(
"""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import get_current_parent_id, reset_last_event_id
from crewai.events.event_context import (
get_current_parent_id,
push_event_scope,
reset_last_event_id,
)
from crewai.events.types.crew_events import CrewKickoffStartedEvent
resuming = crew._kickoff_event_id is not None
@@ -298,7 +304,8 @@ def prepare_kickoff(
normalized = {}
normalized = before_callback(normalized)
if resuming:
if resuming and crew._kickoff_event_id:
push_event_scope(crew._kickoff_event_id, "crew_kickoff_started")
if crew.verbose:
from crewai.events.utils.console_formatter import ConsoleFormatter

View File

@@ -21,7 +21,7 @@ from rich.console import Console
from rich.text import Text
from typing_extensions import Self
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.agent_builder.base_agent_executor import BaseAgentExecutor
from crewai.agents.parser import (
AgentAction,
AgentFinish,
@@ -152,7 +152,7 @@ class AgentExecutorState(BaseModel):
)
class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type: ignore[pydantic-unexpected]
class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor): # type: ignore[pydantic-unexpected]
"""Agent Executor for both standalone agents and crew-bound agents.
_skip_auto_memory prevents Flow from eagerly allocating a Memory
@@ -160,7 +160,7 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type:
Inherits from:
- Flow[AgentExecutorState]: Provides flow orchestration capabilities
- CrewAgentExecutorMixin: Provides memory methods (short/long/external term)
- BaseAgentExecutor: Provides memory methods (short/long/external term)
This executor can operate in two modes:
- Standalone mode: When crew and task are None (used by Agent.kickoff())
@@ -297,12 +297,12 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type:
from crewai.utilities.reasoning_handler import AgentReasoning
if self.task:
planning_handler = AgentReasoning(agent=self.agent, task=self.task)
planning_handler = AgentReasoning(agent=self.agent, task=self.task) # type: ignore[arg-type]
else:
# For kickoff() path - use input text directly, no Task needed
input_text = getattr(self, "_kickoff_input", "")
planning_handler = AgentReasoning(
agent=self.agent,
agent=self.agent, # type: ignore[arg-type]
description=input_text or "Complete the requested task",
expected_output="Complete the task successfully",
)
@@ -387,28 +387,28 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type:
step failures reliably trigger replanning rather than being
silently ignored.
"""
config = self.agent.planning_config
config = self.agent.planning_config # type: ignore[attr-defined]
if config is not None:
return str(config.reasoning_effort)
return "medium"
def _get_max_replans(self) -> int:
"""Get max replans from planning config or default to 3."""
config = self.agent.planning_config
config = self.agent.planning_config # type: ignore[attr-defined]
if config is not None:
return int(config.max_replans)
return 3
def _get_max_step_iterations(self) -> int:
"""Get max step iterations from planning config or default to 15."""
config = self.agent.planning_config
config = self.agent.planning_config # type: ignore[attr-defined]
if config is not None:
return int(config.max_step_iterations)
return 15
def _get_step_timeout(self) -> int | None:
"""Get per-step timeout from planning config or default to None."""
config = self.agent.planning_config
config = self.agent.planning_config # type: ignore[attr-defined]
if config is not None:
return int(config.step_timeout) if config.step_timeout is not None else None
return None
@@ -2358,11 +2358,11 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type:
from crewai.utilities.reasoning_handler import AgentReasoning
if self.task:
planning_handler = AgentReasoning(agent=self.agent, task=self.task)
planning_handler = AgentReasoning(agent=self.agent, task=self.task) # type: ignore[arg-type]
else:
input_text = getattr(self, "_kickoff_input", "")
planning_handler = AgentReasoning(
agent=self.agent,
agent=self.agent, # type: ignore[arg-type]
description=input_text or "Complete the requested task",
expected_output="Complete the task successfully",
)
@@ -2379,7 +2379,7 @@ class AgentExecutor(Flow[AgentExecutorState], CrewAgentExecutorMixin): # type:
# description is a read-only property — recreate with enhanced text
input_text = getattr(self, "_kickoff_input", "")
planning_handler = AgentReasoning(
agent=self.agent,
agent=self.agent, # type: ignore[arg-type]
description=enhanced_description
or input_text
or "Complete the requested task",

View File

@@ -66,7 +66,7 @@ except ImportError:
if TYPE_CHECKING:
from crewai.agent.core import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.utilities.types import LLMMessage
@@ -735,7 +735,7 @@ class LLM(BaseLLM):
callbacks: list[Any] | None = None,
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
) -> Any:
"""Handle a streaming response from the LLM.
@@ -1048,7 +1048,7 @@ class LLM(BaseLLM):
accumulated_tool_args: defaultdict[int, AccumulatedToolArgs],
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
response_id: str | None = None,
) -> Any:
for tool_call in tool_calls:
@@ -1137,7 +1137,7 @@ class LLM(BaseLLM):
callbacks: list[Any] | None = None,
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Handle a non-streaming response from the LLM.
@@ -1289,7 +1289,7 @@ class LLM(BaseLLM):
callbacks: list[Any] | None = None,
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Handle an async non-streaming response from the LLM.
@@ -1430,7 +1430,7 @@ class LLM(BaseLLM):
callbacks: list[Any] | None = None,
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
) -> Any:
"""Handle an async streaming response from the LLM.
@@ -1606,7 +1606,7 @@ class LLM(BaseLLM):
tool_calls: list[Any],
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
) -> Any:
"""Handle a tool call from the LLM.
@@ -1702,7 +1702,7 @@ class LLM(BaseLLM):
callbacks: list[Any] | None = None,
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""High-level LLM call method.
@@ -1852,7 +1852,7 @@ class LLM(BaseLLM):
callbacks: list[Any] | None = None,
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Async high-level LLM call method.
@@ -2001,7 +2001,7 @@ class LLM(BaseLLM):
response: Any,
call_type: LLMCallType,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
messages: str | list[LLMMessage] | None = None,
usage: dict[str, Any] | None = None,
) -> None:

View File

@@ -53,7 +53,7 @@ except ImportError:
if TYPE_CHECKING:
from crewai.agent.core import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.utilities.types import LLMMessage
@@ -240,7 +240,7 @@ class BaseLLM(BaseModel, ABC):
callbacks: list[Any] | None = None,
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Call the LLM with the given messages.
@@ -277,7 +277,7 @@ class BaseLLM(BaseModel, ABC):
callbacks: list[Any] | None = None,
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Call the LLM with the given messages.
@@ -434,7 +434,7 @@ class BaseLLM(BaseModel, ABC):
callbacks: list[Any] | None = None,
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
) -> None:
"""Emit LLM call started event."""
from crewai.utilities.serialization import to_serializable
@@ -458,7 +458,7 @@ class BaseLLM(BaseModel, ABC):
response: Any,
call_type: LLMCallType,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
messages: str | list[LLMMessage] | None = None,
usage: dict[str, Any] | None = None,
) -> None:
@@ -483,7 +483,7 @@ class BaseLLM(BaseModel, ABC):
self,
error: str,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
) -> None:
"""Emit LLM call failed event."""
crewai_event_bus.emit(
@@ -501,7 +501,7 @@ class BaseLLM(BaseModel, ABC):
self,
chunk: str,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
tool_call: dict[str, Any] | None = None,
call_type: LLMCallType | None = None,
response_id: str | None = None,
@@ -533,7 +533,7 @@ class BaseLLM(BaseModel, ABC):
self,
chunk: str,
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
response_id: str | None = None,
) -> None:
"""Emit thinking/reasoning chunk event from a thinking model.
@@ -561,7 +561,7 @@ class BaseLLM(BaseModel, ABC):
function_args: dict[str, Any],
available_functions: dict[str, Any],
from_task: Task | None = None,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
) -> str | None:
"""Handle tool execution with proper event emission.
@@ -827,7 +827,7 @@ class BaseLLM(BaseModel, ABC):
def _invoke_before_llm_call_hooks(
self,
messages: list[LLMMessage],
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
) -> bool:
"""Invoke before_llm_call hooks for direct LLM calls (no agent context).
@@ -896,7 +896,7 @@ class BaseLLM(BaseModel, ABC):
self,
messages: list[LLMMessage],
response: str,
from_agent: Agent | None = None,
from_agent: BaseAgent | None = None,
) -> str:
"""Invoke after_llm_call hooks for direct LLM calls (no agent context).

View File

@@ -39,6 +39,7 @@ from crewai.agents.agent_builder.base_agent import BaseAgent, _resolve_agent
from crewai.context import reset_current_task_id, set_current_task_id
from crewai.core.providers.content_processor import process_content
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import push_event_scope
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
@@ -598,7 +599,12 @@ class Task(BaseModel):
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self))
if agent.agent_executor and agent.agent_executor._resuming:
push_event_scope(str(self.id), "task_started")
else:
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
)
result = await agent.aexecute_task(
task=self,
context=context,
@@ -717,7 +723,12 @@ class Task(BaseModel):
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self))
if agent.agent_executor and agent.agent_executor._resuming:
push_event_scope(str(self.id), "task_started")
else:
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
)
result = agent.execute_task(
task=self,
context=context,

View File

@@ -40,7 +40,7 @@ from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.agents.tools_handler import ToolsHandler
from crewai.experimental.agent_executor import AgentExecutor
@@ -431,7 +431,7 @@ def get_llm_response(
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Callable[..., Any]] | None = None,
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None = None,
verbose: bool = True,
@@ -468,7 +468,7 @@ def get_llm_response(
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent, # type: ignore[arg-type]
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
@@ -487,7 +487,7 @@ async def aget_llm_response(
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Callable[..., Any]] | None = None,
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | AgentExecutor | None = None,
verbose: bool = True,
@@ -524,7 +524,7 @@ async def aget_llm_response(
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent, # type: ignore[arg-type]
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
@@ -1363,7 +1363,7 @@ def execute_single_native_tool_call(
original_tools: list[BaseTool],
structured_tools: list[CrewStructuredTool] | None,
tools_handler: ToolsHandler | None,
agent: Agent | None,
agent: BaseAgent | None,
task: Task | None,
crew: Any | None,
event_source: Any,

View File

@@ -523,8 +523,8 @@ class TestAgentScopeExtension:
def test_agent_save_extends_crew_root_scope(self) -> None:
"""Agent._save_to_memory extends crew's root_scope with agent info."""
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
from crewai.agents.agent_builder.base_agent_executor import (
BaseAgentExecutor,
)
from crewai.agents.parser import AgentFinish
from crewai.utilities.printer import Printer
@@ -543,7 +543,7 @@ class TestAgentScopeExtension:
mock_task.description = "Research task"
mock_task.expected_output = "Report"
executor = CrewAgentExecutorMixin(
executor = BaseAgentExecutor(
crew=None,
agent=mock_agent,
task=mock_task,
@@ -557,8 +557,8 @@ class TestAgentScopeExtension:
def test_agent_save_sanitizes_role(self) -> None:
"""Agent role with special chars is sanitized for scope path."""
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
from crewai.agents.agent_builder.base_agent_executor import (
BaseAgentExecutor,
)
from crewai.agents.parser import AgentFinish
from crewai.utilities.printer import Printer
@@ -577,7 +577,7 @@ class TestAgentScopeExtension:
mock_task.description = "Task"
mock_task.expected_output = "Output"
executor = CrewAgentExecutorMixin(
executor = BaseAgentExecutor(
crew=None,
agent=mock_agent,
task=mock_task,
@@ -1047,8 +1047,8 @@ class TestAgentExecutorBackwardCompat:
def test_agent_executor_no_root_scope_when_memory_has_none(self) -> None:
"""Agent executor doesn't inject root_scope when memory has none."""
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
from crewai.agents.agent_builder.base_agent_executor import (
BaseAgentExecutor,
)
from crewai.agents.parser import AgentFinish
from crewai.utilities.printer import Printer
@@ -1067,7 +1067,7 @@ class TestAgentExecutorBackwardCompat:
mock_task.description = "Task"
mock_task.expected_output = "Output"
executor = CrewAgentExecutorMixin(
executor = BaseAgentExecutor(
crew=None,
agent=mock_agent,
task=mock_task,
@@ -1082,8 +1082,8 @@ class TestAgentExecutorBackwardCompat:
def test_agent_executor_extends_root_scope_when_memory_has_one(self) -> None:
"""Agent executor extends root_scope when memory has one."""
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
from crewai.agents.agent_builder.base_agent_executor import (
BaseAgentExecutor,
)
from crewai.agents.parser import AgentFinish
from crewai.utilities.printer import Printer
@@ -1102,7 +1102,7 @@ class TestAgentExecutorBackwardCompat:
mock_task.description = "Task"
mock_task.expected_output = "Output"
executor = CrewAgentExecutorMixin(
executor = BaseAgentExecutor(
crew=None,
agent=mock_agent,
task=mock_task,

View File

@@ -315,7 +315,7 @@ def test_memory_extract_memories_empty_content_returns_empty_list(tmp_path: Path
def test_executor_save_to_memory_calls_extract_then_remember_per_item() -> None:
"""_save_to_memory calls memory.extract_memories(raw) then memory.remember(m) for each."""
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.agent_builder.base_agent_executor import BaseAgentExecutor
from crewai.agents.parser import AgentFinish
mock_memory = MagicMock()
@@ -331,7 +331,7 @@ def test_executor_save_to_memory_calls_extract_then_remember_per_item() -> None:
mock_task.description = "Do research"
mock_task.expected_output = "A report"
executor = CrewAgentExecutorMixin(
executor = BaseAgentExecutor(
crew=None,
agent=mock_agent,
task=mock_task,
@@ -349,7 +349,7 @@ def test_executor_save_to_memory_calls_extract_then_remember_per_item() -> None:
def test_executor_save_to_memory_skips_delegation_output() -> None:
"""_save_to_memory does nothing when output contains delegate action."""
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.agent_builder.base_agent_executor import BaseAgentExecutor
from crewai.agents.parser import AgentFinish
from crewai.utilities.string_utils import sanitize_tool_name
@@ -362,7 +362,7 @@ def test_executor_save_to_memory_skips_delegation_output() -> None:
delegate_text = f"Action: {sanitize_tool_name('Delegate work to coworker')}"
full_text = delegate_text + " rest"
executor = CrewAgentExecutorMixin(
executor = BaseAgentExecutor(
crew=None,
agent=mock_agent,
task=mock_task,

View File

@@ -102,7 +102,7 @@ def test_crew_memory_with_google_vertex_embedder(
# Mock _save_to_memory during kickoff so it doesn't make embedding API calls
# that VCR can't replay (GCP metadata auth, embedding endpoints).
with patch(
"crewai.agents.agent_builder.base_agent_executor_mixin.CrewAgentExecutorMixin._save_to_memory"
"crewai.agents.agent_builder.base_agent_executor.BaseAgentExecutor._save_to_memory"
):
result = crew.kickoff()
@@ -163,7 +163,7 @@ def test_crew_memory_with_google_vertex_project_id(simple_agent, simple_task) ->
assert crew._memory is memory
with patch(
"crewai.agents.agent_builder.base_agent_executor_mixin.CrewAgentExecutorMixin._save_to_memory"
"crewai.agents.agent_builder.base_agent_executor.BaseAgentExecutor._save_to_memory"
):
result = crew.kickoff()