Compare commits

...

3 Commits

Author SHA1 Message Date
Devin AI
2ff47f4bd7 fix: remove trailing whitespace from docstring
Co-Authored-By: João <joao@crewai.com>
2025-10-20 12:26:37 +00:00
Devin AI
0af7e04cde fix: address lint issues in streaming implementation
- Remove whitespace from blank lines
- Refactor try-except out of loop for better performance
- Use list() instead of append in loop for better performance

Co-Authored-By: João <joao@crewai.com>
2025-10-20 12:24:36 +00:00
Devin AI
b664637afa feat: add kickoff_stream method for FastAPI streaming integration
- Add kickoff_stream() method to Crew class that yields events in real-time
- Enables easy integration with FastAPI StreamingResponse
- Add comprehensive tests for streaming functionality
- Include FastAPI example demonstrating Server-Sent Events (SSE)

Resolves #3739

Co-Authored-By: João <joao@crewai.com>
2025-10-20 12:21:07 +00:00
3 changed files with 367 additions and 0 deletions

View File

@@ -0,0 +1,177 @@
"""
FastAPI Streaming Integration Example for CrewAI
This example demonstrates how to integrate CrewAI with FastAPI to stream
crew execution events in real-time using Server-Sent Events (SSE).
Installation:
pip install crewai fastapi uvicorn
Usage:
python fastapi_streaming_example.py
Then visit:
http://localhost:8000/docs for the API documentation
http://localhost:8000/stream?topic=AI to see streaming in action
"""
import json
from typing import AsyncGenerator
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from crewai import Agent, Crew, Task
app = FastAPI(title="CrewAI Streaming API")
class ResearchRequest(BaseModel):
topic: str
def create_research_crew(topic: str) -> Crew:
"""Create a research crew for the given topic."""
researcher = Agent(
role="Researcher",
goal=f"Research and analyze information about {topic}",
backstory="You're an expert researcher with deep knowledge in various fields.",
verbose=True,
)
task = Task(
description=f"Research and provide a comprehensive summary about {topic}",
expected_output="A detailed summary with key insights",
agent=researcher,
)
return Crew(agents=[researcher], tasks=[task], verbose=True)
@app.get("/")
async def root():
"""Root endpoint with API information."""
return {
"message": "CrewAI Streaming API",
"endpoints": {
"/stream": "GET - Stream crew execution events (query param: topic)",
"/research": "POST - Execute crew and return final result",
},
}
@app.get("/stream")
async def stream_crew_execution(topic: str = "artificial intelligence"):
"""
Stream crew execution events in real-time using Server-Sent Events.
Args:
topic: The research topic (default: "artificial intelligence")
Returns:
StreamingResponse with text/event-stream content type
"""
async def event_generator() -> AsyncGenerator[str, None]:
"""Generate Server-Sent Events from crew execution."""
crew = create_research_crew(topic)
try:
for event in crew.kickoff_stream(inputs={"topic": topic}):
event_data = json.dumps(event)
yield f"data: {event_data}\n\n"
yield "data: {\"type\": \"done\"}\n\n"
except Exception as e:
error_event = {"type": "error", "data": {"message": str(e)}}
yield f"data: {json.dumps(error_event)}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.post("/research")
async def research_topic(request: ResearchRequest):
"""
Execute crew research and return the final result.
Args:
request: ResearchRequest with topic field
Returns:
JSON response with the research result
"""
crew = create_research_crew(request.topic)
try:
result = crew.kickoff(inputs={"topic": request.topic})
return {
"success": True,
"topic": request.topic,
"result": result.raw,
"usage_metrics": (
result.token_usage.model_dump() if result.token_usage else None
),
}
except Exception as e:
return {"success": False, "error": str(e)}
@app.get("/stream-filtered")
async def stream_filtered_events(
topic: str = "artificial intelligence", event_types: str = "llm_stream_chunk"
):
"""
Stream only specific event types.
Args:
topic: The research topic
event_types: Comma-separated list of event types to include
Returns:
StreamingResponse with filtered events
"""
allowed_types = set(event_types.split(","))
async def event_generator() -> AsyncGenerator[str, None]:
crew = create_research_crew(topic)
try:
for event in crew.kickoff_stream(inputs={"topic": topic}):
if event["type"] in allowed_types:
event_data = json.dumps(event)
yield f"data: {event_data}\n\n"
yield "data: {\"type\": \"done\"}\n\n"
except Exception as e:
error_event = {"type": "error", "data": {"message": str(e)}}
yield f"data: {json.dumps(error_event)}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
if __name__ == "__main__":
import uvicorn
print("Starting CrewAI Streaming API...")
print("Visit http://localhost:8000/docs for API documentation")
print("Try: http://localhost:8000/stream?topic=quantum%20computing")
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -766,6 +766,118 @@ class Crew(FlowTrackable, BaseModel):
self._task_output_handler.reset()
return results
def kickoff_stream(self, inputs: dict[str, Any] | None = None):
"""
Stream crew execution events in real-time.
This method yields events as they occur during crew execution, making it
easy to integrate with streaming frameworks like FastAPI's StreamingResponse.
Args:
inputs: Optional dictionary of inputs for the crew execution
Yields:
dict: Event dictionaries containing event type and data
Example:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
@app.get("/stream")
async def stream_crew():
def event_generator():
for event in crew.kickoff_stream(inputs={"topic": "AI"}):
yield f"data: {json.dumps(event)}\\n\\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
```
"""
import queue
import threading
from crewai.events.base_events import BaseEvent
event_queue: queue.Queue = queue.Queue()
completion_event = threading.Event()
exception_holder = {"exception": None}
def event_handler(source: Any, event: BaseEvent):
event_dict = {
"type": event.type,
"data": event.model_dump(exclude={"from_task", "from_agent"}),
}
event_queue.put(event_dict)
from crewai.events.types.crew_events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
)
from crewai.events.types.task_events import (
TaskStartedEvent,
TaskCompletedEvent,
TaskFailedEvent,
)
from crewai.events.types.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
)
from crewai.events.types.llm_events import (
LLMStreamChunkEvent,
LLMCallStartedEvent,
LLMCallCompletedEvent,
)
from crewai.events.types.tool_usage_events import (
ToolUsageStartedEvent,
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
)
crewai_event_bus.register_handler(CrewKickoffStartedEvent, event_handler)
crewai_event_bus.register_handler(CrewKickoffCompletedEvent, event_handler)
crewai_event_bus.register_handler(CrewKickoffFailedEvent, event_handler)
crewai_event_bus.register_handler(TaskStartedEvent, event_handler)
crewai_event_bus.register_handler(TaskCompletedEvent, event_handler)
crewai_event_bus.register_handler(TaskFailedEvent, event_handler)
crewai_event_bus.register_handler(AgentExecutionStartedEvent, event_handler)
crewai_event_bus.register_handler(AgentExecutionCompletedEvent, event_handler)
crewai_event_bus.register_handler(LLMStreamChunkEvent, event_handler)
crewai_event_bus.register_handler(LLMCallStartedEvent, event_handler)
crewai_event_bus.register_handler(LLMCallCompletedEvent, event_handler)
crewai_event_bus.register_handler(ToolUsageStartedEvent, event_handler)
crewai_event_bus.register_handler(ToolUsageFinishedEvent, event_handler)
crewai_event_bus.register_handler(ToolUsageErrorEvent, event_handler)
def run_kickoff():
try:
result = self.kickoff(inputs=inputs)
event_queue.put({"type": "final_output", "data": {"output": result.raw}})
except Exception as e:
exception_holder["exception"] = e
finally:
completion_event.set()
thread = threading.Thread(target=run_kickoff, daemon=True)
thread.start()
try:
while not completion_event.is_set() or not event_queue.empty():
event = event_queue.get(timeout=0.1) if not event_queue.empty() else None
if event is not None:
yield event
if exception_holder["exception"]:
raise exception_holder["exception"]
finally:
thread.join(timeout=1)
def _handle_crew_planning(self):
"""Handles the Crew planning."""
self._logger.log("info", "Planning the crew execution")

