mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-23 15:18:14 +00:00
Compare commits
3 Commits
gl/feat/na
...
devin/1760
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ff47f4bd7 | ||
|
|
0af7e04cde | ||
|
|
b664637afa |
177
fastapi_streaming_example.py
Normal file
177
fastapi_streaming_example.py
Normal file
@@ -0,0 +1,177 @@
|
|||||||
|
"""
|
||||||
|
FastAPI Streaming Integration Example for CrewAI
|
||||||
|
|
||||||
|
This example demonstrates how to integrate CrewAI with FastAPI to stream
|
||||||
|
crew execution events in real-time using Server-Sent Events (SSE).
|
||||||
|
|
||||||
|
Installation:
|
||||||
|
pip install crewai fastapi uvicorn
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python fastapi_streaming_example.py
|
||||||
|
|
||||||
|
Then visit:
|
||||||
|
http://localhost:8000/docs for the API documentation
|
||||||
|
http://localhost:8000/stream?topic=AI to see streaming in action
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
from typing import AsyncGenerator
|
||||||
|
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from crewai import Agent, Crew, Task
|
||||||
|
|
||||||
|
app = FastAPI(title="CrewAI Streaming API")
|
||||||
|
|
||||||
|
|
||||||
|
class ResearchRequest(BaseModel):
|
||||||
|
topic: str
|
||||||
|
|
||||||
|
|
||||||
|
def create_research_crew(topic: str) -> Crew:
|
||||||
|
"""Create a research crew for the given topic."""
|
||||||
|
researcher = Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal=f"Research and analyze information about {topic}",
|
||||||
|
backstory="You're an expert researcher with deep knowledge in various fields.",
|
||||||
|
verbose=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
task = Task(
|
||||||
|
description=f"Research and provide a comprehensive summary about {topic}",
|
||||||
|
expected_output="A detailed summary with key insights",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
return Crew(agents=[researcher], tasks=[task], verbose=True)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/")
|
||||||
|
async def root():
|
||||||
|
"""Root endpoint with API information."""
|
||||||
|
return {
|
||||||
|
"message": "CrewAI Streaming API",
|
||||||
|
"endpoints": {
|
||||||
|
"/stream": "GET - Stream crew execution events (query param: topic)",
|
||||||
|
"/research": "POST - Execute crew and return final result",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/stream")
|
||||||
|
async def stream_crew_execution(topic: str = "artificial intelligence"):
|
||||||
|
"""
|
||||||
|
Stream crew execution events in real-time using Server-Sent Events.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
topic: The research topic (default: "artificial intelligence")
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
StreamingResponse with text/event-stream content type
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def event_generator() -> AsyncGenerator[str, None]:
|
||||||
|
"""Generate Server-Sent Events from crew execution."""
|
||||||
|
crew = create_research_crew(topic)
|
||||||
|
|
||||||
|
try:
|
||||||
|
for event in crew.kickoff_stream(inputs={"topic": topic}):
|
||||||
|
event_data = json.dumps(event)
|
||||||
|
yield f"data: {event_data}\n\n"
|
||||||
|
|
||||||
|
yield "data: {\"type\": \"done\"}\n\n"
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
error_event = {"type": "error", "data": {"message": str(e)}}
|
||||||
|
yield f"data: {json.dumps(error_event)}\n\n"
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
event_generator(),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
"X-Accel-Buffering": "no",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/research")
|
||||||
|
async def research_topic(request: ResearchRequest):
|
||||||
|
"""
|
||||||
|
Execute crew research and return the final result.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request: ResearchRequest with topic field
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
JSON response with the research result
|
||||||
|
"""
|
||||||
|
crew = create_research_crew(request.topic)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = crew.kickoff(inputs={"topic": request.topic})
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"topic": request.topic,
|
||||||
|
"result": result.raw,
|
||||||
|
"usage_metrics": (
|
||||||
|
result.token_usage.model_dump() if result.token_usage else None
|
||||||
|
),
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
return {"success": False, "error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/stream-filtered")
|
||||||
|
async def stream_filtered_events(
|
||||||
|
topic: str = "artificial intelligence", event_types: str = "llm_stream_chunk"
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Stream only specific event types.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
topic: The research topic
|
||||||
|
event_types: Comma-separated list of event types to include
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
StreamingResponse with filtered events
|
||||||
|
"""
|
||||||
|
allowed_types = set(event_types.split(","))
|
||||||
|
|
||||||
|
async def event_generator() -> AsyncGenerator[str, None]:
|
||||||
|
crew = create_research_crew(topic)
|
||||||
|
|
||||||
|
try:
|
||||||
|
for event in crew.kickoff_stream(inputs={"topic": topic}):
|
||||||
|
if event["type"] in allowed_types:
|
||||||
|
event_data = json.dumps(event)
|
||||||
|
yield f"data: {event_data}\n\n"
|
||||||
|
|
||||||
|
yield "data: {\"type\": \"done\"}\n\n"
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
error_event = {"type": "error", "data": {"message": str(e)}}
|
||||||
|
yield f"data: {json.dumps(error_event)}\n\n"
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
event_generator(),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import uvicorn
|
||||||
|
|
||||||
|
print("Starting CrewAI Streaming API...")
|
||||||
|
print("Visit http://localhost:8000/docs for API documentation")
|
||||||
|
print("Try: http://localhost:8000/stream?topic=quantum%20computing")
|
||||||
|
|
||||||
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||||
@@ -766,6 +766,118 @@ class Crew(FlowTrackable, BaseModel):
|
|||||||
self._task_output_handler.reset()
|
self._task_output_handler.reset()
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
def kickoff_stream(self, inputs: dict[str, Any] | None = None):
|
||||||
|
"""
|
||||||
|
Stream crew execution events in real-time.
|
||||||
|
|
||||||
|
This method yields events as they occur during crew execution, making it
|
||||||
|
easy to integrate with streaming frameworks like FastAPI's StreamingResponse.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
inputs: Optional dictionary of inputs for the crew execution
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
dict: Event dictionaries containing event type and data
|
||||||
|
|
||||||
|
Example:
|
||||||
|
```python
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
|
import json
|
||||||
|
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
@app.get("/stream")
|
||||||
|
async def stream_crew():
|
||||||
|
def event_generator():
|
||||||
|
for event in crew.kickoff_stream(inputs={"topic": "AI"}):
|
||||||
|
yield f"data: {json.dumps(event)}\\n\\n"
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
event_generator(),
|
||||||
|
media_type="text/event-stream"
|
||||||
|
)
|
||||||
|
```
|
||||||
|
"""
|
||||||
|
import queue
|
||||||
|
import threading
|
||||||
|
from crewai.events.base_events import BaseEvent
|
||||||
|
|
||||||
|
event_queue: queue.Queue = queue.Queue()
|
||||||
|
completion_event = threading.Event()
|
||||||
|
exception_holder = {"exception": None}
|
||||||
|
|
||||||
|
def event_handler(source: Any, event: BaseEvent):
|
||||||
|
event_dict = {
|
||||||
|
"type": event.type,
|
||||||
|
"data": event.model_dump(exclude={"from_task", "from_agent"}),
|
||||||
|
}
|
||||||
|
event_queue.put(event_dict)
|
||||||
|
|
||||||
|
from crewai.events.types.crew_events import (
|
||||||
|
CrewKickoffStartedEvent,
|
||||||
|
CrewKickoffCompletedEvent,
|
||||||
|
CrewKickoffFailedEvent,
|
||||||
|
)
|
||||||
|
from crewai.events.types.task_events import (
|
||||||
|
TaskStartedEvent,
|
||||||
|
TaskCompletedEvent,
|
||||||
|
TaskFailedEvent,
|
||||||
|
)
|
||||||
|
from crewai.events.types.agent_events import (
|
||||||
|
AgentExecutionStartedEvent,
|
||||||
|
AgentExecutionCompletedEvent,
|
||||||
|
)
|
||||||
|
from crewai.events.types.llm_events import (
|
||||||
|
LLMStreamChunkEvent,
|
||||||
|
LLMCallStartedEvent,
|
||||||
|
LLMCallCompletedEvent,
|
||||||
|
)
|
||||||
|
from crewai.events.types.tool_usage_events import (
|
||||||
|
ToolUsageStartedEvent,
|
||||||
|
ToolUsageFinishedEvent,
|
||||||
|
ToolUsageErrorEvent,
|
||||||
|
)
|
||||||
|
|
||||||
|
crewai_event_bus.register_handler(CrewKickoffStartedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(CrewKickoffCompletedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(CrewKickoffFailedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(TaskStartedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(TaskCompletedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(TaskFailedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(AgentExecutionStartedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(AgentExecutionCompletedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(LLMStreamChunkEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(LLMCallStartedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(LLMCallCompletedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(ToolUsageStartedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(ToolUsageFinishedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(ToolUsageErrorEvent, event_handler)
|
||||||
|
|
||||||
|
def run_kickoff():
|
||||||
|
try:
|
||||||
|
result = self.kickoff(inputs=inputs)
|
||||||
|
event_queue.put({"type": "final_output", "data": {"output": result.raw}})
|
||||||
|
except Exception as e:
|
||||||
|
exception_holder["exception"] = e
|
||||||
|
finally:
|
||||||
|
completion_event.set()
|
||||||
|
|
||||||
|
thread = threading.Thread(target=run_kickoff, daemon=True)
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
while not completion_event.is_set() or not event_queue.empty():
|
||||||
|
event = event_queue.get(timeout=0.1) if not event_queue.empty() else None
|
||||||
|
if event is not None:
|
||||||
|
yield event
|
||||||
|
|
||||||
|
if exception_holder["exception"]:
|
||||||
|
raise exception_holder["exception"]
|
||||||
|
|
||||||
|
finally:
|
||||||
|
thread.join(timeout=1)
|
||||||
|
|
||||||
def _handle_crew_planning(self):
|
def _handle_crew_planning(self):
|
||||||
"""Handles the Crew planning."""
|
"""Handles the Crew planning."""
|
||||||
self._logger.log("info", "Planning the crew execution")
|
self._logger.log("info", "Planning the crew execution")
|
||||||
|
|||||||
@@ -4744,3 +4744,81 @@ def test_ensure_exchanged_messages_are_propagated_to_external_memory():
|
|||||||
assert "Researcher" in messages[0]["content"]
|
assert "Researcher" in messages[0]["content"]
|
||||||
assert messages[1]["role"] == "user"
|
assert messages[1]["role"] == "user"
|
||||||
assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"]
|
assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||||
|
def test_crew_kickoff_stream(researcher):
|
||||||
|
"""Test that crew.kickoff_stream() yields events during execution."""
|
||||||
|
task = Task(
|
||||||
|
description="Research a topic about AI",
|
||||||
|
expected_output="A brief summary about AI",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(agents=[researcher], tasks=[task])
|
||||||
|
|
||||||
|
events = list(crew.kickoff_stream())
|
||||||
|
|
||||||
|
assert len(events) > 0
|
||||||
|
|
||||||
|
event_types = [event["type"] for event in events]
|
||||||
|
assert "crew_kickoff_started" in event_types
|
||||||
|
assert "final_output" in event_types
|
||||||
|
|
||||||
|
final_output_event = next(e for e in events if e["type"] == "final_output")
|
||||||
|
assert "output" in final_output_event["data"]
|
||||||
|
assert isinstance(final_output_event["data"]["output"], str)
|
||||||
|
assert len(final_output_event["data"]["output"]) > 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||||
|
def test_crew_kickoff_stream_with_inputs(researcher):
|
||||||
|
"""Test that crew.kickoff_stream() works with inputs."""
|
||||||
|
task = Task(
|
||||||
|
description="Research about {topic}",
|
||||||
|
expected_output="A brief summary about {topic}",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(agents=[researcher], tasks=[task])
|
||||||
|
|
||||||
|
events = list(crew.kickoff_stream(inputs={"topic": "machine learning"}))
|
||||||
|
|
||||||
|
assert len(events) > 0
|
||||||
|
|
||||||
|
event_types = [event["type"] for event in events]
|
||||||
|
assert "crew_kickoff_started" in event_types
|
||||||
|
assert "final_output" in event_types
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||||
|
def test_crew_kickoff_stream_includes_llm_chunks(researcher):
|
||||||
|
"""Test that crew.kickoff_stream() includes LLM stream chunks."""
|
||||||
|
task = Task(
|
||||||
|
description="Write a short poem about AI",
|
||||||
|
expected_output="A 2-line poem",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(agents=[researcher], tasks=[task])
|
||||||
|
|
||||||
|
events = list(crew.kickoff_stream())
|
||||||
|
|
||||||
|
event_types = [event["type"] for event in events]
|
||||||
|
|
||||||
|
assert "task_started" in event_types or "agent_execution_started" in event_types
|
||||||
|
|
||||||
|
|
||||||
|
def test_crew_kickoff_stream_handles_errors(researcher):
|
||||||
|
"""Test that crew.kickoff_stream() properly handles errors."""
|
||||||
|
task = Task(
|
||||||
|
description="This task will fail",
|
||||||
|
expected_output="Should not complete",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(agents=[researcher], tasks=[task])
|
||||||
|
|
||||||
|
with patch("crewai.crew.Crew.kickoff", side_effect=Exception("Test error")):
|
||||||
|
with pytest.raises(Exception, match="Test error"):
|
||||||
|
list(crew.kickoff_stream())
|
||||||
|
|||||||
Reference in New Issue
Block a user