Merge branch 'main' into docs/checkpointing-restructure

This commit is contained in:
Greyson LaLonde
2026-05-23 01:20:35 +08:00
committed by GitHub
4 changed files with 138 additions and 26 deletions

View File

@@ -443,16 +443,20 @@ class Crew(FlowTrackable, BaseModel):
if node.event.type == "task_started" and node.event.task_id:
started_task_ids.add(node.event.task_id)
is_hierarchical = self.process == Process.hierarchical
resuming_task_agent_roles: set[str] = set()
for task in self.tasks:
if (
task.output is None
and task.agent is not None
and str(task.id) in started_task_ids
):
resuming_task_agent_roles.add(task.agent.role)
if task.output is not None or str(task.id) not in started_task_ids:
continue
executing_agent = self.manager_agent if is_hierarchical else task.agent
if executing_agent is not None:
resuming_task_agent_roles.add(executing_agent.role)
for agent in self.agents:
candidate_agents: list[BaseAgent] = list(self.agents)
if self.manager_agent is not None:
candidate_agents.append(self.manager_agent)
for agent in candidate_agents:
agent.crew = self
executor = agent.agent_executor
if (
@@ -467,7 +471,7 @@ class Crew(FlowTrackable, BaseModel):
agent.agent_executor = None
for task in self.tasks:
if task.agent is not None:
for agent in self.agents:
for agent in candidate_agents:
if agent.role == task.agent.role:
task.agent = agent
if agent.agent_executor is not None and task.output is None:
@@ -536,25 +540,9 @@ class Crew(FlowTrackable, BaseModel):
if state is None:
return
# Restore crew scope and the in-progress task scope. Inner scopes
# (agent, llm, tool) are re-created by the executor on resume.
stack: list[tuple[str, str]] = []
if self._kickoff_event_id:
stack.append((self._kickoff_event_id, "crew_kickoff_started"))
# Find the task_started event for the in-progress task (skipped on resume)
for task in self.tasks:
if task.output is None:
task_id_str = str(task.id)
for node in state.event_record.nodes.values():
if (
node.event.type == "task_started"
and node.event.task_id == task_id_str
):
stack.append((node.event.event_id, "task_started"))
break
break
restore_event_scope(tuple(stack))
# Restore last_event_id and emission counter from the record

View File

@@ -138,6 +138,36 @@ def restore_event_scope(stack: tuple[tuple[str, str], ...]) -> None:
_event_id_stack.set(stack)
def resume_task_scope(task_id: str) -> bool:
"""Push the latest recorded ``task_started`` scope for a task.
Args:
task_id: The task identifier to look up in the active event record.
Returns:
``True`` if a prior scope was pushed; ``False`` otherwise.
"""
from crewai.events.event_bus import crewai_event_bus
state = crewai_event_bus._runtime_state
if state is None:
return False
latest_event_id: str | None = None
latest_seq = -1
for node in list(state.event_record.nodes.values()):
ev = node.event
if ev.type != "task_started" or ev.task_id != task_id:
continue
seq = ev.emission_sequence or 0
if seq > latest_seq:
latest_seq = seq
latest_event_id = ev.event_id
if latest_event_id is None:
return False
push_event_scope(latest_event_id, "task_started")
return True
def push_event_scope(event_id: str, event_type: str = "") -> None:
"""Push an event ID and type onto the scope stack."""
config = _event_context_config.get() or _default_config

View File

@@ -40,6 +40,7 @@ from crewai.agents.agent_builder.base_agent import BaseAgent, _resolve_agent
from crewai.context import reset_current_task_id, set_current_task_id
from crewai.core.providers.content_processor import process_content
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import resume_task_scope
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
@@ -661,7 +662,10 @@ class Task(BaseModel):
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
if not (agent.agent_executor and agent.agent_executor._resuming):
executor = agent.agent_executor
if not (
executor and executor._resuming and resume_task_scope(str(self.id))
):
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
)
@@ -783,7 +787,10 @@ class Task(BaseModel):
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
if not (agent.agent_executor and agent.agent_executor._resuming):
executor = agent.agent_executor
if not (
executor and executor._resuming and resume_task_scope(str(self.id))
):
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
)

