diff --git a/README.md b/README.md
index 5669c71a2..edcbb6f51 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
# **CrewAI**
-🤖 **CrewAI**: Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks.
+🤖 **CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
@@ -22,13 +22,17 @@
- [Why CrewAI?](#why-crewai)
- [Getting Started](#getting-started)
- [Key Features](#key-features)
+- [Understanding Flows and Crews](#understanding-flows-and-crews)
+- [CrewAI vs LangGraph](#how-crewai-compares)
- [Examples](#examples)
- [Quick Tutorial](#quick-tutorial)
- [Write Job Descriptions](#write-job-descriptions)
- [Trip Planner](#trip-planner)
- [Stock Analysis](#stock-analysis)
+ - [Using Crews and Flows Together](#using-crews-and-flows-together)
- [Connecting Your Crew to a Model](#connecting-your-crew-to-a-model)
- [How CrewAI Compares](#how-crewai-compares)
+- [Frequently Asked Questions (FAQ)](#frequently-asked-questions-faq)
- [Contribution](#contribution)
- [Telemetry](#telemetry)
- [License](#license)
@@ -36,10 +40,40 @@
## Why CrewAI?
The power of AI collaboration has much to offer.
-CrewAI is designed to enable AI agents to assume roles, share goals, and operate in a cohesive unit - much like a well-oiled crew. Whether you're building a smart assistant platform, an automated customer service ensemble, or a multi-agent research team, CrewAI provides the backbone for sophisticated multi-agent interactions.
+CrewAI is a standalone framework, built from the ground up without dependencies on Langchain or other agent frameworks. It's designed to enable AI agents to assume roles, share goals, and operate in a cohesive unit - much like a well-oiled crew. Whether you're building a smart assistant platform, an automated customer service ensemble, or a multi-agent research team, CrewAI provides the backbone for sophisticated multi-agent interactions.
## Getting Started
+### Learning Resources
+
+Learn CrewAI through our comprehensive courses:
+- [Multi AI Agent Systems with CrewAI](https://www.deeplearning.ai/short-courses/multi-ai-agent-systems-with-crewai/) - Master the fundamentals of multi-agent systems
+- [Practical Multi AI Agents and Advanced Use Cases](https://www.deeplearning.ai/short-courses/practical-multi-ai-agents-and-advanced-use-cases-with-crewai/) - Deep dive into advanced implementations
+
+### Understanding Flows and Crews
+
+CrewAI offers two powerful, complementary approaches that work seamlessly together to build sophisticated AI applications:
+
+1. **Crews**: Teams of AI agents with true autonomy and agency, working together to accomplish complex tasks through role-based collaboration. Crews enable:
+ - Natural, autonomous decision-making between agents
+ - Dynamic task delegation and collaboration
+ - Specialized roles with defined goals and expertise
+ - Flexible problem-solving approaches
+
+2. **Flows**: Production-ready, event-driven workflows that deliver precise control over complex automations. Flows provide:
+ - Fine-grained control over execution paths for real-world scenarios
+ - Secure, consistent state management between tasks
+ - Clean integration of AI agents with production Python code
+ - Conditional branching for complex business logic
+
+The true power of CrewAI emerges when combining Crews and Flows. This synergy allows you to:
+- Build complex, production-grade applications
+- Balance autonomy with precise control
+- Handle sophisticated real-world scenarios
+- Maintain clean, maintainable code structure
+
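The Crew/Flow split above can be pictured without any framework at all. The sketch below is plain Python, not CrewAI's API: it mimics the event-driven idea behind Flows, where a step runs and registered listeners fire on its result. All names here are illustrative.

```python
# Toy event-driven flow: plain Python, no crewai dependency.
class MiniFlow:
    """Mimics @start/@listen-style chaining: listeners fire on step results."""

    def __init__(self):
        self.state = {}
        self.listeners = {}  # trigger name -> list of callables

    def listen(self, trigger):
        def register(func):
            self.listeners.setdefault(trigger, []).append(func)
            return func
        return register

    def kickoff(self, start_name, start_func):
        # Run the start step, then emit its result to any listeners.
        result = start_func(self.state)
        self._emit(start_name, result)
        return self.state

    def _emit(self, trigger, result):
        for func in self.listeners.get(trigger, []):
            # Each listener's own result can trigger further listeners.
            self._emit(func.__name__, func(self.state, result))

flow = MiniFlow()

@flow.listen("fetch")
def analyze(state, data):
    state["sentiment"] = "bullish" if data["signal"] > 0 else "bearish"
    return state["sentiment"]

final_state = flow.kickoff("fetch", lambda state: {"signal": 1})
print(final_state)  # {'sentiment': 'bullish'}
```

CrewAI's actual Flow class adds typed state, routers, and async execution on top of this basic pattern.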
+### Getting Started with Installation
+
To get started with CrewAI, follow these simple steps:
### 1. Installation
@@ -51,7 +85,6 @@ First, install CrewAI:
```shell
pip install crewai
```
-
If you want to install the 'crewai' package along with its optional features that include additional tools for agents, you can do so by using the following command:
```shell
@@ -59,6 +92,22 @@ pip install 'crewai[tools]'
```
The command above installs the basic package and also adds extra components which require more dependencies to function.
+### Troubleshooting Dependencies
+
+If you encounter issues during installation or usage, here are some common solutions:
+
+#### Common Issues
+
+1. **ModuleNotFoundError: No module named 'tiktoken'**
+ - Install tiktoken explicitly: `pip install 'crewai[embeddings]'`
+ - If using embedchain or other tools: `pip install 'crewai[tools]'`
+
+2. **Failed building wheel for tiktoken**
+ - Ensure Rust compiler is installed (see installation steps above)
+ - For Windows: Verify Visual C++ Build Tools are installed
+ - Try upgrading pip: `pip install --upgrade pip`
+ - If issues persist, use a pre-built wheel: `pip install tiktoken --prefer-binary`
+
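Before digging into pip internals, a quick framework-free check can tell you whether the optional module is importable at all. This snippet is a generic sketch; the module and extra names are the ones from the troubleshooting list above.

```python
# Check which optional modules are importable before debugging pip issues.
import importlib.util

def missing_modules(names):
    """Return the subset of module names Python cannot currently import."""
    return [name for name in names if importlib.util.find_spec(name) is None]

missing = missing_modules(["tiktoken"])
if missing:
    print(f"Missing: {missing} -- try: pip install 'crewai[embeddings]'")
else:
    print("tiktoken is available")
```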
### 2. Setting Up Your Crew with the YAML Configuration
To create a new CrewAI project, run the following CLI (Command Line Interface) command:
@@ -264,13 +313,16 @@ In addition to the sequential process, you can use the hierarchical process, whi
## Key Features
-- **Role-Based Agent Design**: Customize agents with specific roles, goals, and tools.
-- **Autonomous Inter-Agent Delegation**: Agents can autonomously delegate tasks and inquire amongst themselves, enhancing problem-solving efficiency.
-- **Flexible Task Management**: Define tasks with customizable tools and assign them to agents dynamically.
-- **Processes Driven**: Currently only supports `sequential` task execution and `hierarchical` processes, but more complex processes like consensual and autonomous are being worked on.
-- **Save output as file**: Save the output of individual tasks as a file, so you can use it later.
-- **Parse output as Pydantic or Json**: Parse the output of individual tasks as a Pydantic model or as a Json if you want to.
-- **Works with Open Source Models**: Run your crew using Open AI or open source models refer to the [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-Connections/) page for details on configuring your agents' connections to models, even ones running locally!
+**Note**: CrewAI is a standalone framework built from the ground up, without dependencies on Langchain or other agent frameworks.
+
+- **Deep Customization**: Build sophisticated agents with full control over the system - from overriding inner prompts to accessing low-level APIs. Customize roles, goals, tools, and behaviors while maintaining clean abstractions.
+- **Autonomous Inter-Agent Delegation**: Agents can autonomously delegate tasks and inquire amongst themselves, enabling complex problem-solving in real-world scenarios.
+- **Flexible Task Management**: Define and customize tasks with granular control, from simple operations to complex multi-step processes.
+- **Production-Grade Architecture**: Support for both high-level abstractions and low-level customization, with robust error handling and state management.
+- **Predictable Results**: Ensure consistent, accurate outputs through programmatic guardrails, agent training capabilities, and flow-based execution control. See our [documentation on guardrails](https://docs.crewai.com/how-to/guardrails/) for implementation details.
+- **Model Flexibility**: Run your crew using OpenAI or open source models with production-ready integrations. See [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-Connections/) for detailed configuration options.
+- **Event-Driven Flows**: Build complex, real-world workflows with precise control over execution paths, state management, and conditional logic.
+- **Process Orchestration**: Achieve any workflow pattern through flows - from simple sequential and hierarchical processes to complex custom orchestration with conditional branching and parallel execution.
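The "programmatic guardrails" mentioned above amount to validation callables that either pass an output through or flag it for retry. Below is a minimal sketch of that pattern in plain Python; CrewAI's task guardrails follow a similar `(success, data)` convention, but check the linked documentation for the exact signature.

```python
# Illustrative guardrail: validate an output dict and pass it through,
# or return an error message so the producing step can be retried.
def confidence_guardrail(output: dict) -> tuple[bool, object]:
    """Reject analyses that lack a confidence score in [0, 1]."""
    score = output.get("confidence")
    if isinstance(score, (int, float)) and 0.0 <= score <= 1.0:
        return True, output
    return False, "Output must include a 'confidence' score between 0 and 1"

ok, result = confidence_guardrail({"confidence": 0.9, "summary": "bullish"})
print(ok)  # True
```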

@@ -305,6 +357,98 @@ You can test different real life examples of AI crews in the [CrewAI-examples re
[Stock Analysis](https://www.youtube.com/watch?v=e0Uj4yWdaAg "Stock Analysis")
+### Using Crews and Flows Together
+
+CrewAI's power truly shines when combining Crews with Flows to create sophisticated automation pipelines. Here's how you can orchestrate multiple Crews within a Flow:
+
+```python
+from crewai.flow.flow import Flow, listen, or_, router, start
+from crewai import Agent, Crew, Process, Task
+from pydantic import BaseModel
+
+# Define structured state for precise control
+class MarketState(BaseModel):
+    sentiment: str = "neutral"
+    confidence: float = 0.0
+    recommendations: list = []
+
+class AdvancedAnalysisFlow(Flow[MarketState]):
+    @start()
+    def fetch_market_data(self):
+        # Demonstrate low-level control with structured state
+        self.state.sentiment = "analyzing"
+        return {"sector": "tech", "timeframe": "1W"}  # These parameters match the task description template
+
+    @listen(fetch_market_data)
+    def analyze_with_crew(self, market_data):
+        # Show crew agency through specialized roles
+        analyst = Agent(
+            role="Senior Market Analyst",
+            goal="Conduct deep market analysis with expert insight",
+            backstory="You're a veteran analyst known for identifying subtle market patterns"
+        )
+        researcher = Agent(
+            role="Data Researcher",
+            goal="Gather and validate supporting market data",
+            backstory="You excel at finding and correlating multiple data sources"
+        )
+
+        analysis_task = Task(
+            description="Analyze {sector} sector data for the past {timeframe}",
+            expected_output="Detailed market analysis with confidence score",
+            agent=analyst
+        )
+        research_task = Task(
+            description="Find supporting data to validate the analysis",
+            expected_output="Corroborating evidence and potential contradictions",
+            agent=researcher
+        )
+
+        # Demonstrate crew autonomy
+        analysis_crew = Crew(
+            agents=[analyst, researcher],
+            tasks=[analysis_task, research_task],
+            process=Process.sequential,
+            verbose=True
+        )
+        return analysis_crew.kickoff(inputs=market_data)  # Pass market_data as named inputs
+
+    @router(analyze_with_crew)
+    def determine_next_steps(self):
+        # Show flow control with conditional routing
+        # (in a full implementation, self.state.confidence would be updated
+        # from the crew's output before routing)
+        if self.state.confidence > 0.8:
+            return "high_confidence"
+        elif self.state.confidence > 0.5:
+            return "medium_confidence"
+        return "low_confidence"
+
+    @listen("high_confidence")
+    def execute_strategy(self):
+        # Demonstrate complex decision making
+        strategist = Agent(
+            role="Strategy Expert",
+            goal="Develop optimal market strategy",
+            backstory="You turn market analyses into actionable plans"
+        )
+        strategy_crew = Crew(
+            agents=[strategist],
+            tasks=[
+                Task(description="Create detailed strategy based on analysis",
+                     expected_output="Step-by-step action plan",
+                     agent=strategist)
+            ]
+        )
+        return strategy_crew.kickoff()
+
+    @listen(or_("medium_confidence", "low_confidence"))
+    def request_additional_analysis(self):
+        self.state.recommendations.append("Gather more data")
+        return "Additional analysis required"
+```
+
+This example demonstrates how to:
+1. Use Python code for basic data operations
+2. Create and execute Crews as steps in your workflow
+3. Use Flow decorators to manage the sequence of operations
+4. Implement conditional branching based on Crew results
+
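The router step in the example above reduces to plain threshold logic. Isolating it as below (no agents involved) makes the cut-offs easy to test and tune before wiring them into a flow.

```python
# Confidence thresholds mirroring the routing step in the flow above.
def route_by_confidence(confidence: float) -> str:
    """Map an analysis confidence score to a flow route label."""
    if confidence > 0.8:
        return "high_confidence"
    if confidence > 0.5:
        return "medium_confidence"
    return "low_confidence"

print([route_by_confidence(c) for c in (0.9, 0.6, 0.2)])
# ['high_confidence', 'medium_confidence', 'low_confidence']
```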
## Connecting Your Crew to a Model
CrewAI supports using various LLMs through a variety of connection options. By default your agents will use the OpenAI API when querying the model. However, there are several other ways to allow your agents to connect to models. For example, you can configure your agents to use a local model via the Ollama tool.
@@ -313,9 +457,13 @@ Please refer to the [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-
## How CrewAI Compares
-**CrewAI's Advantage**: CrewAI is built with production in mind. It offers the flexibility of Autogen's conversational agents and the structured process approach of ChatDev, but without the rigidity. CrewAI's processes are designed to be dynamic and adaptable, fitting seamlessly into both development and production workflows.
+**CrewAI's Advantage**: CrewAI combines autonomous agent intelligence with precise workflow control through its unique Crews and Flows architecture. The framework excels at both high-level orchestration and low-level customization, enabling complex, production-grade systems with granular control.
-- **Autogen**: While Autogen does good in creating conversational agents capable of working together, it lacks an inherent concept of process. In Autogen, orchestrating agents' interactions requires additional programming, which can become complex and cumbersome as the scale of tasks grows.
+- **LangGraph**: While LangGraph provides a foundation for building agent workflows, its approach requires significant boilerplate code and complex state management patterns. The framework's tight coupling with LangChain can limit flexibility when implementing custom agent behaviors or integrating with external systems.
+
+*P.S. CrewAI demonstrates significant performance advantages over LangGraph: it executes 5.76x faster in certain cases, such as this QA task ([see comparison](https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks/CrewAI%20Flows%20%26%20Langgraph/QA%20Agent)), and achieves higher evaluation scores with faster completion times in certain coding tasks, as in this example ([detailed analysis](https://github.com/crewAIInc/crewAI-examples/blob/main/Notebooks/CrewAI%20Flows%20%26%20Langgraph/Coding%20Assistant/coding_assistant_eval.ipynb)).*
+
+- **Autogen**: While Autogen excels at creating conversational agents capable of working together, it lacks an inherent concept of process. In Autogen, orchestrating agents' interactions requires additional programming, which can become complex and cumbersome as the scale of tasks grows.
- **ChatDev**: ChatDev introduced the idea of processes into the realm of AI agents, but its implementation is quite rigid. Customizations in ChatDev are limited and not geared towards production environments, which can hinder scalability and flexibility in real-world applications.
@@ -440,5 +588,8 @@ A: CrewAI uses anonymous telemetry to collect usage data for improvement purpose
### Q: Where can I find examples of CrewAI in action?
A: You can find various real-life examples in the [CrewAI-examples repository](https://github.com/crewAIInc/crewAI-examples), including trip planners, stock analysis tools, and more.
+### Q: What is the difference between Crews and Flows?
+A: Crews and Flows serve different but complementary purposes in CrewAI. Crews are teams of AI agents working together to accomplish specific tasks through role-based collaboration, delivering accurate and predictable results. Flows, on the other hand, are event-driven workflows that can orchestrate both Crews and regular Python code, allowing you to build complex automation pipelines with secure state management and conditional execution paths.
+
### Q: How can I contribute to CrewAI?
A: Contributions are welcome! You can fork the repository, create a new branch for your feature, add your improvement, and send a pull request. Check the Contribution section in the README for more details.
diff --git a/docs/concepts/knowledge.mdx b/docs/concepts/knowledge.mdx
index 8df6f623f..8a777833e 100644
--- a/docs/concepts/knowledge.mdx
+++ b/docs/concepts/knowledge.mdx
@@ -171,6 +171,58 @@ crewai reset-memories --knowledge
This is useful when you've updated your knowledge sources and want to ensure that the agents are using the most recent information.
+## Agent-Specific Knowledge
+
+While knowledge can be provided at the crew level using `crew.knowledge_sources`, individual agents can also have their own knowledge sources using the `knowledge_sources` parameter:
+
+```python Code
+from crewai import Agent, Task, Crew
+from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
+
+# Create agent-specific knowledge about a product
+product_specs = StringKnowledgeSource(
+    content="""The XPS 13 laptop features:
+    - 13.4-inch 4K display
+    - Intel Core i7 processor
+    - 16GB RAM
+    - 512GB SSD storage
+    - 12-hour battery life""",
+    metadata={"category": "product_specs"}
+)
+
+# Create a support agent with product knowledge
+support_agent = Agent(
+    role="Technical Support Specialist",
+    goal="Provide accurate product information and support.",
+    backstory="You are an expert on our laptop products and specifications.",
+    knowledge_sources=[product_specs]  # Agent-specific knowledge
+)
+
+# Create a task that requires product knowledge
+support_task = Task(
+    description="Answer this customer question: {question}",
+    expected_output="A clear, accurate answer based on the product specifications.",
+    agent=support_agent
+)
+
+# Create and run the crew
+crew = Crew(
+    agents=[support_agent],
+    tasks=[support_task]
+)
+
+# Get answer about the laptop's specifications
+result = crew.kickoff(
+    inputs={"question": "What is the storage capacity of the XPS 13?"}
+)
+```
+
+<Info>
+ Benefits of agent-specific knowledge:
+ - Give agents specialized information for their roles
+ - Maintain separation of concerns between agents
+ - Combine with crew-level knowledge for layered information access
+</Info>
+
## Custom Knowledge Sources
CrewAI allows you to create custom knowledge sources for any type of data by extending the `BaseKnowledgeSource` class. Let's create a practical example that fetches and processes space news articles.
diff --git a/pyproject.toml b/pyproject.toml
index 3f10c1a87..bcc00a0d9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,27 +8,38 @@ authors = [
{ name = "Joao Moura", email = "joao@crewai.com" }
]
dependencies = [
+ # Core Dependencies
"pydantic>=2.4.2",
"openai>=1.13.3",
+ "litellm>=1.44.22",
+ "instructor>=1.3.3",
+
+ # Text Processing
+ "pdfplumber>=0.11.4",
+ "regex>=2024.9.11",
+
+ # Telemetry and Monitoring
"opentelemetry-api>=1.22.0",
"opentelemetry-sdk>=1.22.0",
"opentelemetry-exporter-otlp-proto-http>=1.22.0",
- "instructor>=1.3.3",
- "regex>=2024.9.11",
- "click>=8.1.7",
+
+ # Data Handling
+ "chromadb>=0.5.23",
+ "openpyxl>=3.1.5",
+ "pyvis>=0.3.2",
+
+ # Authentication and Security
+ "auth0-python>=4.7.1",
"python-dotenv>=1.0.0",
+
+ # Configuration and Utils
+ "click>=8.1.7",
"appdirs>=1.4.4",
"jsonref>=1.1.0",
"json-repair>=0.25.2",
- "auth0-python>=4.7.1",
- "litellm>=1.44.22",
- "pyvis>=0.3.2",
"uv>=0.4.25",
"tomli-w>=1.1.0",
"tomli>=2.0.2",
- "chromadb>=0.5.23",
- "pdfplumber>=0.11.4",
- "openpyxl>=3.1.5",
"blinker>=1.9.0",
]
@@ -39,6 +50,9 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools>=0.17.0"]
+embeddings = [
+ "tiktoken~=0.7.0"
+]
agentops = ["agentops>=0.3.0"]
fastembed = ["fastembed>=0.4.1"]
pdfplumber = [
diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py
index 4a6361cce..dc46aa6d8 100644
--- a/src/crewai/flow/flow.py
+++ b/src/crewai/flow/flow.py
@@ -30,7 +30,47 @@ from crewai.telemetry import Telemetry
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
-def start(condition=None):
+def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
+ """
+ Marks a method as a flow's starting point.
+
+ This decorator designates a method as an entry point for the flow execution.
+ It can optionally specify conditions that trigger the start based on other
+ method executions.
+
+ Parameters
+ ----------
+ condition : Optional[Union[str, dict, Callable]], optional
+ Defines when the start method should execute. Can be:
+ - str: Name of a method that triggers this start
+ - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
+ - Callable: A method reference that triggers this start
+ Default is None, meaning unconditional start.
+
+ Returns
+ -------
+ Callable
+ A decorator function that marks the method as a flow start point.
+
+ Raises
+ ------
+ ValueError
+ If the condition format is invalid.
+
+ Examples
+ --------
+ >>> @start() # Unconditional start
+ >>> def begin_flow(self):
+ ... pass
+
+ >>> @start("method_name") # Start after specific method
+ >>> def conditional_start(self):
+ ... pass
+
+ >>> @start(and_("method1", "method2")) # Start after multiple methods
+ >>> def complex_start(self):
+ ... pass
+ """
def decorator(func):
func.__is_start_method__ = True
if condition is not None:
@@ -55,8 +95,42 @@ def start(condition=None):
return decorator
+def listen(condition: Union[str, dict, Callable]) -> Callable:
+ """
+ Creates a listener that executes when specified conditions are met.
-def listen(condition):
+ This decorator sets up a method to execute in response to other method
+ executions in the flow. It supports both simple and complex triggering
+ conditions.
+
+ Parameters
+ ----------
+ condition : Union[str, dict, Callable]
+ Specifies when the listener should execute. Can be:
+ - str: Name of a method that triggers this listener
+ - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
+ - Callable: A method reference that triggers this listener
+
+ Returns
+ -------
+ Callable
+ A decorator function that sets up the method as a listener.
+
+ Raises
+ ------
+ ValueError
+ If the condition format is invalid.
+
+ Examples
+ --------
+ >>> @listen("process_data") # Listen to single method
+ >>> def handle_processed_data(self):
+ ... pass
+
+ >>> @listen(or_("success", "failure")) # Listen to multiple methods
+ >>> def handle_completion(self):
+ ... pass
+ """
def decorator(func):
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
@@ -80,10 +154,49 @@ def listen(condition):
return decorator
-def router(condition):
+def router(condition: Union[str, dict, Callable]) -> Callable:
+ """
+ Creates a routing method that directs flow execution based on conditions.
+
+ This decorator marks a method as a router, which can dynamically determine
+ the next steps in the flow based on its return value. Routers are triggered
+ by specified conditions and can return constants that determine which path
+ the flow should take.
+
+ Parameters
+ ----------
+ condition : Union[str, dict, Callable]
+ Specifies when the router should execute. Can be:
+ - str: Name of a method that triggers this router
+ - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
+ - Callable: A method reference that triggers this router
+
+ Returns
+ -------
+ Callable
+ A decorator function that sets up the method as a router.
+
+ Raises
+ ------
+ ValueError
+ If the condition format is invalid.
+
+ Examples
+ --------
+ >>> @router("check_status")
+ >>> def route_based_on_status(self):
+ ... if self.state.status == "success":
+ ... return SUCCESS
+ ... return FAILURE
+
+ >>> @router(and_("validate", "process"))
+ >>> def complex_routing(self):
+ ... if all([self.state.valid, self.state.processed]):
+ ... return CONTINUE
+ ... return STOP
+ """
def decorator(func):
func.__is_router__ = True
- # Handle conditions like listen/start
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
func.__condition_type__ = "OR"
@@ -105,8 +218,39 @@ def router(condition):
return decorator
+def or_(*conditions: Union[str, dict, Callable]) -> dict:
+ """
+ Combines multiple conditions with OR logic for flow control.
-def or_(*conditions):
+ Creates a condition that is satisfied when any of the specified conditions
+ are met. This is used with @start, @listen, or @router decorators to create
+ complex triggering conditions.
+
+ Parameters
+ ----------
+ *conditions : Union[str, dict, Callable]
+ Variable number of conditions that can be:
+ - str: Method names
+ - dict: Existing condition dictionaries
+ - Callable: Method references
+
+ Returns
+ -------
+ dict
+ A condition dictionary with format:
+ {"type": "OR", "methods": list_of_method_names}
+
+ Raises
+ ------
+ ValueError
+ If any condition is invalid.
+
+ Examples
+ --------
+ >>> @listen(or_("success", "timeout"))
+ >>> def handle_completion(self):
+ ... pass
+ """
methods = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
@@ -120,7 +264,39 @@ def or_(*conditions):
return {"type": "OR", "methods": methods}
-def and_(*conditions):
+def and_(*conditions: Union[str, dict, Callable]) -> dict:
+ """
+ Combines multiple conditions with AND logic for flow control.
+
+ Creates a condition that is satisfied only when all specified conditions
+ are met. This is used with @start, @listen, or @router decorators to create
+ complex triggering conditions.
+
+ Parameters
+ ----------
+ *conditions : Union[str, dict, Callable]
+ Variable number of conditions that can be:
+ - str: Method names
+ - dict: Existing condition dictionaries
+ - Callable: Method references
+
+ Returns
+ -------
+ dict
+ A condition dictionary with format:
+ {"type": "AND", "methods": list_of_method_names}
+
+ Raises
+ ------
+ ValueError
+ If any condition is invalid.
+
+ Examples
+ --------
+ >>> @listen(and_("validated", "processed"))
+ >>> def handle_complete_data(self):
+ ... pass
+ """
methods = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
@@ -286,6 +462,23 @@ class Flow(Generic[T], metaclass=FlowMeta):
return final_output
async def _execute_start_method(self, start_method_name: str) -> None:
+ """
+ Executes a flow's start method and its triggered listeners.
+
+ This internal method handles the execution of methods marked with @start
+ decorator and manages the subsequent chain of listener executions.
+
+ Parameters
+ ----------
+ start_method_name : str
+ The name of the start method to execute.
+
+ Notes
+ -----
+ - Executes the start method and captures its result
+ - Triggers execution of any listeners waiting on this start method
+ - Part of the flow's initialization sequence
+ """
result = await self._execute_method(
start_method_name, self._methods[start_method_name]
)
@@ -306,6 +499,28 @@ class Flow(Generic[T], metaclass=FlowMeta):
return result
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
+ """
+ Executes all listeners and routers triggered by a method completion.
+
+ This internal method manages the execution flow by:
+ 1. First executing all triggered routers sequentially
+ 2. Then executing all triggered listeners in parallel
+
+ Parameters
+ ----------
+ trigger_method : str
+ The name of the method that triggered these listeners.
+ result : Any
+ The result from the triggering method, passed to listeners
+ that accept parameters.
+
+ Notes
+ -----
+ - Routers are executed sequentially to maintain flow control
+ - Each router's result becomes the new trigger_method
+ - Normal listeners are executed in parallel for efficiency
+ - Listeners can receive the trigger method's result as a parameter
+ """
# First, handle routers repeatedly until no router triggers anymore
while True:
routers_triggered = self._find_triggered_methods(
@@ -335,6 +550,33 @@ class Flow(Generic[T], metaclass=FlowMeta):
def _find_triggered_methods(
self, trigger_method: str, router_only: bool
) -> List[str]:
+ """
+ Finds all methods that should be triggered based on conditions.
+
+ This internal method evaluates both OR and AND conditions to determine
+ which methods should be executed next in the flow.
+
+ Parameters
+ ----------
+ trigger_method : str
+ The name of the method that just completed execution.
+ router_only : bool
+ If True, only consider router methods.
+ If False, only consider non-router methods.
+
+ Returns
+ -------
+ List[str]
+ Names of methods that should be triggered.
+
+ Notes
+ -----
+ - Handles both OR and AND conditions:
+ * OR: Triggers if any condition is met
+ * AND: Triggers only when all conditions are met
+ - Maintains state for AND conditions using _pending_and_listeners
+ - Separates router and normal listener evaluation
+ """
triggered = []
for listener_name, (condition_type, methods) in self._listeners.items():
is_router = listener_name in self._routers
@@ -363,6 +605,33 @@ class Flow(Generic[T], metaclass=FlowMeta):
return triggered
async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
+ """
+ Executes a single listener method with proper event handling.
+
+ This internal method manages the execution of an individual listener,
+ including parameter inspection, event emission, and error handling.
+
+ Parameters
+ ----------
+ listener_name : str
+ The name of the listener method to execute.
+ result : Any
+ The result from the triggering method, which may be passed
+ to the listener if it accepts parameters.
+
+ Notes
+ -----
+ - Inspects method signature to determine if it accepts the trigger result
+ - Emits events for method execution start and finish
+ - Handles errors gracefully with detailed logging
+ - Recursively triggers listeners of this listener
+ - Supports both parameterized and parameter-less listeners
+
+ Error Handling
+ --------------
+ Catches and logs any exceptions during execution, preventing
+ individual listener failures from breaking the entire flow.
+ """
try:
method = self._methods[listener_name]
diff --git a/src/crewai/flow/flow_visualizer.py b/src/crewai/flow/flow_visualizer.py
index 988f27919..a70e91a18 100644
--- a/src/crewai/flow/flow_visualizer.py
+++ b/src/crewai/flow/flow_visualizer.py
@@ -1,12 +1,14 @@
# flow_visualizer.py
import os
+from pathlib import Path
from pyvis.network import Network
from crewai.flow.config import COLORS, NODE_STYLES
from crewai.flow.html_template_handler import HTMLTemplateHandler
from crewai.flow.legend_generator import generate_legend_items_html, get_legend_items
+from crewai.flow.path_utils import safe_path_join, validate_path_exists
from crewai.flow.utils import calculate_node_levels
from crewai.flow.visualization_utils import (
add_edges,
@@ -16,89 +18,209 @@ from crewai.flow.visualization_utils import (
class FlowPlot:
+ """Handles the creation and rendering of flow visualization diagrams."""
+
def __init__(self, flow):
+ """
+ Initialize FlowPlot with a flow object.
+
+ Parameters
+ ----------
+ flow : Flow
+ A Flow instance to visualize.
+
+ Raises
+ ------
+ ValueError
+ If flow object is invalid or missing required attributes.
+ """
+ if not hasattr(flow, '_methods'):
+ raise ValueError("Invalid flow object: missing '_methods' attribute")
+ if not hasattr(flow, '_listeners'):
+ raise ValueError("Invalid flow object: missing '_listeners' attribute")
+ if not hasattr(flow, '_start_methods'):
+ raise ValueError("Invalid flow object: missing '_start_methods' attribute")
+
self.flow = flow
self.colors = COLORS
self.node_styles = NODE_STYLES
def plot(self, filename):
- net = Network(
- directed=True,
- height="750px",
- width="100%",
- bgcolor=self.colors["bg"],
- layout=None,
- )
-
- # Set options to disable physics
- net.set_options(
- """
- var options = {
- "nodes": {
- "font": {
- "multi": "html"
- }
- },
- "physics": {
- "enabled": false
- }
- }
"""
- )
+ Generate and save an HTML visualization of the flow.
- # Calculate levels for nodes
- node_levels = calculate_node_levels(self.flow)
+ Parameters
+ ----------
+ filename : str
+ Name of the output file (without extension).
- # Compute positions
- node_positions = compute_positions(self.flow, node_levels)
+ Raises
+ ------
+ ValueError
+ If filename is invalid or network generation fails.
+ IOError
+ If file operations fail or visualization cannot be generated.
+ RuntimeError
+ If network visualization generation fails.
+ """
+ if not filename or not isinstance(filename, str):
+ raise ValueError("Filename must be a non-empty string")
+
+ try:
+ # Initialize network
+ net = Network(
+ directed=True,
+ height="750px",
+ width="100%",
+ bgcolor=self.colors["bg"],
+ layout=None,
+ )
- # Add nodes to the network
- add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
+ # Set options to disable physics
+ net.set_options(
+ """
+ var options = {
+ "nodes": {
+ "font": {
+ "multi": "html"
+ }
+ },
+ "physics": {
+ "enabled": false
+ }
+ }
+ """
+ )
- # Add edges to the network
- add_edges(net, self.flow, node_positions, self.colors)
+ # Calculate levels for nodes
+ try:
+ node_levels = calculate_node_levels(self.flow)
+ except Exception as e:
+ raise ValueError(f"Failed to calculate node levels: {str(e)}")
- network_html = net.generate_html()
- final_html_content = self._generate_final_html(network_html)
+ # Compute positions
+ try:
+ node_positions = compute_positions(self.flow, node_levels)
+ except Exception as e:
+ raise ValueError(f"Failed to compute node positions: {str(e)}")
- # Save the final HTML content to the file
- with open(f"{filename}.html", "w", encoding="utf-8") as f:
- f.write(final_html_content)
- print(f"Plot saved as {filename}.html")
+ # Add nodes to the network
+ try:
+ add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
+ except Exception as e:
+ raise RuntimeError(f"Failed to add nodes to network: {str(e)}")
- self._cleanup_pyvis_lib()
+ # Add edges to the network
+ try:
+ add_edges(net, self.flow, node_positions, self.colors)
+ except Exception as e:
+ raise RuntimeError(f"Failed to add edges to network: {str(e)}")
+
+ # Generate HTML
+ try:
+ network_html = net.generate_html()
+ final_html_content = self._generate_final_html(network_html)
+ except Exception as e:
+ raise RuntimeError(f"Failed to generate network visualization: {str(e)}")
+
+ # Save the final HTML content to the file
+ try:
+ with open(f"{filename}.html", "w", encoding="utf-8") as f:
+ f.write(final_html_content)
+ print(f"Plot saved as {filename}.html")
+ except IOError as e:
+ raise IOError(f"Failed to save flow visualization to {filename}.html: {str(e)}")
+
+ except (ValueError, RuntimeError, IOError):
+ raise
+ except Exception as e:
+ raise RuntimeError(f"Unexpected error during flow visualization: {str(e)}")
+ finally:
+ self._cleanup_pyvis_lib()
def _generate_final_html(self, network_html):
- # Extract just the body content from the generated HTML
- current_dir = os.path.dirname(__file__)
- template_path = os.path.join(
- current_dir, "assets", "crewai_flow_visual_template.html"
- )
- logo_path = os.path.join(current_dir, "assets", "crewai_logo.svg")
+ """
+ Generate the final HTML content with network visualization and legend.
- html_handler = HTMLTemplateHandler(template_path, logo_path)
- network_body = html_handler.extract_body_content(network_html)
+ Parameters
+ ----------
+ network_html : str
+ HTML content generated by pyvis Network.
- # Generate the legend items HTML
- legend_items = get_legend_items(self.colors)
- legend_items_html = generate_legend_items_html(legend_items)
- final_html_content = html_handler.generate_final_html(
- network_body, legend_items_html
- )
- return final_html_content
+ Returns
+ -------
+ str
+ Complete HTML content with styling and legend.
+
+ Raises
+ ------
+ IOError
+ If template or logo files cannot be accessed.
+ ValueError
+ If network_html is invalid.
+ """
+ if not network_html:
+ raise ValueError("Invalid network HTML content")
+
+ try:
+ # Extract just the body content from the generated HTML
+ current_dir = os.path.dirname(__file__)
+ template_path = safe_path_join("assets", "crewai_flow_visual_template.html", root=current_dir)
+ logo_path = safe_path_join("assets", "crewai_logo.svg", root=current_dir)
+
+ if not os.path.exists(template_path):
+ raise IOError(f"Template file not found: {template_path}")
+ if not os.path.exists(logo_path):
+ raise IOError(f"Logo file not found: {logo_path}")
+
+ html_handler = HTMLTemplateHandler(template_path, logo_path)
+ network_body = html_handler.extract_body_content(network_html)
+
+ # Generate the legend items HTML
+ legend_items = get_legend_items(self.colors)
+ legend_items_html = generate_legend_items_html(legend_items)
+ final_html_content = html_handler.generate_final_html(
+ network_body, legend_items_html
+ )
+ return final_html_content
+ except Exception as e:
+ raise IOError(f"Failed to generate visualization HTML: {str(e)}")
def _cleanup_pyvis_lib(self):
- # Clean up the generated lib folder
- lib_folder = os.path.join(os.getcwd(), "lib")
+ """
+ Clean up the generated lib folder from pyvis.
+
+ This method safely removes the temporary lib directory created by pyvis
+ during network visualization generation.
+ """
try:
+ lib_folder = safe_path_join("lib", root=os.getcwd())
if os.path.exists(lib_folder) and os.path.isdir(lib_folder):
import shutil
-
shutil.rmtree(lib_folder)
+ except ValueError as e:
+ print(f"Error validating lib folder path: {e}")
except Exception as e:
- print(f"Error cleaning up {lib_folder}: {e}")
+ print(f"Error cleaning up lib folder: {e}")
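As an aside for reviewers, the guarded-cleanup pattern in `_cleanup_pyvis_lib` can be shown standalone. This is an illustrative sketch (not part of the diff); `cleanup_lib_folder` and the temporary directory are hypothetical stand-ins for the pyvis-generated `lib` folder:

```python
import os
import shutil
import tempfile

def cleanup_lib_folder(root):
    """Remove a pyvis-style 'lib' folder under root, swallowing failures."""
    lib_folder = os.path.join(root, "lib")
    try:
        # Only act on an existing directory; a plain file named "lib" is left alone.
        if os.path.exists(lib_folder) and os.path.isdir(lib_folder):
            shutil.rmtree(lib_folder)
    except Exception as e:
        # Cleanup is best-effort: report but never propagate.
        print(f"Error cleaning up lib folder: {e}")

root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "lib", "vis"))
cleanup_lib_folder(root)
```

The `isdir` check before `rmtree` mirrors the patched code: it avoids deleting an unrelated file that happens to be named `lib`.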
def plot_flow(flow, filename="flow_plot"):
+ """
+ Convenience function to create and save a flow visualization.
+
+ Parameters
+ ----------
+ flow : Flow
+ Flow instance to visualize.
+ filename : str, optional
+ Output filename without extension, by default "flow_plot".
+
+ Raises
+ ------
+ ValueError
+ If flow object or filename is invalid.
+ IOError
+ If file operations fail.
+ """
visualizer = FlowPlot(flow)
visualizer.plot(filename)
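The diff leans on `safe_path_join` from `crewai.flow.path_utils`, which is not shown in this hunk. A minimal sketch of what such a helper might do, assuming its job is to confine joined paths under a root directory (the real implementation may differ):

```python
import os
import tempfile

def safe_path_join(*parts, root):
    """Join parts under root, rejecting results that escape root."""
    candidate = os.path.realpath(os.path.join(root, *parts))
    base = os.path.realpath(root)
    # Allow root itself, or any path strictly inside it.
    if candidate != base and not candidate.startswith(base + os.sep):
        raise ValueError(f"Path escapes root directory: {candidate}")
    return candidate

root = tempfile.mkdtemp()
lib_path = safe_path_join("lib", root=root)  # stays under root: allowed
```

Resolving with `realpath` before comparing prefixes defends against both `..` traversal and symlink tricks, which is presumably why the diff routes `lib` and `assets` lookups through this helper.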
diff --git a/src/crewai/flow/html_template_handler.py b/src/crewai/flow/html_template_handler.py
index d521d8cf8..f0d2d89ad 100644
--- a/src/crewai/flow/html_template_handler.py
+++ b/src/crewai/flow/html_template_handler.py
@@ -1,26 +1,53 @@
import base64
import re
+from pathlib import Path
+
+from crewai.flow.path_utils import safe_path_join, validate_path_exists
class HTMLTemplateHandler:
+ """Handles HTML template processing and generation for flow visualization diagrams."""
+
def __init__(self, template_path, logo_path):
- self.template_path = template_path
- self.logo_path = logo_path
+ """
+ Initialize HTMLTemplateHandler with validated template and logo paths.
+
+ Parameters
+ ----------
+ template_path : str
+ Path to the HTML template file.
+ logo_path : str
+ Path to the logo image file.
+
+ Raises
+ ------
+ ValueError
+ If template or logo paths are invalid or files don't exist.
+ """
+ try:
+ self.template_path = validate_path_exists(template_path, "file")
+ self.logo_path = validate_path_exists(logo_path, "file")
+ except ValueError as e:
+ raise ValueError(f"Invalid template or logo path: {e}")
def read_template(self):
+ """Read and return the HTML template file contents."""
with open(self.template_path, "r", encoding="utf-8") as f:
return f.read()
def encode_logo(self):
+ """Convert the logo SVG file to base64 encoded string."""
with open(self.logo_path, "rb") as logo_file:
logo_svg_data = logo_file.read()
return base64.b64encode(logo_svg_data).decode("utf-8")
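A standalone illustration of the `encode_logo` step: read binary SVG bytes and produce the base64 string embedded in the final HTML. The SVG content below is a made-up stand-in for `crewai_logo.svg`:

```python
import base64

# Hypothetical logo bytes; the real handler reads crewai_logo.svg from disk.
logo_svg_data = b'<svg xmlns="http://www.w3.org/2000/svg"></svg>'
encoded = base64.b64encode(logo_svg_data).decode("utf-8")

# The encoded string can then be inlined into the template as a data URI,
# so the generated HTML needs no external image file.
data_uri = f"data:image/svg+xml;base64,{encoded}"
```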
def extract_body_content(self, html):
+ """Extract and return content between body tags from HTML string."""
match = re.search("<body.*?>(.*?)</body>", html, re.DOTALL)