mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-03 08:12:39 +00:00
feat: add kickoff_stream method for FastAPI streaming integration
- Add kickoff_stream() method to Crew class that yields events in real-time - Enables easy integration with FastAPI StreamingResponse - Add comprehensive tests for streaming functionality - Include FastAPI example demonstrating Server-Sent Events (SSE) Resolves #3739 Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -766,6 +766,120 @@ class Crew(FlowTrackable, BaseModel):
|
||||
self._task_output_handler.reset()
|
||||
return results
|
||||
|
||||
def kickoff_stream(self, inputs: dict[str, Any] | None = None):
|
||||
"""
|
||||
Stream crew execution events in real-time.
|
||||
|
||||
This method yields events as they occur during crew execution, making it
|
||||
easy to integrate with streaming frameworks like FastAPI's StreamingResponse.
|
||||
|
||||
Args:
|
||||
inputs: Optional dictionary of inputs for the crew execution
|
||||
|
||||
Yields:
|
||||
dict: Event dictionaries containing event type and data
|
||||
|
||||
Example:
|
||||
```python
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import StreamingResponse
|
||||
import json
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
@app.get("/stream")
|
||||
async def stream_crew():
|
||||
def event_generator():
|
||||
for event in crew.kickoff_stream(inputs={"topic": "AI"}):
|
||||
yield f"data: {json.dumps(event)}\\n\\n"
|
||||
|
||||
return StreamingResponse(
|
||||
event_generator(),
|
||||
media_type="text/event-stream"
|
||||
)
|
||||
```
|
||||
"""
|
||||
import queue
|
||||
import threading
|
||||
from crewai.events.base_events import BaseEvent
|
||||
|
||||
event_queue: queue.Queue = queue.Queue()
|
||||
completion_event = threading.Event()
|
||||
exception_holder = {"exception": None}
|
||||
|
||||
def event_handler(source: Any, event: BaseEvent):
|
||||
event_dict = {
|
||||
"type": event.type,
|
||||
"data": event.model_dump(exclude={"from_task", "from_agent"}),
|
||||
}
|
||||
event_queue.put(event_dict)
|
||||
|
||||
from crewai.events.types.crew_events import (
|
||||
CrewKickoffStartedEvent,
|
||||
CrewKickoffCompletedEvent,
|
||||
CrewKickoffFailedEvent,
|
||||
)
|
||||
from crewai.events.types.task_events import (
|
||||
TaskStartedEvent,
|
||||
TaskCompletedEvent,
|
||||
TaskFailedEvent,
|
||||
)
|
||||
from crewai.events.types.agent_events import (
|
||||
AgentExecutionStartedEvent,
|
||||
AgentExecutionCompletedEvent,
|
||||
)
|
||||
from crewai.events.types.llm_events import (
|
||||
LLMStreamChunkEvent,
|
||||
LLMCallStartedEvent,
|
||||
LLMCallCompletedEvent,
|
||||
)
|
||||
from crewai.events.types.tool_usage_events import (
|
||||
ToolUsageStartedEvent,
|
||||
ToolUsageFinishedEvent,
|
||||
ToolUsageErrorEvent,
|
||||
)
|
||||
|
||||
crewai_event_bus.register_handler(CrewKickoffStartedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(CrewKickoffCompletedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(CrewKickoffFailedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(TaskStartedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(TaskCompletedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(TaskFailedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(AgentExecutionStartedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(AgentExecutionCompletedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(LLMStreamChunkEvent, event_handler)
|
||||
crewai_event_bus.register_handler(LLMCallStartedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(LLMCallCompletedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(ToolUsageStartedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(ToolUsageFinishedEvent, event_handler)
|
||||
crewai_event_bus.register_handler(ToolUsageErrorEvent, event_handler)
|
||||
|
||||
def run_kickoff():
|
||||
try:
|
||||
result = self.kickoff(inputs=inputs)
|
||||
event_queue.put({"type": "final_output", "data": {"output": result.raw}})
|
||||
except Exception as e:
|
||||
exception_holder["exception"] = e
|
||||
finally:
|
||||
completion_event.set()
|
||||
|
||||
thread = threading.Thread(target=run_kickoff, daemon=True)
|
||||
thread.start()
|
||||
|
||||
try:
|
||||
while not completion_event.is_set() or not event_queue.empty():
|
||||
try:
|
||||
event = event_queue.get(timeout=0.1)
|
||||
yield event
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
if exception_holder["exception"]:
|
||||
raise exception_holder["exception"]
|
||||
|
||||
finally:
|
||||
thread.join(timeout=1)
|
||||
|
||||
def _handle_crew_planning(self):
|
||||
"""Handles the Crew planning."""
|
||||
self._logger.log("info", "Planning the crew execution")
|
||||
|
||||
Reference in New Issue
Block a user