mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-05 14:18:30 +00:00
Compare commits
3 Commits
fix/lint-t
...
devin/1735
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c54a65983a | ||
|
|
91fa7a38e5 | ||
|
|
e9cec842b2 |
17
README.md
17
README.md
@@ -85,6 +85,7 @@ First, install CrewAI:
|
||||
```shell
|
||||
pip install crewai
|
||||
```
|
||||
|
||||
If you want to install the 'crewai' package along with its optional features that include additional tools for agents, you can do so by using the following command:
|
||||
|
||||
```shell
|
||||
@@ -92,22 +93,6 @@ pip install 'crewai[tools]'
|
||||
```
|
||||
The command above installs the basic package and also adds extra components which require more dependencies to function.
|
||||
|
||||
### Troubleshooting Dependencies
|
||||
|
||||
If you encounter issues during installation or usage, here are some common solutions:
|
||||
|
||||
#### Common Issues
|
||||
|
||||
1. **ModuleNotFoundError: No module named 'tiktoken'**
|
||||
- Install tiktoken explicitly: `pip install 'crewai[embeddings]'`
|
||||
- If using embedchain or other tools: `pip install 'crewai[tools]'`
|
||||
|
||||
2. **Failed building wheel for tiktoken**
|
||||
- Ensure Rust compiler is installed (see installation steps above)
|
||||
- For Windows: Verify Visual C++ Build Tools are installed
|
||||
- Try upgrading pip: `pip install --upgrade pip`
|
||||
- If issues persist, use a pre-built wheel: `pip install tiktoken --prefer-binary`
|
||||
|
||||
### 2. Setting Up Your Crew with the YAML Configuration
|
||||
|
||||
To create a new CrewAI project, run the following CLI (Command Line Interface) command:
|
||||
|
||||
@@ -10,11 +10,26 @@ icon: bars-staggered
|
||||
These processes ensure tasks are distributed and executed efficiently, in alignment with a predefined strategy.
|
||||
</Tip>
|
||||
|
||||
## Process Implementations
|
||||
## Process Types
|
||||
|
||||
- **Sequential**: Executes tasks sequentially, ensuring tasks are completed in an orderly progression.
|
||||
- **Hierarchical**: Organizes tasks in a managerial hierarchy, where tasks are delegated and executed based on a structured chain of command. A manager language model (`manager_llm`) or a custom manager agent (`manager_agent`) must be specified in the crew to enable the hierarchical process, facilitating the creation and management of tasks by the manager.
|
||||
- **Consensual Process (Planned)**: Aiming for collaborative decision-making among agents on task execution, this process type introduces a democratic approach to task management within CrewAI. It is planned for future development and is not currently implemented in the codebase.
|
||||
CrewAI supports two process types that determine how tasks are assigned to agents:
|
||||
|
||||
- **Static/Assigned Process** (formerly "Sequential"): In this process, each task must be pre-assigned to a specific agent. While the name "Sequential" was previously used, it's important to note that tasks are always executed in the order they are defined, regardless of the process type chosen. The key characteristic of this process is that it requires explicit agent assignments for each task.
|
||||
|
||||
- **Dynamic/Unassigned Process** (formerly "Hierarchical"): In this process, you do not have to assign agents to tasks explicitly. Instead, the crew will assess available agents and automatically select the most suitable one for each task based on their roles and expertise. This requires specifying either a manager language model (`manager_llm`) or a custom manager agent (`manager_agent`) to handle the agent selection and task delegation.
|
||||
|
||||
- **Consensual Process** (Planned): Aiming for collaborative decision-making among agents on task execution, this process type introduces a democratic approach to task management within CrewAI. It is planned for future development and is not currently implemented in the codebase.
|
||||
|
||||
### Process Type Comparison
|
||||
|
||||
| Aspect | Static/Assigned Process | Dynamic/Unassigned Process |
|
||||
|--------|------------------------|---------------------------|
|
||||
| Agent Assignment | Pre-assigned by developer | Automatic based on agent capabilities |
|
||||
| Task Order | Sequential (defined order) | Sequential (defined order) |
|
||||
| Manager Required | No | Yes (manager_llm or manager_agent) |
|
||||
| Use Case | Fixed workflows with known agent assignments | Dynamic workflows needing flexible assignment |
|
||||
| Configuration | Simpler setup, explicit control | Requires manager configuration |
|
||||
| Task-Agent Mapping | One-to-one, defined at creation | Determined during execution |
|
||||
|
||||
## The Role of Processes in Teamwork
|
||||
Processes enable individual agents to operate as a cohesive unit, streamlining their efforts to achieve common objectives with efficiency and coherence.
|
||||
@@ -23,45 +38,149 @@ Processes enable individual agents to operate as a cohesive unit, streamlining t
|
||||
To assign a process to a crew, specify the process type upon crew creation to set the execution strategy. For a hierarchical process, ensure to define `manager_llm` or `manager_agent` for the manager agent.
|
||||
|
||||
```python
|
||||
from crewai import Crew
|
||||
from crewai import Crew, Agent, Task
|
||||
from crewai.process import Process
|
||||
from langchain_openai import ChatOpenAI
|
||||
|
||||
# Example: Creating a crew with a sequential process
|
||||
crew = Crew(
|
||||
agents=my_agents,
|
||||
tasks=my_tasks,
|
||||
process=Process.sequential
|
||||
# Define agents with specific roles and expertise
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Conduct thorough market analysis",
|
||||
backstory="Expert in data analysis and market research"
|
||||
)
|
||||
|
||||
# Example: Creating a crew with a hierarchical process
|
||||
# Ensure to provide a manager_llm or manager_agent
|
||||
crew = Crew(
|
||||
agents=my_agents,
|
||||
tasks=my_tasks,
|
||||
process=Process.hierarchical,
|
||||
manager_llm=ChatOpenAI(model="gpt-4")
|
||||
# or
|
||||
# manager_agent=my_manager_agent
|
||||
writer = Agent(
|
||||
role="Writer",
|
||||
goal="Create comprehensive reports",
|
||||
backstory="Technical writer with expertise in market analysis"
|
||||
)
|
||||
|
||||
# Example 1: Static/Assigned Process
|
||||
# Tasks must be explicitly assigned to agents
|
||||
research_task = Task(
|
||||
description="Research emerging market trends",
|
||||
agent=researcher # Explicit agent assignment required
|
||||
)
|
||||
|
||||
writing_task = Task(
|
||||
description="Create market analysis report",
|
||||
agent=writer, # Explicit agent assignment required
|
||||
context="Use research findings to create a detailed report"
|
||||
)
|
||||
|
||||
# Create crew with Static/Assigned process
|
||||
static_crew = Crew(
|
||||
agents=[researcher, writer],
|
||||
tasks=[research_task, writing_task],
|
||||
process=Process.sequential # Note: Using old enum value for backward compatibility
|
||||
)
|
||||
|
||||
# Example 2: Dynamic/Unassigned Process
|
||||
# Tasks without pre-assigned agents
|
||||
dynamic_research_task = Task(
|
||||
description="Research emerging market trends",
|
||||
required_skills=["market analysis", "data interpretation"] # Help manager select agent
|
||||
)
|
||||
|
||||
dynamic_writing_task = Task(
|
||||
description="Create market analysis report",
|
||||
context="Use research findings to create a detailed report",
|
||||
required_skills=["technical writing", "data visualization"]
|
||||
)
|
||||
|
||||
# Create crew with Dynamic/Unassigned process
|
||||
dynamic_crew = Crew(
|
||||
agents=[researcher, writer],
|
||||
tasks=[dynamic_research_task, dynamic_writing_task],
|
||||
process=Process.hierarchical, # Note: Using old enum value for backward compatibility
|
||||
manager_llm=ChatOpenAI(model="gpt-4") # Manager will assign tasks to suitable agents
|
||||
# Alternative: Use custom manager agent
|
||||
# manager_agent=custom_manager_agent
|
||||
)
|
||||
```
|
||||
**Note:** Ensure `my_agents` and `my_tasks` are defined prior to creating a `Crew` object, and for the hierarchical process, either `manager_llm` or `manager_agent` is also required.
|
||||
|
||||
## Sequential Process
|
||||
## Static/Assigned Process
|
||||
|
||||
This method mirrors dynamic team workflows, progressing through tasks in a thoughtful and systematic manner. Task execution follows the predefined order in the task list, with the output of one task serving as context for the next.
|
||||
This process type requires explicit agent assignments for each task. Tasks are executed in their defined order, with the output of one task serving as context for the next. This approach provides direct control over which agent handles each specific task.
|
||||
|
||||
To customize task context, utilize the `context` parameter in the `Task` class to specify outputs that should be used as context for subsequent tasks.
|
||||
|
||||
## Hierarchical Process
|
||||
## Dynamic/Unassigned Process
|
||||
|
||||
Emulates a corporate hierarchy, CrewAI allows specifying a custom manager agent or automatically creates one, requiring the specification of a manager language model (`manager_llm`). This agent oversees task execution, including planning, delegation, and validation. Tasks are not pre-assigned; the manager allocates tasks to agents based on their capabilities, reviews outputs, and assesses task completion.
|
||||
This process type enables automatic agent selection through a manager component. You must specify either a custom manager agent or a manager language model (`manager_llm`). The manager oversees task execution by:
|
||||
- Analyzing task requirements
|
||||
- Selecting the most suitable agent based on roles and expertise
|
||||
- Delegating tasks automatically
|
||||
- Reviewing outputs and assessing task completion
|
||||
|
||||
## Process Class: Detailed Overview
|
||||
## Choosing the Right Process
|
||||
|
||||
The `Process` class is implemented as an enumeration (`Enum`), ensuring type safety and restricting process values to the defined types (`sequential`, `hierarchical`). The consensual process is planned for future inclusion, emphasizing our commitment to continuous development and innovation.
|
||||
When deciding between Static/Assigned and Dynamic/Unassigned processes, consider these technical factors:
|
||||
|
||||
### Static/Assigned Process
|
||||
Consider this process when:
|
||||
- Your workflow has predefined task-agent mappings
|
||||
- You need deterministic agent assignments for auditing or compliance
|
||||
- You want to minimize runtime overhead (no manager required)
|
||||
- You have specific agents optimized for particular tasks
|
||||
- You need fine-grained control over task execution
|
||||
|
||||
### Dynamic/Unassigned Process
|
||||
Consider this process when:
|
||||
- Your agent pool has overlapping capabilities
|
||||
- Task requirements are determined at runtime
|
||||
- You need failover capabilities between agents
|
||||
- You have a manager (LLM or agent) to handle assignment logic
|
||||
- You want to scale agent pools without modifying task definitions
|
||||
|
||||
### Technical Considerations
|
||||
- **Performance**: Static/Assigned processes have lower overhead as they skip the agent selection phase
|
||||
- **Scalability**: Dynamic/Unassigned processes better handle changes in agent availability
|
||||
- **Maintenance**: Static assignments require updating task definitions when agent roles change
|
||||
- **Error Handling**: Dynamic processes can potentially reassign tasks on agent failures
|
||||
|
||||
## Technical Implementation Details
|
||||
|
||||
### Process Class
|
||||
The `Process` class is implemented as an enumeration (`Enum`), ensuring type safety and restricting process values to the defined types:
|
||||
```python
|
||||
class Process(Enum):
|
||||
sequential = "sequential" # Static/Assigned process
|
||||
hierarchical = "hierarchical" # Dynamic/Unassigned process
|
||||
```
|
||||
|
||||
### Default Configuration
|
||||
- The default process type is `Process.sequential` (Static/Assigned)
|
||||
- When using `Process.hierarchical`, a manager (either `manager_llm` or `manager_agent`) must be provided
|
||||
|
||||
### Version Compatibility
|
||||
- Since v1.0.0: Original process types (`sequential`, `hierarchical`)
|
||||
- Current version maintains the same enum values for backward compatibility
|
||||
- Future versions will continue supporting these values while using new terminology in documentation
|
||||
|
||||
### Error Handling
|
||||
Common error scenarios and their solutions:
|
||||
```python
|
||||
# Error: Missing manager in Dynamic/Unassigned process
|
||||
crew = Crew(
|
||||
agents=[agent1, agent2],
|
||||
tasks=[task1, task2],
|
||||
process=Process.hierarchical
|
||||
# Error: ValueError: Manager (manager_llm or manager_agent) is required for hierarchical process
|
||||
)
|
||||
|
||||
# Error: Missing agent assignment in Static/Assigned process
|
||||
task = Task(description="Task without agent") # Missing agent assignment
|
||||
crew = Crew(
|
||||
agents=[agent1, agent2],
|
||||
tasks=[task],
|
||||
process=Process.sequential
|
||||
# Error: ValueError: Agent assignment required for all tasks in sequential process
|
||||
)
|
||||
```
|
||||
|
||||
## Conclusion
|
||||
|
||||
The structured collaboration facilitated by processes within CrewAI is crucial for enabling systematic teamwork among agents.
|
||||
This documentation has been updated to reflect the latest features, enhancements, and the planned integration of the Consensual Process, ensuring users have access to the most current and comprehensive information.
|
||||
This documentation has been updated to reflect the latest features, enhancements, and the planned integration of the Consensual Process, ensuring users have access to the most current and comprehensive information.
|
||||
|
||||
@@ -12,20 +12,8 @@ Tasks provide all necessary details for execution, such as a description, the ag
|
||||
|
||||
Tasks within CrewAI can be collaborative, requiring multiple agents to work together. This is managed through the task properties and orchestrated by the Crew's process, enhancing teamwork and efficiency.
|
||||
|
||||
### Task Execution Flow
|
||||
|
||||
Tasks can be executed in two ways:
|
||||
- **Sequential**: Tasks are executed in the order they are defined
|
||||
- **Hierarchical**: Tasks are assigned to agents based on their roles and expertise
|
||||
|
||||
The execution flow is defined when creating the crew:
|
||||
```python Code
|
||||
crew = Crew(
|
||||
agents=[agent1, agent2],
|
||||
tasks=[task1, task2],
|
||||
process=Process.sequential # or Process.hierarchical
|
||||
)
|
||||
```
|
||||
### Task Execution
|
||||
Tasks are always executed in the order they are defined. For information about how tasks are assigned to agents and the different process types available (Static/Assigned vs Dynamic/Unassigned), please refer to the [Processes](/concepts/processes) section.
|
||||
|
||||
## Task Attributes
|
||||
|
||||
|
||||
@@ -8,38 +8,27 @@ authors = [
|
||||
{ name = "Joao Moura", email = "joao@crewai.com" }
|
||||
]
|
||||
dependencies = [
|
||||
# Core Dependencies
|
||||
"pydantic>=2.4.2",
|
||||
"openai>=1.13.3",
|
||||
"litellm>=1.44.22",
|
||||
"instructor>=1.3.3",
|
||||
|
||||
# Text Processing
|
||||
"pdfplumber>=0.11.4",
|
||||
"regex>=2024.9.11",
|
||||
|
||||
# Telemetry and Monitoring
|
||||
"opentelemetry-api>=1.22.0",
|
||||
"opentelemetry-sdk>=1.22.0",
|
||||
"opentelemetry-exporter-otlp-proto-http>=1.22.0",
|
||||
|
||||
# Data Handling
|
||||
"chromadb>=0.5.23",
|
||||
"openpyxl>=3.1.5",
|
||||
"pyvis>=0.3.2",
|
||||
|
||||
# Authentication and Security
|
||||
"auth0-python>=4.7.1",
|
||||
"python-dotenv>=1.0.0",
|
||||
|
||||
# Configuration and Utils
|
||||
"instructor>=1.3.3",
|
||||
"regex>=2024.9.11",
|
||||
"click>=8.1.7",
|
||||
"python-dotenv>=1.0.0",
|
||||
"appdirs>=1.4.4",
|
||||
"jsonref>=1.1.0",
|
||||
"json-repair>=0.25.2",
|
||||
"auth0-python>=4.7.1",
|
||||
"litellm>=1.44.22",
|
||||
"pyvis>=0.3.2",
|
||||
"uv>=0.4.25",
|
||||
"tomli-w>=1.1.0",
|
||||
"tomli>=2.0.2",
|
||||
"chromadb>=0.5.23",
|
||||
"pdfplumber>=0.11.4",
|
||||
"openpyxl>=3.1.5",
|
||||
"blinker>=1.9.0",
|
||||
]
|
||||
|
||||
@@ -50,9 +39,6 @@ Repository = "https://github.com/crewAIInc/crewAI"
|
||||
|
||||
[project.optional-dependencies]
|
||||
tools = ["crewai-tools>=0.17.0"]
|
||||
embeddings = [
|
||||
"tiktoken~=0.7.0"
|
||||
]
|
||||
agentops = ["agentops>=0.3.0"]
|
||||
fastembed = ["fastembed>=0.4.1"]
|
||||
pdfplumber = [
|
||||
|
||||
@@ -30,47 +30,7 @@ from crewai.telemetry import Telemetry
|
||||
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
|
||||
|
||||
|
||||
def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
|
||||
"""
|
||||
Marks a method as a flow's starting point.
|
||||
|
||||
This decorator designates a method as an entry point for the flow execution.
|
||||
It can optionally specify conditions that trigger the start based on other
|
||||
method executions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
condition : Optional[Union[str, dict, Callable]], optional
|
||||
Defines when the start method should execute. Can be:
|
||||
- str: Name of a method that triggers this start
|
||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
||||
- Callable: A method reference that triggers this start
|
||||
Default is None, meaning unconditional start.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Callable
|
||||
A decorator function that marks the method as a flow start point.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the condition format is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @start() # Unconditional start
|
||||
>>> def begin_flow(self):
|
||||
... pass
|
||||
|
||||
>>> @start("method_name") # Start after specific method
|
||||
>>> def conditional_start(self):
|
||||
... pass
|
||||
|
||||
>>> @start(and_("method1", "method2")) # Start after multiple methods
|
||||
>>> def complex_start(self):
|
||||
... pass
|
||||
"""
|
||||
def start(condition=None):
|
||||
def decorator(func):
|
||||
func.__is_start_method__ = True
|
||||
if condition is not None:
|
||||
@@ -95,42 +55,8 @@ def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
|
||||
|
||||
return decorator
|
||||
|
||||
def listen(condition: Union[str, dict, Callable]) -> Callable:
|
||||
"""
|
||||
Creates a listener that executes when specified conditions are met.
|
||||
|
||||
This decorator sets up a method to execute in response to other method
|
||||
executions in the flow. It supports both simple and complex triggering
|
||||
conditions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
condition : Union[str, dict, Callable]
|
||||
Specifies when the listener should execute. Can be:
|
||||
- str: Name of a method that triggers this listener
|
||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
||||
- Callable: A method reference that triggers this listener
|
||||
|
||||
Returns
|
||||
-------
|
||||
Callable
|
||||
A decorator function that sets up the method as a listener.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the condition format is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @listen("process_data") # Listen to single method
|
||||
>>> def handle_processed_data(self):
|
||||
... pass
|
||||
|
||||
>>> @listen(or_("success", "failure")) # Listen to multiple methods
|
||||
>>> def handle_completion(self):
|
||||
... pass
|
||||
"""
|
||||
def listen(condition):
|
||||
def decorator(func):
|
||||
if isinstance(condition, str):
|
||||
func.__trigger_methods__ = [condition]
|
||||
@@ -154,49 +80,10 @@ def listen(condition: Union[str, dict, Callable]) -> Callable:
|
||||
return decorator
|
||||
|
||||
|
||||
def router(condition: Union[str, dict, Callable]) -> Callable:
|
||||
"""
|
||||
Creates a routing method that directs flow execution based on conditions.
|
||||
|
||||
This decorator marks a method as a router, which can dynamically determine
|
||||
the next steps in the flow based on its return value. Routers are triggered
|
||||
by specified conditions and can return constants that determine which path
|
||||
the flow should take.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
condition : Union[str, dict, Callable]
|
||||
Specifies when the router should execute. Can be:
|
||||
- str: Name of a method that triggers this router
|
||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
||||
- Callable: A method reference that triggers this router
|
||||
|
||||
Returns
|
||||
-------
|
||||
Callable
|
||||
A decorator function that sets up the method as a router.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the condition format is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @router("check_status")
|
||||
>>> def route_based_on_status(self):
|
||||
... if self.state.status == "success":
|
||||
... return SUCCESS
|
||||
... return FAILURE
|
||||
|
||||
>>> @router(and_("validate", "process"))
|
||||
>>> def complex_routing(self):
|
||||
... if all([self.state.valid, self.state.processed]):
|
||||
... return CONTINUE
|
||||
... return STOP
|
||||
"""
|
||||
def router(condition):
|
||||
def decorator(func):
|
||||
func.__is_router__ = True
|
||||
# Handle conditions like listen/start
|
||||
if isinstance(condition, str):
|
||||
func.__trigger_methods__ = [condition]
|
||||
func.__condition_type__ = "OR"
|
||||
@@ -218,39 +105,8 @@ def router(condition: Union[str, dict, Callable]) -> Callable:
|
||||
|
||||
return decorator
|
||||
|
||||
def or_(*conditions: Union[str, dict, Callable]) -> dict:
|
||||
"""
|
||||
Combines multiple conditions with OR logic for flow control.
|
||||
|
||||
Creates a condition that is satisfied when any of the specified conditions
|
||||
are met. This is used with @start, @listen, or @router decorators to create
|
||||
complex triggering conditions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
*conditions : Union[str, dict, Callable]
|
||||
Variable number of conditions that can be:
|
||||
- str: Method names
|
||||
- dict: Existing condition dictionaries
|
||||
- Callable: Method references
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
A condition dictionary with format:
|
||||
{"type": "OR", "methods": list_of_method_names}
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If any condition is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @listen(or_("success", "timeout"))
|
||||
>>> def handle_completion(self):
|
||||
... pass
|
||||
"""
|
||||
def or_(*conditions):
|
||||
methods = []
|
||||
for condition in conditions:
|
||||
if isinstance(condition, dict) and "methods" in condition:
|
||||
@@ -264,39 +120,7 @@ def or_(*conditions: Union[str, dict, Callable]) -> dict:
|
||||
return {"type": "OR", "methods": methods}
|
||||
|
||||
|
||||
def and_(*conditions: Union[str, dict, Callable]) -> dict:
|
||||
"""
|
||||
Combines multiple conditions with AND logic for flow control.
|
||||
|
||||
Creates a condition that is satisfied only when all specified conditions
|
||||
are met. This is used with @start, @listen, or @router decorators to create
|
||||
complex triggering conditions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
*conditions : Union[str, dict, Callable]
|
||||
Variable number of conditions that can be:
|
||||
- str: Method names
|
||||
- dict: Existing condition dictionaries
|
||||
- Callable: Method references
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
A condition dictionary with format:
|
||||
{"type": "AND", "methods": list_of_method_names}
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If any condition is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @listen(and_("validated", "processed"))
|
||||
>>> def handle_complete_data(self):
|
||||
... pass
|
||||
"""
|
||||
def and_(*conditions):
|
||||
methods = []
|
||||
for condition in conditions:
|
||||
if isinstance(condition, dict) and "methods" in condition:
|
||||
@@ -462,23 +286,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return final_output
|
||||
|
||||
async def _execute_start_method(self, start_method_name: str) -> None:
|
||||
"""
|
||||
Executes a flow's start method and its triggered listeners.
|
||||
|
||||
This internal method handles the execution of methods marked with @start
|
||||
decorator and manages the subsequent chain of listener executions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
start_method_name : str
|
||||
The name of the start method to execute.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Executes the start method and captures its result
|
||||
- Triggers execution of any listeners waiting on this start method
|
||||
- Part of the flow's initialization sequence
|
||||
"""
|
||||
result = await self._execute_method(
|
||||
start_method_name, self._methods[start_method_name]
|
||||
)
|
||||
@@ -499,28 +306,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return result
|
||||
|
||||
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
|
||||
"""
|
||||
Executes all listeners and routers triggered by a method completion.
|
||||
|
||||
This internal method manages the execution flow by:
|
||||
1. First executing all triggered routers sequentially
|
||||
2. Then executing all triggered listeners in parallel
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trigger_method : str
|
||||
The name of the method that triggered these listeners.
|
||||
result : Any
|
||||
The result from the triggering method, passed to listeners
|
||||
that accept parameters.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Routers are executed sequentially to maintain flow control
|
||||
- Each router's result becomes the new trigger_method
|
||||
- Normal listeners are executed in parallel for efficiency
|
||||
- Listeners can receive the trigger method's result as a parameter
|
||||
"""
|
||||
# First, handle routers repeatedly until no router triggers anymore
|
||||
while True:
|
||||
routers_triggered = self._find_triggered_methods(
|
||||
@@ -550,33 +335,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
def _find_triggered_methods(
|
||||
self, trigger_method: str, router_only: bool
|
||||
) -> List[str]:
|
||||
"""
|
||||
Finds all methods that should be triggered based on conditions.
|
||||
|
||||
This internal method evaluates both OR and AND conditions to determine
|
||||
which methods should be executed next in the flow.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trigger_method : str
|
||||
The name of the method that just completed execution.
|
||||
router_only : bool
|
||||
If True, only consider router methods.
|
||||
If False, only consider non-router methods.
|
||||
|
||||
Returns
|
||||
-------
|
||||
List[str]
|
||||
Names of methods that should be triggered.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Handles both OR and AND conditions:
|
||||
* OR: Triggers if any condition is met
|
||||
* AND: Triggers only when all conditions are met
|
||||
- Maintains state for AND conditions using _pending_and_listeners
|
||||
- Separates router and normal listener evaluation
|
||||
"""
|
||||
triggered = []
|
||||
for listener_name, (condition_type, methods) in self._listeners.items():
|
||||
is_router = listener_name in self._routers
|
||||
@@ -605,33 +363,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return triggered
|
||||
|
||||
async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
|
||||
"""
|
||||
Executes a single listener method with proper event handling.
|
||||
|
||||
This internal method manages the execution of an individual listener,
|
||||
including parameter inspection, event emission, and error handling.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
listener_name : str
|
||||
The name of the listener method to execute.
|
||||
result : Any
|
||||
The result from the triggering method, which may be passed
|
||||
to the listener if it accepts parameters.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Inspects method signature to determine if it accepts the trigger result
|
||||
- Emits events for method execution start and finish
|
||||
- Handles errors gracefully with detailed logging
|
||||
- Recursively triggers listeners of this listener
|
||||
- Supports both parameterized and parameter-less listeners
|
||||
|
||||
Error Handling
|
||||
-------------
|
||||
Catches and logs any exceptions during execution, preventing
|
||||
individual listener failures from breaking the entire flow.
|
||||
"""
|
||||
try:
|
||||
method = self._methods[listener_name]
|
||||
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
# flow_visualizer.py
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from pyvis.network import Network
|
||||
|
||||
from crewai.flow.config import COLORS, NODE_STYLES
|
||||
from crewai.flow.html_template_handler import HTMLTemplateHandler
|
||||
from crewai.flow.legend_generator import generate_legend_items_html, get_legend_items
|
||||
from crewai.flow.path_utils import safe_path_join, validate_path_exists
|
||||
from crewai.flow.utils import calculate_node_levels
|
||||
from crewai.flow.visualization_utils import (
|
||||
add_edges,
|
||||
@@ -18,209 +16,89 @@ from crewai.flow.visualization_utils import (
|
||||
|
||||
|
||||
class FlowPlot:
|
||||
"""Handles the creation and rendering of flow visualization diagrams."""
|
||||
|
||||
def __init__(self, flow):
|
||||
"""
|
||||
Initialize FlowPlot with a flow object.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Flow
|
||||
A Flow instance to visualize.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If flow object is invalid or missing required attributes.
|
||||
"""
|
||||
if not hasattr(flow, '_methods'):
|
||||
raise ValueError("Invalid flow object: missing '_methods' attribute")
|
||||
if not hasattr(flow, '_listeners'):
|
||||
raise ValueError("Invalid flow object: missing '_listeners' attribute")
|
||||
if not hasattr(flow, '_start_methods'):
|
||||
raise ValueError("Invalid flow object: missing '_start_methods' attribute")
|
||||
|
||||
self.flow = flow
|
||||
self.colors = COLORS
|
||||
self.node_styles = NODE_STYLES
|
||||
|
||||
def plot(self, filename):
|
||||
"""
|
||||
Generate and save an HTML visualization of the flow.
|
||||
net = Network(
|
||||
directed=True,
|
||||
height="750px",
|
||||
width="100%",
|
||||
bgcolor=self.colors["bg"],
|
||||
layout=None,
|
||||
)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
filename : str
|
||||
Name of the output file (without extension).
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If filename is invalid or network generation fails.
|
||||
IOError
|
||||
If file operations fail or visualization cannot be generated.
|
||||
RuntimeError
|
||||
If network visualization generation fails.
|
||||
"""
|
||||
if not filename or not isinstance(filename, str):
|
||||
raise ValueError("Filename must be a non-empty string")
|
||||
|
||||
try:
|
||||
# Initialize network
|
||||
net = Network(
|
||||
directed=True,
|
||||
height="750px",
|
||||
width="100%",
|
||||
bgcolor=self.colors["bg"],
|
||||
layout=None,
|
||||
)
|
||||
|
||||
# Set options to disable physics
|
||||
net.set_options(
|
||||
"""
|
||||
var options = {
|
||||
"nodes": {
|
||||
"font": {
|
||||
"multi": "html"
|
||||
}
|
||||
},
|
||||
"physics": {
|
||||
"enabled": false
|
||||
}
|
||||
}
|
||||
# Set options to disable physics
|
||||
net.set_options(
|
||||
"""
|
||||
)
|
||||
var options = {
|
||||
"nodes": {
|
||||
"font": {
|
||||
"multi": "html"
|
||||
}
|
||||
},
|
||||
"physics": {
|
||||
"enabled": false
|
||||
}
|
||||
}
|
||||
"""
|
||||
)
|
||||
|
||||
# Calculate levels for nodes
|
||||
try:
|
||||
node_levels = calculate_node_levels(self.flow)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to calculate node levels: {str(e)}")
|
||||
# Calculate levels for nodes
|
||||
node_levels = calculate_node_levels(self.flow)
|
||||
|
||||
# Compute positions
|
||||
try:
|
||||
node_positions = compute_positions(self.flow, node_levels)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to compute node positions: {str(e)}")
|
||||
# Compute positions
|
||||
node_positions = compute_positions(self.flow, node_levels)
|
||||
|
||||
# Add nodes to the network
|
||||
try:
|
||||
add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to add nodes to network: {str(e)}")
|
||||
# Add nodes to the network
|
||||
add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
|
||||
|
||||
# Add edges to the network
|
||||
try:
|
||||
add_edges(net, self.flow, node_positions, self.colors)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to add edges to network: {str(e)}")
|
||||
# Add edges to the network
|
||||
add_edges(net, self.flow, node_positions, self.colors)
|
||||
|
||||
# Generate HTML
|
||||
try:
|
||||
network_html = net.generate_html()
|
||||
final_html_content = self._generate_final_html(network_html)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to generate network visualization: {str(e)}")
|
||||
network_html = net.generate_html()
|
||||
final_html_content = self._generate_final_html(network_html)
|
||||
|
||||
# Save the final HTML content to the file
|
||||
try:
|
||||
with open(f"{filename}.html", "w", encoding="utf-8") as f:
|
||||
f.write(final_html_content)
|
||||
print(f"Plot saved as {filename}.html")
|
||||
except IOError as e:
|
||||
raise IOError(f"Failed to save flow visualization to {filename}.html: {str(e)}")
|
||||
# Save the final HTML content to the file
|
||||
with open(f"{filename}.html", "w", encoding="utf-8") as f:
|
||||
f.write(final_html_content)
|
||||
print(f"Plot saved as {filename}.html")
|
||||
|
||||
except (ValueError, RuntimeError, IOError) as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Unexpected error during flow visualization: {str(e)}")
|
||||
finally:
|
||||
self._cleanup_pyvis_lib()
|
||||
self._cleanup_pyvis_lib()
|
||||
|
||||
def _generate_final_html(self, network_html):
|
||||
"""
|
||||
Generate the final HTML content with network visualization and legend.
|
||||
# Extract just the body content from the generated HTML
|
||||
current_dir = os.path.dirname(__file__)
|
||||
template_path = os.path.join(
|
||||
current_dir, "assets", "crewai_flow_visual_template.html"
|
||||
)
|
||||
logo_path = os.path.join(current_dir, "assets", "crewai_logo.svg")
|
||||
|
||||
Parameters
|
||||
----------
|
||||
network_html : str
|
||||
HTML content generated by pyvis Network.
|
||||
html_handler = HTMLTemplateHandler(template_path, logo_path)
|
||||
network_body = html_handler.extract_body_content(network_html)
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Complete HTML content with styling and legend.
|
||||
|
||||
Raises
|
||||
------
|
||||
IOError
|
||||
If template or logo files cannot be accessed.
|
||||
ValueError
|
||||
If network_html is invalid.
|
||||
"""
|
||||
if not network_html:
|
||||
raise ValueError("Invalid network HTML content")
|
||||
|
||||
try:
|
||||
# Extract just the body content from the generated HTML
|
||||
current_dir = os.path.dirname(__file__)
|
||||
template_path = safe_path_join("assets", "crewai_flow_visual_template.html", root=current_dir)
|
||||
logo_path = safe_path_join("assets", "crewai_logo.svg", root=current_dir)
|
||||
|
||||
if not os.path.exists(template_path):
|
||||
raise IOError(f"Template file not found: {template_path}")
|
||||
if not os.path.exists(logo_path):
|
||||
raise IOError(f"Logo file not found: {logo_path}")
|
||||
|
||||
html_handler = HTMLTemplateHandler(template_path, logo_path)
|
||||
network_body = html_handler.extract_body_content(network_html)
|
||||
|
||||
# Generate the legend items HTML
|
||||
legend_items = get_legend_items(self.colors)
|
||||
legend_items_html = generate_legend_items_html(legend_items)
|
||||
final_html_content = html_handler.generate_final_html(
|
||||
network_body, legend_items_html
|
||||
)
|
||||
return final_html_content
|
||||
except Exception as e:
|
||||
raise IOError(f"Failed to generate visualization HTML: {str(e)}")
|
||||
# Generate the legend items HTML
|
||||
legend_items = get_legend_items(self.colors)
|
||||
legend_items_html = generate_legend_items_html(legend_items)
|
||||
final_html_content = html_handler.generate_final_html(
|
||||
network_body, legend_items_html
|
||||
)
|
||||
return final_html_content
|
||||
|
||||
def _cleanup_pyvis_lib(self):
|
||||
"""
|
||||
Clean up the generated lib folder from pyvis.
|
||||
|
||||
This method safely removes the temporary lib directory created by pyvis
|
||||
during network visualization generation.
|
||||
"""
|
||||
# Clean up the generated lib folder
|
||||
lib_folder = os.path.join(os.getcwd(), "lib")
|
||||
try:
|
||||
lib_folder = safe_path_join("lib", root=os.getcwd())
|
||||
if os.path.exists(lib_folder) and os.path.isdir(lib_folder):
|
||||
import shutil
|
||||
|
||||
shutil.rmtree(lib_folder)
|
||||
except ValueError as e:
|
||||
print(f"Error validating lib folder path: {e}")
|
||||
except Exception as e:
|
||||
print(f"Error cleaning up lib folder: {e}")
|
||||
print(f"Error cleaning up {lib_folder}: {e}")
|
||||
|
||||
|
||||
def plot_flow(flow, filename="flow_plot"):
|
||||
"""
|
||||
Convenience function to create and save a flow visualization.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Flow
|
||||
Flow instance to visualize.
|
||||
filename : str, optional
|
||||
Output filename without extension, by default "flow_plot".
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If flow object or filename is invalid.
|
||||
IOError
|
||||
If file operations fail.
|
||||
"""
|
||||
visualizer = FlowPlot(flow)
|
||||
visualizer.plot(filename)
|
||||
|
||||
@@ -1,53 +1,26 @@
|
||||
import base64
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
from crewai.flow.path_utils import safe_path_join, validate_path_exists
|
||||
|
||||
|
||||
class HTMLTemplateHandler:
|
||||
"""Handles HTML template processing and generation for flow visualization diagrams."""
|
||||
|
||||
def __init__(self, template_path, logo_path):
|
||||
"""
|
||||
Initialize HTMLTemplateHandler with validated template and logo paths.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
template_path : str
|
||||
Path to the HTML template file.
|
||||
logo_path : str
|
||||
Path to the logo image file.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If template or logo paths are invalid or files don't exist.
|
||||
"""
|
||||
try:
|
||||
self.template_path = validate_path_exists(template_path, "file")
|
||||
self.logo_path = validate_path_exists(logo_path, "file")
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid template or logo path: {e}")
|
||||
self.template_path = template_path
|
||||
self.logo_path = logo_path
|
||||
|
||||
def read_template(self):
|
||||
"""Read and return the HTML template file contents."""
|
||||
with open(self.template_path, "r", encoding="utf-8") as f:
|
||||
return f.read()
|
||||
|
||||
def encode_logo(self):
|
||||
"""Convert the logo SVG file to base64 encoded string."""
|
||||
with open(self.logo_path, "rb") as logo_file:
|
||||
logo_svg_data = logo_file.read()
|
||||
return base64.b64encode(logo_svg_data).decode("utf-8")
|
||||
|
||||
def extract_body_content(self, html):
|
||||
"""Extract and return content between body tags from HTML string."""
|
||||
match = re.search("<body.*?>(.*?)</body>", html, re.DOTALL)
|
||||
return match.group(1) if match else ""
|
||||
|
||||
def generate_legend_items_html(self, legend_items):
|
||||
"""Generate HTML markup for the legend items."""
|
||||
legend_items_html = ""
|
||||
for item in legend_items:
|
||||
if "border" in item:
|
||||
@@ -75,7 +48,6 @@ class HTMLTemplateHandler:
|
||||
return legend_items_html
|
||||
|
||||
def generate_final_html(self, network_body, legend_items_html, title="Flow Plot"):
|
||||
"""Combine all components into final HTML document with network visualization."""
|
||||
html_template = self.read_template()
|
||||
logo_svg_base64 = self.encode_logo()
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
def get_legend_items(colors):
|
||||
return [
|
||||
{"label": "Start Method", "color": colors["start"]},
|
||||
|
||||
@@ -1,135 +0,0 @@
|
||||
"""
|
||||
Path utilities for secure file operations in CrewAI flow module.
|
||||
|
||||
This module provides utilities for secure path handling to prevent directory
|
||||
traversal attacks and ensure paths remain within allowed boundaries.
|
||||
"""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List, Union
|
||||
|
||||
|
||||
def safe_path_join(*parts: str, root: Union[str, Path, None] = None) -> str:
|
||||
"""
|
||||
Safely join path components and ensure the result is within allowed boundaries.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
*parts : str
|
||||
Variable number of path components to join.
|
||||
root : Union[str, Path, None], optional
|
||||
Root directory to use as base. If None, uses current working directory.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
String representation of the resolved path.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the resulting path would be outside the root directory
|
||||
or if any path component is invalid.
|
||||
"""
|
||||
if not parts:
|
||||
raise ValueError("No path components provided")
|
||||
|
||||
try:
|
||||
# Convert all parts to strings and clean them
|
||||
clean_parts = [str(part).strip() for part in parts if part]
|
||||
if not clean_parts:
|
||||
raise ValueError("No valid path components provided")
|
||||
|
||||
# Establish root directory
|
||||
root_path = Path(root).resolve() if root else Path.cwd()
|
||||
|
||||
# Join and resolve the full path
|
||||
full_path = Path(root_path, *clean_parts).resolve()
|
||||
|
||||
# Check if the resolved path is within root
|
||||
if not str(full_path).startswith(str(root_path)):
|
||||
raise ValueError(
|
||||
f"Invalid path: Potential directory traversal. Path must be within {root_path}"
|
||||
)
|
||||
|
||||
return str(full_path)
|
||||
|
||||
except Exception as e:
|
||||
if isinstance(e, ValueError):
|
||||
raise
|
||||
raise ValueError(f"Invalid path components: {str(e)}")
|
||||
|
||||
|
||||
def validate_path_exists(path: Union[str, Path], file_type: str = "file") -> str:
|
||||
"""
|
||||
Validate that a path exists and is of the expected type.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
path : Union[str, Path]
|
||||
Path to validate.
|
||||
file_type : str, optional
|
||||
Expected type ('file' or 'directory'), by default 'file'.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Validated path as string.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If path doesn't exist or is not of expected type.
|
||||
"""
|
||||
try:
|
||||
path_obj = Path(path).resolve()
|
||||
|
||||
if not path_obj.exists():
|
||||
raise ValueError(f"Path does not exist: {path}")
|
||||
|
||||
if file_type == "file" and not path_obj.is_file():
|
||||
raise ValueError(f"Path is not a file: {path}")
|
||||
elif file_type == "directory" and not path_obj.is_dir():
|
||||
raise ValueError(f"Path is not a directory: {path}")
|
||||
|
||||
return str(path_obj)
|
||||
|
||||
except Exception as e:
|
||||
if isinstance(e, ValueError):
|
||||
raise
|
||||
raise ValueError(f"Invalid path: {str(e)}")
|
||||
|
||||
|
||||
def list_files(directory: Union[str, Path], pattern: str = "*") -> List[str]:
|
||||
"""
|
||||
Safely list files in a directory matching a pattern.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
directory : Union[str, Path]
|
||||
Directory to search in.
|
||||
pattern : str, optional
|
||||
Glob pattern to match files against, by default "*".
|
||||
|
||||
Returns
|
||||
-------
|
||||
List[str]
|
||||
List of matching file paths.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If directory is invalid or inaccessible.
|
||||
"""
|
||||
try:
|
||||
dir_path = Path(directory).resolve()
|
||||
if not dir_path.is_dir():
|
||||
raise ValueError(f"Not a directory: {directory}")
|
||||
|
||||
return [str(p) for p in dir_path.glob(pattern) if p.is_file()]
|
||||
|
||||
except Exception as e:
|
||||
if isinstance(e, ValueError):
|
||||
raise
|
||||
raise ValueError(f"Error listing files: {str(e)}")
|
||||
@@ -1,25 +1,9 @@
|
||||
"""
|
||||
Utility functions for flow visualization and dependency analysis.
|
||||
|
||||
This module provides core functionality for analyzing and manipulating flow structures,
|
||||
including node level calculation, ancestor tracking, and return value analysis.
|
||||
Functions in this module are primarily used by the visualization system to create
|
||||
accurate and informative flow diagrams.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> flow = Flow()
|
||||
>>> node_levels = calculate_node_levels(flow)
|
||||
>>> ancestors = build_ancestor_dict(flow)
|
||||
"""
|
||||
|
||||
import ast
|
||||
import inspect
|
||||
import textwrap
|
||||
from typing import Any, Dict, List, Optional, Set, Union
|
||||
|
||||
|
||||
def get_possible_return_constants(function: Any) -> Optional[List[str]]:
|
||||
def get_possible_return_constants(function):
|
||||
try:
|
||||
source = inspect.getsource(function)
|
||||
except OSError:
|
||||
@@ -93,34 +77,11 @@ def get_possible_return_constants(function: Any) -> Optional[List[str]]:
|
||||
return list(return_values) if return_values else None
|
||||
|
||||
|
||||
def calculate_node_levels(flow: Any) -> Dict[str, int]:
|
||||
"""
|
||||
Calculate the hierarchical level of each node in the flow.
|
||||
|
||||
Performs a breadth-first traversal of the flow graph to assign levels
|
||||
to nodes, starting with start methods at level 0.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Any
|
||||
The flow instance containing methods, listeners, and router configurations.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, int]
|
||||
Dictionary mapping method names to their hierarchical levels.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Start methods are assigned level 0
|
||||
- Each subsequent connected node is assigned level = parent_level + 1
|
||||
- Handles both OR and AND conditions for listeners
|
||||
- Processes router paths separately
|
||||
"""
|
||||
levels: Dict[str, int] = {}
|
||||
queue: List[str] = []
|
||||
visited: Set[str] = set()
|
||||
pending_and_listeners: Dict[str, Set[str]] = {}
|
||||
def calculate_node_levels(flow):
|
||||
levels = {}
|
||||
queue = []
|
||||
visited = set()
|
||||
pending_and_listeners = {}
|
||||
|
||||
# Make all start methods at level 0
|
||||
for method_name, method in flow._methods.items():
|
||||
@@ -179,20 +140,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
|
||||
return levels
|
||||
|
||||
|
||||
def count_outgoing_edges(flow: Any) -> Dict[str, int]:
|
||||
"""
|
||||
Count the number of outgoing edges for each method in the flow.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Any
|
||||
The flow instance to analyze.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, int]
|
||||
Dictionary mapping method names to their outgoing edge count.
|
||||
"""
|
||||
def count_outgoing_edges(flow):
|
||||
counts = {}
|
||||
for method_name in flow._methods:
|
||||
counts[method_name] = 0
|
||||
@@ -204,53 +152,16 @@ def count_outgoing_edges(flow: Any) -> Dict[str, int]:
|
||||
return counts
|
||||
|
||||
|
||||
def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
|
||||
"""
|
||||
Build a dictionary mapping each node to its ancestor nodes.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Any
|
||||
The flow instance to analyze.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, Set[str]]
|
||||
Dictionary mapping each node to a set of its ancestor nodes.
|
||||
"""
|
||||
ancestors: Dict[str, Set[str]] = {node: set() for node in flow._methods}
|
||||
visited: Set[str] = set()
|
||||
def build_ancestor_dict(flow):
|
||||
ancestors = {node: set() for node in flow._methods}
|
||||
visited = set()
|
||||
for node in flow._methods:
|
||||
if node not in visited:
|
||||
dfs_ancestors(node, ancestors, visited, flow)
|
||||
return ancestors
|
||||
|
||||
|
||||
def dfs_ancestors(
|
||||
node: str,
|
||||
ancestors: Dict[str, Set[str]],
|
||||
visited: Set[str],
|
||||
flow: Any
|
||||
) -> None:
|
||||
"""
|
||||
Perform depth-first search to build ancestor relationships.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
node : str
|
||||
Current node being processed.
|
||||
ancestors : Dict[str, Set[str]]
|
||||
Dictionary tracking ancestor relationships.
|
||||
visited : Set[str]
|
||||
Set of already visited nodes.
|
||||
flow : Any
|
||||
The flow instance being analyzed.
|
||||
|
||||
Notes
|
||||
-----
|
||||
This function modifies the ancestors dictionary in-place to build
|
||||
the complete ancestor graph.
|
||||
"""
|
||||
def dfs_ancestors(node, ancestors, visited, flow):
|
||||
if node in visited:
|
||||
return
|
||||
visited.add(node)
|
||||
@@ -274,48 +185,12 @@ def dfs_ancestors(
|
||||
dfs_ancestors(listener_name, ancestors, visited, flow)
|
||||
|
||||
|
||||
def is_ancestor(node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]) -> bool:
|
||||
"""
|
||||
Check if one node is an ancestor of another.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
node : str
|
||||
The node to check ancestors for.
|
||||
ancestor_candidate : str
|
||||
The potential ancestor node.
|
||||
ancestors : Dict[str, Set[str]]
|
||||
Dictionary containing ancestor relationships.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
True if ancestor_candidate is an ancestor of node, False otherwise.
|
||||
"""
|
||||
def is_ancestor(node, ancestor_candidate, ancestors):
|
||||
return ancestor_candidate in ancestors.get(node, set())
|
||||
|
||||
|
||||
def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
|
||||
"""
|
||||
Build a dictionary mapping parent nodes to their children.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Any
|
||||
The flow instance to analyze.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, List[str]]
|
||||
Dictionary mapping parent method names to lists of their child method names.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Maps listeners to their trigger methods
|
||||
- Maps router methods to their paths and listeners
|
||||
- Children lists are sorted for consistent ordering
|
||||
"""
|
||||
parent_children: Dict[str, List[str]] = {}
|
||||
def build_parent_children_dict(flow):
|
||||
parent_children = {}
|
||||
|
||||
# Map listeners to their trigger methods
|
||||
for listener_name, (_, trigger_methods) in flow._listeners.items():
|
||||
@@ -339,24 +214,7 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
|
||||
return parent_children
|
||||
|
||||
|
||||
def get_child_index(parent: str, child: str, parent_children: Dict[str, List[str]]) -> int:
|
||||
"""
|
||||
Get the index of a child node in its parent's sorted children list.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
parent : str
|
||||
The parent node name.
|
||||
child : str
|
||||
The child node name to find the index for.
|
||||
parent_children : Dict[str, List[str]]
|
||||
Dictionary mapping parents to their children lists.
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
Zero-based index of the child in its parent's sorted children list.
|
||||
"""
|
||||
def get_child_index(parent, child, parent_children):
|
||||
children = parent_children.get(parent, [])
|
||||
children.sort()
|
||||
return children.index(child)
|
||||
|
||||
@@ -1,23 +1,5 @@
|
||||
"""
|
||||
Utilities for creating visual representations of flow structures.
|
||||
|
||||
This module provides functions for generating network visualizations of flows,
|
||||
including node placement, edge creation, and visual styling. It handles the
|
||||
conversion of flow structures into visual network graphs with appropriate
|
||||
styling and layout.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> flow = Flow()
|
||||
>>> net = Network(directed=True)
|
||||
>>> node_positions = compute_positions(flow, node_levels)
|
||||
>>> add_nodes_to_network(net, flow, node_positions, node_styles)
|
||||
>>> add_edges(net, flow, node_positions, colors)
|
||||
"""
|
||||
|
||||
import ast
|
||||
import inspect
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from .utils import (
|
||||
build_ancestor_dict,
|
||||
@@ -27,25 +9,8 @@ from .utils import (
|
||||
)
|
||||
|
||||
|
||||
def method_calls_crew(method: Any) -> bool:
|
||||
"""
|
||||
Check if the method contains a call to `.crew()`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
method : Any
|
||||
The method to analyze for crew() calls.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
True if the method calls .crew(), False otherwise.
|
||||
|
||||
Notes
|
||||
-----
|
||||
Uses AST analysis to detect method calls, specifically looking for
|
||||
attribute access of 'crew'.
|
||||
"""
|
||||
def method_calls_crew(method):
|
||||
"""Check if the method calls `.crew()`."""
|
||||
try:
|
||||
source = inspect.getsource(method)
|
||||
source = inspect.cleandoc(source)
|
||||
@@ -55,7 +20,6 @@ def method_calls_crew(method: Any) -> bool:
|
||||
return False
|
||||
|
||||
class CrewCallVisitor(ast.NodeVisitor):
|
||||
"""AST visitor to detect .crew() method calls."""
|
||||
def __init__(self):
|
||||
self.found = False
|
||||
|
||||
@@ -70,34 +34,7 @@ def method_calls_crew(method: Any) -> bool:
|
||||
return visitor.found
|
||||
|
||||
|
||||
def add_nodes_to_network(
|
||||
net: Any,
|
||||
flow: Any,
|
||||
node_positions: Dict[str, Tuple[float, float]],
|
||||
node_styles: Dict[str, Dict[str, Any]]
|
||||
) -> None:
|
||||
"""
|
||||
Add nodes to the network visualization with appropriate styling.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
net : Any
|
||||
The pyvis Network instance to add nodes to.
|
||||
flow : Any
|
||||
The flow instance containing method information.
|
||||
node_positions : Dict[str, Tuple[float, float]]
|
||||
Dictionary mapping node names to their (x, y) positions.
|
||||
node_styles : Dict[str, Dict[str, Any]]
|
||||
Dictionary containing style configurations for different node types.
|
||||
|
||||
Notes
|
||||
-----
|
||||
Node types include:
|
||||
- Start methods
|
||||
- Router methods
|
||||
- Crew methods
|
||||
- Regular methods
|
||||
"""
|
||||
def add_nodes_to_network(net, flow, node_positions, node_styles):
|
||||
def human_friendly_label(method_name):
|
||||
return method_name.replace("_", " ").title()
|
||||
|
||||
@@ -136,33 +73,9 @@ def add_nodes_to_network(
|
||||
)
|
||||
|
||||
|
||||
def compute_positions(
|
||||
flow: Any,
|
||||
node_levels: Dict[str, int],
|
||||
y_spacing: float = 150,
|
||||
x_spacing: float = 150
|
||||
) -> Dict[str, Tuple[float, float]]:
|
||||
"""
|
||||
Compute the (x, y) positions for each node in the flow graph.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Any
|
||||
The flow instance to compute positions for.
|
||||
node_levels : Dict[str, int]
|
||||
Dictionary mapping node names to their hierarchical levels.
|
||||
y_spacing : float, optional
|
||||
Vertical spacing between levels, by default 150.
|
||||
x_spacing : float, optional
|
||||
Horizontal spacing between nodes, by default 150.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, Tuple[float, float]]
|
||||
Dictionary mapping node names to their (x, y) coordinates.
|
||||
"""
|
||||
level_nodes: Dict[int, List[str]] = {}
|
||||
node_positions: Dict[str, Tuple[float, float]] = {}
|
||||
def compute_positions(flow, node_levels, y_spacing=150, x_spacing=150):
|
||||
level_nodes = {}
|
||||
node_positions = {}
|
||||
|
||||
for method_name, level in node_levels.items():
|
||||
level_nodes.setdefault(level, []).append(method_name)
|
||||
@@ -177,33 +90,7 @@ def compute_positions(
|
||||
return node_positions
|
||||
|
||||
|
||||
def add_edges(
|
||||
net: Any,
|
||||
flow: Any,
|
||||
node_positions: Dict[str, Tuple[float, float]],
|
||||
colors: Dict[str, str]
|
||||
) -> None:
|
||||
edge_smooth: Dict[str, Union[str, float]] = {"type": "continuous"} # Default value
|
||||
"""
|
||||
Add edges to the network visualization with appropriate styling.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
net : Any
|
||||
The pyvis Network instance to add edges to.
|
||||
flow : Any
|
||||
The flow instance containing edge information.
|
||||
node_positions : Dict[str, Tuple[float, float]]
|
||||
Dictionary mapping node names to their positions.
|
||||
colors : Dict[str, str]
|
||||
Dictionary mapping edge types to their colors.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Handles both normal listener edges and router edges
|
||||
- Applies appropriate styling (color, dashes) based on edge type
|
||||
- Adds curvature to edges when needed (cycles or multiple children)
|
||||
"""
|
||||
def add_edges(net, flow, node_positions, colors):
|
||||
ancestors = build_ancestor_dict(flow)
|
||||
parent_children = build_parent_children_dict(flow)
|
||||
|
||||
@@ -239,7 +126,7 @@ def add_edges(
|
||||
else:
|
||||
edge_smooth = {"type": "cubicBezier"}
|
||||
else:
|
||||
edge_smooth.update({"type": "continuous"})
|
||||
edge_smooth = False
|
||||
|
||||
edge_style = {
|
||||
"color": edge_color,
|
||||
@@ -302,7 +189,7 @@ def add_edges(
|
||||
else:
|
||||
edge_smooth = {"type": "cubicBezier"}
|
||||
else:
|
||||
edge_smooth.update({"type": "continuous"})
|
||||
edge_smooth = False
|
||||
|
||||
edge_style = {
|
||||
"color": colors["router_edge"],
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import logging
|
||||
from typing import Optional, Union
|
||||
|
||||
from pydantic import Field
|
||||
@@ -8,8 +7,6 @@ from crewai.task import Task
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.utilities import I18N
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseAgentTool(BaseTool):
|
||||
"""Base class for agent-related tools"""
|
||||
@@ -19,25 +16,6 @@ class BaseAgentTool(BaseTool):
|
||||
default_factory=I18N, description="Internationalization settings"
|
||||
)
|
||||
|
||||
def sanitize_agent_name(self, name: str) -> str:
|
||||
"""
|
||||
Sanitize agent role name by normalizing whitespace and setting to lowercase.
|
||||
Converts all whitespace (including newlines) to single spaces and removes quotes.
|
||||
|
||||
Args:
|
||||
name (str): The agent role name to sanitize
|
||||
|
||||
Returns:
|
||||
str: The sanitized agent role name, with whitespace normalized,
|
||||
converted to lowercase, and quotes removed
|
||||
"""
|
||||
if not name:
|
||||
return ""
|
||||
# Normalize all whitespace (including newlines) to single spaces
|
||||
normalized = " ".join(name.split())
|
||||
# Remove quotes and convert to lowercase
|
||||
return normalized.replace('"', "").casefold()
|
||||
|
||||
def _get_coworker(self, coworker: Optional[str], **kwargs) -> Optional[str]:
|
||||
coworker = coworker or kwargs.get("co_worker") or kwargs.get("coworker")
|
||||
if coworker:
|
||||
@@ -47,27 +25,11 @@ class BaseAgentTool(BaseTool):
|
||||
return coworker
|
||||
|
||||
def _execute(
|
||||
self,
|
||||
agent_name: Optional[str],
|
||||
task: str,
|
||||
context: Optional[str] = None
|
||||
self, agent_name: Union[str, None], task: str, context: Union[str, None]
|
||||
) -> str:
|
||||
"""
|
||||
Execute delegation to an agent with case-insensitive and whitespace-tolerant matching.
|
||||
|
||||
Args:
|
||||
agent_name: Name/role of the agent to delegate to (case-insensitive)
|
||||
task: The specific question or task to delegate
|
||||
context: Optional additional context for the task execution
|
||||
|
||||
Returns:
|
||||
str: The execution result from the delegated agent or an error message
|
||||
if the agent cannot be found
|
||||
"""
|
||||
try:
|
||||
if agent_name is None:
|
||||
agent_name = ""
|
||||
logger.debug("No agent name provided, using empty string")
|
||||
|
||||
# It is important to remove the quotes from the agent name.
|
||||
# The reason we have to do this is because less-powerful LLM's
|
||||
@@ -76,49 +38,31 @@ class BaseAgentTool(BaseTool):
|
||||
# {"task": "....", "coworker": "....
|
||||
# when it should look like this:
|
||||
# {"task": "....", "coworker": "...."}
|
||||
sanitized_name = self.sanitize_agent_name(agent_name)
|
||||
logger.debug(f"Sanitized agent name from '{agent_name}' to '{sanitized_name}'")
|
||||
|
||||
available_agents = [agent.role for agent in self.agents]
|
||||
logger.debug(f"Available agents: {available_agents}")
|
||||
|
||||
agent_name = agent_name.casefold().replace('"', "").replace("\n", "")
|
||||
agent = [ # type: ignore # Incompatible types in assignment (expression has type "list[BaseAgent]", variable has type "str | None")
|
||||
available_agent
|
||||
for available_agent in self.agents
|
||||
if self.sanitize_agent_name(available_agent.role) == sanitized_name
|
||||
if available_agent.role.casefold().replace("\n", "") == agent_name
|
||||
]
|
||||
logger.debug(f"Found {len(agent)} matching agents for role '{sanitized_name}'")
|
||||
except (AttributeError, ValueError) as e:
|
||||
# Handle specific exceptions that might occur during role name processing
|
||||
except Exception as _:
|
||||
return self.i18n.errors("agent_tool_unexisting_coworker").format(
|
||||
coworkers="\n".join(
|
||||
[f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents]
|
||||
),
|
||||
error=str(e)
|
||||
[f"- {agent.role.casefold()}" for agent in self.agents]
|
||||
)
|
||||
)
|
||||
|
||||
if not agent:
|
||||
# No matching agent found after sanitization
|
||||
return self.i18n.errors("agent_tool_unexisting_coworker").format(
|
||||
coworkers="\n".join(
|
||||
[f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents]
|
||||
),
|
||||
error=f"No agent found with role '{sanitized_name}'"
|
||||
[f"- {agent.role.casefold()}" for agent in self.agents]
|
||||
)
|
||||
)
|
||||
|
||||
agent = agent[0]
|
||||
try:
|
||||
task_with_assigned_agent = Task(
|
||||
description=task,
|
||||
agent=agent,
|
||||
expected_output=agent.i18n.slice("manager_request"),
|
||||
i18n=agent.i18n,
|
||||
)
|
||||
logger.debug(f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}")
|
||||
return agent.execute_task(task_with_assigned_agent, context)
|
||||
except Exception as e:
|
||||
# Handle task creation or execution errors
|
||||
return self.i18n.errors("agent_tool_execution_error").format(
|
||||
agent_role=self.sanitize_agent_name(agent.role),
|
||||
error=str(e)
|
||||
)
|
||||
task_with_assigned_agent = Task( # type: ignore # Incompatible types in assignment (expression has type "Task", variable has type "str")
|
||||
description=task,
|
||||
agent=agent,
|
||||
expected_output=agent.i18n.slice("manager_request"),
|
||||
i18n=agent.i18n,
|
||||
)
|
||||
return agent.execute_task(task_with_assigned_agent, context)
|
||||
|
||||
@@ -33,8 +33,7 @@
|
||||
"tool_usage_error": "I encountered an error: {error}",
|
||||
"tool_arguments_error": "Error: the Action Input is not a valid key, value dictionary.",
|
||||
"wrong_tool_name": "You tried to use the tool {tool}, but it doesn't exist. You must use one of the following tools, use one at time: {tools}.",
|
||||
"tool_usage_exception": "I encountered an error while trying to use the tool. This was the error: {error}.\n Tool {tool} accepts these inputs: {tool_inputs}",
|
||||
"agent_tool_execution_error": "Error executing task with agent '{agent_role}'. Error: {error}"
|
||||
"tool_usage_exception": "I encountered an error while trying to use the tool. This was the error: {error}.\n Tool {tool} accepts these inputs: {tool_inputs}"
|
||||
},
|
||||
"tools": {
|
||||
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolute everything you know, don't reference things but instead explain them.",
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, List, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
@@ -7,8 +5,6 @@ from pydantic import BaseModel, Field
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PlanPerTask(BaseModel):
|
||||
task: str = Field(..., description="The task for which the plan is created")
|
||||
@@ -72,39 +68,19 @@ class CrewPlanner:
|
||||
output_pydantic=PlannerTaskPydanticOutput,
|
||||
)
|
||||
|
||||
def _get_agent_knowledge(self, task: Task) -> List[str]:
|
||||
"""
|
||||
Safely retrieve knowledge source content from the task's agent.
|
||||
|
||||
Args:
|
||||
task: The task containing an agent with potential knowledge sources
|
||||
|
||||
Returns:
|
||||
List[str]: A list of knowledge source strings
|
||||
"""
|
||||
try:
|
||||
if task.agent and task.agent.knowledge_sources:
|
||||
return [source.content for source in task.agent.knowledge_sources]
|
||||
except AttributeError:
|
||||
logger.warning("Error accessing agent knowledge sources")
|
||||
return []
|
||||
|
||||
def _create_tasks_summary(self) -> str:
|
||||
"""Creates a summary of all tasks."""
|
||||
tasks_summary = []
|
||||
for idx, task in enumerate(self.tasks):
|
||||
knowledge_list = self._get_agent_knowledge(task)
|
||||
task_summary = f"""
|
||||
tasks_summary.append(
|
||||
f"""
|
||||
Task Number {idx + 1} - {task.description}
|
||||
"task_description": {task.description}
|
||||
"task_expected_output": {task.expected_output}
|
||||
"agent": {task.agent.role if task.agent else "None"}
|
||||
"agent_goal": {task.agent.goal if task.agent else "None"}
|
||||
"task_tools": {task.tools}
|
||||
"agent_tools": %s%s""" % (
|
||||
f"[{', '.join(str(tool) for tool in task.agent.tools)}]" if task.agent and task.agent.tools else '"agent has no tools"',
|
||||
f',\n "agent_knowledge": "[\\"{knowledge_list[0]}\\"]"' if knowledge_list and str(knowledge_list) != "None" else ""
|
||||
)
|
||||
|
||||
tasks_summary.append(task_summary)
|
||||
"agent_tools": {task.agent.tools if task.agent else "None"}
|
||||
"""
|
||||
)
|
||||
return " ".join(tasks_summary)
|
||||
|
||||
@@ -391,71 +391,6 @@ def test_manager_agent_delegating_to_all_agents():
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_manager_agent_delegates_with_varied_role_cases():
|
||||
"""
|
||||
Test that the manager agent can delegate to agents regardless of case or whitespace variations in role names.
|
||||
This test verifies the fix for issue #1503 where role matching was too strict.
|
||||
"""
|
||||
# Create agents with varied case and whitespace in roles
|
||||
researcher_spaced = Agent(
|
||||
role=" Researcher ", # Extra spaces
|
||||
goal="Research with spaces in role",
|
||||
backstory="A researcher with spaces in role name",
|
||||
allow_delegation=False,
|
||||
)
|
||||
|
||||
writer_caps = Agent(
|
||||
role="SENIOR WRITER", # All caps
|
||||
goal="Write with caps in role",
|
||||
backstory="A writer with caps in role name",
|
||||
allow_delegation=False,
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Research and write about AI. The researcher should do the research, and the writer should write it up.",
|
||||
expected_output="A well-researched article about AI.",
|
||||
agent=researcher_spaced, # Assign to researcher with spaces
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[researcher_spaced, writer_caps],
|
||||
process=Process.hierarchical,
|
||||
manager_llm="gpt-4o",
|
||||
tasks=[task],
|
||||
)
|
||||
|
||||
mock_task_output = TaskOutput(
|
||||
description="Mock description",
|
||||
raw="mocked output",
|
||||
agent="mocked agent"
|
||||
)
|
||||
task.output = mock_task_output
|
||||
|
||||
with patch.object(Task, 'execute_sync', return_value=mock_task_output) as mock_execute_sync:
|
||||
crew.kickoff()
|
||||
|
||||
# Verify execute_sync was called once
|
||||
mock_execute_sync.assert_called_once()
|
||||
|
||||
# Get the tools argument from the call
|
||||
_, kwargs = mock_execute_sync.call_args
|
||||
tools = kwargs['tools']
|
||||
|
||||
# Verify the delegation tools were passed correctly and can handle case/whitespace variations
|
||||
assert len(tools) == 2
|
||||
|
||||
# Check delegation tool descriptions (should work despite case/whitespace differences)
|
||||
delegation_tool = tools[0]
|
||||
question_tool = tools[1]
|
||||
|
||||
assert "Delegate a specific task to one of the following coworkers:" in delegation_tool.description
|
||||
assert " Researcher " in delegation_tool.description or "SENIOR WRITER" in delegation_tool.description
|
||||
|
||||
assert "Ask a specific question to one of the following coworkers:" in question_tool.description
|
||||
assert " Researcher " in question_tool.description or "SENIOR WRITER" in question_tool.description
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_crew_with_delegating_agents():
|
||||
tasks = [
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai import Agent, Task
|
||||
from crewai.tools.agent_tools.base_agent_tools import BaseAgentTool
|
||||
|
||||
|
||||
class TestAgentTool(BaseAgentTool):
|
||||
"""Concrete implementation of BaseAgentTool for testing."""
|
||||
def _run(self, *args, **kwargs):
|
||||
"""Implement required _run method."""
|
||||
return "Test response"
|
||||
|
||||
@pytest.mark.parametrize("role_name,should_match", [
|
||||
('Futel Official Infopoint', True), # exact match
|
||||
(' "Futel Official Infopoint" ', True), # extra quotes and spaces
|
||||
('Futel Official Infopoint\n', True), # trailing newline
|
||||
('"Futel Official Infopoint"', True), # embedded quotes
|
||||
(' FUTEL\nOFFICIAL INFOPOINT ', True), # multiple whitespace and newline
|
||||
('futel official infopoint', True), # lowercase
|
||||
('FUTEL OFFICIAL INFOPOINT', True), # uppercase
|
||||
('Non Existent Agent', False), # non-existent agent
|
||||
(None, False), # None agent name
|
||||
])
|
||||
def test_agent_tool_role_matching(role_name, should_match):
|
||||
"""Test that agent tools can match roles regardless of case, whitespace, and special characters."""
|
||||
# Create test agent
|
||||
test_agent = Agent(
|
||||
role='Futel Official Infopoint',
|
||||
goal='Answer questions about Futel',
|
||||
backstory='Futel Football Club info',
|
||||
allow_delegation=False
|
||||
)
|
||||
|
||||
# Create test agent tool
|
||||
agent_tool = TestAgentTool(
|
||||
name="test_tool",
|
||||
description="Test tool",
|
||||
agents=[test_agent]
|
||||
)
|
||||
|
||||
# Test role matching
|
||||
result = agent_tool._execute(
|
||||
agent_name=role_name,
|
||||
task='Test task',
|
||||
context=None
|
||||
)
|
||||
|
||||
if should_match:
|
||||
assert "coworker mentioned not found" not in result.lower(), \
|
||||
f"Should find agent with role name: {role_name}"
|
||||
else:
|
||||
assert "coworker mentioned not found" in result.lower(), \
|
||||
f"Should not find agent with role name: {role_name}"
|
||||
@@ -1,84 +0,0 @@
|
||||
"""
|
||||
Tests for verifying the integration of knowledge sources in the planning process.
|
||||
This module ensures that agent knowledge is properly included during task planning.
|
||||
"""
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
|
||||
from crewai.task import Task
|
||||
from crewai.utilities.planning_handler import CrewPlanner
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_knowledge_source():
|
||||
"""
|
||||
Create a mock knowledge source with test content.
|
||||
Returns:
|
||||
StringKnowledgeSource:
|
||||
A knowledge source containing AI-related test content
|
||||
"""
|
||||
content = """
|
||||
Important context about AI:
|
||||
1. AI systems use machine learning algorithms
|
||||
2. Neural networks are a key component
|
||||
3. Training data is essential for good performance
|
||||
"""
|
||||
return StringKnowledgeSource(content=content)
|
||||
|
||||
@patch('crewai.knowledge.storage.knowledge_storage.chromadb')
|
||||
def test_knowledge_included_in_planning(mock_chroma):
|
||||
"""Test that verifies knowledge sources are properly included in planning."""
|
||||
# Mock ChromaDB collection
|
||||
mock_collection = mock_chroma.return_value.get_or_create_collection.return_value
|
||||
mock_collection.add.return_value = None
|
||||
|
||||
# Create an agent with knowledge
|
||||
agent = Agent(
|
||||
role="AI Researcher",
|
||||
goal="Research and explain AI concepts",
|
||||
backstory="Expert in artificial intelligence",
|
||||
knowledge_sources=[
|
||||
StringKnowledgeSource(
|
||||
content="AI systems require careful training and validation."
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
# Create a task for the agent
|
||||
task = Task(
|
||||
description="Explain the basics of AI systems",
|
||||
expected_output="A clear explanation of AI fundamentals",
|
||||
agent=agent
|
||||
)
|
||||
|
||||
# Create a crew planner
|
||||
planner = CrewPlanner([task], None)
|
||||
|
||||
# Get the task summary
|
||||
task_summary = planner._create_tasks_summary()
|
||||
|
||||
# Verify that knowledge is included in planning when present
|
||||
assert "AI systems require careful training" in task_summary, \
|
||||
"Knowledge content should be present in task summary when knowledge exists"
|
||||
assert '"agent_knowledge"' in task_summary, \
|
||||
"agent_knowledge field should be present in task summary when knowledge exists"
|
||||
|
||||
# Verify that knowledge is properly formatted
|
||||
assert isinstance(task.agent.knowledge_sources, list), \
|
||||
"Knowledge sources should be stored in a list"
|
||||
assert len(task.agent.knowledge_sources) > 0, \
|
||||
"At least one knowledge source should be present"
|
||||
assert task.agent.knowledge_sources[0].content in task_summary, \
|
||||
"Knowledge source content should be included in task summary"
|
||||
|
||||
# Verify that other expected components are still present
|
||||
assert task.description in task_summary, \
|
||||
"Task description should be present in task summary"
|
||||
assert task.expected_output in task_summary, \
|
||||
"Expected output should be present in task summary"
|
||||
assert agent.role in task_summary, \
|
||||
"Agent role should be present in task summary"
|
||||
@@ -1,14 +1,10 @@
|
||||
from typing import Optional
|
||||
from unittest.mock import MagicMock, patch
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.utilities.planning_handler import (
|
||||
CrewPlanner,
|
||||
PlannerTaskPydanticOutput,
|
||||
@@ -96,72 +92,7 @@ class TestCrewPlanner:
|
||||
tasks_summary = crew_planner._create_tasks_summary()
|
||||
assert isinstance(tasks_summary, str)
|
||||
assert tasks_summary.startswith("\n Task Number 1 - Task 1")
|
||||
assert '"agent_tools": "agent has no tools"' in tasks_summary
|
||||
# Knowledge field should not be present when empty
|
||||
assert '"agent_knowledge"' not in tasks_summary
|
||||
|
||||
@patch('crewai.knowledge.storage.knowledge_storage.chromadb')
|
||||
def test_create_tasks_summary_with_knowledge_and_tools(self, mock_chroma):
|
||||
"""Test task summary generation with both knowledge and tools present."""
|
||||
# Mock ChromaDB collection
|
||||
mock_collection = mock_chroma.return_value.get_or_create_collection.return_value
|
||||
mock_collection.add.return_value = None
|
||||
|
||||
# Create mock tools with proper string descriptions and structured tool support
|
||||
class MockTool(BaseTool):
|
||||
name: str
|
||||
description: str
|
||||
|
||||
def __init__(self, name: str, description: str):
|
||||
tool_data = {"name": name, "description": description}
|
||||
super().__init__(**tool_data)
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
def __repr__(self):
|
||||
return self.name
|
||||
|
||||
def to_structured_tool(self):
|
||||
return self
|
||||
|
||||
def _run(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def _generate_description(self) -> str:
|
||||
"""Override _generate_description to avoid args_schema handling."""
|
||||
return self.description
|
||||
|
||||
tool1 = MockTool("tool1", "Tool 1 description")
|
||||
tool2 = MockTool("tool2", "Tool 2 description")
|
||||
|
||||
# Create a task with knowledge and tools
|
||||
task = Task(
|
||||
description="Task with knowledge and tools",
|
||||
expected_output="Expected output",
|
||||
agent=Agent(
|
||||
role="Test Agent",
|
||||
goal="Test Goal",
|
||||
backstory="Test Backstory",
|
||||
tools=[tool1, tool2],
|
||||
knowledge_sources=[
|
||||
StringKnowledgeSource(content="Test knowledge content")
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
# Create planner with the new task
|
||||
planner = CrewPlanner([task], None)
|
||||
tasks_summary = planner._create_tasks_summary()
|
||||
|
||||
# Verify task summary content
|
||||
assert isinstance(tasks_summary, str)
|
||||
assert task.description in tasks_summary
|
||||
assert task.expected_output in tasks_summary
|
||||
assert '"agent_tools": [tool1, tool2]' in tasks_summary
|
||||
assert '"agent_knowledge": "[\\"Test knowledge content\\"]"' in tasks_summary
|
||||
assert task.agent.role in tasks_summary
|
||||
assert task.agent.goal in tasks_summary
|
||||
assert tasks_summary.endswith('"agent_tools": []\n ')
|
||||
|
||||
def test_handle_crew_planning_different_llm(self, crew_planner_different_llm):
|
||||
with patch.object(Task, "execute_sync") as execute:
|
||||
|
||||
Reference in New Issue
Block a user