mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-06 14:48:29 +00:00
Compare commits
5 Commits
1.5.0
...
devin/1749
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
755bb185c0 | ||
|
|
bb72804e74 | ||
|
|
510a4087cd | ||
|
|
495af081d2 | ||
|
|
b3b2b1e25f |
151
README_STREAMING.md
Normal file
151
README_STREAMING.md
Normal file
@@ -0,0 +1,151 @@
|
||||
# CrewAI Streaming Support
|
||||
|
||||
This document describes the streaming functionality added to CrewAI to support real-time output during crew execution.
|
||||
|
||||
## Overview
|
||||
|
||||
The streaming feature allows users to receive real-time updates during crew execution, similar to how autogen and langgraph provide streaming capabilities. This is particularly useful for multi-agent scenarios where you want to see the progress of each agent and task as they execute.
|
||||
|
||||
## Usage
|
||||
|
||||
### Basic Streaming
|
||||
|
||||
```python
|
||||
from crewai import Agent, Task, Crew
|
||||
from crewai.llm import LLM
|
||||
|
||||
def stream_callback(chunk, agent_role, task_description, step_type):
|
||||
"""Callback function to handle streaming chunks."""
|
||||
print(f"[{agent_role}] {step_type}: {chunk}", end="", flush=True)
|
||||
|
||||
llm = LLM(model="gpt-4o-mini", stream=True)
|
||||
|
||||
agent = Agent(
|
||||
role="Content Writer",
|
||||
goal="Write engaging content",
|
||||
backstory="You are an experienced content writer.",
|
||||
llm=llm
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Write a short story about AI",
|
||||
expected_output="A creative short story",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task]
|
||||
)
|
||||
|
||||
# Enable streaming with callback
|
||||
result = crew.kickoff(
|
||||
stream=True,
|
||||
stream_callback=stream_callback
|
||||
)
|
||||
```
|
||||
|
||||
### Multi-Agent Streaming
|
||||
|
||||
```python
|
||||
def stream_callback(chunk, agent_role, task_description, step_type):
|
||||
"""Enhanced callback for multi-agent scenarios."""
|
||||
timestamp = datetime.now().strftime("%H:%M:%S")
|
||||
print(f"[{timestamp}] {agent_role} ({step_type}): {chunk}", end="", flush=True)
|
||||
|
||||
researcher = Agent(
|
||||
role="Research Analyst",
|
||||
goal="Research topics thoroughly",
|
||||
backstory="You are an experienced researcher.",
|
||||
llm=llm
|
||||
)
|
||||
|
||||
writer = Agent(
|
||||
role="Content Writer",
|
||||
goal="Write based on research",
|
||||
backstory="You create compelling content.",
|
||||
llm=llm
|
||||
)
|
||||
|
||||
research_task = Task(
|
||||
description="Research AI trends",
|
||||
expected_output="Research summary",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
writing_task = Task(
|
||||
description="Write blog post about AI trends",
|
||||
expected_output="Blog post",
|
||||
agent=writer,
|
||||
context=[research_task]
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[researcher, writer],
|
||||
tasks=[research_task, writing_task]
|
||||
)
|
||||
|
||||
result = crew.kickoff(
|
||||
stream=True,
|
||||
stream_callback=stream_callback
|
||||
)
|
||||
```
|
||||
|
||||
## API Reference
|
||||
|
||||
### Crew.kickoff()
|
||||
|
||||
```python
|
||||
def kickoff(
|
||||
self,
|
||||
inputs: Optional[Dict[str, Any]] = None,
|
||||
stream: bool = False,
|
||||
stream_callback: Optional[Callable[[str, str, str, str], None]] = None,
|
||||
) -> CrewOutput:
|
||||
```
|
||||
|
||||
**Parameters:**
|
||||
- `inputs`: Dictionary of inputs for the crew
|
||||
- `stream`: Whether to enable streaming output (default: False)
|
||||
- `stream_callback`: Callback function for streaming chunks
|
||||
|
||||
**Stream Callback Signature:**
|
||||
```python
|
||||
def stream_callback(chunk: str, agent_role: str, task_description: str, step_type: str) -> None:
|
||||
```
|
||||
|
||||
**Callback Parameters:**
|
||||
- `chunk`: The streaming text chunk
|
||||
- `agent_role`: Role of the agent producing the chunk
|
||||
- `task_description`: Description of the current task
|
||||
- `step_type`: Type of step ("agent_thinking", "final_answer", "llm_response", etc.)
|
||||
|
||||
## Events
|
||||
|
||||
The streaming system emits several types of events:
|
||||
|
||||
### CrewStreamChunkEvent
|
||||
Emitted for crew-level streaming chunks with context about the agent and task.
|
||||
|
||||
### TaskStreamChunkEvent
|
||||
Emitted for task-level streaming chunks.
|
||||
|
||||
### AgentStreamChunkEvent
|
||||
Emitted for agent-level streaming chunks.
|
||||
|
||||
## Integration with Existing LLM Streaming
|
||||
|
||||
The crew streaming builds on top of the existing LLM streaming infrastructure. When you enable streaming at the crew level, it automatically aggregates and contextualizes the LLM-level streaming chunks.
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. **Enable LLM Streaming**: Make sure your LLM has `stream=True` for optimal experience
|
||||
2. **Handle Empty Chunks**: Your callback should handle empty or whitespace-only chunks gracefully
|
||||
3. **Performance**: Streaming adds minimal overhead but consider disabling for batch processing
|
||||
4. **Error Handling**: Implement proper error handling in your stream callback
|
||||
|
||||
## Examples
|
||||
|
||||
See the `examples/` directory for complete working examples:
|
||||
- `streaming_example.py`: Basic single-agent streaming
|
||||
- `streaming_multi_agent_example.py`: Multi-agent streaming with context
|
||||
68
docs/streaming.md
Normal file
68
docs/streaming.md
Normal file
@@ -0,0 +1,68 @@
|
||||
# Streaming Support in CrewAI
|
||||
|
||||
CrewAI now supports real-time streaming output during crew execution, allowing you to see the progress of agents and tasks as they work.
|
||||
|
||||
## Basic Usage
|
||||
|
||||
```python
|
||||
from crewai import Agent, Task, Crew
|
||||
from crewai.llm import LLM
|
||||
|
||||
def stream_callback(chunk, agent_role, task_description, step_type):
|
||||
print(f"[{agent_role}] {step_type}: {chunk}", end="", flush=True)
|
||||
|
||||
llm = LLM(model="gpt-4o-mini", stream=True)
|
||||
|
||||
agent = Agent(
|
||||
role="Writer",
|
||||
goal="Write content",
|
||||
backstory="You are a skilled writer.",
|
||||
llm=llm
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Write a short story",
|
||||
expected_output="A creative story",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(agents=[agent], tasks=[task])
|
||||
|
||||
result = crew.kickoff(
|
||||
stream=True,
|
||||
stream_callback=stream_callback
|
||||
)
|
||||
```
|
||||
|
||||
## Multi-Agent Streaming
|
||||
|
||||
```python
|
||||
def enhanced_callback(chunk, agent_role, task_description, step_type):
|
||||
print(f"[{agent_role}] {task_description[:20]}... - {step_type}: {chunk}")
|
||||
|
||||
researcher = Agent(role="Researcher", ...)
|
||||
writer = Agent(role="Writer", ...)
|
||||
|
||||
research_task = Task(description="Research topic", agent=researcher)
|
||||
write_task = Task(description="Write article", agent=writer, context=[research_task])
|
||||
|
||||
crew = Crew(agents=[researcher, writer], tasks=[research_task, write_task])
|
||||
result = crew.kickoff(stream=True, stream_callback=enhanced_callback)
|
||||
```
|
||||
|
||||
## Stream Callback Parameters
|
||||
|
||||
- `chunk`: The streaming text chunk
|
||||
- `agent_role`: Role of the agent producing the chunk
|
||||
- `task_description`: Description of the current task
|
||||
- `step_type`: Type of step ("agent_thinking", "final_answer", "llm_response")
|
||||
|
||||
## Events
|
||||
|
||||
The streaming system emits `CrewStreamChunkEvent`, `TaskStreamChunkEvent`, and `AgentStreamChunkEvent` that can be handled using the event bus.
|
||||
|
||||
## Requirements
|
||||
|
||||
- Enable streaming on your LLM: `LLM(model="...", stream=True)`
|
||||
- Use the `stream=True` parameter in `crew.kickoff()`
|
||||
- Provide a callback function to handle streaming chunks
|
||||
36
examples/streaming_example.py
Normal file
36
examples/streaming_example.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from crewai import Agent, Task, Crew
|
||||
from crewai.llm import LLM
|
||||
|
||||
def stream_callback(chunk, agent_role, task_description, step_type):
|
||||
"""Callback function to handle streaming chunks."""
|
||||
print(f"[{agent_role}] {step_type}: {chunk}", end="", flush=True)
|
||||
|
||||
llm = LLM(model="gpt-4o-mini", stream=True)
|
||||
|
||||
agent = Agent(
|
||||
role="Content Writer",
|
||||
goal="Write engaging content",
|
||||
backstory="You are an experienced content writer who creates compelling narratives.",
|
||||
llm=llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Write a short story about a robot learning to paint",
|
||||
expected_output="A creative short story of 2-3 paragraphs",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
print("Starting crew execution with streaming...")
|
||||
result = crew.kickoff(
|
||||
stream=True,
|
||||
stream_callback=stream_callback
|
||||
)
|
||||
|
||||
print(f"\n\nFinal result:\n{result}")
|
||||
51
examples/streaming_multi_agent_example.py
Normal file
51
examples/streaming_multi_agent_example.py
Normal file
@@ -0,0 +1,51 @@
|
||||
from crewai import Agent, Task, Crew
|
||||
from crewai.llm import LLM
|
||||
|
||||
def stream_callback(chunk, agent_role, task_description, step_type):
|
||||
"""Callback function to handle streaming chunks from multiple agents."""
|
||||
print(f"[{agent_role}] {step_type}: {chunk}", end="", flush=True)
|
||||
|
||||
llm = LLM(model="gpt-4o-mini", stream=True)
|
||||
|
||||
researcher = Agent(
|
||||
role="Research Analyst",
|
||||
goal="Research and analyze topics thoroughly",
|
||||
backstory="You are an experienced research analyst who excels at gathering and analyzing information.",
|
||||
llm=llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
writer = Agent(
|
||||
role="Content Writer",
|
||||
goal="Write engaging content based on research",
|
||||
backstory="You are a skilled content writer who creates compelling narratives from research data.",
|
||||
llm=llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
research_task = Task(
|
||||
description="Research the latest trends in artificial intelligence and machine learning",
|
||||
expected_output="A comprehensive research summary of AI/ML trends",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
writing_task = Task(
|
||||
description="Write an engaging blog post about AI trends based on the research",
|
||||
expected_output="A well-written blog post about AI trends",
|
||||
agent=writer,
|
||||
context=[research_task]
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[researcher, writer],
|
||||
tasks=[research_task, writing_task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
print("Starting multi-agent crew execution with streaming...")
|
||||
result = crew.kickoff(
|
||||
stream=True,
|
||||
stream_callback=stream_callback
|
||||
)
|
||||
|
||||
print(f"\n\nFinal result:\n{result}")
|
||||
@@ -1,6 +1,6 @@
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import Any, Dict, List, Literal, Optional, Sequence, Type, Union
|
||||
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Type, Union
|
||||
|
||||
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
|
||||
|
||||
@@ -225,6 +225,8 @@ class Agent(BaseAgent):
|
||||
task: Task,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
stream: bool = False,
|
||||
stream_callback: Optional[Callable[[str, str, str, str], None]] = None,
|
||||
) -> str:
|
||||
"""Execute a task with the agent.
|
||||
|
||||
@@ -232,6 +234,8 @@ class Agent(BaseAgent):
|
||||
task: Task to execute.
|
||||
context: Context to execute the task in.
|
||||
tools: Tools to use for the task.
|
||||
stream: Whether to enable streaming output.
|
||||
stream_callback: Callback function for streaming chunks.
|
||||
|
||||
Returns:
|
||||
Output of the agent
|
||||
@@ -363,6 +367,10 @@ class Agent(BaseAgent):
|
||||
|
||||
tools = tools or self.tools or []
|
||||
self.create_agent_executor(tools=tools, task=task)
|
||||
|
||||
if stream and stream_callback:
|
||||
self.agent_executor._stream_callback = stream_callback
|
||||
self.agent_executor._task_description = task.description
|
||||
|
||||
if self.crew and self.crew._train:
|
||||
task_prompt = self._training_handler(task_prompt=task_prompt)
|
||||
@@ -429,7 +437,7 @@ class Agent(BaseAgent):
|
||||
),
|
||||
)
|
||||
raise e
|
||||
result = self.execute_task(task, context, tools)
|
||||
result = self.execute_task(task, context, tools, stream, stream_callback)
|
||||
|
||||
if self.max_rpm and self._rpm_controller:
|
||||
self._rpm_controller.stop_rpm_counter()
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import Any, AsyncIterable, Dict, List, Optional
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from pydantic import Field, PrivateAttr
|
||||
|
||||
@@ -22,7 +22,6 @@ from crewai.utilities.events.agent_events import (
|
||||
)
|
||||
|
||||
try:
|
||||
from langchain_core.messages import ToolMessage
|
||||
from langgraph.checkpoint.memory import MemorySaver
|
||||
from langgraph.prebuilt import create_react_agent
|
||||
|
||||
@@ -126,6 +125,8 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
|
||||
task: Any,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
stream: bool = False,
|
||||
stream_callback: Optional[Any] = None,
|
||||
) -> str:
|
||||
"""Execute a task using the LangGraph workflow."""
|
||||
self.create_agent_executor(tools)
|
||||
|
||||
@@ -86,6 +86,8 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
|
||||
task: Any,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
stream: bool = False,
|
||||
stream_callback: Optional[Any] = None,
|
||||
) -> str:
|
||||
"""Execute a task using the OpenAI Assistant"""
|
||||
self._converter_adapter.configure_structured_output(task)
|
||||
|
||||
@@ -25,7 +25,7 @@ from crewai.security.security_config import SecurityConfig
|
||||
from crewai.tools.base_tool import BaseTool, Tool
|
||||
from crewai.utilities import I18N, Logger, RPMController
|
||||
from crewai.utilities.config import process_config
|
||||
from crewai.utilities.converter import Converter
|
||||
|
||||
from crewai.utilities.string_utils import interpolate_only
|
||||
|
||||
T = TypeVar("T", bound="BaseAgent")
|
||||
@@ -254,6 +254,8 @@ class BaseAgent(ABC, BaseModel):
|
||||
task: Any,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
stream: bool = False,
|
||||
stream_callback: Optional[Callable[[str, str, str, str], None]] = None,
|
||||
) -> str:
|
||||
pass
|
||||
|
||||
|
||||
@@ -80,6 +80,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
self.messages: List[Dict[str, str]] = []
|
||||
self.iterations = 0
|
||||
self.log_error_after = 3
|
||||
self._stream_callback = None
|
||||
self._task_description = None
|
||||
self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = {
|
||||
tool.name: tool for tool in self.tools
|
||||
}
|
||||
@@ -157,6 +159,23 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
printer=self._printer,
|
||||
)
|
||||
formatted_answer = process_llm_response(answer, self.use_stop_words)
|
||||
|
||||
if hasattr(self, '_stream_callback') and self._stream_callback:
|
||||
if hasattr(formatted_answer, 'text'):
|
||||
step_type = "agent_thinking" if hasattr(formatted_answer, 'tool') else "final_answer"
|
||||
self._stream_callback(
|
||||
formatted_answer.text,
|
||||
self.agent.role if self.agent else "unknown",
|
||||
getattr(self, '_task_description', "unknown"),
|
||||
step_type
|
||||
)
|
||||
elif isinstance(formatted_answer, str):
|
||||
self._stream_callback(
|
||||
formatted_answer,
|
||||
self.agent.role if self.agent else "unknown",
|
||||
getattr(self, '_task_description', "unknown"),
|
||||
"final_answer"
|
||||
)
|
||||
|
||||
if isinstance(formatted_answer, AgentAction):
|
||||
# Extract agent fingerprint if available
|
||||
|
||||
@@ -615,13 +615,30 @@ class Crew(FlowTrackable, BaseModel):
|
||||
def kickoff(
|
||||
self,
|
||||
inputs: Optional[Dict[str, Any]] = None,
|
||||
stream: bool = False,
|
||||
stream_callback: Optional[Callable[[str, str, str, str], None]] = None,
|
||||
) -> CrewOutput:
|
||||
"""
|
||||
Starts the crew to work on its assigned tasks.
|
||||
|
||||
Args:
|
||||
inputs (dict): Inputs to be used by the crew.
|
||||
stream (bool): Whether to enable streaming output.
|
||||
stream_callback (callable): Callback function for streaming chunks.
|
||||
Signature: (chunk, agent_role, task_description, step_type)
|
||||
|
||||
Returns:
|
||||
CrewOutput: The output of the crew.
|
||||
"""
|
||||
try:
|
||||
for before_callback in self.before_kickoff_callbacks:
|
||||
if inputs is None:
|
||||
inputs = {}
|
||||
inputs = before_callback(inputs)
|
||||
|
||||
self._stream_enabled = stream
|
||||
self._stream_callback = stream_callback
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
CrewKickoffStartedEvent(crew_name=self.name or "crew", inputs=inputs),
|
||||
@@ -865,6 +882,8 @@ class Crew(FlowTrackable, BaseModel):
|
||||
agent=agent_to_use,
|
||||
context=context,
|
||||
tools=cast(List[BaseTool], tools_for_task),
|
||||
stream=getattr(self, '_stream_enabled', False),
|
||||
stream_callback=getattr(self, '_stream_callback', None),
|
||||
)
|
||||
futures.append((task, future, task_index))
|
||||
else:
|
||||
@@ -877,6 +896,8 @@ class Crew(FlowTrackable, BaseModel):
|
||||
agent=agent_to_use,
|
||||
context=context,
|
||||
tools=cast(List[BaseTool], tools_for_task),
|
||||
stream=getattr(self, '_stream_enabled', False),
|
||||
stream_callback=getattr(self, '_stream_callback', None),
|
||||
)
|
||||
task_outputs.append(task_output)
|
||||
self._process_task_result(task, task_output)
|
||||
|
||||
@@ -346,9 +346,11 @@ class Task(BaseModel):
|
||||
agent: Optional[BaseAgent] = None,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
stream: bool = False,
|
||||
stream_callback: Optional[Callable[[str, str, str, str], None]] = None,
|
||||
) -> TaskOutput:
|
||||
"""Execute the task synchronously."""
|
||||
return self._execute_core(agent, context, tools)
|
||||
return self._execute_core(agent, context, tools, stream, stream_callback)
|
||||
|
||||
@property
|
||||
def key(self) -> str:
|
||||
@@ -369,13 +371,15 @@ class Task(BaseModel):
|
||||
agent: BaseAgent | None = None,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
stream: bool = False,
|
||||
stream_callback: Optional[Callable[[str, str, str, str], None]] = None,
|
||||
) -> Future[TaskOutput]:
|
||||
"""Execute the task asynchronously."""
|
||||
future: Future[TaskOutput] = Future()
|
||||
threading.Thread(
|
||||
daemon=True,
|
||||
target=self._execute_task_async,
|
||||
args=(agent, context, tools, future),
|
||||
args=(agent, context, tools, future, stream, stream_callback),
|
||||
).start()
|
||||
return future
|
||||
|
||||
@@ -385,9 +389,11 @@ class Task(BaseModel):
|
||||
context: Optional[str],
|
||||
tools: Optional[List[Any]],
|
||||
future: Future[TaskOutput],
|
||||
stream: bool = False,
|
||||
stream_callback: Optional[Callable[[str, str, str, str], None]] = None,
|
||||
) -> None:
|
||||
"""Execute the task asynchronously with context handling."""
|
||||
result = self._execute_core(agent, context, tools)
|
||||
result = self._execute_core(agent, context, tools, stream, stream_callback)
|
||||
future.set_result(result)
|
||||
|
||||
def _execute_core(
|
||||
@@ -395,6 +401,8 @@ class Task(BaseModel):
|
||||
agent: Optional[BaseAgent],
|
||||
context: Optional[str],
|
||||
tools: Optional[List[Any]],
|
||||
stream: bool = False,
|
||||
stream_callback: Optional[Callable[[str, str, str, str], None]] = None,
|
||||
) -> TaskOutput:
|
||||
"""Run the core execution logic of the task."""
|
||||
try:
|
||||
@@ -416,6 +424,8 @@ class Task(BaseModel):
|
||||
task=self,
|
||||
context=context,
|
||||
tools=tools,
|
||||
stream=stream,
|
||||
stream_callback=stream_callback,
|
||||
)
|
||||
|
||||
pydantic_output, json_output = self._export_output(result)
|
||||
@@ -449,7 +459,7 @@ class Task(BaseModel):
|
||||
content=f"Guardrail blocked, retrying, due to: {guardrail_result.error}\n",
|
||||
color="yellow",
|
||||
)
|
||||
return self._execute_core(agent, context, tools)
|
||||
return self._execute_core(agent, context, tools, stream, stream_callback)
|
||||
|
||||
if guardrail_result.result is None:
|
||||
raise Exception(
|
||||
|
||||
@@ -109,3 +109,32 @@ class CrewTestResultEvent(CrewBaseEvent):
|
||||
execution_duration: float
|
||||
model: str
|
||||
type: str = "crew_test_result"
|
||||
|
||||
|
||||
class CrewStreamChunkEvent(CrewBaseEvent):
|
||||
"""Event emitted when a streaming chunk is received during crew execution"""
|
||||
|
||||
type: str = "crew_stream_chunk"
|
||||
chunk: str
|
||||
agent_role: Optional[str] = None
|
||||
task_description: Optional[str] = None
|
||||
step_type: str
|
||||
|
||||
|
||||
class TaskStreamChunkEvent(BaseEvent):
|
||||
"""Event emitted when a streaming chunk is received during task execution"""
|
||||
|
||||
type: str = "task_stream_chunk"
|
||||
chunk: str
|
||||
task_description: str
|
||||
agent_role: str
|
||||
step_type: str
|
||||
|
||||
|
||||
class AgentStreamChunkEvent(BaseEvent):
|
||||
"""Event emitted when a streaming chunk is received during agent execution"""
|
||||
|
||||
type: str = "agent_stream_chunk"
|
||||
chunk: str
|
||||
agent_role: str
|
||||
step_type: str
|
||||
|
||||
@@ -357,6 +357,34 @@ class EventListener(BaseEventListener):
|
||||
content = self.text_stream.read()
|
||||
print(content, end="", flush=True)
|
||||
self.next_chunk = self.text_stream.tell()
|
||||
|
||||
from crewai.utilities.events.crew_events import CrewStreamChunkEvent
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
|
||||
agent_role = "unknown"
|
||||
task_description = "unknown"
|
||||
|
||||
if hasattr(source, 'agent') and source.agent:
|
||||
agent_role = source.agent.role
|
||||
elif hasattr(source, 'role'):
|
||||
agent_role = source.role
|
||||
|
||||
if hasattr(source, 'task') and source.task:
|
||||
task_description = source.task.description
|
||||
elif hasattr(source, '_task_description'):
|
||||
task_description = source._task_description
|
||||
|
||||
crewai_event_bus.emit(
|
||||
source,
|
||||
CrewStreamChunkEvent(
|
||||
chunk=event.chunk,
|
||||
agent_role=agent_role,
|
||||
task_description=task_description,
|
||||
step_type="llm_response",
|
||||
crew=getattr(source, 'crew', None),
|
||||
crew_name=getattr(source, 'crew', {}).get('__class__', {}).get('__name__', None) if hasattr(source, 'crew') and source.crew else None
|
||||
)
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(CrewTestStartedEvent)
|
||||
def on_crew_test_started(source, event: CrewTestStartedEvent):
|
||||
|
||||
@@ -3159,7 +3159,7 @@ def test_replay_with_context_set_to_nullable():
|
||||
)
|
||||
crew.kickoff()
|
||||
|
||||
mock_execute_task.assert_called_with(agent=ANY, context="", tools=ANY)
|
||||
mock_execute_task.assert_called_with(agent=ANY, context="", tools=ANY, stream=False, stream_callback=None)
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
@@ -4069,7 +4069,7 @@ def test_crew_guardrail_feedback_in_context():
|
||||
|
||||
with patch.object(Agent, "execute_task") as mock_execute_task:
|
||||
# Define side_effect to capture context and return different responses
|
||||
def side_effect(task, context=None, tools=None):
|
||||
def side_effect(task, context=None, tools=None, stream=False, stream_callback=None):
|
||||
execution_contexts.append(context if context else "")
|
||||
if len(execution_contexts) == 1:
|
||||
return "This is a test response"
|
||||
|
||||
@@ -89,7 +89,7 @@ def test_task_prompt_includes_expected_output():
|
||||
with patch.object(Agent, "execute_task") as execute:
|
||||
execute.return_value = "ok"
|
||||
task.execute_sync(agent=researcher)
|
||||
execute.assert_called_once_with(task=task, context=None, tools=[])
|
||||
execute.assert_called_once_with(task=task, context=None, tools=[], stream=False, stream_callback=None)
|
||||
|
||||
|
||||
def test_task_callback():
|
||||
@@ -181,7 +181,7 @@ def test_execute_with_agent():
|
||||
|
||||
with patch.object(Agent, "execute_task", return_value="ok") as execute:
|
||||
task.execute_sync(agent=researcher)
|
||||
execute.assert_called_once_with(task=task, context=None, tools=[])
|
||||
execute.assert_called_once_with(task=task, context=None, tools=[], stream=False, stream_callback=None)
|
||||
|
||||
|
||||
def test_async_execution():
|
||||
@@ -203,7 +203,7 @@ def test_async_execution():
|
||||
execution = task.execute_async(agent=researcher)
|
||||
result = execution.result()
|
||||
assert result.raw == "ok"
|
||||
execute.assert_called_once_with(task=task, context=None, tools=[])
|
||||
execute.assert_called_once_with(task=task, context=None, tools=[], stream=False, stream_callback=None)
|
||||
|
||||
|
||||
def test_multiple_output_type_error():
|
||||
|
||||
263
tests/test_streaming.py
Normal file
263
tests/test_streaming.py
Normal file
@@ -0,0 +1,263 @@
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from crewai import Agent, Task, Crew
|
||||
from crewai.utilities.events.crew_events import CrewStreamChunkEvent, TaskStreamChunkEvent, AgentStreamChunkEvent
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_llm():
|
||||
return Mock()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def agent(mock_llm):
|
||||
return Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def task(agent):
|
||||
return Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def crew(agent, task):
|
||||
return Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
|
||||
def test_crew_streaming_enabled():
|
||||
"""Test that crew streaming can be enabled."""
|
||||
received_chunks = []
|
||||
|
||||
def stream_callback(chunk, agent_role, task_desc, step_type):
|
||||
received_chunks.append({
|
||||
'chunk': chunk,
|
||||
'agent_role': agent_role,
|
||||
'task_desc': task_desc,
|
||||
'step_type': step_type
|
||||
})
|
||||
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Test response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
with patch.object(crew, '_execute_tasks') as mock_execute:
|
||||
mock_execute.return_value = Mock()
|
||||
|
||||
crew.kickoff(stream=True, stream_callback=stream_callback)
|
||||
|
||||
assert hasattr(crew, '_stream_enabled')
|
||||
assert crew._stream_enabled is True
|
||||
assert hasattr(crew, '_stream_callback')
|
||||
assert crew._stream_callback == stream_callback
|
||||
|
||||
|
||||
def test_crew_streaming_disabled_by_default():
|
||||
"""Test that crew streaming is disabled by default."""
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Test response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
with patch.object(crew, '_execute_tasks') as mock_execute:
|
||||
mock_execute.return_value = Mock()
|
||||
|
||||
crew.kickoff()
|
||||
|
||||
assert getattr(crew, '_stream_enabled', False) is False
|
||||
assert getattr(crew, '_stream_callback', None) is None
|
||||
|
||||
|
||||
def test_crew_stream_chunk_event():
|
||||
"""Test CrewStreamChunkEvent creation and properties."""
|
||||
event = CrewStreamChunkEvent(
|
||||
chunk="test chunk",
|
||||
agent_role="Test Agent",
|
||||
task_description="Test task",
|
||||
step_type="agent_thinking",
|
||||
crew=None,
|
||||
crew_name="TestCrew"
|
||||
)
|
||||
|
||||
assert event.type == "crew_stream_chunk"
|
||||
assert event.chunk == "test chunk"
|
||||
assert event.agent_role == "Test Agent"
|
||||
assert event.task_description == "Test task"
|
||||
assert event.step_type == "agent_thinking"
|
||||
|
||||
|
||||
def test_task_stream_chunk_event():
|
||||
"""Test TaskStreamChunkEvent creation and properties."""
|
||||
event = TaskStreamChunkEvent(
|
||||
chunk="test chunk",
|
||||
task_description="Test task",
|
||||
agent_role="Test Agent",
|
||||
step_type="task_execution"
|
||||
)
|
||||
|
||||
assert event.type == "task_stream_chunk"
|
||||
assert event.chunk == "test chunk"
|
||||
assert event.task_description == "Test task"
|
||||
assert event.agent_role == "Test Agent"
|
||||
assert event.step_type == "task_execution"
|
||||
|
||||
|
||||
def test_agent_stream_chunk_event():
|
||||
"""Test AgentStreamChunkEvent creation and properties."""
|
||||
event = AgentStreamChunkEvent(
|
||||
chunk="test chunk",
|
||||
agent_role="Test Agent",
|
||||
step_type="agent_thinking"
|
||||
)
|
||||
|
||||
assert event.type == "agent_stream_chunk"
|
||||
assert event.chunk == "test chunk"
|
||||
assert event.agent_role == "Test Agent"
|
||||
assert event.step_type == "agent_thinking"
|
||||
|
||||
|
||||
def test_streaming_integration_with_llm():
|
||||
"""Test that streaming integrates with existing LLM streaming."""
|
||||
received_callback_chunks = []
|
||||
|
||||
def stream_callback(chunk, agent_role, task_desc, step_type):
|
||||
received_callback_chunks.append({
|
||||
'chunk': chunk,
|
||||
'agent_role': agent_role,
|
||||
'task_desc': task_desc,
|
||||
'step_type': step_type
|
||||
})
|
||||
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Here's a joke: Why did the robot cross the road? To get to the other side!"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Tell me a short joke",
|
||||
expected_output="A short joke",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
with patch.object(agent, 'agent_executor') as mock_executor:
|
||||
mock_executor._stream_callback = None
|
||||
mock_executor._task_description = None
|
||||
|
||||
crew.kickoff(stream=True, stream_callback=stream_callback)
|
||||
|
||||
assert hasattr(agent.agent_executor, '_stream_callback')
|
||||
assert hasattr(agent.agent_executor, '_task_description')
|
||||
|
||||
|
||||
def test_streaming_parameters_propagation():
|
||||
"""Test that streaming parameters are properly propagated through the execution chain."""
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Test response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
stream_callback = Mock()
|
||||
|
||||
with patch.object(task, 'execute_sync') as mock_execute_sync:
|
||||
mock_execute_sync.return_value = Mock()
|
||||
|
||||
crew.kickoff(stream=True, stream_callback=stream_callback)
|
||||
|
||||
mock_execute_sync.assert_called_once()
|
||||
call_args = mock_execute_sync.call_args
|
||||
assert 'stream' in call_args.kwargs
|
||||
assert call_args.kwargs['stream'] is True
|
||||
assert 'stream_callback' in call_args.kwargs
|
||||
assert call_args.kwargs['stream_callback'] == stream_callback
|
||||
274
tests/test_streaming_comprehensive.py
Normal file
274
tests/test_streaming_comprehensive.py
Normal file
@@ -0,0 +1,274 @@
|
||||
from unittest.mock import Mock, patch
|
||||
from crewai import Agent, Task, Crew
|
||||
from crewai.utilities.events.crew_events import CrewStreamChunkEvent
|
||||
from crewai.utilities.events.llm_events import LLMStreamChunkEvent
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
|
||||
|
||||
def test_streaming_callback_called():
|
||||
"""Test that streaming callback is called during execution."""
|
||||
callback_calls = []
|
||||
|
||||
def stream_callback(chunk, agent_role, task_desc, step_type):
|
||||
callback_calls.append({
|
||||
'chunk': chunk,
|
||||
'agent_role': agent_role,
|
||||
'task_desc': task_desc,
|
||||
'step_type': step_type
|
||||
})
|
||||
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Test response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
with patch.object(agent, 'agent_executor') as mock_executor:
|
||||
mock_executor._stream_callback = None
|
||||
mock_executor._task_description = None
|
||||
|
||||
crew.kickoff(stream=True, stream_callback=stream_callback)
|
||||
|
||||
assert hasattr(agent.agent_executor, '_stream_callback')
|
||||
assert agent.agent_executor._stream_callback == stream_callback
|
||||
assert hasattr(agent.agent_executor, '_task_description')
|
||||
assert agent.agent_executor._task_description == "Test task"
|
||||
|
||||
|
||||
def test_crew_stream_chunk_event_creation():
|
||||
"""Test CrewStreamChunkEvent can be created with all required fields."""
|
||||
event = CrewStreamChunkEvent(
|
||||
chunk="test chunk",
|
||||
agent_role="Test Agent",
|
||||
task_description="Test task",
|
||||
step_type="agent_thinking",
|
||||
crew=None,
|
||||
crew_name="TestCrew"
|
||||
)
|
||||
|
||||
assert event.type == "crew_stream_chunk"
|
||||
assert event.chunk == "test chunk"
|
||||
assert event.agent_role == "Test Agent"
|
||||
assert event.task_description == "Test task"
|
||||
assert event.step_type == "agent_thinking"
|
||||
|
||||
|
||||
def test_streaming_disabled_by_default():
|
||||
"""Test that streaming is disabled by default."""
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Test response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
crew.kickoff()
|
||||
|
||||
assert getattr(crew, '_stream_enabled', False) is False
|
||||
assert getattr(crew, '_stream_callback', None) is None
|
||||
|
||||
|
||||
def test_streaming_parameters_propagation():
|
||||
"""Test that streaming parameters are propagated through execution chain."""
|
||||
stream_callback = Mock()
|
||||
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Test response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
with patch.object(task, 'execute_sync') as mock_execute_sync:
|
||||
mock_execute_sync.return_value = Mock()
|
||||
|
||||
crew.kickoff(stream=True, stream_callback=stream_callback)
|
||||
|
||||
mock_execute_sync.assert_called_once()
|
||||
call_args = mock_execute_sync.call_args
|
||||
assert 'stream' in call_args.kwargs
|
||||
assert call_args.kwargs['stream'] is True
|
||||
assert 'stream_callback' in call_args.kwargs
|
||||
assert call_args.kwargs['stream_callback'] == stream_callback
|
||||
|
||||
|
||||
def test_async_task_streaming():
|
||||
"""Test that streaming works with async tasks."""
|
||||
stream_callback = Mock()
|
||||
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Test response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent,
|
||||
async_execution=True
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
with patch.object(task, 'execute_async') as mock_execute_async:
|
||||
mock_future = Mock()
|
||||
mock_execute_async.return_value = mock_future
|
||||
|
||||
crew.kickoff(stream=True, stream_callback=stream_callback)
|
||||
|
||||
mock_execute_async.assert_called_once()
|
||||
call_args = mock_execute_async.call_args
|
||||
assert 'stream' in call_args.kwargs
|
||||
assert call_args.kwargs['stream'] is True
|
||||
assert 'stream_callback' in call_args.kwargs
|
||||
assert call_args.kwargs['stream_callback'] == stream_callback
|
||||
|
||||
|
||||
def test_llm_stream_chunk_to_crew_stream_chunk():
|
||||
"""Test that LLMStreamChunkEvent triggers CrewStreamChunkEvent."""
|
||||
received_crew_chunks = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(CrewStreamChunkEvent)
|
||||
def handle_crew_stream_chunk(source, event):
|
||||
received_crew_chunks.append(event)
|
||||
|
||||
mock_source = Mock()
|
||||
mock_source.agent = Mock()
|
||||
mock_source.agent.role = "Test Agent"
|
||||
mock_source._task_description = "Test task"
|
||||
|
||||
llm_event = LLMStreamChunkEvent(chunk="test chunk")
|
||||
|
||||
crewai_event_bus.emit(mock_source, llm_event)
|
||||
|
||||
assert len(received_crew_chunks) == 1
|
||||
crew_event = received_crew_chunks[0]
|
||||
assert crew_event.type == "crew_stream_chunk"
|
||||
assert crew_event.chunk == "test chunk"
|
||||
assert crew_event.agent_role == "Test Agent"
|
||||
assert crew_event.task_description == "Test task"
|
||||
assert crew_event.step_type == "llm_response"
|
||||
|
||||
|
||||
def test_multiple_agents_streaming():
|
||||
"""Test streaming with multiple agents."""
|
||||
stream_callback = Mock()
|
||||
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Agent response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent1 = Agent(
|
||||
role="Agent 1",
|
||||
goal="Goal 1",
|
||||
backstory="Backstory 1",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
agent2 = Agent(
|
||||
role="Agent 2",
|
||||
goal="Goal 2",
|
||||
backstory="Backstory 2",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task1 = Task(
|
||||
description="Task 1",
|
||||
expected_output="Output 1",
|
||||
agent=agent1
|
||||
)
|
||||
|
||||
task2 = Task(
|
||||
description="Task 2",
|
||||
expected_output="Output 2",
|
||||
agent=agent2
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent1, agent2],
|
||||
tasks=[task1, task2],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
crew.kickoff(stream=True, stream_callback=stream_callback)
|
||||
|
||||
assert hasattr(crew, '_stream_enabled')
|
||||
assert crew._stream_enabled is True
|
||||
assert hasattr(crew, '_stream_callback')
|
||||
assert crew._stream_callback == stream_callback
|
||||
218
tests/test_streaming_integration.py
Normal file
218
tests/test_streaming_integration.py
Normal file
@@ -0,0 +1,218 @@
|
||||
from unittest.mock import Mock, patch
|
||||
from crewai import Agent, Task, Crew
|
||||
from crewai.utilities.events.crew_events import CrewStreamChunkEvent
|
||||
from crewai.utilities.events.llm_events import LLMStreamChunkEvent
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
|
||||
|
||||
def test_streaming_callback_integration():
|
||||
"""Test that streaming callback is properly integrated through the execution chain."""
|
||||
received_chunks = []
|
||||
|
||||
def stream_callback(chunk, agent_role, task_desc, step_type):
|
||||
received_chunks.append({
|
||||
'chunk': chunk,
|
||||
'agent_role': agent_role,
|
||||
'task_desc': task_desc,
|
||||
'step_type': step_type
|
||||
})
|
||||
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Test response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
with patch.object(agent, 'agent_executor') as mock_executor:
|
||||
mock_executor._stream_callback = None
|
||||
mock_executor._task_description = None
|
||||
|
||||
crew.kickoff(stream=True, stream_callback=stream_callback)
|
||||
|
||||
assert hasattr(agent.agent_executor, '_stream_callback')
|
||||
assert hasattr(agent.agent_executor, '_task_description')
|
||||
|
||||
|
||||
def test_crew_stream_chunk_event_emission():
|
||||
"""Test that CrewStreamChunkEvent is emitted when LLMStreamChunkEvent occurs."""
|
||||
received_crew_chunks = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(CrewStreamChunkEvent)
|
||||
def handle_crew_stream_chunk(source, event):
|
||||
received_crew_chunks.append(event)
|
||||
|
||||
mock_source = Mock()
|
||||
mock_source.agent = Mock()
|
||||
mock_source.agent.role = "Test Agent"
|
||||
mock_source._task_description = "Test task"
|
||||
|
||||
llm_event = LLMStreamChunkEvent(chunk="test chunk")
|
||||
|
||||
from crewai.utilities.events.event_listener import event_listener
|
||||
event_listener.on_llm_stream_chunk(mock_source, llm_event)
|
||||
|
||||
assert len(received_crew_chunks) == 1
|
||||
crew_event = received_crew_chunks[0]
|
||||
assert crew_event.type == "crew_stream_chunk"
|
||||
assert crew_event.chunk == "test chunk"
|
||||
assert crew_event.agent_role == "Test Agent"
|
||||
assert crew_event.task_description == "Test task"
|
||||
assert crew_event.step_type == "llm_response"
|
||||
|
||||
|
||||
def test_streaming_with_multiple_agents():
|
||||
"""Test streaming works correctly with multiple agents."""
|
||||
received_chunks = []
|
||||
|
||||
def stream_callback(chunk, agent_role, task_desc, step_type):
|
||||
received_chunks.append({
|
||||
'chunk': chunk,
|
||||
'agent_role': agent_role,
|
||||
'task_desc': task_desc,
|
||||
'step_type': step_type
|
||||
})
|
||||
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Agent response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent1 = Agent(
|
||||
role="Agent 1",
|
||||
goal="Goal 1",
|
||||
backstory="Backstory 1",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
agent2 = Agent(
|
||||
role="Agent 2",
|
||||
goal="Goal 2",
|
||||
backstory="Backstory 2",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task1 = Task(
|
||||
description="Task 1",
|
||||
expected_output="Output 1",
|
||||
agent=agent1
|
||||
)
|
||||
|
||||
task2 = Task(
|
||||
description="Task 2",
|
||||
expected_output="Output 2",
|
||||
agent=agent2
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent1, agent2],
|
||||
tasks=[task1, task2],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
crew.kickoff(stream=True, stream_callback=stream_callback)
|
||||
|
||||
assert hasattr(crew, '_stream_enabled')
|
||||
assert crew._stream_enabled is True
|
||||
assert hasattr(crew, '_stream_callback')
|
||||
assert crew._stream_callback == stream_callback
|
||||
|
||||
|
||||
def test_streaming_disabled_by_default():
|
||||
"""Test that streaming is disabled by default."""
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Test response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
crew.kickoff()
|
||||
|
||||
assert getattr(crew, '_stream_enabled', False) is False
|
||||
assert getattr(crew, '_stream_callback', None) is None
|
||||
|
||||
|
||||
def test_streaming_parameters_propagation():
|
||||
"""Test that streaming parameters are properly propagated through execution chain."""
|
||||
stream_callback = Mock()
|
||||
|
||||
with patch('crewai.llm.LLM') as mock_llm_class:
|
||||
mock_llm = Mock()
|
||||
mock_llm.call.return_value = "Test response"
|
||||
mock_llm.supports_stop_words = True
|
||||
mock_llm_class.return_value = mock_llm
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=mock_llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
with patch.object(task, 'execute_sync') as mock_execute_sync:
|
||||
mock_execute_sync.return_value = Mock()
|
||||
|
||||
crew.kickoff(stream=True, stream_callback=stream_callback)
|
||||
|
||||
mock_execute_sync.assert_called_once()
|
||||
call_args = mock_execute_sync.call_args
|
||||
assert 'stream' in call_args.kwargs
|
||||
assert call_args.kwargs['stream'] is True
|
||||
assert 'stream_callback' in call_args.kwargs
|
||||
assert call_args.kwargs['stream_callback'] == stream_callback
|
||||
@@ -119,7 +119,7 @@ def test_guardrail_error_in_context():
|
||||
# Mock execute_task to succeed on second attempt
|
||||
first_call = True
|
||||
|
||||
def execute_task(task, context, tools):
|
||||
def execute_task(task, context, tools, stream=False, stream_callback=None):
|
||||
nonlocal first_call
|
||||
if first_call:
|
||||
first_call = False
|
||||
|
||||
@@ -779,3 +779,54 @@ def test_streaming_empty_response_handling():
|
||||
finally:
|
||||
# Restore the original method
|
||||
llm.call = original_call
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_crew_streaming_events():
|
||||
"""Test that crew streaming events are emitted correctly."""
|
||||
from crewai.utilities.events.crew_events import CrewStreamChunkEvent
|
||||
received_crew_chunks = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(CrewStreamChunkEvent)
|
||||
def handle_crew_stream_chunk(source, event):
|
||||
received_crew_chunks.append(event)
|
||||
|
||||
# Create an LLM with streaming enabled
|
||||
llm = LLM(model="gpt-4o-mini", stream=True)
|
||||
|
||||
# Create agent and task
|
||||
from crewai import Agent, Task, Crew
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
llm=llm,
|
||||
verbose=False
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Tell me a short joke",
|
||||
expected_output="A short joke",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
verbose=False
|
||||
)
|
||||
|
||||
# Execute with streaming enabled
|
||||
crew.kickoff(stream=True)
|
||||
|
||||
# Verify that we received crew stream chunks
|
||||
assert len(received_crew_chunks) > 0
|
||||
|
||||
for chunk_event in received_crew_chunks:
|
||||
assert chunk_event.type == "crew_stream_chunk"
|
||||
assert chunk_event.agent_role == "Test Agent"
|
||||
assert chunk_event.step_type == "llm_response"
|
||||
assert isinstance(chunk_event.chunk, str)
|
||||
|
||||
Reference in New Issue
Block a user