diff --git a/README.md b/README.md
index 5669c71a2..edcbb6f51 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
# **CrewAI**
-🤖 **CrewAI**: Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks.
+🤖 **CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
@@ -22,13 +22,17 @@
- [Why CrewAI?](#why-crewai)
- [Getting Started](#getting-started)
- [Key Features](#key-features)
+- [Understanding Flows and Crews](#understanding-flows-and-crews)
+- [CrewAI vs LangGraph](#how-crewai-compares)
- [Examples](#examples)
- [Quick Tutorial](#quick-tutorial)
- [Write Job Descriptions](#write-job-descriptions)
- [Trip Planner](#trip-planner)
- [Stock Analysis](#stock-analysis)
+ - [Using Crews and Flows Together](#using-crews-and-flows-together)
- [Connecting Your Crew to a Model](#connecting-your-crew-to-a-model)
- [How CrewAI Compares](#how-crewai-compares)
+- [Frequently Asked Questions (FAQ)](#frequently-asked-questions-faq)
- [Contribution](#contribution)
- [Telemetry](#telemetry)
- [License](#license)
@@ -36,10 +40,40 @@
## Why CrewAI?
The power of AI collaboration has too much to offer.
-CrewAI is designed to enable AI agents to assume roles, share goals, and operate in a cohesive unit - much like a well-oiled crew. Whether you're building a smart assistant platform, an automated customer service ensemble, or a multi-agent research team, CrewAI provides the backbone for sophisticated multi-agent interactions.
+CrewAI is a standalone framework, built from the ground up without dependencies on Langchain or other agent frameworks. It's designed to enable AI agents to assume roles, share goals, and operate in a cohesive unit - much like a well-oiled crew. Whether you're building a smart assistant platform, an automated customer service ensemble, or a multi-agent research team, CrewAI provides the backbone for sophisticated multi-agent interactions.
## Getting Started
+### Learning Resources
+
+Learn CrewAI through our comprehensive courses:
+- [Multi AI Agent Systems with CrewAI](https://www.deeplearning.ai/short-courses/multi-ai-agent-systems-with-crewai/) - Master the fundamentals of multi-agent systems
+- [Practical Multi AI Agents and Advanced Use Cases](https://www.deeplearning.ai/short-courses/practical-multi-ai-agents-and-advanced-use-cases-with-crewai/) - Deep dive into advanced implementations
+
+### Understanding Flows and Crews
+
+CrewAI offers two powerful, complementary approaches that work seamlessly together to build sophisticated AI applications:
+
+1. **Crews**: Teams of AI agents with true autonomy and agency, working together to accomplish complex tasks through role-based collaboration. Crews enable:
+ - Natural, autonomous decision-making between agents
+ - Dynamic task delegation and collaboration
+ - Specialized roles with defined goals and expertise
+ - Flexible problem-solving approaches
+
+2. **Flows**: Production-ready, event-driven workflows that deliver precise control over complex automations. Flows provide:
+ - Fine-grained control over execution paths for real-world scenarios
+ - Secure, consistent state management between tasks
+ - Clean integration of AI agents with production Python code
+ - Conditional branching for complex business logic
+
+The true power of CrewAI emerges when combining Crews and Flows. This synergy allows you to:
+- Build complex, production-grade applications
+- Balance autonomy with precise control
+- Handle sophisticated real-world scenarios
+- Maintain clean, maintainable code structure
+
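The division of labor can be pictured with a tiny plain-Python sketch (conceptual only; this mimics the pattern, it is not the CrewAI API — the real decorator-based Flow example appears later in this README):

```python
# Conceptual sketch: deterministic "flow" steps hand work to an autonomous
# "crew" step, then route on its result. NOT the CrewAI API.

def fetch_data():
    # "Flow" step: plain, predictable Python
    return {"topic": "q3 revenue", "records": 1200}

def run_crew(payload):
    # "Crew" step: stand-in for a team of autonomous agents
    return {"summary": f"analysis of {payload['topic']}", "confidence": 0.9}

def route(result):
    # "Flow" step: deterministic branching on the crew's result
    return "publish" if result["confidence"] > 0.8 else "review"

result = run_crew(fetch_data())
next_step = route(result)
```

Flows provide the predictable skeleton (fetch, route), while Crews fill in the open-ended reasoning step in the middle.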
+### Getting Started with Installation
+
To get started with CrewAI, follow these simple steps:
### 1. Installation
@@ -51,7 +85,6 @@ First, install CrewAI:
```shell
pip install crewai
```
-
If you want to install the 'crewai' package along with its optional features that include additional tools for agents, you can do so by using the following command:
```shell
@@ -59,6 +92,22 @@ pip install 'crewai[tools]'
```
The command above installs the basic package and also adds extra components which require more dependencies to function.
+### Troubleshooting Dependencies
+
+If you encounter issues during installation or usage, here are some common solutions:
+
+#### Common Issues
+
+1. **ModuleNotFoundError: No module named 'tiktoken'**
+ - Install tiktoken explicitly: `pip install 'crewai[embeddings]'`
+ - If using embedchain or other tools: `pip install 'crewai[tools]'`
+
+2. **Failed building wheel for tiktoken**
+ - Ensure Rust compiler is installed (see installation steps above)
+ - For Windows: Verify Visual C++ Build Tools are installed
+ - Try upgrading pip: `pip install --upgrade pip`
+ - If issues persist, use a pre-built wheel: `pip install tiktoken --prefer-binary`
+
### 2. Setting Up Your Crew with the YAML Configuration
To create a new CrewAI project, run the following CLI (Command Line Interface) command:
@@ -264,13 +313,16 @@ In addition to the sequential process, you can use the hierarchical process, whi
## Key Features
-- **Role-Based Agent Design**: Customize agents with specific roles, goals, and tools.
-- **Autonomous Inter-Agent Delegation**: Agents can autonomously delegate tasks and inquire amongst themselves, enhancing problem-solving efficiency.
-- **Flexible Task Management**: Define tasks with customizable tools and assign them to agents dynamically.
-- **Processes Driven**: Currently only supports `sequential` task execution and `hierarchical` processes, but more complex processes like consensual and autonomous are being worked on.
-- **Save output as file**: Save the output of individual tasks as a file, so you can use it later.
-- **Parse output as Pydantic or Json**: Parse the output of individual tasks as a Pydantic model or as a Json if you want to.
-- **Works with Open Source Models**: Run your crew using Open AI or open source models refer to the [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-Connections/) page for details on configuring your agents' connections to models, even ones running locally!
+**Note**: CrewAI is a standalone framework built from the ground up, without dependencies on Langchain or other agent frameworks.
+
+- **Deep Customization**: Build sophisticated agents with full control over the system - from overriding inner prompts to accessing low-level APIs. Customize roles, goals, tools, and behaviors while maintaining clean abstractions.
+- **Autonomous Inter-Agent Delegation**: Agents can autonomously delegate tasks and inquire amongst themselves, enabling complex problem-solving in real-world scenarios.
+- **Flexible Task Management**: Define and customize tasks with granular control, from simple operations to complex multi-step processes.
+- **Production-Grade Architecture**: Support for both high-level abstractions and low-level customization, with robust error handling and state management.
+- **Predictable Results**: Ensure consistent, accurate outputs through programmatic guardrails, agent training capabilities, and flow-based execution control. See our [documentation on guardrails](https://docs.crewai.com/how-to/guardrails/) for implementation details.
+- **Model Flexibility**: Run your crew using OpenAI or open source models with production-ready integrations. See [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-Connections/) for detailed configuration options.
+- **Event-Driven Flows**: Build complex, real-world workflows with precise control over execution paths, state management, and conditional logic.
+- **Process Orchestration**: Achieve any workflow pattern through flows - from simple sequential and hierarchical processes to complex, custom orchestration patterns with conditional branching and parallel execution.
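As a concrete illustration of the guardrail idea above: a task guardrail is essentially a callable that validates a task's output and returns a success flag plus either the validated data or an error message (the word-count rule here is our own illustrative example, not a built-in):

```python
def validate_word_count(result) -> tuple[bool, object]:
    """Illustrative guardrail: reject outputs longer than 200 words."""
    text = str(result)
    words = len(text.split())
    if words > 200:
        return (False, f"Output too long: {words} words (limit 200)")
    return (True, text)

# A guardrail like this can be attached to a task so that failing
# outputs are sent back to the agent for another attempt.
```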

@@ -305,6 +357,98 @@ You can test different real life examples of AI crews in the [CrewAI-examples re
[](https://www.youtube.com/watch?v=e0Uj4yWdaAg "Stock Analysis")
+### Using Crews and Flows Together
+
+CrewAI's power truly shines when combining Crews with Flows to create sophisticated automation pipelines. Here's how you can orchestrate multiple Crews within a Flow:
+
+```python
+from crewai.flow.flow import Flow, listen, start, router, or_
+from crewai import Crew, Agent, Task, Process
+from pydantic import BaseModel
+
+# Define structured state for precise control
+class MarketState(BaseModel):
+ sentiment: str = "neutral"
+ confidence: float = 0.0
+ recommendations: list = []
+
+class AdvancedAnalysisFlow(Flow[MarketState]):
+ @start()
+ def fetch_market_data(self):
+ # Demonstrate low-level control with structured state
+ self.state.sentiment = "analyzing"
+ return {"sector": "tech", "timeframe": "1W"} # These parameters match the task description template
+
+ @listen(fetch_market_data)
+ def analyze_with_crew(self, market_data):
+ # Show crew agency through specialized roles
+ analyst = Agent(
+ role="Senior Market Analyst",
+ goal="Conduct deep market analysis with expert insight",
+ backstory="You're a veteran analyst known for identifying subtle market patterns"
+ )
+ researcher = Agent(
+ role="Data Researcher",
+ goal="Gather and validate supporting market data",
+ backstory="You excel at finding and correlating multiple data sources"
+ )
+
+ analysis_task = Task(
+ description="Analyze {sector} sector data for the past {timeframe}",
+ expected_output="Detailed market analysis with confidence score",
+ agent=analyst
+ )
+ research_task = Task(
+ description="Find supporting data to validate the analysis",
+ expected_output="Corroborating evidence and potential contradictions",
+ agent=researcher
+ )
+
+ # Demonstrate crew autonomy
+ analysis_crew = Crew(
+ agents=[analyst, researcher],
+ tasks=[analysis_task, research_task],
+ process=Process.sequential,
+ verbose=True
+ )
+ return analysis_crew.kickoff(inputs=market_data) # Pass market_data as named inputs
+
+ @router(analyze_with_crew)
+ def determine_next_steps(self):
+ # Show flow control with conditional routing
+ if self.state.confidence > 0.8:
+ return "high_confidence"
+ elif self.state.confidence > 0.5:
+ return "medium_confidence"
+ return "low_confidence"
+
+ @listen("high_confidence")
+ def execute_strategy(self):
+ # Demonstrate complex decision making
+        strategy_expert = Agent(
+            role="Strategy Expert",
+            goal="Develop optimal market strategy",
+            backstory="You turn market analysis into concrete, actionable plans"
+        )
+        strategy_crew = Crew(
+            agents=[strategy_expert],
+            tasks=[
+                Task(description="Create detailed strategy based on analysis",
+                     expected_output="Step-by-step action plan",
+                     agent=strategy_expert)
+            ]
+        )
+ return strategy_crew.kickoff()
+
+    @listen(or_("medium_confidence", "low_confidence"))
+ def request_additional_analysis(self):
+ self.state.recommendations.append("Gather more data")
+ return "Additional analysis required"
+```
+
+This example demonstrates how to:
+1. Use Python code for basic data operations
+2. Create and execute Crews as steps in your workflow
+3. Use Flow decorators to manage the sequence of operations
+4. Implement conditional branching based on Crew results
+
## Connecting Your Crew to a Model
CrewAI supports using various LLMs through a variety of connection options. By default your agents will use the OpenAI API when querying the model. However, there are several other ways to allow your agents to connect to models. For example, you can configure your agents to use a local model via the Ollama tool.
@@ -313,9 +457,13 @@ Please refer to the [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-
## How CrewAI Compares
-**CrewAI's Advantage**: CrewAI is built with production in mind. It offers the flexibility of Autogen's conversational agents and the structured process approach of ChatDev, but without the rigidity. CrewAI's processes are designed to be dynamic and adaptable, fitting seamlessly into both development and production workflows.
+**CrewAI's Advantage**: CrewAI combines autonomous agent intelligence with precise workflow control through its unique Crews and Flows architecture. The framework excels at both high-level orchestration and low-level customization, enabling complex, production-grade systems with granular control.
-- **Autogen**: While Autogen does good in creating conversational agents capable of working together, it lacks an inherent concept of process. In Autogen, orchestrating agents' interactions requires additional programming, which can become complex and cumbersome as the scale of tasks grows.
+- **LangGraph**: While LangGraph provides a foundation for building agent workflows, its approach requires significant boilerplate code and complex state management patterns. The framework's tight coupling with LangChain can limit flexibility when implementing custom agent behaviors or integrating with external systems.
+
+*P.S. CrewAI demonstrates significant performance advantages over LangGraph: it executed 5.76x faster in a QA task example ([see comparison](https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks/CrewAI%20Flows%20%26%20Langgraph/QA%20Agent)) and achieved higher evaluation scores with faster completion times in a coding task ([detailed analysis](https://github.com/crewAIInc/crewAI-examples/blob/main/Notebooks/CrewAI%20Flows%20%26%20Langgraph/Coding%20Assistant/coding_assistant_eval.ipynb)).*
+
+- **Autogen**: While Autogen excels at creating conversational agents capable of working together, it lacks an inherent concept of process. In Autogen, orchestrating agents' interactions requires additional programming, which can become complex and cumbersome as the scale of tasks grows.
- **ChatDev**: ChatDev introduced the idea of processes into the realm of AI agents, but its implementation is quite rigid. Customizations in ChatDev are limited and not geared towards production environments, which can hinder scalability and flexibility in real-world applications.
@@ -440,5 +588,8 @@ A: CrewAI uses anonymous telemetry to collect usage data for improvement purpose
### Q: Where can I find examples of CrewAI in action?
A: You can find various real-life examples in the [CrewAI-examples repository](https://github.com/crewAIInc/crewAI-examples), including trip planners, stock analysis tools, and more.
+### Q: What is the difference between Crews and Flows?
+A: Crews and Flows serve different but complementary purposes in CrewAI. Crews are teams of AI agents working together to accomplish specific tasks through role-based collaboration, delivering accurate and predictable results. Flows, on the other hand, are event-driven workflows that can orchestrate both Crews and regular Python code, allowing you to build complex automation pipelines with secure state management and conditional execution paths.
+
### Q: How can I contribute to CrewAI?
A: Contributions are welcome! You can fork the repository, create a new branch for your feature, add your improvement, and send a pull request. Check the Contribution section in the README for more details.
diff --git a/docs/concepts/flows.mdx b/docs/concepts/flows.mdx
index 324118310..cf9d20f63 100644
--- a/docs/concepts/flows.mdx
+++ b/docs/concepts/flows.mdx
@@ -138,7 +138,7 @@ print("---- Final Output ----")
print(final_output)
````
-``` text Output
+```text Output
---- Final Output ----
Second method received: Output from first_method
````
diff --git a/docs/concepts/knowledge.mdx b/docs/concepts/knowledge.mdx
index 8df6f623f..ba4f54450 100644
--- a/docs/concepts/knowledge.mdx
+++ b/docs/concepts/knowledge.mdx
@@ -4,8 +4,6 @@ description: What is knowledge in CrewAI and how to use it.
icon: book
---
-# Using Knowledge in CrewAI
-
## What is Knowledge?
Knowledge in CrewAI is a powerful system that allows AI agents to access and utilize external information sources during their tasks.
@@ -36,7 +34,20 @@ CrewAI supports various types of knowledge sources out of the box:
-## Quick Start
+## Supported Knowledge Parameters
+
+| Parameter | Type | Required | Description |
+| :--------------------------- | :---------------------------------- | :------- | :---------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `sources` | **List[BaseKnowledgeSource]** | Yes | List of knowledge sources that provide content to be stored and queried. Can include PDF, CSV, Excel, JSON, text files, or string content. |
+| `collection_name` | **str** | No | Name of the collection where the knowledge will be stored. Used to identify different sets of knowledge. Defaults to "knowledge" if not provided. |
+| `storage` | **Optional[KnowledgeStorage]** | No | Custom storage configuration for managing how the knowledge is stored and retrieved. If not provided, a default storage will be created. |
+
+## Quickstart Example
+
+
+For file-based knowledge sources, make sure to place your files in a `knowledge` directory at the root of your project.
+Also, use relative paths from the `knowledge` directory when creating the source.
+
Here's an example using string-based knowledge:
@@ -80,7 +91,8 @@ result = crew.kickoff(inputs={"question": "What city does John live in and how o
```
-Here's another example with the `CrewDoclingSource`
+Here's another example using `CrewDoclingSource`, which is quite versatile and can handle multiple file formats, including TXT, PDF, DOCX, HTML, and more.
+
```python Code
from crewai import LLM, Agent, Crew, Process, Task
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
@@ -128,39 +140,192 @@ result = crew.kickoff(
)
```
+## More Examples
+
+Here are examples of how to use different types of knowledge sources:
+
+### Text File Knowledge Source
+```python
+from crewai.knowledge.knowledge import Knowledge
+from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
+
+# Create a text file knowledge source
+text_source = CrewDoclingSource(
+ file_paths=["document.txt", "another.txt"]
+)
+
+# Create knowledge with text file source
+knowledge = Knowledge(
+ collection_name="text_knowledge",
+ sources=[text_source]
+)
+```
+
+### PDF Knowledge Source
+```python
+from crewai.knowledge.knowledge import Knowledge
+from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
+
+# Create a PDF knowledge source
+pdf_source = PDFKnowledgeSource(
+ file_paths=["document.pdf", "another.pdf"]
+)
+
+# Create knowledge with PDF source
+knowledge = Knowledge(
+ collection_name="pdf_knowledge",
+ sources=[pdf_source]
+)
+```
+
+### CSV Knowledge Source
+```python
+from crewai.knowledge.knowledge import Knowledge
+from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
+
+# Create a CSV knowledge source
+csv_source = CSVKnowledgeSource(
+ file_paths=["data.csv"]
+)
+
+# Create knowledge with CSV source
+knowledge = Knowledge(
+ collection_name="csv_knowledge",
+ sources=[csv_source]
+)
+```
+
+### Excel Knowledge Source
+```python
+from crewai.knowledge.knowledge import Knowledge
+from crewai.knowledge.source.excel_knowledge_source import ExcelKnowledgeSource
+
+# Create an Excel knowledge source
+excel_source = ExcelKnowledgeSource(
+ file_paths=["spreadsheet.xlsx"]
+)
+
+# Create knowledge with Excel source
+knowledge = Knowledge(
+ collection_name="excel_knowledge",
+ sources=[excel_source]
+)
+```
+
+### JSON Knowledge Source
+```python
+from crewai.knowledge.knowledge import Knowledge
+from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource
+
+# Create a JSON knowledge source
+json_source = JSONKnowledgeSource(
+ file_paths=["data.json"]
+)
+
+# Create knowledge with JSON source
+knowledge = Knowledge(
+ collection_name="json_knowledge",
+ sources=[json_source]
+)
+```
+
## Knowledge Configuration
### Chunking Configuration
-Control how content is split for processing by setting the chunk size and overlap.
+Knowledge sources automatically chunk content for better processing.
+You can configure chunking behavior in your knowledge sources:
-```python Code
-knowledge_source = StringKnowledgeSource(
- content="Long content...",
- chunk_size=4000, # Characters per chunk (default)
- chunk_overlap=200 # Overlap between chunks (default)
+```python
+from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
+
+source = StringKnowledgeSource(
+ content="Your content here",
+ chunk_size=4000, # Maximum size of each chunk (default: 4000)
+ chunk_overlap=200 # Overlap between chunks (default: 200)
)
```
-## Embedder Configuration
+The chunking configuration helps in:
+- Breaking down large documents into manageable pieces
+- Maintaining context through chunk overlap
+- Optimizing retrieval accuracy
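As a plain-Python illustration of what sliding-window chunking does (conceptual only, not CrewAI's internal implementation):

```python
def chunk_text(text: str, chunk_size: int = 4000, chunk_overlap: int = 200) -> list[str]:
    """Split text into chunks of at most chunk_size characters,
    where consecutive chunks share chunk_overlap characters."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

# Small sizes so the overlap is easy to see:
chunks = chunk_text("abcdefghij", chunk_size=4, chunk_overlap=2)
# chunks -> ["abcd", "cdef", "efgh", "ghij", "ij"]
```

Each chunk starts `chunk_size - chunk_overlap` characters after the previous one, so context at a chunk boundary is never completely lost.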
-You can also configure the embedder for the knowledge store. This is useful if you want to use a different embedder for the knowledge store than the one used for the agents.
+### Embeddings Configuration
-```python Code
-...
+You can also configure the embedder for the knowledge store.
+This is useful if you want to use a different embedder for the knowledge store than the one used for the agents.
+The `embedder` parameter supports various embedding model providers, including:
+- `openai`: OpenAI's embedding models
+- `google`: Google's text embedding models
+- `azure`: Azure OpenAI embeddings
+- `ollama`: Local embeddings with Ollama
+- `vertexai`: Google Cloud VertexAI embeddings
+- `cohere`: Cohere's embedding models
+- `bedrock`: AWS Bedrock embeddings
+- `huggingface`: Hugging Face models
+- `watson`: IBM Watson embeddings
+
+Here's an example of how to configure the embedder for the knowledge store using Google's `text-embedding-004` model:
+
+```python Example
+from crewai import Agent, Task, Crew, Process, LLM
+from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
+import os
+
+# Get the GEMINI API key
+GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
+
+# Create a knowledge source
+content = "User's name is John. He is 30 years old and lives in San Francisco."
string_source = StringKnowledgeSource(
- content="Users name is John. He is 30 years old and lives in San Francisco.",
+ content=content,
)
+
+# Create an LLM with a temperature of 0 to ensure deterministic outputs
+gemini_llm = LLM(
+ model="gemini/gemini-1.5-pro-002",
+ api_key=GEMINI_API_KEY,
+ temperature=0,
+)
+
+# Create an agent with the knowledge store
+agent = Agent(
+ role="About User",
+ goal="You know everything about the user.",
+ backstory="""You are a master at understanding people and their preferences.""",
+ verbose=True,
+ allow_delegation=False,
+ llm=gemini_llm,
+)
+
+task = Task(
+ description="Answer the following questions about the user: {question}",
+ expected_output="An answer to the question.",
+ agent=agent,
+)
+
crew = Crew(
- ...
+ agents=[agent],
+ tasks=[task],
+ verbose=True,
+ process=Process.sequential,
knowledge_sources=[string_source],
embedder={
- "provider": "openai",
- "config": {"model": "text-embedding-3-small"},
- },
+ "provider": "google",
+ "config": {
+ "model": "models/text-embedding-004",
+ "api_key": GEMINI_API_KEY,
+ }
+ }
)
-```
+result = crew.kickoff(inputs={"question": "What city does John live in and how old is he?"})
+```
+```text Output
+# Agent: About User
+## Task: Answer the following questions about the user: What city does John live in and how old is he?
+
+# Agent: About User
+## Final Answer:
+John is 30 years old and lives in San Francisco.
+```
+
## Clearing Knowledge
If you need to clear the knowledge stored in CrewAI, you can use the `crewai reset-memories` command with the `--knowledge` option.
@@ -171,6 +336,58 @@ crewai reset-memories --knowledge
This is useful when you've updated your knowledge sources and want to ensure that the agents are using the most recent information.
+## Agent-Specific Knowledge
+
+While knowledge can be provided at the crew level using `crew.knowledge_sources`, individual agents can also have their own knowledge sources using the `knowledge_sources` parameter:
+
+```python Code
+from crewai import Agent, Task, Crew
+from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
+
+# Create agent-specific knowledge about a product
+product_specs = StringKnowledgeSource(
+ content="""The XPS 13 laptop features:
+ - 13.4-inch 4K display
+ - Intel Core i7 processor
+ - 16GB RAM
+ - 512GB SSD storage
+ - 12-hour battery life""",
+ metadata={"category": "product_specs"}
+)
+
+# Create a support agent with product knowledge
+support_agent = Agent(
+ role="Technical Support Specialist",
+ goal="Provide accurate product information and support.",
+ backstory="You are an expert on our laptop products and specifications.",
+ knowledge_sources=[product_specs] # Agent-specific knowledge
+)
+
+# Create a task that requires product knowledge
+support_task = Task(
+    description="Answer this customer question: {question}",
+    expected_output="A clear, accurate answer based on the product specifications.",
+    agent=support_agent
+)
+
+# Create and run the crew
+crew = Crew(
+ agents=[support_agent],
+ tasks=[support_task]
+)
+
+# Get answer about the laptop's specifications
+result = crew.kickoff(
+ inputs={"question": "What is the storage capacity of the XPS 13?"}
+)
+```
+
+Benefits of agent-specific knowledge:
+
+- Give agents specialized information for their roles
+- Maintain separation of concerns between agents
+- Combine with crew-level knowledge for layered information access
+
## Custom Knowledge Sources
CrewAI allows you to create custom knowledge sources for any type of data by extending the `BaseKnowledgeSource` class. Let's create a practical example that fetches and processes space news articles.
diff --git a/docs/how-to/multimodal-agents.mdx b/docs/how-to/multimodal-agents.mdx
new file mode 100644
index 000000000..1dcf50d25
--- /dev/null
+++ b/docs/how-to/multimodal-agents.mdx
@@ -0,0 +1,138 @@
+---
+title: Using Multimodal Agents
+description: Learn how to enable and use multimodal capabilities in your agents for processing images and other non-text content within the CrewAI framework.
+icon: image
+---
+
+CrewAI supports multimodal agents that can process both text and non-text content like images. This guide will show you how to enable and use multimodal capabilities in your agents.
+
+## Enabling Multimodal Capabilities
+
+To create a multimodal agent, simply set the `multimodal` parameter to `True` when initializing your agent:
+
+```python
+from crewai import Agent
+
+agent = Agent(
+ role="Image Analyst",
+ goal="Analyze and extract insights from images",
+ backstory="An expert in visual content interpretation with years of experience in image analysis",
+ multimodal=True # This enables multimodal capabilities
+)
+```
+
+When you set `multimodal=True`, the agent is automatically configured with the necessary tools for handling non-text content, including the `AddImageTool`.
+
+## Working with Images
+
+The multimodal agent comes pre-configured with the `AddImageTool`, which allows it to process images. You don't need to manually add this tool - it's automatically included when you enable multimodal capabilities.
+
+Here's a complete example showing how to use a multimodal agent to analyze an image:
+
+```python
+from crewai import Agent, Task, Crew
+
+# Create a multimodal agent
+image_analyst = Agent(
+ role="Product Analyst",
+ goal="Analyze product images and provide detailed descriptions",
+ backstory="Expert in visual product analysis with deep knowledge of design and features",
+ multimodal=True
+)
+
+# Create a task for image analysis
+task = Task(
+ description="Analyze the product image at https://example.com/product.jpg and provide a detailed description",
+ agent=image_analyst
+)
+
+# Create and run the crew
+crew = Crew(
+ agents=[image_analyst],
+ tasks=[task]
+)
+
+result = crew.kickoff()
+```
+
+### Advanced Usage with Context
+
+You can provide additional context or specific questions about the image when creating tasks for multimodal agents. The task description can include specific aspects you want the agent to focus on:
+
+```python
+from crewai import Agent, Task, Crew
+
+# Create a multimodal agent for detailed analysis
+expert_analyst = Agent(
+ role="Visual Quality Inspector",
+ goal="Perform detailed quality analysis of product images",
+ backstory="Senior quality control expert with expertise in visual inspection",
+ multimodal=True # AddImageTool is automatically included
+)
+
+# Create a task with specific analysis requirements
+inspection_task = Task(
+ description="""
+ Analyze the product image at https://example.com/product.jpg with focus on:
+ 1. Quality of materials
+ 2. Manufacturing defects
+ 3. Compliance with standards
+ Provide a detailed report highlighting any issues found.
+ """,
+ agent=expert_analyst
+)
+
+# Create and run the crew
+crew = Crew(
+ agents=[expert_analyst],
+ tasks=[inspection_task]
+)
+
+result = crew.kickoff()
+```
+
+### Tool Details
+
+When working with multimodal agents, the `AddImageTool` is automatically configured with the following schema:
+
+```python
+from typing import Optional
+
+class AddImageToolSchema:
+    image_url: str  # Required: The URL or path of the image to process
+    action: Optional[str] = None  # Optional: Additional context or specific questions about the image
+```
+
+The multimodal agent will automatically handle the image processing through its built-in tools, allowing it to:
+- Access images via URLs or local file paths
+- Process image content with optional context or specific questions
+- Provide analysis and insights based on the visual information and task requirements
+
+## Best Practices
+
+When working with multimodal agents, keep these best practices in mind:
+
+1. **Image Access**
+ - Ensure your images are accessible via URLs that the agent can reach
+ - For local images, consider hosting them temporarily or using absolute file paths
+ - Verify that image URLs are valid and accessible before running tasks
+
+2. **Task Description**
+ - Be specific about what aspects of the image you want the agent to analyze
+ - Include clear questions or requirements in the task description
+ - Consider using the optional `action` parameter for focused analysis
+
+3. **Resource Management**
+ - Image processing may require more computational resources than text-only tasks
+ - Some language models may require base64 encoding for image data
+ - Consider batch processing for multiple images to optimize performance
+
+4. **Environment Setup**
+ - Verify that your environment has the necessary dependencies for image processing
+ - Ensure your language model supports multimodal capabilities
+ - Test with small images first to validate your setup
+
+5. **Error Handling**
+ - Implement proper error handling for image loading failures
+ - Have fallback strategies for when image processing fails
+ - Monitor and log image processing operations for debugging
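For instance, a cheap pre-flight check before handing an image reference to a task might look like this (the helper below is our own illustrative code, not part of CrewAI; it only checks that the reference *looks* usable, not that it is reachable):

```python
from pathlib import Path
from urllib.parse import urlparse

VALID_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}

def is_probably_valid_image_ref(ref: str) -> bool:
    """Pre-flight check: does this look like a usable image URL or absolute path?"""
    parsed = urlparse(ref)
    if parsed.scheme in ("http", "https"):
        return Path(parsed.path).suffix.lower() in VALID_EXTENSIONS
    path = Path(ref)
    return path.is_absolute() and path.suffix.lower() in VALID_EXTENSIONS
```

Running a check like this before `crew.kickoff()` turns a vague mid-task failure into an immediate, debuggable error.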
diff --git a/docs/how-to/portkey-observability-and-guardrails.mdx b/docs/how-to/portkey-observability-and-guardrails.mdx
new file mode 100644
index 000000000..f4f7a696e
--- /dev/null
+++ b/docs/how-to/portkey-observability-and-guardrails.mdx
@@ -0,0 +1,211 @@
+# Portkey Integration with CrewAI
+
+
+
+[Portkey](https://portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai) is a 2-line upgrade to make your CrewAI agents reliable, cost-efficient, and fast.
+
+Portkey adds 4 core production capabilities to any CrewAI agent:
+1. Routing to **200+ LLMs**
+2. Making each LLM call more robust
+3. Full-stack tracing & cost, performance analytics
+4. Real-time guardrails to enforce behavior
+
+
+
+
+
+## Getting Started
+
+1. **Install Required Packages:**
+
+```bash
+pip install -qU crewai portkey-ai
+```
+
+2. **Configure the LLM Client:**
+
+To build CrewAI Agents with Portkey, you'll need two keys:
+- **Portkey API Key**: Sign up on the [Portkey app](https://app.portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai) and copy your API key
+- **Virtual Key**: Virtual Keys let you manage your LLM provider API keys in one place, stored securely in Portkey's vault
+
+```python
+from crewai import LLM
+from portkey_ai import createHeaders, PORTKEY_GATEWAY_URL
+
+gpt_llm = LLM(
+ model="gpt-4",
+ base_url=PORTKEY_GATEWAY_URL,
+ api_key="dummy", # Unused; the Virtual Key supplies the provider key
+ extra_headers=createHeaders(
+ api_key="YOUR_PORTKEY_API_KEY",
+ virtual_key="YOUR_VIRTUAL_KEY", # Enter your Virtual key from Portkey
+ )
+)
+```
+
+3. **Create and Run Your First Agent:**
+
+```python
+from crewai import Agent, Task, Crew
+
+# Define your agents with roles and goals
+coder = Agent(
+ role='Software developer',
+ goal='Write clear, concise code on demand',
+ backstory='An expert coder with a keen eye for software trends.',
+ llm=gpt_llm
+)
+
+# Create tasks for your agents
+task1 = Task(
+ description="Write the HTML for a simple website with the heading 'Hello World! Portkey is working!'",
+ expected_output="Clear, concise HTML code",
+ agent=coder
+)
+
+# Instantiate your crew
+crew = Crew(
+ agents=[coder],
+ tasks=[task1],
+)
+
+result = crew.kickoff()
+print(result)
+```
+
+
+## Key Features
+
+| Feature | Description |
+|---------|-------------|
+| π Multi-LLM Support | Access OpenAI, Anthropic, Gemini, Azure, and 250+ providers through a unified interface |
+| π‘οΈ Production Reliability | Implement retries, timeouts, load balancing, and fallbacks |
+| π Advanced Observability | Track 40+ metrics including costs, tokens, latency, and custom metadata |
+| π Comprehensive Logging | Debug with detailed execution traces and function call logs |
+| π§ Security Controls | Set budget limits and implement role-based access control |
+| π Performance Analytics | Capture and analyze feedback for continuous improvement |
+| πΎ Intelligent Caching | Reduce costs and latency with semantic or simple caching |
+
+
+## Production Features with Portkey Configs
+
+All of the features below are enabled through Portkey's Config system, which lets you define routing strategies using simple JSON objects in your LLM API calls. You can create and manage Configs directly in your code or through the Portkey Dashboard; each Config has a unique ID for easy reference.
+
+
+
+
+
+
+### 1. Use 250+ LLMs
+Access various LLMs like Anthropic, Gemini, Mistral, Azure OpenAI, and more with minimal code changes. Switch between providers or use them together seamlessly. [Learn more about Universal API](https://portkey.ai/docs/product/ai-gateway/universal-api)
+
+
+Easily switch between different LLM providers:
+
+```python
+# Anthropic Configuration
+anthropic_llm = LLM(
+ model="claude-3-5-sonnet-latest",
+ base_url=PORTKEY_GATEWAY_URL,
+ api_key="dummy",
+ extra_headers=createHeaders(
+ api_key="YOUR_PORTKEY_API_KEY",
+ virtual_key="YOUR_ANTHROPIC_VIRTUAL_KEY", # No provider needed when using Virtual Keys
+ trace_id="anthropic_agent"
+ )
+)
+
+# Azure OpenAI Configuration
+azure_llm = LLM(
+ model="gpt-4",
+ base_url=PORTKEY_GATEWAY_URL,
+ api_key="dummy",
+ extra_headers=createHeaders(
+ api_key="YOUR_PORTKEY_API_KEY",
+ virtual_key="YOUR_AZURE_VIRTUAL_KEY", # No provider needed when using Virtual Keys
+ trace_id="azure_agent"
+ )
+)
+```
+
+
+### 2. Caching
+Improve response times and reduce costs with two powerful caching modes:
+- **Simple Cache**: Perfect for exact matches
+- **Semantic Cache**: Matches responses for requests that are semantically similar
+[Learn more about Caching](https://portkey.ai/docs/product/ai-gateway/cache-simple-and-semantic)
+
+```py
+config = {
+ "cache": {
+ "mode": "semantic", # or "simple" for exact matching
+ }
+}
+```
+
+### 3. Production Reliability
+Portkey provides comprehensive reliability features:
+- **Automatic Retries**: Handle temporary failures gracefully
+- **Request Timeouts**: Prevent hanging operations
+- **Conditional Routing**: Route requests based on specific conditions
+- **Fallbacks**: Set up automatic provider failovers
+- **Load Balancing**: Distribute requests efficiently
+
+[Learn more about Reliability Features](https://portkey.ai/docs/product/ai-gateway/)
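As a sketch, a Config that combines retries with a provider fallback might look like the following (field names follow Portkey's Config schema; the virtual key values are placeholders):

```python
# Portkey Config sketch: retry transient failures up to 3 times,
# then fall back from the first virtual key to the second.
config = {
    "strategy": {"mode": "fallback"},
    "retry": {"attempts": 3},
    "targets": [
        {"virtual_key": "YOUR_OPENAI_VIRTUAL_KEY"},
        {"virtual_key": "YOUR_ANTHROPIC_VIRTUAL_KEY"},
    ],
}
```

The Config can then be referenced by its ID or passed in your request headers, as described in the Portkey documentation.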
+
+
+
+### 4. Metrics
+
+Agent runs are complex. Portkey automatically logs **40+ comprehensive metrics** for your AI agents, including cost, tokens used, and latency. Whether you need a broad overview or granular insights into your agent runs, Portkey's customizable filters provide the metrics you need.
+
+
+- Cost per agent interaction
+- Response times and latency
+- Token usage and efficiency
+- Success/failure rates
+- Cache hit rates
+
+
+
+### 5. Detailed Logging
+Logs are essential for understanding agent behavior, diagnosing issues, and improving performance. They provide a detailed record of agent activities and tool use, which is crucial for debugging and optimizing processes.
+
+
+Access a dedicated section to view records of agent executions, including parameters, outcomes, function calls, and errors. Filter logs based on multiple parameters such as trace ID, model, tokens used, and metadata.
+
+
+ Traces
+
+
+
+
+ Logs
+
+
+
+### 6. Enterprise Security Features
+- Set budget limits and rate limits per Virtual Key (disposable API keys)
+- Implement role-based access control
+- Track system changes with audit logs
+- Configure data retention policies
+
+
+
+For detailed information on creating and managing Configs, visit the [Portkey documentation](https://docs.portkey.ai/product/ai-gateway/configs).
+
+## Resources
+
+- [π Portkey Documentation](https://docs.portkey.ai)
+- [π Portkey Dashboard](https://app.portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai)
+- [π¦ Twitter](https://twitter.com/portkeyai)
+- [π¬ Discord Community](https://discord.gg/DD7vgKK299)
+
+
+
+
+
+
+
+
+
diff --git a/docs/how-to/portkey-observability.mdx b/docs/how-to/portkey-observability.mdx
new file mode 100644
index 000000000..4002323a5
--- /dev/null
+++ b/docs/how-to/portkey-observability.mdx
@@ -0,0 +1,202 @@
+---
+title: Portkey Observability and Guardrails
+description: How to use Portkey with CrewAI
+icon: key
+---
+
+
+
+
+[Portkey](https://portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai) is a 2-line upgrade to make your CrewAI agents reliable, cost-efficient, and fast.
+
+Portkey adds 4 core production capabilities to any CrewAI agent:
+1. Routing to **250+ LLMs**
+2. Making each LLM call more robust
+3. Full-stack tracing with cost and performance analytics
+4. Real-time guardrails to enforce behavior
+
+## Getting Started
+
+
+
+ ```bash
+ pip install -qU crewai portkey-ai
+ ```
+
+
+ To build CrewAI Agents with Portkey, you'll need two keys:
+ - **Portkey API Key**: Sign up on the [Portkey app](https://app.portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai) and copy your API key
+ - **Virtual Key**: Virtual Keys store your LLM provider API keys securely in Portkey's vault and let you manage them from one place
+
+ ```python
+ from crewai import LLM
+ from portkey_ai import createHeaders, PORTKEY_GATEWAY_URL
+
+ gpt_llm = LLM(
+ model="gpt-4",
+ base_url=PORTKEY_GATEWAY_URL,
+ api_key="dummy", # Unused; the Virtual Key supplies the provider key
+ extra_headers=createHeaders(
+ api_key="YOUR_PORTKEY_API_KEY",
+ virtual_key="YOUR_VIRTUAL_KEY", # Enter your Virtual key from Portkey
+ )
+ )
+ ```
+
+
+ ```python
+ from crewai import Agent, Task, Crew
+
+ # Define your agents with roles and goals
+ coder = Agent(
+ role='Software developer',
+ goal='Write clear, concise code on demand',
+ backstory='An expert coder with a keen eye for software trends.',
+ llm=gpt_llm
+ )
+
+ # Create tasks for your agents
+ task1 = Task(
+ description="Write the HTML for a simple website with the heading 'Hello World! Portkey is working!'",
+ expected_output="Clear, concise HTML code",
+ agent=coder
+ )
+
+ # Instantiate your crew
+ crew = Crew(
+ agents=[coder],
+ tasks=[task1],
+ )
+
+ result = crew.kickoff()
+ print(result)
+ ```
+
+
+
+## Key Features
+
+| Feature | Description |
+|:--------|:------------|
+| π Multi-LLM Support | Access OpenAI, Anthropic, Gemini, Azure, and 250+ providers through a unified interface |
+| π‘οΈ Production Reliability | Implement retries, timeouts, load balancing, and fallbacks |
+| π Advanced Observability | Track 40+ metrics including costs, tokens, latency, and custom metadata |
+| π Comprehensive Logging | Debug with detailed execution traces and function call logs |
+| π§ Security Controls | Set budget limits and implement role-based access control |
+| π Performance Analytics | Capture and analyze feedback for continuous improvement |
+| πΎ Intelligent Caching | Reduce costs and latency with semantic or simple caching |
+
+
+## Production Features with Portkey Configs
+
+All of the features below are enabled through Portkey's Config system, which lets you define routing strategies using simple JSON objects in your LLM API calls. You can create and manage Configs directly in your code or through the Portkey Dashboard; each Config has a unique ID for easy reference.
+
+
+
+
+
+
+### 1. Use 250+ LLMs
+Access various LLMs like Anthropic, Gemini, Mistral, Azure OpenAI, and more with minimal code changes. Switch between providers or use them together seamlessly. [Learn more about Universal API](https://portkey.ai/docs/product/ai-gateway/universal-api)
+
+
+Easily switch between different LLM providers:
+
+```python
+# Anthropic Configuration
+anthropic_llm = LLM(
+ model="claude-3-5-sonnet-latest",
+ base_url=PORTKEY_GATEWAY_URL,
+ api_key="dummy",
+ extra_headers=createHeaders(
+ api_key="YOUR_PORTKEY_API_KEY",
+ virtual_key="YOUR_ANTHROPIC_VIRTUAL_KEY", # No provider needed when using Virtual Keys
+ trace_id="anthropic_agent"
+ )
+)
+
+# Azure OpenAI Configuration
+azure_llm = LLM(
+ model="gpt-4",
+ base_url=PORTKEY_GATEWAY_URL,
+ api_key="dummy",
+ extra_headers=createHeaders(
+ api_key="YOUR_PORTKEY_API_KEY",
+ virtual_key="YOUR_AZURE_VIRTUAL_KEY", # No provider needed when using Virtual Keys
+ trace_id="azure_agent"
+ )
+)
+```
+
+
+### 2. Caching
+Improve response times and reduce costs with two powerful caching modes:
+- **Simple Cache**: Perfect for exact matches
+- **Semantic Cache**: Matches responses for requests that are semantically similar
+[Learn more about Caching](https://portkey.ai/docs/product/ai-gateway/cache-simple-and-semantic)
+
+```py
+config = {
+ "cache": {
+ "mode": "semantic", # or "simple" for exact matching
+ }
+}
+```
+
+### 3. Production Reliability
+Portkey provides comprehensive reliability features:
+- **Automatic Retries**: Handle temporary failures gracefully
+- **Request Timeouts**: Prevent hanging operations
+- **Conditional Routing**: Route requests based on specific conditions
+- **Fallbacks**: Set up automatic provider failovers
+- **Load Balancing**: Distribute requests efficiently
+
+[Learn more about Reliability Features](https://portkey.ai/docs/product/ai-gateway/)
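For instance, a load-balancing Config can split traffic across two providers by weight (a sketch following Portkey's Config schema; the virtual key values are placeholders):

```python
# Portkey Config sketch: route ~70% of requests to the first
# virtual key and ~30% to the second.
config = {
    "strategy": {"mode": "loadbalance"},
    "targets": [
        {"virtual_key": "YOUR_OPENAI_VIRTUAL_KEY", "weight": 0.7},
        {"virtual_key": "YOUR_ANTHROPIC_VIRTUAL_KEY", "weight": 0.3},
    ],
}
```

Weights are relative, so they can be tuned without changing any agent code.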
+
+
+
+### 4. Metrics
+
+Agent runs are complex. Portkey automatically logs **40+ comprehensive metrics** for your AI agents, including cost, tokens used, and latency. Whether you need a broad overview or granular insights into your agent runs, Portkey's customizable filters provide the metrics you need.
+
+
+- Cost per agent interaction
+- Response times and latency
+- Token usage and efficiency
+- Success/failure rates
+- Cache hit rates
+
+
+
+### 5. Detailed Logging
+Logs are essential for understanding agent behavior, diagnosing issues, and improving performance. They provide a detailed record of agent activities and tool use, which is crucial for debugging and optimizing processes.
+
+
+Access a dedicated section to view records of agent executions, including parameters, outcomes, function calls, and errors. Filter logs based on multiple parameters such as trace ID, model, tokens used, and metadata.
+
+
+ Traces
+
+
+
+
+ Logs
+
+
+
+### 6. Enterprise Security Features
+- Set budget limits and rate limits per Virtual Key (disposable API keys)
+- Implement role-based access control
+- Track system changes with audit logs
+- Configure data retention policies
+
+
+
+For detailed information on creating and managing Configs, visit the [Portkey documentation](https://docs.portkey.ai/product/ai-gateway/configs).
+
+## Resources
+
+- [π Portkey Documentation](https://docs.portkey.ai)
+- [π Portkey Dashboard](https://app.portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai)
+- [π¦ Twitter](https://twitter.com/portkeyai)
+- [π¬ Discord Community](https://discord.gg/DD7vgKK299)
diff --git a/docs/mint.json b/docs/mint.json
index fad9689b8..9103434b4 100644
--- a/docs/mint.json
+++ b/docs/mint.json
@@ -100,7 +100,8 @@
"how-to/conditional-tasks",
"how-to/agentops-observability",
"how-to/langtrace-observability",
- "how-to/openlit-observability"
+ "how-to/openlit-observability",
+ "how-to/portkey-observability"
]
},
{
diff --git a/pyproject.toml b/pyproject.toml
index f9533a6f9..bcc00a0d9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,27 +8,38 @@ authors = [
{ name = "Joao Moura", email = "joao@crewai.com" }
]
dependencies = [
+ # Core Dependencies
"pydantic>=2.4.2",
"openai>=1.13.3",
+ "litellm>=1.44.22",
+ "instructor>=1.3.3",
+
+ # Text Processing
+ "pdfplumber>=0.11.4",
+ "regex>=2024.9.11",
+
+ # Telemetry and Monitoring
"opentelemetry-api>=1.22.0",
"opentelemetry-sdk>=1.22.0",
"opentelemetry-exporter-otlp-proto-http>=1.22.0",
- "instructor>=1.3.3",
- "regex>=2024.9.11",
- "click>=8.1.7",
+
+ # Data Handling
+ "chromadb>=0.5.23",
+ "openpyxl>=3.1.5",
+ "pyvis>=0.3.2",
+
+ # Authentication and Security
+ "auth0-python>=4.7.1",
"python-dotenv>=1.0.0",
+
+ # Configuration and Utils
+ "click>=8.1.7",
"appdirs>=1.4.4",
"jsonref>=1.1.0",
"json-repair>=0.25.2",
- "auth0-python>=4.7.1",
- "litellm>=1.44.22",
- "pyvis>=0.3.2",
"uv>=0.4.25",
"tomli-w>=1.1.0",
"tomli>=2.0.2",
- "chromadb>=0.5.23",
- "pdfplumber>=0.11.4",
- "openpyxl>=3.1.5",
"blinker>=1.9.0",
]
@@ -39,6 +50,9 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools>=0.17.0"]
+embeddings = [
+ "tiktoken~=0.7.0"
+]
agentops = ["agentops>=0.3.0"]
fastembed = ["fastembed>=0.4.1"]
pdfplumber = [
@@ -67,7 +81,6 @@ dev-dependencies = [
"mkdocs-material-extensions>=1.3.1",
"pillow>=10.2.0",
"cairosvg>=2.7.1",
- "crewai-tools>=0.17.0",
"pytest>=8.0.0",
"pytest-vcr>=1.0.2",
"python-dotenv>=1.0.0",
diff --git a/src/crewai/agent.py b/src/crewai/agent.py
index 7d86f3d17..62e8dc435 100644
--- a/src/crewai/agent.py
+++ b/src/crewai/agent.py
@@ -17,6 +17,7 @@ from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.task import Task
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
+from crewai.tools.base_tool import Tool
from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
@@ -115,6 +116,10 @@ class Agent(BaseAgent):
default=2,
description="Maximum number of retries for an agent to execute a task when an error occurs.",
)
+ multimodal: bool = Field(
+ default=False,
+ description="Whether the agent can process multimodal input such as images.",
+ )
code_execution_mode: Literal["safe", "unsafe"] = Field(
default="safe",
description="Mode for code execution: 'safe' (using Docker) or 'unsafe' (direct execution).",
@@ -329,6 +334,10 @@ class Agent(BaseAgent):
tools = agent_tools.tools()
return tools
+ def get_multimodal_tools(self) -> List[Tool]:
+ from crewai.tools.agent_tools.add_image_tool import AddImageTool
+ return [AddImageTool()]
+
def get_code_execution_tools(self):
try:
from crewai_tools import CodeInterpreterTool
diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py
index eb0ff7c5a..813ac8a08 100644
--- a/src/crewai/agents/crew_agent_executor.py
+++ b/src/crewai/agents/crew_agent_executor.py
@@ -143,10 +143,20 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
tool_result = self._execute_tool_and_check_finality(
formatted_answer
)
- if self.step_callback:
- self.step_callback(tool_result)
- formatted_answer.text += f"\nObservation: {tool_result.result}"
+ # Directly append the result to the messages if the
+ # tool is "Add image to content" in case of multimodal
+ # agents
+ if formatted_answer.tool == self._i18n.tools("add_image")["name"]:
+ self.messages.append(tool_result.result)
+ continue
+
+ else:
+ if self.step_callback:
+ self.step_callback(tool_result)
+
+ formatted_answer.text += f"\nObservation: {tool_result.result}"
+
formatted_answer.result = tool_result.result
if tool_result.result_as_answer:
return AgentFinish(
diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index 5568e8f36..1d77f2e73 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -37,6 +37,7 @@ from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.types.crew_chat import ChatInputs
+from crewai.tools.base_tool import Tool
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -543,9 +544,6 @@ class Crew(BaseModel):
if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
- if agent.allow_code_execution: # type: ignore # BaseAgent" has no attribute "allow_code_execution"
- agent.tools += agent.get_code_execution_tools() # type: ignore # "BaseAgent" has no attribute "get_code_execution_tools"; maybe "get_delegation_tools"?
-
if not agent.step_callback: # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.step_callback = self.step_callback # type: ignore # "BaseAgent" has no attribute "step_callback"
@@ -682,7 +680,6 @@ class Crew(BaseModel):
)
manager.tools = []
raise Exception("Manager agent should not have tools")
- manager.tools = self.manager_agent.get_delegation_tools(self.agents)
else:
self.manager_llm = (
getattr(self.manager_llm, "model_name", None)
@@ -694,6 +691,7 @@ class Crew(BaseModel):
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
tools=AgentTools(agents=self.agents).tools(),
+ allow_delegation=True,
llm=self.manager_llm,
verbose=self.verbose,
)
@@ -736,7 +734,10 @@ class Crew(BaseModel):
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
)
- self._prepare_agent_tools(task)
+ # Determine which tools to use - task tools take precedence over agent tools
+ tools_for_task = task.tools or agent_to_use.tools or []
+ tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
+
self._log_task_start(task, agent_to_use.role)
if isinstance(task, ConditionalTask):
@@ -753,7 +754,7 @@ class Crew(BaseModel):
future = task.execute_async(
agent=agent_to_use,
context=context,
- tools=agent_to_use.tools,
+ tools=tools_for_task,
)
futures.append((task, future, task_index))
else:
@@ -765,7 +766,7 @@ class Crew(BaseModel):
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
- tools=agent_to_use.tools,
+ tools=tools_for_task,
)
task_outputs = [task_output]
self._process_task_result(task, task_output)
@@ -802,45 +803,77 @@ class Crew(BaseModel):
return skipped_task_output
return None
- def _prepare_agent_tools(self, task: Task):
- if self.process == Process.hierarchical:
- if self.manager_agent:
- self._update_manager_tools(task)
- else:
- raise ValueError("Manager agent is required for hierarchical process.")
- elif task.agent and task.agent.allow_delegation:
- self._add_delegation_tools(task)
+ def _prepare_tools(
+ self, agent: BaseAgent, task: Task, tools: List[Tool]
+ ) -> List[Tool]:
+ # Add delegation tools if agent allows delegation
+ if agent.allow_delegation:
+ if self.process == Process.hierarchical:
+ if self.manager_agent:
+ tools = self._update_manager_tools(task, tools)
+ else:
+ raise ValueError(
+ "Manager agent is required for hierarchical process."
+ )
+
+ elif agent and agent.allow_delegation:
+ tools = self._add_delegation_tools(task, tools)
+
+ # Add code execution tools if agent allows code execution
+ if agent.allow_code_execution:
+ tools = self._add_code_execution_tools(agent, tools)
+
+ if agent and agent.multimodal:
+ tools = self._add_multimodal_tools(agent, tools)
+
+ return tools
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
if self.process == Process.hierarchical:
return self.manager_agent
return task.agent
- def _add_delegation_tools(self, task: Task):
+ def _merge_tools(
+ self, existing_tools: List[Tool], new_tools: List[Tool]
+ ) -> List[Tool]:
+ """Merge new tools into existing tools list, avoiding duplicates by tool name."""
+ if not new_tools:
+ return existing_tools
+
+ # Create mapping of tool names to new tools
+ new_tool_map = {tool.name: tool for tool in new_tools}
+
+ # Remove any existing tools that will be replaced
+ tools = [tool for tool in existing_tools if tool.name not in new_tool_map]
+
+ # Add all new tools
+ tools.extend(new_tools)
+
+ return tools
+
+ def _inject_delegation_tools(
+ self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]
+ ):
+ delegation_tools = task_agent.get_delegation_tools(agents)
+ return self._merge_tools(tools, delegation_tools)
+
+ def _add_multimodal_tools(self, agent: BaseAgent, tools: List[Tool]):
+ multimodal_tools = agent.get_multimodal_tools()
+ return self._merge_tools(tools, multimodal_tools)
+
+ def _add_code_execution_tools(self, agent: BaseAgent, tools: List[Tool]):
+ code_tools = agent.get_code_execution_tools()
+ return self._merge_tools(tools, code_tools)
+
+ def _add_delegation_tools(self, task: Task, tools: List[Tool]):
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
- delegation_tools = task.agent.get_delegation_tools(agents_for_delegation)
-
- # Add tools if they are not already in task.tools
- for new_tool in delegation_tools:
- # Find the index of the tool with the same name
- existing_tool_index = next(
- (
- index
- for index, tool in enumerate(task.tools or [])
- if tool.name == new_tool.name
- ),
- None,
- )
- if not task.tools:
- task.tools = []
-
- if existing_tool_index is not None:
- # Replace the existing tool
- task.tools[existing_tool_index] = new_tool
- else:
- # Add the new tool
- task.tools.append(new_tool)
+ if not tools:
+ tools = []
+ tools = self._inject_delegation_tools(
+ tools, task.agent, agents_for_delegation
+ )
+ return tools
def _log_task_start(self, task: Task, role: str = "None"):
if self.output_log_file:
@@ -848,14 +881,15 @@ class Crew(BaseModel):
task_name=task.name, task=task.description, agent=role, status="started"
)
- def _update_manager_tools(self, task: Task):
+ def _update_manager_tools(self, task: Task, tools: List[Tool]):
if self.manager_agent:
if task.agent:
- self.manager_agent.tools = task.agent.get_delegation_tools([task.agent])
+ tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
else:
- self.manager_agent.tools = self.manager_agent.get_delegation_tools(
- self.agents
+ tools = self._inject_delegation_tools(
+ tools, self.manager_agent, self.agents
)
+ return tools
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
context = (
diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py
index ccc76dc95..dc46aa6d8 100644
--- a/src/crewai/flow/flow.py
+++ b/src/crewai/flow/flow.py
@@ -30,7 +30,47 @@ from crewai.telemetry import Telemetry
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
-def start(condition=None):
+def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
+ """
+ Marks a method as a flow's starting point.
+
+ This decorator designates a method as an entry point for the flow execution.
+ It can optionally specify conditions that trigger the start based on other
+ method executions.
+
+ Parameters
+ ----------
+ condition : Optional[Union[str, dict, Callable]], optional
+ Defines when the start method should execute. Can be:
+ - str: Name of a method that triggers this start
+ - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
+ - Callable: A method reference that triggers this start
+ Default is None, meaning unconditional start.
+
+ Returns
+ -------
+ Callable
+ A decorator function that marks the method as a flow start point.
+
+ Raises
+ ------
+ ValueError
+ If the condition format is invalid.
+
+ Examples
+ --------
+ >>> @start()  # Unconditional start
+ ... def begin_flow(self):
+ ...     pass
+
+ >>> @start("method_name")  # Start after a specific method
+ ... def conditional_start(self):
+ ...     pass
+
+ >>> @start(and_("method1", "method2"))  # Start after multiple methods
+ ... def complex_start(self):
+ ...     pass
+ """
def decorator(func):
func.__is_start_method__ = True
if condition is not None:
@@ -55,8 +95,42 @@ def start(condition=None):
return decorator
+def listen(condition: Union[str, dict, Callable]) -> Callable:
+ """
+ Creates a listener that executes when specified conditions are met.
-def listen(condition):
+ This decorator sets up a method to execute in response to other method
+ executions in the flow. It supports both simple and complex triggering
+ conditions.
+
+ Parameters
+ ----------
+ condition : Union[str, dict, Callable]
+ Specifies when the listener should execute. Can be:
+ - str: Name of a method that triggers this listener
+ - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
+ - Callable: A method reference that triggers this listener
+
+ Returns
+ -------
+ Callable
+ A decorator function that sets up the method as a listener.
+
+ Raises
+ ------
+ ValueError
+ If the condition format is invalid.
+
+ Examples
+ --------
+ >>> @listen("process_data")  # Listen to a single method
+ ... def handle_processed_data(self):
+ ...     pass
+
+ >>> @listen(or_("success", "failure"))  # Listen to multiple methods
+ ... def handle_completion(self):
+ ...     pass
+ """
def decorator(func):
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
@@ -80,16 +154,103 @@ def listen(condition):
return decorator
-def router(method):
+def router(condition: Union[str, dict, Callable]) -> Callable:
+ """
+ Creates a routing method that directs flow execution based on conditions.
+
+ This decorator marks a method as a router, which can dynamically determine
+ the next steps in the flow based on its return value. Routers are triggered
+ by specified conditions and can return constants that determine which path
+ the flow should take.
+
+ Parameters
+ ----------
+ condition : Union[str, dict, Callable]
+ Specifies when the router should execute. Can be:
+ - str: Name of a method that triggers this router
+ - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
+ - Callable: A method reference that triggers this router
+
+ Returns
+ -------
+ Callable
+ A decorator function that sets up the method as a router.
+
+ Raises
+ ------
+ ValueError
+ If the condition format is invalid.
+
+ Examples
+ --------
+ >>> @router("check_status")
+ ... def route_based_on_status(self):
+ ...     if self.state.status == "success":
+ ...         return SUCCESS
+ ...     return FAILURE
+
+ >>> @router(and_("validate", "process"))
+ ... def complex_routing(self):
+ ...     if all([self.state.valid, self.state.processed]):
+ ...         return CONTINUE
+ ...     return STOP
+ """
def decorator(func):
func.__is_router__ = True
- func.__router_for__ = method.__name__
+ if isinstance(condition, str):
+ func.__trigger_methods__ = [condition]
+ func.__condition_type__ = "OR"
+ elif (
+ isinstance(condition, dict)
+ and "type" in condition
+ and "methods" in condition
+ ):
+ func.__trigger_methods__ = condition["methods"]
+ func.__condition_type__ = condition["type"]
+ elif callable(condition) and hasattr(condition, "__name__"):
+ func.__trigger_methods__ = [condition.__name__]
+ func.__condition_type__ = "OR"
+ else:
+ raise ValueError(
+ "Condition must be a method, string, or a result of or_() or and_()"
+ )
return func
return decorator
+def or_(*conditions: Union[str, dict, Callable]) -> dict:
+ """
+ Combines multiple conditions with OR logic for flow control.
-def or_(*conditions):
+ Creates a condition that is satisfied when any of the specified conditions
+ are met. This is used with @start, @listen, or @router decorators to create
+ complex triggering conditions.
+
+ Parameters
+ ----------
+ *conditions : Union[str, dict, Callable]
+ Variable number of conditions that can be:
+ - str: Method names
+ - dict: Existing condition dictionaries
+ - Callable: Method references
+
+ Returns
+ -------
+ dict
+ A condition dictionary with format:
+ {"type": "OR", "methods": list_of_method_names}
+
+ Raises
+ ------
+ ValueError
+ If any condition is invalid.
+
+ Examples
+ --------
+ >>> @listen(or_("success", "timeout"))
+ ... def handle_completion(self):
+ ...     pass
+ """
methods = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
@@ -103,7 +264,39 @@ def or_(*conditions):
return {"type": "OR", "methods": methods}
-def and_(*conditions):
+def and_(*conditions: Union[str, dict, Callable]) -> dict:
+ """
+ Combines multiple conditions with AND logic for flow control.
+
+ Creates a condition that is satisfied only when all specified conditions
+ are met. This is used with @start, @listen, or @router decorators to create
+ complex triggering conditions.
+
+ Parameters
+ ----------
+ *conditions : Union[str, dict, Callable]
+ Variable number of conditions that can be:
+ - str: Method names
+ - dict: Existing condition dictionaries
+ - Callable: Method references
+
+ Returns
+ -------
+ dict
+ A condition dictionary with format:
+ {"type": "AND", "methods": list_of_method_names}
+
+ Raises
+ ------
+ ValueError
+ If any condition is invalid.
+
+ Examples
+ --------
+ >>> @listen(and_("validated", "processed"))
+ ... def handle_complete_data(self):
+ ...     pass
+ """
methods = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
@@ -123,8 +316,8 @@ class FlowMeta(type):
start_methods = []
listeners = {}
- routers = {}
router_paths = {}
+ routers = set()
for attr_name, attr_value in dct.items():
if hasattr(attr_value, "__is_start_method__"):
@@ -137,18 +330,11 @@ class FlowMeta(type):
methods = attr_value.__trigger_methods__
condition_type = getattr(attr_value, "__condition_type__", "OR")
listeners[attr_name] = (condition_type, methods)
-
- elif hasattr(attr_value, "__is_router__"):
- routers[attr_value.__router_for__] = attr_name
- possible_returns = get_possible_return_constants(attr_value)
- if possible_returns:
- router_paths[attr_name] = possible_returns
-
- # Register router as a listener to its triggering method
- trigger_method_name = attr_value.__router_for__
- methods = [trigger_method_name]
- condition_type = "OR"
- listeners[attr_name] = (condition_type, methods)
+ if hasattr(attr_value, "__is_router__") and attr_value.__is_router__:
+ routers.add(attr_name)
+ possible_returns = get_possible_return_constants(attr_value)
+ if possible_returns:
+ router_paths[attr_name] = possible_returns
setattr(cls, "_start_methods", start_methods)
setattr(cls, "_listeners", listeners)
@@ -163,7 +349,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
_start_methods: List[str] = []
_listeners: Dict[str, tuple[str, List[str]]] = {}
- _routers: Dict[str, str] = {}
+ _routers: Set[str] = set()
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
event_emitter = Signal("event_emitter")
@@ -210,20 +396,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
return self._method_outputs
def _initialize_state(self, inputs: Dict[str, Any]) -> None:
- """
- Initializes or updates the state with the provided inputs.
-
- Args:
- inputs: Dictionary of inputs to initialize or update the state.
-
- Raises:
- ValueError: If inputs do not match the structured state model.
- TypeError: If state is neither a BaseModel instance nor a dictionary.
- """
if isinstance(self._state, BaseModel):
- # Structured state management
+ # Structured state
try:
- # Define a function to create the dynamic class
+
def create_model_with_extra_forbid(
base_model: Type[BaseModel],
) -> Type[BaseModel]:
@@ -233,34 +409,20 @@ class Flow(Generic[T], metaclass=FlowMeta):
return ModelWithExtraForbid
- # Create the dynamic class
ModelWithExtraForbid = create_model_with_extra_forbid(
self._state.__class__
)
-
- # Create a new instance using the combined state and inputs
self._state = cast(
T, ModelWithExtraForbid(**{**self._state.model_dump(), **inputs})
)
-
except ValidationError as e:
raise ValueError(f"Invalid inputs for structured state: {e}") from e
elif isinstance(self._state, dict):
- # Unstructured state management
self._state.update(inputs)
else:
raise TypeError("State must be a BaseModel instance or a dictionary.")
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
- """
- Starts the execution of the flow synchronously.
-
- Args:
- inputs: Optional dictionary of inputs to initialize or update the state.
-
- Returns:
- The final output from the flow execution.
- """
self.event_emitter.send(
self,
event=FlowStartedEvent(
@@ -274,15 +436,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
return asyncio.run(self.kickoff_async())
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
- """
- Starts the execution of the flow asynchronously.
-
- Args:
- inputs: Optional dictionary of inputs to initialize or update the state.
-
- Returns:
- The final output from the flow execution.
- """
if not self._start_methods:
raise ValueError("No start method defined")
@@ -290,16 +443,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
self.__class__.__name__, list(self._methods.keys())
)
- # Create tasks for all start methods
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
]
-
- # Run all start methods concurrently
await asyncio.gather(*tasks)
- # Determine the final output (from the last executed method)
final_output = self._method_outputs[-1] if self._method_outputs else None
self.event_emitter.send(
@@ -310,10 +459,26 @@ class Flow(Generic[T], metaclass=FlowMeta):
result=final_output,
),
)
-
return final_output
async def _execute_start_method(self, start_method_name: str) -> None:
+ """
+ Executes a flow's start method and its triggered listeners.
+
+ This internal method handles the execution of methods marked with the
+ @start decorator and manages the subsequent chain of listener executions.
+
+ Parameters
+ ----------
+ start_method_name : str
+ The name of the start method to execute.
+
+ Notes
+ -----
+ - Executes the start method and captures its result
+ - Triggers execution of any listeners waiting on this start method
+ - Part of the flow's initialization sequence
+ """
result = await self._execute_method(
start_method_name, self._methods[start_method_name]
)
@@ -327,51 +492,146 @@ class Flow(Generic[T], metaclass=FlowMeta):
if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs)
)
- self._method_outputs.append(result) # Store the output
-
- # Track method execution counts
+ self._method_outputs.append(result)
self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1
)
-
return result
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
- listener_tasks = []
+ """
+ Executes all listeners and routers triggered by a method completion.
- if trigger_method in self._routers:
- router_method = self._methods[self._routers[trigger_method]]
- path = await self._execute_method(
- self._routers[trigger_method], router_method
+ This internal method manages the execution flow by:
+ 1. First executing all triggered routers sequentially
+ 2. Then executing all triggered listeners in parallel
+
+ Parameters
+ ----------
+ trigger_method : str
+ The name of the method that triggered these listeners.
+ result : Any
+ The result from the triggering method, passed to listeners
+ that accept parameters.
+
+ Notes
+ -----
+ - Routers are executed sequentially to maintain flow control
+ - Each router's result becomes the new trigger_method
+ - Normal listeners are executed in parallel for efficiency
+ - Listeners can receive the trigger method's result as a parameter
+ """
+ # First, handle routers repeatedly until no router triggers anymore
+ while True:
+ routers_triggered = self._find_triggered_methods(
+ trigger_method, router_only=True
)
- trigger_method = path
+ if not routers_triggered:
+ break
+ for router_name in routers_triggered:
+ await self._execute_single_listener(router_name, result)
+ # A router's return value is its chosen path. Results are appended to
+ # self._method_outputs in order, so the last element is the most recent
+ # router's result; it becomes the next trigger_method.
+ trigger_method = self._method_outputs[-1]
+ # Now that no more routers are triggered by current trigger_method,
+ # execute normal listeners
+ listeners_triggered = self._find_triggered_methods(
+ trigger_method, router_only=False
+ )
+ if listeners_triggered:
+ tasks = [
+ self._execute_single_listener(listener_name, result)
+ for listener_name in listeners_triggered
+ ]
+ await asyncio.gather(*tasks)
+
+ def _find_triggered_methods(
+ self, trigger_method: str, router_only: bool
+ ) -> List[str]:
+ """
+ Finds all methods that should be triggered based on conditions.
+
+ This internal method evaluates both OR and AND conditions to determine
+ which methods should be executed next in the flow.
+
+ Parameters
+ ----------
+ trigger_method : str
+ The name of the method that just completed execution.
+ router_only : bool
+ If True, only consider router methods.
+ If False, only consider non-router methods.
+
+ Returns
+ -------
+ List[str]
+ Names of methods that should be triggered.
+
+ Notes
+ -----
+ - Handles both OR and AND conditions:
+ * OR: Triggers if any condition is met
+ * AND: Triggers only when all conditions are met
+ - Maintains state for AND conditions using _pending_and_listeners
+ - Separates router and normal listener evaluation
+ """
+ triggered = []
for listener_name, (condition_type, methods) in self._listeners.items():
+ is_router = listener_name in self._routers
+
+ if router_only != is_router:
+ continue
+
if condition_type == "OR":
+ # If the trigger_method matches any in methods, run this
if trigger_method in methods:
- # Schedule the listener without preventing re-execution
- listener_tasks.append(
- self._execute_single_listener(listener_name, result)
- )
+ triggered.append(listener_name)
elif condition_type == "AND":
# Initialize pending methods for this listener if not already done
if listener_name not in self._pending_and_listeners:
self._pending_and_listeners[listener_name] = set(methods)
# Remove the trigger method from pending methods
- self._pending_and_listeners[listener_name].discard(trigger_method)
+ if trigger_method in self._pending_and_listeners[listener_name]:
+ self._pending_and_listeners[listener_name].discard(trigger_method)
+
if not self._pending_and_listeners[listener_name]:
# All required methods have been executed
- listener_tasks.append(
- self._execute_single_listener(listener_name, result)
- )
+ triggered.append(listener_name)
# Reset pending methods for this listener
self._pending_and_listeners.pop(listener_name, None)
- # Run all listener tasks concurrently and wait for them to complete
- if listener_tasks:
- await asyncio.gather(*listener_tasks)
+ return triggered
async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
+ """
+ Executes a single listener method with proper event handling.
+
+ This internal method manages the execution of an individual listener,
+ including parameter inspection, event emission, and error handling.
+
+ Parameters
+ ----------
+ listener_name : str
+ The name of the listener method to execute.
+ result : Any
+ The result from the triggering method, which may be passed
+ to the listener if it accepts parameters.
+
+ Notes
+ -----
+ - Inspects method signature to determine if it accepts the trigger result
+ - Emits events for method execution start and finish
+ - Handles errors gracefully with detailed logging
+ - Recursively triggers listeners of this listener
+ - Supports both parameterized and parameter-less listeners
+
+ Error Handling
+ --------------
+ Catches and logs any exceptions during execution, preventing
+ individual listener failures from breaking the entire flow.
+ """
try:
method = self._methods[listener_name]
@@ -386,17 +646,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
sig = inspect.signature(method)
params = list(sig.parameters.values())
-
- # Exclude 'self' parameter
method_params = [p for p in params if p.name != "self"]
if method_params:
- # If listener expects parameters, pass the result
listener_result = await self._execute_method(
listener_name, method, result
)
else:
- # If listener does not expect parameters, call without arguments
listener_result = await self._execute_method(listener_name, method)
self.event_emitter.send(
@@ -408,8 +664,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
),
)
- # Execute listeners of this listener
+ # Execute listeners (and possibly routers) of this listener
await self._execute_listeners(listener_name, listener_result)
+
except Exception as e:
print(
f"[Flow._execute_single_listener] Error in method {listener_name}: {e}"
@@ -422,5 +679,4 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._telemetry.flow_plotting_span(
self.__class__.__name__, list(self._methods.keys())
)
-
plot_flow(self, filename)
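The trigger evaluation that `_find_triggered_methods` introduces above (stateless OR listeners, AND listeners tracked through a pending set) can be modeled outside the framework. This is a minimal, standalone sketch of the condition semantics only — the names `listeners`, `pending_and`, and `find_triggered` are illustrative, not CrewAI API:

```python
from typing import Dict, List, Set, Tuple

# Listener registry mirroring Flow._listeners: name -> (condition_type, trigger methods).
listeners: Dict[str, Tuple[str, List[str]]] = {
    "notify": ("OR", ["fetch", "retry"]),
    "finalize": ("AND", ["validated", "processed"]),
}
# Mirrors Flow._pending_and_listeners: tracks which triggers an AND listener still awaits.
pending_and: Dict[str, Set[str]] = {}

def find_triggered(trigger_method: str) -> List[str]:
    """Return listener names fired by trigger_method, modeling OR/AND semantics."""
    triggered = []
    for name, (condition_type, methods) in listeners.items():
        if condition_type == "OR":
            # OR fires as soon as any one of its trigger methods completes.
            if trigger_method in methods:
                triggered.append(name)
        elif condition_type == "AND":
            # AND waits until every trigger method has completed at least once.
            if name not in pending_and:
                pending_and[name] = set(methods)
            pending_and[name].discard(trigger_method)
            if not pending_and[name]:
                triggered.append(name)
                pending_and.pop(name, None)  # reset so the cycle can repeat
    return triggered

print(find_triggered("fetch"))      # OR condition fires immediately
print(find_triggered("validated"))  # AND condition still pending
print(find_triggered("processed"))  # AND condition now complete
```

As in `_find_triggered_methods`, an OR listener carries no state between calls, while an AND listener accumulates completed triggers in its pending set and resets once it fires.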
diff --git a/src/crewai/flow/flow_visualizer.py b/src/crewai/flow/flow_visualizer.py
index 988f27919..a70e91a18 100644
--- a/src/crewai/flow/flow_visualizer.py
+++ b/src/crewai/flow/flow_visualizer.py
@@ -1,12 +1,14 @@
# flow_visualizer.py
import os
+from pathlib import Path
from pyvis.network import Network
from crewai.flow.config import COLORS, NODE_STYLES
from crewai.flow.html_template_handler import HTMLTemplateHandler
from crewai.flow.legend_generator import generate_legend_items_html, get_legend_items
+from crewai.flow.path_utils import safe_path_join, validate_path_exists
from crewai.flow.utils import calculate_node_levels
from crewai.flow.visualization_utils import (
add_edges,
@@ -16,89 +18,209 @@ from crewai.flow.visualization_utils import (
class FlowPlot:
+ """Handles the creation and rendering of flow visualization diagrams."""
+
def __init__(self, flow):
+ """
+ Initialize FlowPlot with a flow object.
+
+ Parameters
+ ----------
+ flow : Flow
+ A Flow instance to visualize.
+
+ Raises
+ ------
+ ValueError
+ If flow object is invalid or missing required attributes.
+ """
+ if not hasattr(flow, '_methods'):
+ raise ValueError("Invalid flow object: missing '_methods' attribute")
+ if not hasattr(flow, '_listeners'):
+ raise ValueError("Invalid flow object: missing '_listeners' attribute")
+ if not hasattr(flow, '_start_methods'):
+ raise ValueError("Invalid flow object: missing '_start_methods' attribute")
+
self.flow = flow
self.colors = COLORS
self.node_styles = NODE_STYLES
def plot(self, filename):
- net = Network(
- directed=True,
- height="750px",
- width="100%",
- bgcolor=self.colors["bg"],
- layout=None,
- )
-
- # Set options to disable physics
- net.set_options(
- """
- var options = {
- "nodes": {
- "font": {
- "multi": "html"
- }
- },
- "physics": {
- "enabled": false
- }
- }
"""
- )
+ Generate and save an HTML visualization of the flow.
- # Calculate levels for nodes
- node_levels = calculate_node_levels(self.flow)
+ Parameters
+ ----------
+ filename : str
+ Name of the output file (without extension).
- # Compute positions
- node_positions = compute_positions(self.flow, node_levels)
+ Raises
+ ------
+ ValueError
+ If filename is invalid or network generation fails.
+ IOError
+ If file operations fail or visualization cannot be generated.
+ RuntimeError
+ If network visualization generation fails.
+ """
+ if not filename or not isinstance(filename, str):
+ raise ValueError("Filename must be a non-empty string")
+
+ try:
+ # Initialize network
+ net = Network(
+ directed=True,
+ height="750px",
+ width="100%",
+ bgcolor=self.colors["bg"],
+ layout=None,
+ )
- # Add nodes to the network
- add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
+ # Set options to disable physics
+ net.set_options(
+ """
+ var options = {
+ "nodes": {
+ "font": {
+ "multi": "html"
+ }
+ },
+ "physics": {
+ "enabled": false
+ }
+ }
+ """
+ )
- # Add edges to the network
- add_edges(net, self.flow, node_positions, self.colors)
+ # Calculate levels for nodes
+ try:
+ node_levels = calculate_node_levels(self.flow)
+ except Exception as e:
+ raise ValueError(f"Failed to calculate node levels: {str(e)}")
- network_html = net.generate_html()
- final_html_content = self._generate_final_html(network_html)
+ # Compute positions
+ try:
+ node_positions = compute_positions(self.flow, node_levels)
+ except Exception as e:
+ raise ValueError(f"Failed to compute node positions: {str(e)}")
- # Save the final HTML content to the file
- with open(f"{filename}.html", "w", encoding="utf-8") as f:
- f.write(final_html_content)
- print(f"Plot saved as {filename}.html")
+ # Add nodes to the network
+ try:
+ add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
+ except Exception as e:
+ raise RuntimeError(f"Failed to add nodes to network: {str(e)}")
- self._cleanup_pyvis_lib()
+ # Add edges to the network
+ try:
+ add_edges(net, self.flow, node_positions, self.colors)
+ except Exception as e:
+ raise RuntimeError(f"Failed to add edges to network: {str(e)}")
+
+ # Generate HTML
+ try:
+ network_html = net.generate_html()
+ final_html_content = self._generate_final_html(network_html)
+ except Exception as e:
+ raise RuntimeError(f"Failed to generate network visualization: {str(e)}")
+
+ # Save the final HTML content to the file
+ try:
+ with open(f"{filename}.html", "w", encoding="utf-8") as f:
+ f.write(final_html_content)
+ print(f"Plot saved as {filename}.html")
+ except IOError as e:
+ raise IOError(f"Failed to save flow visualization to {filename}.html: {str(e)}")
+
+ except (ValueError, RuntimeError, IOError) as e:
+ raise e
+ except Exception as e:
+ raise RuntimeError(f"Unexpected error during flow visualization: {str(e)}")
+ finally:
+ self._cleanup_pyvis_lib()
def _generate_final_html(self, network_html):
- # Extract just the body content from the generated HTML
- current_dir = os.path.dirname(__file__)
- template_path = os.path.join(
- current_dir, "assets", "crewai_flow_visual_template.html"
- )
- logo_path = os.path.join(current_dir, "assets", "crewai_logo.svg")
+ """
+ Generate the final HTML content with network visualization and legend.
- html_handler = HTMLTemplateHandler(template_path, logo_path)
- network_body = html_handler.extract_body_content(network_html)
+ Parameters
+ ----------
+ network_html : str
+ HTML content generated by pyvis Network.
- # Generate the legend items HTML
- legend_items = get_legend_items(self.colors)
- legend_items_html = generate_legend_items_html(legend_items)
- final_html_content = html_handler.generate_final_html(
- network_body, legend_items_html
- )
- return final_html_content
+ Returns
+ -------
+ str
+ Complete HTML content with styling and legend.
+
+ Raises
+ ------
+ IOError
+ If template or logo files cannot be accessed.
+ ValueError
+ If network_html is invalid.
+ """
+ if not network_html:
+ raise ValueError("Invalid network HTML content")
+
+ try:
+ # Extract just the body content from the generated HTML
+ current_dir = os.path.dirname(__file__)
+ template_path = safe_path_join("assets", "crewai_flow_visual_template.html", root=current_dir)
+ logo_path = safe_path_join("assets", "crewai_logo.svg", root=current_dir)
+
+ if not os.path.exists(template_path):
+ raise IOError(f"Template file not found: {template_path}")
+ if not os.path.exists(logo_path):
+ raise IOError(f"Logo file not found: {logo_path}")
+
+ html_handler = HTMLTemplateHandler(template_path, logo_path)
+ network_body = html_handler.extract_body_content(network_html)
+
+ # Generate the legend items HTML
+ legend_items = get_legend_items(self.colors)
+ legend_items_html = generate_legend_items_html(legend_items)
+ final_html_content = html_handler.generate_final_html(
+ network_body, legend_items_html
+ )
+ return final_html_content
+ except Exception as e:
+ raise IOError(f"Failed to generate visualization HTML: {str(e)}")
def _cleanup_pyvis_lib(self):
- # Clean up the generated lib folder
- lib_folder = os.path.join(os.getcwd(), "lib")
+ """
+ Clean up the generated lib folder from pyvis.
+
+ This method safely removes the temporary lib directory created by pyvis
+ during network visualization generation.
+ """
try:
+ lib_folder = safe_path_join("lib", root=os.getcwd())
if os.path.exists(lib_folder) and os.path.isdir(lib_folder):
import shutil
-
shutil.rmtree(lib_folder)
+ except ValueError as e:
+ print(f"Error validating lib folder path: {e}")
except Exception as e:
- print(f"Error cleaning up {lib_folder}: {e}")
+ print(f"Error cleaning up lib folder: {e}")
def plot_flow(flow, filename="flow_plot"):
+ """
+ Convenience function to create and save a flow visualization.
+
+ Parameters
+ ----------
+ flow : Flow
+ Flow instance to visualize.
+ filename : str, optional
+ Output filename without extension, by default "flow_plot".
+
+ Raises
+ ------
+ ValueError
+ If flow object or filename is invalid.
+ IOError
+ If file operations fail.
+ """
visualizer = FlowPlot(flow)
visualizer.plot(filename)
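`safe_path_join` and `validate_path_exists` come from the new `crewai.flow.path_utils` module, which is not included in this chunk of the diff. Inferred from the call sites above, a plausible sketch of the join helper — resolve the joined path and reject anything that escapes the root — might look like this (hypothetical; the real implementation may differ):

```python
import os
from pathlib import Path
from typing import Optional

def safe_path_join(*parts: str, root: Optional[str] = None) -> str:
    """Join parts under root, rejecting paths that escape the root.

    Hypothetical sketch inferred from the call sites in this diff,
    not the actual crewai.flow.path_utils implementation.
    """
    base = Path(root if root is not None else os.getcwd()).resolve()
    candidate = base.joinpath(*parts).resolve()
    # A traversal like "../../etc/passwd" resolves outside base and is rejected.
    if base != candidate and base not in candidate.parents:
        raise ValueError(f"Path {candidate} escapes root {base}")
    return str(candidate)
```

Under these assumed semantics, `safe_path_join("assets", "crewai_logo.svg", root=current_dir)` returns an absolute path inside `current_dir`, while `safe_path_join("..", "x", root=current_dir)` raises `ValueError` — which matches how `_cleanup_pyvis_lib` above catches `ValueError` from the call.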
diff --git a/src/crewai/flow/html_template_handler.py b/src/crewai/flow/html_template_handler.py
index d521d8cf8..f0d2d89ad 100644
--- a/src/crewai/flow/html_template_handler.py
+++ b/src/crewai/flow/html_template_handler.py
@@ -1,26 +1,53 @@
import base64
import re
+from pathlib import Path
+
+from crewai.flow.path_utils import safe_path_join, validate_path_exists
class HTMLTemplateHandler:
+ """Handles HTML template processing and generation for flow visualization diagrams."""
+
def __init__(self, template_path, logo_path):
- self.template_path = template_path
- self.logo_path = logo_path
+ """
+ Initialize HTMLTemplateHandler with validated template and logo paths.
+
+ Parameters
+ ----------
+ template_path : str
+ Path to the HTML template file.
+ logo_path : str
+ Path to the logo image file.
+
+ Raises
+ ------
+ ValueError
+ If template or logo paths are invalid or files don't exist.
+ """
+ try:
+ self.template_path = validate_path_exists(template_path, "file")
+ self.logo_path = validate_path_exists(logo_path, "file")
+ except ValueError as e:
+ raise ValueError(f"Invalid template or logo path: {e}")
def read_template(self):
+ """Read and return the HTML template file contents."""
with open(self.template_path, "r", encoding="utf-8") as f:
return f.read()
def encode_logo(self):
+ """Convert the logo SVG file to base64 encoded string."""
with open(self.logo_path, "rb") as logo_file:
logo_svg_data = logo_file.read()
return base64.b64encode(logo_svg_data).decode("utf-8")
def extract_body_content(self, html):
+ """Extract and return content between body tags from HTML string."""
match = re.search("(.*?)