Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
83791b3c62 Address PR feedback: Improve documentation and add edge case tests
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-05 09:12:23 +00:00
Devin AI
70b7148698 Fix #2753: Handle large inputs in memory by chunking text before embedding
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-05 09:06:33 +00:00
27 changed files with 515 additions and 2158 deletions

175
README.md
View File

@@ -4,7 +4,7 @@
# **CrewAI**
🤖 **CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
🤖 **CrewAI**: Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks.
<h3>
@@ -22,17 +22,13 @@
- [Why CrewAI?](#why-crewai)
- [Getting Started](#getting-started)
- [Key Features](#key-features)
- [Understanding Flows and Crews](#understanding-flows-and-crews)
- [CrewAI vs LangGraph](#how-crewai-compares)
- [Examples](#examples)
- [Quick Tutorial](#quick-tutorial)
- [Write Job Descriptions](#write-job-descriptions)
- [Trip Planner](#trip-planner)
- [Stock Analysis](#stock-analysis)
- [Using Crews and Flows Together](#using-crews-and-flows-together)
- [Connecting Your Crew to a Model](#connecting-your-crew-to-a-model)
- [How CrewAI Compares](#how-crewai-compares)
- [Frequently Asked Questions (FAQ)](#frequently-asked-questions-faq)
- [Contribution](#contribution)
- [Telemetry](#telemetry)
- [License](#license)
@@ -40,40 +36,10 @@
## Why CrewAI?
The power of AI collaboration has too much to offer.
CrewAI is a standalone framework, built from the ground up without dependencies on Langchain or other agent frameworks. It's designed to enable AI agents to assume roles, share goals, and operate in a cohesive unit - much like a well-oiled crew. Whether you're building a smart assistant platform, an automated customer service ensemble, or a multi-agent research team, CrewAI provides the backbone for sophisticated multi-agent interactions.
CrewAI is designed to enable AI agents to assume roles, share goals, and operate in a cohesive unit - much like a well-oiled crew. Whether you're building a smart assistant platform, an automated customer service ensemble, or a multi-agent research team, CrewAI provides the backbone for sophisticated multi-agent interactions.
## Getting Started
### Learning Resources
Learn CrewAI through our comprehensive courses:
- [Multi AI Agent Systems with CrewAI](https://www.deeplearning.ai/short-courses/multi-ai-agent-systems-with-crewai/) - Master the fundamentals of multi-agent systems
- [Practical Multi AI Agents and Advanced Use Cases](https://www.deeplearning.ai/short-courses/practical-multi-ai-agents-and-advanced-use-cases-with-crewai/) - Deep dive into advanced implementations
### Understanding Flows and Crews
CrewAI offers two powerful, complementary approaches that work seamlessly together to build sophisticated AI applications:
1. **Crews**: Teams of AI agents with true autonomy and agency, working together to accomplish complex tasks through role-based collaboration. Crews enable:
- Natural, autonomous decision-making between agents
- Dynamic task delegation and collaboration
- Specialized roles with defined goals and expertise
- Flexible problem-solving approaches
2. **Flows**: Production-ready, event-driven workflows that deliver precise control over complex automations. Flows provide:
- Fine-grained control over execution paths for real-world scenarios
- Secure, consistent state management between tasks
- Clean integration of AI agents with production Python code
- Conditional branching for complex business logic
The true power of CrewAI emerges when combining Crews and Flows. This synergy allows you to:
- Build complex, production-grade applications
- Balance autonomy with precise control
- Handle sophisticated real-world scenarios
- Maintain clean, maintainable code structure
### Getting Started with Installation
To get started with CrewAI, follow these simple steps:
### 1. Installation
@@ -85,6 +51,7 @@ First, install CrewAI:
```shell
pip install crewai
```
If you want to install the 'crewai' package along with its optional features that include additional tools for agents, you can do so by using the following command:
```shell
@@ -92,22 +59,6 @@ pip install 'crewai[tools]'
```
The command above installs the basic package and also adds extra components which require more dependencies to function.
### Troubleshooting Dependencies
If you encounter issues during installation or usage, here are some common solutions:
#### Common Issues
1. **ModuleNotFoundError: No module named 'tiktoken'**
- Install tiktoken explicitly: `pip install 'crewai[embeddings]'`
- If using embedchain or other tools: `pip install 'crewai[tools]'`
2. **Failed building wheel for tiktoken**
- Ensure Rust compiler is installed (see installation steps above)
- For Windows: Verify Visual C++ Build Tools are installed
- Try upgrading pip: `pip install --upgrade pip`
- If issues persist, use a pre-built wheel: `pip install tiktoken --prefer-binary`
### 2. Setting Up Your Crew with the YAML Configuration
To create a new CrewAI project, run the following CLI (Command Line Interface) command:
@@ -313,16 +264,13 @@ In addition to the sequential process, you can use the hierarchical process, whi
## Key Features
**Note**: CrewAI is a standalone framework built from the ground up, without dependencies on Langchain or other agent frameworks.
- **Deep Customization**: Build sophisticated agents with full control over the system - from overriding inner prompts to accessing low-level APIs. Customize roles, goals, tools, and behaviors while maintaining clean abstractions.
- **Autonomous Inter-Agent Delegation**: Agents can autonomously delegate tasks and inquire amongst themselves, enabling complex problem-solving in real-world scenarios.
- **Flexible Task Management**: Define and customize tasks with granular control, from simple operations to complex multi-step processes.
- **Production-Grade Architecture**: Support for both high-level abstractions and low-level customization, with robust error handling and state management.
- **Predictable Results**: Ensure consistent, accurate outputs through programmatic guardrails, agent training capabilities, and flow-based execution control. See our [documentation on guardrails](https://docs.crewai.com/how-to/guardrails/) for implementation details.
- **Model Flexibility**: Run your crew using OpenAI or open source models with production-ready integrations. See [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-Connections/) for detailed configuration options.
- **Event-Driven Flows**: Build complex, real-world workflows with precise control over execution paths, state management, and conditional logic.
- **Process Orchestration**: Achieve any workflow pattern through flows - from simple sequential and hierarchical processes to complex, custom orchestration patterns with conditional branching and parallel execution.
- **Role-Based Agent Design**: Customize agents with specific roles, goals, and tools.
- **Autonomous Inter-Agent Delegation**: Agents can autonomously delegate tasks and inquire amongst themselves, enhancing problem-solving efficiency.
- **Flexible Task Management**: Define tasks with customizable tools and assign them to agents dynamically.
- **Processes Driven**: Currently only supports `sequential` task execution and `hierarchical` processes, but more complex processes like consensual and autonomous are being worked on.
- **Save output as file**: Save the output of individual tasks as a file, so you can use it later.
- **Parse output as Pydantic or Json**: Parse the output of individual tasks as a Pydantic model or as a Json if you want to.
- **Works with Open Source Models**: Run your crew using Open AI or open source models refer to the [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-Connections/) page for details on configuring your agents' connections to models, even ones running locally!
![CrewAI Mind Map](./docs/crewAI-mindmap.png "CrewAI Mind Map")
@@ -357,98 +305,6 @@ You can test different real life examples of AI crews in the [CrewAI-examples re
[![Stock Analysis](https://img.youtube.com/vi/e0Uj4yWdaAg/maxresdefault.jpg)](https://www.youtube.com/watch?v=e0Uj4yWdaAg "Stock Analysis")
### Using Crews and Flows Together
CrewAI's power truly shines when combining Crews with Flows to create sophisticated automation pipelines. Here's how you can orchestrate multiple Crews within a Flow:
```python
from crewai.flow.flow import Flow, listen, start, router
from crewai import Crew, Agent, Task
from pydantic import BaseModel
# Define structured state for precise control
class MarketState(BaseModel):
sentiment: str = "neutral"
confidence: float = 0.0
recommendations: list = []
class AdvancedAnalysisFlow(Flow[MarketState]):
@start()
def fetch_market_data(self):
# Demonstrate low-level control with structured state
self.state.sentiment = "analyzing"
return {"sector": "tech", "timeframe": "1W"} # These parameters match the task description template
@listen(fetch_market_data)
def analyze_with_crew(self, market_data):
# Show crew agency through specialized roles
analyst = Agent(
role="Senior Market Analyst",
goal="Conduct deep market analysis with expert insight",
backstory="You're a veteran analyst known for identifying subtle market patterns"
)
researcher = Agent(
role="Data Researcher",
goal="Gather and validate supporting market data",
backstory="You excel at finding and correlating multiple data sources"
)
analysis_task = Task(
description="Analyze {sector} sector data for the past {timeframe}",
expected_output="Detailed market analysis with confidence score",
agent=analyst
)
research_task = Task(
description="Find supporting data to validate the analysis",
expected_output="Corroborating evidence and potential contradictions",
agent=researcher
)
# Demonstrate crew autonomy
analysis_crew = Crew(
agents=[analyst, researcher],
tasks=[analysis_task, research_task],
process=Process.sequential,
verbose=True
)
return analysis_crew.kickoff(inputs=market_data) # Pass market_data as named inputs
@router(analyze_with_crew)
def determine_next_steps(self):
# Show flow control with conditional routing
if self.state.confidence > 0.8:
return "high_confidence"
elif self.state.confidence > 0.5:
return "medium_confidence"
return "low_confidence"
@listen("high_confidence")
def execute_strategy(self):
# Demonstrate complex decision making
strategy_crew = Crew(
agents=[
Agent(role="Strategy Expert",
goal="Develop optimal market strategy")
],
tasks=[
Task(description="Create detailed strategy based on analysis",
expected_output="Step-by-step action plan")
]
)
return strategy_crew.kickoff()
@listen("medium_confidence", "low_confidence")
def request_additional_analysis(self):
self.state.recommendations.append("Gather more data")
return "Additional analysis required"
```
This example demonstrates how to:
1. Use Python code for basic data operations
2. Create and execute Crews as steps in your workflow
3. Use Flow decorators to manage the sequence of operations
4. Implement conditional branching based on Crew results
## Connecting Your Crew to a Model
CrewAI supports using various LLMs through a variety of connection options. By default your agents will use the OpenAI API when querying the model. However, there are several other ways to allow your agents to connect to models. For example, you can configure your agents to use a local model via the Ollama tool.
@@ -457,13 +313,9 @@ Please refer to the [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-
## How CrewAI Compares
**CrewAI's Advantage**: CrewAI combines autonomous agent intelligence with precise workflow control through its unique Crews and Flows architecture. The framework excels at both high-level orchestration and low-level customization, enabling complex, production-grade systems with granular control.
**CrewAI's Advantage**: CrewAI is built with production in mind. It offers the flexibility of Autogen's conversational agents and the structured process approach of ChatDev, but without the rigidity. CrewAI's processes are designed to be dynamic and adaptable, fitting seamlessly into both development and production workflows.
- **LangGraph**: While LangGraph provides a foundation for building agent workflows, its approach requires significant boilerplate code and complex state management patterns. The framework's tight coupling with LangChain can limit flexibility when implementing custom agent behaviors or integrating with external systems.
*P.S. CrewAI demonstrates significant performance advantages over LangGraph, executing 5.76x faster in certain cases like this QA task example ([see comparison](https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks/CrewAI%20Flows%20%26%20Langgraph/QA%20Agent)) while achieving higher evaluation scores with faster completion times in certain coding tasks, like in this example ([detailed analysis](https://github.com/crewAIInc/crewAI-examples/blob/main/Notebooks/CrewAI%20Flows%20%26%20Langgraph/Coding%20Assistant/coding_assistant_eval.ipynb)).*
- **Autogen**: While Autogen excels at creating conversational agents capable of working together, it lacks an inherent concept of process. In Autogen, orchestrating agents' interactions requires additional programming, which can become complex and cumbersome as the scale of tasks grows.
- **Autogen**: While Autogen does good in creating conversational agents capable of working together, it lacks an inherent concept of process. In Autogen, orchestrating agents' interactions requires additional programming, which can become complex and cumbersome as the scale of tasks grows.
- **ChatDev**: ChatDev introduced the idea of processes into the realm of AI agents, but its implementation is quite rigid. Customizations in ChatDev are limited and not geared towards production environments, which can hinder scalability and flexibility in real-world applications.
@@ -588,8 +440,5 @@ A: CrewAI uses anonymous telemetry to collect usage data for improvement purpose
### Q: Where can I find examples of CrewAI in action?
A: You can find various real-life examples in the [CrewAI-examples repository](https://github.com/crewAIInc/crewAI-examples), including trip planners, stock analysis tools, and more.
### Q: What is the difference between Crews and Flows?
A: Crews and Flows serve different but complementary purposes in CrewAI. Crews are teams of AI agents working together to accomplish specific tasks through role-based collaboration, delivering accurate and predictable results. Flows, on the other hand, are event-driven workflows that can orchestrate both Crews and regular Python code, allowing you to build complex automation pipelines with secure state management and conditional execution paths.
### Q: How can I contribute to CrewAI?
A: Contributions are welcome! You can fork the repository, create a new branch for your feature, add your improvement, and send a pull request. Check the Contribution section in the README for more details.

View File

@@ -171,58 +171,6 @@ crewai reset-memories --knowledge
This is useful when you've updated your knowledge sources and want to ensure that the agents are using the most recent information.
## Agent-Specific Knowledge
While knowledge can be provided at the crew level using `crew.knowledge_sources`, individual agents can also have their own knowledge sources using the `knowledge_sources` parameter:
```python Code
from crewai import Agent, Task, Crew
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
# Create agent-specific knowledge about a product
product_specs = StringKnowledgeSource(
content="""The XPS 13 laptop features:
- 13.4-inch 4K display
- Intel Core i7 processor
- 16GB RAM
- 512GB SSD storage
- 12-hour battery life""",
metadata={"category": "product_specs"}
)
# Create a support agent with product knowledge
support_agent = Agent(
role="Technical Support Specialist",
goal="Provide accurate product information and support.",
backstory="You are an expert on our laptop products and specifications.",
knowledge_sources=[product_specs] # Agent-specific knowledge
)
# Create a task that requires product knowledge
support_task = Task(
description="Answer this customer question: {question}",
agent=support_agent
)
# Create and run the crew
crew = Crew(
agents=[support_agent],
tasks=[support_task]
)
# Get answer about the laptop's specifications
result = crew.kickoff(
inputs={"question": "What is the storage capacity of the XPS 13?"}
)
```
<Info>
Benefits of agent-specific knowledge:
- Give agents specialized information for their roles
- Maintain separation of concerns between agents
- Combine with crew-level knowledge for layered information access
</Info>
## Custom Knowledge Sources
CrewAI allows you to create custom knowledge sources for any type of data by extending the `BaseKnowledgeSource` class. Let's create a practical example that fetches and processes space news articles.

View File

@@ -8,38 +8,27 @@ authors = [
{ name = "Joao Moura", email = "joao@crewai.com" }
]
dependencies = [
# Core Dependencies
"pydantic>=2.4.2",
"openai>=1.13.3",
"litellm>=1.44.22",
"instructor>=1.3.3",
# Text Processing
"pdfplumber>=0.11.4",
"regex>=2024.9.11",
# Telemetry and Monitoring
"opentelemetry-api>=1.22.0",
"opentelemetry-sdk>=1.22.0",
"opentelemetry-exporter-otlp-proto-http>=1.22.0",
# Data Handling
"chromadb>=0.5.23",
"openpyxl>=3.1.5",
"pyvis>=0.3.2",
# Authentication and Security
"auth0-python>=4.7.1",
"python-dotenv>=1.0.0",
# Configuration and Utils
"instructor>=1.3.3",
"regex>=2024.9.11",
"click>=8.1.7",
"python-dotenv>=1.0.0",
"appdirs>=1.4.4",
"jsonref>=1.1.0",
"json-repair>=0.25.2",
"auth0-python>=4.7.1",
"litellm>=1.44.22",
"pyvis>=0.3.2",
"uv>=0.4.25",
"tomli-w>=1.1.0",
"tomli>=2.0.2",
"chromadb>=0.5.23",
"pdfplumber>=0.11.4",
"openpyxl>=3.1.5",
"blinker>=1.9.0",
]
@@ -50,9 +39,6 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools>=0.17.0"]
embeddings = [
"tiktoken~=0.7.0"
]
agentops = ["agentops>=0.3.0"]
fastembed = ["fastembed>=0.4.1"]
pdfplumber = [

View File

@@ -1,141 +0,0 @@
"""Core utility functions for Flow class operations.
This module contains utility functions that are specifically designed to work
with the Flow class and require direct access to Flow class internals. These
utilities are separated from general-purpose utilities to maintain a clean
dependency structure and avoid circular imports.
Functions in this module are core to Flow functionality and are not related
to visualization or other optional features.
"""
import ast
import inspect
import textwrap
from typing import Any, Callable, Dict, List, Optional, Set, Union
from pydantic import BaseModel
def get_possible_return_constants(function: Callable[..., Any]) -> Optional[List[str]]:
"""Extract possible string return values from a function by analyzing its source code.
Analyzes the function's source code using AST to identify string constants that
could be returned, including strings stored in dictionaries and direct returns.
Args:
function: The function to analyze for possible return values
Returns:
list[str] | None: List of possible string return values, or None if:
- Source code cannot be retrieved
- Source code has syntax/indentation errors
- No string return values are found
Raises:
OSError: If source code cannot be retrieved
IndentationError: If source code has invalid indentation
SyntaxError: If source code has syntax errors
Example:
>>> def get_status():
... paths = {"success": "completed", "error": "failed"}
... return paths["success"]
>>> get_possible_return_constants(get_status)
['completed', 'failed']
"""
try:
source = inspect.getsource(function)
except OSError:
# Can't get source code
return None
except Exception as e:
print(f"Error retrieving source code for function {function.__name__}: {e}")
return None
try:
# Remove leading indentation
source = textwrap.dedent(source)
# Parse the source code into an AST
code_ast = ast.parse(source)
except IndentationError as e:
print(f"IndentationError while parsing source code of {function.__name__}: {e}")
print(f"Source code:\n{source}")
return None
except SyntaxError as e:
print(f"SyntaxError while parsing source code of {function.__name__}: {e}")
print(f"Source code:\n{source}")
return None
except Exception as e:
print(f"Unexpected error while parsing source code of {function.__name__}: {e}")
print(f"Source code:\n{source}")
return None
return_values = set()
dict_definitions = {}
class DictionaryAssignmentVisitor(ast.NodeVisitor):
def visit_Assign(self, node):
# Check if this assignment is assigning a dictionary literal to a variable
if isinstance(node.value, ast.Dict) and len(node.targets) == 1:
target = node.targets[0]
if isinstance(target, ast.Name):
var_name = target.id
dict_values = []
# Extract string values from the dictionary
for val in node.value.values:
if isinstance(val, ast.Constant) and isinstance(val.value, str):
dict_values.append(val.value)
# If non-string, skip or just ignore
if dict_values:
dict_definitions[var_name] = dict_values
self.generic_visit(node)
class ReturnVisitor(ast.NodeVisitor):
def visit_Return(self, node):
# Direct string return
if isinstance(node.value, ast.Constant) and isinstance(
node.value.value, str
):
return_values.add(node.value.value)
# Dictionary-based return, like return paths[result]
elif isinstance(node.value, ast.Subscript):
# Check if we're subscripting a known dictionary variable
if isinstance(node.value.value, ast.Name):
var_name = node.value.value.id
if var_name in dict_definitions:
# Add all possible dictionary values
for v in dict_definitions[var_name]:
return_values.add(v)
self.generic_visit(node)
# First pass: identify dictionary assignments
DictionaryAssignmentVisitor().visit(code_ast)
# Second pass: identify returns
ReturnVisitor().visit(code_ast)
return list(return_values) if return_values else None
def is_ancestor(node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]) -> bool:
"""Check if one node is an ancestor of another in the flow graph.
Args:
node: Target node to check ancestors for
ancestor_candidate: Node to check if it's an ancestor
ancestors: Dictionary mapping nodes to their ancestor sets
Returns:
bool: True if ancestor_candidate is an ancestor of node
Raises:
TypeError: If any argument has an invalid type
"""
if not isinstance(node, str):
raise TypeError("Argument 'node' must be a string")
if not isinstance(ancestor_candidate, str):
raise TypeError("Argument 'ancestor_candidate' must be a string")
if not isinstance(ancestors, dict):
raise TypeError("Argument 'ancestors' must be a dictionary")
return ancestor_candidate in ancestors.get(node, set())

View File

@@ -17,43 +17,20 @@ from typing import (
from blinker import Signal
from pydantic import BaseModel, ValidationError
from crewai.flow.core_flow_utils import get_possible_return_constants
from crewai.flow.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.utils import get_possible_return_constants
from crewai.telemetry import Telemetry
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
"""Marks a method as a flow starting point, optionally triggered by other methods.
Args:
condition: The condition that triggers this method. Can be:
- str: Name of the triggering method
- dict: Dictionary with 'type' and 'methods' keys for complex conditions
- Callable: A function reference
- None: No trigger condition (default)
Returns:
Callable: The decorated function that will serve as a flow starting point.
Raises:
ValueError: If the condition format is invalid.
Example:
>>> @start() # No condition
>>> def begin_flow():
>>> pass
>>>
>>> @start("method_name") # Triggered by specific method
>>> def conditional_start():
>>> pass
"""
def start(condition=None):
def decorator(func):
func.__is_start_method__ = True
if condition is not None:
@@ -79,30 +56,7 @@ def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
return decorator
def listen(condition: Union[str, dict, Callable]) -> Callable:
"""Marks a method to execute when specified conditions/methods complete.
Args:
condition: The condition that triggers this method. Can be:
- str: Name of the triggering method
- dict: Dictionary with 'type' and 'methods' keys for complex conditions
- Callable: A function reference
Returns:
Callable: The decorated function that will execute when conditions are met.
Raises:
ValueError: If the condition format is invalid.
Example:
>>> @listen("start_method") # Listen to single method
>>> def on_start():
>>> pass
>>>
>>> @listen(and_("method1", "method2")) # Listen with AND condition
>>> def on_both_complete():
>>> pass
"""
def listen(condition):
def decorator(func):
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
@@ -126,33 +80,10 @@ def listen(condition: Union[str, dict, Callable]) -> Callable:
return decorator
def router(condition: Union[str, dict, Callable]) -> Callable:
"""Marks a method as a router to direct flow based on its return value.
A router method can return different string values that trigger different
subsequent methods, allowing for dynamic flow control.
Args:
condition: The condition that triggers this router. Can be:
- str: Name of the triggering method
- dict: Dictionary with 'type' and 'methods' keys for complex conditions
- Callable: A function reference
Returns:
Callable: The decorated function that will serve as a router.
Raises:
ValueError: If the condition format is invalid.
Example:
>>> @router("process_data")
>>> def route_result(result):
>>> if result.success:
>>> return "handle_success"
>>> return "handle_error"
"""
def router(condition):
def decorator(func):
func.__is_router__ = True
# Handle conditions like listen/start
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
func.__condition_type__ = "OR"
@@ -175,27 +106,7 @@ def router(condition: Union[str, dict, Callable]) -> Callable:
return decorator
def or_(*conditions: Union[str, dict, Callable]) -> dict:
"""Combines multiple conditions with OR logic for flow control.
Args:
*conditions: Variable number of conditions. Each can be:
- str: Name of a method
- dict: Dictionary with 'type' and 'methods' keys
- Callable: A function reference
Returns:
dict: A dictionary with 'type': 'OR' and 'methods' list.
Raises:
ValueError: If any condition is invalid.
Example:
>>> @listen(or_("method1", "method2"))
>>> def on_either():
>>> # Executes when either method1 OR method2 completes
>>> pass
"""
def or_(*conditions):
methods = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
@@ -209,27 +120,7 @@ def or_(*conditions: Union[str, dict, Callable]) -> dict:
return {"type": "OR", "methods": methods}
def and_(*conditions: Union[str, dict, Callable]) -> dict:
"""Combines multiple conditions with AND logic for flow control.
Args:
*conditions: Variable number of conditions. Each can be:
- str: Name of a method
- dict: Dictionary with 'type' and 'methods' keys
- Callable: A function reference
Returns:
dict: A dictionary with 'type': 'AND' and 'methods' list.
Raises:
ValueError: If any condition is invalid.
Example:
>>> @listen(and_("method1", "method2"))
>>> def on_both():
>>> # Executes when BOTH method1 AND method2 complete
>>> pass
"""
def and_(*conditions):
methods = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
@@ -288,22 +179,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
event_emitter = Signal("event_emitter")
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
"""Create a generic version of Flow with specified state type.
Args:
cls: The Flow class
item: The type parameter for the flow's state
Returns:
Type["Flow"]: A new Flow class with the specified state type
Example:
>>> class MyState(BaseModel):
>>> value: int
>>>
>>> class MyFlow(Flow[MyState]):
>>> pass
"""
class _FlowGeneric(cls): # type: ignore
_initial_state_T = item # type: ignore
@@ -311,23 +186,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
return _FlowGeneric
def __init__(self) -> None:
"""Initialize a new Flow instance.
Sets up internal state tracking, method registration, and telemetry.
The flow's methods are automatically discovered and registered during initialization.
Attributes initialized:
_methods: Dictionary mapping method names to their callable objects
_state: The flow's state object of type T
_method_execution_counts: Tracks how many times each method has executed
_pending_and_listeners: Tracks methods waiting for AND conditions
_method_outputs: List of all outputs from executed methods
"""
self._methods: Dict[str, Callable] = {}
self._state: T = self._create_initial_state()
self._method_execution_counts: Dict[str, int] = {}
self._pending_and_listeners: Dict[str, Set[str]] = {}
self._method_outputs: List[Any] = []
self._method_outputs: List[Any] = [] # List to store all method outputs
self._telemetry.flow_creation_span(self.__class__.__name__)
@@ -338,20 +201,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._methods[method_name] = getattr(self, method_name)
def _create_initial_state(self) -> T:
"""Create the initial state for the flow.
The state is created based on the following priority:
1. If initial_state is None and _initial_state_T exists (generic type), use that
2. If initial_state is None, return empty dict
3. If initial_state is a type, instantiate it
4. Otherwise, use initial_state as-is
Returns:
T: The initial state object of type T
Note:
The type T can be either a Pydantic BaseModel or a dictionary.
"""
if self.initial_state is None and hasattr(self, "_initial_state_T"):
return self._initial_state_T() # type: ignore
if self.initial_state is None:
@@ -363,21 +212,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
@property
def state(self) -> T:
"""Get the current state of the flow.
Returns:
T: The current state object, either a Pydantic model or dictionary
"""
return self._state
@property
def method_outputs(self) -> List[Any]:
"""Get the list of all outputs from executed methods.
Returns:
List[Any]: A list containing the output values from all executed flow methods,
in order of execution.
"""
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
def _initialize_state(self, inputs: Dict[str, Any]) -> None:
@@ -467,23 +306,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
return result
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
"""Execute all listener methods triggered by a completed method.
This method handles both router and non-router listeners in a specific order:
1. First executes all triggered router methods sequentially until no more routers
are triggered
2. Then executes all regular listeners in parallel
Args:
trigger_method: The name of the method that completed execution
result: The result value from the triggering method
Note:
Router methods are executed sequentially to ensure proper flow control,
while regular listeners are executed concurrently for better performance.
This provides fine-grained control over the execution flow while
maintaining efficiency.
"""
# First, handle routers repeatedly until no router triggers anymore
while True:
routers_triggered = self._find_triggered_methods(
@@ -513,27 +335,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
def _find_triggered_methods(
self, trigger_method: str, router_only: bool
) -> List[str]:
"""Find all methods that should be triggered based on completed method and type.
Provides precise control over method triggering by handling both OR and AND
conditions separately for router and non-router methods.
Args:
trigger_method: The name of the method that completed execution
router_only: If True, only find router methods; if False, only regular
listeners
Returns:
List[str]: Names of methods that should be executed next
Note:
This method implements sophisticated flow control by:
1. Filtering methods based on their router/non-router status
2. Handling OR conditions for immediate triggering
3. Managing AND conditions with state tracking for complex dependencies
This ensures predictable and consistent execution order in complex flows.
"""
triggered = []
for listener_name, (condition_type, methods) in self._listeners.items():
is_router = listener_name in self._routers
@@ -562,27 +363,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
return triggered
async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
"""Execute a single listener method with precise parameter handling and error tracking.
Provides fine-grained control over method execution through:
1. Automatic parameter inspection to determine if the method accepts results
2. Event emission for execution tracking
3. Comprehensive error handling
4. Recursive listener execution
Args:
listener_name: The name of the listener method to execute
result: The result from the triggering method, passed to the listener
if its signature accepts parameters
Note:
This method ensures precise execution control by:
- Inspecting method signatures to handle parameters correctly
- Emitting events for execution tracking
- Providing comprehensive error handling
- Supporting both parameterized and parameter-less methods
- Maintaining execution chain through recursive listener calls
"""
try:
method = self._methods[listener_name]
@@ -626,32 +406,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
traceback.print_exc()
def plot(self, *args, **kwargs):
"""Generate an interactive visualization of the flow's execution graph.
Creates a detailed HTML visualization showing the relationships between
methods, including start points, listeners, routers, and their
connections. Includes telemetry tracking for flow analysis.
Args:
*args: Variable length argument list passed to plot_flow
**kwargs: Arbitrary keyword arguments passed to plot_flow
Note:
The visualization provides:
- Clear representation of method relationships
- Visual distinction between different method types
- Interactive exploration capabilities
- Execution path tracing
- Telemetry tracking for flow analysis
Example:
>>> flow = MyFlow()
>>> flow.plot("my_workflow") # Creates my_workflow.html
"""
from crewai.flow.flow_visualizer import plot_flow
def plot(self, filename: str = "crewai_flow") -> None:
self._telemetry.flow_plotting_span(
self.__class__.__name__, list(self._methods.keys())
)
return plot_flow(self, *args, **kwargs)
plot_flow(self, filename)

View File

@@ -1,240 +0,0 @@
"""Utility functions for Flow visualization.
This module contains utility functions specifically designed for visualizing
Flow graphs and calculating layout information. These utilities are separated
from general-purpose utilities to maintain a clean dependency structure.
"""
from typing import TYPE_CHECKING, Any, Dict, List, Set
if TYPE_CHECKING:
from crewai.flow.flow import Flow
def calculate_node_levels(flow: Flow[Any]) -> Dict[str, int]:
"""Calculate the hierarchical level of each node in the flow graph.
Uses breadth-first traversal to assign levels to nodes, starting with
start methods at level 0. Handles both OR and AND conditions for listeners,
and considers router paths when calculating levels.
Args:
flow: Flow instance containing methods, listeners, and router configurations
Returns:
dict[str, int]: Dictionary mapping method names to their hierarchical levels,
where level 0 contains start methods and each subsequent level contains
methods triggered by the previous level
Example:
>>> flow = Flow()
>>> @flow.start
... def start(): pass
>>> @flow.on("start")
... def second(): pass
>>> calculate_node_levels(flow)
{'start': 0, 'second': 1}
"""
levels: Dict[str, int] = {}
queue: List[str] = []
visited: Set[str] = set()
pending_and_listeners: Dict[str, Set[str]] = {}
# Make all start methods at level 0
for method_name, method in flow._methods.items():
if hasattr(method, "__is_start_method__"):
levels[method_name] = 0
queue.append(method_name)
# Breadth-first traversal to assign levels
while queue:
current = queue.pop(0)
current_level = levels[current]
visited.add(current)
for listener_name, (condition_type, trigger_methods) in flow._listeners.items():
if condition_type == "OR":
if current in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
elif condition_type == "AND":
if listener_name not in pending_and_listeners:
pending_and_listeners[listener_name] = set()
if current in trigger_methods:
pending_and_listeners[listener_name].add(current)
if set(trigger_methods) == pending_and_listeners[listener_name]:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
# Handle router connections
if current in flow._routers:
router_method_name = current
paths = flow._router_paths.get(router_method_name, [])
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
return levels
def count_outgoing_edges(flow: Flow[Any]) -> Dict[str, int]:
"""Count the number of outgoing edges for each node in the flow graph.
An outgoing edge represents a connection from a method to a listener
that it triggers. This is useful for visualization and analysis of
flow structure.
Args:
flow: Flow instance containing methods and their connections
Returns:
dict[str, int]: Dictionary mapping method names to their number
of outgoing connections
"""
counts: Dict[str, int] = {}
for method_name in flow._methods:
counts[method_name] = 0
for method_name in flow._listeners:
_, trigger_methods = flow._listeners[method_name]
for trigger in trigger_methods:
if trigger in flow._methods:
counts[trigger] += 1
return counts
def build_ancestor_dict(flow: Flow[Any]) -> Dict[str, Set[str]]:
"""Build a dictionary mapping each node to its set of ancestor nodes.
Uses depth-first search to identify all ancestors (direct and indirect
trigger methods) for each node in the flow graph. Handles both regular
listeners and router paths.
Args:
flow: Flow instance containing methods and their relationships
Returns:
dict[str, set[str]]: Dictionary mapping each method name to a set
of its ancestor method names
"""
ancestors: Dict[str, Set[str]] = {node: set() for node in flow._methods}
visited: Set[str] = set()
for node in flow._methods:
if node not in visited:
dfs_ancestors(node, ancestors, visited, flow)
return ancestors
def dfs_ancestors(node: str, ancestors: Dict[str, Set[str]],
visited: Set[str], flow: Flow[Any]) -> None:
"""Perform depth-first search to populate the ancestors dictionary.
Helper function for build_ancestor_dict that recursively traverses
the flow graph to identify ancestors of each node.
Args:
node: Current node being processed
ancestors: Dictionary mapping nodes to their ancestor sets
visited: Set of already visited nodes
flow: Flow instance containing the graph structure
"""
if node in visited:
return
visited.add(node)
# Handle regular listeners
for listener_name, (_, trigger_methods) in flow._listeners.items():
if node in trigger_methods:
ancestors[listener_name].add(node)
ancestors[listener_name].update(ancestors[node])
dfs_ancestors(listener_name, ancestors, visited, flow)
# Handle router methods separately
if node in flow._routers:
router_method_name = node
paths = flow._router_paths.get(router_method_name, [])
for path in paths:
for listener_name, (_, trigger_methods) in flow._listeners.items():
if path in trigger_methods:
# Only propagate the ancestors of the router method, not the router method itself
ancestors[listener_name].update(ancestors[node])
dfs_ancestors(listener_name, ancestors, visited, flow)
def build_parent_children_dict(flow: Flow[Any]) -> Dict[str, List[str]]:
"""Build a dictionary mapping each node to its list of child nodes.
Maps both regular trigger methods to their listeners and router
methods to their path listeners. Useful for visualization and
traversal of the flow graph structure.
Args:
flow: Flow instance containing methods and their relationships
Returns:
dict[str, list[str]]: Dictionary mapping each method name to a
sorted list of its child method names
"""
parent_children: Dict[str, List[str]] = {}
# Map listeners to their trigger methods
for listener_name, (_, trigger_methods) in flow._listeners.items():
for trigger in trigger_methods:
if trigger not in parent_children:
parent_children[trigger] = []
if listener_name not in parent_children[trigger]:
parent_children[trigger].append(listener_name)
# Map router methods to their paths and to listeners
for router_method_name, paths in flow._router_paths.items():
for path in paths:
# Map router method to listeners of each path
for listener_name, (_, trigger_methods) in flow._listeners.items():
if path in trigger_methods:
if router_method_name not in parent_children:
parent_children[router_method_name] = []
if listener_name not in parent_children[router_method_name]:
parent_children[router_method_name].append(listener_name)
return parent_children
def get_child_index(parent: str, child: str,
parent_children: Dict[str, List[str]]) -> int:
"""Get the index of a child node in its parent's sorted children list.
Args:
parent: Parent node name
child: Child node name to find index for
parent_children: Dictionary mapping parents to their children lists
Returns:
int: Zero-based index of the child in parent's sorted children list
Raises:
ValueError: If child is not found in parent's children list
"""
children = parent_children.get(parent, [])
children.sort()
return children.index(child)

View File

@@ -1,15 +1,13 @@
# flow_visualizer.py
import os
from pathlib import Path
from pyvis.network import Network
from crewai.flow.config import COLORS, NODE_STYLES
from crewai.flow.flow_visual_utils import calculate_node_levels
from crewai.flow.html_template_handler import HTMLTemplateHandler
from crewai.flow.legend_generator import generate_legend_items_html, get_legend_items
from crewai.flow.path_utils import safe_path_join, validate_file_path
from crewai.flow.utils import calculate_node_levels
from crewai.flow.visualization_utils import (
add_edges,
add_nodes_to_network,
@@ -18,30 +16,12 @@ from crewai.flow.visualization_utils import (
class FlowPlot:
"""Handles the creation and rendering of flow visualization diagrams."""
def __init__(self, flow):
"""Initialize flow plot with flow instance and styling configuration.
Args:
flow: A Flow instance with required attributes for visualization
Raises:
ValueError: If flow object is invalid or missing required attributes
"""
if not hasattr(flow, '_methods'):
raise ValueError("Invalid flow object: Missing '_methods' attribute")
if not hasattr(flow, '_start_methods'):
raise ValueError("Invalid flow object: Missing '_start_methods' attribute")
if not hasattr(flow, '_listeners'):
raise ValueError("Invalid flow object: Missing '_listeners' attribute")
self.flow = flow
self.colors = COLORS
self.node_styles = NODE_STYLES
def plot(self, filename):
"""Generate and save interactive flow visualization to HTML file."""
net = Network(
directed=True,
height="750px",
@@ -66,29 +46,30 @@ class FlowPlot:
"""
)
# Calculate levels for nodes
node_levels = calculate_node_levels(self.flow)
# Compute positions
node_positions = compute_positions(self.flow, node_levels)
# Add nodes to the network
add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
# Add edges to the network
add_edges(net, self.flow, node_positions, self.colors)
network_html = net.generate_html()
final_html_content = self._generate_final_html(network_html)
try:
# Ensure the output path is safe
output_dir = os.getcwd()
output_path = safe_path_join(output_dir, f"{filename}.html")
with open(output_path, "w", encoding="utf-8") as f:
f.write(final_html_content)
print(f"Plot saved as {output_path}")
except (IOError, ValueError) as e:
raise IOError(f"Failed to save flow visualization: {str(e)}")
# Save the final HTML content to the file
with open(f"{filename}.html", "w", encoding="utf-8") as f:
f.write(final_html_content)
print(f"Plot saved as {filename}.html")
self._cleanup_pyvis_lib()
def _generate_final_html(self, network_html):
"""Generate final HTML content with network visualization and legend."""
# Extract just the body content from the generated HTML
current_dir = os.path.dirname(__file__)
template_path = os.path.join(
current_dir, "assets", "crewai_flow_visual_template.html"
@@ -98,6 +79,7 @@ class FlowPlot:
html_handler = HTMLTemplateHandler(template_path, logo_path)
network_body = html_handler.extract_body_content(network_html)
# Generate the legend items HTML
legend_items = get_legend_items(self.colors)
legend_items_html = generate_legend_items_html(legend_items)
final_html_content = html_handler.generate_final_html(
@@ -106,17 +88,17 @@ class FlowPlot:
return final_html_content
def _cleanup_pyvis_lib(self):
"""Clean up temporary files generated by pyvis library."""
# Clean up the generated lib folder
lib_folder = os.path.join(os.getcwd(), "lib")
try:
if os.path.exists(lib_folder) and os.path.isdir(lib_folder):
import shutil
shutil.rmtree(lib_folder)
except Exception as e:
print(f"Error cleaning up {lib_folder}: {e}")
def plot_flow(flow, filename="flow_plot"):
"""Create and save a visualization of the given flow."""
visualizer = FlowPlot(flow)
visualizer.plot(filename)

View File

@@ -1,107 +1,28 @@
import base64
import os
import re
from pathlib import Path
from crewai.flow.path_utils import safe_path_join, validate_file_path
class HTMLTemplateHandler:
"""Handles HTML template processing and generation for flow visualization diagrams."""
def __init__(self, template_path, logo_path):
"""Initialize template handler with template and logo file paths.
Args:
template_path: Path to the HTML template file
logo_path: Path to the logo SVG file
Raises:
ValueError: If template_path or logo_path is invalid or files don't exist
"""
try:
self.template_path = validate_file_path(template_path)
self.logo_path = validate_file_path(logo_path)
except (ValueError, TypeError) as e:
raise ValueError(f"Invalid file path: {str(e)}")
self.template_path = template_path
self.logo_path = logo_path
def read_template(self):
"""Read and return the HTML template file contents.
Returns:
str: The contents of the template file
Raises:
IOError: If template file cannot be read
"""
try:
with open(self.template_path, "r", encoding="utf-8") as f:
return f.read()
except IOError as e:
raise IOError(f"Failed to read template file {self.template_path}: {str(e)}")
with open(self.template_path, "r", encoding="utf-8") as f:
return f.read()
def encode_logo(self):
"""Convert the logo SVG file to base64 encoded string.
Returns:
str: Base64 encoded logo data
Raises:
IOError: If logo file cannot be read
ValueError: If logo data cannot be encoded
"""
try:
with open(self.logo_path, "rb") as logo_file:
logo_svg_data = logo_file.read()
try:
return base64.b64encode(logo_svg_data).decode("utf-8")
except Exception as e:
raise ValueError(f"Failed to encode logo data: {str(e)}")
except IOError as e:
raise IOError(f"Failed to read logo file {self.logo_path}: {str(e)}")
with open(self.logo_path, "rb") as logo_file:
logo_svg_data = logo_file.read()
return base64.b64encode(logo_svg_data).decode("utf-8")
def extract_body_content(self, html):
"""Extract and return content between body tags from HTML string.
Args:
html: HTML string to extract body content from
Returns:
str: Content between body tags, or empty string if not found
Raises:
ValueError: If input HTML is invalid
"""
if not html or not isinstance(html, str):
raise ValueError("Input HTML must be a non-empty string")
match = re.search("<body.*?>(.*?)</body>", html, re.DOTALL)
return match.group(1) if match else ""
def generate_legend_items_html(self, legend_items):
"""Generate HTML markup for the legend items.
Args:
legend_items: List of dictionaries containing legend item properties
Returns:
str: Generated HTML markup for legend items
Raises:
ValueError: If legend_items is invalid or missing required properties
"""
if not isinstance(legend_items, list):
raise ValueError("legend_items must be a list")
legend_items_html = ""
for item in legend_items:
if not isinstance(item, dict):
raise ValueError("Each legend item must be a dictionary")
if "color" not in item:
raise ValueError("Each legend item must have a 'color' property")
if "label" not in item:
raise ValueError("Each legend item must have a 'label' property")
if "border" in item:
legend_items_html += f"""
<div class="legend-item">
@@ -127,42 +48,18 @@ class HTMLTemplateHandler:
return legend_items_html
def generate_final_html(self, network_body, legend_items_html, title="Flow Plot"):
"""Combine all components into final HTML document with network visualization.
Args:
network_body: HTML string containing network visualization
legend_items_html: HTML string containing legend items markup
title: Title for the visualization page (default: "Flow Plot")
Returns:
str: Complete HTML document with all components integrated
Raises:
ValueError: If any input parameters are invalid
IOError: If template or logo files cannot be read
"""
if not isinstance(network_body, str):
raise ValueError("network_body must be a string")
if not isinstance(legend_items_html, str):
raise ValueError("legend_items_html must be a string")
if not isinstance(title, str):
raise ValueError("title must be a string")
try:
html_template = self.read_template()
logo_svg_base64 = self.encode_logo()
html_template = self.read_template()
logo_svg_base64 = self.encode_logo()
final_html_content = html_template.replace("{{ title }}", title)
final_html_content = final_html_content.replace(
"{{ network_content }}", network_body
)
final_html_content = final_html_content.replace(
"{{ logo_svg_base64 }}", logo_svg_base64
)
final_html_content = final_html_content.replace(
"<!-- LEGEND_ITEMS_PLACEHOLDER -->", legend_items_html
)
final_html_content = html_template.replace("{{ title }}", title)
final_html_content = final_html_content.replace(
"{{ network_content }}", network_body
)
final_html_content = final_html_content.replace(
"{{ logo_svg_base64 }}", logo_svg_base64
)
final_html_content = final_html_content.replace(
"<!-- LEGEND_ITEMS_PLACEHOLDER -->", legend_items_html
)
return final_html_content
except Exception as e:
raise ValueError(f"Failed to generate final HTML: {str(e)}")
return final_html_content

View File

@@ -1,4 +1,3 @@
def get_legend_items(colors):
return [
{"label": "Start Method", "color": colors["start"]},

View File

@@ -1,123 +0,0 @@
"""Utilities for safe path handling in flow visualization.
This module provides a comprehensive set of utilities for secure path handling,
including path joining, validation, and normalization. It helps prevent common
security issues like directory traversal attacks while providing a consistent
interface for path operations.
"""
import os
from pathlib import Path
from typing import List, Optional, Union
def safe_path_join(base_dir: Union[str, Path], filename: str) -> str:
"""Safely join base directory with filename, preventing directory traversal.
Args:
base_dir: Base directory path
filename: Filename or path to join with base_dir
Returns:
str: Safely joined absolute path
Raises:
ValueError: If resulting path would escape base_dir or contains dangerous patterns
TypeError: If inputs are not strings or Path objects
OSError: If path resolution fails
"""
if not isinstance(base_dir, (str, Path)):
raise TypeError("base_dir must be a string or Path object")
if not isinstance(filename, str):
raise TypeError("filename must be a string")
# Check for dangerous patterns
dangerous_patterns = ['..', '~', '*', '?', '|', '>', '<', '$', '&', '`']
if any(pattern in filename for pattern in dangerous_patterns):
raise ValueError(f"Invalid filename: Contains dangerous pattern")
try:
base_path = Path(base_dir).resolve(strict=True)
full_path = Path(base_path, filename).resolve(strict=True)
if not str(full_path).startswith(str(base_path)):
raise ValueError(
f"Invalid path: {filename} would escape base directory {base_dir}"
)
return str(full_path)
except OSError as e:
raise OSError(f"Failed to resolve path: {str(e)}")
except Exception as e:
raise ValueError(f"Failed to process paths: {str(e)}")
def normalize_path(path: Union[str, Path]) -> str:
"""Normalize a path by resolving symlinks and removing redundant separators.
Args:
path: Path to normalize
Returns:
str: Normalized absolute path
Raises:
TypeError: If path is not a string or Path object
OSError: If path resolution fails
"""
if not isinstance(path, (str, Path)):
raise TypeError("path must be a string or Path object")
try:
return str(Path(path).resolve(strict=True))
except OSError as e:
raise OSError(f"Failed to normalize path: {str(e)}")
def validate_path_components(components: List[str]) -> None:
"""Validate path components for potentially dangerous patterns.
Args:
components: List of path components to validate
Raises:
TypeError: If components is not a list or contains non-string items
ValueError: If any component contains dangerous patterns
"""
if not isinstance(components, list):
raise TypeError("components must be a list")
dangerous_patterns = ['..', '~', '*', '?', '|', '>', '<', '$', '&', '`']
for component in components:
if not isinstance(component, str):
raise TypeError(f"Path component '{component}' must be a string")
if any(pattern in component for pattern in dangerous_patterns):
raise ValueError(f"Invalid path component '{component}': Contains dangerous pattern")
def validate_file_path(path: Union[str, Path], must_exist: bool = True) -> str:
"""Validate a file path for security and existence.
Args:
path: File path to validate
must_exist: Whether the file must exist (default: True)
Returns:
str: Validated absolute path
Raises:
ValueError: If path is invalid or file doesn't exist when required
TypeError: If path is not a string or Path object
"""
if not isinstance(path, (str, Path)):
raise TypeError("path must be a string or Path object")
try:
resolved_path = Path(path).resolve()
if must_exist and not resolved_path.is_file():
raise ValueError(f"File not found: {path}")
return str(resolved_path)
except Exception as e:
raise ValueError(f"Invalid file path {path}: {str(e)}")

View File

@@ -1,35 +1,220 @@
"""General utility functions for flow execution.
import ast
import inspect
import textwrap
This module has been deprecated. All functionality has been moved to:
- core_flow_utils.py: Core flow execution utilities
- flow_visual_utils.py: Visualization-related utilities
This module is kept as a temporary redirect to maintain backwards compatibility.
New code should import from the appropriate new modules directly.
"""
def get_possible_return_constants(function):
try:
source = inspect.getsource(function)
except OSError:
# Can't get source code
return None
except Exception as e:
print(f"Error retrieving source code for function {function.__name__}: {e}")
return None
from typing import Any, Dict, List, Optional, Set
try:
# Remove leading indentation
source = textwrap.dedent(source)
# Parse the source code into an AST
code_ast = ast.parse(source)
except IndentationError as e:
print(f"IndentationError while parsing source code of {function.__name__}: {e}")
print(f"Source code:\n{source}")
return None
except SyntaxError as e:
print(f"SyntaxError while parsing source code of {function.__name__}: {e}")
print(f"Source code:\n{source}")
return None
except Exception as e:
print(f"Unexpected error while parsing source code of {function.__name__}: {e}")
print(f"Source code:\n{source}")
return None
from .core_flow_utils import get_possible_return_constants, is_ancestor
from .flow_visual_utils import (
build_ancestor_dict,
build_parent_children_dict,
calculate_node_levels,
count_outgoing_edges,
dfs_ancestors,
get_child_index,
)
return_values = set()
dict_definitions = {}
# Re-export all functions for backwards compatibility
__all__ = [
'get_possible_return_constants',
'calculate_node_levels',
'count_outgoing_edges',
'build_ancestor_dict',
'dfs_ancestors',
'is_ancestor',
'build_parent_children_dict',
'get_child_index',
]
class DictionaryAssignmentVisitor(ast.NodeVisitor):
def visit_Assign(self, node):
# Check if this assignment is assigning a dictionary literal to a variable
if isinstance(node.value, ast.Dict) and len(node.targets) == 1:
target = node.targets[0]
if isinstance(target, ast.Name):
var_name = target.id
dict_values = []
# Extract string values from the dictionary
for val in node.value.values:
if isinstance(val, ast.Constant) and isinstance(val.value, str):
dict_values.append(val.value)
# If non-string, skip or just ignore
if dict_values:
dict_definitions[var_name] = dict_values
self.generic_visit(node)
# Function implementations have been moved to core_flow_utils.py and flow_visual_utils.py
class ReturnVisitor(ast.NodeVisitor):
def visit_Return(self, node):
# Direct string return
if isinstance(node.value, ast.Constant) and isinstance(
node.value.value, str
):
return_values.add(node.value.value)
# Dictionary-based return, like return paths[result]
elif isinstance(node.value, ast.Subscript):
# Check if we're subscripting a known dictionary variable
if isinstance(node.value.value, ast.Name):
var_name = node.value.value.id
if var_name in dict_definitions:
# Add all possible dictionary values
for v in dict_definitions[var_name]:
return_values.add(v)
self.generic_visit(node)
# First pass: identify dictionary assignments
DictionaryAssignmentVisitor().visit(code_ast)
# Second pass: identify returns
ReturnVisitor().visit(code_ast)
return list(return_values) if return_values else None
def calculate_node_levels(flow):
levels = {}
queue = []
visited = set()
pending_and_listeners = {}
# Make all start methods at level 0
for method_name, method in flow._methods.items():
if hasattr(method, "__is_start_method__"):
levels[method_name] = 0
queue.append(method_name)
# Breadth-first traversal to assign levels
while queue:
current = queue.pop(0)
current_level = levels[current]
visited.add(current)
for listener_name, (condition_type, trigger_methods) in flow._listeners.items():
if condition_type == "OR":
if current in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
elif condition_type == "AND":
if listener_name not in pending_and_listeners:
pending_and_listeners[listener_name] = set()
if current in trigger_methods:
pending_and_listeners[listener_name].add(current)
if set(trigger_methods) == pending_and_listeners[listener_name]:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
# Handle router connections
if current in flow._routers:
router_method_name = current
paths = flow._router_paths.get(router_method_name, [])
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
return levels
def count_outgoing_edges(flow):
counts = {}
for method_name in flow._methods:
counts[method_name] = 0
for method_name in flow._listeners:
_, trigger_methods = flow._listeners[method_name]
for trigger in trigger_methods:
if trigger in flow._methods:
counts[trigger] += 1
return counts
def build_ancestor_dict(flow):
ancestors = {node: set() for node in flow._methods}
visited = set()
for node in flow._methods:
if node not in visited:
dfs_ancestors(node, ancestors, visited, flow)
return ancestors
def dfs_ancestors(node, ancestors, visited, flow):
if node in visited:
return
visited.add(node)
# Handle regular listeners
for listener_name, (_, trigger_methods) in flow._listeners.items():
if node in trigger_methods:
ancestors[listener_name].add(node)
ancestors[listener_name].update(ancestors[node])
dfs_ancestors(listener_name, ancestors, visited, flow)
# Handle router methods separately
if node in flow._routers:
router_method_name = node
paths = flow._router_paths.get(router_method_name, [])
for path in paths:
for listener_name, (_, trigger_methods) in flow._listeners.items():
if path in trigger_methods:
# Only propagate the ancestors of the router method, not the router method itself
ancestors[listener_name].update(ancestors[node])
dfs_ancestors(listener_name, ancestors, visited, flow)
def is_ancestor(node, ancestor_candidate, ancestors):
return ancestor_candidate in ancestors.get(node, set())
def build_parent_children_dict(flow):
parent_children = {}
# Map listeners to their trigger methods
for listener_name, (_, trigger_methods) in flow._listeners.items():
for trigger in trigger_methods:
if trigger not in parent_children:
parent_children[trigger] = []
if listener_name not in parent_children[trigger]:
parent_children[trigger].append(listener_name)
# Map router methods to their paths and to listeners
for router_method_name, paths in flow._router_paths.items():
for path in paths:
# Map router method to listeners of each path
for listener_name, (_, trigger_methods) in flow._listeners.items():
if path in trigger_methods:
if router_method_name not in parent_children:
parent_children[router_method_name] = []
if listener_name not in parent_children[router_method_name]:
parent_children[router_method_name].append(listener_name)
return parent_children
def get_child_index(parent, child, parent_children):
children = parent_children.get(parent, [])
children.sort()
return children.index(child)

View File

@@ -1,64 +1,25 @@
import ast
import inspect
import os
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, cast
from pyvis.network import Network
from crewai.flow.flow import Flow
from .core_flow_utils import is_ancestor
from .flow_visual_utils import (
from .utils import (
build_ancestor_dict,
build_parent_children_dict,
get_child_index,
is_ancestor,
)
from .path_utils import safe_path_join, validate_file_path
def method_calls_crew(method: Optional[Callable[..., Any]]) -> bool:
"""Check if the method contains a .crew() call in its implementation.
Analyzes the method's source code using AST to detect if it makes any
calls to the .crew() method, which indicates crew involvement in the
flow execution.
Args:
method: The method to analyze for crew calls, can be None
Returns:
bool: True if the method contains a .crew() call, False otherwise
Raises:
TypeError: If input is not None and not a callable method
ValueError: If method source code cannot be parsed
RuntimeError: If unexpected error occurs during parsing
"""
if method is None:
return False
if not callable(method):
raise TypeError("Input must be a callable method")
def method_calls_crew(method):
"""Check if the method calls `.crew()`."""
try:
source = inspect.getsource(method)
source = inspect.cleandoc(source)
tree = ast.parse(source)
except (TypeError, ValueError, OSError) as e:
raise ValueError(f"Could not parse method {getattr(method, '__name__', str(method))}: {e}")
except Exception as e:
raise RuntimeError(f"Unexpected error parsing method: {e}")
print(f"Could not parse method {method.__name__}: {e}")
return False
class CrewCallVisitor(ast.NodeVisitor):
"""AST visitor to detect .crew() method calls in source code.
A specialized AST visitor that analyzes Python source code to precisely
identify calls to the .crew() method, enabling accurate detection of
crew involvement in flow methods.
Attributes:
found (bool): Indicates whether a .crew() call was found
"""
def __init__(self):
self.found = False
@@ -73,64 +34,8 @@ def method_calls_crew(method: Optional[Callable[..., Any]]) -> bool:
return visitor.found
def add_nodes_to_network(net: Network, flow: Flow[Any],
node_positions: Dict[str, Tuple[float, float]],
node_styles: Dict[str, dict],
output_dir: Optional[str] = None) -> None:
"""Add nodes to the network visualization with precise styling and positioning.
Creates and styles nodes in the visualization network based on their type
(start, router, crew, or regular method) with fine-grained control over
appearance and positioning.
Args:
net: The network visualization object to add nodes to
flow: Flow object containing method definitions and relationships
node_positions: Dictionary mapping method names to (x,y) coordinates
node_styles: Dictionary mapping node types to their visual styles
output_dir: Optional directory path for saving visualization assets
Returns:
None
Raises:
ValueError: If flow object is invalid or required styles are missing
TypeError: If input arguments have incorrect types
OSError: If output directory operations fail
Note:
Node styles are applied with precise control over shape, font, color,
and positioning to ensure accurate visual representation of the flow.
If output_dir is provided, it will be validated and created if needed.
"""
if not hasattr(flow, '_methods'):
raise ValueError("Invalid flow object: missing '_methods' attribute")
if not isinstance(node_positions, dict):
raise TypeError("node_positions must be a dictionary")
if not isinstance(node_styles, dict):
raise TypeError("node_styles must be a dictionary")
required_styles = {'start', 'router', 'crew', 'method'}
missing_styles = required_styles - set(node_styles.keys())
if missing_styles:
raise ValueError(f"Missing required node styles: {missing_styles}")
# Validate and create output directory if specified
if output_dir:
try:
output_dir = validate_file_path(output_dir, must_exist=False)
os.makedirs(output_dir, exist_ok=True)
except (ValueError, OSError) as e:
raise OSError(f"Failed to create or validate output directory: {e}")
def human_friendly_label(method_name: str) -> str:
"""Convert method name to human-readable format.
Args:
method_name: Original method name with underscores
Returns:
str: Formatted method name with spaces and title case
"""
def add_nodes_to_network(net, flow, node_positions, node_styles):
def human_friendly_label(method_name):
return method_name.replace("_", " ").title()
for method_name, (x, y) in node_positions.items():
@@ -147,15 +52,6 @@ def add_nodes_to_network(net: Network, flow: Flow[Any],
node_style = node_style.copy()
label = human_friendly_label(method_name)
# Handle file-based assets if output directory is provided
if output_dir and node_style.get("image"):
try:
image_path = node_style["image"]
safe_image_path = safe_path_join(output_dir, Path(image_path).name)
node_style["image"] = str(safe_image_path)
except (ValueError, OSError) as e:
raise OSError(f"Failed to process node image path: {e}")
node_style.update(
{
"label": label,
@@ -177,41 +73,9 @@ def add_nodes_to_network(net: Network, flow: Flow[Any],
)
def compute_positions(flow: Flow[Any], node_levels: Dict[str, int],
y_spacing: float = 150, x_spacing: float = 150) -> Dict[str, Tuple[float, float]]:
"""Calculate precise x,y coordinates for each node in the flow diagram.
Computes optimal node positions with fine-grained control over spacing
and alignment, ensuring clear visualization of flow hierarchy and
relationships.
Args:
flow: Flow object containing method definitions
node_levels: Dictionary mapping method names to their hierarchy levels
y_spacing: Vertical spacing between hierarchy levels (default: 150)
x_spacing: Horizontal spacing between nodes at same level (default: 150)
Returns:
dict[str, tuple[float, float]]: Dictionary mapping method names to
their calculated (x,y) coordinates in the visualization
Note:
Positions are calculated to maintain clear hierarchical structure while
ensuring optimal spacing and readability of the flow diagram.
"""
if not hasattr(flow, '_methods'):
raise ValueError("Invalid flow object: missing '_methods' attribute")
if not isinstance(node_levels, dict):
raise TypeError("node_levels must be a dictionary")
if not isinstance(y_spacing, (int, float)) or y_spacing <= 0:
raise ValueError("y_spacing must be a positive number")
if not isinstance(x_spacing, (int, float)) or x_spacing <= 0:
raise ValueError("x_spacing must be a positive number")
if not node_levels:
raise ValueError("node_levels dictionary cannot be empty")
level_nodes: Dict[int, List[str]] = {}
node_positions: Dict[str, Tuple[float, float]] = {}
def compute_positions(flow, node_levels, y_spacing=150, x_spacing=150):
level_nodes = {}
node_positions = {}
for method_name, level in node_levels.items():
level_nodes.setdefault(level, []).append(method_name)
@@ -226,34 +90,7 @@ def compute_positions(flow: Flow[Any], node_levels: Dict[str, int],
return node_positions
def add_edges(net: Network, flow: Flow[Any],
node_positions: Dict[str, Tuple[float, float]],
colors: Dict[str, str],
asset_dir: Optional[str] = None) -> None:
if not hasattr(flow, '_methods'):
raise ValueError("Invalid flow object: missing '_methods' attribute")
if not hasattr(flow, '_listeners'):
raise ValueError("Invalid flow object: missing '_listeners' attribute")
if not hasattr(flow, '_router_paths'):
raise ValueError("Invalid flow object: missing '_router_paths' attribute")
if not isinstance(node_positions, dict):
raise TypeError("node_positions must be a dictionary")
if not isinstance(colors, dict):
raise TypeError("colors must be a dictionary")
required_colors = {'edge', 'router_edge'}
missing_colors = required_colors - set(colors.keys())
if missing_colors:
raise ValueError(f"Missing required edge colors: {missing_colors}")
# Validate asset directory if provided
if asset_dir:
try:
asset_dir = validate_file_path(asset_dir, must_exist=False)
os.makedirs(asset_dir, exist_ok=True)
except (ValueError, OSError) as e:
raise OSError(f"Failed to create or validate asset directory: {e}")
def add_edges(net, flow, node_positions, colors):
ancestors = build_ancestor_dict(flow)
parent_children = build_parent_children_dict(flow)
@@ -282,24 +119,24 @@ def add_edges(net: Network, flow: Flow[Any],
dx = target_pos[0] - source_pos[0]
smooth_type = "curvedCCW" if dx <= 0 else "curvedCW"
index = get_child_index(trigger, method_name, parent_children)
edge_config = {
edge_smooth = {
"type": smooth_type,
"roundness": 0.2 + (0.1 * index),
}
else:
edge_config = {"type": "cubicBezier"}
edge_smooth = {"type": "cubicBezier"}
else:
edge_config = {"type": "straight"}
edge_smooth = False
edge_props: Dict[str, Any] = {
edge_style = {
"color": edge_color,
"width": 2,
"arrows": "to",
"dashes": True if is_router_edge or is_and_condition else False,
"smooth": edge_config,
"smooth": edge_smooth,
}
net.add_edge(trigger, method_name, **edge_props)
net.add_edge(trigger, method_name, **edge_style)
else:
# Nodes not found in node_positions. Check if it's a known router outcome and a known method.
is_router_edge = any(
@@ -345,23 +182,23 @@ def add_edges(net: Network, flow: Flow[Any],
index = get_child_index(
router_method_name, listener_name, parent_children
)
edge_config = {
edge_smooth = {
"type": smooth_type,
"roundness": 0.2 + (0.1 * index),
}
else:
edge_config = {"type": "cubicBezier"}
edge_smooth = {"type": "cubicBezier"}
else:
edge_config = {"type": "straight"}
edge_smooth = False
router_edge_props: Dict[str, Any] = {
edge_style = {
"color": colors["router_edge"],
"width": 2,
"arrows": "to",
"dashes": True,
"smooth": edge_config,
"smooth": edge_smooth,
}
net.add_edge(router_method_name, listener_name, **router_edge_props)
net.add_edge(router_method_name, listener_name, **edge_style)
else:
# Same check here: known router edge and known method?
method_known = listener_name in flow._methods

View File

@@ -14,13 +14,13 @@ class Knowledge(BaseModel):
Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
Args:
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
embedder_config: Optional[Dict[str, Any]] = None
"""
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
embedder_config: Optional[Dict[str, Any]] = None
collection_name: Optional[str] = None
@@ -49,13 +49,8 @@ class Knowledge(BaseModel):
"""
Query across all knowledge sources to find the most relevant information.
Returns the top_k most relevant chunks.
Raises:
ValueError: If storage is not initialized.
"""
if self.storage is None:
raise ValueError("Storage is not initialized.")
results = self.storage.search(
query,
limit,

View File

@@ -22,14 +22,13 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
default_factory=list, description="The path to the file"
)
content: Dict[Path, str] = Field(init=False, default_factory=dict)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
safe_file_paths: List[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
def validate_file_path(cls, v, info):
def validate_file_path(cls, v, values):
"""Validate that at least one of file_path or file_paths is provided."""
# Single check if both are None, O(1) instead of nested conditions
if v is None and info.data.get("file_path" if info.field_name == "file_paths" else "file_paths") is None:
if v is None and ("file_path" not in values or values.get("file_path") is None):
raise ValueError("Either file_path or file_paths must be provided")
return v
@@ -63,10 +62,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
def _save_documents(self):
"""Save the documents to the storage."""
if self.storage:
self.storage.save(self.chunks)
else:
raise ValueError("No storage found to save documents.")
self.storage.save(self.chunks)
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""

View File

@@ -16,7 +16,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
metadata: Dict[str, Any] = Field(default_factory=dict) # Currently unused
collection_name: Optional[str] = Field(default=None)
@@ -46,7 +46,4 @@ class BaseKnowledgeSource(BaseModel, ABC):
Save the documents to the storage.
This method should be called after the chunks and embeddings are generated.
"""
if self.storage:
self.storage.save(self.chunks)
else:
raise ValueError("No storage found to save documents.")
self.storage.save(self.chunks)

View File

@@ -6,11 +6,12 @@ import shutil
import uuid
from typing import Any, Dict, List, Optional
import numpy as np
from chromadb.api import ClientAPI
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH, MEMORY_CHUNK_SIZE, MEMORY_CHUNK_OVERLAP
from crewai.utilities.paths import db_storage_path
@@ -138,15 +139,57 @@ class RAGStorage(BaseRAGStorage):
logging.error(f"Error during {self.type} search: {str(e)}")
return []
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> None: # type: ignore
def _chunk_text(self, text: str) -> List[str]:
"""
Split text into chunks to avoid token limits.
Args:
text: Input text to chunk.
Returns:
List[str]: A list of chunked text segments, adhering to defined size and overlap.
Empty list if input text is empty.
"""
if not text:
return []
if len(text) <= MEMORY_CHUNK_SIZE:
return [text]
chunks = []
start_indices = range(0, len(text), MEMORY_CHUNK_SIZE - MEMORY_CHUNK_OVERLAP)
for i in start_indices:
chunk = text[i:i + MEMORY_CHUNK_SIZE]
if chunk: # Only add non-empty chunks
chunks.append(chunk)
return chunks
def _generate_embedding(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> Optional[None]:
"""
Generate embeddings for text and add to collection.
Args:
text: Input text to generate embeddings for.
metadata: Optional metadata to associate with the embeddings.
Returns:
None if successful, None if text is empty.
"""
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()
self.collection.add(
documents=[text],
metadatas=[metadata or {}],
ids=[str(uuid.uuid4())],
)
chunks = self._chunk_text(text)
if not chunks:
return None
for chunk in chunks:
self.collection.add(
documents=[chunk],
metadatas=[metadata or {}],
ids=[str(uuid.uuid4())],
)
def reset(self) -> None:
try:

View File

@@ -179,7 +179,6 @@ class Task(BaseModel):
_execution_span: Optional[Span] = PrivateAttr(default=None)
_original_description: Optional[str] = PrivateAttr(default=None)
_original_expected_output: Optional[str] = PrivateAttr(default=None)
_original_output_file: Optional[str] = PrivateAttr(default=None)
_thread: Optional[threading.Thread] = PrivateAttr(default=None)
_execution_time: Optional[float] = PrivateAttr(default=None)
@@ -214,46 +213,8 @@ class Task(BaseModel):
@field_validator("output_file")
@classmethod
def output_file_validation(cls, value: Optional[str]) -> Optional[str]:
"""Validate the output file path.
Args:
value: The output file path to validate. Can be None or a string.
If the path contains template variables (e.g. {var}), leading slashes are preserved.
For regular paths, leading slashes are stripped.
Returns:
The validated and potentially modified path, or None if no path was provided.
Raises:
ValueError: If the path contains invalid characters, path traversal attempts,
or other security concerns.
"""
if value is None:
return None
# Basic security checks
if ".." in value:
raise ValueError("Path traversal attempts are not allowed in output_file paths")
# Check for shell expansion first
if value.startswith('~') or value.startswith('$'):
raise ValueError("Shell expansion characters are not allowed in output_file paths")
# Then check other shell special characters
if any(char in value for char in ['|', '>', '<', '&', ';']):
raise ValueError("Shell special characters are not allowed in output_file paths")
# Don't strip leading slash if it's a template path with variables
if "{" in value or "}" in value:
# Validate template variable format
template_vars = [part.split("}")[0] for part in value.split("{")[1:]]
for var in template_vars:
if not var.isidentifier():
raise ValueError(f"Invalid template variable name: {var}")
return value
# Strip leading slash for regular paths
def output_file_validation(cls, value: str) -> str:
"""Validate the output file path by removing the / from the beginning of the path."""
if value.startswith("/"):
return value[1:]
return value
@@ -432,89 +393,27 @@ class Task(BaseModel):
tasks_slices = [self.description, output]
return "\n".join(tasks_slices)
def interpolate_inputs(self, inputs: Dict[str, Union[str, int, float]]) -> None:
"""Interpolate inputs into the task description, expected output, and output file path.
Args:
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, and floats.
Raises:
ValueError: If a required template variable is missing from inputs.
"""
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""Interpolate inputs into the task description and expected output."""
if self._original_description is None:
self._original_description = self.description
if self._original_expected_output is None:
self._original_expected_output = self.expected_output
if self.output_file is not None and self._original_output_file is None:
self._original_output_file = self.output_file
if not inputs:
return
try:
if inputs:
self.description = self._original_description.format(**inputs)
except KeyError as e:
raise ValueError(f"Missing required template variable '{e.args[0]}' in description") from e
except ValueError as e:
raise ValueError(f"Error interpolating description: {str(e)}") from e
try:
self.expected_output = self.interpolate_only(
input_string=self._original_expected_output, inputs=inputs
)
except (KeyError, ValueError) as e:
raise ValueError(f"Error interpolating expected_output: {str(e)}") from e
if self.output_file is not None:
try:
self.output_file = self.interpolate_only(
input_string=self._original_output_file, inputs=inputs
)
except (KeyError, ValueError) as e:
raise ValueError(f"Error interpolating output_file path: {str(e)}") from e
def interpolate_only(self, input_string: str, inputs: Dict[str, Any]) -> str:
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched."""
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
def interpolate_only(self, input_string: Optional[str], inputs: Dict[str, Union[str, int, float]]) -> str:
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
Args:
input_string: The string containing template variables to interpolate.
Can be None or empty, in which case an empty string is returned.
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, and floats.
If input_string is empty or has no placeholders, inputs can be empty.
Returns:
The interpolated string with all template variables replaced with their values.
Empty string if input_string is None or empty.
Raises:
ValueError: If a required template variable is missing from inputs.
KeyError: If a template variable is not found in the inputs dictionary.
"""
if input_string is None or not input_string:
return ""
if "{" not in input_string and "}" not in input_string:
return input_string
if not inputs:
raise ValueError("Inputs dictionary cannot be empty when interpolating variables")
for key in inputs.keys():
escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}")
try:
# Validate input types
for key, value in inputs.items():
if not isinstance(value, (str, int, float)):
raise ValueError(f"Value for key '{key}' must be a string, integer, or float, got {type(value).__name__}")
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
for key in inputs.keys():
escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}")
return escaped_string.format(**inputs)
except KeyError as e:
raise KeyError(f"Template variable '{e.args[0]}' not found in inputs dictionary") from e
except ValueError as e:
raise ValueError(f"Error during string interpolation: {str(e)}") from e
return escaped_string.format(**inputs)
def increment_tools_errors(self) -> None:
"""Increment the tools errors counter."""

View File

@@ -1,4 +1,3 @@
import logging
from typing import Optional, Union
from pydantic import Field
@@ -8,8 +7,6 @@ from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.utilities import I18N
logger = logging.getLogger(__name__)
class BaseAgentTool(BaseTool):
"""Base class for agent-related tools"""
@@ -19,25 +16,6 @@ class BaseAgentTool(BaseTool):
default_factory=I18N, description="Internationalization settings"
)
def sanitize_agent_name(self, name: str) -> str:
"""
Sanitize agent role name by normalizing whitespace and setting to lowercase.
Converts all whitespace (including newlines) to single spaces and removes quotes.
Args:
name (str): The agent role name to sanitize
Returns:
str: The sanitized agent role name, with whitespace normalized,
converted to lowercase, and quotes removed
"""
if not name:
return ""
# Normalize all whitespace (including newlines) to single spaces
normalized = " ".join(name.split())
# Remove quotes and convert to lowercase
return normalized.replace('"', "").casefold()
def _get_coworker(self, coworker: Optional[str], **kwargs) -> Optional[str]:
coworker = coworker or kwargs.get("co_worker") or kwargs.get("coworker")
if coworker:
@@ -47,27 +25,11 @@ class BaseAgentTool(BaseTool):
return coworker
def _execute(
self,
agent_name: Optional[str],
task: str,
context: Optional[str] = None
self, agent_name: Union[str, None], task: str, context: Union[str, None]
) -> str:
"""
Execute delegation to an agent with case-insensitive and whitespace-tolerant matching.
Args:
agent_name: Name/role of the agent to delegate to (case-insensitive)
task: The specific question or task to delegate
context: Optional additional context for the task execution
Returns:
str: The execution result from the delegated agent or an error message
if the agent cannot be found
"""
try:
if agent_name is None:
agent_name = ""
logger.debug("No agent name provided, using empty string")
# It is important to remove the quotes from the agent name.
# The reason we have to do this is because less-powerful LLM's
@@ -76,49 +38,31 @@ class BaseAgentTool(BaseTool):
# {"task": "....", "coworker": "....
# when it should look like this:
# {"task": "....", "coworker": "...."}
sanitized_name = self.sanitize_agent_name(agent_name)
logger.debug(f"Sanitized agent name from '{agent_name}' to '{sanitized_name}'")
available_agents = [agent.role for agent in self.agents]
logger.debug(f"Available agents: {available_agents}")
agent_name = agent_name.casefold().replace('"', "").replace("\n", "")
agent = [ # type: ignore # Incompatible types in assignment (expression has type "list[BaseAgent]", variable has type "str | None")
available_agent
for available_agent in self.agents
if self.sanitize_agent_name(available_agent.role) == sanitized_name
if available_agent.role.casefold().replace("\n", "") == agent_name
]
logger.debug(f"Found {len(agent)} matching agents for role '{sanitized_name}'")
except (AttributeError, ValueError) as e:
# Handle specific exceptions that might occur during role name processing
except Exception as _:
return self.i18n.errors("agent_tool_unexisting_coworker").format(
coworkers="\n".join(
[f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents]
),
error=str(e)
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
if not agent:
# No matching agent found after sanitization
return self.i18n.errors("agent_tool_unexisting_coworker").format(
coworkers="\n".join(
[f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents]
),
error=f"No agent found with role '{sanitized_name}'"
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
agent = agent[0]
try:
task_with_assigned_agent = Task(
description=task,
agent=agent,
expected_output=agent.i18n.slice("manager_request"),
i18n=agent.i18n,
)
logger.debug(f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}")
return agent.execute_task(task_with_assigned_agent, context)
except Exception as e:
# Handle task creation or execution errors
return self.i18n.errors("agent_tool_execution_error").format(
agent_role=self.sanitize_agent_name(agent.role),
error=str(e)
)
task_with_assigned_agent = Task( # type: ignore # Incompatible types in assignment (expression has type "Task", variable has type "str")
description=task,
agent=agent,
expected_output=agent.i18n.slice("manager_request"),
i18n=agent.i18n,
)
return agent.execute_task(task_with_assigned_agent, context)

View File

@@ -33,8 +33,7 @@
"tool_usage_error": "I encountered an error: {error}",
"tool_arguments_error": "Error: the Action Input is not a valid key, value dictionary.",
"wrong_tool_name": "You tried to use the tool {tool}, but it doesn't exist. You must use one of the following tools, use one at time: {tools}.",
"tool_usage_exception": "I encountered an error while trying to use the tool. This was the error: {error}.\n Tool {tool} accepts these inputs: {tool_inputs}",
"agent_tool_execution_error": "Error executing task with agent '{agent_role}'. Error: {error}"
"tool_usage_exception": "I encountered an error while trying to use the tool. This was the error: {error}.\n Tool {tool} accepts these inputs: {tool_inputs}"
},
"tools": {
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolute everything you know, don't reference things but instead explain them.",

View File

@@ -4,3 +4,5 @@ DEFAULT_SCORE_THRESHOLD = 0.35
KNOWLEDGE_DIRECTORY = "knowledge"
MAX_LLM_RETRY = 3
MAX_FILE_NAME_LENGTH = 255
MEMORY_CHUNK_SIZE = 4000
MEMORY_CHUNK_OVERLAP = 200

View File

@@ -1,243 +0,0 @@
interactions:
- request:
body: !!binary |
CuIcCiQKIgoMc2VydmljZS5uYW1lEhIKEGNyZXdBSS10ZWxlbWV0cnkSuRwKEgoQY3Jld2FpLnRl
bGVtZXRyeRKjBwoQXK7w4+uvyEkrI9D5qyvcJxII5UmQ7hmczdIqDENyZXcgQ3JlYXRlZDABOfxQ
/hs4jBUYQUi3DBw4jBUYShoKDmNyZXdhaV92ZXJzaW9uEggKBjAuODYuMEoaCg5weXRob25fdmVy
c2lvbhIICgYzLjEyLjdKLgoIY3Jld19rZXkSIgogYzk3YjVmZWI1ZDFiNjZiYjU5MDA2YWFhMDFh
MjljZDZKMQoHY3Jld19pZBImCiRkZjY3NGMwYi1hOTc0LTQ3NTAtYjlkMS0yZWQxNjM3MzFiNTZK
HAoMY3Jld19wcm9jZXNzEgwKCnNlcXVlbnRpYWxKEQoLY3Jld19tZW1vcnkSAhAAShoKFGNyZXdf
bnVtYmVyX29mX3Rhc2tzEgIYAUobChVjcmV3X251bWJlcl9vZl9hZ2VudHMSAhgBStECCgtjcmV3
X2FnZW50cxLBAgq+Alt7ImtleSI6ICIwN2Q5OWI2MzA0MTFkMzVmZDkwNDdhNTMyZDUzZGRhNyIs
ICJpZCI6ICI5MDYwYTQ2Zi02MDY3LTQ1N2MtOGU3ZC04NjAyN2YzY2U5ZDUiLCAicm9sZSI6ICJS
ZXNlYXJjaGVyIiwgInZlcmJvc2U/IjogZmFsc2UsICJtYXhfaXRlciI6IDIwLCAibWF4X3JwbSI6
IG51bGwsICJmdW5jdGlvbl9jYWxsaW5nX2xsbSI6ICIiLCAibGxtIjogImdwdC00by1taW5pIiwg
ImRlbGVnYXRpb25fZW5hYmxlZD8iOiBmYWxzZSwgImFsbG93X2NvZGVfZXhlY3V0aW9uPyI6IGZh
bHNlLCAibWF4X3JldHJ5X2xpbWl0IjogMiwgInRvb2xzX25hbWVzIjogW119XUr/AQoKY3Jld190
YXNrcxLwAQrtAVt7ImtleSI6ICI2Mzk5NjUxN2YzZjNmMWM5NGQ2YmI2MTdhYTBiMWM0ZiIsICJp
ZCI6ICJjYTA4ZjkyOS0yMmI0LTQyZmQtYjViMC05N2M3MjM0ZDk5OTEiLCAiYXN5bmNfZXhlY3V0
aW9uPyI6IGZhbHNlLCAiaHVtYW5faW5wdXQ/IjogZmFsc2UsICJhZ2VudF9yb2xlIjogIlJlc2Vh
cmNoZXIiLCAiYWdlbnRfa2V5IjogIjA3ZDk5YjYzMDQxMWQzNWZkOTA0N2E1MzJkNTNkZGE3Iiwg
InRvb2xzX25hbWVzIjogW119XXoCGAGFAQABAAASjgIKEOTJZh9R45IwgGVg9cinZmISCJopKRMf
bpMJKgxUYXNrIENyZWF0ZWQwATlG+zQcOIwVGEHk0zUcOIwVGEouCghjcmV3X2tleRIiCiBjOTdi
NWZlYjVkMWI2NmJiNTkwMDZhYWEwMWEyOWNkNkoxCgdjcmV3X2lkEiYKJGRmNjc0YzBiLWE5NzQt
NDc1MC1iOWQxLTJlZDE2MzczMWI1NkouCgh0YXNrX2tleRIiCiA2Mzk5NjUxN2YzZjNmMWM5NGQ2
YmI2MTdhYTBiMWM0ZkoxCgd0YXNrX2lkEiYKJGNhMDhmOTI5LTIyYjQtNDJmZC1iNWIwLTk3Yzcy
MzRkOTk5MXoCGAGFAQABAAASowcKEEvwrN8+tNMIBwtnA+ip7jASCI78Hrh2wlsBKgxDcmV3IENy
ZWF0ZWQwATkcRqYeOIwVGEE8erQeOIwVGEoaCg5jcmV3YWlfdmVyc2lvbhIICgYwLjg2LjBKGgoO
cHl0aG9uX3ZlcnNpb24SCAoGMy4xMi43Si4KCGNyZXdfa2V5EiIKIDhjMjc1MmY0OWU1YjlkMmI2
OGNiMzVjYWM4ZmNjODZkSjEKB2NyZXdfaWQSJgokZmRkYzA4ZTMtNDUyNi00N2Q2LThlNWMtNjY0
YzIyMjc4ZDgyShwKDGNyZXdfcHJvY2VzcxIMCgpzZXF1ZW50aWFsShEKC2NyZXdfbWVtb3J5EgIQ
AEoaChRjcmV3X251bWJlcl9vZl90YXNrcxICGAFKGwoVY3Jld19udW1iZXJfb2ZfYWdlbnRzEgIY
AUrRAgoLY3Jld19hZ2VudHMSwQIKvgJbeyJrZXkiOiAiOGJkMjEzOWI1OTc1MTgxNTA2ZTQxZmQ5
YzQ1NjNkNzUiLCAiaWQiOiAiY2UxNjA2YjktMjdiOS00ZDc4LWEyODctNDZiMDNlZDg3ZTA1Iiwg
InJvbGUiOiAiUmVzZWFyY2hlciIsICJ2ZXJib3NlPyI6IGZhbHNlLCAibWF4X2l0ZXIiOiAyMCwg
Im1heF9ycG0iOiBudWxsLCAiZnVuY3Rpb25fY2FsbGluZ19sbG0iOiAiIiwgImxsbSI6ICJncHQt
NG8tbWluaSIsICJkZWxlZ2F0aW9uX2VuYWJsZWQ/IjogZmFsc2UsICJhbGxvd19jb2RlX2V4ZWN1
dGlvbj8iOiBmYWxzZSwgIm1heF9yZXRyeV9saW1pdCI6IDIsICJ0b29sc19uYW1lcyI6IFtdfV1K
/wEKCmNyZXdfdGFza3MS8AEK7QFbeyJrZXkiOiAiMGQ2ODVhMjE5OTRkOTQ5MDk3YmM1YTU2ZDcz
N2U2ZDEiLCAiaWQiOiAiNDdkMzRjZjktMGYxZS00Y2JkLTgzMzItNzRjZjY0YWRlOThlIiwgImFz
eW5jX2V4ZWN1dGlvbj8iOiBmYWxzZSwgImh1bWFuX2lucHV0PyI6IGZhbHNlLCAiYWdlbnRfcm9s
ZSI6ICJSZXNlYXJjaGVyIiwgImFnZW50X2tleSI6ICI4YmQyMTM5YjU5NzUxODE1MDZlNDFmZDlj
NDU2M2Q3NSIsICJ0b29sc19uYW1lcyI6IFtdfV16AhgBhQEAAQAAEo4CChAf4TXS782b0PBJ4NSB
JXwsEgjXnd13GkMzlyoMVGFzayBDcmVhdGVkMAE5mb/cHjiMFRhBGRTiHjiMFRhKLgoIY3Jld19r
ZXkSIgogOGMyNzUyZjQ5ZTViOWQyYjY4Y2IzNWNhYzhmY2M4NmRKMQoHY3Jld19pZBImCiRmZGRj
MDhlMy00NTI2LTQ3ZDYtOGU1Yy02NjRjMjIyNzhkODJKLgoIdGFza19rZXkSIgogMGQ2ODVhMjE5
OTRkOTQ5MDk3YmM1YTU2ZDczN2U2ZDFKMQoHdGFza19pZBImCiQ0N2QzNGNmOS0wZjFlLTRjYmQt
ODMzMi03NGNmNjRhZGU5OGV6AhgBhQEAAQAAEqMHChAyBGKhzDhROB5pmAoXrikyEgj6SCwzj1dU
LyoMQ3JldyBDcmVhdGVkMAE5vkjTHziMFRhBRDbhHziMFRhKGgoOY3Jld2FpX3ZlcnNpb24SCAoG
MC44Ni4wShoKDnB5dGhvbl92ZXJzaW9uEggKBjMuMTIuN0ouCghjcmV3X2tleRIiCiBiNjczNjg2
ZmM4MjJjMjAzYzdlODc5YzY3NTQyNDY5OUoxCgdjcmV3X2lkEiYKJGYyYWVlYTYzLTU2OWUtNDUz
NS1iZTY0LTRiZjYzZmU5NjhjN0ocCgxjcmV3X3Byb2Nlc3MSDAoKc2VxdWVudGlhbEoRCgtjcmV3
X21lbW9yeRICEABKGgoUY3Jld19udW1iZXJfb2ZfdGFza3MSAhgBShsKFWNyZXdfbnVtYmVyX29m
X2FnZW50cxICGAFK0QIKC2NyZXdfYWdlbnRzEsECCr4CW3sia2V5IjogImI1OWNmNzdiNmU3NjU4
NDg3MGViMWMzODgyM2Q3ZTI4IiwgImlkIjogImJiZjNkM2E4LWEwMjUtNGI0ZC1hY2Q0LTFmNzcz
NTI3MWJmMCIsICJyb2xlIjogIlJlc2VhcmNoZXIiLCAidmVyYm9zZT8iOiBmYWxzZSwgIm1heF9p
dGVyIjogMjAsICJtYXhfcnBtIjogbnVsbCwgImZ1bmN0aW9uX2NhbGxpbmdfbGxtIjogIiIsICJs
bG0iOiAiZ3B0LTRvLW1pbmkiLCAiZGVsZWdhdGlvbl9lbmFibGVkPyI6IGZhbHNlLCAiYWxsb3df
Y29kZV9leGVjdXRpb24/IjogZmFsc2UsICJtYXhfcmV0cnlfbGltaXQiOiAyLCAidG9vbHNfbmFt
ZXMiOiBbXX1dSv8BCgpjcmV3X3Rhc2tzEvABCu0BW3sia2V5IjogImE1ZTVjNThjZWExYjlkMDAz
MzJlNjg0NDFkMzI3YmRmIiwgImlkIjogIjBiOTRiMTY0LTM5NTktNGFmYS05Njg4LWJjNmEwZWMy
MWYzOCIsICJhc3luY19leGVjdXRpb24/IjogZmFsc2UsICJodW1hbl9pbnB1dD8iOiBmYWxzZSwg
ImFnZW50X3JvbGUiOiAiUmVzZWFyY2hlciIsICJhZ2VudF9rZXkiOiAiYjU5Y2Y3N2I2ZTc2NTg0
ODcwZWIxYzM4ODIzZDdlMjgiLCAidG9vbHNfbmFtZXMiOiBbXX1degIYAYUBAAEAABKOAgoQyYfi
Ftim717svttBZY3p5hIIUxR5bBHzWWkqDFRhc2sgQ3JlYXRlZDABOV4OBiA4jBUYQbLjBiA4jBUY
Si4KCGNyZXdfa2V5EiIKIGI2NzM2ODZmYzgyMmMyMDNjN2U4NzljNjc1NDI0Njk5SjEKB2NyZXdf
aWQSJgokZjJhZWVhNjMtNTY5ZS00NTM1LWJlNjQtNGJmNjNmZTk2OGM3Si4KCHRhc2tfa2V5EiIK
IGE1ZTVjNThjZWExYjlkMDAzMzJlNjg0NDFkMzI3YmRmSjEKB3Rhc2tfaWQSJgokMGI5NGIxNjQt
Mzk1OS00YWZhLTk2ODgtYmM2YTBlYzIxZjM4egIYAYUBAAEAAA==
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '3685'
Content-Type:
- application/x-protobuf
User-Agent:
- OTel-OTLP-Exporter-Python/1.27.0
method: POST
uri: https://telemetry.crewai.com:4319/v1/traces
response:
body:
string: "\n\0"
headers:
Content-Length:
- '2'
Content-Type:
- application/x-protobuf
Date:
- Sun, 29 Dec 2024 04:43:27 GMT
status:
code: 200
message: OK
- request:
body: '{"messages": [{"role": "system", "content": "You are Researcher. You have
extensive AI research experience.\nYour personal goal is: Analyze AI topics\nTo
give my best complete final answer to the task use the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"}, {"role": "user", "content": "\nCurrent
Task: Explain the advantages of AI.\n\nThis is the expect criteria for your
final answer: A summary of the main advantages, bullet points recommended.\nyou
MUST return the actual complete content as the final answer, not a summary.\n\nBegin!
This is VERY important to you, use the tools available and give your best Final
Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop":
["\nObservation:"], "stream": false}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '922'
content-type:
- application/json
cookie:
- _cfuvid=eff7OIkJ0zWRunpA6z67LHqscmSe6XjNxXiPw1R3xCc-1733770413538-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.52.1
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.52.1
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-AjfR6FDuTw7NGzy8w7sxjvOkUQlru\",\n \"object\":
\"chat.completion\",\n \"created\": 1735447404,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: \\n**Advantages of AI** \\n\\n1. **Increased Efficiency and Productivity**
\ \\n - AI systems can process large amounts of data quickly and accurately,
leading to faster decision-making and increased productivity in various sectors.\\n\\n2.
**Cost Savings** \\n - Automation of repetitive and time-consuming tasks
reduces labor costs and increases operational efficiency, allowing businesses
to allocate resources more effectively.\\n\\n3. **Enhanced Data Analysis** \\n
\ - AI excels at analyzing big data, identifying patterns, and providing insights
that support better strategic planning and business decision-making.\\n\\n4.
**24/7 Availability** \\n - AI solutions, such as chatbots and virtual assistants,
operate continuously without breaks, offering constant support and customer
service, enhancing user experience.\\n\\n5. **Personalization** \\n - AI
enables the customization of content, products, and services based on user preferences
and behaviors, leading to improved customer satisfaction and loyalty.\\n\\n6.
**Improved Accuracy** \\n - AI technologies, such as machine learning algorithms,
reduce the likelihood of human error in various processes, leading to greater
accuracy and reliability.\\n\\n7. **Enhanced Innovation** \\n - AI fosters
innovative solutions by providing new tools and approaches to problem-solving,
enabling companies to develop cutting-edge products and services.\\n\\n8. **Scalability**
\ \\n - AI can be scaled to handle varying amounts of workloads without significant
changes to infrastructure, making it easier for organizations to expand operations.\\n\\n9.
**Predictive Capabilities** \\n - Advanced analytics powered by AI can anticipate
trends and outcomes, allowing businesses to proactively adjust strategies and
improve forecasting.\\n\\n10. **Health Benefits** \\n - In healthcare, AI
assists in diagnostics, personalized treatment plans, and predictive analytics,
leading to better patient care and improved health outcomes.\\n\\n11. **Safety
and Risk Mitigation** \\n - AI can enhance safety in various industries
by taking over dangerous tasks, monitoring for hazards, and predicting maintenance
needs for critical machinery, thereby preventing accidents.\\n\\n12. **Reduced
Environmental Impact** \\n - AI can optimize resource usage in areas such
as energy consumption and supply chain logistics, contributing to sustainability
efforts and reducing overall environmental footprints.\",\n \"refusal\":
null\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 168,\n \"completion_tokens\":
440,\n \"total_tokens\": 608,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n
\ \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"system_fingerprint\":
\"fp_0aa8d3e20b\"\n}\n"
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 8f9721053d1eb9f1-SEA
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Sun, 29 Dec 2024 04:43:32 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=5enubNIoQSGMYEgy8Q2FpzzhphA0y.0lXukRZrWFvMk-1735447412-1.0.1.1-FIK1sMkUl3YnW1gTC6ftDtb2mKsbosb4mwabdFAlWCfJ6pXeavYq.bPsfKNvzAb5WYq60yVGH5lHsJT05bhSgw;
path=/; expires=Sun, 29-Dec-24 05:13:32 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=63wmKMTuFamkLN8FBI4fP8JZWbjWiRxWm7wb3kz.z_A-1735447412038-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '7577'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999793'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_55b8d714656e8f10f4e23cbe9034d66b
http_version: HTTP/1.1
status_code: 200
version: 1

View File

@@ -391,71 +391,6 @@ def test_manager_agent_delegating_to_all_agents():
)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_manager_agent_delegates_with_varied_role_cases():
"""
Test that the manager agent can delegate to agents regardless of case or whitespace variations in role names.
This test verifies the fix for issue #1503 where role matching was too strict.
"""
# Create agents with varied case and whitespace in roles
researcher_spaced = Agent(
role=" Researcher ", # Extra spaces
goal="Research with spaces in role",
backstory="A researcher with spaces in role name",
allow_delegation=False,
)
writer_caps = Agent(
role="SENIOR WRITER", # All caps
goal="Write with caps in role",
backstory="A writer with caps in role name",
allow_delegation=False,
)
task = Task(
description="Research and write about AI. The researcher should do the research, and the writer should write it up.",
expected_output="A well-researched article about AI.",
agent=researcher_spaced, # Assign to researcher with spaces
)
crew = Crew(
agents=[researcher_spaced, writer_caps],
process=Process.hierarchical,
manager_llm="gpt-4o",
tasks=[task],
)
mock_task_output = TaskOutput(
description="Mock description",
raw="mocked output",
agent="mocked agent"
)
task.output = mock_task_output
with patch.object(Task, 'execute_sync', return_value=mock_task_output) as mock_execute_sync:
crew.kickoff()
# Verify execute_sync was called once
mock_execute_sync.assert_called_once()
# Get the tools argument from the call
_, kwargs = mock_execute_sync.call_args
tools = kwargs['tools']
# Verify the delegation tools were passed correctly and can handle case/whitespace variations
assert len(tools) == 2
# Check delegation tool descriptions (should work despite case/whitespace differences)
delegation_tool = tools[0]
question_tool = tools[1]
assert "Delegate a specific task to one of the following coworkers:" in delegation_tool.description
assert " Researcher " in delegation_tool.description or "SENIOR WRITER" in delegation_tool.description
assert "Ask a specific question to one of the following coworkers:" in question_tool.description
assert " Researcher " in question_tool.description or "SENIOR WRITER" in question_tool.description
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_with_delegating_agents():
tasks = [
@@ -2006,90 +1941,6 @@ def test_crew_log_file_output(tmp_path):
assert test_file.exists()
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_output_file_end_to_end(tmp_path):
"""Test output file functionality in a full crew context."""
# Create an agent
agent = Agent(
role="Researcher",
goal="Analyze AI topics",
backstory="You have extensive AI research experience.",
allow_delegation=False,
)
# Create a task with dynamic output file path
dynamic_path = tmp_path / "output_{topic}.txt"
task = Task(
description="Explain the advantages of {topic}.",
expected_output="A summary of the main advantages, bullet points recommended.",
agent=agent,
output_file=str(dynamic_path),
)
# Create and run the crew
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
crew.kickoff(inputs={"topic": "AI"})
# Verify file creation and cleanup
expected_file = tmp_path / "output_AI.txt"
assert expected_file.exists(), f"Output file {expected_file} was not created"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_output_file_validation_failures():
"""Test output file validation failures in a crew context."""
agent = Agent(
role="Researcher",
goal="Analyze data",
backstory="You analyze data files.",
allow_delegation=False,
)
# Test path traversal
with pytest.raises(ValueError, match="Path traversal"):
task = Task(
description="Analyze data",
expected_output="Analysis results",
agent=agent,
output_file="../output.txt"
)
Crew(agents=[agent], tasks=[task]).kickoff()
# Test shell special characters
with pytest.raises(ValueError, match="Shell special characters"):
task = Task(
description="Analyze data",
expected_output="Analysis results",
agent=agent,
output_file="output.txt | rm -rf /"
)
Crew(agents=[agent], tasks=[task]).kickoff()
# Test shell expansion
with pytest.raises(ValueError, match="Shell expansion"):
task = Task(
description="Analyze data",
expected_output="Analysis results",
agent=agent,
output_file="~/output.txt"
)
Crew(agents=[agent], tasks=[task]).kickoff()
# Test invalid template variable
with pytest.raises(ValueError, match="Invalid template variable"):
task = Task(
description="Analyze data",
expected_output="Analysis results",
agent=agent,
output_file="{invalid-name}/output.txt"
)
Crew(agents=[agent], tasks=[task]).kickoff()
@pytest.mark.vcr(filter_headers=["authorization"])
def test_manager_agent():
from unittest.mock import patch
@@ -3274,4 +3125,4 @@ def test_multimodal_agent_live_image_analysis():
# Verify we got a meaningful response
assert isinstance(result.raw, str)
assert len(result.raw) > 100 # Expecting a detailed analysis
assert "error" not in result.raw.lower() # No error messages in response
assert "error" not in result.raw.lower() # No error messages in response

View File

@@ -584,28 +584,3 @@ def test_docling_source_with_local_file():
docling_source = CrewDoclingSource(file_paths=[pdf_path])
assert docling_source.file_paths == [pdf_path]
assert docling_source.content is not None
def test_file_path_validation():
"""Test file path validation for knowledge sources."""
current_dir = Path(__file__).parent
pdf_path = current_dir / "crewai_quickstart.pdf"
# Test valid single file_path
source = PDFKnowledgeSource(file_path=pdf_path)
assert source.safe_file_paths == [pdf_path]
# Test valid file_paths list
source = PDFKnowledgeSource(file_paths=[pdf_path])
assert source.safe_file_paths == [pdf_path]
# Test both file_path and file_paths provided (should use file_paths)
source = PDFKnowledgeSource(file_path=pdf_path, file_paths=[pdf_path])
assert source.safe_file_paths == [pdf_path]
# Test neither file_path nor file_paths provided
with pytest.raises(
ValueError,
match="file_path/file_paths must be a Path, str, or a list of these types"
):
PDFKnowledgeSource()

View File

@@ -0,0 +1,86 @@
import pytest
import numpy as np
from unittest.mock import patch, MagicMock
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
from crewai.utilities.constants import MEMORY_CHUNK_SIZE
@pytest.fixture
def short_term_memory():
"""Fixture to create a ShortTermMemory instance"""
agent = Agent(
role="Researcher",
goal="Search relevant data and provide results",
backstory="You are a researcher at a leading tech think tank.",
tools=[],
verbose=True,
)
task = Task(
description="Perform a search on specific topics.",
expected_output="A list of relevant URLs based on the search query.",
agent=agent,
)
return ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
def test_memory_with_large_input(short_term_memory):
"""Test that memory can handle large inputs without token limit errors"""
large_input = "test value " * (MEMORY_CHUNK_SIZE + 1000)
with patch.object(
short_term_memory.storage, '_chunk_text',
return_value=["chunk1", "chunk2"]
) as mock_chunk_text:
with patch.object(
short_term_memory.storage.collection, 'add'
) as mock_add:
short_term_memory.save(value=large_input, agent="test_agent")
assert mock_chunk_text.called
with patch.object(
short_term_memory.storage, 'search',
return_value=[{"context": large_input, "metadata": {"agent": "test_agent"}, "score": 0.95}]
):
result = short_term_memory.search(large_input[:100], score_threshold=0.01)
assert result[0]["context"] == large_input
assert result[0]["metadata"]["agent"] == "test_agent"
def test_memory_with_empty_input(short_term_memory):
"""Test that memory correctly handles empty input strings"""
empty_input = ""
with patch.object(
short_term_memory.storage, '_chunk_text',
return_value=[]
) as mock_chunk_text:
with patch.object(
short_term_memory.storage.collection, 'add'
) as mock_add:
short_term_memory.save(value=empty_input, agent="test_agent")
mock_chunk_text.assert_called_with(empty_input)
mock_add.assert_not_called()
def test_memory_with_exact_chunk_size_input(short_term_memory):
"""Test that memory correctly handles inputs that match chunk size exactly"""
exact_size_input = "x" * MEMORY_CHUNK_SIZE
with patch.object(
short_term_memory.storage, '_chunk_text',
return_value=[exact_size_input]
) as mock_chunk_text:
with patch.object(
short_term_memory.storage.collection, 'add'
) as mock_add:
short_term_memory.save(value=exact_size_input, agent="test_agent")
mock_chunk_text.assert_called_with(exact_size_input)
assert mock_add.call_count == 1

View File

@@ -719,24 +719,21 @@ def test_interpolate_inputs():
task = Task(
description="Give me a list of 5 interesting ideas about {topic} to explore for an article, what makes them unique and interesting.",
expected_output="Bullet point list of 5 interesting ideas about {topic}.",
output_file="/tmp/{topic}/output_{date}.txt"
)
task.interpolate_inputs(inputs={"topic": "AI", "date": "2024"})
task.interpolate_inputs(inputs={"topic": "AI"})
assert (
task.description
== "Give me a list of 5 interesting ideas about AI to explore for an article, what makes them unique and interesting."
)
assert task.expected_output == "Bullet point list of 5 interesting ideas about AI."
assert task.output_file == "/tmp/AI/output_2024.txt"
task.interpolate_inputs(inputs={"topic": "ML", "date": "2025"})
task.interpolate_inputs(inputs={"topic": "ML"})
assert (
task.description
== "Give me a list of 5 interesting ideas about ML to explore for an article, what makes them unique and interesting."
)
assert task.expected_output == "Bullet point list of 5 interesting ideas about ML."
assert task.output_file == "/tmp/ML/output_2025.txt"
def test_interpolate_only():
@@ -875,61 +872,3 @@ def test_key():
assert (
task.key == hash
), "The key should be the hash of the non-interpolated description."
def test_output_file_validation():
"""Test output file path validation."""
# Valid paths
assert Task(
description="Test task",
expected_output="Test output",
output_file="output.txt"
).output_file == "output.txt"
assert Task(
description="Test task",
expected_output="Test output",
output_file="/tmp/output.txt"
).output_file == "tmp/output.txt"
assert Task(
description="Test task",
expected_output="Test output",
output_file="{dir}/output_{date}.txt"
).output_file == "{dir}/output_{date}.txt"
# Invalid paths
with pytest.raises(ValueError, match="Path traversal"):
Task(
description="Test task",
expected_output="Test output",
output_file="../output.txt"
)
with pytest.raises(ValueError, match="Path traversal"):
Task(
description="Test task",
expected_output="Test output",
output_file="folder/../output.txt"
)
with pytest.raises(ValueError, match="Shell special characters"):
Task(
description="Test task",
expected_output="Test output",
output_file="output.txt | rm -rf /"
)
with pytest.raises(ValueError, match="Shell expansion"):
Task(
description="Test task",
expected_output="Test output",
output_file="~/output.txt"
)
with pytest.raises(ValueError, match="Shell expansion"):
Task(
description="Test task",
expected_output="Test output",
output_file="$HOME/output.txt"
)
with pytest.raises(ValueError, match="Invalid template variable"):
Task(
description="Test task",
expected_output="Test output",
output_file="{invalid-name}/output.txt"
)

View File

@@ -1,55 +0,0 @@
from unittest.mock import MagicMock
import pytest
from crewai import Agent, Task
from crewai.tools.agent_tools.base_agent_tools import BaseAgentTool
class TestAgentTool(BaseAgentTool):
"""Concrete implementation of BaseAgentTool for testing."""
def _run(self, *args, **kwargs):
"""Implement required _run method."""
return "Test response"
@pytest.mark.parametrize("role_name,should_match", [
('Futel Official Infopoint', True), # exact match
(' "Futel Official Infopoint" ', True), # extra quotes and spaces
('Futel Official Infopoint\n', True), # trailing newline
('"Futel Official Infopoint"', True), # embedded quotes
(' FUTEL\nOFFICIAL INFOPOINT ', True), # multiple whitespace and newline
('futel official infopoint', True), # lowercase
('FUTEL OFFICIAL INFOPOINT', True), # uppercase
('Non Existent Agent', False), # non-existent agent
(None, False), # None agent name
])
def test_agent_tool_role_matching(role_name, should_match):
"""Test that agent tools can match roles regardless of case, whitespace, and special characters."""
# Create test agent
test_agent = Agent(
role='Futel Official Infopoint',
goal='Answer questions about Futel',
backstory='Futel Football Club info',
allow_delegation=False
)
# Create test agent tool
agent_tool = TestAgentTool(
name="test_tool",
description="Test tool",
agents=[test_agent]
)
# Test role matching
result = agent_tool._execute(
agent_name=role_name,
task='Test task',
context=None
)
if should_match:
assert "coworker mentioned not found" not in result.lower(), \
f"Should find agent with role name: {role_name}"
else:
assert "coworker mentioned not found" in result.lower(), \
f"Should not find agent with role name: {role_name}"

68
uv.lock generated
View File

@@ -1,18 +1,10 @@
version = 1
requires-python = ">=3.10, <3.13"
resolution-markers = [
"python_full_version < '3.11' and sys_platform == 'darwin'",
"python_full_version < '3.11' and platform_machine == 'aarch64' and sys_platform == 'linux'",
"(python_full_version < '3.11' and platform_machine != 'aarch64' and sys_platform == 'linux') or (python_full_version < '3.11' and sys_platform != 'darwin' and sys_platform != 'linux')",
"python_full_version == '3.11.*' and sys_platform == 'darwin'",
"python_full_version == '3.11.*' and platform_machine == 'aarch64' and sys_platform == 'linux'",
"(python_full_version == '3.11.*' and platform_machine != 'aarch64' and sys_platform == 'linux') or (python_full_version == '3.11.*' and sys_platform != 'darwin' and sys_platform != 'linux')",
"python_full_version >= '3.12' and python_full_version < '3.12.4' and sys_platform == 'darwin'",
"python_full_version >= '3.12' and python_full_version < '3.12.4' and platform_machine == 'aarch64' and sys_platform == 'linux'",
"(python_full_version >= '3.12' and python_full_version < '3.12.4' and platform_machine != 'aarch64' and sys_platform == 'linux') or (python_full_version >= '3.12' and python_full_version < '3.12.4' and sys_platform != 'darwin' and sys_platform != 'linux')",
"python_full_version >= '3.12.4' and sys_platform == 'darwin'",
"python_full_version >= '3.12.4' and platform_machine == 'aarch64' and sys_platform == 'linux'",
"(python_full_version >= '3.12.4' and platform_machine != 'aarch64' and sys_platform == 'linux') or (python_full_version >= '3.12.4' and sys_platform != 'darwin' and sys_platform != 'linux')",
"python_full_version < '3.11'",
"python_full_version == '3.11.*'",
"python_full_version >= '3.12' and python_full_version < '3.12.4'",
"python_full_version >= '3.12.4'",
]
[[package]]
@@ -308,7 +300,7 @@ name = "build"
version = "1.2.2.post1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "(os_name == 'nt' and platform_machine != 'aarch64' and sys_platform == 'linux') or (os_name == 'nt' and sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "colorama", marker = "os_name == 'nt'" },
{ name = "importlib-metadata", marker = "python_full_version < '3.10.2'" },
{ name = "packaging" },
{ name = "pyproject-hooks" },
@@ -543,7 +535,7 @@ name = "click"
version = "8.1.7"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "colorama", marker = "platform_system == 'Windows'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/96/d3/f04c7bfcf5c1862a2a5b845c6b2b360488cf47af55dfa79c98f6a6bf98b5/click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de", size = 336121 }
wheels = [
@@ -650,6 +642,7 @@ tools = [
[package.dev-dependencies]
dev = [
{ name = "cairosvg" },
{ name = "crewai-tools" },
{ name = "mkdocs" },
{ name = "mkdocs-material" },
{ name = "mkdocs-material-extensions" },
@@ -703,6 +696,7 @@ requires-dist = [
[package.metadata.requires-dev]
dev = [
{ name = "cairosvg", specifier = ">=2.7.1" },
{ name = "crewai-tools", specifier = ">=0.17.0" },
{ name = "mkdocs", specifier = ">=1.4.3" },
{ name = "mkdocs-material", specifier = ">=9.5.7" },
{ name = "mkdocs-material-extensions", specifier = ">=1.3.1" },
@@ -2468,7 +2462,7 @@ version = "1.6.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "click" },
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "colorama", marker = "platform_system == 'Windows'" },
{ name = "ghp-import" },
{ name = "jinja2" },
{ name = "markdown" },
@@ -2649,7 +2643,7 @@ version = "2.10.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pygments" },
{ name = "pywin32", marker = "sys_platform == 'win32'" },
{ name = "pywin32", marker = "platform_system == 'Windows'" },
{ name = "tqdm" },
]
sdist = { url = "https://files.pythonhosted.org/packages/3a/93/80ac75c20ce54c785648b4ed363c88f148bf22637e10c9863db4fbe73e74/mpire-2.10.2.tar.gz", hash = "sha256:f66a321e93fadff34585a4bfa05e95bd946cf714b442f51c529038eb45773d97", size = 271270 }
@@ -2896,7 +2890,7 @@ name = "nvidia-cudnn-cu12"
version = "9.1.0.70"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and platform_system != 'Darwin') or (platform_system != 'Darwin' and platform_system != 'Linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/9f/fd/713452cd72343f682b1c7b9321e23829f00b842ceaedcda96e742ea0b0b3/nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl", hash = "sha256:165764f44ef8c61fcdfdfdbe769d687e06374059fbb388b6c89ecb0e28793a6f", size = 664752741 },
@@ -2923,9 +2917,9 @@ name = "nvidia-cusolver-cu12"
version = "11.4.5.107"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-cusparse-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-cublas-cu12", marker = "(platform_machine != 'aarch64' and platform_system != 'Darwin') or (platform_system != 'Darwin' and platform_system != 'Linux')" },
{ name = "nvidia-cusparse-cu12", marker = "(platform_machine != 'aarch64' and platform_system != 'Darwin') or (platform_system != 'Darwin' and platform_system != 'Linux')" },
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and platform_system != 'Darwin') or (platform_system != 'Darwin' and platform_system != 'Linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/bc/1d/8de1e5c67099015c834315e333911273a8c6aaba78923dd1d1e25fc5f217/nvidia_cusolver_cu12-11.4.5.107-py3-none-manylinux1_x86_64.whl", hash = "sha256:8a7ec542f0412294b15072fa7dab71d31334014a69f953004ea7a118206fe0dd", size = 124161928 },
@@ -2936,7 +2930,7 @@ name = "nvidia-cusparse-cu12"
version = "12.1.0.106"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and platform_system != 'Darwin') or (platform_system != 'Darwin' and platform_system != 'Linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/65/5b/cfaeebf25cd9fdec14338ccb16f6b2c4c7fa9163aefcf057d86b9cc248bb/nvidia_cusparse_cu12-12.1.0.106-py3-none-manylinux1_x86_64.whl", hash = "sha256:f3b50f42cf363f86ab21f720998517a659a48131e8d538dc02f8768237bd884c", size = 195958278 },
@@ -3486,7 +3480,7 @@ name = "portalocker"
version = "2.10.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pywin32", marker = "sys_platform == 'win32'" },
{ name = "pywin32", marker = "platform_system == 'Windows'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ed/d3/c6c64067759e87af98cc668c1cc75171347d0f1577fab7ca3749134e3cd4/portalocker-2.10.1.tar.gz", hash = "sha256:ef1bf844e878ab08aee7e40184156e1151f228f103aa5c6bd0724cc330960f8f", size = 40891 }
wheels = [
@@ -5028,19 +5022,19 @@ dependencies = [
{ name = "fsspec" },
{ name = "jinja2" },
{ name = "networkx" },
{ name = "nvidia-cublas-cu12", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "nvidia-cuda-cupti-cu12", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "nvidia-cuda-nvrtc-cu12", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "nvidia-cuda-runtime-cu12", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "nvidia-cudnn-cu12", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "nvidia-cufft-cu12", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "nvidia-curand-cu12", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "nvidia-cusolver-cu12", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "nvidia-cusparse-cu12", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "nvidia-nccl-cu12", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "nvidia-nvtx-cu12", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "nvidia-cublas-cu12", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "nvidia-cuda-cupti-cu12", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "nvidia-cuda-nvrtc-cu12", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "nvidia-cuda-runtime-cu12", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "nvidia-cudnn-cu12", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "nvidia-cufft-cu12", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "nvidia-curand-cu12", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "nvidia-cusolver-cu12", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "nvidia-cusparse-cu12", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "nvidia-nccl-cu12", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "nvidia-nvtx-cu12", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "sympy" },
{ name = "triton", marker = "platform_machine == 'x86_64' and sys_platform == 'linux'" },
{ name = "triton", marker = "platform_machine == 'x86_64' and platform_system == 'Linux'" },
{ name = "typing-extensions" },
]
wheels = [
@@ -5087,7 +5081,7 @@ name = "tqdm"
version = "4.66.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "colorama", marker = "platform_system == 'Windows'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/58/83/6ba9844a41128c62e810fddddd72473201f3eacde02046066142a2d96cc5/tqdm-4.66.5.tar.gz", hash = "sha256:e1020aef2e5096702d8a025ac7d16b1577279c9d63f8375b63083e9a5f0fcbad", size = 169504 }
wheels = [
@@ -5130,7 +5124,7 @@ version = "0.27.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "attrs" },
{ name = "cffi", marker = "(implementation_name != 'pypy' and os_name == 'nt' and platform_machine != 'aarch64' and sys_platform == 'linux') or (implementation_name != 'pypy' and os_name == 'nt' and sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "cffi", marker = "implementation_name != 'pypy' and os_name == 'nt'" },
{ name = "exceptiongroup", marker = "python_full_version < '3.11'" },
{ name = "idna" },
{ name = "outcome" },
@@ -5161,7 +5155,7 @@ name = "triton"
version = "3.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "filelock", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" },
{ name = "filelock", marker = "(platform_machine != 'aarch64' and platform_system != 'Darwin') or (platform_system != 'Darwin' and platform_system != 'Linux')" },
]
wheels = [
{ url = "https://files.pythonhosted.org/packages/45/27/14cc3101409b9b4b9241d2ba7deaa93535a217a211c86c4cc7151fb12181/triton-3.0.0-1-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:e1efef76935b2febc365bfadf74bcb65a6f959a9872e5bddf44cc9e0adce1e1a", size = 209376304 },