Merge branch 'main' into feat/improve-hierarchical-docs

This commit is contained in:
Brandon Hancock (bhancock_ai)
2025-02-27 09:07:07 -05:00
committed by GitHub
13 changed files with 275 additions and 61 deletions

View File

@@ -136,17 +136,21 @@ crewai test -n 5 -m gpt-3.5-turbo
### 8. Run ### 8. Run
Run the crew. Run the crew or flow.
```shell Terminal ```shell Terminal
crewai run crewai run
``` ```
<Note>
Starting from version 0.103.0, the `crewai run` command can be used to run both standard crews and flows. For flows, it automatically detects the type from pyproject.toml and runs the appropriate command. This is now the recommended way to run both crews and flows.
</Note>
<Note> <Note>
Make sure to run these commands from the directory where your CrewAI project is set up. Make sure to run these commands from the directory where your CrewAI project is set up.
Some commands may require additional configuration or setup within your project structure. Some commands may require additional configuration or setup within your project structure.
</Note> </Note>
### 9. Chat ### 9. Chat
Starting in version `0.98.0`, when you run the `crewai chat` command, you start an interactive session with your crew. The AI assistant will guide you by asking for necessary inputs to execute the crew. Once all inputs are provided, the crew will execute its tasks. Starting in version `0.98.0`, when you run the `crewai chat` command, you start an interactive session with your crew. The AI assistant will guide you by asking for necessary inputs to execute the crew. Once all inputs are provided, the crew will execute its tasks.
@@ -175,7 +179,6 @@ def crew(self) -> Crew:
``` ```
</Note> </Note>
### 10. API Keys ### 10. API Keys
When running ```crewai create crew``` command, the CLI will first show you the top 5 most common LLM providers and ask you to select one. When running ```crewai create crew``` command, the CLI will first show you the top 5 most common LLM providers and ask you to select one.

View File

@@ -150,12 +150,12 @@ final_output = flow.kickoff()
print("---- Final Output ----") print("---- Final Output ----")
print(final_output) print(final_output)
```` ```
```text Output ```text Output
---- Final Output ---- ---- Final Output ----
Second method received: Output from first_method Second method received: Output from first_method
```` ```
</CodeGroup> </CodeGroup>
@@ -738,3 +738,34 @@ Also, check out our YouTube video on how to use flows in CrewAI below!
referrerpolicy="strict-origin-when-cross-origin" referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen allowfullscreen
></iframe> ></iframe>
## Running Flows
There are two ways to run a flow:
### Using the Flow API
You can run a flow programmatically by creating an instance of your flow class and calling the `kickoff()` method:
```python
flow = ExampleFlow()
result = flow.kickoff()
```
### Using the CLI
Starting from version 0.103.0, you can run flows using the `crewai run` command:
```shell
crewai run
```
This command automatically detects if your project is a flow (based on the `type = "flow"` setting in your pyproject.toml) and runs it accordingly. This is the recommended way to run flows from the command line.
For backward compatibility, you can also use:
```shell
crewai flow kickoff
```
However, the `crewai run` command is now the preferred method as it works for both crews and flows.

View File

@@ -114,7 +114,6 @@ class Agent(BaseAgent):
@model_validator(mode="after") @model_validator(mode="after")
def post_init_setup(self): def post_init_setup(self):
self._set_knowledge()
self.agent_ops_agent_name = self.role self.agent_ops_agent_name = self.role
self.llm = create_llm(self.llm) self.llm = create_llm(self.llm)
@@ -134,8 +133,11 @@ class Agent(BaseAgent):
self.cache_handler = CacheHandler() self.cache_handler = CacheHandler()
self.set_cache_handler(self.cache_handler) self.set_cache_handler(self.cache_handler)
def _set_knowledge(self): def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
try: try:
if self.embedder is None and crew_embedder:
self.embedder = crew_embedder
if self.knowledge_sources: if self.knowledge_sources:
full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)") full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)")
knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}" knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}"

