mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-08 20:18:16 +00:00
fix: restore event scope stack from checkpoint event record
Replay the event record during _restore_runtime to rebuild _event_id_stack with correct event IDs. Remove manual push_event_scope calls from task and crew resume paths that used task UUIDs instead of event IDs.
This commit is contained in:
@@ -412,6 +412,39 @@ class Crew(FlowTrackable, BaseModel):
|
||||
if self.checkpoint_train is not None:
|
||||
self._train = self.checkpoint_train
|
||||
|
||||
self._restore_event_scope()
|
||||
|
||||
def _restore_event_scope(self) -> None:
|
||||
"""Rebuild the event scope stack from the checkpoint's event record."""
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.event_context import (
|
||||
SCOPE_ENDING_EVENTS,
|
||||
SCOPE_STARTING_EVENTS,
|
||||
restore_event_scope,
|
||||
set_last_event_id,
|
||||
)
|
||||
|
||||
state = crewai_event_bus._runtime_state
|
||||
if state is None:
|
||||
return
|
||||
|
||||
stack: list[tuple[str, str]] = []
|
||||
last_event_id: str | None = None
|
||||
for node in sorted(
|
||||
state.event_record.nodes.values(),
|
||||
key=lambda n: n.event.emission_sequence or 0,
|
||||
):
|
||||
evt = node.event
|
||||
last_event_id = evt.event_id
|
||||
if evt.type in SCOPE_STARTING_EVENTS:
|
||||
stack.append((evt.event_id, evt.type))
|
||||
elif evt.type in SCOPE_ENDING_EVENTS and stack:
|
||||
stack.pop()
|
||||
|
||||
restore_event_scope(tuple(stack))
|
||||
if last_event_id is not None:
|
||||
set_last_event_id(last_event_id)
|
||||
|
||||
@field_validator("id", mode="before")
|
||||
@classmethod
|
||||
def _deny_user_set_id(cls, v: UUID4 | None, info: Any) -> UUID4 | None:
|
||||
|
||||
@@ -279,7 +279,6 @@ def prepare_kickoff(
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.event_context import (
|
||||
get_current_parent_id,
|
||||
push_event_scope,
|
||||
reset_last_event_id,
|
||||
)
|
||||
from crewai.events.types.crew_events import CrewKickoffStartedEvent
|
||||
@@ -305,7 +304,6 @@ def prepare_kickoff(
|
||||
normalized = before_callback(normalized)
|
||||
|
||||
if resuming and crew._kickoff_event_id:
|
||||
push_event_scope(crew._kickoff_event_id, "crew_kickoff_started")
|
||||
if crew.verbose:
|
||||
from crewai.events.utils.console_formatter import ConsoleFormatter
|
||||
|
||||
|
||||
@@ -133,6 +133,11 @@ def triggered_by_scope(event_id: str) -> Generator[None, None, None]:
|
||||
_triggering_event_id.set(previous)
|
||||
|
||||
|
||||
def restore_event_scope(stack: tuple[tuple[str, str], ...]) -> None:
|
||||
"""Restore the event scope stack from a checkpoint."""
|
||||
_event_id_stack.set(stack)
|
||||
|
||||
|
||||
def push_event_scope(event_id: str, event_type: str = "") -> None:
|
||||
"""Push an event ID and type onto the scope stack."""
|
||||
config = _event_context_config.get() or _default_config
|
||||
|
||||
@@ -39,7 +39,6 @@ from crewai.agents.agent_builder.base_agent import BaseAgent, _resolve_agent
|
||||
from crewai.context import reset_current_task_id, set_current_task_id
|
||||
from crewai.core.providers.content_processor import process_content
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.event_context import push_event_scope
|
||||
from crewai.events.types.task_events import (
|
||||
TaskCompletedEvent,
|
||||
TaskFailedEvent,
|
||||
@@ -599,9 +598,7 @@ class Task(BaseModel):
|
||||
tools = tools or self.tools or []
|
||||
|
||||
self.processed_by_agents.add(agent.role)
|
||||
if agent.agent_executor and agent.agent_executor._resuming:
|
||||
push_event_scope(str(self.id), "task_started")
|
||||
else:
|
||||
if not (agent.agent_executor and agent.agent_executor._resuming):
|
||||
crewai_event_bus.emit(
|
||||
self, TaskStartedEvent(context=context, task=self)
|
||||
)
|
||||
@@ -723,9 +720,7 @@ class Task(BaseModel):
|
||||
tools = tools or self.tools or []
|
||||
|
||||
self.processed_by_agents.add(agent.role)
|
||||
if agent.agent_executor and agent.agent_executor._resuming:
|
||||
push_event_scope(str(self.id), "task_started")
|
||||
else:
|
||||
if not (agent.agent_executor and agent.agent_executor._resuming):
|
||||
crewai_event_bus.emit(
|
||||
self, TaskStartedEvent(context=context, task=self)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user