mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-02 21:58:11 +00:00
fix(checkpoint): avoid orphan task_started on resume scope restore
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Move scope restoration from a Crew-level global push to a per-task push inside Task, via resume_task_scope() in event_context. This fixes the orphan task_started warning, hierarchical resume (manager_agent is now eligible for _resuming), and parallel async resume (each contextvars copy owns its own scope). Tests added.
This commit is contained in:
@@ -443,16 +443,20 @@ class Crew(FlowTrackable, BaseModel):
|
||||
if node.event.type == "task_started" and node.event.task_id:
|
||||
started_task_ids.add(node.event.task_id)
|
||||
|
||||
is_hierarchical = self.process == Process.hierarchical
|
||||
resuming_task_agent_roles: set[str] = set()
|
||||
for task in self.tasks:
|
||||
if (
|
||||
task.output is None
|
||||
and task.agent is not None
|
||||
and str(task.id) in started_task_ids
|
||||
):
|
||||
resuming_task_agent_roles.add(task.agent.role)
|
||||
if task.output is not None or str(task.id) not in started_task_ids:
|
||||
continue
|
||||
executing_agent = self.manager_agent if is_hierarchical else task.agent
|
||||
if executing_agent is not None:
|
||||
resuming_task_agent_roles.add(executing_agent.role)
|
||||
|
||||
for agent in self.agents:
|
||||
candidate_agents: list[BaseAgent] = list(self.agents)
|
||||
if self.manager_agent is not None:
|
||||
candidate_agents.append(self.manager_agent)
|
||||
|
||||
for agent in candidate_agents:
|
||||
agent.crew = self
|
||||
executor = agent.agent_executor
|
||||
if (
|
||||
@@ -467,7 +471,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
agent.agent_executor = None
|
||||
for task in self.tasks:
|
||||
if task.agent is not None:
|
||||
for agent in self.agents:
|
||||
for agent in candidate_agents:
|
||||
if agent.role == task.agent.role:
|
||||
task.agent = agent
|
||||
if agent.agent_executor is not None and task.output is None:
|
||||
@@ -536,25 +540,9 @@ class Crew(FlowTrackable, BaseModel):
|
||||
if state is None:
|
||||
return
|
||||
|
||||
# Restore crew scope and the in-progress task scope. Inner scopes
|
||||
# (agent, llm, tool) are re-created by the executor on resume.
|
||||
stack: list[tuple[str, str]] = []
|
||||
if self._kickoff_event_id:
|
||||
stack.append((self._kickoff_event_id, "crew_kickoff_started"))
|
||||
|
||||
# Find the task_started event for the in-progress task (skipped on resume)
|
||||
for task in self.tasks:
|
||||
if task.output is None:
|
||||
task_id_str = str(task.id)
|
||||
for node in state.event_record.nodes.values():
|
||||
if (
|
||||
node.event.type == "task_started"
|
||||
and node.event.task_id == task_id_str
|
||||
):
|
||||
stack.append((node.event.event_id, "task_started"))
|
||||
break
|
||||
break
|
||||
|
||||
restore_event_scope(tuple(stack))
|
||||
|
||||
# Restore last_event_id and emission counter from the record
|
||||
|
||||
@@ -138,6 +138,36 @@ def restore_event_scope(stack: tuple[tuple[str, str], ...]) -> None:
|
||||
_event_id_stack.set(stack)
|
||||
|
||||
|
||||
def resume_task_scope(task_id: str) -> bool:
    """Re-enter the most recent recorded ``task_started`` scope of a task.

    The event record held by the event bus's runtime state is scanned for
    ``task_started`` events whose ``task_id`` equals ``task_id``. Among the
    matches, the one with the highest ``emission_sequence`` (a missing or
    zero sequence counts as ``0``) is pushed onto the event scope stack.

    Args:
        task_id: Identifier of the task whose recorded scope should be
            pushed.

    Returns:
        ``True`` when a recorded scope was found and pushed; ``False`` when
        the bus has no runtime state or no matching event was selected.
    """
    from crewai.events.event_bus import crewai_event_bus

    runtime_state = crewai_event_bus._runtime_state
    if runtime_state is None:
        return False

    best_event_id: str | None = None
    best_seq = -1
    # Iterate over a snapshot of the recorded nodes (presumably so that
    # concurrent additions to the record cannot disturb the scan -- confirm).
    for record_node in list(runtime_state.event_record.nodes.values()):
        event = record_node.event
        if event.type == "task_started" and event.task_id == task_id:
            candidate_seq = event.emission_sequence or 0
            # Strict comparison: on a tie the earlier-seen event is kept.
            if candidate_seq > best_seq:
                best_seq, best_event_id = candidate_seq, event.event_id

    if best_event_id is not None:
        push_event_scope(best_event_id, "task_started")
        return True
    return False
|
||||
|
||||
|
||||
def push_event_scope(event_id: str, event_type: str = "") -> None:
|
||||
"""Push an event ID and type onto the scope stack."""
|
||||
config = _event_context_config.get() or _default_config
|
||||
|
||||
@@ -40,6 +40,7 @@ from crewai.agents.agent_builder.base_agent import BaseAgent, _resolve_agent
|
||||
from crewai.context import reset_current_task_id, set_current_task_id
|
||||
from crewai.core.providers.content_processor import process_content
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.event_context import resume_task_scope
|
||||
from crewai.events.types.task_events import (
|
||||
TaskCompletedEvent,
|
||||
TaskFailedEvent,
|
||||
@@ -661,7 +662,10 @@ class Task(BaseModel):
|
||||
tools = tools or self.tools or []
|
||||
|
||||
self.processed_by_agents.add(agent.role)
|
||||
if not (agent.agent_executor and agent.agent_executor._resuming):
|
||||
executor = agent.agent_executor
|
||||
if not (
|
||||
executor and executor._resuming and resume_task_scope(str(self.id))
|
||||
):
|
||||
crewai_event_bus.emit(
|
||||
self, TaskStartedEvent(context=context, task=self)
|
||||
)
|
||||
@@ -783,7 +787,10 @@ class Task(BaseModel):
|
||||
tools = tools or self.tools or []
|
||||
|
||||
self.processed_by_agents.add(agent.role)
|
||||
if not (agent.agent_executor and agent.agent_executor._resuming):
|
||||
executor = agent.agent_executor
|
||||
if not (
|
||||
executor and executor._resuming and resume_task_scope(str(self.id))
|
||||
):
|
||||
crewai_event_bus.emit(
|
||||
self, TaskStartedEvent(context=context, task=self)
|
||||
)
|
||||
|
||||
@@ -11,6 +11,7 @@ from crewai.events.event_context import (
|
||||
MismatchBehavior,
|
||||
StackDepthExceededError,
|
||||
_event_context_config,
|
||||
_event_id_stack,
|
||||
EventContextConfig,
|
||||
get_current_parent_id,
|
||||
get_enclosing_parent_id,
|
||||
@@ -21,6 +22,7 @@ from crewai.events.event_context import (
|
||||
pop_event_scope,
|
||||
push_event_scope,
|
||||
reset_last_event_id,
|
||||
resume_task_scope,
|
||||
set_last_event_id,
|
||||
set_triggering_event_id,
|
||||
triggered_by_scope,
|
||||
@@ -180,6 +182,91 @@ class TestTriggeredByScope:
|
||||
assert get_triggering_event_id() is None
|
||||
|
||||
|
||||
class TestResumeTaskScope:
    """Exercise ``resume_task_scope``, the checkpoint-resume scope helper."""

    @pytest.fixture(autouse=True)
    def _reset_stack(self) -> None:
        # Every test starts from an empty event scope stack.
        _event_id_stack.set(())

    def _bind_runtime_state(self, *event_dicts: dict[str, object]):
        """Install a runtime state on the bus holding the given events.

        Each dict supplies ``task_id``, ``event_id`` and
        ``emission_sequence`` for one recorded ``TaskStartedEvent``.

        Returns:
            A ``(bus, previous_state)`` pair so the caller can restore the
            bus's earlier runtime state afterwards.
        """
        from crewai.events import crewai_event_bus
        from crewai.events.types.task_events import TaskStartedEvent
        from crewai.state.event_record import EventRecord
        from crewai.state.runtime import RuntimeState

        event_record = EventRecord()
        for fields in event_dicts:
            started = TaskStartedEvent(context=None, task=None)
            # Overwrite the identifying fields with the values under test.
            for attr in ("task_id", "event_id", "emission_sequence"):
                setattr(started, attr, fields[attr])
            event_record.add(started)

        runtime_state = RuntimeState(root=[])
        runtime_state._event_record = event_record

        prior_state = crewai_event_bus._runtime_state
        crewai_event_bus._runtime_state = runtime_state
        return crewai_event_bus, prior_state

    def test_returns_false_when_no_runtime_state(self) -> None:
        from crewai.events import crewai_event_bus

        prior_state = crewai_event_bus._runtime_state
        crewai_event_bus._runtime_state = None
        try:
            pushed = resume_task_scope("any-task")
            assert pushed is False
            assert _event_id_stack.get() == ()
        finally:
            crewai_event_bus._runtime_state = prior_state

    def test_returns_false_when_no_matching_event(self) -> None:
        only_other_task = {"task_id": "other", "event_id": "e1", "emission_sequence": 1}
        bus, prior_state = self._bind_runtime_state(only_other_task)
        try:
            pushed = resume_task_scope("missing")
            assert pushed is False
            assert _event_id_stack.get() == ()
        finally:
            bus._runtime_state = prior_state

    def test_pushes_latest_event_for_task(self) -> None:
        # "e2" has the highest emission_sequence among the "t1" events; the
        # "t2" event has a higher sequence but belongs to another task.
        recorded = (
            {"task_id": "t1", "event_id": "e1", "emission_sequence": 1},
            {"task_id": "t1", "event_id": "e2", "emission_sequence": 5},
            {"task_id": "t1", "event_id": "e3", "emission_sequence": 3},
            {"task_id": "t2", "event_id": "x1", "emission_sequence": 9},
        )
        bus, prior_state = self._bind_runtime_state(*recorded)
        try:
            pushed = resume_task_scope("t1")
            assert pushed is True
            expected_stack = (("e2", "task_started"),)
            assert _event_id_stack.get() == expected_stack
        finally:
            bus._runtime_state = prior_state

    def test_pairs_cleanly_with_task_completed(self) -> None:
        """A matching task_completed must pop the scope pushed on resume."""
        from crewai.events import crewai_event_bus
        from crewai.events.types.task_events import TaskCompletedEvent
        from crewai.tasks.task_output import TaskOutput

        push_event_scope("kickoff-1", "crew_kickoff_started")
        bus, prior_state = self._bind_runtime_state(
            {"task_id": "t1", "event_id": "started-1", "emission_sequence": 1},
        )
        try:
            pushed = resume_task_scope("t1")
            assert pushed is True

            task_output = TaskOutput(description="d", raw="r", agent="a")
            completed = TaskCompletedEvent(output=task_output, task=None)
            completed.task_id = "t1"
            crewai_event_bus.emit(None, completed)
            crewai_event_bus.flush()

            # Only the enclosing kickoff scope should remain on the stack.
            remaining = _event_id_stack.get()
            assert remaining == (("kickoff-1", "crew_kickoff_started"),)
            assert completed.started_event_id == "started-1"
        finally:
            bus._runtime_state = prior_state
            _event_id_stack.set(())
|
||||
|
||||
|
||||
def test_agent_scope_preserved_after_tool_error_event() -> None:
|
||||
from crewai.events import crewai_event_bus
|
||||
from crewai.events.types.tool_usage_events import (
|
||||
|
||||
Reference in New Issue
Block a user