View File

@@ -4744,3 +4744,81 @@ def test_ensure_exchanged_messages_are_propagated_to_external_memory():
assert "Researcher" in messages[0]["content"]
assert messages[1]["role"] == "user"
assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"]
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_kickoff_stream(researcher):
"""Test that crew.kickoff_stream() yields events during execution."""
task = Task(
description="Research a topic about AI",
expected_output="A brief summary about AI",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
events = list(crew.kickoff_stream())
assert len(events) > 0
event_types = [event["type"] for event in events]
assert "crew_kickoff_started" in event_types
assert "final_output" in event_types
final_output_event = next(e for e in events if e["type"] == "final_output")
assert "output" in final_output_event["data"]
assert isinstance(final_output_event["data"]["output"], str)
assert len(final_output_event["data"]["output"]) > 0
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_kickoff_stream_with_inputs(researcher):
"""Test that crew.kickoff_stream() works with inputs."""
task = Task(
description="Research about {topic}",
expected_output="A brief summary about {topic}",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
events = list(crew.kickoff_stream(inputs={"topic": "machine learning"}))
assert len(events) > 0
event_types = [event["type"] for event in events]
assert "crew_kickoff_started" in event_types
assert "final_output" in event_types
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_kickoff_stream_includes_llm_chunks(researcher):
"""Test that crew.kickoff_stream() includes LLM stream chunks."""
task = Task(
description="Write a short poem about AI",
expected_output="A 2-line poem",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
events = list(crew.kickoff_stream())
event_types = [event["type"] for event in events]
assert "task_started" in event_types or "agent_execution_started" in event_types
def test_crew_kickoff_stream_handles_errors(researcher):
"""Test that crew.kickoff_stream() properly handles errors."""
task = Task(
description="This task will fail",
expected_output="Should not complete",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[task])
with patch("crewai.crew.Crew.kickoff", side_effect=Exception("Test error")):
with pytest.raises(Exception, match="Test error"):
list(crew.kickoff_stream())