mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-06 22:58:30 +00:00
Compare commits
9 Commits
fix/unsafe
...
devin/1760
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ff47f4bd7 | ||
|
|
0af7e04cde | ||
|
|
b664637afa | ||
|
|
42f2b4d551 | ||
|
|
0229390ad1 | ||
|
|
f0fb349ddf | ||
|
|
bf2e2a42da | ||
|
|
814c962196 | ||
|
|
2ebb2e845f |
177
fastapi_streaming_example.py
Normal file
177
fastapi_streaming_example.py
Normal file
@@ -0,0 +1,177 @@
|
|||||||
|
"""
|
||||||
|
FastAPI Streaming Integration Example for CrewAI
|
||||||
|
|
||||||
|
This example demonstrates how to integrate CrewAI with FastAPI to stream
|
||||||
|
crew execution events in real-time using Server-Sent Events (SSE).
|
||||||
|
|
||||||
|
Installation:
|
||||||
|
pip install crewai fastapi uvicorn
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python fastapi_streaming_example.py
|
||||||
|
|
||||||
|
Then visit:
|
||||||
|
http://localhost:8000/docs for the API documentation
|
||||||
|
http://localhost:8000/stream?topic=AI to see streaming in action
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from crewai import Agent, Crew, Task

# Single FastAPI application instance shared by every endpoint below.
app = FastAPI(title="CrewAI Streaming API")
||||||
|
class ResearchRequest(BaseModel):
    """Request body for the POST /research endpoint."""

    # The subject the research crew should investigate.
    topic: str
|
||||||
|
|
||||||
|
|
||||||
|
def create_research_crew(topic: str) -> Crew:
    """Build a single-agent, single-task crew that researches *topic*."""
    agent = Agent(
        role="Researcher",
        goal=f"Research and analyze information about {topic}",
        backstory="You're an expert researcher with deep knowledge in various fields.",
        verbose=True,
    )

    research_task = Task(
        description=f"Research and provide a comprehensive summary about {topic}",
        expected_output="A detailed summary with key insights",
        agent=agent,
    )

    return Crew(agents=[agent], tasks=[research_task], verbose=True)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/")
async def root():
    """Root endpoint with API information."""
    endpoints = {
        "/stream": "GET - Stream crew execution events (query param: topic)",
        "/research": "POST - Execute crew and return final result",
    }
    return {"message": "CrewAI Streaming API", "endpoints": endpoints}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/stream")
async def stream_crew_execution(topic: str = "artificial intelligence"):
    """
    Stream crew execution events in real-time using Server-Sent Events.

    Args:
        topic: The research topic (default: "artificial intelligence")

    Returns:
        StreamingResponse with text/event-stream content type
    """
    import asyncio

    async def event_generator() -> AsyncGenerator[str, None]:
        """Generate Server-Sent Events from crew execution."""
        crew = create_research_crew(topic)
        stream = crew.kickoff_stream(inputs={"topic": topic})
        sentinel = object()

        try:
            while True:
                # crew.kickoff_stream is a blocking (synchronous) generator.
                # Pulling each item on a worker thread keeps the event loop
                # free to serve other requests while we wait; iterating it
                # directly here would stall the whole server.
                event = await asyncio.to_thread(next, stream, sentinel)
                if event is sentinel:
                    break
                yield f"data: {json.dumps(event)}\n\n"

            # Signal normal completion to the client.
            yield 'data: {"type": "done"}\n\n'

        except Exception as e:
            # Surface failures as a final SSE frame instead of dropping the
            # connection with no explanation.
            error_event = {"type": "error", "data": {"message": str(e)}}
            yield f"data: {json.dumps(error_event)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Disable nginx proxy buffering so events flush immediately.
            "X-Accel-Buffering": "no",
        },
    )
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/research")
async def research_topic(request: ResearchRequest):
    """
    Execute crew research and return the final result.

    Args:
        request: ResearchRequest with topic field

    Returns:
        JSON response with the research result
    """
    crew = create_research_crew(request.topic)

    try:
        output = crew.kickoff(inputs={"topic": request.topic})
        usage = output.token_usage.model_dump() if output.token_usage else None
        return {
            "success": True,
            "topic": request.topic,
            "result": output.raw,
            "usage_metrics": usage,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/stream-filtered")
async def stream_filtered_events(
    topic: str = "artificial intelligence", event_types: str = "llm_stream_chunk"
):
    """
    Stream only specific event types.

    Args:
        topic: The research topic
        event_types: Comma-separated list of event types to include

    Returns:
        StreamingResponse with filtered events
    """
    import asyncio

    # Strip whitespace so both "a,b" and "a, b" select the same types.
    allowed_types = {t.strip() for t in event_types.split(",")}

    async def event_generator() -> AsyncGenerator[str, None]:
        """Yield SSE frames only for events whose type is allowed."""
        crew = create_research_crew(topic)
        stream = crew.kickoff_stream(inputs={"topic": topic})
        sentinel = object()

        try:
            while True:
                # kickoff_stream is a blocking sync generator; pull each item
                # on a worker thread so the event loop is not blocked while
                # waiting for the next event.
                event = await asyncio.to_thread(next, stream, sentinel)
                if event is sentinel:
                    break
                if event["type"] in allowed_types:
                    yield f"data: {json.dumps(event)}\n\n"

            # The terminal "done" frame is always sent, regardless of filter.
            yield 'data: {"type": "done"}\n\n'

        except Exception as e:
            error_event = {"type": "error", "data": {"message": str(e)}}
            yield f"data: {json.dumps(error_event)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # uvicorn is only needed when running this file directly.
    import uvicorn

    # Friendly startup hints printed before the (blocking) server starts.
    print("Starting CrewAI Streaming API...")
    print("Visit http://localhost:8000/docs for API documentation")
    print("Try: http://localhost:8000/stream?topic=quantum%20computing")

    # NOTE(review): binds to all interfaces (0.0.0.0); fine for a local demo,
    # but confirm before deploying anywhere network-reachable.
    uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||||
@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
|
|||||||
|
|
||||||
_suppress_pydantic_deprecation_warnings()
|
_suppress_pydantic_deprecation_warnings()
|
||||||
|
|
||||||
__version__ = "0.203.0"
|
__version__ = "0.203.1"
|
||||||
_telemetry_submitted = False
|
_telemetry_submitted = False
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ def validate_jwt_token(
|
|||||||
algorithms=["RS256"],
|
algorithms=["RS256"],
|
||||||
audience=audience,
|
audience=audience,
|
||||||
issuer=issuer,
|
issuer=issuer,
|
||||||
|
leeway=10.0,
|
||||||
options={
|
options={
|
||||||
"verify_signature": True,
|
"verify_signature": True,
|
||||||
"verify_exp": True,
|
"verify_exp": True,
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
|||||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||||
requires-python = ">=3.10,<3.14"
|
requires-python = ">=3.10,<3.14"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"crewai[tools]>=0.203.0,<1.0.0"
|
"crewai[tools]>=0.203.1,<1.0.0"
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
|||||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||||
requires-python = ">=3.10,<3.14"
|
requires-python = ">=3.10,<3.14"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"crewai[tools]>=0.203.0,<1.0.0",
|
"crewai[tools]>=0.203.1,<1.0.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
|
|||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.10,<3.14"
|
requires-python = ">=3.10,<3.14"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"crewai[tools]>=0.203.0"
|
"crewai[tools]>=0.203.1"
|
||||||
]
|
]
|
||||||
|
|
||||||
[tool.crewai]
|
[tool.crewai]
|
||||||
|
|||||||
@@ -766,6 +766,118 @@ class Crew(FlowTrackable, BaseModel):
|
|||||||
self._task_output_handler.reset()
|
self._task_output_handler.reset()
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
def kickoff_stream(self, inputs: dict[str, Any] | None = None):
|
||||||
|
"""
|
||||||
|
Stream crew execution events in real-time.
|
||||||
|
|
||||||
|
This method yields events as they occur during crew execution, making it
|
||||||
|
easy to integrate with streaming frameworks like FastAPI's StreamingResponse.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
inputs: Optional dictionary of inputs for the crew execution
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
dict: Event dictionaries containing event type and data
|
||||||
|
|
||||||
|
Example:
|
||||||
|
```python
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
|
import json
|
||||||
|
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
@app.get("/stream")
|
||||||
|
async def stream_crew():
|
||||||
|
def event_generator():
|
||||||
|
for event in crew.kickoff_stream(inputs={"topic": "AI"}):
|
||||||
|
yield f"data: {json.dumps(event)}\\n\\n"
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
event_generator(),
|
||||||
|
media_type="text/event-stream"
|
||||||
|
)
|
||||||
|
```
|
||||||
|
"""
|
||||||
|
import queue
|
||||||
|
import threading
|
||||||
|
from crewai.events.base_events import BaseEvent
|
||||||
|
|
||||||
|
event_queue: queue.Queue = queue.Queue()
|
||||||
|
completion_event = threading.Event()
|
||||||
|
exception_holder = {"exception": None}
|
||||||
|
|
||||||
|
def event_handler(source: Any, event: BaseEvent):
|
||||||
|
event_dict = {
|
||||||
|
"type": event.type,
|
||||||
|
"data": event.model_dump(exclude={"from_task", "from_agent"}),
|
||||||
|
}
|
||||||
|
event_queue.put(event_dict)
|
||||||
|
|
||||||
|
from crewai.events.types.crew_events import (
|
||||||
|
CrewKickoffStartedEvent,
|
||||||
|
CrewKickoffCompletedEvent,
|
||||||
|
CrewKickoffFailedEvent,
|
||||||
|
)
|
||||||
|
from crewai.events.types.task_events import (
|
||||||
|
TaskStartedEvent,
|
||||||
|
TaskCompletedEvent,
|
||||||
|
TaskFailedEvent,
|
||||||
|
)
|
||||||
|
from crewai.events.types.agent_events import (
|
||||||
|
AgentExecutionStartedEvent,
|
||||||
|
AgentExecutionCompletedEvent,
|
||||||
|
)
|
||||||
|
from crewai.events.types.llm_events import (
|
||||||
|
LLMStreamChunkEvent,
|
||||||
|
LLMCallStartedEvent,
|
||||||
|
LLMCallCompletedEvent,
|
||||||
|
)
|
||||||
|
from crewai.events.types.tool_usage_events import (
|
||||||
|
ToolUsageStartedEvent,
|
||||||
|
ToolUsageFinishedEvent,
|
||||||
|
ToolUsageErrorEvent,
|
||||||
|
)
|
||||||
|
|
||||||
|
crewai_event_bus.register_handler(CrewKickoffStartedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(CrewKickoffCompletedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(CrewKickoffFailedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(TaskStartedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(TaskCompletedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(TaskFailedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(AgentExecutionStartedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(AgentExecutionCompletedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(LLMStreamChunkEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(LLMCallStartedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(LLMCallCompletedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(ToolUsageStartedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(ToolUsageFinishedEvent, event_handler)
|
||||||
|
crewai_event_bus.register_handler(ToolUsageErrorEvent, event_handler)
|
||||||
|
|
||||||
|
def run_kickoff():
|
||||||
|
try:
|
||||||
|
result = self.kickoff(inputs=inputs)
|
||||||
|
event_queue.put({"type": "final_output", "data": {"output": result.raw}})
|
||||||
|
except Exception as e:
|
||||||
|
exception_holder["exception"] = e
|
||||||
|
finally:
|
||||||
|
completion_event.set()
|
||||||
|
|
||||||
|
thread = threading.Thread(target=run_kickoff, daemon=True)
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
while not completion_event.is_set() or not event_queue.empty():
|
||||||
|
event = event_queue.get(timeout=0.1) if not event_queue.empty() else None
|
||||||
|
if event is not None:
|
||||||
|
yield event
|
||||||
|
|
||||||
|
if exception_holder["exception"]:
|
||||||
|
raise exception_holder["exception"]
|
||||||
|
|
||||||
|
finally:
|
||||||
|
thread.join(timeout=1)
|
||||||
|
|
||||||
def _handle_crew_planning(self):
|
def _handle_crew_planning(self):
|
||||||
"""Handles the Crew planning."""
|
"""Handles the Crew planning."""
|
||||||
self._logger.log("info", "Planning the crew execution")
|
self._logger.log("info", "Planning the crew execution")
|
||||||
|
|||||||
@@ -358,7 +358,8 @@ def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
|
|||||||
try:
|
try:
|
||||||
response = input().strip().lower()
|
response = input().strip().lower()
|
||||||
result[0] = response in ["y", "yes"]
|
result[0] = response in ["y", "yes"]
|
||||||
except (EOFError, KeyboardInterrupt):
|
except (EOFError, KeyboardInterrupt, OSError, LookupError):
|
||||||
|
# Handle all input-related errors silently
|
||||||
result[0] = False
|
result[0] = False
|
||||||
|
|
||||||
input_thread = threading.Thread(target=get_input, daemon=True)
|
input_thread = threading.Thread(target=get_input, daemon=True)
|
||||||
@@ -371,6 +372,7 @@ def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
|
|||||||
return result[0]
|
return result[0]
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
|
# Suppress any warnings or errors and assume "no"
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ from crewai.flow.flow_visualizer import plot_flow
|
|||||||
from crewai.flow.persistence.base import FlowPersistence
|
from crewai.flow.persistence.base import FlowPersistence
|
||||||
from crewai.flow.types import FlowExecutionData
|
from crewai.flow.types import FlowExecutionData
|
||||||
from crewai.flow.utils import get_possible_return_constants
|
from crewai.flow.utils import get_possible_return_constants
|
||||||
from crewai.utilities.printer import Printer
|
from crewai.utilities.printer import Printer, PrinterColor
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -105,7 +105,7 @@ def start(condition: str | dict | Callable | None = None) -> Callable:
|
|||||||
condition : Optional[Union[str, dict, Callable]], optional
|
condition : Optional[Union[str, dict, Callable]], optional
|
||||||
Defines when the start method should execute. Can be:
|
Defines when the start method should execute. Can be:
|
||||||
- str: Name of a method that triggers this start
|
- str: Name of a method that triggers this start
|
||||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
- dict: Result from or_() or and_(), including nested conditions
|
||||||
- Callable: A method reference that triggers this start
|
- Callable: A method reference that triggers this start
|
||||||
Default is None, meaning unconditional start.
|
Default is None, meaning unconditional start.
|
||||||
|
|
||||||
@@ -140,13 +140,18 @@ def start(condition: str | dict | Callable | None = None) -> Callable:
|
|||||||
if isinstance(condition, str):
|
if isinstance(condition, str):
|
||||||
func.__trigger_methods__ = [condition]
|
func.__trigger_methods__ = [condition]
|
||||||
func.__condition_type__ = "OR"
|
func.__condition_type__ = "OR"
|
||||||
elif (
|
elif isinstance(condition, dict) and "type" in condition:
|
||||||
isinstance(condition, dict)
|
if "conditions" in condition:
|
||||||
and "type" in condition
|
func.__trigger_condition__ = condition
|
||||||
and "methods" in condition
|
func.__trigger_methods__ = _extract_all_methods(condition)
|
||||||
):
|
func.__condition_type__ = condition["type"]
|
||||||
func.__trigger_methods__ = condition["methods"]
|
elif "methods" in condition:
|
||||||
func.__condition_type__ = condition["type"]
|
func.__trigger_methods__ = condition["methods"]
|
||||||
|
func.__condition_type__ = condition["type"]
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Condition dict must contain 'conditions' or 'methods'"
|
||||||
|
)
|
||||||
elif callable(condition) and hasattr(condition, "__name__"):
|
elif callable(condition) and hasattr(condition, "__name__"):
|
||||||
func.__trigger_methods__ = [condition.__name__]
|
func.__trigger_methods__ = [condition.__name__]
|
||||||
func.__condition_type__ = "OR"
|
func.__condition_type__ = "OR"
|
||||||
@@ -172,7 +177,7 @@ def listen(condition: str | dict | Callable) -> Callable:
|
|||||||
condition : Union[str, dict, Callable]
|
condition : Union[str, dict, Callable]
|
||||||
Specifies when the listener should execute. Can be:
|
Specifies when the listener should execute. Can be:
|
||||||
- str: Name of a method that triggers this listener
|
- str: Name of a method that triggers this listener
|
||||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
- dict: Result from or_() or and_(), including nested conditions
|
||||||
- Callable: A method reference that triggers this listener
|
- Callable: A method reference that triggers this listener
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
@@ -200,13 +205,18 @@ def listen(condition: str | dict | Callable) -> Callable:
|
|||||||
if isinstance(condition, str):
|
if isinstance(condition, str):
|
||||||
func.__trigger_methods__ = [condition]
|
func.__trigger_methods__ = [condition]
|
||||||
func.__condition_type__ = "OR"
|
func.__condition_type__ = "OR"
|
||||||
elif (
|
elif isinstance(condition, dict) and "type" in condition:
|
||||||
isinstance(condition, dict)
|
if "conditions" in condition:
|
||||||
and "type" in condition
|
func.__trigger_condition__ = condition
|
||||||
and "methods" in condition
|
func.__trigger_methods__ = _extract_all_methods(condition)
|
||||||
):
|
func.__condition_type__ = condition["type"]
|
||||||
func.__trigger_methods__ = condition["methods"]
|
elif "methods" in condition:
|
||||||
func.__condition_type__ = condition["type"]
|
func.__trigger_methods__ = condition["methods"]
|
||||||
|
func.__condition_type__ = condition["type"]
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Condition dict must contain 'conditions' or 'methods'"
|
||||||
|
)
|
||||||
elif callable(condition) and hasattr(condition, "__name__"):
|
elif callable(condition) and hasattr(condition, "__name__"):
|
||||||
func.__trigger_methods__ = [condition.__name__]
|
func.__trigger_methods__ = [condition.__name__]
|
||||||
func.__condition_type__ = "OR"
|
func.__condition_type__ = "OR"
|
||||||
@@ -233,7 +243,7 @@ def router(condition: str | dict | Callable) -> Callable:
|
|||||||
condition : Union[str, dict, Callable]
|
condition : Union[str, dict, Callable]
|
||||||
Specifies when the router should execute. Can be:
|
Specifies when the router should execute. Can be:
|
||||||
- str: Name of a method that triggers this router
|
- str: Name of a method that triggers this router
|
||||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
- dict: Result from or_() or and_(), including nested conditions
|
||||||
- Callable: A method reference that triggers this router
|
- Callable: A method reference that triggers this router
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
@@ -266,13 +276,18 @@ def router(condition: str | dict | Callable) -> Callable:
|
|||||||
if isinstance(condition, str):
|
if isinstance(condition, str):
|
||||||
func.__trigger_methods__ = [condition]
|
func.__trigger_methods__ = [condition]
|
||||||
func.__condition_type__ = "OR"
|
func.__condition_type__ = "OR"
|
||||||
elif (
|
elif isinstance(condition, dict) and "type" in condition:
|
||||||
isinstance(condition, dict)
|
if "conditions" in condition:
|
||||||
and "type" in condition
|
func.__trigger_condition__ = condition
|
||||||
and "methods" in condition
|
func.__trigger_methods__ = _extract_all_methods(condition)
|
||||||
):
|
func.__condition_type__ = condition["type"]
|
||||||
func.__trigger_methods__ = condition["methods"]
|
elif "methods" in condition:
|
||||||
func.__condition_type__ = condition["type"]
|
func.__trigger_methods__ = condition["methods"]
|
||||||
|
func.__condition_type__ = condition["type"]
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Condition dict must contain 'conditions' or 'methods'"
|
||||||
|
)
|
||||||
elif callable(condition) and hasattr(condition, "__name__"):
|
elif callable(condition) and hasattr(condition, "__name__"):
|
||||||
func.__trigger_methods__ = [condition.__name__]
|
func.__trigger_methods__ = [condition.__name__]
|
||||||
func.__condition_type__ = "OR"
|
func.__condition_type__ = "OR"
|
||||||
@@ -298,14 +313,15 @@ def or_(*conditions: str | dict | Callable) -> dict:
|
|||||||
*conditions : Union[str, dict, Callable]
|
*conditions : Union[str, dict, Callable]
|
||||||
Variable number of conditions that can be:
|
Variable number of conditions that can be:
|
||||||
- str: Method names
|
- str: Method names
|
||||||
- dict: Existing condition dictionaries
|
- dict: Existing condition dictionaries (nested conditions)
|
||||||
- Callable: Method references
|
- Callable: Method references
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
-------
|
-------
|
||||||
dict
|
dict
|
||||||
A condition dictionary with format:
|
A condition dictionary with format:
|
||||||
{"type": "OR", "methods": list_of_method_names}
|
{"type": "OR", "conditions": list_of_conditions}
|
||||||
|
where each condition can be a string (method name) or a nested dict
|
||||||
|
|
||||||
Raises
|
Raises
|
||||||
------
|
------
|
||||||
@@ -317,18 +333,22 @@ def or_(*conditions: str | dict | Callable) -> dict:
|
|||||||
>>> @listen(or_("success", "timeout"))
|
>>> @listen(or_("success", "timeout"))
|
||||||
>>> def handle_completion(self):
|
>>> def handle_completion(self):
|
||||||
... pass
|
... pass
|
||||||
|
|
||||||
|
>>> @listen(or_(and_("step1", "step2"), "step3"))
|
||||||
|
>>> def handle_nested(self):
|
||||||
|
... pass
|
||||||
"""
|
"""
|
||||||
methods = []
|
processed_conditions: list[str | dict[str, Any]] = []
|
||||||
for condition in conditions:
|
for condition in conditions:
|
||||||
if isinstance(condition, dict) and "methods" in condition:
|
if isinstance(condition, dict):
|
||||||
methods.extend(condition["methods"])
|
processed_conditions.append(condition)
|
||||||
elif isinstance(condition, str):
|
elif isinstance(condition, str):
|
||||||
methods.append(condition)
|
processed_conditions.append(condition)
|
||||||
elif callable(condition):
|
elif callable(condition):
|
||||||
methods.append(getattr(condition, "__name__", repr(condition)))
|
processed_conditions.append(getattr(condition, "__name__", repr(condition)))
|
||||||
else:
|
else:
|
||||||
raise ValueError("Invalid condition in or_()")
|
raise ValueError("Invalid condition in or_()")
|
||||||
return {"type": "OR", "methods": methods}
|
return {"type": "OR", "conditions": processed_conditions}
|
||||||
|
|
||||||
|
|
||||||
def and_(*conditions: str | dict | Callable) -> dict:
|
def and_(*conditions: str | dict | Callable) -> dict:
|
||||||
@@ -344,14 +364,15 @@ def and_(*conditions: str | dict | Callable) -> dict:
|
|||||||
*conditions : Union[str, dict, Callable]
|
*conditions : Union[str, dict, Callable]
|
||||||
Variable number of conditions that can be:
|
Variable number of conditions that can be:
|
||||||
- str: Method names
|
- str: Method names
|
||||||
- dict: Existing condition dictionaries
|
- dict: Existing condition dictionaries (nested conditions)
|
||||||
- Callable: Method references
|
- Callable: Method references
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
-------
|
-------
|
||||||
dict
|
dict
|
||||||
A condition dictionary with format:
|
A condition dictionary with format:
|
||||||
{"type": "AND", "methods": list_of_method_names}
|
{"type": "AND", "conditions": list_of_conditions}
|
||||||
|
where each condition can be a string (method name) or a nested dict
|
||||||
|
|
||||||
Raises
|
Raises
|
||||||
------
|
------
|
||||||
@@ -363,18 +384,69 @@ def and_(*conditions: str | dict | Callable) -> dict:
|
|||||||
>>> @listen(and_("validated", "processed"))
|
>>> @listen(and_("validated", "processed"))
|
||||||
>>> def handle_complete_data(self):
|
>>> def handle_complete_data(self):
|
||||||
... pass
|
... pass
|
||||||
|
|
||||||
|
>>> @listen(and_(or_("step1", "step2"), "step3"))
|
||||||
|
>>> def handle_nested(self):
|
||||||
|
... pass
|
||||||
"""
|
"""
|
||||||
methods = []
|
processed_conditions: list[str | dict[str, Any]] = []
|
||||||
for condition in conditions:
|
for condition in conditions:
|
||||||
if isinstance(condition, dict) and "methods" in condition:
|
if isinstance(condition, dict):
|
||||||
methods.extend(condition["methods"])
|
processed_conditions.append(condition)
|
||||||
elif isinstance(condition, str):
|
elif isinstance(condition, str):
|
||||||
methods.append(condition)
|
processed_conditions.append(condition)
|
||||||
elif callable(condition):
|
elif callable(condition):
|
||||||
methods.append(getattr(condition, "__name__", repr(condition)))
|
processed_conditions.append(getattr(condition, "__name__", repr(condition)))
|
||||||
else:
|
else:
|
||||||
raise ValueError("Invalid condition in and_()")
|
raise ValueError("Invalid condition in and_()")
|
||||||
return {"type": "AND", "methods": methods}
|
return {"type": "AND", "conditions": processed_conditions}
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_condition(condition: str | dict | list) -> dict:
|
||||||
|
"""Normalize a condition to standard format with 'conditions' key.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
condition: Can be a string (method name), dict (condition), or list
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Normalized dict with 'type' and 'conditions' keys
|
||||||
|
"""
|
||||||
|
if isinstance(condition, str):
|
||||||
|
return {"type": "OR", "conditions": [condition]}
|
||||||
|
if isinstance(condition, dict):
|
||||||
|
if "conditions" in condition:
|
||||||
|
return condition
|
||||||
|
if "methods" in condition:
|
||||||
|
return {"type": condition["type"], "conditions": condition["methods"]}
|
||||||
|
return condition
|
||||||
|
if isinstance(condition, list):
|
||||||
|
return {"type": "OR", "conditions": condition}
|
||||||
|
return {"type": "OR", "conditions": [condition]}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_all_methods(condition: str | dict | list) -> list[str]:
    """Flatten a (possibly nested) condition into all leaf method names.

    Args:
        condition: Can be a string, dict, or list

    Returns:
        List of all method names in the condition tree
    """
    if isinstance(condition, str):
        return [condition]

    if isinstance(condition, dict):
        # Normalize first so legacy {"methods": [...]} dicts flatten too.
        subs = _normalize_condition(condition).get("conditions", [])
        return [name for sub in subs for name in _extract_all_methods(sub)]

    if isinstance(condition, list):
        return [name for item in condition for name in _extract_all_methods(item)]

    # Anything else contributes no method names.
    return []
|
||||||
|
|
||||||
|
|
||||||
class FlowMeta(type):
|
class FlowMeta(type):
|
||||||
@@ -402,7 +474,10 @@ class FlowMeta(type):
|
|||||||
if hasattr(attr_value, "__trigger_methods__"):
|
if hasattr(attr_value, "__trigger_methods__"):
|
||||||
methods = attr_value.__trigger_methods__
|
methods = attr_value.__trigger_methods__
|
||||||
condition_type = getattr(attr_value, "__condition_type__", "OR")
|
condition_type = getattr(attr_value, "__condition_type__", "OR")
|
||||||
listeners[attr_name] = (condition_type, methods)
|
if hasattr(attr_value, "__trigger_condition__"):
|
||||||
|
listeners[attr_name] = attr_value.__trigger_condition__
|
||||||
|
else:
|
||||||
|
listeners[attr_name] = (condition_type, methods)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
hasattr(attr_value, "__is_router__")
|
hasattr(attr_value, "__is_router__")
|
||||||
@@ -822,6 +897,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
# Clear completed methods and outputs for a fresh start
|
# Clear completed methods and outputs for a fresh start
|
||||||
self._completed_methods.clear()
|
self._completed_methods.clear()
|
||||||
self._method_outputs.clear()
|
self._method_outputs.clear()
|
||||||
|
self._pending_and_listeners.clear()
|
||||||
else:
|
else:
|
||||||
# We're restoring from persistence, set the flag
|
# We're restoring from persistence, set the flag
|
||||||
self._is_execution_resuming = True
|
self._is_execution_resuming = True
|
||||||
@@ -1086,10 +1162,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
for method_name in self._start_methods:
|
for method_name in self._start_methods:
|
||||||
# Check if this start method is triggered by the current trigger
|
# Check if this start method is triggered by the current trigger
|
||||||
if method_name in self._listeners:
|
if method_name in self._listeners:
|
||||||
condition_type, trigger_methods = self._listeners[
|
condition_data = self._listeners[method_name]
|
||||||
method_name
|
should_trigger = False
|
||||||
]
|
if isinstance(condition_data, tuple):
|
||||||
if current_trigger in trigger_methods:
|
_, trigger_methods = condition_data
|
||||||
|
should_trigger = current_trigger in trigger_methods
|
||||||
|
elif isinstance(condition_data, dict):
|
||||||
|
all_methods = _extract_all_methods(condition_data)
|
||||||
|
should_trigger = current_trigger in all_methods
|
||||||
|
|
||||||
|
if should_trigger:
|
||||||
# Only execute if this is a cycle (method was already completed)
|
# Only execute if this is a cycle (method was already completed)
|
||||||
if method_name in self._completed_methods:
|
if method_name in self._completed_methods:
|
||||||
# For router-triggered start methods in cycles, temporarily clear resumption flag
|
# For router-triggered start methods in cycles, temporarily clear resumption flag
|
||||||
@@ -1099,6 +1181,51 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
await self._execute_start_method(method_name)
|
await self._execute_start_method(method_name)
|
||||||
self._is_execution_resuming = was_resuming
|
self._is_execution_resuming = was_resuming
|
||||||
|
|
||||||
|
def _evaluate_condition(
|
||||||
|
self, condition: str | dict, trigger_method: str, listener_name: str
|
||||||
|
) -> bool:
|
||||||
|
"""Recursively evaluate a condition (simple or nested).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
condition: Can be a string (method name) or dict (nested condition)
|
||||||
|
trigger_method: The method that just completed
|
||||||
|
listener_name: Name of the listener being evaluated
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if the condition is satisfied, False otherwise
|
||||||
|
"""
|
||||||
|
if isinstance(condition, str):
|
||||||
|
return condition == trigger_method
|
||||||
|
|
||||||
|
if isinstance(condition, dict):
|
||||||
|
normalized = _normalize_condition(condition)
|
||||||
|
cond_type = normalized.get("type", "OR")
|
||||||
|
sub_conditions = normalized.get("conditions", [])
|
||||||
|
|
||||||
|
if cond_type == "OR":
|
||||||
|
return any(
|
||||||
|
self._evaluate_condition(sub_cond, trigger_method, listener_name)
|
||||||
|
for sub_cond in sub_conditions
|
||||||
|
)
|
||||||
|
|
||||||
|
if cond_type == "AND":
|
||||||
|
pending_key = f"{listener_name}:{id(condition)}"
|
||||||
|
|
||||||
|
if pending_key not in self._pending_and_listeners:
|
||||||
|
all_methods = set(_extract_all_methods(condition))
|
||||||
|
self._pending_and_listeners[pending_key] = all_methods
|
||||||
|
|
||||||
|
if trigger_method in self._pending_and_listeners[pending_key]:
|
||||||
|
self._pending_and_listeners[pending_key].discard(trigger_method)
|
||||||
|
|
||||||
|
if not self._pending_and_listeners[pending_key]:
|
||||||
|
self._pending_and_listeners.pop(pending_key, None)
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
def _find_triggered_methods(
|
def _find_triggered_methods(
|
||||||
self, trigger_method: str, router_only: bool
|
self, trigger_method: str, router_only: bool
|
||||||
) -> list[str]:
|
) -> list[str]:
|
||||||
@@ -1106,7 +1233,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
Finds all methods that should be triggered based on conditions.
|
Finds all methods that should be triggered based on conditions.
|
||||||
|
|
||||||
This internal method evaluates both OR and AND conditions to determine
|
This internal method evaluates both OR and AND conditions to determine
|
||||||
which methods should be executed next in the flow.
|
which methods should be executed next in the flow. Supports nested conditions.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
@@ -1123,14 +1250,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
|
|
||||||
Notes
|
Notes
|
||||||
-----
|
-----
|
||||||
- Handles both OR and AND conditions:
|
- Handles both OR and AND conditions, including nested combinations
|
||||||
* OR: Triggers if any condition is met
|
|
||||||
* AND: Triggers only when all conditions are met
|
|
||||||
- Maintains state for AND conditions using _pending_and_listeners
|
- Maintains state for AND conditions using _pending_and_listeners
|
||||||
- Separates router and normal listener evaluation
|
- Separates router and normal listener evaluation
|
||||||
"""
|
"""
|
||||||
triggered = []
|
triggered = []
|
||||||
for listener_name, (condition_type, methods) in self._listeners.items():
|
|
||||||
|
for listener_name, condition_data in self._listeners.items():
|
||||||
is_router = listener_name in self._routers
|
is_router = listener_name in self._routers
|
||||||
|
|
||||||
if router_only != is_router:
|
if router_only != is_router:
|
||||||
@@ -1139,23 +1265,29 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
if not router_only and listener_name in self._start_methods:
|
if not router_only and listener_name in self._start_methods:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if condition_type == "OR":
|
if isinstance(condition_data, tuple):
|
||||||
# If the trigger_method matches any in methods, run this
|
condition_type, methods = condition_data
|
||||||
if trigger_method in methods:
|
|
||||||
triggered.append(listener_name)
|
|
||||||
elif condition_type == "AND":
|
|
||||||
# Initialize pending methods for this listener if not already done
|
|
||||||
if listener_name not in self._pending_and_listeners:
|
|
||||||
self._pending_and_listeners[listener_name] = set(methods)
|
|
||||||
# Remove the trigger method from pending methods
|
|
||||||
if trigger_method in self._pending_and_listeners[listener_name]:
|
|
||||||
self._pending_and_listeners[listener_name].discard(trigger_method)
|
|
||||||
|
|
||||||
if not self._pending_and_listeners[listener_name]:
|
if condition_type == "OR":
|
||||||
# All required methods have been executed
|
if trigger_method in methods:
|
||||||
|
triggered.append(listener_name)
|
||||||
|
elif condition_type == "AND":
|
||||||
|
if listener_name not in self._pending_and_listeners:
|
||||||
|
self._pending_and_listeners[listener_name] = set(methods)
|
||||||
|
if trigger_method in self._pending_and_listeners[listener_name]:
|
||||||
|
self._pending_and_listeners[listener_name].discard(
|
||||||
|
trigger_method
|
||||||
|
)
|
||||||
|
|
||||||
|
if not self._pending_and_listeners[listener_name]:
|
||||||
|
triggered.append(listener_name)
|
||||||
|
self._pending_and_listeners.pop(listener_name, None)
|
||||||
|
|
||||||
|
elif isinstance(condition_data, dict):
|
||||||
|
if self._evaluate_condition(
|
||||||
|
condition_data, trigger_method, listener_name
|
||||||
|
):
|
||||||
triggered.append(listener_name)
|
triggered.append(listener_name)
|
||||||
# Reset pending methods for this listener
|
|
||||||
self._pending_and_listeners.pop(listener_name, None)
|
|
||||||
|
|
||||||
return triggered
|
return triggered
|
||||||
|
|
||||||
@@ -1218,7 +1350,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
|||||||
raise
|
raise
|
||||||
|
|
||||||
def _log_flow_event(
|
def _log_flow_event(
|
||||||
self, message: str, color: str = "yellow", level: str = "info"
|
self, message: str, color: PrinterColor | None = "yellow", level: str = "info"
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Centralized logging method for flow events.
|
"""Centralized logging method for flow events.
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import uuid
|
|||||||
import warnings
|
import warnings
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from concurrent.futures import Future
|
from concurrent.futures import Future
|
||||||
from copy import copy
|
from copy import copy as shallow_copy
|
||||||
from hashlib import md5
|
from hashlib import md5
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import (
|
from typing import (
|
||||||
@@ -672,7 +672,9 @@ Follow these guidelines:
|
|||||||
copied_data = {k: v for k, v in copied_data.items() if v is not None}
|
copied_data = {k: v for k, v in copied_data.items() if v is not None}
|
||||||
|
|
||||||
cloned_context = (
|
cloned_context = (
|
||||||
[task_mapping[context_task.key] for context_task in self.context]
|
self.context
|
||||||
|
if self.context is NOT_SPECIFIED
|
||||||
|
else [task_mapping[context_task.key] for context_task in self.context]
|
||||||
if isinstance(self.context, list)
|
if isinstance(self.context, list)
|
||||||
else None
|
else None
|
||||||
)
|
)
|
||||||
@@ -681,7 +683,7 @@ Follow these guidelines:
|
|||||||
return next((agent for agent in agents if agent.role == role), None)
|
return next((agent for agent in agents if agent.role == role), None)
|
||||||
|
|
||||||
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
|
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
|
||||||
cloned_tools = copy(self.tools) if self.tools else []
|
cloned_tools = shallow_copy(self.tools) if self.tools else []
|
||||||
|
|
||||||
return self.__class__(
|
return self.__class__(
|
||||||
**copied_data,
|
**copied_data,
|
||||||
|
|||||||
@@ -1,6 +1,11 @@
|
|||||||
"""Utility for colored console output."""
|
"""Utility for colored console output."""
|
||||||
|
|
||||||
from typing import Final, Literal, NamedTuple
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING, Final, Literal, NamedTuple
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from _typeshed import SupportsWrite
|
||||||
|
|
||||||
PrinterColor = Literal[
|
PrinterColor = Literal[
|
||||||
"purple",
|
"purple",
|
||||||
@@ -54,13 +59,22 @@ class Printer:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def print(
|
def print(
|
||||||
content: str | list[ColoredText], color: PrinterColor | None = None
|
content: str | list[ColoredText],
|
||||||
|
color: PrinterColor | None = None,
|
||||||
|
sep: str | None = " ",
|
||||||
|
end: str | None = "\n",
|
||||||
|
file: SupportsWrite[str] | None = None,
|
||||||
|
flush: Literal[False] = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Prints content to the console with optional color formatting.
|
"""Prints content to the console with optional color formatting.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
content: Either a string or a list of ColoredText objects for multicolor output.
|
content: Either a string or a list of ColoredText objects for multicolor output.
|
||||||
color: Optional color for the text when content is a string. Ignored when content is a list.
|
color: Optional color for the text when content is a string. Ignored when content is a list.
|
||||||
|
sep: Separator to use between the text and color.
|
||||||
|
end: String appended after the last value.
|
||||||
|
file: A file-like object (stream); defaults to the current sys.stdout.
|
||||||
|
flush: Whether to forcibly flush the stream.
|
||||||
"""
|
"""
|
||||||
if isinstance(content, str):
|
if isinstance(content, str):
|
||||||
content = [ColoredText(content, color)]
|
content = [ColoredText(content, color)]
|
||||||
@@ -68,5 +82,9 @@ class Printer:
|
|||||||
"".join(
|
"".join(
|
||||||
f"{_COLOR_CODES[c.color] if c.color else ''}{c.text}{RESET}"
|
f"{_COLOR_CODES[c.color] if c.color else ''}{c.text}{RESET}"
|
||||||
for c in content
|
for c in content
|
||||||
)
|
),
|
||||||
|
sep=sep,
|
||||||
|
end=end,
|
||||||
|
file=file,
|
||||||
|
flush=flush,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import jwt
|
|
||||||
import unittest
|
import unittest
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import jwt
|
||||||
|
|
||||||
from crewai.cli.authentication.utils import validate_jwt_token
|
from crewai.cli.authentication.utils import validate_jwt_token
|
||||||
|
|
||||||
@@ -17,19 +17,22 @@ class TestUtils(unittest.TestCase):
|
|||||||
key="mock_signing_key"
|
key="mock_signing_key"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
jwt_token = "aaaaa.bbbbbb.cccccc" # noqa: S105
|
||||||
|
|
||||||
decoded_token = validate_jwt_token(
|
decoded_token = validate_jwt_token(
|
||||||
jwt_token="aaaaa.bbbbbb.cccccc",
|
jwt_token=jwt_token,
|
||||||
jwks_url="https://mock_jwks_url",
|
jwks_url="https://mock_jwks_url",
|
||||||
issuer="https://mock_issuer",
|
issuer="https://mock_issuer",
|
||||||
audience="app_id_xxxx",
|
audience="app_id_xxxx",
|
||||||
)
|
)
|
||||||
|
|
||||||
mock_jwt.decode.assert_called_with(
|
mock_jwt.decode.assert_called_with(
|
||||||
"aaaaa.bbbbbb.cccccc",
|
jwt_token,
|
||||||
"mock_signing_key",
|
"mock_signing_key",
|
||||||
algorithms=["RS256"],
|
algorithms=["RS256"],
|
||||||
audience="app_id_xxxx",
|
audience="app_id_xxxx",
|
||||||
issuer="https://mock_issuer",
|
issuer="https://mock_issuer",
|
||||||
|
leeway=10.0,
|
||||||
options={
|
options={
|
||||||
"verify_signature": True,
|
"verify_signature": True,
|
||||||
"verify_exp": True,
|
"verify_exp": True,
|
||||||
@@ -43,9 +46,9 @@ class TestUtils(unittest.TestCase):
|
|||||||
|
|
||||||
def test_validate_jwt_token_expired(self, mock_jwt, mock_pyjwkclient):
|
def test_validate_jwt_token_expired(self, mock_jwt, mock_pyjwkclient):
|
||||||
mock_jwt.decode.side_effect = jwt.ExpiredSignatureError
|
mock_jwt.decode.side_effect = jwt.ExpiredSignatureError
|
||||||
with self.assertRaises(Exception):
|
with self.assertRaises(Exception): # noqa: B017
|
||||||
validate_jwt_token(
|
validate_jwt_token(
|
||||||
jwt_token="aaaaa.bbbbbb.cccccc",
|
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
|
||||||
jwks_url="https://mock_jwks_url",
|
jwks_url="https://mock_jwks_url",
|
||||||
issuer="https://mock_issuer",
|
issuer="https://mock_issuer",
|
||||||
audience="app_id_xxxx",
|
audience="app_id_xxxx",
|
||||||
@@ -53,9 +56,9 @@ class TestUtils(unittest.TestCase):
|
|||||||
|
|
||||||
def test_validate_jwt_token_invalid_audience(self, mock_jwt, mock_pyjwkclient):
|
def test_validate_jwt_token_invalid_audience(self, mock_jwt, mock_pyjwkclient):
|
||||||
mock_jwt.decode.side_effect = jwt.InvalidAudienceError
|
mock_jwt.decode.side_effect = jwt.InvalidAudienceError
|
||||||
with self.assertRaises(Exception):
|
with self.assertRaises(Exception): # noqa: B017
|
||||||
validate_jwt_token(
|
validate_jwt_token(
|
||||||
jwt_token="aaaaa.bbbbbb.cccccc",
|
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
|
||||||
jwks_url="https://mock_jwks_url",
|
jwks_url="https://mock_jwks_url",
|
||||||
issuer="https://mock_issuer",
|
issuer="https://mock_issuer",
|
||||||
audience="app_id_xxxx",
|
audience="app_id_xxxx",
|
||||||
@@ -63,9 +66,9 @@ class TestUtils(unittest.TestCase):
|
|||||||
|
|
||||||
def test_validate_jwt_token_invalid_issuer(self, mock_jwt, mock_pyjwkclient):
|
def test_validate_jwt_token_invalid_issuer(self, mock_jwt, mock_pyjwkclient):
|
||||||
mock_jwt.decode.side_effect = jwt.InvalidIssuerError
|
mock_jwt.decode.side_effect = jwt.InvalidIssuerError
|
||||||
with self.assertRaises(Exception):
|
with self.assertRaises(Exception): # noqa: B017
|
||||||
validate_jwt_token(
|
validate_jwt_token(
|
||||||
jwt_token="aaaaa.bbbbbb.cccccc",
|
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
|
||||||
jwks_url="https://mock_jwks_url",
|
jwks_url="https://mock_jwks_url",
|
||||||
issuer="https://mock_issuer",
|
issuer="https://mock_issuer",
|
||||||
audience="app_id_xxxx",
|
audience="app_id_xxxx",
|
||||||
@@ -75,9 +78,9 @@ class TestUtils(unittest.TestCase):
|
|||||||
self, mock_jwt, mock_pyjwkclient
|
self, mock_jwt, mock_pyjwkclient
|
||||||
):
|
):
|
||||||
mock_jwt.decode.side_effect = jwt.MissingRequiredClaimError
|
mock_jwt.decode.side_effect = jwt.MissingRequiredClaimError
|
||||||
with self.assertRaises(Exception):
|
with self.assertRaises(Exception): # noqa: B017
|
||||||
validate_jwt_token(
|
validate_jwt_token(
|
||||||
jwt_token="aaaaa.bbbbbb.cccccc",
|
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
|
||||||
jwks_url="https://mock_jwks_url",
|
jwks_url="https://mock_jwks_url",
|
||||||
issuer="https://mock_issuer",
|
issuer="https://mock_issuer",
|
||||||
audience="app_id_xxxx",
|
audience="app_id_xxxx",
|
||||||
@@ -85,9 +88,9 @@ class TestUtils(unittest.TestCase):
|
|||||||
|
|
||||||
def test_validate_jwt_token_jwks_error(self, mock_jwt, mock_pyjwkclient):
|
def test_validate_jwt_token_jwks_error(self, mock_jwt, mock_pyjwkclient):
|
||||||
mock_jwt.decode.side_effect = jwt.exceptions.PyJWKClientError
|
mock_jwt.decode.side_effect = jwt.exceptions.PyJWKClientError
|
||||||
with self.assertRaises(Exception):
|
with self.assertRaises(Exception): # noqa: B017
|
||||||
validate_jwt_token(
|
validate_jwt_token(
|
||||||
jwt_token="aaaaa.bbbbbb.cccccc",
|
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
|
||||||
jwks_url="https://mock_jwks_url",
|
jwks_url="https://mock_jwks_url",
|
||||||
issuer="https://mock_issuer",
|
issuer="https://mock_issuer",
|
||||||
audience="app_id_xxxx",
|
audience="app_id_xxxx",
|
||||||
@@ -95,9 +98,9 @@ class TestUtils(unittest.TestCase):
|
|||||||
|
|
||||||
def test_validate_jwt_token_invalid_token(self, mock_jwt, mock_pyjwkclient):
|
def test_validate_jwt_token_invalid_token(self, mock_jwt, mock_pyjwkclient):
|
||||||
mock_jwt.decode.side_effect = jwt.InvalidTokenError
|
mock_jwt.decode.side_effect = jwt.InvalidTokenError
|
||||||
with self.assertRaises(Exception):
|
with self.assertRaises(Exception): # noqa: B017
|
||||||
validate_jwt_token(
|
validate_jwt_token(
|
||||||
jwt_token="aaaaa.bbbbbb.cccccc",
|
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
|
||||||
jwks_url="https://mock_jwks_url",
|
jwks_url="https://mock_jwks_url",
|
||||||
issuer="https://mock_issuer",
|
issuer="https://mock_issuer",
|
||||||
audience="app_id_xxxx",
|
audience="app_id_xxxx",
|
||||||
|
|||||||
@@ -4744,3 +4744,81 @@ def test_ensure_exchanged_messages_are_propagated_to_external_memory():
|
|||||||
assert "Researcher" in messages[0]["content"]
|
assert "Researcher" in messages[0]["content"]
|
||||||
assert messages[1]["role"] == "user"
|
assert messages[1]["role"] == "user"
|
||||||
assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"]
|
assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||||
|
def test_crew_kickoff_stream(researcher):
|
||||||
|
"""Test that crew.kickoff_stream() yields events during execution."""
|
||||||
|
task = Task(
|
||||||
|
description="Research a topic about AI",
|
||||||
|
expected_output="A brief summary about AI",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(agents=[researcher], tasks=[task])
|
||||||
|
|
||||||
|
events = list(crew.kickoff_stream())
|
||||||
|
|
||||||
|
assert len(events) > 0
|
||||||
|
|
||||||
|
event_types = [event["type"] for event in events]
|
||||||
|
assert "crew_kickoff_started" in event_types
|
||||||
|
assert "final_output" in event_types
|
||||||
|
|
||||||
|
final_output_event = next(e for e in events if e["type"] == "final_output")
|
||||||
|
assert "output" in final_output_event["data"]
|
||||||
|
assert isinstance(final_output_event["data"]["output"], str)
|
||||||
|
assert len(final_output_event["data"]["output"]) > 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||||
|
def test_crew_kickoff_stream_with_inputs(researcher):
|
||||||
|
"""Test that crew.kickoff_stream() works with inputs."""
|
||||||
|
task = Task(
|
||||||
|
description="Research about {topic}",
|
||||||
|
expected_output="A brief summary about {topic}",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(agents=[researcher], tasks=[task])
|
||||||
|
|
||||||
|
events = list(crew.kickoff_stream(inputs={"topic": "machine learning"}))
|
||||||
|
|
||||||
|
assert len(events) > 0
|
||||||
|
|
||||||
|
event_types = [event["type"] for event in events]
|
||||||
|
assert "crew_kickoff_started" in event_types
|
||||||
|
assert "final_output" in event_types
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||||
|
def test_crew_kickoff_stream_includes_llm_chunks(researcher):
|
||||||
|
"""Test that crew.kickoff_stream() includes LLM stream chunks."""
|
||||||
|
task = Task(
|
||||||
|
description="Write a short poem about AI",
|
||||||
|
expected_output="A 2-line poem",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(agents=[researcher], tasks=[task])
|
||||||
|
|
||||||
|
events = list(crew.kickoff_stream())
|
||||||
|
|
||||||
|
event_types = [event["type"] for event in events]
|
||||||
|
|
||||||
|
assert "task_started" in event_types or "agent_execution_started" in event_types
|
||||||
|
|
||||||
|
|
||||||
|
def test_crew_kickoff_stream_handles_errors(researcher):
|
||||||
|
"""Test that crew.kickoff_stream() properly handles errors."""
|
||||||
|
task = Task(
|
||||||
|
description="This task will fail",
|
||||||
|
expected_output="Should not complete",
|
||||||
|
agent=researcher,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(agents=[researcher], tasks=[task])
|
||||||
|
|
||||||
|
with patch("crewai.crew.Crew.kickoff", side_effect=Exception("Test error")):
|
||||||
|
with pytest.raises(Exception, match="Test error"):
|
||||||
|
list(crew.kickoff_stream())
|
||||||
|
|||||||
@@ -6,15 +6,15 @@ from datetime import datetime
|
|||||||
import pytest
|
import pytest
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from crewai.flow.flow import Flow, and_, listen, or_, router, start
|
|
||||||
from crewai.events.event_bus import crewai_event_bus
|
from crewai.events.event_bus import crewai_event_bus
|
||||||
from crewai.events.types.flow_events import (
|
from crewai.events.types.flow_events import (
|
||||||
FlowFinishedEvent,
|
FlowFinishedEvent,
|
||||||
FlowStartedEvent,
|
|
||||||
FlowPlotEvent,
|
FlowPlotEvent,
|
||||||
|
FlowStartedEvent,
|
||||||
MethodExecutionFinishedEvent,
|
MethodExecutionFinishedEvent,
|
||||||
MethodExecutionStartedEvent,
|
MethodExecutionStartedEvent,
|
||||||
)
|
)
|
||||||
|
from crewai.flow.flow import Flow, and_, listen, or_, router, start
|
||||||
|
|
||||||
|
|
||||||
def test_simple_sequential_flow():
|
def test_simple_sequential_flow():
|
||||||
@@ -679,11 +679,11 @@ def test_structured_flow_event_emission():
|
|||||||
assert isinstance(received_events[3], MethodExecutionStartedEvent)
|
assert isinstance(received_events[3], MethodExecutionStartedEvent)
|
||||||
assert received_events[3].method_name == "send_welcome_message"
|
assert received_events[3].method_name == "send_welcome_message"
|
||||||
assert received_events[3].params == {}
|
assert received_events[3].params == {}
|
||||||
assert getattr(received_events[3].state, "sent") is False
|
assert received_events[3].state.sent is False
|
||||||
|
|
||||||
assert isinstance(received_events[4], MethodExecutionFinishedEvent)
|
assert isinstance(received_events[4], MethodExecutionFinishedEvent)
|
||||||
assert received_events[4].method_name == "send_welcome_message"
|
assert received_events[4].method_name == "send_welcome_message"
|
||||||
assert getattr(received_events[4].state, "sent") is True
|
assert received_events[4].state.sent is True
|
||||||
assert received_events[4].result == "Welcome, Anakin!"
|
assert received_events[4].result == "Welcome, Anakin!"
|
||||||
|
|
||||||
assert isinstance(received_events[5], FlowFinishedEvent)
|
assert isinstance(received_events[5], FlowFinishedEvent)
|
||||||
@@ -894,3 +894,75 @@ def test_flow_name():
|
|||||||
|
|
||||||
flow = MyFlow()
|
flow = MyFlow()
|
||||||
assert flow.name == "MyFlow"
|
assert flow.name == "MyFlow"
|
||||||
|
|
||||||
|
|
||||||
|
def test_nested_and_or_conditions():
|
||||||
|
"""Test nested conditions like or_(and_(A, B), and_(C, D)).
|
||||||
|
|
||||||
|
Reproduces bug from issue #3719 where nested conditions are flattened,
|
||||||
|
causing premature execution.
|
||||||
|
"""
|
||||||
|
execution_order = []
|
||||||
|
|
||||||
|
class NestedConditionFlow(Flow):
|
||||||
|
@start()
|
||||||
|
def method_1(self):
|
||||||
|
execution_order.append("method_1")
|
||||||
|
|
||||||
|
@listen(method_1)
|
||||||
|
def method_2(self):
|
||||||
|
execution_order.append("method_2")
|
||||||
|
|
||||||
|
@router(method_2)
|
||||||
|
def method_3(self):
|
||||||
|
execution_order.append("method_3")
|
||||||
|
# Choose b_condition path
|
||||||
|
return "b_condition"
|
||||||
|
|
||||||
|
@listen("b_condition")
|
||||||
|
def method_5(self):
|
||||||
|
execution_order.append("method_5")
|
||||||
|
|
||||||
|
@listen(method_5)
|
||||||
|
async def method_4(self):
|
||||||
|
execution_order.append("method_4")
|
||||||
|
|
||||||
|
@listen(or_("a_condition", "b_condition"))
|
||||||
|
async def method_6(self):
|
||||||
|
execution_order.append("method_6")
|
||||||
|
|
||||||
|
@listen(
|
||||||
|
or_(
|
||||||
|
and_("a_condition", method_6),
|
||||||
|
and_(method_6, method_4),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
def method_7(self):
|
||||||
|
execution_order.append("method_7")
|
||||||
|
|
||||||
|
@listen(method_7)
|
||||||
|
async def method_8(self):
|
||||||
|
execution_order.append("method_8")
|
||||||
|
|
||||||
|
flow = NestedConditionFlow()
|
||||||
|
flow.kickoff()
|
||||||
|
|
||||||
|
# Verify execution happened
|
||||||
|
assert "method_1" in execution_order
|
||||||
|
assert "method_2" in execution_order
|
||||||
|
assert "method_3" in execution_order
|
||||||
|
assert "method_5" in execution_order
|
||||||
|
assert "method_4" in execution_order
|
||||||
|
assert "method_6" in execution_order
|
||||||
|
assert "method_7" in execution_order
|
||||||
|
assert "method_8" in execution_order
|
||||||
|
|
||||||
|
# Critical assertion: method_7 should only execute AFTER both method_6 AND method_4
|
||||||
|
# Since b_condition was returned, method_6 triggers on b_condition
|
||||||
|
# method_7 requires: (a_condition AND method_6) OR (method_6 AND method_4)
|
||||||
|
# The second condition (method_6 AND method_4) should be the one that triggers
|
||||||
|
assert execution_order.index("method_7") > execution_order.index("method_6")
|
||||||
|
assert execution_order.index("method_7") > execution_order.index("method_4")
|
||||||
|
|
||||||
|
# method_8 should execute after method_7
|
||||||
|
assert execution_order.index("method_8") > execution_order.index("method_7")
|
||||||
|
|||||||
@@ -1218,7 +1218,7 @@ def test_create_directory_false():
|
|||||||
assert not resolved_dir.exists()
|
assert not resolved_dir.exists()
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(
|
||||||
RuntimeError, match="Directory .* does not exist and create_directory is False"
|
RuntimeError, match=r"Directory .* does not exist and create_directory is False"
|
||||||
):
|
):
|
||||||
task._save_file("test content")
|
task._save_file("test content")
|
||||||
|
|
||||||
@@ -1635,3 +1635,48 @@ def test_task_interpolation_with_hyphens():
|
|||||||
assert "say hello world" in task.prompt()
|
assert "say hello world" in task.prompt()
|
||||||
|
|
||||||
assert result.raw == "Hello, World!"
|
assert result.raw == "Hello, World!"
|
||||||
|
|
||||||
|
|
||||||
|
def test_task_copy_with_none_context():
|
||||||
|
original_task = Task(
|
||||||
|
description="Test task",
|
||||||
|
expected_output="Test output",
|
||||||
|
context=None
|
||||||
|
)
|
||||||
|
|
||||||
|
new_task = original_task.copy(agents=[], task_mapping={})
|
||||||
|
assert original_task.context is None
|
||||||
|
assert new_task.context is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_task_copy_with_not_specified_context():
|
||||||
|
from crewai.utilities.constants import NOT_SPECIFIED
|
||||||
|
original_task = Task(
|
||||||
|
description="Test task",
|
||||||
|
expected_output="Test output",
|
||||||
|
)
|
||||||
|
|
||||||
|
new_task = original_task.copy(agents=[], task_mapping={})
|
||||||
|
assert original_task.context is NOT_SPECIFIED
|
||||||
|
assert new_task.context is NOT_SPECIFIED
|
||||||
|
|
||||||
|
|
||||||
|
def test_task_copy_with_list_context():
|
||||||
|
"""Test that copying a task with list context works correctly."""
|
||||||
|
task1 = Task(
|
||||||
|
description="Task 1",
|
||||||
|
expected_output="Output 1"
|
||||||
|
)
|
||||||
|
task2 = Task(
|
||||||
|
description="Task 2",
|
||||||
|
expected_output="Output 2",
|
||||||
|
context=[task1]
|
||||||
|
)
|
||||||
|
|
||||||
|
task_mapping = {task1.key: task1}
|
||||||
|
|
||||||
|
copied_task2 = task2.copy(agents=[], task_mapping=task_mapping)
|
||||||
|
|
||||||
|
assert isinstance(copied_task2.context, list)
|
||||||
|
assert len(copied_task2.context) == 1
|
||||||
|
assert copied_task2.context[0] is task1
|
||||||
|
|||||||
Reference in New Issue
Block a user