From 1ed6646eaefc6e6e37aef9a1a75b8003a8027344 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Fri, 3 Apr 2026 20:36:57 +0800 Subject: [PATCH] refactor: move RuntimeState to runtime_state.py, type _runtime_state on event bus --- lib/crewai/src/crewai/__init__.py | 64 ++++------------------- lib/crewai/src/crewai/events/event_bus.py | 33 ++++++------ lib/crewai/src/crewai/runtime_state.py | 64 +++++++++++++++++++++-- lib/crewai/tests/test_crew.py | 1 + 4 files changed, 87 insertions(+), 75 deletions(-) diff --git a/lib/crewai/src/crewai/__init__.py b/lib/crewai/src/crewai/__init__.py index 93cd1db3d..fec6f5f7b 100644 --- a/lib/crewai/src/crewai/__init__.py +++ b/lib/crewai/src/crewai/__init__.py @@ -16,7 +16,6 @@ from crewai.knowledge.knowledge import Knowledge from crewai.llm import LLM from crewai.llms.base_llm import BaseLLM from crewai.process import Process -from crewai.runtime_state import _entity_discriminator from crewai.task import Task from crewai.tasks.llm_guardrail import LLMGuardrail from crewai.tasks.task_output import TaskOutput @@ -163,6 +162,8 @@ try: **sys.modules[_BaseAgent.__module__].__dict__, } + import crewai.runtime_state as _runtime_state_mod + for _mod_name in ( _BaseAgent.__module__, Agent.__module__, @@ -170,6 +171,7 @@ try: Flow.__module__, Task.__module__, "crewai.agents.crew_agent_executor", + _runtime_state_mod.__name__, _AgentExecutor.__module__, ): sys.modules[_mod_name].__dict__.update(_resolve_namespace) @@ -189,7 +191,9 @@ try: from typing import Annotated - from pydantic import Discriminator, RootModel, Tag + from pydantic import Discriminator, Tag + + from crewai.runtime_state import RuntimeState, _entity_discriminator Entity = Annotated[ Annotated[Flow, Tag("flow")] # type: ignore[type-arg] @@ -198,58 +202,10 @@ try: Discriminator(_entity_discriminator), ] - def _sync_checkpoint_fields(entity: object) -> None: - """Copy private runtime attrs into checkpoint fields before serializing.""" - if isinstance(entity, Flow): - entity.checkpoint_completed_methods = ( - set(entity._completed_methods) if entity._completed_methods else None - ) - entity.checkpoint_method_outputs = ( - list(entity._method_outputs) if entity._method_outputs else None - ) - entity.checkpoint_method_counts = ( - {str(k): v for k, v in entity._method_execution_counts.items()} - if entity._method_execution_counts - else None - ) - entity.checkpoint_state = ( - entity._copy_and_serialize_state() - if entity._state is not None - else None - ) - if isinstance(entity, Crew): - entity.checkpoint_inputs = entity._inputs - entity.checkpoint_train = entity._train - entity.checkpoint_kickoff_event_id = entity._kickoff_event_id - - class RuntimeState(RootModel[list[Entity]]): - def checkpoint(self, directory: str) -> str: - """Write a checkpoint file to the directory. - - Args: - directory: Directory to write checkpoint files into. - - Returns: - The path of the written file. - """ - from datetime import datetime, timezone - from pathlib import Path as _Path - import uuid - - from crewai.context import capture_execution_context - - for entity in self.root: - entity.execution_context = capture_execution_context() - _sync_checkpoint_fields(entity) - - dir_path = _Path(directory) - dir_path.mkdir(parents=True, exist_ok=True) - - ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S") - filename = f"{ts}_{uuid.uuid4().hex[:8]}.json" - file_path = dir_path / filename - file_path.write_text(self.model_dump_json()) - return str(file_path) + RuntimeState.model_rebuild( + force=True, + _types_namespace={**_full_namespace, "Entity": Entity}, + ) try: Agent.model_rebuild(force=True, _types_namespace=_full_namespace) diff --git a/lib/crewai/src/crewai/events/event_bus.py b/lib/crewai/src/crewai/events/event_bus.py index 334138ba8..045b5bd60 100644 --- a/lib/crewai/src/crewai/events/event_bus.py +++ b/lib/crewai/src/crewai/events/event_bus.py @@ -5,6 +5,8 @@ of events throughout the CrewAI system, supporting both synchronous and asynchro event handlers with optional dependency management. """ +from __future__ import annotations + import asyncio import atexit from collections.abc import Callable, Generator @@ -13,10 +15,14 @@ from contextlib import contextmanager import contextvars import inspect import threading -from typing import Any, Final, ParamSpec, TypeVar +from typing import TYPE_CHECKING, Any, Final, ParamSpec, TypeVar from typing_extensions import Self + +if TYPE_CHECKING: + from crewai.runtime_state import RuntimeState + from crewai.events.base_events import BaseEvent, get_next_emission_sequence from crewai.events.depends import Depends from crewai.events.event_context import ( @@ -88,7 +94,7 @@ class CrewAIEventsBus: _futures_lock: threading.Lock _executor_initialized: bool _has_pending_events: bool - _runtime_state: Any + _runtime_state: RuntimeState | None def __new__(cls) -> Self: """Create or return the singleton instance. @@ -124,7 +130,7 @@ class CrewAIEventsBus: # Lazy initialization flags - executor and loop created on first emit self._executor_initialized = False self._has_pending_events = False - self._runtime_state: Any = None + self._runtime_state: RuntimeState | None = None self._registered_entity_ids: set[int] = set() def _ensure_executor_initialized(self) -> None: @@ -213,25 +219,16 @@ class CrewAIEventsBus: ) -> Callable[[Callable[P, R]], Callable[P, R]]: """Decorator to register an event handler for a specific event type. + Handlers can accept 2 or 3 arguments: + - ``(source, event)`` — standard handler + - ``(source, event, state: RuntimeState)`` — handler with runtime state + Args: event_type: The event class to listen for - depends_on: Optional dependency or list of dependencies. Handlers with - dependencies will execute after their dependencies complete. + depends_on: Optional dependency or list of dependencies. Returns: Decorator function that registers the handler - - Example: - >>> from crewai.events import crewai_event_bus, Depends - >>> from crewai.events.types.llm_events import LLMCallStartedEvent - >>> - >>> @crewai_event_bus.on(LLMCallStartedEvent) - >>> def setup_context(source, event): - ... print("Setting up context") - >>> - >>> @crewai_event_bus.on(LLMCallStartedEvent, depends_on=Depends(setup_context)) - >>> def process(source, event): - ... print("Processing (runs after setup_context)") """ def decorator(handler: Callable[P, R]) -> Callable[P, R]: @@ -252,7 +249,7 @@ class CrewAIEventsBus: return decorator - def set_runtime_state(self, state: Any) -> None: + def set_runtime_state(self, state: RuntimeState) -> None: """Set the RuntimeState that will be passed to event handlers.""" with self._instance_lock: self._runtime_state = state diff --git a/lib/crewai/src/crewai/runtime_state.py b/lib/crewai/src/crewai/runtime_state.py index 5e0079ae2..0ceff2b85 100644 --- a/lib/crewai/src/crewai/runtime_state.py +++ b/lib/crewai/src/crewai/runtime_state.py @@ -3,11 +3,22 @@ ``RuntimeState`` is a ``RootModel`` whose ``model_dump_json()`` produces a complete, self-contained snapshot of every active entity in the program. -The ``Entity`` type alias and ``RuntimeState`` model are built at import time -in ``crewai/__init__.py`` after all forward references are resolved. +The ``Entity`` type is resolved at import time in ``crewai/__init__.py`` +via ``RuntimeState.model_rebuild()``. """ -from typing import Any +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from typing import TYPE_CHECKING, Any +import uuid + +from pydantic import RootModel + + +if TYPE_CHECKING: + pass def _entity_discriminator(v: dict[str, Any] | object) -> str: @@ -16,3 +27,50 @@ def _entity_discriminator(v: dict[str, Any] | object) -> str: else: raw = getattr(v, "entity_type", "agent") return str(raw) + + +def _sync_checkpoint_fields(entity: object) -> None: + """Copy private runtime attrs into checkpoint fields before serializing.""" + from crewai.crew import Crew + from crewai.flow.flow import Flow + + if isinstance(entity, Flow): + entity.checkpoint_completed_methods = ( + set(entity._completed_methods) if entity._completed_methods else None + ) + entity.checkpoint_method_outputs = ( + list(entity._method_outputs) if entity._method_outputs else None + ) + entity.checkpoint_method_counts = ( + {str(k): v for k, v in entity._method_execution_counts.items()} + if entity._method_execution_counts + else None + ) + entity.checkpoint_state = ( + entity._copy_and_serialize_state() if entity._state is not None else None + ) + if isinstance(entity, Crew): + entity.checkpoint_inputs = entity._inputs + entity.checkpoint_train = entity._train + entity.checkpoint_kickoff_event_id = entity._kickoff_event_id + + +class RuntimeState(RootModel): # type: ignore[type-arg] + root: list[Entity] # type: ignore[name-defined] # noqa: F821 + + def checkpoint(self, directory: str) -> str: + """Write a checkpoint file to the directory.""" + from crewai.context import capture_execution_context + + for entity in self.root: + entity.execution_context = capture_execution_context() + _sync_checkpoint_fields(entity) + + dir_path = Path(directory) + dir_path.mkdir(parents=True, exist_ok=True) + + ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S") + filename = f"{ts}_{uuid.uuid4().hex[:8]}.json" + file_path = dir_path / filename + file_path.write_text(self.model_dump_json()) + return str(file_path) diff --git a/lib/crewai/tests/test_crew.py b/lib/crewai/tests/test_crew.py index f941a7965..9621a1f0d 100644 --- a/lib/crewai/tests/test_crew.py +++ b/lib/crewai/tests/test_crew.py @@ -2141,6 +2141,7 @@ def test_task_same_callback_both_on_task_and_crew(): @pytest.mark.vcr() def test_tools_with_custom_caching(): + @tool def multiplcation_tool(first_number: int, second_number: int) -> int: """Useful for when you need to multiply two numbers together."""