diff --git a/docs/concepts/cli.mdx b/docs/concepts/cli.mdx index 4c9f617ba..ecdcd0836 100644 --- a/docs/concepts/cli.mdx +++ b/docs/concepts/cli.mdx @@ -136,17 +136,21 @@ crewai test -n 5 -m gpt-3.5-turbo ### 8. Run -Run the crew. +Run the crew or flow. ```shell Terminal crewai run ``` + + +Starting from version 0.103.0, the `crewai run` command can be used to run both standard crews and flows. For flows, it automatically detects the type from pyproject.toml and runs the appropriate command. This is now the recommended way to run both crews and flows. + + Make sure to run these commands from the directory where your CrewAI project is set up. Some commands may require additional configuration or setup within your project structure. - ### 9. Chat Starting in version `0.98.0`, when you run the `crewai chat` command, you start an interactive session with your crew. The AI assistant will guide you by asking for necessary inputs to execute the crew. Once all inputs are provided, the crew will execute its tasks. @@ -175,7 +179,6 @@ def crew(self) -> Crew: ``` - ### 10. API Keys When running ```crewai create crew``` command, the CLI will first show you the top 5 most common LLM providers and ask you to select one. diff --git a/docs/concepts/flows.mdx b/docs/concepts/flows.mdx index c22a619fe..8ab99ec01 100644 --- a/docs/concepts/flows.mdx +++ b/docs/concepts/flows.mdx @@ -150,12 +150,12 @@ final_output = flow.kickoff() print("---- Final Output ----") print(final_output) -```` +``` ```text Output ---- Final Output ---- Second method received: Output from first_method -```` +``` @@ -738,3 +738,34 @@ Also, check out our YouTube video on how to use flows in CrewAI below! referrerpolicy="strict-origin-when-cross-origin" allowfullscreen > + +## Running Flows + +There are two ways to run a flow: + +### Using the Flow API + +You can run a flow programmatically by creating an instance of your flow class and calling the `kickoff()` method: + +```python +flow = ExampleFlow() +result = flow.kickoff() +``` + +### Using the CLI + +Starting from version 0.103.0, you can run flows using the `crewai run` command: + +```shell +crewai run +``` + +This command automatically detects if your project is a flow (based on the `type = "flow"` setting in your pyproject.toml) and runs it accordingly. This is the recommended way to run flows from the command line. + +For backward compatibility, you can also use: + +```shell +crewai flow kickoff +``` + +However, the `crewai run` command is now the preferred method as it works for both crews and flows. 
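The `ExampleFlow` referenced in the new "Running Flows" section is not defined in this excerpt. As a point of reference, a minimal two-step flow consistent with the `kickoff()` snippet and the "Second method received: Output from first_method" output quoted earlier in flows.mdx might look like this (class and method names are illustrative, not part of the patch):

```python
from crewai.flow.flow import Flow, listen, start


class ExampleFlow(Flow):
    @start()
    def first_method(self):
        # Entry point of the flow; its return value is handed to listeners.
        return "Output from first_method"

    @listen(first_method)
    def second_method(self, first_output):
        # Receives the trigger method's result as a parameter.
        return f"Second method received: {first_output}"


flow = ExampleFlow()
final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
```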
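The docs above say `crewai run` decides between crew and flow "based on the `type = "flow"` setting in your pyproject.toml"; the check itself lands in `run_crew()` later in this diff. A standalone sketch of that detection is shown below. It uses the standard-library `tomllib` in place of the CLI's internal `read_toml()` helper, and assumes the generated projects expose `kickoff` / `run_crew` entry points that `uv run` can invoke:

```python
import tomllib  # Python 3.11+; older interpreters need the third-party "tomli" package

with open("pyproject.toml", "rb") as f:
    pyproject_data = tomllib.load(f)

# Same check run_crew() performs: a project scaffolded as a flow declares
# `type = "flow"` under [tool.crewai]; anything else is treated as a standard crew.
is_flow = pyproject_data.get("tool", {}).get("crewai", {}).get("type") == "flow"

# `crewai run` then shells out to the matching entry point inside the uv environment.
command = ["uv", "run", "kickoff" if is_flow else "run_crew"]
print(f"Running the {'Flow' if is_flow else 'Crew'}: {' '.join(command)}")
```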
diff --git a/src/crewai/agent.py b/src/crewai/agent.py index f07408133..cfebc18e5 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -114,7 +114,6 @@ class Agent(BaseAgent): @model_validator(mode="after") def post_init_setup(self): - self._set_knowledge() self.agent_ops_agent_name = self.role self.llm = create_llm(self.llm) @@ -134,8 +133,11 @@ class Agent(BaseAgent): self.cache_handler = CacheHandler() self.set_cache_handler(self.cache_handler) - def _set_knowledge(self): + def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None): try: + if self.embedder is None and crew_embedder: + self.embedder = crew_embedder + if self.knowledge_sources: full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)") knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}" diff --git a/src/crewai/agents/agent_builder/base_agent.py b/src/crewai/agents/agent_builder/base_agent.py index 64110c2ae..f39fafb99 100644 --- a/src/crewai/agents/agent_builder/base_agent.py +++ b/src/crewai/agents/agent_builder/base_agent.py @@ -351,3 +351,6 @@ class BaseAgent(ABC, BaseModel): if not self._rpm_controller: self._rpm_controller = rpm_controller self.create_agent_executor() + + def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None): + pass diff --git a/src/crewai/agents/parser.py b/src/crewai/agents/parser.py index 71444a20a..1bda4df5c 100644 --- a/src/crewai/agents/parser.py +++ b/src/crewai/agents/parser.py @@ -124,14 +124,15 @@ class CrewAgentParser: ) def _extract_thought(self, text: str) -> str: - regex = r"(.*?)(?:\n\nAction|\n\nFinal Answer)" - thought_match = re.search(regex, text, re.DOTALL) - if thought_match: - thought = thought_match.group(1).strip() - # Remove any triple backticks from the thought string - thought = thought.replace("```", "").strip() - return thought - return "" + thought_index = text.find("\n\nAction") + if thought_index == -1: + thought_index = text.find("\n\nFinal Answer") + if thought_index == -1: + return "" + thought = text[:thought_index].strip() + # Remove any triple backticks from the thought string + thought = thought.replace("```", "").strip() + return thought def _clean_action(self, text: str) -> str: """Clean action string by removing non-essential formatting characters.""" diff --git a/src/crewai/cli/cli.py b/src/crewai/cli/cli.py index 761cc52ad..b2d59adbe 100644 --- a/src/crewai/cli/cli.py +++ b/src/crewai/cli/cli.py @@ -203,7 +203,6 @@ def install(context): @crewai.command() def run(): """Run the Crew.""" - click.echo("Running the Crew") run_crew() diff --git a/src/crewai/cli/run_crew.py b/src/crewai/cli/run_crew.py index 95b560109..62241a4b5 100644 --- a/src/crewai/cli/run_crew.py +++ b/src/crewai/cli/run_crew.py @@ -1,4 +1,6 @@ import subprocess +from enum import Enum +from typing import List, Optional import click from packaging import version @@ -7,16 +9,24 @@ from crewai.cli.utils import read_toml from crewai.cli.version import get_crewai_version +class CrewType(Enum): + STANDARD = "standard" + FLOW = "flow" + + def run_crew() -> None: """ - Run the crew by running a command in the UV environment. + Run the crew or flow by running a command in the UV environment. + + Starting from version 0.103.0, this command can be used to run both + standard crews and flows. For flows, it detects the type from pyproject.toml + and automatically runs the appropriate command. 
""" - command = ["uv", "run", "run_crew"] crewai_version = get_crewai_version() min_required_version = "0.71.0" - pyproject_data = read_toml() + # Check for legacy poetry configuration if pyproject_data.get("tool", {}).get("poetry") and ( version.parse(crewai_version) < version.parse(min_required_version) ): @@ -26,18 +36,54 @@ def run_crew() -> None: fg="red", ) + # Determine crew type + is_flow = pyproject_data.get("tool", {}).get("crewai", {}).get("type") == "flow" + crew_type = CrewType.FLOW if is_flow else CrewType.STANDARD + + # Display appropriate message + click.echo(f"Running the {'Flow' if is_flow else 'Crew'}") + + # Execute the appropriate command + execute_command(crew_type) + + +def execute_command(crew_type: CrewType) -> None: + """ + Execute the appropriate command based on crew type. + + Args: + crew_type: The type of crew to run + """ + command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"] + try: subprocess.run(command, capture_output=False, text=True, check=True) except subprocess.CalledProcessError as e: - click.echo(f"An error occurred while running the crew: {e}", err=True) - click.echo(e.output, err=True, nl=True) - - if pyproject_data.get("tool", {}).get("poetry"): - click.secho( - "It's possible that you are using an old version of crewAI that uses poetry, please run `crewai update` to update your pyproject.toml to use uv.", - fg="yellow", - ) + handle_error(e, crew_type) except Exception as e: click.echo(f"An unexpected error occurred: {e}", err=True) + + +def handle_error(error: subprocess.CalledProcessError, crew_type: CrewType) -> None: + """ + Handle subprocess errors with appropriate messaging. + + Args: + error: The subprocess error that occurred + crew_type: The type of crew that was being run + """ + entity_type = "flow" if crew_type == CrewType.FLOW else "crew" + click.echo(f"An error occurred while running the {entity_type}: {error}", err=True) + + if error.output: + click.echo(error.output, err=True, nl=True) + + pyproject_data = read_toml() + if pyproject_data.get("tool", {}).get("poetry"): + click.secho( + "It's possible that you are using an old version of crewAI that uses poetry, " + "please run `crewai update` to update your pyproject.toml to use uv.", + fg="yellow", + ) diff --git a/src/crewai/cli/templates/flow/README.md b/src/crewai/cli/templates/flow/README.md index b6ce2da71..140834e62 100644 --- a/src/crewai/cli/templates/flow/README.md +++ b/src/crewai/cli/templates/flow/README.md @@ -30,13 +30,13 @@ crewai install ## Running the Project -To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project: +To kickstart your flow and begin execution, run this from the root folder of your project: ```bash crewai run ``` -This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration. +This command initializes the {{name}} Flow as defined in your configuration. This example, unmodified, will run the create a `report.md` file with the output of a research on LLMs in the root folder. 
diff --git a/src/crewai/crew.py b/src/crewai/crew.py index cf627700e..9cecfed3a 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -600,6 +600,7 @@ class Crew(BaseModel): agent.i18n = i18n # type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]" agent.crew = self # type: ignore[attr-defined] + agent.set_knowledge(crew_embedder=self.embedder) # TODO: Create an AgentFunctionCalling protocol for future refactoring if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm" agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm" diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 7a8b88ba0..3b6e81293 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -894,35 +894,45 @@ class Flow(Generic[T], metaclass=FlowMeta): Notes ----- - Routers are executed sequentially to maintain flow control - - Each router's result becomes the new trigger_method + - Each router's result becomes a new trigger_method - Normal listeners are executed in parallel for efficiency - Listeners can receive the trigger method's result as a parameter """ # First, handle routers repeatedly until no router triggers anymore + router_results = [] + current_trigger = trigger_method + while True: routers_triggered = self._find_triggered_methods( - trigger_method, router_only=True + current_trigger, router_only=True ) if not routers_triggered: break + for router_name in routers_triggered: await self._execute_single_listener(router_name, result) # After executing router, the router's result is the path - # The last router executed sets the trigger_method - # The router result is the last element in self._method_outputs - trigger_method = self._method_outputs[-1] + router_result = self._method_outputs[-1] + if router_result: # Only add non-None results + router_results.append(router_result) + current_trigger = ( + router_result # Update for next iteration of router chain + ) - # Now that no more routers are triggered by current trigger_method, - # execute normal listeners - listeners_triggered = self._find_triggered_methods( - trigger_method, router_only=False - ) - if listeners_triggered: - tasks = [ - self._execute_single_listener(listener_name, result) - for listener_name in listeners_triggered - ] - await asyncio.gather(*tasks) + # Now execute normal listeners for all router results and the original trigger + all_triggers = [trigger_method] + router_results + + for current_trigger in all_triggers: + if current_trigger: # Skip None results + listeners_triggered = self._find_triggered_methods( + current_trigger, router_only=False + ) + if listeners_triggered: + tasks = [ + self._execute_single_listener(listener_name, result) + for listener_name in listeners_triggered + ] + await asyncio.gather(*tasks) def _find_triggered_methods( self, trigger_method: str, router_only: bool diff --git a/src/crewai/flow/persistence/sqlite.py b/src/crewai/flow/persistence/sqlite.py index 7a6f134fa..21e906afd 100644 --- a/src/crewai/flow/persistence/sqlite.py +++ b/src/crewai/flow/persistence/sqlite.py @@ -4,7 +4,7 @@ SQLite-based implementation of flow state persistence. 
import json import sqlite3 -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Optional, Union @@ -34,6 +34,7 @@ class SQLiteFlowPersistence(FlowPersistence): ValueError: If db_path is invalid """ from crewai.utilities.paths import db_storage_path + # Get path from argument or default location path = db_path or str(Path(db_storage_path()) / "flow_states.db") @@ -46,7 +47,8 @@ class SQLiteFlowPersistence(FlowPersistence): def init_db(self) -> None: """Create the necessary tables if they don't exist.""" with sqlite3.connect(self.db_path) as conn: - conn.execute(""" + conn.execute( + """ CREATE TABLE IF NOT EXISTS flow_states ( id INTEGER PRIMARY KEY AUTOINCREMENT, flow_uuid TEXT NOT NULL, @@ -54,12 +56,15 @@ class SQLiteFlowPersistence(FlowPersistence): timestamp DATETIME NOT NULL, state_json TEXT NOT NULL ) - """) + """ + ) # Add index for faster UUID lookups - conn.execute(""" + conn.execute( + """ CREATE INDEX IF NOT EXISTS idx_flow_states_uuid ON flow_states(flow_uuid) - """) + """ + ) def save_state( self, @@ -85,19 +90,22 @@ class SQLiteFlowPersistence(FlowPersistence): ) with sqlite3.connect(self.db_path) as conn: - conn.execute(""" + conn.execute( + """ INSERT INTO flow_states ( flow_uuid, method_name, timestamp, state_json ) VALUES (?, ?, ?, ?) - """, ( - flow_uuid, - method_name, - datetime.utcnow().isoformat(), - json.dumps(state_dict), - )) + """, + ( + flow_uuid, + method_name, + datetime.now(timezone.utc).isoformat(), + json.dumps(state_dict), + ), + ) def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]: """Load the most recent state for a given flow UUID. @@ -109,13 +117,16 @@ class SQLiteFlowPersistence(FlowPersistence): The most recent state as a dictionary, or None if no state exists """ with sqlite3.connect(self.db_path) as conn: - cursor = conn.execute(""" + cursor = conn.execute( + """ SELECT state_json FROM flow_states WHERE flow_uuid = ? 
ORDER BY id DESC LIMIT 1 - """, (flow_uuid,)) + """, + (flow_uuid,), + ) row = cursor.fetchone() if row: diff --git a/src/crewai/utilities/token_counter_callback.py b/src/crewai/utilities/token_counter_callback.py index e612fcae4..7037ad5c4 100644 --- a/src/crewai/utilities/token_counter_callback.py +++ b/src/crewai/utilities/token_counter_callback.py @@ -30,8 +30,14 @@ class TokenCalcHandler(CustomLogger): if hasattr(usage, "prompt_tokens"): self.token_cost_process.sum_prompt_tokens(usage.prompt_tokens) if hasattr(usage, "completion_tokens"): - self.token_cost_process.sum_completion_tokens(usage.completion_tokens) - if hasattr(usage, "prompt_tokens_details") and usage.prompt_tokens_details: + self.token_cost_process.sum_completion_tokens( + usage.completion_tokens + ) + if ( + hasattr(usage, "prompt_tokens_details") + and usage.prompt_tokens_details + and usage.prompt_tokens_details.cached_tokens + ): self.token_cost_process.sum_cached_prompt_tokens( usage.prompt_tokens_details.cached_tokens ) diff --git a/tests/flow_test.py b/tests/flow_test.py index b2edcfa5a..c2640fffb 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -654,3 +654,104 @@ def test_flow_plotting(): assert isinstance(received_events[0], FlowPlotEvent) assert received_events[0].flow_name == "StatelessFlow" assert isinstance(received_events[0].timestamp, datetime) + + +def test_multiple_routers_from_same_trigger(): + """Test that multiple routers triggered by the same method all activate their listeners.""" + execution_order = [] + + class MultiRouterFlow(Flow): + def __init__(self): + super().__init__() + # Set diagnosed conditions to trigger all routers + self.state["diagnosed_conditions"] = "DHA" # Contains D, H, and A + + @start() + def scan_medical(self): + execution_order.append("scan_medical") + return "scan_complete" + + @router(scan_medical) + def diagnose_conditions(self): + execution_order.append("diagnose_conditions") + return "diagnosis_complete" + + @router(diagnose_conditions) + def diabetes_router(self): + execution_order.append("diabetes_router") + if "D" in self.state["diagnosed_conditions"]: + return "diabetes" + return None + + @listen("diabetes") + def diabetes_analysis(self): + execution_order.append("diabetes_analysis") + return "diabetes_analysis_complete" + + @router(diagnose_conditions) + def hypertension_router(self): + execution_order.append("hypertension_router") + if "H" in self.state["diagnosed_conditions"]: + return "hypertension" + return None + + @listen("hypertension") + def hypertension_analysis(self): + execution_order.append("hypertension_analysis") + return "hypertension_analysis_complete" + + @router(diagnose_conditions) + def anemia_router(self): + execution_order.append("anemia_router") + if "A" in self.state["diagnosed_conditions"]: + return "anemia" + return None + + @listen("anemia") + def anemia_analysis(self): + execution_order.append("anemia_analysis") + return "anemia_analysis_complete" + + flow = MultiRouterFlow() + flow.kickoff() + + # Verify all methods were called + assert "scan_medical" in execution_order + assert "diagnose_conditions" in execution_order + + # Verify all routers were called + assert "diabetes_router" in execution_order + assert "hypertension_router" in execution_order + assert "anemia_router" in execution_order + + # Verify all listeners were called - this is the key test for the fix + assert "diabetes_analysis" in execution_order + assert "hypertension_analysis" in execution_order + assert "anemia_analysis" in execution_order + + # Verify 
execution order constraints + assert execution_order.index("diagnose_conditions") > execution_order.index( + "scan_medical" + ) + + # All routers should execute after diagnose_conditions + assert execution_order.index("diabetes_router") > execution_order.index( + "diagnose_conditions" + ) + assert execution_order.index("hypertension_router") > execution_order.index( + "diagnose_conditions" + ) + assert execution_order.index("anemia_router") > execution_order.index( + "diagnose_conditions" + ) + + # All analyses should execute after their respective routers + assert execution_order.index("diabetes_analysis") > execution_order.index( + "diabetes_router" + ) + assert execution_order.index("hypertension_analysis") > execution_order.index( + "hypertension_router" + ) + assert execution_order.index("anemia_analysis") > execution_order.index( + "anemia_router" + )
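

As a companion to the test above, here is a condensed sketch of the behavior the reworked `_execute_listeners` enables: two routers listen to the same trigger, and the listeners on both router results should now run, whereas previously only the last router's result became the trigger. Class, method, and path names are illustrative:

```python
from crewai.flow.flow import Flow, listen, router, start


class TwoRouterFlow(Flow):
    @start()
    def begin(self):
        return "begin_complete"

    @router(begin)
    def route_a(self):
        # Both routers are triggered by the same upstream method.
        return "path_a"

    @router(begin)
    def route_b(self):
        return "path_b"

    @listen("path_a")
    def handle_a(self):
        print("handled path_a")

    @listen("path_b")
    def handle_b(self):
        print("handled path_b")


# Before this change only the last router's path was followed; with the fan-out,
# both "handled path_a" and "handled path_b" are expected to print.
TwoRouterFlow().kickoff()
```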
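Separately, the `Agent.set_knowledge()` and `Crew` changes earlier in this diff let an agent with knowledge sources inherit the crew-level embedder when it does not configure one of its own. A hypothetical configuration that would exercise that fallback (knowledge content, role text, task wording, and embedder settings are assumptions, following the config shape used in the crewAI docs):

```python
from crewai import Agent, Crew, Task
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource

policy_source = StringKnowledgeSource(content="Support hours are 9am-5pm UTC, Monday to Friday.")

analyst = Agent(
    role="Support Analyst",
    goal="Answer questions about support policies",
    backstory="You know the support handbook inside out.",
    knowledge_sources=[policy_source],  # note: no per-agent embedder configured
)

crew = Crew(
    agents=[analyst],
    tasks=[
        Task(
            description="When is support available?",
            expected_output="A short answer with the support hours",
            agent=analyst,
        )
    ],
    # With this PR, the crew calls agent.set_knowledge(crew_embedder=self.embedder)
    # while preparing agents, so this embedder becomes the agent's fallback.
    embedder={"provider": "openai", "config": {"model": "text-embedding-3-small"}},
)
```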