View File

@@ -11,6 +11,7 @@ from crewai.events.event_context import (
MismatchBehavior,
StackDepthExceededError,
_event_context_config,
_event_id_stack,
EventContextConfig,
get_current_parent_id,
get_enclosing_parent_id,
@@ -21,6 +22,7 @@ from crewai.events.event_context import (
pop_event_scope,
push_event_scope,
reset_last_event_id,
resume_task_scope,
set_last_event_id,
set_triggering_event_id,
triggered_by_scope,
@@ -180,6 +182,91 @@ class TestTriggeredByScope:
assert get_triggering_event_id() is None
class TestResumeTaskScope:
"""Tests for the checkpoint-resume scope helper."""
@pytest.fixture(autouse=True)
def _reset_stack(self) -> None:
_event_id_stack.set(())
def _bind_runtime_state(self, *event_dicts: dict[str, object]):
from crewai.events import crewai_event_bus
from crewai.events.types.task_events import TaskStartedEvent
from crewai.state.event_record import EventRecord
from crewai.state.runtime import RuntimeState
record = EventRecord()
for spec in event_dicts:
ev = TaskStartedEvent(context=None, task=None)
ev.task_id = spec["task_id"] # type: ignore[assignment]
ev.event_id = spec["event_id"] # type: ignore[assignment]
ev.emission_sequence = spec["emission_sequence"] # type: ignore[assignment]
record.add(ev)
state = RuntimeState(root=[])
state._event_record = record
previous = crewai_event_bus._runtime_state
crewai_event_bus._runtime_state = state
return crewai_event_bus, previous
def test_returns_false_when_no_runtime_state(self) -> None:
from crewai.events import crewai_event_bus
previous = crewai_event_bus._runtime_state
crewai_event_bus._runtime_state = None
try:
assert resume_task_scope("any-task") is False
assert _event_id_stack.get() == ()
finally:
crewai_event_bus._runtime_state = previous
def test_returns_false_when_no_matching_event(self) -> None:
bus, previous = self._bind_runtime_state(
{"task_id": "other", "event_id": "e1", "emission_sequence": 1},
)
try:
assert resume_task_scope("missing") is False
assert _event_id_stack.get() == ()
finally:
bus._runtime_state = previous
def test_pushes_latest_event_for_task(self) -> None:
bus, previous = self._bind_runtime_state(
{"task_id": "t1", "event_id": "e1", "emission_sequence": 1},
{"task_id": "t1", "event_id": "e2", "emission_sequence": 5},
{"task_id": "t1", "event_id": "e3", "emission_sequence": 3},
{"task_id": "t2", "event_id": "x1", "emission_sequence": 9},
)
try:
assert resume_task_scope("t1") is True
assert _event_id_stack.get() == (("e2", "task_started"),)
finally:
bus._runtime_state = previous
def test_pairs_cleanly_with_task_completed(self) -> None:
"""The pushed scope must be popped by a matching task_completed."""
from crewai.events import crewai_event_bus
from crewai.events.types.task_events import TaskCompletedEvent
from crewai.tasks.task_output import TaskOutput
push_event_scope("kickoff-1", "crew_kickoff_started")
bus, previous = self._bind_runtime_state(
{"task_id": "t1", "event_id": "started-1", "emission_sequence": 1},
)
try:
assert resume_task_scope("t1") is True
output = TaskOutput(description="d", raw="r", agent="a")
completed = TaskCompletedEvent(output=output, task=None)
completed.task_id = "t1"
crewai_event_bus.emit(None, completed)
crewai_event_bus.flush()
assert _event_id_stack.get() == (("kickoff-1", "crew_kickoff_started"),)
assert completed.started_event_id == "started-1"
finally:
bus._runtime_state = previous
_event_id_stack.set(())
def test_agent_scope_preserved_after_tool_error_event() -> None:
from crewai.events import crewai_event_bus
from crewai.events.types.tool_usage_events import (