feat: add emission sequence for event ordering

This commit is contained in:
Greyson LaLonde
2026-01-20 01:01:14 -05:00
parent ceef062426
commit decdefe8f5
2 changed files with 37 additions and 4 deletions

View File

@@ -1,9 +1,35 @@
from collections.abc import Iterator
from datetime import datetime, timezone
import itertools
from typing import Any
import uuid
from pydantic import BaseModel, Field
from crewai.utilities.serialization import to_serializable
from crewai.utilities.serialization import Serializable, to_serializable
_emission_counter: Iterator[int] = itertools.count(start=1)
def get_next_emission_sequence() -> int:
"""Get the next emission sequence number.
Thread-safe due to atomic next() on itertools.count under the GIL.
Returns:
The next sequence number.
"""
return next(_emission_counter)
def reset_emission_counter() -> None:
"""Reset the emission sequence counter to 1.
Useful for test isolation.
"""
global _emission_counter
_emission_counter = itertools.count(start=1)
class BaseEvent(BaseModel):
@@ -22,7 +48,11 @@ class BaseEvent(BaseModel):
agent_id: str | None = None
agent_role: str | None = None
def to_json(self, exclude: set[str] | None = None):
event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
parent_event_id: str | None = None
emission_sequence: int | None = None
def to_json(self, exclude: set[str] | None = None) -> Serializable:
"""
Converts the event to a JSON-serializable dictionary.
@@ -34,13 +64,13 @@ class BaseEvent(BaseModel):
"""
return to_serializable(self, exclude=exclude)
def _set_task_params(self, data: dict[str, Any]):
def _set_task_params(self, data: dict[str, Any]) -> None:
if "from_task" in data and (task := data["from_task"]):
self.task_id = str(task.id)
self.task_name = task.name or task.description
self.from_task = None
def _set_agent_params(self, data: dict[str, Any]):
def _set_agent_params(self, data: dict[str, Any]) -> None:
task = data.get("from_task", None)
agent = task.agent if task else data.get("from_agent", None)

View File

@@ -15,5 +15,8 @@ class TraceEvent:
type: str = ""
event_data: dict[str, Any] = field(default_factory=dict)
emission_sequence: int | None = None
parent_event_id: str | None = None
def to_dict(self) -> dict[str, Any]:
return asdict(self)