From 828a5f4b8ffbd4c958b4d81afa0813cc0c5f9f4a Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Thu, 6 Mar 2025 11:34:10 -0800 Subject: [PATCH] Implement LLM stream chunk event handling with in-memory text stream --- src/crewai/utilities/events/event_listener.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/crewai/utilities/events/event_listener.py b/src/crewai/utilities/events/event_listener.py index ab135d0c4..c5c049bc6 100644 --- a/src/crewai/utilities/events/event_listener.py +++ b/src/crewai/utilities/events/event_listener.py @@ -1,3 +1,4 @@ +from io import StringIO from typing import Any, Dict from pydantic import Field, PrivateAttr @@ -47,6 +48,8 @@ class EventListener(BaseEventListener): _telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry()) logger = Logger(verbose=True, default_color=EMITTER_COLOR) execution_spans: Dict[Task, Any] = Field(default_factory=dict) + next_chunk = 0 + text_stream = StringIO() def __new__(cls): if cls._instance is None: @@ -285,12 +288,16 @@ class EventListener(BaseEventListener): event.timestamp, ) - # @crewai_event_bus.on(LLMStreamChunkEvent) - # def on_llm_stream_chunk(source, event: LLMStreamChunkEvent): - # self.logger.log( - # f"📝 LLM stream chunk received", - # event.timestamp, - # ) + @crewai_event_bus.on(LLMStreamChunkEvent) + def on_llm_stream_chunk(source, event: LLMStreamChunkEvent): + self.text_stream.write(event.chunk) + + self.text_stream.seek(self.next_chunk) + + # Read from the in-memory stream + content = self.text_stream.read() + print(content, end="", flush=True) + self.next_chunk = self.text_stream.tell() event_listener = EventListener()