diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index 9f69129f1..0ffec4888 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -443,16 +443,20 @@ class Crew(FlowTrackable, BaseModel): if node.event.type == "task_started" and node.event.task_id: started_task_ids.add(node.event.task_id) + is_hierarchical = self.process == Process.hierarchical resuming_task_agent_roles: set[str] = set() for task in self.tasks: - if ( - task.output is None - and task.agent is not None - and str(task.id) in started_task_ids - ): - resuming_task_agent_roles.add(task.agent.role) + if task.output is not None or str(task.id) not in started_task_ids: + continue + executing_agent = self.manager_agent if is_hierarchical else task.agent + if executing_agent is not None: + resuming_task_agent_roles.add(executing_agent.role) - for agent in self.agents: + candidate_agents: list[BaseAgent] = list(self.agents) + if self.manager_agent is not None: + candidate_agents.append(self.manager_agent) + + for agent in candidate_agents: agent.crew = self executor = agent.agent_executor if ( @@ -467,7 +471,7 @@ class Crew(FlowTrackable, BaseModel): agent.agent_executor = None for task in self.tasks: if task.agent is not None: - for agent in self.agents: + for agent in candidate_agents: if agent.role == task.agent.role: task.agent = agent if agent.agent_executor is not None and task.output is None: @@ -536,25 +540,9 @@ class Crew(FlowTrackable, BaseModel): if state is None: return - # Restore crew scope and the in-progress task scope. Inner scopes - # (agent, llm, tool) are re-created by the executor on resume. stack: list[tuple[str, str]] = [] if self._kickoff_event_id: stack.append((self._kickoff_event_id, "crew_kickoff_started")) - - # Find the task_started event for the in-progress task (skipped on resume) - for task in self.tasks: - if task.output is None: - task_id_str = str(task.id) - for node in state.event_record.nodes.values(): - if ( - node.event.type == "task_started" - and node.event.task_id == task_id_str - ): - stack.append((node.event.event_id, "task_started")) - break - break - restore_event_scope(tuple(stack)) # Restore last_event_id and emission counter from the record diff --git a/lib/crewai/src/crewai/events/event_context.py b/lib/crewai/src/crewai/events/event_context.py index bcb3de1a2..74e0d86dc 100644 --- a/lib/crewai/src/crewai/events/event_context.py +++ b/lib/crewai/src/crewai/events/event_context.py @@ -138,6 +138,36 @@ def restore_event_scope(stack: tuple[tuple[str, str], ...]) -> None: _event_id_stack.set(stack) +def resume_task_scope(task_id: str) -> bool: + """Push the latest recorded ``task_started`` scope for a task. + + Args: + task_id: The task identifier to look up in the active event record. + + Returns: + ``True`` if a prior scope was pushed; ``False`` otherwise. + """ + from crewai.events.event_bus import crewai_event_bus + + state = crewai_event_bus._runtime_state + if state is None: + return False + latest_event_id: str | None = None + latest_seq = -1 + for node in list(state.event_record.nodes.values()): + ev = node.event + if ev.type != "task_started" or ev.task_id != task_id: + continue + seq = ev.emission_sequence or 0 + if seq > latest_seq: + latest_seq = seq + latest_event_id = ev.event_id + if latest_event_id is None: + return False + push_event_scope(latest_event_id, "task_started") + return True + + def push_event_scope(event_id: str, event_type: str = "") -> None: """Push an event ID and type onto the scope stack.""" config = _event_context_config.get() or _default_config diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index b8b726b77..745233895 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -40,6 +40,7 @@ from crewai.agents.agent_builder.base_agent import BaseAgent, _resolve_agent from crewai.context import reset_current_task_id, set_current_task_id from crewai.core.providers.content_processor import process_content from crewai.events.event_bus import crewai_event_bus +from crewai.events.event_context import resume_task_scope from crewai.events.types.task_events import ( TaskCompletedEvent, TaskFailedEvent, @@ -661,7 +662,10 @@ class Task(BaseModel): tools = tools or self.tools or [] self.processed_by_agents.add(agent.role) - if not (agent.agent_executor and agent.agent_executor._resuming): + executor = agent.agent_executor + if not ( + executor and executor._resuming and resume_task_scope(str(self.id)) + ): crewai_event_bus.emit( self, TaskStartedEvent(context=context, task=self) ) @@ -783,7 +787,10 @@ class Task(BaseModel): tools = tools or self.tools or [] self.processed_by_agents.add(agent.role) - if not (agent.agent_executor and agent.agent_executor._resuming): + executor = agent.agent_executor + if not ( + executor and executor._resuming and resume_task_scope(str(self.id)) + ): crewai_event_bus.emit( self, TaskStartedEvent(context=context, task=self) ) diff --git a/lib/crewai/tests/events/test_event_context.py b/lib/crewai/tests/events/test_event_context.py index 2a69ca1ee..8f66d2edb 100644 --- a/lib/crewai/tests/events/test_event_context.py +++ b/lib/crewai/tests/events/test_event_context.py @@ -11,6 +11,7 @@ from crewai.events.event_context import ( MismatchBehavior, StackDepthExceededError, _event_context_config, + _event_id_stack, EventContextConfig, get_current_parent_id, get_enclosing_parent_id, @@ -21,6 +22,7 @@ from crewai.events.event_context import ( pop_event_scope, push_event_scope, reset_last_event_id, + resume_task_scope, set_last_event_id, set_triggering_event_id, triggered_by_scope, @@ -180,6 +182,91 @@ class TestTriggeredByScope: assert get_triggering_event_id() is None +class TestResumeTaskScope: + """Tests for the checkpoint-resume scope helper.""" + + @pytest.fixture(autouse=True) + def _reset_stack(self) -> None: + _event_id_stack.set(()) + + def _bind_runtime_state(self, *event_dicts: dict[str, object]): + from crewai.events import crewai_event_bus + from crewai.events.types.task_events import TaskStartedEvent + from crewai.state.event_record import EventRecord + from crewai.state.runtime import RuntimeState + + record = EventRecord() + for spec in event_dicts: + ev = TaskStartedEvent(context=None, task=None) + ev.task_id = spec["task_id"] # type: ignore[assignment] + ev.event_id = spec["event_id"] # type: ignore[assignment] + ev.emission_sequence = spec["emission_sequence"] # type: ignore[assignment] + record.add(ev) + state = RuntimeState(root=[]) + state._event_record = record + + previous = crewai_event_bus._runtime_state + crewai_event_bus._runtime_state = state + return crewai_event_bus, previous + + def test_returns_false_when_no_runtime_state(self) -> None: + from crewai.events import crewai_event_bus + + previous = crewai_event_bus._runtime_state + crewai_event_bus._runtime_state = None + try: + assert resume_task_scope("any-task") is False + assert _event_id_stack.get() == () + finally: + crewai_event_bus._runtime_state = previous + + def test_returns_false_when_no_matching_event(self) -> None: + bus, previous = self._bind_runtime_state( + {"task_id": "other", "event_id": "e1", "emission_sequence": 1}, + ) + try: + assert resume_task_scope("missing") is False + assert _event_id_stack.get() == () + finally: + bus._runtime_state = previous + + def test_pushes_latest_event_for_task(self) -> None: + bus, previous = self._bind_runtime_state( + {"task_id": "t1", "event_id": "e1", "emission_sequence": 1}, + {"task_id": "t1", "event_id": "e2", "emission_sequence": 5}, + {"task_id": "t1", "event_id": "e3", "emission_sequence": 3}, + {"task_id": "t2", "event_id": "x1", "emission_sequence": 9}, + ) + try: + assert resume_task_scope("t1") is True + assert _event_id_stack.get() == (("e2", "task_started"),) + finally: + bus._runtime_state = previous + + def test_pairs_cleanly_with_task_completed(self) -> None: + """The pushed scope must be popped by a matching task_completed.""" + from crewai.events import crewai_event_bus + from crewai.events.types.task_events import TaskCompletedEvent + from crewai.tasks.task_output import TaskOutput + + push_event_scope("kickoff-1", "crew_kickoff_started") + bus, previous = self._bind_runtime_state( + {"task_id": "t1", "event_id": "started-1", "emission_sequence": 1}, + ) + try: + assert resume_task_scope("t1") is True + output = TaskOutput(description="d", raw="r", agent="a") + completed = TaskCompletedEvent(output=output, task=None) + completed.task_id = "t1" + crewai_event_bus.emit(None, completed) + crewai_event_bus.flush() + assert _event_id_stack.get() == (("kickoff-1", "crew_kickoff_started"),) + assert completed.started_event_id == "started-1" + finally: + bus._runtime_state = previous + _event_id_stack.set(()) + + def test_agent_scope_preserved_after_tool_error_event() -> None: from crewai.events import crewai_event_bus from crewai.events.types.tool_usage_events import (