View File

@@ -351,3 +351,6 @@ class BaseAgent(ABC, BaseModel):
if not self._rpm_controller: if not self._rpm_controller:
self._rpm_controller = rpm_controller self._rpm_controller = rpm_controller
self.create_agent_executor() self.create_agent_executor()
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
    """Base no-op hook for wiring up agent knowledge sources.

    Concrete agent subclasses override this to initialize their knowledge
    storage; the base implementation intentionally does nothing.

    Args:
        crew_embedder: Embedder configuration passed down from the crew,
            for subclasses to fall back on when the agent has none of its own.
    """
    pass

View File

@@ -124,14 +124,15 @@ class CrewAgentParser:
) )
def _extract_thought(self, text: str) -> str: def _extract_thought(self, text: str) -> str:
regex = r"(.*?)(?:\n\nAction|\n\nFinal Answer)" thought_index = text.find("\n\nAction")
thought_match = re.search(regex, text, re.DOTALL) if thought_index == -1:
if thought_match: thought_index = text.find("\n\nFinal Answer")
thought = thought_match.group(1).strip() if thought_index == -1:
# Remove any triple backticks from the thought string return ""
thought = thought.replace("```", "").strip() thought = text[:thought_index].strip()
return thought # Remove any triple backticks from the thought string
return "" thought = thought.replace("```", "").strip()
return thought
def _clean_action(self, text: str) -> str: def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters.""" """Clean action string by removing non-essential formatting characters."""

View File

@@ -203,7 +203,6 @@ def install(context):
@crewai.command() @crewai.command()
def run(): def run():
"""Run the Crew.""" """Run the Crew."""
click.echo("Running the Crew")
run_crew() run_crew()

View File

@@ -1,4 +1,6 @@
import subprocess import subprocess
from enum import Enum
from typing import List, Optional
import click import click
from packaging import version from packaging import version
@@ -7,16 +9,24 @@ from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version from crewai.cli.version import get_crewai_version
class CrewType(Enum):
    """Kind of project the CLI is executing, as declared in pyproject.toml."""

    STANDARD = "standard"
    FLOW = "flow"
def run_crew() -> None: def run_crew() -> None:
""" """
Run the crew by running a command in the UV environment. Run the crew or flow by running a command in the UV environment.
Starting from version 0.103.0, this command can be used to run both
standard crews and flows. For flows, it detects the type from pyproject.toml
and automatically runs the appropriate command.
""" """
command = ["uv", "run", "run_crew"]
crewai_version = get_crewai_version() crewai_version = get_crewai_version()
min_required_version = "0.71.0" min_required_version = "0.71.0"
pyproject_data = read_toml() pyproject_data = read_toml()
# Check for legacy poetry configuration
if pyproject_data.get("tool", {}).get("poetry") and ( if pyproject_data.get("tool", {}).get("poetry") and (
version.parse(crewai_version) < version.parse(min_required_version) version.parse(crewai_version) < version.parse(min_required_version)
): ):
@@ -26,18 +36,54 @@ def run_crew() -> None:
fg="red", fg="red",
) )
# Determine crew type
is_flow = pyproject_data.get("tool", {}).get("crewai", {}).get("type") == "flow"
crew_type = CrewType.FLOW if is_flow else CrewType.STANDARD
# Display appropriate message
click.echo(f"Running the {'Flow' if is_flow else 'Crew'}")
# Execute the appropriate command
execute_command(crew_type)
def execute_command(crew_type: CrewType) -> None:
"""
Execute the appropriate command based on crew type.
Args:
crew_type: The type of crew to run
"""
command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"]
try: try:
subprocess.run(command, capture_output=False, text=True, check=True) subprocess.run(command, capture_output=False, text=True, check=True)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while running the crew: {e}", err=True) handle_error(e, crew_type)
click.echo(e.output, err=True, nl=True)
if pyproject_data.get("tool", {}).get("poetry"):
click.secho(
"It's possible that you are using an old version of crewAI that uses poetry, please run `crewai update` to update your pyproject.toml to use uv.",
fg="yellow",
)
except Exception as e: except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True) click.echo(f"An unexpected error occurred: {e}", err=True)
def handle_error(error: subprocess.CalledProcessError, crew_type: CrewType) -> None:
    """Report a failed crew/flow subprocess run to the user.

    Args:
        error: The CalledProcessError raised by the failed subprocess.
        crew_type: Whether a flow or a standard crew was being run, used
            only to word the error message.
    """
    if crew_type == CrewType.FLOW:
        entity_type = "flow"
    else:
        entity_type = "crew"

    click.echo(f"An error occurred while running the {entity_type}: {error}", err=True)
    if error.output:
        click.echo(error.output, err=True, nl=True)

    # Legacy poetry-based projects are a common cause of failures here,
    # so point the user at the uv migration path.
    if read_toml().get("tool", {}).get("poetry"):
        click.secho(
            "It's possible that you are using an old version of crewAI that uses poetry, "
            "please run `crewai update` to update your pyproject.toml to use uv.",
            fg="yellow",
        )

View File

@@ -30,13 +30,13 @@ crewai install
## Running the Project ## Running the Project
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project: To kickstart your flow and begin execution, run this from the root folder of your project:
```bash ```bash
crewai run crewai run
``` ```
This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration. This command initializes the {{name}} Flow as defined in your configuration.
This example, unmodified, will create a `report.md` file with the output of research on LLMs in the root folder. This example, unmodified, will create a `report.md` file with the output of research on LLMs in the root folder.

View File

@@ -600,6 +600,7 @@ class Crew(BaseModel):
agent.i18n = i18n agent.i18n = i18n
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]" # type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
agent.crew = self # type: ignore[attr-defined] agent.crew = self # type: ignore[attr-defined]
agent.set_knowledge(crew_embedder=self.embedder)
# TODO: Create an AgentFunctionCalling protocol for future refactoring # TODO: Create an AgentFunctionCalling protocol for future refactoring
if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm" if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm" agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm"

View File

@@ -894,35 +894,45 @@ class Flow(Generic[T], metaclass=FlowMeta):
Notes Notes
----- -----
- Routers are executed sequentially to maintain flow control - Routers are executed sequentially to maintain flow control
- Each router's result becomes the new trigger_method - Each router's result becomes a new trigger_method
- Normal listeners are executed in parallel for efficiency - Normal listeners are executed in parallel for efficiency
- Listeners can receive the trigger method's result as a parameter - Listeners can receive the trigger method's result as a parameter
""" """
# First, handle routers repeatedly until no router triggers anymore # First, handle routers repeatedly until no router triggers anymore
router_results = []
current_trigger = trigger_method
while True: while True:
routers_triggered = self._find_triggered_methods( routers_triggered = self._find_triggered_methods(
trigger_method, router_only=True current_trigger, router_only=True
) )
if not routers_triggered: if not routers_triggered:
break break
for router_name in routers_triggered: for router_name in routers_triggered:
await self._execute_single_listener(router_name, result) await self._execute_single_listener(router_name, result)
# After executing router, the router's result is the path # After executing router, the router's result is the path
# The last router executed sets the trigger_method router_result = self._method_outputs[-1]
# The router result is the last element in self._method_outputs if router_result: # Only add non-None results
trigger_method = self._method_outputs[-1] router_results.append(router_result)
current_trigger = (
router_result # Update for next iteration of router chain
)
# Now that no more routers are triggered by current trigger_method, # Now execute normal listeners for all router results and the original trigger
# execute normal listeners all_triggers = [trigger_method] + router_results
listeners_triggered = self._find_triggered_methods(
trigger_method, router_only=False for current_trigger in all_triggers:
) if current_trigger: # Skip None results
if listeners_triggered: listeners_triggered = self._find_triggered_methods(
tasks = [ current_trigger, router_only=False
self._execute_single_listener(listener_name, result) )
for listener_name in listeners_triggered if listeners_triggered:
] tasks = [
await asyncio.gather(*tasks) self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
def _find_triggered_methods( def _find_triggered_methods(
self, trigger_method: str, router_only: bool self, trigger_method: str, router_only: bool

View File

@@ -4,7 +4,7 @@ SQLite-based implementation of flow state persistence.
import json import json
import sqlite3 import sqlite3
from datetime import datetime from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Optional, Union from typing import Any, Dict, Optional, Union
@@ -34,6 +34,7 @@ class SQLiteFlowPersistence(FlowPersistence):
ValueError: If db_path is invalid ValueError: If db_path is invalid
""" """
from crewai.utilities.paths import db_storage_path from crewai.utilities.paths import db_storage_path
# Get path from argument or default location # Get path from argument or default location
path = db_path or str(Path(db_storage_path()) / "flow_states.db") path = db_path or str(Path(db_storage_path()) / "flow_states.db")
@@ -46,7 +47,8 @@ class SQLiteFlowPersistence(FlowPersistence):
def init_db(self) -> None: def init_db(self) -> None:
"""Create the necessary tables if they don't exist.""" """Create the necessary tables if they don't exist."""
with sqlite3.connect(self.db_path) as conn: with sqlite3.connect(self.db_path) as conn:
conn.execute(""" conn.execute(
"""
CREATE TABLE IF NOT EXISTS flow_states ( CREATE TABLE IF NOT EXISTS flow_states (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
flow_uuid TEXT NOT NULL, flow_uuid TEXT NOT NULL,
@@ -54,12 +56,15 @@ class SQLiteFlowPersistence(FlowPersistence):
timestamp DATETIME NOT NULL, timestamp DATETIME NOT NULL,
state_json TEXT NOT NULL state_json TEXT NOT NULL
) )
""") """
)
# Add index for faster UUID lookups # Add index for faster UUID lookups
conn.execute(""" conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid CREATE INDEX IF NOT EXISTS idx_flow_states_uuid
ON flow_states(flow_uuid) ON flow_states(flow_uuid)
""") """
)
def save_state( def save_state(
self, self,
@@ -85,19 +90,22 @@ class SQLiteFlowPersistence(FlowPersistence):
) )
with sqlite3.connect(self.db_path) as conn: with sqlite3.connect(self.db_path) as conn:
conn.execute(""" conn.execute(
"""
INSERT INTO flow_states ( INSERT INTO flow_states (
flow_uuid, flow_uuid,
method_name, method_name,
timestamp, timestamp,
state_json state_json
) VALUES (?, ?, ?, ?) ) VALUES (?, ?, ?, ?)
""", ( """,
flow_uuid, (
method_name, flow_uuid,
datetime.utcnow().isoformat(), method_name,
json.dumps(state_dict), datetime.now(timezone.utc).isoformat(),
)) json.dumps(state_dict),
),
)
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]: def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
"""Load the most recent state for a given flow UUID. """Load the most recent state for a given flow UUID.
@@ -109,13 +117,16 @@ class SQLiteFlowPersistence(FlowPersistence):
The most recent state as a dictionary, or None if no state exists The most recent state as a dictionary, or None if no state exists
""" """
with sqlite3.connect(self.db_path) as conn: with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(""" cursor = conn.execute(
"""
SELECT state_json SELECT state_json
FROM flow_states FROM flow_states
WHERE flow_uuid = ? WHERE flow_uuid = ?
ORDER BY id DESC ORDER BY id DESC
LIMIT 1 LIMIT 1
""", (flow_uuid,)) """,
(flow_uuid,),
)
row = cursor.fetchone() row = cursor.fetchone()
if row: if row:

