Implement LLM stream chunk event handling with in-memory text stream

Lorenze Jay
2025-03-06 11:34:10 -08:00
parent e8707e15ef
commit 828a5f4b8f

@@ -1,3 +1,4 @@
+from io import StringIO
 from typing import Any, Dict
 from pydantic import Field, PrivateAttr
@@ -47,6 +48,8 @@ class EventListener(BaseEventListener):
     _telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
     logger = Logger(verbose=True, default_color=EMITTER_COLOR)
     execution_spans: Dict[Task, Any] = Field(default_factory=dict)
+    next_chunk = 0
+    text_stream = StringIO()

     def __new__(cls):
         if cls._instance is None:
@@ -285,12 +288,16 @@ class EventListener(BaseEventListener):
                 event.timestamp,
             )

-        # @crewai_event_bus.on(LLMStreamChunkEvent)
-        # def on_llm_stream_chunk(source, event: LLMStreamChunkEvent):
-        #     self.logger.log(
-        #         f"📝 LLM stream chunk received",
-        #         event.timestamp,
-        #     )
+        @crewai_event_bus.on(LLMStreamChunkEvent)
+        def on_llm_stream_chunk(source, event: LLMStreamChunkEvent):
+            self.text_stream.write(event.chunk)
+
+            self.text_stream.seek(self.next_chunk)
+
+            # Read from the in-memory stream
+            content = self.text_stream.read()
+            print(content, end="", flush=True)
+            self.next_chunk = self.text_stream.tell()


 event_listener = EventListener()
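
For context, a minimal standalone sketch of the incremental-read pattern the new handler uses (buffer and function names here are illustrative, not from the codebase): each chunk is appended to an in-memory StringIO, the cursor seeks back to the last printed offset, only the unseen text is read and printed, and tell() records the new offset.

from io import StringIO

buffer = StringIO()
next_offset = 0  # position of the first character not yet printed

def on_chunk(chunk: str) -> None:
    global next_offset
    buffer.write(chunk)                        # append at the current write position
    buffer.seek(next_offset)                   # rewind to where the last read stopped
    print(buffer.read(), end="", flush=True)   # emit only the new text
    next_offset = buffer.tell()                # remember where this read stopped

for piece in ("Hello", ", ", "streaming ", "world"):
    on_chunk(piece)  # prints the pieces as one continuous line

A side effect of buffering rather than printing event.chunk directly is that the full streamed text remains available in the StringIO after the stream ends, at the cost of the buffer growing for the lifetime of the singleton listener.

A hedged smoke test for the handler itself, assuming the event bus exposes an emit(source, event) method and that LLMStreamChunkEvent accepts a chunk keyword at this point in the codebase (the import paths are inferred from the identifiers in the diff, not confirmed by it):

from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.llm_events import LLMStreamChunkEvent

# Emitting chunk events by hand should stream their text to stdout
# through on_llm_stream_chunk above.
for piece in ("streamed ", "text"):
    crewai_event_bus.emit(None, event=LLMStreamChunkEvent(chunk=piece))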