Fix LLMStreamChunkEvent chronological ordering issue

- Modified BaseEvent to set timestamp as Optional[datetime] = None instead of at creation time
- Updated crewai_event_bus.emit() to set timestamp at emission time if not already set
- Added comprehensive tests for chronological ordering of stream chunk events
- Added test for preserving manually set timestamps
- Added test for BaseEvent timestamp setting behavior

Fixes GitHub issue #3008 where streaming chunk events were emitted out of chronological order due to variable processing delays between event creation and emission.

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2025-06-13 23:27:09 +00:00
parent 7f12e98de5
commit 6d0158495b
3 changed files with 83 additions and 1 deletions

View File

@@ -9,7 +9,7 @@ from crewai.utilities.serialization import to_serializable
class BaseEvent(BaseModel): class BaseEvent(BaseModel):
"""Base class for all events""" """Base class for all events"""
timestamp: datetime = Field(default_factory=datetime.now) timestamp: Optional[datetime] = None
type: str type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew" source_type: Optional[str] = None # "agent", "task", "crew"

View File

@@ -1,5 +1,6 @@
import threading import threading
from contextlib import contextmanager from contextlib import contextmanager
from datetime import datetime
from typing import Any, Callable, Dict, List, Type, TypeVar, cast from typing import Any, Callable, Dict, List, Type, TypeVar, cast
from blinker import Signal from blinker import Signal
@@ -67,6 +68,9 @@ class CrewAIEventsBus:
source: The object emitting the event source: The object emitting the event
event: The event instance to emit event: The event instance to emit
""" """
if event.timestamp is None:
event.timestamp = datetime.now()
for event_type, handlers in self._handlers.items(): for event_type, handlers in self._handlers.items():
if isinstance(event, event_type): if isinstance(event, event_type):
for handler in handlers: for handler in handlers:

View File

@@ -736,6 +736,84 @@ def test_streaming_fallback_to_non_streaming():
llm.call = original_call llm.call = original_call
def test_llm_stream_chunk_events_chronological_order():
"""Test that LLM stream chunk events are emitted in chronological order."""
import time
events_received = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_stream_chunk(source, event):
events_received.append((event.timestamp, event.chunk))
llm = LLM(model="gpt-4o", stream=True)
event1 = LLMStreamChunkEvent(chunk="first")
time.sleep(0.002)
event2 = LLMStreamChunkEvent(chunk="second")
time.sleep(0.001)
event3 = LLMStreamChunkEvent(chunk="third")
crewai_event_bus.emit(llm, event=event3)
crewai_event_bus.emit(llm, event=event1)
crewai_event_bus.emit(llm, event=event2)
assert len(events_received) == 3
timestamps = [event[0] for event in events_received]
assert timestamps == sorted(timestamps), "Events should be emitted in chronological order"
def test_llm_stream_chunk_events_preserve_manual_timestamps():
"""Test that manually set timestamps are preserved."""
from datetime import datetime, timedelta
events_received = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_stream_chunk(source, event):
events_received.append((event.timestamp, event.chunk))
llm = LLM(model="gpt-4o", stream=True)
base_time = datetime.now()
event1 = LLMStreamChunkEvent(chunk="first", timestamp=base_time)
event2 = LLMStreamChunkEvent(chunk="second", timestamp=base_time + timedelta(seconds=1))
crewai_event_bus.emit(llm, event=event1)
crewai_event_bus.emit(llm, event=event2)
assert len(events_received) == 2
assert events_received[0][0] == base_time
assert events_received[1][0] == base_time + timedelta(seconds=1)
def test_base_event_timestamp_set_on_emission():
"""Test that BaseEvent timestamp is set when event is emitted if not already set."""
from crewai.utilities.events.base_events import BaseEvent
events_received = []
class TestEvent(BaseEvent):
type: str = "test_event"
message: str
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(TestEvent)
def handle_test_event(source, event):
events_received.append(event)
event_without_timestamp = TestEvent(message="test")
assert event_without_timestamp.timestamp is None
crewai_event_bus.emit("test_source", event=event_without_timestamp)
assert len(events_received) == 1
assert events_received[0].timestamp is not None
assert isinstance(events_received[0].timestamp, datetime)
@pytest.mark.vcr(filter_headers=["authorization"]) @pytest.mark.vcr(filter_headers=["authorization"])
def test_streaming_empty_response_handling(): def test_streaming_empty_response_handling():
"""Test that streaming handles empty responses correctly.""" """Test that streaming handles empty responses correctly."""