From 167b609365dcd1728e698290d84f5fcaff0da232 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Sat, 4 Apr 2026 23:18:23 +0800 Subject: [PATCH] fix: restore event scope stack from checkpoint event record Replay the event record during _restore_runtime to rebuild _event_id_stack with correct event IDs. Remove manual push_event_scope calls from task and crew resume paths that used task UUIDs instead of event IDs. --- lib/crewai/src/crewai/crew.py | 33 +++++++++++++++++++ lib/crewai/src/crewai/crews/utils.py | 2 -- lib/crewai/src/crewai/events/event_context.py | 5 +++ lib/crewai/src/crewai/task.py | 9 ++--- 4 files changed, 40 insertions(+), 9 deletions(-) diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index 4390c338d..865613517 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -412,6 +412,39 @@ class Crew(FlowTrackable, BaseModel): if self.checkpoint_train is not None: self._train = self.checkpoint_train + self._restore_event_scope() + + def _restore_event_scope(self) -> None: + """Rebuild the event scope stack from the checkpoint's event record.""" + from crewai.events.event_bus import crewai_event_bus + from crewai.events.event_context import ( + SCOPE_ENDING_EVENTS, + SCOPE_STARTING_EVENTS, + restore_event_scope, + set_last_event_id, + ) + + state = crewai_event_bus._runtime_state + if state is None: + return + + stack: list[tuple[str, str]] = [] + last_event_id: str | None = None + for node in sorted( + state.event_record.nodes.values(), + key=lambda n: n.event.emission_sequence or 0, + ): + evt = node.event + last_event_id = evt.event_id + if evt.type in SCOPE_STARTING_EVENTS: + stack.append((evt.event_id, evt.type)) + elif evt.type in SCOPE_ENDING_EVENTS and stack: + stack.pop() + + restore_event_scope(tuple(stack)) + if last_event_id is not None: + set_last_event_id(last_event_id) + @field_validator("id", mode="before") @classmethod def _deny_user_set_id(cls, v: UUID4 | None, info: Any) -> UUID4 | None: diff --git a/lib/crewai/src/crewai/crews/utils.py b/lib/crewai/src/crewai/crews/utils.py index d82c297e6..20cecc3b4 100644 --- a/lib/crewai/src/crewai/crews/utils.py +++ b/lib/crewai/src/crewai/crews/utils.py @@ -279,7 +279,6 @@ def prepare_kickoff( from crewai.events.event_bus import crewai_event_bus from crewai.events.event_context import ( get_current_parent_id, - push_event_scope, reset_last_event_id, ) from crewai.events.types.crew_events import CrewKickoffStartedEvent @@ -305,7 +304,6 @@ def prepare_kickoff( normalized = before_callback(normalized) if resuming and crew._kickoff_event_id: - push_event_scope(crew._kickoff_event_id, "crew_kickoff_started") if crew.verbose: from crewai.events.utils.console_formatter import ConsoleFormatter diff --git a/lib/crewai/src/crewai/events/event_context.py b/lib/crewai/src/crewai/events/event_context.py index 672daf786..bcb3de1a2 100644 --- a/lib/crewai/src/crewai/events/event_context.py +++ b/lib/crewai/src/crewai/events/event_context.py @@ -133,6 +133,11 @@ def triggered_by_scope(event_id: str) -> Generator[None, None, None]: _triggering_event_id.set(previous) +def restore_event_scope(stack: tuple[tuple[str, str], ...]) -> None: + """Restore the event scope stack from a checkpoint.""" + _event_id_stack.set(stack) + + def push_event_scope(event_id: str, event_type: str = "") -> None: """Push an event ID and type onto the scope stack.""" config = _event_context_config.get() or _default_config diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index eff5a3b60..73e49ade9 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -39,7 +39,6 @@ from crewai.agents.agent_builder.base_agent import BaseAgent, _resolve_agent from crewai.context import reset_current_task_id, set_current_task_id from crewai.core.providers.content_processor import process_content from crewai.events.event_bus import crewai_event_bus -from crewai.events.event_context import push_event_scope from crewai.events.types.task_events import ( TaskCompletedEvent, TaskFailedEvent, @@ -599,9 +598,7 @@ class Task(BaseModel): tools = tools or self.tools or [] self.processed_by_agents.add(agent.role) - if agent.agent_executor and agent.agent_executor._resuming: - push_event_scope(str(self.id), "task_started") - else: + if not (agent.agent_executor and agent.agent_executor._resuming): crewai_event_bus.emit( self, TaskStartedEvent(context=context, task=self) ) @@ -723,9 +720,7 @@ class Task(BaseModel): tools = tools or self.tools or [] self.processed_by_agents.add(agent.role) - if agent.agent_executor and agent.agent_executor._resuming: - push_event_scope(str(self.id), "task_started") - else: + if not (agent.agent_executor and agent.agent_executor._resuming): crewai_event_bus.emit( self, TaskStartedEvent(context=context, task=self) )