diff --git a/.gitignore b/.gitignore
index 8559a290c..947b99b7b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,4 @@ crew_tasks_output.json
.mypy_cache
.ruff_cache
.venv
+agentops.log
\ No newline at end of file
diff --git a/README.md b/README.md
index bf1287d4d..edcbb6f51 100644
--- a/README.md
+++ b/README.md
@@ -85,7 +85,6 @@ First, install CrewAI:
```shell
pip install crewai
```
-
If you want to install the 'crewai' package along with its optional features that include additional tools for agents, you can do so by using the following command:
```shell
@@ -93,6 +92,22 @@ pip install 'crewai[tools]'
```
The command above installs the basic package and also adds extra components which require more dependencies to function.
+### Troubleshooting Dependencies
+
+If you encounter issues during installation or usage, here are some common solutions:
+
+#### Common Issues
+
+1. **ModuleNotFoundError: No module named 'tiktoken'**
+ - Install tiktoken explicitly: `pip install 'crewai[embeddings]'`
+ - If using embedchain or other tools: `pip install 'crewai[tools]'`
+
+2. **Failed building wheel for tiktoken**
+ - Ensure Rust compiler is installed (see installation steps above)
+ - For Windows: Verify Visual C++ Build Tools are installed
+ - Try upgrading pip: `pip install --upgrade pip`
+ - If issues persist, use a pre-built wheel: `pip install tiktoken --prefer-binary`
+
### 2. Setting Up Your Crew with the YAML Configuration
To create a new CrewAI project, run the following CLI (Command Line Interface) command:
diff --git a/docs/concepts/flows.mdx b/docs/concepts/flows.mdx
index 324118310..cf9d20f63 100644
--- a/docs/concepts/flows.mdx
+++ b/docs/concepts/flows.mdx
@@ -138,7 +138,7 @@ print("---- Final Output ----")
print(final_output)
````
-``` text Output
+```text Output
---- Final Output ----
Second method received: Output from first_method
````
diff --git a/docs/concepts/knowledge.mdx b/docs/concepts/knowledge.mdx
index 8a777833e..91110e19f 100644
--- a/docs/concepts/knowledge.mdx
+++ b/docs/concepts/knowledge.mdx
@@ -4,8 +4,6 @@ description: What is knowledge in CrewAI and how to use it.
icon: book
---
-# Using Knowledge in CrewAI
-
## What is Knowledge?
Knowledge in CrewAI is a powerful system that allows AI agents to access and utilize external information sources during their tasks.
@@ -36,7 +34,20 @@ CrewAI supports various types of knowledge sources out of the box:
-## Quick Start
+## Supported Knowledge Parameters
+
+| Parameter | Type | Required | Description |
+| :--------------------------- | :---------------------------------- | :------- | :---------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `sources` | **List[BaseKnowledgeSource]** | Yes | List of knowledge sources that provide content to be stored and queried. Can include PDF, CSV, Excel, JSON, text files, or string content. |
+| `collection_name` | **str** | No | Name of the collection where the knowledge will be stored. Used to identify different sets of knowledge. Defaults to "knowledge" if not provided. |
+| `storage` | **Optional[KnowledgeStorage]** | No | Custom storage configuration for managing how the knowledge is stored and retrieved. If not provided, a default storage will be created. |
+
+## Quickstart Example
+
+
+For file-Based Knowledge Sources, make sure to place your files in a `knowledge` directory at the root of your project.
+Also, use relative paths from the `knowledge` directory when creating the source.
+
Here's an example using string-based knowledge:
@@ -80,7 +91,8 @@ result = crew.kickoff(inputs={"question": "What city does John live in and how o
```
-Here's another example with the `CrewDoclingSource`
+Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including TXT, PDF, DOCX, HTML, and more.
+
```python Code
from crewai import LLM, Agent, Crew, Process, Task
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
@@ -128,39 +140,217 @@ result = crew.kickoff(
)
```
+## More Examples
+
+Here are examples of how to use different types of knowledge sources:
+
+### Text File Knowledge Source
+```python
+from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
+
+# Create a text file knowledge source
+text_source = CrewDoclingSource(
+ file_paths=["document.txt", "another.txt"]
+)
+
+# Create crew with text file source on agents or crew level
+agent = Agent(
+ ...
+ knowledge_sources=[text_source]
+)
+
+crew = Crew(
+ ...
+ knowledge_sources=[text_source]
+)
+```
+
+### PDF Knowledge Source
+```python
+from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
+
+# Create a PDF knowledge source
+pdf_source = PDFKnowledgeSource(
+ file_paths=["document.pdf", "another.pdf"]
+)
+
+# Create crew with PDF knowledge source on agents or crew level
+agent = Agent(
+ ...
+ knowledge_sources=[pdf_source]
+)
+
+crew = Crew(
+ ...
+ knowledge_sources=[pdf_source]
+)
+```
+
+### CSV Knowledge Source
+```python
+from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
+
+# Create a CSV knowledge source
+csv_source = CSVKnowledgeSource(
+ file_paths=["data.csv"]
+)
+
+# Create crew with CSV knowledge source or on agent level
+agent = Agent(
+ ...
+ knowledge_sources=[csv_source]
+)
+
+crew = Crew(
+ ...
+ knowledge_sources=[csv_source]
+)
+```
+
+### Excel Knowledge Source
+```python
+from crewai.knowledge.source.excel_knowledge_source import ExcelKnowledgeSource
+
+# Create an Excel knowledge source
+excel_source = ExcelKnowledgeSource(
+ file_paths=["spreadsheet.xlsx"]
+)
+
+# Create crew with Excel knowledge source on agents or crew level
+agent = Agent(
+ ...
+ knowledge_sources=[excel_source]
+)
+
+crew = Crew(
+ ...
+ knowledge_sources=[excel_source]
+)
+```
+
+### JSON Knowledge Source
+```python
+from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource
+
+# Create a JSON knowledge source
+json_source = JSONKnowledgeSource(
+ file_paths=["data.json"]
+)
+
+# Create crew with JSON knowledge source on agents or crew level
+agent = Agent(
+ ...
+ knowledge_sources=[json_source]
+)
+
+crew = Crew(
+ ...
+ knowledge_sources=[json_source]
+)
+```
+
## Knowledge Configuration
### Chunking Configuration
-Control how content is split for processing by setting the chunk size and overlap.
+Knowledge sources automatically chunk content for better processing.
+You can configure chunking behavior in your knowledge sources:
-```python Code
-knowledge_source = StringKnowledgeSource(
- content="Long content...",
- chunk_size=4000, # Characters per chunk (default)
- chunk_overlap=200 # Overlap between chunks (default)
+```python
+from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
+
+source = StringKnowledgeSource(
+ content="Your content here",
+ chunk_size=4000, # Maximum size of each chunk (default: 4000)
+ chunk_overlap=200 # Overlap between chunks (default: 200)
)
```
-## Embedder Configuration
+The chunking configuration helps in:
+- Breaking down large documents into manageable pieces
+- Maintaining context through chunk overlap
+- Optimizing retrieval accuracy
-You can also configure the embedder for the knowledge store. This is useful if you want to use a different embedder for the knowledge store than the one used for the agents.
+### Embeddings Configuration
-```python Code
-...
+You can also configure the embedder for the knowledge store.
+This is useful if you want to use a different embedder for the knowledge store than the one used for the agents.
+The `embedder` parameter supports various embedding model providers that include:
+- `openai`: OpenAI's embedding models
+- `google`: Google's text embedding models
+- `azure`: Azure OpenAI embeddings
+- `ollama`: Local embeddings with Ollama
+- `vertexai`: Google Cloud VertexAI embeddings
+- `cohere`: Cohere's embedding models
+- `bedrock`: AWS Bedrock embeddings
+- `huggingface`: Hugging Face models
+- `watson`: IBM Watson embeddings
+
+Here's an example of how to configure the embedder for the knowledge store using Google's `text-embedding-004` model:
+
+```python Example
+from crewai import Agent, Task, Crew, Process, LLM
+from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
+import os
+
+# Get the GEMINI API key
+GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
+
+# Create a knowledge source
+content = "Users name is John. He is 30 years old and lives in San Francisco."
string_source = StringKnowledgeSource(
- content="Users name is John. He is 30 years old and lives in San Francisco.",
+ content=content,
)
+
+# Create an LLM with a temperature of 0 to ensure deterministic outputs
+gemini_llm = LLM(
+ model="gemini/gemini-1.5-pro-002",
+ api_key=GEMINI_API_KEY,
+ temperature=0,
+)
+
+# Create an agent with the knowledge store
+agent = Agent(
+ role="About User",
+ goal="You know everything about the user.",
+ backstory="""You are a master at understanding people and their preferences.""",
+ verbose=True,
+ allow_delegation=False,
+ llm=gemini_llm,
+)
+
+task = Task(
+ description="Answer the following questions about the user: {question}",
+ expected_output="An answer to the question.",
+ agent=agent,
+)
+
crew = Crew(
- ...
+ agents=[agent],
+ tasks=[task],
+ verbose=True,
+ process=Process.sequential,
knowledge_sources=[string_source],
embedder={
- "provider": "openai",
- "config": {"model": "text-embedding-3-small"},
- },
+ "provider": "google",
+ "config": {
+ "model": "models/text-embedding-004",
+ "api_key": GEMINI_API_KEY,
+ }
+ }
)
-```
+result = crew.kickoff(inputs={"question": "What city does John live in and how old is he?"})
+```
+```text Output
+# Agent: About User
+## Task: Answer the following questions about the user: What city does John live in and how old is he?
+
+# Agent: About User
+## Final Answer:
+John is 30 years old and lives in San Francisco.
+```
+
## Clearing Knowledge
If you need to clear the knowledge stored in CrewAI, you can use the `crewai reset-memories` command with the `--knowledge` option.
diff --git a/docs/how-to/Portkey-Observability-and-Guardrails.md b/docs/how-to/portkey-observability-and-guardrails.mdx
similarity index 100%
rename from docs/how-to/Portkey-Observability-and-Guardrails.md
rename to docs/how-to/portkey-observability-and-guardrails.mdx
diff --git a/docs/how-to/portkey-observability.mdx b/docs/how-to/portkey-observability.mdx
new file mode 100644
index 000000000..4002323a5
--- /dev/null
+++ b/docs/how-to/portkey-observability.mdx
@@ -0,0 +1,202 @@
+---
+title: Portkey Observability and Guardrails
+description: How to use Portkey with CrewAI
+icon: key
+---
+
+
+
+
+[Portkey](https://portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai) is a 2-line upgrade to make your CrewAI agents reliable, cost-efficient, and fast.
+
+Portkey adds 4 core production capabilities to any CrewAI agent:
+1. Routing to **200+ LLMs**
+2. Making each LLM call more robust
+3. Full-stack tracing & cost, performance analytics
+4. Real-time guardrails to enforce behavior
+
+## Getting Started
+
+
+
+ ```bash
+ pip install -qU crewai portkey-ai
+ ```
+
+
+ To build CrewAI Agents with Portkey, you'll need two keys:
+ - **Portkey API Key**: Sign up on the [Portkey app](https://app.portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai) and copy your API key
+ - **Virtual Key**: Virtual Keys securely manage your LLM API keys in one place. Store your LLM provider API keys securely in Portkey's vault
+
+ ```python
+ from crewai import LLM
+ from portkey_ai import createHeaders, PORTKEY_GATEWAY_URL
+
+ gpt_llm = LLM(
+ model="gpt-4",
+ base_url=PORTKEY_GATEWAY_URL,
+ api_key="dummy", # We are using Virtual key
+ extra_headers=createHeaders(
+ api_key="YOUR_PORTKEY_API_KEY",
+ virtual_key="YOUR_VIRTUAL_KEY", # Enter your Virtual key from Portkey
+ )
+ )
+ ```
+
+
+ ```python
+ from crewai import Agent, Task, Crew
+
+ # Define your agents with roles and goals
+ coder = Agent(
+ role='Software developer',
+ goal='Write clear, concise code on demand',
+ backstory='An expert coder with a keen eye for software trends.',
+ llm=gpt_llm
+ )
+
+ # Create tasks for your agents
+ task1 = Task(
+ description="Define the HTML for making a simple website with heading- Hello World! Portkey is working!",
+ expected_output="A clear and concise HTML code",
+ agent=coder
+ )
+
+ # Instantiate your crew
+ crew = Crew(
+ agents=[coder],
+ tasks=[task1],
+ )
+
+ result = crew.kickoff()
+ print(result)
+ ```
+
+
+
+## Key Features
+
+| Feature | Description |
+|:--------|:------------|
+| 🌐 Multi-LLM Support | Access OpenAI, Anthropic, Gemini, Azure, and 250+ providers through a unified interface |
+| 🛡️ Production Reliability | Implement retries, timeouts, load balancing, and fallbacks |
+| 📊 Advanced Observability | Track 40+ metrics including costs, tokens, latency, and custom metadata |
+| 🔍 Comprehensive Logging | Debug with detailed execution traces and function call logs |
+| 🚧 Security Controls | Set budget limits and implement role-based access control |
+| 🔄 Performance Analytics | Capture and analyze feedback for continuous improvement |
+| 💾 Intelligent Caching | Reduce costs and latency with semantic or simple caching |
+
+
+## Production Features with Portkey Configs
+
+All features mentioned below are through Portkey's Config system. Portkey's Config system allows you to define routing strategies using simple JSON objects in your LLM API calls. You can create and manage Configs directly in your code or through the Portkey Dashboard. Each Config has a unique ID for easy reference.
+
+
+
+
+
+
+### 1. Use 250+ LLMs
+Access various LLMs like Anthropic, Gemini, Mistral, Azure OpenAI, and more with minimal code changes. Switch between providers or use them together seamlessly. [Learn more about Universal API](https://portkey.ai/docs/product/ai-gateway/universal-api)
+
+
+Easily switch between different LLM providers:
+
+```python
+# Anthropic Configuration
+anthropic_llm = LLM(
+ model="claude-3-5-sonnet-latest",
+ base_url=PORTKEY_GATEWAY_URL,
+ api_key="dummy",
+ extra_headers=createHeaders(
+ api_key="YOUR_PORTKEY_API_KEY",
+ virtual_key="YOUR_ANTHROPIC_VIRTUAL_KEY", #You don't need provider when using Virtual keys
+ trace_id="anthropic_agent"
+ )
+)
+
+# Azure OpenAI Configuration
+azure_llm = LLM(
+ model="gpt-4",
+ base_url=PORTKEY_GATEWAY_URL,
+ api_key="dummy",
+ extra_headers=createHeaders(
+ api_key="YOUR_PORTKEY_API_KEY",
+ virtual_key="YOUR_AZURE_VIRTUAL_KEY", #You don't need provider when using Virtual keys
+ trace_id="azure_agent"
+ )
+)
+```
+
+
+### 2. Caching
+Improve response times and reduce costs with two powerful caching modes:
+- **Simple Cache**: Perfect for exact matches
+- **Semantic Cache**: Matches responses for requests that are semantically similar
+[Learn more about Caching](https://portkey.ai/docs/product/ai-gateway/cache-simple-and-semantic)
+
+```py
+config = {
+ "cache": {
+ "mode": "semantic", # or "simple" for exact matching
+ }
+}
+```
+
+### 3. Production Reliability
+Portkey provides comprehensive reliability features:
+- **Automatic Retries**: Handle temporary failures gracefully
+- **Request Timeouts**: Prevent hanging operations
+- **Conditional Routing**: Route requests based on specific conditions
+- **Fallbacks**: Set up automatic provider failovers
+- **Load Balancing**: Distribute requests efficiently
+
+[Learn more about Reliability Features](https://portkey.ai/docs/product/ai-gateway/)
+
+
+
+### 4. Metrics
+
+Agent runs are complex. Portkey automatically logs **40+ comprehensive metrics** for your AI agents, including cost, tokens used, latency, etc. Whether you need a broad overview or granular insights into your agent runs, Portkey's customizable filters provide the metrics you need.
+
+
+- Cost per agent interaction
+- Response times and latency
+- Token usage and efficiency
+- Success/failure rates
+- Cache hit rates
+
+
+
+### 5. Detailed Logging
+Logs are essential for understanding agent behavior, diagnosing issues, and improving performance. They provide a detailed record of agent activities and tool use, which is crucial for debugging and optimizing processes.
+
+
+Access a dedicated section to view records of agent executions, including parameters, outcomes, function calls, and errors. Filter logs based on multiple parameters such as trace ID, model, tokens used, and metadata.
+
+
+ Traces
+
+
+
+
+ Logs
+
+
+
+### 6. Enterprise Security Features
+- Set budget limit and rate limts per Virtual Key (disposable API keys)
+- Implement role-based access control
+- Track system changes with audit logs
+- Configure data retention policies
+
+
+
+For detailed information on creating and managing Configs, visit the [Portkey documentation](https://docs.portkey.ai/product/ai-gateway/configs).
+
+## Resources
+
+- [📘 Portkey Documentation](https://docs.portkey.ai)
+- [📊 Portkey Dashboard](https://app.portkey.ai/?utm_source=crewai&utm_medium=crewai&utm_campaign=crewai)
+- [🐦 Twitter](https://twitter.com/portkeyai)
+- [💬 Discord Community](https://discord.gg/DD7vgKK299)
diff --git a/docs/mint.json b/docs/mint.json
index fad9689b8..9103434b4 100644
--- a/docs/mint.json
+++ b/docs/mint.json
@@ -100,7 +100,8 @@
"how-to/conditional-tasks",
"how-to/agentops-observability",
"how-to/langtrace-observability",
- "how-to/openlit-observability"
+ "how-to/openlit-observability",
+ "how-to/portkey-observability"
]
},
{
diff --git a/pyproject.toml b/pyproject.toml
index 3f10c1a87..fd1798d36 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "crewai"
-version = "0.86.0"
+version = "0.95.0"
description = "Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks."
readme = "README.md"
requires-python = ">=3.10,<3.13"
@@ -8,28 +8,39 @@ authors = [
{ name = "Joao Moura", email = "joao@crewai.com" }
]
dependencies = [
+ # Core Dependencies
"pydantic>=2.4.2",
"openai>=1.13.3",
+ "litellm>=1.44.22",
+ "instructor>=1.3.3",
+
+ # Text Processing
+ "pdfplumber>=0.11.4",
+ "regex>=2024.9.11",
+
+ # Telemetry and Monitoring
"opentelemetry-api>=1.22.0",
"opentelemetry-sdk>=1.22.0",
"opentelemetry-exporter-otlp-proto-http>=1.22.0",
- "instructor>=1.3.3",
- "regex>=2024.9.11",
- "click>=8.1.7",
+
+ # Data Handling
+ "chromadb>=0.5.23",
+ "openpyxl>=3.1.5",
+ "pyvis>=0.3.2",
+
+ # Authentication and Security
+ "auth0-python>=4.7.1",
"python-dotenv>=1.0.0",
+
+ # Configuration and Utils
+ "click>=8.1.7",
"appdirs>=1.4.4",
"jsonref>=1.1.0",
"json-repair>=0.25.2",
- "auth0-python>=4.7.1",
- "litellm>=1.44.22",
- "pyvis>=0.3.2",
"uv>=0.4.25",
"tomli-w>=1.1.0",
"tomli>=2.0.2",
- "chromadb>=0.5.23",
- "pdfplumber>=0.11.4",
- "openpyxl>=3.1.5",
- "blinker>=1.9.0",
+ "blinker>=1.9.0"
]
[project.urls]
@@ -38,7 +49,10 @@ Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
-tools = ["crewai-tools>=0.17.0"]
+tools = ["crewai-tools>=0.25.5"]
+embeddings = [
+ "tiktoken~=0.7.0"
+]
agentops = ["agentops>=0.3.0"]
fastembed = ["fastembed>=0.4.1"]
pdfplumber = [
diff --git a/src/crewai/__init__.py b/src/crewai/__init__.py
index 0833afd58..f0ba35a38 100644
--- a/src/crewai/__init__.py
+++ b/src/crewai/__init__.py
@@ -14,7 +14,7 @@ warnings.filterwarnings(
category=UserWarning,
module="pydantic.main",
)
-__version__ = "0.86.0"
+__version__ = "0.95.0"
__all__ = [
"Agent",
"Crew",
diff --git a/src/crewai/cli/templates/crew/pyproject.toml b/src/crewai/cli/templates/crew/pyproject.toml
index 5ea8194c8..bd2d871c5 100644
--- a/src/crewai/cli/templates/crew/pyproject.toml
+++ b/src/crewai/cli/templates/crew/pyproject.toml
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
- "crewai[tools]>=0.86.0,<1.0.0"
+ "crewai[tools]>=0.95.0,<1.0.0"
]
[project.scripts]
diff --git a/src/crewai/cli/templates/flow/pyproject.toml b/src/crewai/cli/templates/flow/pyproject.toml
index 8a523d2ed..6ca589497 100644
--- a/src/crewai/cli/templates/flow/pyproject.toml
+++ b/src/crewai/cli/templates/flow/pyproject.toml
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
- "crewai[tools]>=0.86.0,<1.0.0",
+ "crewai[tools]>=0.95.0,<1.0.0",
]
[project.scripts]
diff --git a/src/crewai/cli/templates/tool/pyproject.toml b/src/crewai/cli/templates/tool/pyproject.toml
index 69b8d355f..0c4b88e9c 100644
--- a/src/crewai/cli/templates/tool/pyproject.toml
+++ b/src/crewai/cli/templates/tool/pyproject.toml
@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.13"
dependencies = [
- "crewai[tools]>=0.86.0"
+ "crewai[tools]>=0.95.0"
]
[tool.crewai]
diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index d488783ea..c01a89280 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -726,11 +726,7 @@ class Crew(BaseModel):
# Determine which tools to use - task tools take precedence over agent tools
tools_for_task = task.tools or agent_to_use.tools or []
- tools_for_task = self._prepare_tools(
- agent_to_use,
- task,
- tools_for_task
- )
+ tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
self._log_task_start(task, agent_to_use.role)
@@ -797,14 +793,18 @@ class Crew(BaseModel):
return skipped_task_output
return None
- def _prepare_tools(self, agent: BaseAgent, task: Task, tools: List[Tool]) -> List[Tool]:
+ def _prepare_tools(
+ self, agent: BaseAgent, task: Task, tools: List[Tool]
+ ) -> List[Tool]:
# Add delegation tools if agent allows delegation
if agent.allow_delegation:
if self.process == Process.hierarchical:
if self.manager_agent:
tools = self._update_manager_tools(task, tools)
else:
- raise ValueError("Manager agent is required for hierarchical process.")
+ raise ValueError(
+ "Manager agent is required for hierarchical process."
+ )
elif agent and agent.allow_delegation:
tools = self._add_delegation_tools(task, tools)
@@ -823,7 +823,9 @@ class Crew(BaseModel):
return self.manager_agent
return task.agent
- def _merge_tools(self, existing_tools: List[Tool], new_tools: List[Tool]) -> List[Tool]:
+ def _merge_tools(
+ self, existing_tools: List[Tool], new_tools: List[Tool]
+ ) -> List[Tool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name."""
if not new_tools:
return existing_tools
@@ -839,7 +841,9 @@ class Crew(BaseModel):
return tools
- def _inject_delegation_tools(self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]):
+ def _inject_delegation_tools(
+ self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]
+ ):
delegation_tools = task_agent.get_delegation_tools(agents)
return self._merge_tools(tools, delegation_tools)
@@ -856,7 +860,9 @@ class Crew(BaseModel):
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools:
tools = []
- tools = self._inject_delegation_tools(tools, task.agent, agents_for_delegation)
+ tools = self._inject_delegation_tools(
+ tools, task.agent, agents_for_delegation
+ )
return tools
def _log_task_start(self, task: Task, role: str = "None"):
@@ -870,7 +876,9 @@ class Crew(BaseModel):
if task.agent:
tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
else:
- tools = self._inject_delegation_tools(tools, self.manager_agent, self.agents)
+ tools = self._inject_delegation_tools(
+ tools, self.manager_agent, self.agents
+ )
return tools
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py
index 4a6361cce..dc46aa6d8 100644
--- a/src/crewai/flow/flow.py
+++ b/src/crewai/flow/flow.py
@@ -30,7 +30,47 @@ from crewai.telemetry import Telemetry
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
-def start(condition=None):
+def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
+ """
+ Marks a method as a flow's starting point.
+
+ This decorator designates a method as an entry point for the flow execution.
+ It can optionally specify conditions that trigger the start based on other
+ method executions.
+
+ Parameters
+ ----------
+ condition : Optional[Union[str, dict, Callable]], optional
+ Defines when the start method should execute. Can be:
+ - str: Name of a method that triggers this start
+ - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
+ - Callable: A method reference that triggers this start
+ Default is None, meaning unconditional start.
+
+ Returns
+ -------
+ Callable
+ A decorator function that marks the method as a flow start point.
+
+ Raises
+ ------
+ ValueError
+ If the condition format is invalid.
+
+ Examples
+ --------
+ >>> @start() # Unconditional start
+ >>> def begin_flow(self):
+ ... pass
+
+ >>> @start("method_name") # Start after specific method
+ >>> def conditional_start(self):
+ ... pass
+
+ >>> @start(and_("method1", "method2")) # Start after multiple methods
+ >>> def complex_start(self):
+ ... pass
+ """
def decorator(func):
func.__is_start_method__ = True
if condition is not None:
@@ -55,8 +95,42 @@ def start(condition=None):
return decorator
+def listen(condition: Union[str, dict, Callable]) -> Callable:
+ """
+ Creates a listener that executes when specified conditions are met.
-def listen(condition):
+ This decorator sets up a method to execute in response to other method
+ executions in the flow. It supports both simple and complex triggering
+ conditions.
+
+ Parameters
+ ----------
+ condition : Union[str, dict, Callable]
+ Specifies when the listener should execute. Can be:
+ - str: Name of a method that triggers this listener
+ - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
+ - Callable: A method reference that triggers this listener
+
+ Returns
+ -------
+ Callable
+ A decorator function that sets up the method as a listener.
+
+ Raises
+ ------
+ ValueError
+ If the condition format is invalid.
+
+ Examples
+ --------
+ >>> @listen("process_data") # Listen to single method
+ >>> def handle_processed_data(self):
+ ... pass
+
+ >>> @listen(or_("success", "failure")) # Listen to multiple methods
+ >>> def handle_completion(self):
+ ... pass
+ """
def decorator(func):
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
@@ -80,10 +154,49 @@ def listen(condition):
return decorator
-def router(condition):
+def router(condition: Union[str, dict, Callable]) -> Callable:
+ """
+ Creates a routing method that directs flow execution based on conditions.
+
+ This decorator marks a method as a router, which can dynamically determine
+ the next steps in the flow based on its return value. Routers are triggered
+ by specified conditions and can return constants that determine which path
+ the flow should take.
+
+ Parameters
+ ----------
+ condition : Union[str, dict, Callable]
+ Specifies when the router should execute. Can be:
+ - str: Name of a method that triggers this router
+ - dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
+ - Callable: A method reference that triggers this router
+
+ Returns
+ -------
+ Callable
+ A decorator function that sets up the method as a router.
+
+ Raises
+ ------
+ ValueError
+ If the condition format is invalid.
+
+ Examples
+ --------
+ >>> @router("check_status")
+ >>> def route_based_on_status(self):
+ ... if self.state.status == "success":
+ ... return SUCCESS
+ ... return FAILURE
+
+ >>> @router(and_("validate", "process"))
+ >>> def complex_routing(self):
+ ... if all([self.state.valid, self.state.processed]):
+ ... return CONTINUE
+ ... return STOP
+ """
def decorator(func):
func.__is_router__ = True
- # Handle conditions like listen/start
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
func.__condition_type__ = "OR"
@@ -105,8 +218,39 @@ def router(condition):
return decorator
+def or_(*conditions: Union[str, dict, Callable]) -> dict:
+ """
+ Combines multiple conditions with OR logic for flow control.
-def or_(*conditions):
+ Creates a condition that is satisfied when any of the specified conditions
+ are met. This is used with @start, @listen, or @router decorators to create
+ complex triggering conditions.
+
+ Parameters
+ ----------
+ *conditions : Union[str, dict, Callable]
+ Variable number of conditions that can be:
+ - str: Method names
+ - dict: Existing condition dictionaries
+ - Callable: Method references
+
+ Returns
+ -------
+ dict
+ A condition dictionary with format:
+ {"type": "OR", "methods": list_of_method_names}
+
+ Raises
+ ------
+ ValueError
+ If any condition is invalid.
+
+ Examples
+ --------
+ >>> @listen(or_("success", "timeout"))
+ >>> def handle_completion(self):
+ ... pass
+ """
methods = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
@@ -120,7 +264,39 @@ def or_(*conditions):
return {"type": "OR", "methods": methods}
-def and_(*conditions):
+def and_(*conditions: Union[str, dict, Callable]) -> dict:
+ """
+ Combines multiple conditions with AND logic for flow control.
+
+ Creates a condition that is satisfied only when all specified conditions
+ are met. This is used with @start, @listen, or @router decorators to create
+ complex triggering conditions.
+
+ Parameters
+ ----------
+ *conditions : Union[str, dict, Callable]
+ Variable number of conditions that can be:
+ - str: Method names
+ - dict: Existing condition dictionaries
+ - Callable: Method references
+
+ Returns
+ -------
+ dict
+ A condition dictionary with format:
+ {"type": "AND", "methods": list_of_method_names}
+
+ Raises
+ ------
+ ValueError
+ If any condition is invalid.
+
+ Examples
+ --------
+ >>> @listen(and_("validated", "processed"))
+ >>> def handle_complete_data(self):
+ ... pass
+ """
methods = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
@@ -286,6 +462,23 @@ class Flow(Generic[T], metaclass=FlowMeta):
return final_output
async def _execute_start_method(self, start_method_name: str) -> None:
+ """
+ Executes a flow's start method and its triggered listeners.
+
+ This internal method handles the execution of methods marked with @start
+ decorator and manages the subsequent chain of listener executions.
+
+ Parameters
+ ----------
+ start_method_name : str
+ The name of the start method to execute.
+
+ Notes
+ -----
+ - Executes the start method and captures its result
+ - Triggers execution of any listeners waiting on this start method
+ - Part of the flow's initialization sequence
+ """
result = await self._execute_method(
start_method_name, self._methods[start_method_name]
)
@@ -306,6 +499,28 @@ class Flow(Generic[T], metaclass=FlowMeta):
return result
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
+ """
+ Executes all listeners and routers triggered by a method completion.
+
+ This internal method manages the execution flow by:
+ 1. First executing all triggered routers sequentially
+ 2. Then executing all triggered listeners in parallel
+
+ Parameters
+ ----------
+ trigger_method : str
+ The name of the method that triggered these listeners.
+ result : Any
+ The result from the triggering method, passed to listeners
+ that accept parameters.
+
+ Notes
+ -----
+ - Routers are executed sequentially to maintain flow control
+ - Each router's result becomes the new trigger_method
+ - Normal listeners are executed in parallel for efficiency
+ - Listeners can receive the trigger method's result as a parameter
+ """
# First, handle routers repeatedly until no router triggers anymore
while True:
routers_triggered = self._find_triggered_methods(
@@ -335,6 +550,33 @@ class Flow(Generic[T], metaclass=FlowMeta):
def _find_triggered_methods(
self, trigger_method: str, router_only: bool
) -> List[str]:
+ """
+ Finds all methods that should be triggered based on conditions.
+
+ This internal method evaluates both OR and AND conditions to determine
+ which methods should be executed next in the flow.
+
+ Parameters
+ ----------
+ trigger_method : str
+ The name of the method that just completed execution.
+ router_only : bool
+ If True, only consider router methods.
+ If False, only consider non-router methods.
+
+ Returns
+ -------
+ List[str]
+ Names of methods that should be triggered.
+
+ Notes
+ -----
+ - Handles both OR and AND conditions:
+ * OR: Triggers if any condition is met
+ * AND: Triggers only when all conditions are met
+ - Maintains state for AND conditions using _pending_and_listeners
+ - Separates router and normal listener evaluation
+ """
triggered = []
for listener_name, (condition_type, methods) in self._listeners.items():
is_router = listener_name in self._routers
@@ -363,6 +605,33 @@ class Flow(Generic[T], metaclass=FlowMeta):
return triggered
async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
+ """
+ Executes a single listener method with proper event handling.
+
+ This internal method manages the execution of an individual listener,
+ including parameter inspection, event emission, and error handling.
+
+ Parameters
+ ----------
+ listener_name : str
+ The name of the listener method to execute.
+ result : Any
+ The result from the triggering method, which may be passed
+ to the listener if it accepts parameters.
+
+ Notes
+ -----
+ - Inspects method signature to determine if it accepts the trigger result
+ - Emits events for method execution start and finish
+ - Handles errors gracefully with detailed logging
+ - Recursively triggers listeners of this listener
+ - Supports both parameterized and parameter-less listeners
+
+ Error Handling
+ -------------
+ Catches and logs any exceptions during execution, preventing
+ individual listener failures from breaking the entire flow.
+ """
try:
method = self._methods[listener_name]
diff --git a/src/crewai/flow/flow_visualizer.py b/src/crewai/flow/flow_visualizer.py
index 988f27919..a70e91a18 100644
--- a/src/crewai/flow/flow_visualizer.py
+++ b/src/crewai/flow/flow_visualizer.py
@@ -1,12 +1,14 @@
# flow_visualizer.py
import os
+from pathlib import Path
from pyvis.network import Network
from crewai.flow.config import COLORS, NODE_STYLES
from crewai.flow.html_template_handler import HTMLTemplateHandler
from crewai.flow.legend_generator import generate_legend_items_html, get_legend_items
+from crewai.flow.path_utils import safe_path_join, validate_path_exists
from crewai.flow.utils import calculate_node_levels
from crewai.flow.visualization_utils import (
add_edges,
@@ -16,89 +18,209 @@ from crewai.flow.visualization_utils import (
class FlowPlot:
+ """Handles the creation and rendering of flow visualization diagrams."""
+
def __init__(self, flow):
+ """
+ Initialize FlowPlot with a flow object.
+
+ Parameters
+ ----------
+ flow : Flow
+ A Flow instance to visualize.
+
+ Raises
+ ------
+ ValueError
+ If flow object is invalid or missing required attributes.
+ """
+ if not hasattr(flow, '_methods'):
+ raise ValueError("Invalid flow object: missing '_methods' attribute")
+ if not hasattr(flow, '_listeners'):
+ raise ValueError("Invalid flow object: missing '_listeners' attribute")
+ if not hasattr(flow, '_start_methods'):
+ raise ValueError("Invalid flow object: missing '_start_methods' attribute")
+
self.flow = flow
self.colors = COLORS
self.node_styles = NODE_STYLES
def plot(self, filename):
- net = Network(
- directed=True,
- height="750px",
- width="100%",
- bgcolor=self.colors["bg"],
- layout=None,
- )
-
- # Set options to disable physics
- net.set_options(
- """
- var options = {
- "nodes": {
- "font": {
- "multi": "html"
- }
- },
- "physics": {
- "enabled": false
- }
- }
"""
- )
+ Generate and save an HTML visualization of the flow.
- # Calculate levels for nodes
- node_levels = calculate_node_levels(self.flow)
+ Parameters
+ ----------
+ filename : str
+ Name of the output file (without extension).
- # Compute positions
- node_positions = compute_positions(self.flow, node_levels)
+ Raises
+ ------
+ ValueError
+ If filename is invalid or network generation fails.
+ IOError
+ If file operations fail or visualization cannot be generated.
+ RuntimeError
+ If network visualization generation fails.
+ """
+ if not filename or not isinstance(filename, str):
+ raise ValueError("Filename must be a non-empty string")
+
+ try:
+ # Initialize network
+ net = Network(
+ directed=True,
+ height="750px",
+ width="100%",
+ bgcolor=self.colors["bg"],
+ layout=None,
+ )
- # Add nodes to the network
- add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
+ # Set options to disable physics
+ net.set_options(
+ """
+ var options = {
+ "nodes": {
+ "font": {
+ "multi": "html"
+ }
+ },
+ "physics": {
+ "enabled": false
+ }
+ }
+ """
+ )
- # Add edges to the network
- add_edges(net, self.flow, node_positions, self.colors)
+ # Calculate levels for nodes
+ try:
+ node_levels = calculate_node_levels(self.flow)
+ except Exception as e:
+ raise ValueError(f"Failed to calculate node levels: {str(e)}")
- network_html = net.generate_html()
- final_html_content = self._generate_final_html(network_html)
+ # Compute positions
+ try:
+ node_positions = compute_positions(self.flow, node_levels)
+ except Exception as e:
+ raise ValueError(f"Failed to compute node positions: {str(e)}")
- # Save the final HTML content to the file
- with open(f"{filename}.html", "w", encoding="utf-8") as f:
- f.write(final_html_content)
- print(f"Plot saved as {filename}.html")
+ # Add nodes to the network
+ try:
+ add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
+ except Exception as e:
+ raise RuntimeError(f"Failed to add nodes to network: {str(e)}")
- self._cleanup_pyvis_lib()
+ # Add edges to the network
+ try:
+ add_edges(net, self.flow, node_positions, self.colors)
+ except Exception as e:
+ raise RuntimeError(f"Failed to add edges to network: {str(e)}")
+
+ # Generate HTML
+ try:
+ network_html = net.generate_html()
+ final_html_content = self._generate_final_html(network_html)
+ except Exception as e:
+ raise RuntimeError(f"Failed to generate network visualization: {str(e)}")
+
+ # Save the final HTML content to the file
+ try:
+ with open(f"{filename}.html", "w", encoding="utf-8") as f:
+ f.write(final_html_content)
+ print(f"Plot saved as {filename}.html")
+ except IOError as e:
+ raise IOError(f"Failed to save flow visualization to {filename}.html: {str(e)}")
+
+ except (ValueError, RuntimeError, IOError) as e:
+ raise e
+ except Exception as e:
+ raise RuntimeError(f"Unexpected error during flow visualization: {str(e)}")
+ finally:
+ self._cleanup_pyvis_lib()
def _generate_final_html(self, network_html):
- # Extract just the body content from the generated HTML
- current_dir = os.path.dirname(__file__)
- template_path = os.path.join(
- current_dir, "assets", "crewai_flow_visual_template.html"
- )
- logo_path = os.path.join(current_dir, "assets", "crewai_logo.svg")
+ """
+ Generate the final HTML content with network visualization and legend.
- html_handler = HTMLTemplateHandler(template_path, logo_path)
- network_body = html_handler.extract_body_content(network_html)
+ Parameters
+ ----------
+ network_html : str
+ HTML content generated by pyvis Network.
- # Generate the legend items HTML
- legend_items = get_legend_items(self.colors)
- legend_items_html = generate_legend_items_html(legend_items)
- final_html_content = html_handler.generate_final_html(
- network_body, legend_items_html
- )
- return final_html_content
+ Returns
+ -------
+ str
+ Complete HTML content with styling and legend.
+
+ Raises
+ ------
+ IOError
+ If template or logo files cannot be accessed.
+ ValueError
+ If network_html is invalid.
+ """
+ if not network_html:
+ raise ValueError("Invalid network HTML content")
+
+ try:
+ # Extract just the body content from the generated HTML
+ current_dir = os.path.dirname(__file__)
+ template_path = safe_path_join("assets", "crewai_flow_visual_template.html", root=current_dir)
+ logo_path = safe_path_join("assets", "crewai_logo.svg", root=current_dir)
+
+ if not os.path.exists(template_path):
+ raise IOError(f"Template file not found: {template_path}")
+ if not os.path.exists(logo_path):
+ raise IOError(f"Logo file not found: {logo_path}")
+
+ html_handler = HTMLTemplateHandler(template_path, logo_path)
+ network_body = html_handler.extract_body_content(network_html)
+
+ # Generate the legend items HTML
+ legend_items = get_legend_items(self.colors)
+ legend_items_html = generate_legend_items_html(legend_items)
+ final_html_content = html_handler.generate_final_html(
+ network_body, legend_items_html
+ )
+ return final_html_content
+ except Exception as e:
+ raise IOError(f"Failed to generate visualization HTML: {str(e)}")
def _cleanup_pyvis_lib(self):
- # Clean up the generated lib folder
- lib_folder = os.path.join(os.getcwd(), "lib")
+ """
+ Clean up the generated lib folder from pyvis.
+
+ This method safely removes the temporary lib directory created by pyvis
+ during network visualization generation.
+ """
try:
+ lib_folder = safe_path_join("lib", root=os.getcwd())
if os.path.exists(lib_folder) and os.path.isdir(lib_folder):
import shutil
-
shutil.rmtree(lib_folder)
+ except ValueError as e:
+ print(f"Error validating lib folder path: {e}")
except Exception as e:
- print(f"Error cleaning up {lib_folder}: {e}")
+ print(f"Error cleaning up lib folder: {e}")
def plot_flow(flow, filename="flow_plot"):
+ """
+ Convenience function to create and save a flow visualization.
+
+ Parameters
+ ----------
+ flow : Flow
+ Flow instance to visualize.
+ filename : str, optional
+ Output filename without extension, by default "flow_plot".
+
+ Raises
+ ------
+ ValueError
+ If flow object or filename is invalid.
+ IOError
+ If file operations fail.
+ """
visualizer = FlowPlot(flow)
visualizer.plot(filename)
diff --git a/src/crewai/flow/html_template_handler.py b/src/crewai/flow/html_template_handler.py
index d521d8cf8..f0d2d89ad 100644
--- a/src/crewai/flow/html_template_handler.py
+++ b/src/crewai/flow/html_template_handler.py
@@ -1,26 +1,53 @@
import base64
import re
+from pathlib import Path
+
+from crewai.flow.path_utils import safe_path_join, validate_path_exists
class HTMLTemplateHandler:
+ """Handles HTML template processing and generation for flow visualization diagrams."""
+
def __init__(self, template_path, logo_path):
- self.template_path = template_path
- self.logo_path = logo_path
+ """
+ Initialize HTMLTemplateHandler with validated template and logo paths.
+
+ Parameters
+ ----------
+ template_path : str
+ Path to the HTML template file.
+ logo_path : str
+ Path to the logo image file.
+
+ Raises
+ ------
+ ValueError
+ If template or logo paths are invalid or files don't exist.
+ """
+ try:
+ self.template_path = validate_path_exists(template_path, "file")
+ self.logo_path = validate_path_exists(logo_path, "file")
+ except ValueError as e:
+ raise ValueError(f"Invalid template or logo path: {e}")
def read_template(self):
+ """Read and return the HTML template file contents."""
with open(self.template_path, "r", encoding="utf-8") as f:
return f.read()
def encode_logo(self):
+ """Convert the logo SVG file to base64 encoded string."""
with open(self.logo_path, "rb") as logo_file:
logo_svg_data = logo_file.read()
return base64.b64encode(logo_svg_data).decode("utf-8")
def extract_body_content(self, html):
+ """Extract and return content between body tags from HTML string."""
match = re.search("
(.*?)