mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-03 14:09:24 +00:00
feat: enhance streaming support in conversational flow
- Introduced function to determine if a result is a streaming output. - Added method to handle streaming results before accessing them. - Updated method to utilize the new streaming result handling. - Implemented context management for LLM streaming in the conversational mixin. - Added tests to verify streaming behavior and ensure proper handling of user messages during streaming.
This commit is contained in:
@@ -4,6 +4,8 @@ Two-column layout: left sidebar (tasks/agents/tokens) + main content
|
||||
(task header, plan checklist, activity timeline, streaming output).
|
||||
"""
|
||||
|
||||
from collections.abc import Iterable
|
||||
import inspect
|
||||
import json as _json
|
||||
import re
|
||||
import threading
|
||||
@@ -45,6 +47,14 @@ def _is_save_to_memory_tool(tool_name: str | None) -> bool:
|
||||
return (tool_name or "").replace(" ", "_").lower() == "save_to_memory"
|
||||
|
||||
|
||||
def _is_streaming_output(value: Any) -> bool:
|
||||
return (
|
||||
isinstance(value, Iterable)
|
||||
and inspect.getattr_static(value, "get_full_text", None) is not None
|
||||
and inspect.getattr_static(value, "result", None) is not None
|
||||
)
|
||||
|
||||
|
||||
def _truncate_log_text(value: Any, limit: int) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
@@ -834,14 +844,18 @@ FooterKey .footer-key--key {
|
||||
set_suppress_tracing_messages(True)
|
||||
try:
|
||||
result = self._flow.handle_turn(message)
|
||||
if hasattr(result, "get_full_text") and hasattr(result, "result"):
|
||||
for _chunk in result:
|
||||
pass
|
||||
result = result.result
|
||||
result = self._consume_conversation_streaming_result(result)
|
||||
self.call_from_thread(self._on_conversation_turn_done, result)
|
||||
except Exception as e:
|
||||
self.call_from_thread(self._on_conversation_turn_failed, str(e))
|
||||
|
||||
def _consume_conversation_streaming_result(self, result: Any) -> Any:
|
||||
if not _is_streaming_output(result):
|
||||
return result
|
||||
for _chunk in result:
|
||||
pass
|
||||
return result.result
|
||||
|
||||
def _on_conversation_turn_done(self, result: Any) -> None:
|
||||
with self._lock:
|
||||
output = self._stringify_output(result)
|
||||
|
||||
@@ -24,6 +24,7 @@ from crewai.events.types.tool_usage_events import (
|
||||
ToolUsageFinishedEvent,
|
||||
ToolUsageStartedEvent,
|
||||
)
|
||||
from crewai.types.streaming import FlowStreamingOutput, StreamChunk
|
||||
from crewai_cli.command import AuthenticationRequiredError
|
||||
from crewai_cli import run_crew
|
||||
from crewai_cli.crew_run_tui import (
|
||||
@@ -144,6 +145,29 @@ def test_conversation_turn_done_records_assistant_message() -> None:
|
||||
assert isinstance(app._crew_result, RawResult)
|
||||
|
||||
|
||||
def test_conversation_streaming_result_is_consumed_before_result_access() -> None:
|
||||
streaming = FlowStreamingOutput()
|
||||
result_accessed_before_completion = False
|
||||
|
||||
def chunks():
|
||||
yield StreamChunk(content="hello ")
|
||||
yield StreamChunk(content="world")
|
||||
streaming._set_result("hello world")
|
||||
|
||||
streaming._sync_iterator = chunks()
|
||||
|
||||
try:
|
||||
streaming.result
|
||||
except RuntimeError:
|
||||
result_accessed_before_completion = True
|
||||
|
||||
app = CrewRunApp(conversational=True)
|
||||
|
||||
assert result_accessed_before_completion is True
|
||||
assert app._consume_conversation_streaming_result(streaming) == "hello world"
|
||||
assert streaming.get_full_text() == "hello world"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_conversation_input_submits_turn() -> None:
|
||||
class FakeFlow:
|
||||
|
||||
Reference in New Issue
Block a user