Fix streaming implementation issues

- Add streaming parameters to BaseAgent.execute_task method signature
- Fix mock LLM objects to include supports_stop_words attribute
- Update event emission to use crewai_event_bus.emit instead of direct method calls
- Remove unused variables in test files

Co-Authored-By: João <joao@crewai.com>
Author: Devin AI
Date: 2025-06-04 07:06:43 +00:00
parent b3b2b1e25f
commit 495af081d2
5 changed files with 19 additions and 11 deletions
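
Taken together, the changes wire streaming from Crew.kickoff down to the agents. Below is a minimal caller-side sketch of the interface these tests exercise. The callback arity mirrors the Callable[[str, str, str, str], None] type added to BaseAgent.execute_task; the diff does not spell out what the four strings mean, so the parameter names chunk, agent_role, task_description, and event_type are placeholders, not a documented contract.

from crewai import Agent, Task, Crew


def stream_callback(chunk: str, agent_role: str, task_description: str, event_type: str) -> None:
    # Print each streamed chunk as it arrives; a real handler might forward
    # chunks to a websocket or progress UI instead.
    print(f"[{agent_role}] {chunk}", end="", flush=True)


researcher = Agent(role="Researcher", goal="Summarize a topic", backstory="Concise writer.")
task = Task(description="Write a one-paragraph summary.", expected_output="A short summary.", agent=researcher)
crew = Crew(agents=[researcher], tasks=[task])

# stream/stream_callback are the keyword arguments this commit adds to
# BaseAgent.execute_task and that the updated tests pass to Crew.kickoff.
result = crew.kickoff(stream=True, stream_callback=stream_callback)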

View File

@@ -254,6 +254,8 @@ class BaseAgent(ABC, BaseModel):
         task: Any,
         context: Optional[str] = None,
         tools: Optional[List[BaseTool]] = None,
+        stream: bool = False,
+        stream_callback: Optional[Callable[[str, str, str, str], None]] = None,
     ) -> str:
         pass
 
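
Both new keyword arguments default to non-streaming behaviour, so existing subclasses keep working. The sketch below shows how an override could honour them; the class, the fake chunk list, and the argument order passed to the callback are illustrative only and are not part of crewai.

from typing import Any, Callable, List, Optional


class StreamingAgentSketch:
    """Illustration only: an execute_task that forwards chunks to stream_callback."""

    role = "Illustrative Agent"

    def execute_task(
        self,
        task: Any,
        context: Optional[str] = None,
        tools: Optional[List[Any]] = None,
        stream: bool = False,
        stream_callback: Optional[Callable[[str, str, str, str], None]] = None,
    ) -> str:
        chunks = ["Hello, ", "world."]  # stand-in for an LLM token stream
        if stream and stream_callback is not None:
            for chunk in chunks:
                # Hypothetical argument order; the diff only fixes the arity (four strings).
                stream_callback(chunk, self.role, str(task), "agent")
        return "".join(chunks)


print(StreamingAgentSketch().execute_task("demo task", stream=True,
                                          stream_callback=lambda *args: print(args)))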

View File

@@ -1,9 +1,7 @@
 import pytest
 from unittest.mock import Mock, patch
 from crewai import Agent, Task, Crew
-from crewai.llm import LLM
 from crewai.utilities.events.crew_events import CrewStreamChunkEvent, TaskStreamChunkEvent, AgentStreamChunkEvent
-from crewai.utilities.events.crewai_event_bus import crewai_event_bus
 
 
 @pytest.fixture
@@ -55,6 +53,7 @@ def test_crew_streaming_enabled():
     with patch('crewai.llm.LLM') as mock_llm_class:
         mock_llm = Mock()
         mock_llm.call.return_value = "Test response"
+        mock_llm.supports_stop_words = True
         mock_llm_class.return_value = mock_llm
 
         agent = Agent(
@@ -93,6 +92,7 @@ def test_crew_streaming_disabled_by_default():
     with patch('crewai.llm.LLM') as mock_llm_class:
         mock_llm = Mock()
         mock_llm.call.return_value = "Test response"
+        mock_llm.supports_stop_words = True
         mock_llm_class.return_value = mock_llm
 
         agent = Agent(
@@ -213,7 +213,7 @@ def test_streaming_integration_with_llm():
         mock_executor._stream_callback = None
         mock_executor._task_description = None
 
-        result = crew.kickoff(stream=True, stream_callback=stream_callback)
+        crew.kickoff(stream=True, stream_callback=stream_callback)
 
         assert hasattr(agent.agent_executor, '_stream_callback')
         assert hasattr(agent.agent_executor, '_task_description')
@@ -224,6 +224,7 @@ def test_streaming_parameters_propagation():
     with patch('crewai.llm.LLM') as mock_llm_class:
         mock_llm = Mock()
         mock_llm.call.return_value = "Test response"
+        mock_llm.supports_stop_words = True
         mock_llm_class.return_value = mock_llm
 
         agent = Agent(
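
The same mock-LLM setup recurs in every test this commit touches. A small helper like the one below (hypothetical, not in the repository) would keep the supports_stop_words fix in one place; note that a bare Mock() returns a truthy child Mock for any attribute rather than a real bool, which is presumably why the attribute now has to be set explicitly.

from unittest.mock import Mock, patch


def make_mock_llm(response: str = "Test response") -> Mock:
    """Build the stub LLM the streaming tests rely on."""
    mock_llm = Mock()
    mock_llm.call.return_value = response
    # Must be a real bool, not an auto-created child Mock.
    mock_llm.supports_stop_words = True
    return mock_llm


# Usage inside a test:
with patch("crewai.llm.LLM") as mock_llm_class:
    mock_llm_class.return_value = make_mock_llm()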

View File

@@ -1,5 +1,4 @@
-import pytest
-from unittest.mock import Mock, patch, MagicMock
+from unittest.mock import Mock, patch
 from crewai import Agent, Task, Crew
 from crewai.utilities.events.crew_events import CrewStreamChunkEvent
 from crewai.utilities.events.llm_events import LLMStreamChunkEvent
@@ -21,6 +20,7 @@ def test_streaming_callback_called():
     with patch('crewai.llm.LLM') as mock_llm_class:
         mock_llm = Mock()
         mock_llm.call.return_value = "Test response"
+        mock_llm.supports_stop_words = True
         mock_llm_class.return_value = mock_llm
 
         agent = Agent(
@@ -78,6 +78,7 @@ def test_streaming_disabled_by_default():
     with patch('crewai.llm.LLM') as mock_llm_class:
         mock_llm = Mock()
         mock_llm.call.return_value = "Test response"
+        mock_llm.supports_stop_words = True
         mock_llm_class.return_value = mock_llm
 
         agent = Agent(
@@ -113,6 +114,7 @@ def test_streaming_parameters_propagation():
     with patch('crewai.llm.LLM') as mock_llm_class:
         mock_llm = Mock()
         mock_llm.call.return_value = "Test response"
+        mock_llm.supports_stop_words = True
         mock_llm_class.return_value = mock_llm
 
         agent = Agent(
@@ -155,6 +157,7 @@ def test_async_task_streaming():
     with patch('crewai.llm.LLM') as mock_llm_class:
         mock_llm = Mock()
         mock_llm.call.return_value = "Test response"
+        mock_llm.supports_stop_words = True
         mock_llm_class.return_value = mock_llm
 
         agent = Agent(
@@ -208,8 +211,8 @@ def test_llm_stream_chunk_to_crew_stream_chunk():
         llm_event = LLMStreamChunkEvent(chunk="test chunk")
 
-        from crewai.utilities.events.event_listener import event_listener
-        event_listener.on_llm_stream_chunk(mock_source, llm_event)
+        from crewai.utilities.events.crewai_event_bus import crewai_event_bus
+        crewai_event_bus.emit(mock_source, llm_event)
 
         assert len(received_crew_chunks) == 1
         crew_event = received_crew_chunks[0]
@@ -263,7 +266,7 @@ def test_multiple_agents_streaming():
             verbose=False
         )
 
-        result = crew.kickoff(stream=True, stream_callback=stream_callback)
+        crew.kickoff(stream=True, stream_callback=stream_callback)
 
         assert hasattr(crew, '_stream_enabled')
         assert crew._stream_enabled is True
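
test_llm_stream_chunk_to_crew_stream_chunk now goes through the public event bus instead of calling a listener method directly. A condensed sketch of that pattern follows, using the crewai_event_bus.emit call from the diff together with the standard on() registration decorator; the handler and list names are mine, and None merely stands in for the emitting source object.

from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.llm_events import LLMStreamChunkEvent

received_chunks = []


@crewai_event_bus.on(LLMStreamChunkEvent)
def collect_chunk(source, event):
    # Every handler receives the emitting source plus the event object.
    received_chunks.append(event.chunk)


# Emitting mirrors the updated test: crewai_event_bus.emit(source, event).
crewai_event_bus.emit(None, LLMStreamChunkEvent(chunk="test chunk"))
assert received_chunks == ["test chunk"]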

View File

@@ -1,4 +1,3 @@
-import pytest
 from unittest.mock import Mock, patch
 from crewai import Agent, Task, Crew
 from crewai.utilities.events.crew_events import CrewStreamChunkEvent
@@ -21,6 +20,7 @@ def test_streaming_callback_integration():
     with patch('crewai.llm.LLM') as mock_llm_class:
         mock_llm = Mock()
         mock_llm.call.return_value = "Test response"
+        mock_llm.supports_stop_words = True
         mock_llm_class.return_value = mock_llm
 
         agent = Agent(
@@ -132,7 +132,7 @@ def test_streaming_with_multiple_agents():
             verbose=False
         )
 
-        result = crew.kickoff(stream=True, stream_callback=stream_callback)
+        crew.kickoff(stream=True, stream_callback=stream_callback)
 
         assert hasattr(crew, '_stream_enabled')
         assert crew._stream_enabled is True
@@ -145,6 +145,7 @@ def test_streaming_disabled_by_default():
     with patch('crewai.llm.LLM') as mock_llm_class:
         mock_llm = Mock()
         mock_llm.call.return_value = "Test response"
+        mock_llm.supports_stop_words = True
         mock_llm_class.return_value = mock_llm
 
         agent = Agent(
@@ -180,6 +181,7 @@ def test_streaming_parameters_propagation():
     with patch('crewai.llm.LLM') as mock_llm_class:
         mock_llm = Mock()
         mock_llm.call.return_value = "Test response"
+        mock_llm.supports_stop_words = True
         mock_llm_class.return_value = mock_llm
 
         agent = Agent(

View File

@@ -820,7 +820,7 @@ def test_crew_streaming_events():
    )
 
    # Execute with streaming enabled
-    result = crew.kickoff(stream=True)
+    crew.kickoff(stream=True)
 
    # Verify that we received crew stream chunks
    assert len(received_crew_chunks) > 0