diff --git a/README.md b/README.md index bf1287d4d..edcbb6f51 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,6 @@ First, install CrewAI: ```shell pip install crewai ``` - If you want to install the 'crewai' package along with its optional features that include additional tools for agents, you can do so by using the following command: ```shell @@ -93,6 +92,22 @@ pip install 'crewai[tools]' ``` The command above installs the basic package and also adds extra components which require more dependencies to function. +### Troubleshooting Dependencies + +If you encounter issues during installation or usage, here are some common solutions: + +#### Common Issues + +1. **ModuleNotFoundError: No module named 'tiktoken'** + - Install tiktoken explicitly: `pip install 'crewai[embeddings]'` + - If using embedchain or other tools: `pip install 'crewai[tools]'` + +2. **Failed building wheel for tiktoken** + - Ensure Rust compiler is installed (see installation steps above) + - For Windows: Verify Visual C++ Build Tools are installed + - Try upgrading pip: `pip install --upgrade pip` + - If issues persist, use a pre-built wheel: `pip install tiktoken --prefer-binary` + ### 2. Setting Up Your Crew with the YAML Configuration To create a new CrewAI project, run the following CLI (Command Line Interface) command: diff --git a/pyproject.toml b/pyproject.toml index 3f10c1a87..10d7ea62d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,27 +8,38 @@ authors = [ { name = "Joao Moura", email = "joao@crewai.com" } ] dependencies = [ + # Core Dependencies "pydantic>=2.4.2", "openai>=1.13.3", + "litellm>=1.56.4", + "instructor>=1.3.3", + + # Text Processing + "pdfplumber>=0.11.4", + "regex>=2024.9.11", + + # Telemetry and Monitoring "opentelemetry-api>=1.22.0", "opentelemetry-sdk>=1.22.0", "opentelemetry-exporter-otlp-proto-http>=1.22.0", - "instructor>=1.3.3", - "regex>=2024.9.11", - "click>=8.1.7", + + # Data Handling + "chromadb>=0.5.23", + "openpyxl>=3.1.5", + "pyvis>=0.3.2", + + # Authentication and Security + "auth0-python>=4.7.1", "python-dotenv>=1.0.0", + + # Configuration and Utils + "click>=8.1.7", "appdirs>=1.4.4", "jsonref>=1.1.0", "json-repair>=0.25.2", - "auth0-python>=4.7.1", - "litellm>=1.44.22", - "pyvis>=0.3.2", "uv>=0.4.25", "tomli-w>=1.1.0", "tomli>=2.0.2", - "chromadb>=0.5.23", - "pdfplumber>=0.11.4", - "openpyxl>=3.1.5", "blinker>=1.9.0", ] @@ -39,6 +50,9 @@ Repository = "https://github.com/crewAIInc/crewAI" [project.optional-dependencies] tools = ["crewai-tools>=0.17.0"] +embeddings = [ + "tiktoken~=0.7.0" +] agentops = ["agentops>=0.3.0"] fastembed = ["fastembed>=0.4.1"] pdfplumber = [ diff --git a/src/crewai/crew.py b/src/crewai/crew.py index d488783ea..c01a89280 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -726,11 +726,7 @@ class Crew(BaseModel): # Determine which tools to use - task tools take precedence over agent tools tools_for_task = task.tools or agent_to_use.tools or [] - tools_for_task = self._prepare_tools( - agent_to_use, - task, - tools_for_task - ) + tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task) self._log_task_start(task, agent_to_use.role) @@ -797,14 +793,18 @@ class Crew(BaseModel): return skipped_task_output return None - def _prepare_tools(self, agent: BaseAgent, task: Task, tools: List[Tool]) -> List[Tool]: + def _prepare_tools( + self, agent: BaseAgent, task: Task, tools: List[Tool] + ) -> List[Tool]: # Add delegation tools if agent allows delegation if agent.allow_delegation: if self.process == Process.hierarchical: if self.manager_agent: tools = self._update_manager_tools(task, tools) else: - raise ValueError("Manager agent is required for hierarchical process.") + raise ValueError( + "Manager agent is required for hierarchical process." + ) elif agent and agent.allow_delegation: tools = self._add_delegation_tools(task, tools) @@ -823,7 +823,9 @@ class Crew(BaseModel): return self.manager_agent return task.agent - def _merge_tools(self, existing_tools: List[Tool], new_tools: List[Tool]) -> List[Tool]: + def _merge_tools( + self, existing_tools: List[Tool], new_tools: List[Tool] + ) -> List[Tool]: """Merge new tools into existing tools list, avoiding duplicates by tool name.""" if not new_tools: return existing_tools @@ -839,7 +841,9 @@ class Crew(BaseModel): return tools - def _inject_delegation_tools(self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]): + def _inject_delegation_tools( + self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent] + ): delegation_tools = task_agent.get_delegation_tools(agents) return self._merge_tools(tools, delegation_tools) @@ -856,7 +860,9 @@ class Crew(BaseModel): if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent: if not tools: tools = [] - tools = self._inject_delegation_tools(tools, task.agent, agents_for_delegation) + tools = self._inject_delegation_tools( + tools, task.agent, agents_for_delegation + ) return tools def _log_task_start(self, task: Task, role: str = "None"): @@ -870,7 +876,9 @@ class Crew(BaseModel): if task.agent: tools = self._inject_delegation_tools(tools, task.agent, [task.agent]) else: - tools = self._inject_delegation_tools(tools, self.manager_agent, self.agents) + tools = self._inject_delegation_tools( + tools, self.manager_agent, self.agents + ) return tools def _get_context(self, task: Task, task_outputs: List[TaskOutput]): diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 4a6361cce..dc46aa6d8 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -30,7 +30,47 @@ from crewai.telemetry import Telemetry T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]]) -def start(condition=None): +def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable: + """ + Marks a method as a flow's starting point. + + This decorator designates a method as an entry point for the flow execution. + It can optionally specify conditions that trigger the start based on other + method executions. + + Parameters + ---------- + condition : Optional[Union[str, dict, Callable]], optional + Defines when the start method should execute. Can be: + - str: Name of a method that triggers this start + - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers) + - Callable: A method reference that triggers this start + Default is None, meaning unconditional start. + + Returns + ------- + Callable + A decorator function that marks the method as a flow start point. + + Raises + ------ + ValueError + If the condition format is invalid. + + Examples + -------- + >>> @start() # Unconditional start + >>> def begin_flow(self): + ... pass + + >>> @start("method_name") # Start after specific method + >>> def conditional_start(self): + ... pass + + >>> @start(and_("method1", "method2")) # Start after multiple methods + >>> def complex_start(self): + ... pass + """ def decorator(func): func.__is_start_method__ = True if condition is not None: @@ -55,8 +95,42 @@ def start(condition=None): return decorator +def listen(condition: Union[str, dict, Callable]) -> Callable: + """ + Creates a listener that executes when specified conditions are met. -def listen(condition): + This decorator sets up a method to execute in response to other method + executions in the flow. It supports both simple and complex triggering + conditions. + + Parameters + ---------- + condition : Union[str, dict, Callable] + Specifies when the listener should execute. Can be: + - str: Name of a method that triggers this listener + - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers) + - Callable: A method reference that triggers this listener + + Returns + ------- + Callable + A decorator function that sets up the method as a listener. + + Raises + ------ + ValueError + If the condition format is invalid. + + Examples + -------- + >>> @listen("process_data") # Listen to single method + >>> def handle_processed_data(self): + ... pass + + >>> @listen(or_("success", "failure")) # Listen to multiple methods + >>> def handle_completion(self): + ... pass + """ def decorator(func): if isinstance(condition, str): func.__trigger_methods__ = [condition] @@ -80,10 +154,49 @@ def listen(condition): return decorator -def router(condition): +def router(condition: Union[str, dict, Callable]) -> Callable: + """ + Creates a routing method that directs flow execution based on conditions. + + This decorator marks a method as a router, which can dynamically determine + the next steps in the flow based on its return value. Routers are triggered + by specified conditions and can return constants that determine which path + the flow should take. + + Parameters + ---------- + condition : Union[str, dict, Callable] + Specifies when the router should execute. Can be: + - str: Name of a method that triggers this router + - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers) + - Callable: A method reference that triggers this router + + Returns + ------- + Callable + A decorator function that sets up the method as a router. + + Raises + ------ + ValueError + If the condition format is invalid. + + Examples + -------- + >>> @router("check_status") + >>> def route_based_on_status(self): + ... if self.state.status == "success": + ... return SUCCESS + ... return FAILURE + + >>> @router(and_("validate", "process")) + >>> def complex_routing(self): + ... if all([self.state.valid, self.state.processed]): + ... return CONTINUE + ... return STOP + """ def decorator(func): func.__is_router__ = True - # Handle conditions like listen/start if isinstance(condition, str): func.__trigger_methods__ = [condition] func.__condition_type__ = "OR" @@ -105,8 +218,39 @@ def router(condition): return decorator +def or_(*conditions: Union[str, dict, Callable]) -> dict: + """ + Combines multiple conditions with OR logic for flow control. -def or_(*conditions): + Creates a condition that is satisfied when any of the specified conditions + are met. This is used with @start, @listen, or @router decorators to create + complex triggering conditions. + + Parameters + ---------- + *conditions : Union[str, dict, Callable] + Variable number of conditions that can be: + - str: Method names + - dict: Existing condition dictionaries + - Callable: Method references + + Returns + ------- + dict + A condition dictionary with format: + {"type": "OR", "methods": list_of_method_names} + + Raises + ------ + ValueError + If any condition is invalid. + + Examples + -------- + >>> @listen(or_("success", "timeout")) + >>> def handle_completion(self): + ... pass + """ methods = [] for condition in conditions: if isinstance(condition, dict) and "methods" in condition: @@ -120,7 +264,39 @@ def or_(*conditions): return {"type": "OR", "methods": methods} -def and_(*conditions): +def and_(*conditions: Union[str, dict, Callable]) -> dict: + """ + Combines multiple conditions with AND logic for flow control. + + Creates a condition that is satisfied only when all specified conditions + are met. This is used with @start, @listen, or @router decorators to create + complex triggering conditions. + + Parameters + ---------- + *conditions : Union[str, dict, Callable] + Variable number of conditions that can be: + - str: Method names + - dict: Existing condition dictionaries + - Callable: Method references + + Returns + ------- + dict + A condition dictionary with format: + {"type": "AND", "methods": list_of_method_names} + + Raises + ------ + ValueError + If any condition is invalid. + + Examples + -------- + >>> @listen(and_("validated", "processed")) + >>> def handle_complete_data(self): + ... pass + """ methods = [] for condition in conditions: if isinstance(condition, dict) and "methods" in condition: @@ -286,6 +462,23 @@ class Flow(Generic[T], metaclass=FlowMeta): return final_output async def _execute_start_method(self, start_method_name: str) -> None: + """ + Executes a flow's start method and its triggered listeners. + + This internal method handles the execution of methods marked with @start + decorator and manages the subsequent chain of listener executions. + + Parameters + ---------- + start_method_name : str + The name of the start method to execute. + + Notes + ----- + - Executes the start method and captures its result + - Triggers execution of any listeners waiting on this start method + - Part of the flow's initialization sequence + """ result = await self._execute_method( start_method_name, self._methods[start_method_name] ) @@ -306,6 +499,28 @@ class Flow(Generic[T], metaclass=FlowMeta): return result async def _execute_listeners(self, trigger_method: str, result: Any) -> None: + """ + Executes all listeners and routers triggered by a method completion. + + This internal method manages the execution flow by: + 1. First executing all triggered routers sequentially + 2. Then executing all triggered listeners in parallel + + Parameters + ---------- + trigger_method : str + The name of the method that triggered these listeners. + result : Any + The result from the triggering method, passed to listeners + that accept parameters. + + Notes + ----- + - Routers are executed sequentially to maintain flow control + - Each router's result becomes the new trigger_method + - Normal listeners are executed in parallel for efficiency + - Listeners can receive the trigger method's result as a parameter + """ # First, handle routers repeatedly until no router triggers anymore while True: routers_triggered = self._find_triggered_methods( @@ -335,6 +550,33 @@ class Flow(Generic[T], metaclass=FlowMeta): def _find_triggered_methods( self, trigger_method: str, router_only: bool ) -> List[str]: + """ + Finds all methods that should be triggered based on conditions. + + This internal method evaluates both OR and AND conditions to determine + which methods should be executed next in the flow. + + Parameters + ---------- + trigger_method : str + The name of the method that just completed execution. + router_only : bool + If True, only consider router methods. + If False, only consider non-router methods. + + Returns + ------- + List[str] + Names of methods that should be triggered. + + Notes + ----- + - Handles both OR and AND conditions: + * OR: Triggers if any condition is met + * AND: Triggers only when all conditions are met + - Maintains state for AND conditions using _pending_and_listeners + - Separates router and normal listener evaluation + """ triggered = [] for listener_name, (condition_type, methods) in self._listeners.items(): is_router = listener_name in self._routers @@ -363,6 +605,33 @@ class Flow(Generic[T], metaclass=FlowMeta): return triggered async def _execute_single_listener(self, listener_name: str, result: Any) -> None: + """ + Executes a single listener method with proper event handling. + + This internal method manages the execution of an individual listener, + including parameter inspection, event emission, and error handling. + + Parameters + ---------- + listener_name : str + The name of the listener method to execute. + result : Any + The result from the triggering method, which may be passed + to the listener if it accepts parameters. + + Notes + ----- + - Inspects method signature to determine if it accepts the trigger result + - Emits events for method execution start and finish + - Handles errors gracefully with detailed logging + - Recursively triggers listeners of this listener + - Supports both parameterized and parameter-less listeners + + Error Handling + ------------- + Catches and logs any exceptions during execution, preventing + individual listener failures from breaking the entire flow. + """ try: method = self._methods[listener_name] diff --git a/src/crewai/flow/flow_visualizer.py b/src/crewai/flow/flow_visualizer.py index 988f27919..a70e91a18 100644 --- a/src/crewai/flow/flow_visualizer.py +++ b/src/crewai/flow/flow_visualizer.py @@ -1,12 +1,14 @@ # flow_visualizer.py import os +from pathlib import Path from pyvis.network import Network from crewai.flow.config import COLORS, NODE_STYLES from crewai.flow.html_template_handler import HTMLTemplateHandler from crewai.flow.legend_generator import generate_legend_items_html, get_legend_items +from crewai.flow.path_utils import safe_path_join, validate_path_exists from crewai.flow.utils import calculate_node_levels from crewai.flow.visualization_utils import ( add_edges, @@ -16,89 +18,209 @@ from crewai.flow.visualization_utils import ( class FlowPlot: + """Handles the creation and rendering of flow visualization diagrams.""" + def __init__(self, flow): + """ + Initialize FlowPlot with a flow object. + + Parameters + ---------- + flow : Flow + A Flow instance to visualize. + + Raises + ------ + ValueError + If flow object is invalid or missing required attributes. + """ + if not hasattr(flow, '_methods'): + raise ValueError("Invalid flow object: missing '_methods' attribute") + if not hasattr(flow, '_listeners'): + raise ValueError("Invalid flow object: missing '_listeners' attribute") + if not hasattr(flow, '_start_methods'): + raise ValueError("Invalid flow object: missing '_start_methods' attribute") + self.flow = flow self.colors = COLORS self.node_styles = NODE_STYLES def plot(self, filename): - net = Network( - directed=True, - height="750px", - width="100%", - bgcolor=self.colors["bg"], - layout=None, - ) - - # Set options to disable physics - net.set_options( - """ - var options = { - "nodes": { - "font": { - "multi": "html" - } - }, - "physics": { - "enabled": false - } - } """ - ) + Generate and save an HTML visualization of the flow. - # Calculate levels for nodes - node_levels = calculate_node_levels(self.flow) + Parameters + ---------- + filename : str + Name of the output file (without extension). - # Compute positions - node_positions = compute_positions(self.flow, node_levels) + Raises + ------ + ValueError + If filename is invalid or network generation fails. + IOError + If file operations fail or visualization cannot be generated. + RuntimeError + If network visualization generation fails. + """ + if not filename or not isinstance(filename, str): + raise ValueError("Filename must be a non-empty string") + + try: + # Initialize network + net = Network( + directed=True, + height="750px", + width="100%", + bgcolor=self.colors["bg"], + layout=None, + ) - # Add nodes to the network - add_nodes_to_network(net, self.flow, node_positions, self.node_styles) + # Set options to disable physics + net.set_options( + """ + var options = { + "nodes": { + "font": { + "multi": "html" + } + }, + "physics": { + "enabled": false + } + } + """ + ) - # Add edges to the network - add_edges(net, self.flow, node_positions, self.colors) + # Calculate levels for nodes + try: + node_levels = calculate_node_levels(self.flow) + except Exception as e: + raise ValueError(f"Failed to calculate node levels: {str(e)}") - network_html = net.generate_html() - final_html_content = self._generate_final_html(network_html) + # Compute positions + try: + node_positions = compute_positions(self.flow, node_levels) + except Exception as e: + raise ValueError(f"Failed to compute node positions: {str(e)}") - # Save the final HTML content to the file - with open(f"{filename}.html", "w", encoding="utf-8") as f: - f.write(final_html_content) - print(f"Plot saved as {filename}.html") + # Add nodes to the network + try: + add_nodes_to_network(net, self.flow, node_positions, self.node_styles) + except Exception as e: + raise RuntimeError(f"Failed to add nodes to network: {str(e)}") - self._cleanup_pyvis_lib() + # Add edges to the network + try: + add_edges(net, self.flow, node_positions, self.colors) + except Exception as e: + raise RuntimeError(f"Failed to add edges to network: {str(e)}") + + # Generate HTML + try: + network_html = net.generate_html() + final_html_content = self._generate_final_html(network_html) + except Exception as e: + raise RuntimeError(f"Failed to generate network visualization: {str(e)}") + + # Save the final HTML content to the file + try: + with open(f"{filename}.html", "w", encoding="utf-8") as f: + f.write(final_html_content) + print(f"Plot saved as {filename}.html") + except IOError as e: + raise IOError(f"Failed to save flow visualization to {filename}.html: {str(e)}") + + except (ValueError, RuntimeError, IOError) as e: + raise e + except Exception as e: + raise RuntimeError(f"Unexpected error during flow visualization: {str(e)}") + finally: + self._cleanup_pyvis_lib() def _generate_final_html(self, network_html): - # Extract just the body content from the generated HTML - current_dir = os.path.dirname(__file__) - template_path = os.path.join( - current_dir, "assets", "crewai_flow_visual_template.html" - ) - logo_path = os.path.join(current_dir, "assets", "crewai_logo.svg") + """ + Generate the final HTML content with network visualization and legend. - html_handler = HTMLTemplateHandler(template_path, logo_path) - network_body = html_handler.extract_body_content(network_html) + Parameters + ---------- + network_html : str + HTML content generated by pyvis Network. - # Generate the legend items HTML - legend_items = get_legend_items(self.colors) - legend_items_html = generate_legend_items_html(legend_items) - final_html_content = html_handler.generate_final_html( - network_body, legend_items_html - ) - return final_html_content + Returns + ------- + str + Complete HTML content with styling and legend. + + Raises + ------ + IOError + If template or logo files cannot be accessed. + ValueError + If network_html is invalid. + """ + if not network_html: + raise ValueError("Invalid network HTML content") + + try: + # Extract just the body content from the generated HTML + current_dir = os.path.dirname(__file__) + template_path = safe_path_join("assets", "crewai_flow_visual_template.html", root=current_dir) + logo_path = safe_path_join("assets", "crewai_logo.svg", root=current_dir) + + if not os.path.exists(template_path): + raise IOError(f"Template file not found: {template_path}") + if not os.path.exists(logo_path): + raise IOError(f"Logo file not found: {logo_path}") + + html_handler = HTMLTemplateHandler(template_path, logo_path) + network_body = html_handler.extract_body_content(network_html) + + # Generate the legend items HTML + legend_items = get_legend_items(self.colors) + legend_items_html = generate_legend_items_html(legend_items) + final_html_content = html_handler.generate_final_html( + network_body, legend_items_html + ) + return final_html_content + except Exception as e: + raise IOError(f"Failed to generate visualization HTML: {str(e)}") def _cleanup_pyvis_lib(self): - # Clean up the generated lib folder - lib_folder = os.path.join(os.getcwd(), "lib") + """ + Clean up the generated lib folder from pyvis. + + This method safely removes the temporary lib directory created by pyvis + during network visualization generation. + """ try: + lib_folder = safe_path_join("lib", root=os.getcwd()) if os.path.exists(lib_folder) and os.path.isdir(lib_folder): import shutil - shutil.rmtree(lib_folder) + except ValueError as e: + print(f"Error validating lib folder path: {e}") except Exception as e: - print(f"Error cleaning up {lib_folder}: {e}") + print(f"Error cleaning up lib folder: {e}") def plot_flow(flow, filename="flow_plot"): + """ + Convenience function to create and save a flow visualization. + + Parameters + ---------- + flow : Flow + Flow instance to visualize. + filename : str, optional + Output filename without extension, by default "flow_plot". + + Raises + ------ + ValueError + If flow object or filename is invalid. + IOError + If file operations fail. + """ visualizer = FlowPlot(flow) visualizer.plot(filename) diff --git a/src/crewai/flow/html_template_handler.py b/src/crewai/flow/html_template_handler.py index d521d8cf8..f0d2d89ad 100644 --- a/src/crewai/flow/html_template_handler.py +++ b/src/crewai/flow/html_template_handler.py @@ -1,26 +1,53 @@ import base64 import re +from pathlib import Path + +from crewai.flow.path_utils import safe_path_join, validate_path_exists class HTMLTemplateHandler: + """Handles HTML template processing and generation for flow visualization diagrams.""" + def __init__(self, template_path, logo_path): - self.template_path = template_path - self.logo_path = logo_path + """ + Initialize HTMLTemplateHandler with validated template and logo paths. + + Parameters + ---------- + template_path : str + Path to the HTML template file. + logo_path : str + Path to the logo image file. + + Raises + ------ + ValueError + If template or logo paths are invalid or files don't exist. + """ + try: + self.template_path = validate_path_exists(template_path, "file") + self.logo_path = validate_path_exists(logo_path, "file") + except ValueError as e: + raise ValueError(f"Invalid template or logo path: {e}") def read_template(self): + """Read and return the HTML template file contents.""" with open(self.template_path, "r", encoding="utf-8") as f: return f.read() def encode_logo(self): + """Convert the logo SVG file to base64 encoded string.""" with open(self.logo_path, "rb") as logo_file: logo_svg_data = logo_file.read() return base64.b64encode(logo_svg_data).decode("utf-8") def extract_body_content(self, html): + """Extract and return content between body tags from HTML string.""" match = re.search("