diff --git a/src/crewai/utilities/events/base_events.py b/src/crewai/utilities/events/base_events.py index 46648500b..47e58ead9 100644 --- a/src/crewai/utilities/events/base_events.py +++ b/src/crewai/utilities/events/base_events.py @@ -9,7 +9,7 @@ from crewai.utilities.serialization import to_serializable class BaseEvent(BaseModel): """Base class for all events""" - timestamp: datetime = Field(default_factory=datetime.now) + timestamp: Optional[datetime] = None type: str source_fingerprint: Optional[str] = None # UUID string of the source entity source_type: Optional[str] = None # "agent", "task", "crew" diff --git a/src/crewai/utilities/events/crewai_event_bus.py b/src/crewai/utilities/events/crewai_event_bus.py index f255e5513..66daec78f 100644 --- a/src/crewai/utilities/events/crewai_event_bus.py +++ b/src/crewai/utilities/events/crewai_event_bus.py @@ -1,5 +1,6 @@ import threading from contextlib import contextmanager +from datetime import datetime from typing import Any, Callable, Dict, List, Type, TypeVar, cast from blinker import Signal @@ -67,6 +68,9 @@ class CrewAIEventsBus: source: The object emitting the event event: The event instance to emit """ + if event.timestamp is None: + event.timestamp = datetime.now() + for event_type, handlers in self._handlers.items(): if isinstance(event, event_type): for handler in handlers: diff --git a/tests/utilities/test_events.py b/tests/utilities/test_events.py index 2f6f11b61..9cc8e2cb4 100644 --- a/tests/utilities/test_events.py +++ b/tests/utilities/test_events.py @@ -736,6 +736,84 @@ def test_streaming_fallback_to_non_streaming(): llm.call = original_call +def test_llm_stream_chunk_events_chronological_order(): + """Test that LLM stream chunk events are emitted in chronological order.""" + import time + + events_received = [] + + with crewai_event_bus.scoped_handlers(): + @crewai_event_bus.on(LLMStreamChunkEvent) + def handle_stream_chunk(source, event): + events_received.append((event.timestamp, event.chunk)) + + llm = LLM(model="gpt-4o", stream=True) + + event1 = LLMStreamChunkEvent(chunk="first") + time.sleep(0.002) + event2 = LLMStreamChunkEvent(chunk="second") + time.sleep(0.001) + event3 = LLMStreamChunkEvent(chunk="third") + + crewai_event_bus.emit(llm, event=event3) + crewai_event_bus.emit(llm, event=event1) + crewai_event_bus.emit(llm, event=event2) + + assert len(events_received) == 3 + timestamps = [event[0] for event in events_received] + assert timestamps == sorted(timestamps), "Events should be emitted in chronological order" + + +def test_llm_stream_chunk_events_preserve_manual_timestamps(): + """Test that manually set timestamps are preserved.""" + from datetime import datetime, timedelta + + events_received = [] + + with crewai_event_bus.scoped_handlers(): + @crewai_event_bus.on(LLMStreamChunkEvent) + def handle_stream_chunk(source, event): + events_received.append((event.timestamp, event.chunk)) + + llm = LLM(model="gpt-4o", stream=True) + + base_time = datetime.now() + event1 = LLMStreamChunkEvent(chunk="first", timestamp=base_time) + event2 = LLMStreamChunkEvent(chunk="second", timestamp=base_time + timedelta(seconds=1)) + + crewai_event_bus.emit(llm, event=event1) + crewai_event_bus.emit(llm, event=event2) + + assert len(events_received) == 2 + assert events_received[0][0] == base_time + assert events_received[1][0] == base_time + timedelta(seconds=1) + + +def test_base_event_timestamp_set_on_emission(): + """Test that BaseEvent timestamp is set when event is emitted if not already set.""" + from crewai.utilities.events.base_events import BaseEvent + + events_received = [] + + class TestEvent(BaseEvent): + type: str = "test_event" + message: str + + with crewai_event_bus.scoped_handlers(): + @crewai_event_bus.on(TestEvent) + def handle_test_event(source, event): + events_received.append(event) + + event_without_timestamp = TestEvent(message="test") + assert event_without_timestamp.timestamp is None + + crewai_event_bus.emit("test_source", event=event_without_timestamp) + + assert len(events_received) == 1 + assert events_received[0].timestamp is not None + assert isinstance(events_received[0].timestamp, datetime) + + @pytest.mark.vcr(filter_headers=["authorization"]) def test_streaming_empty_response_handling(): """Test that streaming handles empty responses correctly."""