refactor: move RuntimeState to runtime_state.py, type _runtime_state on event bus

This commit is contained in:
Greyson LaLonde
2026-04-03 20:36:57 +08:00
parent 2e1525f69a
commit 1ed6646eae
4 changed files with 87 additions and 75 deletions

View File

@@ -16,7 +16,6 @@ from crewai.knowledge.knowledge import Knowledge
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.process import Process
from crewai.runtime_state import _entity_discriminator
from crewai.task import Task
from crewai.tasks.llm_guardrail import LLMGuardrail
from crewai.tasks.task_output import TaskOutput
@@ -163,6 +162,8 @@ try:
**sys.modules[_BaseAgent.__module__].__dict__,
}
import crewai.runtime_state as _runtime_state_mod
for _mod_name in (
_BaseAgent.__module__,
Agent.__module__,
@@ -170,6 +171,7 @@ try:
Flow.__module__,
Task.__module__,
"crewai.agents.crew_agent_executor",
_runtime_state_mod.__name__,
_AgentExecutor.__module__,
):
sys.modules[_mod_name].__dict__.update(_resolve_namespace)
@@ -189,7 +191,9 @@ try:
from typing import Annotated
from pydantic import Discriminator, RootModel, Tag
from pydantic import Discriminator, Tag
from crewai.runtime_state import RuntimeState, _entity_discriminator
Entity = Annotated[
Annotated[Flow, Tag("flow")] # type: ignore[type-arg]
@@ -198,58 +202,10 @@ try:
Discriminator(_entity_discriminator),
]
def _sync_checkpoint_fields(entity: object) -> None:
"""Copy private runtime attrs into checkpoint fields before serializing."""
if isinstance(entity, Flow):
entity.checkpoint_completed_methods = (
set(entity._completed_methods) if entity._completed_methods else None
)
entity.checkpoint_method_outputs = (
list(entity._method_outputs) if entity._method_outputs else None
)
entity.checkpoint_method_counts = (
{str(k): v for k, v in entity._method_execution_counts.items()}
if entity._method_execution_counts
else None
)
entity.checkpoint_state = (
entity._copy_and_serialize_state()
if entity._state is not None
else None
)
if isinstance(entity, Crew):
entity.checkpoint_inputs = entity._inputs
entity.checkpoint_train = entity._train
entity.checkpoint_kickoff_event_id = entity._kickoff_event_id
class RuntimeState(RootModel[list[Entity]]):
def checkpoint(self, directory: str) -> str:
"""Write a checkpoint file to the directory.
Args:
directory: Directory to write checkpoint files into.
Returns:
The path of the written file.
"""
from datetime import datetime, timezone
from pathlib import Path as _Path
import uuid
from crewai.context import capture_execution_context
for entity in self.root:
entity.execution_context = capture_execution_context()
_sync_checkpoint_fields(entity)
dir_path = _Path(directory)
dir_path.mkdir(parents=True, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
filename = f"{ts}_{uuid.uuid4().hex[:8]}.json"
file_path = dir_path / filename
file_path.write_text(self.model_dump_json())
return str(file_path)
RuntimeState.model_rebuild(
force=True,
_types_namespace={**_full_namespace, "Entity": Entity},
)
try:
Agent.model_rebuild(force=True, _types_namespace=_full_namespace)

View File

@@ -5,6 +5,8 @@ of events throughout the CrewAI system, supporting both synchronous and asynchro
event handlers with optional dependency management.
"""
from __future__ import annotations
import asyncio
import atexit
from collections.abc import Callable, Generator
@@ -13,10 +15,14 @@ from contextlib import contextmanager
import contextvars
import inspect
import threading
from typing import Any, Final, ParamSpec, TypeVar
from typing import TYPE_CHECKING, Any, Final, ParamSpec, TypeVar
from typing_extensions import Self
if TYPE_CHECKING:
from crewai.runtime_state import RuntimeState
from crewai.events.base_events import BaseEvent, get_next_emission_sequence
from crewai.events.depends import Depends
from crewai.events.event_context import (
@@ -88,7 +94,7 @@ class CrewAIEventsBus:
_futures_lock: threading.Lock
_executor_initialized: bool
_has_pending_events: bool
_runtime_state: Any
_runtime_state: RuntimeState | None
def __new__(cls) -> Self:
"""Create or return the singleton instance.
@@ -124,7 +130,7 @@ class CrewAIEventsBus:
# Lazy initialization flags - executor and loop created on first emit
self._executor_initialized = False
self._has_pending_events = False
self._runtime_state: Any = None
self._runtime_state: RuntimeState | None = None
self._registered_entity_ids: set[int] = set()
def _ensure_executor_initialized(self) -> None:
@@ -213,25 +219,16 @@ class CrewAIEventsBus:
) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""Decorator to register an event handler for a specific event type.
Handlers can accept 2 or 3 arguments:
- ``(source, event)`` — standard handler
- ``(source, event, state: RuntimeState)`` — handler with runtime state
Args:
event_type: The event class to listen for
depends_on: Optional dependency or list of dependencies. Handlers with
dependencies will execute after their dependencies complete.
depends_on: Optional dependency or list of dependencies.
Returns:
Decorator function that registers the handler
Example:
>>> from crewai.events import crewai_event_bus, Depends
>>> from crewai.events.types.llm_events import LLMCallStartedEvent
>>>
>>> @crewai_event_bus.on(LLMCallStartedEvent)
>>> def setup_context(source, event):
... print("Setting up context")
>>>
>>> @crewai_event_bus.on(LLMCallStartedEvent, depends_on=Depends(setup_context))
>>> def process(source, event):
... print("Processing (runs after setup_context)")
"""
def decorator(handler: Callable[P, R]) -> Callable[P, R]:
@@ -252,7 +249,7 @@ class CrewAIEventsBus:
return decorator
def set_runtime_state(self, state: Any) -> None:
def set_runtime_state(self, state: RuntimeState) -> None:
"""Set the RuntimeState that will be passed to event handlers."""
with self._instance_lock:
self._runtime_state = state

View File

@@ -3,11 +3,22 @@
``RuntimeState`` is a ``RootModel`` whose ``model_dump_json()`` produces a
complete, self-contained snapshot of every active entity in the program.
The ``Entity`` type alias and ``RuntimeState`` model are built at import time
in ``crewai/__init__.py`` after all forward references are resolved.
The ``Entity`` type is resolved at import time in ``crewai/__init__.py``
via ``RuntimeState.model_rebuild()``.
"""
from typing import Any
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any
import uuid
from pydantic import RootModel
if TYPE_CHECKING:
pass
def _entity_discriminator(v: dict[str, Any] | object) -> str:
@@ -16,3 +27,50 @@ def _entity_discriminator(v: dict[str, Any] | object) -> str:
else:
raw = getattr(v, "entity_type", "agent")
return str(raw)
def _sync_checkpoint_fields(entity: object) -> None:
"""Copy private runtime attrs into checkpoint fields before serializing."""
from crewai.crew import Crew
from crewai.flow.flow import Flow
if isinstance(entity, Flow):
entity.checkpoint_completed_methods = (
set(entity._completed_methods) if entity._completed_methods else None
)
entity.checkpoint_method_outputs = (
list(entity._method_outputs) if entity._method_outputs else None
)
entity.checkpoint_method_counts = (
{str(k): v for k, v in entity._method_execution_counts.items()}
if entity._method_execution_counts
else None
)
entity.checkpoint_state = (
entity._copy_and_serialize_state() if entity._state is not None else None
)
if isinstance(entity, Crew):
entity.checkpoint_inputs = entity._inputs
entity.checkpoint_train = entity._train
entity.checkpoint_kickoff_event_id = entity._kickoff_event_id
class RuntimeState(RootModel): # type: ignore[type-arg]
root: list[Entity] # type: ignore[name-defined] # noqa: F821
def checkpoint(self, directory: str) -> str:
"""Write a checkpoint file to the directory."""
from crewai.context import capture_execution_context
for entity in self.root:
entity.execution_context = capture_execution_context()
_sync_checkpoint_fields(entity)
dir_path = Path(directory)
dir_path.mkdir(parents=True, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
filename = f"{ts}_{uuid.uuid4().hex[:8]}.json"
file_path = dir_path / filename
file_path.write_text(self.model_dump_json())
return str(file_path)

View File

@@ -2141,6 +2141,7 @@ def test_task_same_callback_both_on_task_and_crew():
@pytest.mark.vcr()
def test_tools_with_custom_caching():
@tool
def multiplcation_tool(first_number: int, second_number: int) -> int:
"""Useful for when you need to multiply two numbers together."""