View File

@@ -30,8 +30,14 @@ class TokenCalcHandler(CustomLogger):
if hasattr(usage, "prompt_tokens"): if hasattr(usage, "prompt_tokens"):
self.token_cost_process.sum_prompt_tokens(usage.prompt_tokens) self.token_cost_process.sum_prompt_tokens(usage.prompt_tokens)
if hasattr(usage, "completion_tokens"): if hasattr(usage, "completion_tokens"):
self.token_cost_process.sum_completion_tokens(usage.completion_tokens) self.token_cost_process.sum_completion_tokens(
if hasattr(usage, "prompt_tokens_details") and usage.prompt_tokens_details: usage.completion_tokens
)
if (
hasattr(usage, "prompt_tokens_details")
and usage.prompt_tokens_details
and usage.prompt_tokens_details.cached_tokens
):
self.token_cost_process.sum_cached_prompt_tokens( self.token_cost_process.sum_cached_prompt_tokens(
usage.prompt_tokens_details.cached_tokens usage.prompt_tokens_details.cached_tokens
) )

View File

@@ -654,3 +654,104 @@ def test_flow_plotting():
assert isinstance(received_events[0], FlowPlotEvent) assert isinstance(received_events[0], FlowPlotEvent)
assert received_events[0].flow_name == "StatelessFlow" assert received_events[0].flow_name == "StatelessFlow"
assert isinstance(received_events[0].timestamp, datetime) assert isinstance(received_events[0].timestamp, datetime)
def test_multiple_routers_from_same_trigger():
    """Test that multiple routers triggered by the same method all activate their listeners."""
    execution_order = []

    class MultiRouterFlow(Flow):
        def __init__(self):
            super().__init__()
            # "DHA" contains one letter per condition so every router fires.
            self.state["diagnosed_conditions"] = "DHA"

        @start()
        def scan_medical(self):
            execution_order.append("scan_medical")
            return "scan_complete"

        @router(scan_medical)
        def diagnose_conditions(self):
            execution_order.append("diagnose_conditions")
            return "diagnosis_complete"

        @router(diagnose_conditions)
        def diabetes_router(self):
            execution_order.append("diabetes_router")
            return "diabetes" if "D" in self.state["diagnosed_conditions"] else None

        @listen("diabetes")
        def diabetes_analysis(self):
            execution_order.append("diabetes_analysis")
            return "diabetes_analysis_complete"

        @router(diagnose_conditions)
        def hypertension_router(self):
            execution_order.append("hypertension_router")
            return "hypertension" if "H" in self.state["diagnosed_conditions"] else None

        @listen("hypertension")
        def hypertension_analysis(self):
            execution_order.append("hypertension_analysis")
            return "hypertension_analysis_complete"

        @router(diagnose_conditions)
        def anemia_router(self):
            execution_order.append("anemia_router")
            return "anemia" if "A" in self.state["diagnosed_conditions"] else None

        @listen("anemia")
        def anemia_analysis(self):
            execution_order.append("anemia_analysis")
            return "anemia_analysis_complete"

    MultiRouterFlow().kickoff()

    # Every step — start, diagnosis, all routers, and crucially every
    # listener — must have executed. The listeners are the key check for
    # the multiple-routers fix.
    for step in (
        "scan_medical",
        "diagnose_conditions",
        "diabetes_router",
        "hypertension_router",
        "anemia_router",
        "diabetes_analysis",
        "hypertension_analysis",
        "anemia_analysis",
    ):
        assert step in execution_order

    def ran_after(later, earlier):
        # True when `later` appears after `earlier` in the recorded order.
        return execution_order.index(later) > execution_order.index(earlier)

    # Ordering constraints: each stage follows the method that triggered it.
    assert ran_after("diagnose_conditions", "scan_medical")
    for condition in ("diabetes", "hypertension", "anemia"):
        assert ran_after(f"{condition}_router", "diagnose_conditions")
        assert ran_after(f"{condition}_analysis", f"{condition}_router")