Compare commits

...

2 Commits

Author SHA1 Message Date
Devin AI
aa8d506661 Address code review feedback: improve documentation and UTC timezone handling
- Added Field description to BaseEvent timestamp for better clarity
- Updated crewai_event_bus.emit() to use UTC timezone for consistency
- Improved test reliability by removing time.sleep() dependencies
- Enhanced documentation for emit() method with detailed docstring
- Added comprehensive type checks and UTC timezone assertions in tests
- Improved error messages and edge case coverage in test assertions

Addresses feedback from PR review by joaomdmoura

Co-Authored-By: João <joao@crewai.com>
2025-06-13 23:31:56 +00:00
Devin AI
6d0158495b Fix LLMStreamChunkEvent chronological ordering issue
- Modified BaseEvent to set timestamp as Optional[datetime] = None instead of at creation time
- Updated crewai_event_bus.emit() to set timestamp at emission time if not already set
- Added comprehensive tests for chronological ordering of stream chunk events
- Added test for preserving manually set timestamps
- Added test for BaseEvent timestamp setting behavior

Fixes GitHub issue #3008 where streaming chunk events were emitted out of chronological order due to variable processing delays between event creation and emission.

Co-Authored-By: João <joao@crewai.com>
2025-06-13 23:27:09 +00:00
3 changed files with 100 additions and 2 deletions

View File

@@ -9,7 +9,10 @@ from crewai.utilities.serialization import to_serializable
class BaseEvent(BaseModel):
"""Base class for all events"""
timestamp: datetime = Field(default_factory=datetime.now)
timestamp: Optional[datetime] = Field(
default=None,
description="Event timestamp. Set automatically on emission if not provided."
)
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew"

View File

@@ -1,5 +1,6 @@
import threading
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Type, TypeVar, cast
from blinker import Signal
@@ -61,12 +62,18 @@ class CrewAIEventsBus:
def emit(self, source: Any, event: BaseEvent) -> None:
"""
Emit an event to all registered handlers
Emit an event to all registered handlers.
Sets the event timestamp automatically if not already provided to ensure
chronological ordering of events.
Args:
source: The object emitting the event
event: The event instance to emit
"""
if event.timestamp is None:
event.timestamp = datetime.now(timezone.utc)
for event_type, handlers in self._handlers.items():
if isinstance(event, event_type):
for handler in handlers:

View File

@@ -736,6 +736,94 @@ def test_streaming_fallback_to_non_streaming():
llm.call = original_call
def test_llm_stream_chunk_events_chronological_order():
"""Test that LLM stream chunk events are emitted in chronological order."""
from datetime import datetime, timezone, timedelta
events_received = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_stream_chunk(source, event):
events_received.append((event.timestamp, event.chunk))
assert isinstance(event.timestamp, datetime), "Event timestamp should be datetime object"
llm = LLM(model="gpt-4o", stream=True)
event1 = LLMStreamChunkEvent(chunk="first")
event2 = LLMStreamChunkEvent(chunk="second")
event3 = LLMStreamChunkEvent(chunk="third")
assert event1.timestamp is None, "Event should have no timestamp before emission"
assert event2.timestamp is None, "Event should have no timestamp before emission"
assert event3.timestamp is None, "Event should have no timestamp before emission"
crewai_event_bus.emit(llm, event=event3)
crewai_event_bus.emit(llm, event=event1)
crewai_event_bus.emit(llm, event=event2)
assert len(events_received) == 3
timestamps = [event[0] for event in events_received]
assert timestamps == sorted(timestamps), "Events should be emitted in chronological order"
for timestamp in timestamps:
assert timestamp.tzinfo == timezone.utc, "Timestamps should be in UTC"
def test_llm_stream_chunk_events_preserve_manual_timestamps():
"""Test that manually set timestamps are preserved."""
from datetime import datetime, timezone, timedelta
events_received = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_stream_chunk(source, event):
events_received.append((event.timestamp, event.chunk))
assert isinstance(event.timestamp, datetime), "Event timestamp should be datetime object"
llm = LLM(model="gpt-4o", stream=True)
base_time = datetime.now(timezone.utc)
event1 = LLMStreamChunkEvent(chunk="first", timestamp=base_time)
event2 = LLMStreamChunkEvent(chunk="second", timestamp=base_time + timedelta(seconds=1))
crewai_event_bus.emit(llm, event=event1)
crewai_event_bus.emit(llm, event=event2)
assert len(events_received) == 2
assert events_received[0][0] == base_time, "First event timestamp should be preserved"
assert events_received[1][0] == base_time + timedelta(seconds=1), "Second event timestamp should be preserved"
def test_base_event_timestamp_set_on_emission():
"""Test that BaseEvent timestamp is set when event is emitted if not already set."""
from datetime import datetime, timezone
from crewai.utilities.events.base_events import BaseEvent
events_received = []
class TestEvent(BaseEvent):
type: str = "test_event"
message: str
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(TestEvent)
def handle_test_event(source, event):
events_received.append(event)
assert isinstance(event.timestamp, datetime), "Event timestamp should be datetime object"
event_without_timestamp = TestEvent(message="test")
assert event_without_timestamp.timestamp is None, "Event should have no timestamp initially"
crewai_event_bus.emit("test_source", event=event_without_timestamp)
assert len(events_received) == 1, "Should receive exactly one event"
assert events_received[0].timestamp is not None, "Event should have timestamp after emission"
assert isinstance(events_received[0].timestamp, datetime), "Timestamp should be datetime object"
assert events_received[0].timestamp.tzinfo == timezone.utc, "Timestamp should be in UTC"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_streaming_empty_response_handling():
"""Test that streaming handles empty responses correctly."""