diff --git a/src/crewai/utilities/events/base_events.py b/src/crewai/utilities/events/base_events.py index 47e58ead9..9d023f74c 100644 --- a/src/crewai/utilities/events/base_events.py +++ b/src/crewai/utilities/events/base_events.py @@ -9,7 +9,10 @@ from crewai.utilities.serialization import to_serializable class BaseEvent(BaseModel): """Base class for all events""" - timestamp: Optional[datetime] = None + timestamp: Optional[datetime] = Field( + default=None, + description="Event timestamp. Set automatically on emission if not provided." + ) type: str source_fingerprint: Optional[str] = None # UUID string of the source entity source_type: Optional[str] = None # "agent", "task", "crew" diff --git a/src/crewai/utilities/events/crewai_event_bus.py b/src/crewai/utilities/events/crewai_event_bus.py index 66daec78f..fc4d35b23 100644 --- a/src/crewai/utilities/events/crewai_event_bus.py +++ b/src/crewai/utilities/events/crewai_event_bus.py @@ -1,6 +1,6 @@ import threading from contextlib import contextmanager -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Callable, Dict, List, Type, TypeVar, cast from blinker import Signal @@ -62,14 +62,17 @@ class CrewAIEventsBus: def emit(self, source: Any, event: BaseEvent) -> None: """ - Emit an event to all registered handlers + Emit an event to all registered handlers. + + Sets the event timestamp automatically if not already provided to ensure + chronological ordering of events. Args: source: The object emitting the event event: The event instance to emit """ if event.timestamp is None: - event.timestamp = datetime.now() + event.timestamp = datetime.now(timezone.utc) for event_type, handlers in self._handlers.items(): if isinstance(event, event_type): diff --git a/tests/utilities/test_events.py b/tests/utilities/test_events.py index 9cc8e2cb4..924d0d499 100644 --- a/tests/utilities/test_events.py +++ b/tests/utilities/test_events.py @@ -738,7 +738,7 @@ def test_streaming_fallback_to_non_streaming(): def test_llm_stream_chunk_events_chronological_order(): """Test that LLM stream chunk events are emitted in chronological order.""" - import time + from datetime import datetime, timezone, timedelta events_received = [] @@ -746,15 +746,18 @@ def test_llm_stream_chunk_events_chronological_order(): @crewai_event_bus.on(LLMStreamChunkEvent) def handle_stream_chunk(source, event): events_received.append((event.timestamp, event.chunk)) + assert isinstance(event.timestamp, datetime), "Event timestamp should be datetime object" llm = LLM(model="gpt-4o", stream=True) event1 = LLMStreamChunkEvent(chunk="first") - time.sleep(0.002) - event2 = LLMStreamChunkEvent(chunk="second") - time.sleep(0.001) + event2 = LLMStreamChunkEvent(chunk="second") event3 = LLMStreamChunkEvent(chunk="third") + assert event1.timestamp is None, "Event should have no timestamp before emission" + assert event2.timestamp is None, "Event should have no timestamp before emission" + assert event3.timestamp is None, "Event should have no timestamp before emission" + crewai_event_bus.emit(llm, event=event3) crewai_event_bus.emit(llm, event=event1) crewai_event_bus.emit(llm, event=event2) @@ -762,11 +765,14 @@ def test_llm_stream_chunk_events_chronological_order(): assert len(events_received) == 3 timestamps = [event[0] for event in events_received] assert timestamps == sorted(timestamps), "Events should be emitted in chronological order" + + for timestamp in timestamps: + assert timestamp.tzinfo == timezone.utc, "Timestamps should be in UTC" def test_llm_stream_chunk_events_preserve_manual_timestamps(): """Test that manually set timestamps are preserved.""" - from datetime import datetime, timedelta + from datetime import datetime, timezone, timedelta events_received = [] @@ -774,10 +780,11 @@ def test_llm_stream_chunk_events_preserve_manual_timestamps(): @crewai_event_bus.on(LLMStreamChunkEvent) def handle_stream_chunk(source, event): events_received.append((event.timestamp, event.chunk)) + assert isinstance(event.timestamp, datetime), "Event timestamp should be datetime object" llm = LLM(model="gpt-4o", stream=True) - base_time = datetime.now() + base_time = datetime.now(timezone.utc) event1 = LLMStreamChunkEvent(chunk="first", timestamp=base_time) event2 = LLMStreamChunkEvent(chunk="second", timestamp=base_time + timedelta(seconds=1)) @@ -785,12 +792,13 @@ def test_llm_stream_chunk_events_preserve_manual_timestamps(): crewai_event_bus.emit(llm, event=event2) assert len(events_received) == 2 - assert events_received[0][0] == base_time - assert events_received[1][0] == base_time + timedelta(seconds=1) + assert events_received[0][0] == base_time, "First event timestamp should be preserved" + assert events_received[1][0] == base_time + timedelta(seconds=1), "Second event timestamp should be preserved" def test_base_event_timestamp_set_on_emission(): """Test that BaseEvent timestamp is set when event is emitted if not already set.""" + from datetime import datetime, timezone from crewai.utilities.events.base_events import BaseEvent events_received = [] @@ -803,15 +811,17 @@ def test_base_event_timestamp_set_on_emission(): @crewai_event_bus.on(TestEvent) def handle_test_event(source, event): events_received.append(event) + assert isinstance(event.timestamp, datetime), "Event timestamp should be datetime object" event_without_timestamp = TestEvent(message="test") - assert event_without_timestamp.timestamp is None + assert event_without_timestamp.timestamp is None, "Event should have no timestamp initially" crewai_event_bus.emit("test_source", event=event_without_timestamp) - assert len(events_received) == 1 - assert events_received[0].timestamp is not None - assert isinstance(events_received[0].timestamp, datetime) + assert len(events_received) == 1, "Should receive exactly one event" + assert events_received[0].timestamp is not None, "Event should have timestamp after emission" + assert isinstance(events_received[0].timestamp, datetime), "Timestamp should be datetime object" + assert events_received[0].timestamp.tzinfo == timezone.utc, "Timestamp should be in UTC" @pytest.mark.vcr(filter_headers=["authorization"])