mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-25 16:18:13 +00:00
Compare commits
2 Commits
devin/1768
...
devin/1749
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aa8d506661 | ||
|
|
6d0158495b |
@@ -9,7 +9,10 @@ from crewai.utilities.serialization import to_serializable
|
|||||||
class BaseEvent(BaseModel):
|
class BaseEvent(BaseModel):
|
||||||
"""Base class for all events"""
|
"""Base class for all events"""
|
||||||
|
|
||||||
timestamp: datetime = Field(default_factory=datetime.now)
|
timestamp: Optional[datetime] = Field(
|
||||||
|
default=None,
|
||||||
|
description="Event timestamp. Set automatically on emission if not provided."
|
||||||
|
)
|
||||||
type: str
|
type: str
|
||||||
source_fingerprint: Optional[str] = None # UUID string of the source entity
|
source_fingerprint: Optional[str] = None # UUID string of the source entity
|
||||||
source_type: Optional[str] = None # "agent", "task", "crew"
|
source_type: Optional[str] = None # "agent", "task", "crew"
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import threading
|
import threading
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
from datetime import datetime, timezone
|
||||||
from typing import Any, Callable, Dict, List, Type, TypeVar, cast
|
from typing import Any, Callable, Dict, List, Type, TypeVar, cast
|
||||||
|
|
||||||
from blinker import Signal
|
from blinker import Signal
|
||||||
@@ -61,12 +62,18 @@ class CrewAIEventsBus:
|
|||||||
|
|
||||||
def emit(self, source: Any, event: BaseEvent) -> None:
|
def emit(self, source: Any, event: BaseEvent) -> None:
|
||||||
"""
|
"""
|
||||||
Emit an event to all registered handlers
|
Emit an event to all registered handlers.
|
||||||
|
|
||||||
|
Sets the event timestamp automatically if not already provided to ensure
|
||||||
|
chronological ordering of events.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
source: The object emitting the event
|
source: The object emitting the event
|
||||||
event: The event instance to emit
|
event: The event instance to emit
|
||||||
"""
|
"""
|
||||||
|
if event.timestamp is None:
|
||||||
|
event.timestamp = datetime.now(timezone.utc)
|
||||||
|
|
||||||
for event_type, handlers in self._handlers.items():
|
for event_type, handlers in self._handlers.items():
|
||||||
if isinstance(event, event_type):
|
if isinstance(event, event_type):
|
||||||
for handler in handlers:
|
for handler in handlers:
|
||||||
|
|||||||
@@ -736,6 +736,94 @@ def test_streaming_fallback_to_non_streaming():
|
|||||||
llm.call = original_call
|
llm.call = original_call
|
||||||
|
|
||||||
|
|
||||||
|
def test_llm_stream_chunk_events_chronological_order():
|
||||||
|
"""Test that LLM stream chunk events are emitted in chronological order."""
|
||||||
|
from datetime import datetime, timezone, timedelta
|
||||||
|
|
||||||
|
events_received = []
|
||||||
|
|
||||||
|
with crewai_event_bus.scoped_handlers():
|
||||||
|
@crewai_event_bus.on(LLMStreamChunkEvent)
|
||||||
|
def handle_stream_chunk(source, event):
|
||||||
|
events_received.append((event.timestamp, event.chunk))
|
||||||
|
assert isinstance(event.timestamp, datetime), "Event timestamp should be datetime object"
|
||||||
|
|
||||||
|
llm = LLM(model="gpt-4o", stream=True)
|
||||||
|
|
||||||
|
event1 = LLMStreamChunkEvent(chunk="first")
|
||||||
|
event2 = LLMStreamChunkEvent(chunk="second")
|
||||||
|
event3 = LLMStreamChunkEvent(chunk="third")
|
||||||
|
|
||||||
|
assert event1.timestamp is None, "Event should have no timestamp before emission"
|
||||||
|
assert event2.timestamp is None, "Event should have no timestamp before emission"
|
||||||
|
assert event3.timestamp is None, "Event should have no timestamp before emission"
|
||||||
|
|
||||||
|
crewai_event_bus.emit(llm, event=event3)
|
||||||
|
crewai_event_bus.emit(llm, event=event1)
|
||||||
|
crewai_event_bus.emit(llm, event=event2)
|
||||||
|
|
||||||
|
assert len(events_received) == 3
|
||||||
|
timestamps = [event[0] for event in events_received]
|
||||||
|
assert timestamps == sorted(timestamps), "Events should be emitted in chronological order"
|
||||||
|
|
||||||
|
for timestamp in timestamps:
|
||||||
|
assert timestamp.tzinfo == timezone.utc, "Timestamps should be in UTC"
|
||||||
|
|
||||||
|
|
||||||
|
def test_llm_stream_chunk_events_preserve_manual_timestamps():
|
||||||
|
"""Test that manually set timestamps are preserved."""
|
||||||
|
from datetime import datetime, timezone, timedelta
|
||||||
|
|
||||||
|
events_received = []
|
||||||
|
|
||||||
|
with crewai_event_bus.scoped_handlers():
|
||||||
|
@crewai_event_bus.on(LLMStreamChunkEvent)
|
||||||
|
def handle_stream_chunk(source, event):
|
||||||
|
events_received.append((event.timestamp, event.chunk))
|
||||||
|
assert isinstance(event.timestamp, datetime), "Event timestamp should be datetime object"
|
||||||
|
|
||||||
|
llm = LLM(model="gpt-4o", stream=True)
|
||||||
|
|
||||||
|
base_time = datetime.now(timezone.utc)
|
||||||
|
event1 = LLMStreamChunkEvent(chunk="first", timestamp=base_time)
|
||||||
|
event2 = LLMStreamChunkEvent(chunk="second", timestamp=base_time + timedelta(seconds=1))
|
||||||
|
|
||||||
|
crewai_event_bus.emit(llm, event=event1)
|
||||||
|
crewai_event_bus.emit(llm, event=event2)
|
||||||
|
|
||||||
|
assert len(events_received) == 2
|
||||||
|
assert events_received[0][0] == base_time, "First event timestamp should be preserved"
|
||||||
|
assert events_received[1][0] == base_time + timedelta(seconds=1), "Second event timestamp should be preserved"
|
||||||
|
|
||||||
|
|
||||||
|
def test_base_event_timestamp_set_on_emission():
|
||||||
|
"""Test that BaseEvent timestamp is set when event is emitted if not already set."""
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from crewai.utilities.events.base_events import BaseEvent
|
||||||
|
|
||||||
|
events_received = []
|
||||||
|
|
||||||
|
class TestEvent(BaseEvent):
|
||||||
|
type: str = "test_event"
|
||||||
|
message: str
|
||||||
|
|
||||||
|
with crewai_event_bus.scoped_handlers():
|
||||||
|
@crewai_event_bus.on(TestEvent)
|
||||||
|
def handle_test_event(source, event):
|
||||||
|
events_received.append(event)
|
||||||
|
assert isinstance(event.timestamp, datetime), "Event timestamp should be datetime object"
|
||||||
|
|
||||||
|
event_without_timestamp = TestEvent(message="test")
|
||||||
|
assert event_without_timestamp.timestamp is None, "Event should have no timestamp initially"
|
||||||
|
|
||||||
|
crewai_event_bus.emit("test_source", event=event_without_timestamp)
|
||||||
|
|
||||||
|
assert len(events_received) == 1, "Should receive exactly one event"
|
||||||
|
assert events_received[0].timestamp is not None, "Event should have timestamp after emission"
|
||||||
|
assert isinstance(events_received[0].timestamp, datetime), "Timestamp should be datetime object"
|
||||||
|
assert events_received[0].timestamp.tzinfo == timezone.utc, "Timestamp should be in UTC"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||||
def test_streaming_empty_response_handling():
|
def test_streaming_empty_response_handling():
|
||||||
"""Test that streaming handles empty responses correctly."""
|
"""Test that streaming handles empty responses correctly."""
|
||||||
|
|||||||
Reference in New Issue
Block a user