mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-03 16:22:49 +00:00
Merge branch 'main' into pydantic_fixup
This commit is contained in:
@@ -14,7 +14,7 @@ warnings.filterwarnings(
|
||||
category=UserWarning,
|
||||
module="pydantic.main",
|
||||
)
|
||||
__version__ = "0.86.0"
|
||||
__version__ = "0.95.0"
|
||||
__all__ = [
|
||||
"Agent",
|
||||
"Crew",
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.13"
|
||||
dependencies = [
|
||||
"crewai[tools]>=0.86.0,<1.0.0"
|
||||
"crewai[tools]>=0.95.0,<1.0.0"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.13"
|
||||
dependencies = [
|
||||
"crewai[tools]>=0.86.0,<1.0.0",
|
||||
"crewai[tools]>=0.95.0,<1.0.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10,<3.13"
|
||||
dependencies = [
|
||||
"crewai[tools]>=0.86.0"
|
||||
"crewai[tools]>=0.95.0"
|
||||
]
|
||||
|
||||
[tool.crewai]
|
||||
|
||||
@@ -726,11 +726,7 @@ class Crew(BaseModel):
|
||||
|
||||
# Determine which tools to use - task tools take precedence over agent tools
|
||||
tools_for_task = task.tools or agent_to_use.tools or []
|
||||
tools_for_task = self._prepare_tools(
|
||||
agent_to_use,
|
||||
task,
|
||||
tools_for_task
|
||||
)
|
||||
tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task)
|
||||
|
||||
self._log_task_start(task, agent_to_use.role)
|
||||
|
||||
@@ -797,14 +793,18 @@ class Crew(BaseModel):
|
||||
return skipped_task_output
|
||||
return None
|
||||
|
||||
def _prepare_tools(self, agent: BaseAgent, task: Task, tools: List[Tool]) -> List[Tool]:
|
||||
def _prepare_tools(
|
||||
self, agent: BaseAgent, task: Task, tools: List[Tool]
|
||||
) -> List[Tool]:
|
||||
# Add delegation tools if agent allows delegation
|
||||
if agent.allow_delegation:
|
||||
if self.process == Process.hierarchical:
|
||||
if self.manager_agent:
|
||||
tools = self._update_manager_tools(task, tools)
|
||||
else:
|
||||
raise ValueError("Manager agent is required for hierarchical process.")
|
||||
raise ValueError(
|
||||
"Manager agent is required for hierarchical process."
|
||||
)
|
||||
|
||||
elif agent and agent.allow_delegation:
|
||||
tools = self._add_delegation_tools(task, tools)
|
||||
@@ -823,7 +823,9 @@ class Crew(BaseModel):
|
||||
return self.manager_agent
|
||||
return task.agent
|
||||
|
||||
def _merge_tools(self, existing_tools: List[Tool], new_tools: List[Tool]) -> List[Tool]:
|
||||
def _merge_tools(
|
||||
self, existing_tools: List[Tool], new_tools: List[Tool]
|
||||
) -> List[Tool]:
|
||||
"""Merge new tools into existing tools list, avoiding duplicates by tool name."""
|
||||
if not new_tools:
|
||||
return existing_tools
|
||||
@@ -839,7 +841,9 @@ class Crew(BaseModel):
|
||||
|
||||
return tools
|
||||
|
||||
def _inject_delegation_tools(self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]):
|
||||
def _inject_delegation_tools(
|
||||
self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent]
|
||||
):
|
||||
delegation_tools = task_agent.get_delegation_tools(agents)
|
||||
return self._merge_tools(tools, delegation_tools)
|
||||
|
||||
@@ -856,7 +860,9 @@ class Crew(BaseModel):
|
||||
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
|
||||
if not tools:
|
||||
tools = []
|
||||
tools = self._inject_delegation_tools(tools, task.agent, agents_for_delegation)
|
||||
tools = self._inject_delegation_tools(
|
||||
tools, task.agent, agents_for_delegation
|
||||
)
|
||||
return tools
|
||||
|
||||
def _log_task_start(self, task: Task, role: str = "None"):
|
||||
@@ -870,7 +876,9 @@ class Crew(BaseModel):
|
||||
if task.agent:
|
||||
tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
|
||||
else:
|
||||
tools = self._inject_delegation_tools(tools, self.manager_agent, self.agents)
|
||||
tools = self._inject_delegation_tools(
|
||||
tools, self.manager_agent, self.agents
|
||||
)
|
||||
return tools
|
||||
|
||||
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
|
||||
|
||||
@@ -30,7 +30,47 @@ from crewai.telemetry import Telemetry
|
||||
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
|
||||
|
||||
|
||||
def start(condition=None):
|
||||
def start(condition: Optional[Union[str, dict, Callable]] = None) -> Callable:
|
||||
"""
|
||||
Marks a method as a flow's starting point.
|
||||
|
||||
This decorator designates a method as an entry point for the flow execution.
|
||||
It can optionally specify conditions that trigger the start based on other
|
||||
method executions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
condition : Optional[Union[str, dict, Callable]], optional
|
||||
Defines when the start method should execute. Can be:
|
||||
- str: Name of a method that triggers this start
|
||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
||||
- Callable: A method reference that triggers this start
|
||||
Default is None, meaning unconditional start.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Callable
|
||||
A decorator function that marks the method as a flow start point.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the condition format is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @start() # Unconditional start
|
||||
>>> def begin_flow(self):
|
||||
... pass
|
||||
|
||||
>>> @start("method_name") # Start after specific method
|
||||
>>> def conditional_start(self):
|
||||
... pass
|
||||
|
||||
>>> @start(and_("method1", "method2")) # Start after multiple methods
|
||||
>>> def complex_start(self):
|
||||
... pass
|
||||
"""
|
||||
def decorator(func):
|
||||
func.__is_start_method__ = True
|
||||
if condition is not None:
|
||||
@@ -55,8 +95,42 @@ def start(condition=None):
|
||||
|
||||
return decorator
|
||||
|
||||
def listen(condition: Union[str, dict, Callable]) -> Callable:
|
||||
"""
|
||||
Creates a listener that executes when specified conditions are met.
|
||||
|
||||
def listen(condition):
|
||||
This decorator sets up a method to execute in response to other method
|
||||
executions in the flow. It supports both simple and complex triggering
|
||||
conditions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
condition : Union[str, dict, Callable]
|
||||
Specifies when the listener should execute. Can be:
|
||||
- str: Name of a method that triggers this listener
|
||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
||||
- Callable: A method reference that triggers this listener
|
||||
|
||||
Returns
|
||||
-------
|
||||
Callable
|
||||
A decorator function that sets up the method as a listener.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the condition format is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @listen("process_data") # Listen to single method
|
||||
>>> def handle_processed_data(self):
|
||||
... pass
|
||||
|
||||
>>> @listen(or_("success", "failure")) # Listen to multiple methods
|
||||
>>> def handle_completion(self):
|
||||
... pass
|
||||
"""
|
||||
def decorator(func):
|
||||
if isinstance(condition, str):
|
||||
func.__trigger_methods__ = [condition]
|
||||
@@ -80,10 +154,49 @@ def listen(condition):
|
||||
return decorator
|
||||
|
||||
|
||||
def router(condition):
|
||||
def router(condition: Union[str, dict, Callable]) -> Callable:
|
||||
"""
|
||||
Creates a routing method that directs flow execution based on conditions.
|
||||
|
||||
This decorator marks a method as a router, which can dynamically determine
|
||||
the next steps in the flow based on its return value. Routers are triggered
|
||||
by specified conditions and can return constants that determine which path
|
||||
the flow should take.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
condition : Union[str, dict, Callable]
|
||||
Specifies when the router should execute. Can be:
|
||||
- str: Name of a method that triggers this router
|
||||
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
|
||||
- Callable: A method reference that triggers this router
|
||||
|
||||
Returns
|
||||
-------
|
||||
Callable
|
||||
A decorator function that sets up the method as a router.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the condition format is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @router("check_status")
|
||||
>>> def route_based_on_status(self):
|
||||
... if self.state.status == "success":
|
||||
... return SUCCESS
|
||||
... return FAILURE
|
||||
|
||||
>>> @router(and_("validate", "process"))
|
||||
>>> def complex_routing(self):
|
||||
... if all([self.state.valid, self.state.processed]):
|
||||
... return CONTINUE
|
||||
... return STOP
|
||||
"""
|
||||
def decorator(func):
|
||||
func.__is_router__ = True
|
||||
# Handle conditions like listen/start
|
||||
if isinstance(condition, str):
|
||||
func.__trigger_methods__ = [condition]
|
||||
func.__condition_type__ = "OR"
|
||||
@@ -105,8 +218,39 @@ def router(condition):
|
||||
|
||||
return decorator
|
||||
|
||||
def or_(*conditions: Union[str, dict, Callable]) -> dict:
|
||||
"""
|
||||
Combines multiple conditions with OR logic for flow control.
|
||||
|
||||
def or_(*conditions):
|
||||
Creates a condition that is satisfied when any of the specified conditions
|
||||
are met. This is used with @start, @listen, or @router decorators to create
|
||||
complex triggering conditions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
*conditions : Union[str, dict, Callable]
|
||||
Variable number of conditions that can be:
|
||||
- str: Method names
|
||||
- dict: Existing condition dictionaries
|
||||
- Callable: Method references
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
A condition dictionary with format:
|
||||
{"type": "OR", "methods": list_of_method_names}
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If any condition is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @listen(or_("success", "timeout"))
|
||||
>>> def handle_completion(self):
|
||||
... pass
|
||||
"""
|
||||
methods = []
|
||||
for condition in conditions:
|
||||
if isinstance(condition, dict) and "methods" in condition:
|
||||
@@ -120,7 +264,39 @@ def or_(*conditions):
|
||||
return {"type": "OR", "methods": methods}
|
||||
|
||||
|
||||
def and_(*conditions):
|
||||
def and_(*conditions: Union[str, dict, Callable]) -> dict:
|
||||
"""
|
||||
Combines multiple conditions with AND logic for flow control.
|
||||
|
||||
Creates a condition that is satisfied only when all specified conditions
|
||||
are met. This is used with @start, @listen, or @router decorators to create
|
||||
complex triggering conditions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
*conditions : Union[str, dict, Callable]
|
||||
Variable number of conditions that can be:
|
||||
- str: Method names
|
||||
- dict: Existing condition dictionaries
|
||||
- Callable: Method references
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
A condition dictionary with format:
|
||||
{"type": "AND", "methods": list_of_method_names}
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If any condition is invalid.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> @listen(and_("validated", "processed"))
|
||||
>>> def handle_complete_data(self):
|
||||
... pass
|
||||
"""
|
||||
methods = []
|
||||
for condition in conditions:
|
||||
if isinstance(condition, dict) and "methods" in condition:
|
||||
@@ -286,6 +462,23 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return final_output
|
||||
|
||||
async def _execute_start_method(self, start_method_name: str) -> None:
|
||||
"""
|
||||
Executes a flow's start method and its triggered listeners.
|
||||
|
||||
This internal method handles the execution of methods marked with @start
|
||||
decorator and manages the subsequent chain of listener executions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
start_method_name : str
|
||||
The name of the start method to execute.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Executes the start method and captures its result
|
||||
- Triggers execution of any listeners waiting on this start method
|
||||
- Part of the flow's initialization sequence
|
||||
"""
|
||||
result = await self._execute_method(
|
||||
start_method_name, self._methods[start_method_name]
|
||||
)
|
||||
@@ -306,6 +499,28 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return result
|
||||
|
||||
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
|
||||
"""
|
||||
Executes all listeners and routers triggered by a method completion.
|
||||
|
||||
This internal method manages the execution flow by:
|
||||
1. First executing all triggered routers sequentially
|
||||
2. Then executing all triggered listeners in parallel
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trigger_method : str
|
||||
The name of the method that triggered these listeners.
|
||||
result : Any
|
||||
The result from the triggering method, passed to listeners
|
||||
that accept parameters.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Routers are executed sequentially to maintain flow control
|
||||
- Each router's result becomes the new trigger_method
|
||||
- Normal listeners are executed in parallel for efficiency
|
||||
- Listeners can receive the trigger method's result as a parameter
|
||||
"""
|
||||
# First, handle routers repeatedly until no router triggers anymore
|
||||
while True:
|
||||
routers_triggered = self._find_triggered_methods(
|
||||
@@ -335,6 +550,33 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
def _find_triggered_methods(
|
||||
self, trigger_method: str, router_only: bool
|
||||
) -> List[str]:
|
||||
"""
|
||||
Finds all methods that should be triggered based on conditions.
|
||||
|
||||
This internal method evaluates both OR and AND conditions to determine
|
||||
which methods should be executed next in the flow.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trigger_method : str
|
||||
The name of the method that just completed execution.
|
||||
router_only : bool
|
||||
If True, only consider router methods.
|
||||
If False, only consider non-router methods.
|
||||
|
||||
Returns
|
||||
-------
|
||||
List[str]
|
||||
Names of methods that should be triggered.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Handles both OR and AND conditions:
|
||||
* OR: Triggers if any condition is met
|
||||
* AND: Triggers only when all conditions are met
|
||||
- Maintains state for AND conditions using _pending_and_listeners
|
||||
- Separates router and normal listener evaluation
|
||||
"""
|
||||
triggered = []
|
||||
for listener_name, (condition_type, methods) in self._listeners.items():
|
||||
is_router = listener_name in self._routers
|
||||
@@ -363,6 +605,33 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return triggered
|
||||
|
||||
async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
|
||||
"""
|
||||
Executes a single listener method with proper event handling.
|
||||
|
||||
This internal method manages the execution of an individual listener,
|
||||
including parameter inspection, event emission, and error handling.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
listener_name : str
|
||||
The name of the listener method to execute.
|
||||
result : Any
|
||||
The result from the triggering method, which may be passed
|
||||
to the listener if it accepts parameters.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Inspects method signature to determine if it accepts the trigger result
|
||||
- Emits events for method execution start and finish
|
||||
- Handles errors gracefully with detailed logging
|
||||
- Recursively triggers listeners of this listener
|
||||
- Supports both parameterized and parameter-less listeners
|
||||
|
||||
Error Handling
|
||||
-------------
|
||||
Catches and logs any exceptions during execution, preventing
|
||||
individual listener failures from breaking the entire flow.
|
||||
"""
|
||||
try:
|
||||
method = self._methods[listener_name]
|
||||
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
# flow_visualizer.py
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from pyvis.network import Network
|
||||
|
||||
from crewai.flow.config import COLORS, NODE_STYLES
|
||||
from crewai.flow.html_template_handler import HTMLTemplateHandler
|
||||
from crewai.flow.legend_generator import generate_legend_items_html, get_legend_items
|
||||
from crewai.flow.path_utils import safe_path_join, validate_path_exists
|
||||
from crewai.flow.utils import calculate_node_levels
|
||||
from crewai.flow.visualization_utils import (
|
||||
add_edges,
|
||||
@@ -16,89 +18,209 @@ from crewai.flow.visualization_utils import (
|
||||
|
||||
|
||||
class FlowPlot:
|
||||
"""Handles the creation and rendering of flow visualization diagrams."""
|
||||
|
||||
def __init__(self, flow):
|
||||
"""
|
||||
Initialize FlowPlot with a flow object.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Flow
|
||||
A Flow instance to visualize.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If flow object is invalid or missing required attributes.
|
||||
"""
|
||||
if not hasattr(flow, '_methods'):
|
||||
raise ValueError("Invalid flow object: missing '_methods' attribute")
|
||||
if not hasattr(flow, '_listeners'):
|
||||
raise ValueError("Invalid flow object: missing '_listeners' attribute")
|
||||
if not hasattr(flow, '_start_methods'):
|
||||
raise ValueError("Invalid flow object: missing '_start_methods' attribute")
|
||||
|
||||
self.flow = flow
|
||||
self.colors = COLORS
|
||||
self.node_styles = NODE_STYLES
|
||||
|
||||
def plot(self, filename):
|
||||
net = Network(
|
||||
directed=True,
|
||||
height="750px",
|
||||
width="100%",
|
||||
bgcolor=self.colors["bg"],
|
||||
layout=None,
|
||||
)
|
||||
|
||||
# Set options to disable physics
|
||||
net.set_options(
|
||||
"""
|
||||
var options = {
|
||||
"nodes": {
|
||||
"font": {
|
||||
"multi": "html"
|
||||
}
|
||||
},
|
||||
"physics": {
|
||||
"enabled": false
|
||||
}
|
||||
}
|
||||
"""
|
||||
)
|
||||
Generate and save an HTML visualization of the flow.
|
||||
|
||||
# Calculate levels for nodes
|
||||
node_levels = calculate_node_levels(self.flow)
|
||||
Parameters
|
||||
----------
|
||||
filename : str
|
||||
Name of the output file (without extension).
|
||||
|
||||
# Compute positions
|
||||
node_positions = compute_positions(self.flow, node_levels)
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If filename is invalid or network generation fails.
|
||||
IOError
|
||||
If file operations fail or visualization cannot be generated.
|
||||
RuntimeError
|
||||
If network visualization generation fails.
|
||||
"""
|
||||
if not filename or not isinstance(filename, str):
|
||||
raise ValueError("Filename must be a non-empty string")
|
||||
|
||||
try:
|
||||
# Initialize network
|
||||
net = Network(
|
||||
directed=True,
|
||||
height="750px",
|
||||
width="100%",
|
||||
bgcolor=self.colors["bg"],
|
||||
layout=None,
|
||||
)
|
||||
|
||||
# Add nodes to the network
|
||||
add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
|
||||
# Set options to disable physics
|
||||
net.set_options(
|
||||
"""
|
||||
var options = {
|
||||
"nodes": {
|
||||
"font": {
|
||||
"multi": "html"
|
||||
}
|
||||
},
|
||||
"physics": {
|
||||
"enabled": false
|
||||
}
|
||||
}
|
||||
"""
|
||||
)
|
||||
|
||||
# Add edges to the network
|
||||
add_edges(net, self.flow, node_positions, self.colors)
|
||||
# Calculate levels for nodes
|
||||
try:
|
||||
node_levels = calculate_node_levels(self.flow)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to calculate node levels: {str(e)}")
|
||||
|
||||
network_html = net.generate_html()
|
||||
final_html_content = self._generate_final_html(network_html)
|
||||
# Compute positions
|
||||
try:
|
||||
node_positions = compute_positions(self.flow, node_levels)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to compute node positions: {str(e)}")
|
||||
|
||||
# Save the final HTML content to the file
|
||||
with open(f"{filename}.html", "w", encoding="utf-8") as f:
|
||||
f.write(final_html_content)
|
||||
print(f"Plot saved as {filename}.html")
|
||||
# Add nodes to the network
|
||||
try:
|
||||
add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to add nodes to network: {str(e)}")
|
||||
|
||||
self._cleanup_pyvis_lib()
|
||||
# Add edges to the network
|
||||
try:
|
||||
add_edges(net, self.flow, node_positions, self.colors)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to add edges to network: {str(e)}")
|
||||
|
||||
# Generate HTML
|
||||
try:
|
||||
network_html = net.generate_html()
|
||||
final_html_content = self._generate_final_html(network_html)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to generate network visualization: {str(e)}")
|
||||
|
||||
# Save the final HTML content to the file
|
||||
try:
|
||||
with open(f"{filename}.html", "w", encoding="utf-8") as f:
|
||||
f.write(final_html_content)
|
||||
print(f"Plot saved as {filename}.html")
|
||||
except IOError as e:
|
||||
raise IOError(f"Failed to save flow visualization to {filename}.html: {str(e)}")
|
||||
|
||||
except (ValueError, RuntimeError, IOError) as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Unexpected error during flow visualization: {str(e)}")
|
||||
finally:
|
||||
self._cleanup_pyvis_lib()
|
||||
|
||||
def _generate_final_html(self, network_html):
|
||||
# Extract just the body content from the generated HTML
|
||||
current_dir = os.path.dirname(__file__)
|
||||
template_path = os.path.join(
|
||||
current_dir, "assets", "crewai_flow_visual_template.html"
|
||||
)
|
||||
logo_path = os.path.join(current_dir, "assets", "crewai_logo.svg")
|
||||
"""
|
||||
Generate the final HTML content with network visualization and legend.
|
||||
|
||||
html_handler = HTMLTemplateHandler(template_path, logo_path)
|
||||
network_body = html_handler.extract_body_content(network_html)
|
||||
Parameters
|
||||
----------
|
||||
network_html : str
|
||||
HTML content generated by pyvis Network.
|
||||
|
||||
# Generate the legend items HTML
|
||||
legend_items = get_legend_items(self.colors)
|
||||
legend_items_html = generate_legend_items_html(legend_items)
|
||||
final_html_content = html_handler.generate_final_html(
|
||||
network_body, legend_items_html
|
||||
)
|
||||
return final_html_content
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Complete HTML content with styling and legend.
|
||||
|
||||
Raises
|
||||
------
|
||||
IOError
|
||||
If template or logo files cannot be accessed.
|
||||
ValueError
|
||||
If network_html is invalid.
|
||||
"""
|
||||
if not network_html:
|
||||
raise ValueError("Invalid network HTML content")
|
||||
|
||||
try:
|
||||
# Extract just the body content from the generated HTML
|
||||
current_dir = os.path.dirname(__file__)
|
||||
template_path = safe_path_join("assets", "crewai_flow_visual_template.html", root=current_dir)
|
||||
logo_path = safe_path_join("assets", "crewai_logo.svg", root=current_dir)
|
||||
|
||||
if not os.path.exists(template_path):
|
||||
raise IOError(f"Template file not found: {template_path}")
|
||||
if not os.path.exists(logo_path):
|
||||
raise IOError(f"Logo file not found: {logo_path}")
|
||||
|
||||
html_handler = HTMLTemplateHandler(template_path, logo_path)
|
||||
network_body = html_handler.extract_body_content(network_html)
|
||||
|
||||
# Generate the legend items HTML
|
||||
legend_items = get_legend_items(self.colors)
|
||||
legend_items_html = generate_legend_items_html(legend_items)
|
||||
final_html_content = html_handler.generate_final_html(
|
||||
network_body, legend_items_html
|
||||
)
|
||||
return final_html_content
|
||||
except Exception as e:
|
||||
raise IOError(f"Failed to generate visualization HTML: {str(e)}")
|
||||
|
||||
def _cleanup_pyvis_lib(self):
|
||||
# Clean up the generated lib folder
|
||||
lib_folder = os.path.join(os.getcwd(), "lib")
|
||||
"""
|
||||
Clean up the generated lib folder from pyvis.
|
||||
|
||||
This method safely removes the temporary lib directory created by pyvis
|
||||
during network visualization generation.
|
||||
"""
|
||||
try:
|
||||
lib_folder = safe_path_join("lib", root=os.getcwd())
|
||||
if os.path.exists(lib_folder) and os.path.isdir(lib_folder):
|
||||
import shutil
|
||||
|
||||
shutil.rmtree(lib_folder)
|
||||
except ValueError as e:
|
||||
print(f"Error validating lib folder path: {e}")
|
||||
except Exception as e:
|
||||
print(f"Error cleaning up {lib_folder}: {e}")
|
||||
print(f"Error cleaning up lib folder: {e}")
|
||||
|
||||
|
||||
def plot_flow(flow, filename="flow_plot"):
|
||||
"""
|
||||
Convenience function to create and save a flow visualization.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Flow
|
||||
Flow instance to visualize.
|
||||
filename : str, optional
|
||||
Output filename without extension, by default "flow_plot".
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If flow object or filename is invalid.
|
||||
IOError
|
||||
If file operations fail.
|
||||
"""
|
||||
visualizer = FlowPlot(flow)
|
||||
visualizer.plot(filename)
|
||||
|
||||
@@ -1,26 +1,53 @@
|
||||
import base64
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
from crewai.flow.path_utils import safe_path_join, validate_path_exists
|
||||
|
||||
|
||||
class HTMLTemplateHandler:
|
||||
"""Handles HTML template processing and generation for flow visualization diagrams."""
|
||||
|
||||
def __init__(self, template_path, logo_path):
|
||||
self.template_path = template_path
|
||||
self.logo_path = logo_path
|
||||
"""
|
||||
Initialize HTMLTemplateHandler with validated template and logo paths.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
template_path : str
|
||||
Path to the HTML template file.
|
||||
logo_path : str
|
||||
Path to the logo image file.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If template or logo paths are invalid or files don't exist.
|
||||
"""
|
||||
try:
|
||||
self.template_path = validate_path_exists(template_path, "file")
|
||||
self.logo_path = validate_path_exists(logo_path, "file")
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid template or logo path: {e}")
|
||||
|
||||
def read_template(self):
|
||||
"""Read and return the HTML template file contents."""
|
||||
with open(self.template_path, "r", encoding="utf-8") as f:
|
||||
return f.read()
|
||||
|
||||
def encode_logo(self):
|
||||
"""Convert the logo SVG file to base64 encoded string."""
|
||||
with open(self.logo_path, "rb") as logo_file:
|
||||
logo_svg_data = logo_file.read()
|
||||
return base64.b64encode(logo_svg_data).decode("utf-8")
|
||||
|
||||
def extract_body_content(self, html):
|
||||
"""Extract and return content between body tags from HTML string."""
|
||||
match = re.search("<body.*?>(.*?)</body>", html, re.DOTALL)
|
||||
return match.group(1) if match else ""
|
||||
|
||||
def generate_legend_items_html(self, legend_items):
|
||||
"""Generate HTML markup for the legend items."""
|
||||
legend_items_html = ""
|
||||
for item in legend_items:
|
||||
if "border" in item:
|
||||
@@ -48,6 +75,7 @@ class HTMLTemplateHandler:
|
||||
return legend_items_html
|
||||
|
||||
def generate_final_html(self, network_body, legend_items_html, title="Flow Plot"):
|
||||
"""Combine all components into final HTML document with network visualization."""
|
||||
html_template = self.read_template()
|
||||
logo_svg_base64 = self.encode_logo()
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
|
||||
def get_legend_items(colors):
|
||||
return [
|
||||
{"label": "Start Method", "color": colors["start"]},
|
||||
|
||||
135
src/crewai/flow/path_utils.py
Normal file
135
src/crewai/flow/path_utils.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""
|
||||
Path utilities for secure file operations in CrewAI flow module.
|
||||
|
||||
This module provides utilities for secure path handling to prevent directory
|
||||
traversal attacks and ensure paths remain within allowed boundaries.
|
||||
"""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List, Union
|
||||
|
||||
|
||||
def safe_path_join(*parts: str, root: Union[str, Path, None] = None) -> str:
|
||||
"""
|
||||
Safely join path components and ensure the result is within allowed boundaries.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
*parts : str
|
||||
Variable number of path components to join.
|
||||
root : Union[str, Path, None], optional
|
||||
Root directory to use as base. If None, uses current working directory.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
String representation of the resolved path.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If the resulting path would be outside the root directory
|
||||
or if any path component is invalid.
|
||||
"""
|
||||
if not parts:
|
||||
raise ValueError("No path components provided")
|
||||
|
||||
try:
|
||||
# Convert all parts to strings and clean them
|
||||
clean_parts = [str(part).strip() for part in parts if part]
|
||||
if not clean_parts:
|
||||
raise ValueError("No valid path components provided")
|
||||
|
||||
# Establish root directory
|
||||
root_path = Path(root).resolve() if root else Path.cwd()
|
||||
|
||||
# Join and resolve the full path
|
||||
full_path = Path(root_path, *clean_parts).resolve()
|
||||
|
||||
# Check if the resolved path is within root
|
||||
if not str(full_path).startswith(str(root_path)):
|
||||
raise ValueError(
|
||||
f"Invalid path: Potential directory traversal. Path must be within {root_path}"
|
||||
)
|
||||
|
||||
return str(full_path)
|
||||
|
||||
except Exception as e:
|
||||
if isinstance(e, ValueError):
|
||||
raise
|
||||
raise ValueError(f"Invalid path components: {str(e)}")
|
||||
|
||||
|
||||
def validate_path_exists(path: Union[str, Path], file_type: str = "file") -> str:
|
||||
"""
|
||||
Validate that a path exists and is of the expected type.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
path : Union[str, Path]
|
||||
Path to validate.
|
||||
file_type : str, optional
|
||||
Expected type ('file' or 'directory'), by default 'file'.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
Validated path as string.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If path doesn't exist or is not of expected type.
|
||||
"""
|
||||
try:
|
||||
path_obj = Path(path).resolve()
|
||||
|
||||
if not path_obj.exists():
|
||||
raise ValueError(f"Path does not exist: {path}")
|
||||
|
||||
if file_type == "file" and not path_obj.is_file():
|
||||
raise ValueError(f"Path is not a file: {path}")
|
||||
elif file_type == "directory" and not path_obj.is_dir():
|
||||
raise ValueError(f"Path is not a directory: {path}")
|
||||
|
||||
return str(path_obj)
|
||||
|
||||
except Exception as e:
|
||||
if isinstance(e, ValueError):
|
||||
raise
|
||||
raise ValueError(f"Invalid path: {str(e)}")
|
||||
|
||||
|
||||
def list_files(directory: Union[str, Path], pattern: str = "*") -> List[str]:
|
||||
"""
|
||||
Safely list files in a directory matching a pattern.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
directory : Union[str, Path]
|
||||
Directory to search in.
|
||||
pattern : str, optional
|
||||
Glob pattern to match files against, by default "*".
|
||||
|
||||
Returns
|
||||
-------
|
||||
List[str]
|
||||
List of matching file paths.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If directory is invalid or inaccessible.
|
||||
"""
|
||||
try:
|
||||
dir_path = Path(directory).resolve()
|
||||
if not dir_path.is_dir():
|
||||
raise ValueError(f"Not a directory: {directory}")
|
||||
|
||||
return [str(p) for p in dir_path.glob(pattern) if p.is_file()]
|
||||
|
||||
except Exception as e:
|
||||
if isinstance(e, ValueError):
|
||||
raise
|
||||
raise ValueError(f"Error listing files: {str(e)}")
|
||||
@@ -1,9 +1,25 @@
|
||||
"""
|
||||
Utility functions for flow visualization and dependency analysis.
|
||||
|
||||
This module provides core functionality for analyzing and manipulating flow structures,
|
||||
including node level calculation, ancestor tracking, and return value analysis.
|
||||
Functions in this module are primarily used by the visualization system to create
|
||||
accurate and informative flow diagrams.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> flow = Flow()
|
||||
>>> node_levels = calculate_node_levels(flow)
|
||||
>>> ancestors = build_ancestor_dict(flow)
|
||||
"""
|
||||
|
||||
import ast
|
||||
import inspect
|
||||
import textwrap
|
||||
from typing import Any, Dict, List, Optional, Set, Union
|
||||
|
||||
|
||||
def get_possible_return_constants(function):
|
||||
def get_possible_return_constants(function: Any) -> Optional[List[str]]:
|
||||
try:
|
||||
source = inspect.getsource(function)
|
||||
except OSError:
|
||||
@@ -77,11 +93,34 @@ def get_possible_return_constants(function):
|
||||
return list(return_values) if return_values else None
|
||||
|
||||
|
||||
def calculate_node_levels(flow):
|
||||
levels = {}
|
||||
queue = []
|
||||
visited = set()
|
||||
pending_and_listeners = {}
|
||||
def calculate_node_levels(flow: Any) -> Dict[str, int]:
|
||||
"""
|
||||
Calculate the hierarchical level of each node in the flow.
|
||||
|
||||
Performs a breadth-first traversal of the flow graph to assign levels
|
||||
to nodes, starting with start methods at level 0.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Any
|
||||
The flow instance containing methods, listeners, and router configurations.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, int]
|
||||
Dictionary mapping method names to their hierarchical levels.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Start methods are assigned level 0
|
||||
- Each subsequent connected node is assigned level = parent_level + 1
|
||||
- Handles both OR and AND conditions for listeners
|
||||
- Processes router paths separately
|
||||
"""
|
||||
levels: Dict[str, int] = {}
|
||||
queue: List[str] = []
|
||||
visited: Set[str] = set()
|
||||
pending_and_listeners: Dict[str, Set[str]] = {}
|
||||
|
||||
# Make all start methods at level 0
|
||||
for method_name, method in flow._methods.items():
|
||||
@@ -140,7 +179,20 @@ def calculate_node_levels(flow):
|
||||
return levels
|
||||
|
||||
|
||||
def count_outgoing_edges(flow):
|
||||
def count_outgoing_edges(flow: Any) -> Dict[str, int]:
|
||||
"""
|
||||
Count the number of outgoing edges for each method in the flow.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Any
|
||||
The flow instance to analyze.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, int]
|
||||
Dictionary mapping method names to their outgoing edge count.
|
||||
"""
|
||||
counts = {}
|
||||
for method_name in flow._methods:
|
||||
counts[method_name] = 0
|
||||
@@ -152,16 +204,53 @@ def count_outgoing_edges(flow):
|
||||
return counts
|
||||
|
||||
|
||||
def build_ancestor_dict(flow):
|
||||
ancestors = {node: set() for node in flow._methods}
|
||||
visited = set()
|
||||
def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
|
||||
"""
|
||||
Build a dictionary mapping each node to its ancestor nodes.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Any
|
||||
The flow instance to analyze.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, Set[str]]
|
||||
Dictionary mapping each node to a set of its ancestor nodes.
|
||||
"""
|
||||
ancestors: Dict[str, Set[str]] = {node: set() for node in flow._methods}
|
||||
visited: Set[str] = set()
|
||||
for node in flow._methods:
|
||||
if node not in visited:
|
||||
dfs_ancestors(node, ancestors, visited, flow)
|
||||
return ancestors
|
||||
|
||||
|
||||
def dfs_ancestors(node, ancestors, visited, flow):
|
||||
def dfs_ancestors(
|
||||
node: str,
|
||||
ancestors: Dict[str, Set[str]],
|
||||
visited: Set[str],
|
||||
flow: Any
|
||||
) -> None:
|
||||
"""
|
||||
Perform depth-first search to build ancestor relationships.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
node : str
|
||||
Current node being processed.
|
||||
ancestors : Dict[str, Set[str]]
|
||||
Dictionary tracking ancestor relationships.
|
||||
visited : Set[str]
|
||||
Set of already visited nodes.
|
||||
flow : Any
|
||||
The flow instance being analyzed.
|
||||
|
||||
Notes
|
||||
-----
|
||||
This function modifies the ancestors dictionary in-place to build
|
||||
the complete ancestor graph.
|
||||
"""
|
||||
if node in visited:
|
||||
return
|
||||
visited.add(node)
|
||||
@@ -185,12 +274,48 @@ def dfs_ancestors(node, ancestors, visited, flow):
|
||||
dfs_ancestors(listener_name, ancestors, visited, flow)
|
||||
|
||||
|
||||
def is_ancestor(node, ancestor_candidate, ancestors):
|
||||
def is_ancestor(node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]) -> bool:
|
||||
"""
|
||||
Check if one node is an ancestor of another.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
node : str
|
||||
The node to check ancestors for.
|
||||
ancestor_candidate : str
|
||||
The potential ancestor node.
|
||||
ancestors : Dict[str, Set[str]]
|
||||
Dictionary containing ancestor relationships.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
True if ancestor_candidate is an ancestor of node, False otherwise.
|
||||
"""
|
||||
return ancestor_candidate in ancestors.get(node, set())
|
||||
|
||||
|
||||
def build_parent_children_dict(flow):
|
||||
parent_children = {}
|
||||
def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
|
||||
"""
|
||||
Build a dictionary mapping parent nodes to their children.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Any
|
||||
The flow instance to analyze.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, List[str]]
|
||||
Dictionary mapping parent method names to lists of their child method names.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Maps listeners to their trigger methods
|
||||
- Maps router methods to their paths and listeners
|
||||
- Children lists are sorted for consistent ordering
|
||||
"""
|
||||
parent_children: Dict[str, List[str]] = {}
|
||||
|
||||
# Map listeners to their trigger methods
|
||||
for listener_name, (_, trigger_methods) in flow._listeners.items():
|
||||
@@ -214,7 +339,24 @@ def build_parent_children_dict(flow):
|
||||
return parent_children
|
||||
|
||||
|
||||
def get_child_index(parent, child, parent_children):
|
||||
def get_child_index(parent: str, child: str, parent_children: Dict[str, List[str]]) -> int:
|
||||
"""
|
||||
Get the index of a child node in its parent's sorted children list.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
parent : str
|
||||
The parent node name.
|
||||
child : str
|
||||
The child node name to find the index for.
|
||||
parent_children : Dict[str, List[str]]
|
||||
Dictionary mapping parents to their children lists.
|
||||
|
||||
Returns
|
||||
-------
|
||||
int
|
||||
Zero-based index of the child in its parent's sorted children list.
|
||||
"""
|
||||
children = parent_children.get(parent, [])
|
||||
children.sort()
|
||||
return children.index(child)
|
||||
|
||||
@@ -1,5 +1,23 @@
|
||||
"""
|
||||
Utilities for creating visual representations of flow structures.
|
||||
|
||||
This module provides functions for generating network visualizations of flows,
|
||||
including node placement, edge creation, and visual styling. It handles the
|
||||
conversion of flow structures into visual network graphs with appropriate
|
||||
styling and layout.
|
||||
|
||||
Example
|
||||
-------
|
||||
>>> flow = Flow()
|
||||
>>> net = Network(directed=True)
|
||||
>>> node_positions = compute_positions(flow, node_levels)
|
||||
>>> add_nodes_to_network(net, flow, node_positions, node_styles)
|
||||
>>> add_edges(net, flow, node_positions, colors)
|
||||
"""
|
||||
|
||||
import ast
|
||||
import inspect
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from .utils import (
|
||||
build_ancestor_dict,
|
||||
@@ -9,8 +27,25 @@ from .utils import (
|
||||
)
|
||||
|
||||
|
||||
def method_calls_crew(method):
|
||||
"""Check if the method calls `.crew()`."""
|
||||
def method_calls_crew(method: Any) -> bool:
|
||||
"""
|
||||
Check if the method contains a call to `.crew()`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
method : Any
|
||||
The method to analyze for crew() calls.
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
True if the method calls .crew(), False otherwise.
|
||||
|
||||
Notes
|
||||
-----
|
||||
Uses AST analysis to detect method calls, specifically looking for
|
||||
attribute access of 'crew'.
|
||||
"""
|
||||
try:
|
||||
source = inspect.getsource(method)
|
||||
source = inspect.cleandoc(source)
|
||||
@@ -20,6 +55,7 @@ def method_calls_crew(method):
|
||||
return False
|
||||
|
||||
class CrewCallVisitor(ast.NodeVisitor):
|
||||
"""AST visitor to detect .crew() method calls."""
|
||||
def __init__(self):
|
||||
self.found = False
|
||||
|
||||
@@ -34,7 +70,34 @@ def method_calls_crew(method):
|
||||
return visitor.found
|
||||
|
||||
|
||||
def add_nodes_to_network(net, flow, node_positions, node_styles):
|
||||
def add_nodes_to_network(
|
||||
net: Any,
|
||||
flow: Any,
|
||||
node_positions: Dict[str, Tuple[float, float]],
|
||||
node_styles: Dict[str, Dict[str, Any]]
|
||||
) -> None:
|
||||
"""
|
||||
Add nodes to the network visualization with appropriate styling.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
net : Any
|
||||
The pyvis Network instance to add nodes to.
|
||||
flow : Any
|
||||
The flow instance containing method information.
|
||||
node_positions : Dict[str, Tuple[float, float]]
|
||||
Dictionary mapping node names to their (x, y) positions.
|
||||
node_styles : Dict[str, Dict[str, Any]]
|
||||
Dictionary containing style configurations for different node types.
|
||||
|
||||
Notes
|
||||
-----
|
||||
Node types include:
|
||||
- Start methods
|
||||
- Router methods
|
||||
- Crew methods
|
||||
- Regular methods
|
||||
"""
|
||||
def human_friendly_label(method_name):
|
||||
return method_name.replace("_", " ").title()
|
||||
|
||||
@@ -73,9 +136,33 @@ def add_nodes_to_network(net, flow, node_positions, node_styles):
|
||||
)
|
||||
|
||||
|
||||
def compute_positions(flow, node_levels, y_spacing=150, x_spacing=150):
|
||||
level_nodes = {}
|
||||
node_positions = {}
|
||||
def compute_positions(
|
||||
flow: Any,
|
||||
node_levels: Dict[str, int],
|
||||
y_spacing: float = 150,
|
||||
x_spacing: float = 150
|
||||
) -> Dict[str, Tuple[float, float]]:
|
||||
"""
|
||||
Compute the (x, y) positions for each node in the flow graph.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
flow : Any
|
||||
The flow instance to compute positions for.
|
||||
node_levels : Dict[str, int]
|
||||
Dictionary mapping node names to their hierarchical levels.
|
||||
y_spacing : float, optional
|
||||
Vertical spacing between levels, by default 150.
|
||||
x_spacing : float, optional
|
||||
Horizontal spacing between nodes, by default 150.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Dict[str, Tuple[float, float]]
|
||||
Dictionary mapping node names to their (x, y) coordinates.
|
||||
"""
|
||||
level_nodes: Dict[int, List[str]] = {}
|
||||
node_positions: Dict[str, Tuple[float, float]] = {}
|
||||
|
||||
for method_name, level in node_levels.items():
|
||||
level_nodes.setdefault(level, []).append(method_name)
|
||||
@@ -90,7 +177,33 @@ def compute_positions(flow, node_levels, y_spacing=150, x_spacing=150):
|
||||
return node_positions
|
||||
|
||||
|
||||
def add_edges(net, flow, node_positions, colors):
|
||||
def add_edges(
|
||||
net: Any,
|
||||
flow: Any,
|
||||
node_positions: Dict[str, Tuple[float, float]],
|
||||
colors: Dict[str, str]
|
||||
) -> None:
|
||||
edge_smooth: Dict[str, Union[str, float]] = {"type": "continuous"} # Default value
|
||||
"""
|
||||
Add edges to the network visualization with appropriate styling.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
net : Any
|
||||
The pyvis Network instance to add edges to.
|
||||
flow : Any
|
||||
The flow instance containing edge information.
|
||||
node_positions : Dict[str, Tuple[float, float]]
|
||||
Dictionary mapping node names to their positions.
|
||||
colors : Dict[str, str]
|
||||
Dictionary mapping edge types to their colors.
|
||||
|
||||
Notes
|
||||
-----
|
||||
- Handles both normal listener edges and router edges
|
||||
- Applies appropriate styling (color, dashes) based on edge type
|
||||
- Adds curvature to edges when needed (cycles or multiple children)
|
||||
"""
|
||||
ancestors = build_ancestor_dict(flow)
|
||||
parent_children = build_parent_children_dict(flow)
|
||||
|
||||
@@ -126,7 +239,7 @@ def add_edges(net, flow, node_positions, colors):
|
||||
else:
|
||||
edge_smooth = {"type": "cubicBezier"}
|
||||
else:
|
||||
edge_smooth = False
|
||||
edge_smooth.update({"type": "continuous"})
|
||||
|
||||
edge_style = {
|
||||
"color": edge_color,
|
||||
@@ -189,7 +302,7 @@ def add_edges(net, flow, node_positions, colors):
|
||||
else:
|
||||
edge_smooth = {"type": "cubicBezier"}
|
||||
else:
|
||||
edge_smooth = False
|
||||
edge_smooth.update({"type": "continuous"})
|
||||
|
||||
edge_style = {
|
||||
"color": colors["router_edge"],
|
||||
|
||||
@@ -2,11 +2,16 @@ from pathlib import Path
|
||||
from typing import Iterator, List, Optional, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from docling.datamodel.base_models import InputFormat
|
||||
from docling.document_converter import DocumentConverter
|
||||
from docling.exceptions import ConversionError
|
||||
from docling_core.transforms.chunker.hierarchical_chunker import HierarchicalChunker
|
||||
from docling_core.types.doc.document import DoclingDocument
|
||||
try:
|
||||
from docling.datamodel.base_models import InputFormat
|
||||
from docling.document_converter import DocumentConverter
|
||||
from docling.exceptions import ConversionError
|
||||
from docling_core.transforms.chunker.hierarchical_chunker import HierarchicalChunker
|
||||
from docling_core.types.doc.document import DoclingDocument
|
||||
DOCLING_AVAILABLE = True
|
||||
except ImportError:
|
||||
DOCLING_AVAILABLE = False
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||
@@ -19,6 +24,14 @@ class CrewDoclingSource(BaseKnowledgeSource):
|
||||
This will auto support PDF, DOCX, and TXT, XLSX, Images, and HTML files without any additional dependencies and follows the docling package as the source of truth.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
if not DOCLING_AVAILABLE:
|
||||
raise ImportError(
|
||||
"The docling package is required to use CrewDoclingSource. "
|
||||
"Please install it using: uv add docling"
|
||||
)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
_logger: Logger = Logger(verbose=True)
|
||||
|
||||
file_path: Optional[List[Union[Path, str]]] = Field(default=None)
|
||||
|
||||
@@ -4,10 +4,13 @@ import sys
|
||||
import threading
|
||||
import warnings
|
||||
from contextlib import contextmanager
|
||||
from importlib import resources
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
import litellm
|
||||
from litellm import get_supported_openai_params
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", UserWarning)
|
||||
import litellm
|
||||
from litellm import get_supported_openai_params
|
||||
|
||||
from crewai.utilities.exceptions.context_window_exceeding_exception import (
|
||||
LLMContextLengthExceededException,
|
||||
@@ -76,6 +79,7 @@ CONTEXT_WINDOW_USAGE_RATIO = 0.75
|
||||
def suppress_warnings():
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings("ignore")
|
||||
warnings.filterwarnings("ignore", message="open_text is deprecated*", category=DeprecationWarning)
|
||||
|
||||
# Redirect stdout and stderr
|
||||
old_stdout = sys.stdout
|
||||
@@ -138,7 +142,7 @@ class LLM:
|
||||
self.kwargs = kwargs
|
||||
|
||||
litellm.drop_params = True
|
||||
litellm.set_verbose = False
|
||||
|
||||
self.set_callbacks(callbacks)
|
||||
self.set_env_callbacks()
|
||||
|
||||
@@ -214,16 +218,17 @@ class LLM:
|
||||
return self.context_window_size
|
||||
|
||||
def set_callbacks(self, callbacks: List[Any]):
|
||||
callback_types = [type(callback) for callback in callbacks]
|
||||
for callback in litellm.success_callback[:]:
|
||||
if type(callback) in callback_types:
|
||||
litellm.success_callback.remove(callback)
|
||||
with suppress_warnings():
|
||||
callback_types = [type(callback) for callback in callbacks]
|
||||
for callback in litellm.success_callback[:]:
|
||||
if type(callback) in callback_types:
|
||||
litellm.success_callback.remove(callback)
|
||||
|
||||
for callback in litellm._async_success_callback[:]:
|
||||
if type(callback) in callback_types:
|
||||
litellm._async_success_callback.remove(callback)
|
||||
for callback in litellm._async_success_callback[:]:
|
||||
if type(callback) in callback_types:
|
||||
litellm._async_success_callback.remove(callback)
|
||||
|
||||
litellm.callbacks = callbacks
|
||||
litellm.callbacks = callbacks
|
||||
|
||||
def set_env_callbacks(self):
|
||||
"""
|
||||
@@ -244,19 +249,20 @@ class LLM:
|
||||
This will set `litellm.success_callback` to ["langfuse", "langsmith"] and
|
||||
`litellm.failure_callback` to ["langfuse"].
|
||||
"""
|
||||
success_callbacks_str = os.environ.get("LITELLM_SUCCESS_CALLBACKS", "")
|
||||
success_callbacks = []
|
||||
if success_callbacks_str:
|
||||
success_callbacks = [
|
||||
callback.strip() for callback in success_callbacks_str.split(",")
|
||||
]
|
||||
with suppress_warnings():
|
||||
success_callbacks_str = os.environ.get("LITELLM_SUCCESS_CALLBACKS", "")
|
||||
success_callbacks = []
|
||||
if success_callbacks_str:
|
||||
success_callbacks = [
|
||||
callback.strip() for callback in success_callbacks_str.split(",")
|
||||
]
|
||||
|
||||
failure_callbacks_str = os.environ.get("LITELLM_FAILURE_CALLBACKS", "")
|
||||
failure_callbacks = []
|
||||
if failure_callbacks_str:
|
||||
failure_callbacks = [
|
||||
callback.strip() for callback in failure_callbacks_str.split(",")
|
||||
]
|
||||
failure_callbacks_str = os.environ.get("LITELLM_FAILURE_CALLBACKS", "")
|
||||
failure_callbacks = []
|
||||
if failure_callbacks_str:
|
||||
failure_callbacks = [
|
||||
callback.strip() for callback in failure_callbacks_str.split(",")
|
||||
]
|
||||
|
||||
litellm.success_callback = success_callbacks
|
||||
litellm.failure_callback = failure_callbacks
|
||||
litellm.success_callback = success_callbacks
|
||||
litellm.failure_callback = failure_callbacks
|
||||
|
||||
@@ -4,18 +4,23 @@ from typing import Callable
|
||||
from crewai import Crew
|
||||
from crewai.project.utils import memoize
|
||||
|
||||
"""Decorators for defining crew components and their behaviors."""
|
||||
|
||||
|
||||
def before_kickoff(func):
|
||||
"""Marks a method to execute before crew kickoff."""
|
||||
func.is_before_kickoff = True
|
||||
return func
|
||||
|
||||
|
||||
def after_kickoff(func):
|
||||
"""Marks a method to execute after crew kickoff."""
|
||||
func.is_after_kickoff = True
|
||||
return func
|
||||
|
||||
|
||||
def task(func):
|
||||
"""Marks a method as a crew task."""
|
||||
func.is_task = True
|
||||
|
||||
@wraps(func)
|
||||
@@ -29,43 +34,51 @@ def task(func):
|
||||
|
||||
|
||||
def agent(func):
|
||||
"""Marks a method as a crew agent."""
|
||||
func.is_agent = True
|
||||
func = memoize(func)
|
||||
return func
|
||||
|
||||
|
||||
def llm(func):
|
||||
"""Marks a method as an LLM provider."""
|
||||
func.is_llm = True
|
||||
func = memoize(func)
|
||||
return func
|
||||
|
||||
|
||||
def output_json(cls):
|
||||
"""Marks a class as JSON output format."""
|
||||
cls.is_output_json = True
|
||||
return cls
|
||||
|
||||
|
||||
def output_pydantic(cls):
|
||||
"""Marks a class as Pydantic output format."""
|
||||
cls.is_output_pydantic = True
|
||||
return cls
|
||||
|
||||
|
||||
def tool(func):
|
||||
"""Marks a method as a crew tool."""
|
||||
func.is_tool = True
|
||||
return memoize(func)
|
||||
|
||||
|
||||
def callback(func):
|
||||
"""Marks a method as a crew callback."""
|
||||
func.is_callback = True
|
||||
return memoize(func)
|
||||
|
||||
|
||||
def cache_handler(func):
|
||||
"""Marks a method as a cache handler."""
|
||||
func.is_cache_handler = True
|
||||
return memoize(func)
|
||||
|
||||
|
||||
def crew(func) -> Callable[..., Crew]:
|
||||
"""Marks a method as the main crew execution point."""
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(self, *args, **kwargs) -> Crew:
|
||||
|
||||
@@ -9,8 +9,10 @@ load_dotenv()
|
||||
|
||||
T = TypeVar("T", bound=type)
|
||||
|
||||
"""Base decorator for creating crew classes with configuration and function management."""
|
||||
|
||||
def CrewBase(cls: T) -> T:
|
||||
"""Wraps a class with crew functionality and configuration management."""
|
||||
class WrappedClass(cls): # type: ignore
|
||||
is_crew_class: bool = True # type: ignore
|
||||
|
||||
@@ -216,5 +218,5 @@ def CrewBase(cls: T) -> T:
|
||||
# Include base class (qual)name in the wrapper class (qual)name.
|
||||
WrappedClass.__name__ = CrewBase.__name__ + "(" + cls.__name__ + ")"
|
||||
WrappedClass.__qualname__ = CrewBase.__qualname__ + "(" + cls.__name__ + ")"
|
||||
|
||||
|
||||
return cast(T, WrappedClass)
|
||||
|
||||
@@ -41,6 +41,7 @@ from crewai.tools.base_tool import BaseTool
|
||||
from crewai.utilities.config import process_config
|
||||
from crewai.utilities.converter import Converter, convert_to_model
|
||||
from crewai.utilities.i18n import I18N
|
||||
from crewai.utilities.printer import Printer
|
||||
|
||||
|
||||
class Task(BaseModel):
|
||||
@@ -127,38 +128,40 @@ class Task(BaseModel):
|
||||
processed_by_agents: Set[str] = Field(default_factory=set)
|
||||
guardrail: Optional[Callable[[TaskOutput], Tuple[bool, Any]]] = Field(
|
||||
default=None,
|
||||
description="Function to validate task output before proceeding to next task"
|
||||
description="Function to validate task output before proceeding to next task",
|
||||
)
|
||||
max_retries: int = Field(
|
||||
default=3,
|
||||
description="Maximum number of retries when guardrail fails"
|
||||
default=3, description="Maximum number of retries when guardrail fails"
|
||||
)
|
||||
retry_count: int = Field(
|
||||
default=0,
|
||||
description="Current number of retries"
|
||||
retry_count: int = Field(default=0, description="Current number of retries")
|
||||
start_time: Optional[datetime.datetime] = Field(
|
||||
default=None, description="Start time of the task execution"
|
||||
)
|
||||
end_time: Optional[datetime.datetime] = Field(
|
||||
default=None, description="End time of the task execution"
|
||||
)
|
||||
|
||||
@field_validator("guardrail")
|
||||
@classmethod
|
||||
def validate_guardrail_function(cls, v: Optional[Callable]) -> Optional[Callable]:
|
||||
"""Validate that the guardrail function has the correct signature and behavior.
|
||||
|
||||
|
||||
While type hints provide static checking, this validator ensures runtime safety by:
|
||||
1. Verifying the function accepts exactly one parameter (the TaskOutput)
|
||||
2. Checking return type annotations match Tuple[bool, Any] if present
|
||||
3. Providing clear, immediate error messages for debugging
|
||||
|
||||
|
||||
This runtime validation is crucial because:
|
||||
- Type hints are optional and can be ignored at runtime
|
||||
- Function signatures need immediate validation before task execution
|
||||
- Clear error messages help users debug guardrail implementation issues
|
||||
|
||||
|
||||
Args:
|
||||
v: The guardrail function to validate
|
||||
|
||||
|
||||
Returns:
|
||||
The validated guardrail function
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If the function signature is invalid or return annotation
|
||||
doesn't match Tuple[bool, Any]
|
||||
@@ -171,16 +174,21 @@ class Task(BaseModel):
|
||||
# Check return annotation if present, but don't require it
|
||||
return_annotation = sig.return_annotation
|
||||
if return_annotation != inspect.Signature.empty:
|
||||
if not (return_annotation == Tuple[bool, Any] or str(return_annotation) == 'Tuple[bool, Any]'):
|
||||
raise ValueError("If return type is annotated, it must be Tuple[bool, Any]")
|
||||
if not (
|
||||
return_annotation == Tuple[bool, Any]
|
||||
or str(return_annotation) == "Tuple[bool, Any]"
|
||||
):
|
||||
raise ValueError(
|
||||
"If return type is annotated, it must be Tuple[bool, Any]"
|
||||
)
|
||||
return v
|
||||
|
||||
_telemetry: Telemetry = PrivateAttr(default_factory=Telemetry)
|
||||
_execution_span: Optional[Span] = PrivateAttr(default=None)
|
||||
_original_description: Optional[str] = PrivateAttr(default=None)
|
||||
_original_expected_output: Optional[str] = PrivateAttr(default=None)
|
||||
_original_output_file: Optional[str] = PrivateAttr(default=None)
|
||||
_thread: Optional[threading.Thread] = PrivateAttr(default=None)
|
||||
_execution_time: Optional[float] = PrivateAttr(default=None)
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
@@ -205,16 +213,54 @@ class Task(BaseModel):
|
||||
"may_not_set_field", "This field is not to be set by the user.", {}
|
||||
)
|
||||
|
||||
def _set_start_execution_time(self) -> float:
|
||||
return datetime.datetime.now().timestamp()
|
||||
|
||||
def _set_end_execution_time(self, start_time: float) -> None:
|
||||
self._execution_time = datetime.datetime.now().timestamp() - start_time
|
||||
|
||||
@field_validator("output_file")
|
||||
@classmethod
|
||||
def output_file_validation(cls, value: str) -> str:
|
||||
"""Validate the output file path by removing the / from the beginning of the path."""
|
||||
def output_file_validation(cls, value: Optional[str]) -> Optional[str]:
|
||||
"""Validate the output file path.
|
||||
|
||||
Args:
|
||||
value: The output file path to validate. Can be None or a string.
|
||||
If the path contains template variables (e.g. {var}), leading slashes are preserved.
|
||||
For regular paths, leading slashes are stripped.
|
||||
|
||||
Returns:
|
||||
The validated and potentially modified path, or None if no path was provided.
|
||||
|
||||
Raises:
|
||||
ValueError: If the path contains invalid characters, path traversal attempts,
|
||||
or other security concerns.
|
||||
"""
|
||||
if value is None:
|
||||
return None
|
||||
|
||||
# Basic security checks
|
||||
if ".." in value:
|
||||
raise ValueError(
|
||||
"Path traversal attempts are not allowed in output_file paths"
|
||||
)
|
||||
|
||||
# Check for shell expansion first
|
||||
if value.startswith("~") or value.startswith("$"):
|
||||
raise ValueError(
|
||||
"Shell expansion characters are not allowed in output_file paths"
|
||||
)
|
||||
|
||||
# Then check other shell special characters
|
||||
if any(char in value for char in ["|", ">", "<", "&", ";"]):
|
||||
raise ValueError(
|
||||
"Shell special characters are not allowed in output_file paths"
|
||||
)
|
||||
|
||||
# Don't strip leading slash if it's a template path with variables
|
||||
if "{" in value or "}" in value:
|
||||
# Validate template variable format
|
||||
template_vars = [part.split("}")[0] for part in value.split("{")[1:]]
|
||||
for var in template_vars:
|
||||
if not var.isidentifier():
|
||||
raise ValueError(f"Invalid template variable name: {var}")
|
||||
return value
|
||||
|
||||
# Strip leading slash for regular paths
|
||||
if value.startswith("/"):
|
||||
return value[1:]
|
||||
return value
|
||||
@@ -263,6 +309,12 @@ class Task(BaseModel):
|
||||
|
||||
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
|
||||
|
||||
@property
|
||||
def execution_duration(self) -> float | None:
|
||||
if not self.start_time or not self.end_time:
|
||||
return None
|
||||
return (self.end_time - self.start_time).total_seconds()
|
||||
|
||||
def execute_async(
|
||||
self,
|
||||
agent: BaseAgent | None = None,
|
||||
@@ -303,7 +355,7 @@ class Task(BaseModel):
|
||||
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
|
||||
)
|
||||
|
||||
start_time = self._set_start_execution_time()
|
||||
self.start_time = datetime.datetime.now()
|
||||
self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self)
|
||||
|
||||
self.prompt_context = context
|
||||
@@ -339,10 +391,14 @@ class Task(BaseModel):
|
||||
)
|
||||
|
||||
self.retry_count += 1
|
||||
context = (
|
||||
f"### Previous attempt failed validation: {guardrail_result.error}\n\n\n"
|
||||
f"### Previous result:\n{task_output.raw}\n\n\n"
|
||||
"Try again, making sure to address the validation error."
|
||||
context = self.i18n.errors("validation_error").format(
|
||||
guardrail_result_error=guardrail_result.error,
|
||||
task_output=task_output.raw
|
||||
)
|
||||
printer = Printer()
|
||||
printer.print(
|
||||
content=f"Guardrail blocked, retrying, due to:{guardrail_result.error}\n",
|
||||
color="yellow",
|
||||
)
|
||||
return self._execute_core(agent, context, tools)
|
||||
|
||||
@@ -353,15 +409,17 @@ class Task(BaseModel):
|
||||
|
||||
if isinstance(guardrail_result.result, str):
|
||||
task_output.raw = guardrail_result.result
|
||||
pydantic_output, json_output = self._export_output(guardrail_result.result)
|
||||
pydantic_output, json_output = self._export_output(
|
||||
guardrail_result.result
|
||||
)
|
||||
task_output.pydantic = pydantic_output
|
||||
task_output.json_dict = json_output
|
||||
elif isinstance(guardrail_result.result, TaskOutput):
|
||||
task_output = guardrail_result.result
|
||||
|
||||
self.output = task_output
|
||||
self.end_time = datetime.datetime.now()
|
||||
|
||||
self._set_end_execution_time(start_time)
|
||||
if self.callback:
|
||||
self.callback(self.output)
|
||||
|
||||
@@ -373,7 +431,9 @@ class Task(BaseModel):
|
||||
content = (
|
||||
json_output
|
||||
if json_output
|
||||
else pydantic_output.model_dump_json() if pydantic_output else result
|
||||
else pydantic_output.model_dump_json()
|
||||
if pydantic_output
|
||||
else result
|
||||
)
|
||||
self._save_file(content)
|
||||
|
||||
@@ -393,27 +453,101 @@ class Task(BaseModel):
|
||||
tasks_slices = [self.description, output]
|
||||
return "\n".join(tasks_slices)
|
||||
|
||||
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
|
||||
"""Interpolate inputs into the task description and expected output."""
|
||||
def interpolate_inputs(self, inputs: Dict[str, Union[str, int, float]]) -> None:
|
||||
"""Interpolate inputs into the task description, expected output, and output file path.
|
||||
|
||||
Args:
|
||||
inputs: Dictionary mapping template variables to their values.
|
||||
Supported value types are strings, integers, and floats.
|
||||
|
||||
Raises:
|
||||
ValueError: If a required template variable is missing from inputs.
|
||||
"""
|
||||
if self._original_description is None:
|
||||
self._original_description = self.description
|
||||
if self._original_expected_output is None:
|
||||
self._original_expected_output = self.expected_output
|
||||
if self.output_file is not None and self._original_output_file is None:
|
||||
self._original_output_file = self.output_file
|
||||
|
||||
if inputs:
|
||||
if not inputs:
|
||||
return
|
||||
|
||||
try:
|
||||
self.description = self._original_description.format(**inputs)
|
||||
except KeyError as e:
|
||||
raise ValueError(
|
||||
f"Missing required template variable '{e.args[0]}' in description"
|
||||
) from e
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Error interpolating description: {str(e)}") from e
|
||||
|
||||
try:
|
||||
self.expected_output = self.interpolate_only(
|
||||
input_string=self._original_expected_output, inputs=inputs
|
||||
)
|
||||
except (KeyError, ValueError) as e:
|
||||
raise ValueError(f"Error interpolating expected_output: {str(e)}") from e
|
||||
|
||||
def interpolate_only(self, input_string: str, inputs: Dict[str, Any]) -> str:
|
||||
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched."""
|
||||
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
|
||||
if self.output_file is not None:
|
||||
try:
|
||||
self.output_file = self.interpolate_only(
|
||||
input_string=self._original_output_file, inputs=inputs
|
||||
)
|
||||
except (KeyError, ValueError) as e:
|
||||
raise ValueError(
|
||||
f"Error interpolating output_file path: {str(e)}"
|
||||
) from e
|
||||
|
||||
for key in inputs.keys():
|
||||
escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}")
|
||||
def interpolate_only(
|
||||
self, input_string: Optional[str], inputs: Dict[str, Union[str, int, float]]
|
||||
) -> str:
|
||||
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
|
||||
|
||||
return escaped_string.format(**inputs)
|
||||
Args:
|
||||
input_string: The string containing template variables to interpolate.
|
||||
Can be None or empty, in which case an empty string is returned.
|
||||
inputs: Dictionary mapping template variables to their values.
|
||||
Supported value types are strings, integers, and floats.
|
||||
If input_string is empty or has no placeholders, inputs can be empty.
|
||||
|
||||
Returns:
|
||||
The interpolated string with all template variables replaced with their values.
|
||||
Empty string if input_string is None or empty.
|
||||
|
||||
Raises:
|
||||
ValueError: If a required template variable is missing from inputs.
|
||||
KeyError: If a template variable is not found in the inputs dictionary.
|
||||
"""
|
||||
if input_string is None or not input_string:
|
||||
return ""
|
||||
if "{" not in input_string and "}" not in input_string:
|
||||
return input_string
|
||||
if not inputs:
|
||||
raise ValueError(
|
||||
"Inputs dictionary cannot be empty when interpolating variables"
|
||||
)
|
||||
|
||||
try:
|
||||
# Validate input types
|
||||
for key, value in inputs.items():
|
||||
if not isinstance(value, (str, int, float)):
|
||||
raise ValueError(
|
||||
f"Value for key '{key}' must be a string, integer, or float, got {type(value).__name__}"
|
||||
)
|
||||
|
||||
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
|
||||
|
||||
for key in inputs.keys():
|
||||
escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}")
|
||||
|
||||
return escaped_string.format(**inputs)
|
||||
except KeyError as e:
|
||||
raise KeyError(
|
||||
f"Template variable '{e.args[0]}' not found in inputs dictionary"
|
||||
) from e
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Error during string interpolation: {str(e)}") from e
|
||||
|
||||
def increment_tools_errors(self) -> None:
|
||||
"""Increment the tools errors counter."""
|
||||
@@ -496,10 +630,10 @@ class Task(BaseModel):
|
||||
|
||||
def _save_file(self, result: Any) -> None:
|
||||
"""Save task output to a file.
|
||||
|
||||
|
||||
Args:
|
||||
result: The result to save to the file. Can be a dict or any stringifiable object.
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If output_file is not set
|
||||
RuntimeError: If there is an error writing to the file
|
||||
@@ -517,6 +651,7 @@ class Task(BaseModel):
|
||||
with resolved_path.open("w", encoding="utf-8") as file:
|
||||
if isinstance(result, dict):
|
||||
import json
|
||||
|
||||
json.dump(result, file, ensure_ascii=False, indent=2)
|
||||
else:
|
||||
file.write(str(result))
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from typing import Optional, Union
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
@@ -7,6 +8,8 @@ from crewai.task import Task
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.utilities import I18N
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseAgentTool(BaseTool):
|
||||
"""Base class for agent-related tools"""
|
||||
@@ -16,6 +19,25 @@ class BaseAgentTool(BaseTool):
|
||||
default_factory=I18N, description="Internationalization settings"
|
||||
)
|
||||
|
||||
def sanitize_agent_name(self, name: str) -> str:
|
||||
"""
|
||||
Sanitize agent role name by normalizing whitespace and setting to lowercase.
|
||||
Converts all whitespace (including newlines) to single spaces and removes quotes.
|
||||
|
||||
Args:
|
||||
name (str): The agent role name to sanitize
|
||||
|
||||
Returns:
|
||||
str: The sanitized agent role name, with whitespace normalized,
|
||||
converted to lowercase, and quotes removed
|
||||
"""
|
||||
if not name:
|
||||
return ""
|
||||
# Normalize all whitespace (including newlines) to single spaces
|
||||
normalized = " ".join(name.split())
|
||||
# Remove quotes and convert to lowercase
|
||||
return normalized.replace('"', "").casefold()
|
||||
|
||||
def _get_coworker(self, coworker: Optional[str], **kwargs) -> Optional[str]:
|
||||
coworker = coworker or kwargs.get("co_worker") or kwargs.get("coworker")
|
||||
if coworker:
|
||||
@@ -25,11 +47,27 @@ class BaseAgentTool(BaseTool):
|
||||
return coworker
|
||||
|
||||
def _execute(
|
||||
self, agent_name: Union[str, None], task: str, context: Union[str, None]
|
||||
self,
|
||||
agent_name: Optional[str],
|
||||
task: str,
|
||||
context: Optional[str] = None
|
||||
) -> str:
|
||||
"""
|
||||
Execute delegation to an agent with case-insensitive and whitespace-tolerant matching.
|
||||
|
||||
Args:
|
||||
agent_name: Name/role of the agent to delegate to (case-insensitive)
|
||||
task: The specific question or task to delegate
|
||||
context: Optional additional context for the task execution
|
||||
|
||||
Returns:
|
||||
str: The execution result from the delegated agent or an error message
|
||||
if the agent cannot be found
|
||||
"""
|
||||
try:
|
||||
if agent_name is None:
|
||||
agent_name = ""
|
||||
logger.debug("No agent name provided, using empty string")
|
||||
|
||||
# It is important to remove the quotes from the agent name.
|
||||
# The reason we have to do this is because less-powerful LLM's
|
||||
@@ -38,31 +76,49 @@ class BaseAgentTool(BaseTool):
|
||||
# {"task": "....", "coworker": "....
|
||||
# when it should look like this:
|
||||
# {"task": "....", "coworker": "...."}
|
||||
agent_name = agent_name.casefold().replace('"', "").replace("\n", "")
|
||||
sanitized_name = self.sanitize_agent_name(agent_name)
|
||||
logger.debug(f"Sanitized agent name from '{agent_name}' to '{sanitized_name}'")
|
||||
|
||||
available_agents = [agent.role for agent in self.agents]
|
||||
logger.debug(f"Available agents: {available_agents}")
|
||||
|
||||
agent = [ # type: ignore # Incompatible types in assignment (expression has type "list[BaseAgent]", variable has type "str | None")
|
||||
available_agent
|
||||
for available_agent in self.agents
|
||||
if available_agent.role.casefold().replace("\n", "") == agent_name
|
||||
if self.sanitize_agent_name(available_agent.role) == sanitized_name
|
||||
]
|
||||
except Exception as _:
|
||||
logger.debug(f"Found {len(agent)} matching agents for role '{sanitized_name}'")
|
||||
except (AttributeError, ValueError) as e:
|
||||
# Handle specific exceptions that might occur during role name processing
|
||||
return self.i18n.errors("agent_tool_unexisting_coworker").format(
|
||||
coworkers="\n".join(
|
||||
[f"- {agent.role.casefold()}" for agent in self.agents]
|
||||
)
|
||||
[f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents]
|
||||
),
|
||||
error=str(e)
|
||||
)
|
||||
|
||||
if not agent:
|
||||
# No matching agent found after sanitization
|
||||
return self.i18n.errors("agent_tool_unexisting_coworker").format(
|
||||
coworkers="\n".join(
|
||||
[f"- {agent.role.casefold()}" for agent in self.agents]
|
||||
)
|
||||
[f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents]
|
||||
),
|
||||
error=f"No agent found with role '{sanitized_name}'"
|
||||
)
|
||||
|
||||
agent = agent[0]
|
||||
task_with_assigned_agent = Task( # type: ignore # Incompatible types in assignment (expression has type "Task", variable has type "str")
|
||||
description=task,
|
||||
agent=agent,
|
||||
expected_output=agent.i18n.slice("manager_request"),
|
||||
i18n=agent.i18n,
|
||||
)
|
||||
return agent.execute_task(task_with_assigned_agent, context)
|
||||
try:
|
||||
task_with_assigned_agent = Task(
|
||||
description=task,
|
||||
agent=agent,
|
||||
expected_output=agent.i18n.slice("manager_request"),
|
||||
i18n=agent.i18n,
|
||||
)
|
||||
logger.debug(f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}")
|
||||
return agent.execute_task(task_with_assigned_agent, context)
|
||||
except Exception as e:
|
||||
# Handle task creation or execution errors
|
||||
return self.i18n.errors("agent_tool_execution_error").format(
|
||||
agent_role=self.sanitize_agent_name(agent.role),
|
||||
error=str(e)
|
||||
)
|
||||
|
||||
@@ -1,12 +1,23 @@
|
||||
import warnings
|
||||
from abc import ABC, abstractmethod
|
||||
from inspect import signature
|
||||
from typing import Any, Callable, Type, get_args, get_origin
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field, create_model, validator
|
||||
from pydantic import (
|
||||
BaseModel,
|
||||
ConfigDict,
|
||||
Field,
|
||||
PydanticDeprecatedSince20,
|
||||
create_model,
|
||||
validator,
|
||||
)
|
||||
from pydantic import BaseModel as PydanticBaseModel
|
||||
|
||||
from crewai.tools.structured_tool import CrewStructuredTool
|
||||
|
||||
# Ignore all "PydanticDeprecatedSince20" warnings globally
|
||||
warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20)
|
||||
|
||||
|
||||
class BaseTool(BaseModel, ABC):
|
||||
class _ArgsSchemaPlaceholder(PydanticBaseModel):
|
||||
|
||||
@@ -33,7 +33,9 @@
|
||||
"tool_usage_error": "I encountered an error: {error}",
|
||||
"tool_arguments_error": "Error: the Action Input is not a valid key, value dictionary.",
|
||||
"wrong_tool_name": "You tried to use the tool {tool}, but it doesn't exist. You must use one of the following tools, use one at time: {tools}.",
|
||||
"tool_usage_exception": "I encountered an error while trying to use the tool. This was the error: {error}.\n Tool {tool} accepts these inputs: {tool_inputs}"
|
||||
"tool_usage_exception": "I encountered an error while trying to use the tool. This was the error: {error}.\n Tool {tool} accepts these inputs: {tool_inputs}",
|
||||
"agent_tool_execution_error": "Error executing task with agent '{agent_role}'. Error: {error}",
|
||||
"validation_error": "### Previous attempt failed validation: {guardrail_result_error}\n\n\n### Previous result:\n{task_output}\n\n\nTry again, making sure to address the validation error."
|
||||
},
|
||||
"tools": {
|
||||
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolute everything you know, don't reference things but instead explain them.",
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
"""JSON encoder for handling CrewAI specific types."""
|
||||
|
||||
import json
|
||||
from datetime import date, datetime
|
||||
from decimal import Decimal
|
||||
@@ -8,6 +10,7 @@ from pydantic import BaseModel
|
||||
|
||||
|
||||
class CrewJSONEncoder(json.JSONEncoder):
|
||||
"""Custom JSON encoder for CrewAI objects and special types."""
|
||||
def default(self, obj):
|
||||
if isinstance(obj, BaseModel):
|
||||
return self._handle_pydantic_model(obj)
|
||||
|
||||
@@ -6,9 +6,10 @@ from pydantic import BaseModel, ValidationError
|
||||
|
||||
from crewai.agents.parser import OutputParserException
|
||||
|
||||
"""Parser for converting text outputs into Pydantic models."""
|
||||
|
||||
class CrewPydanticOutputParser:
|
||||
"""Parses the text into pydantic models"""
|
||||
"""Parses text outputs into specified Pydantic models."""
|
||||
|
||||
pydantic_object: Type[BaseModel]
|
||||
|
||||
|
||||
@@ -180,12 +180,12 @@ class CrewEvaluator:
|
||||
self._test_result_span = self._telemetry.individual_test_result_span(
|
||||
self.crew,
|
||||
evaluation_result.pydantic.quality,
|
||||
current_task._execution_time,
|
||||
current_task.execution_duration,
|
||||
self.openai_model_name,
|
||||
)
|
||||
self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
|
||||
self.run_execution_times[self.iteration].append(
|
||||
current_task._execution_time
|
||||
current_task.execution_duration
|
||||
)
|
||||
else:
|
||||
raise ValueError("Evaluation result is not in the expected format")
|
||||
|
||||
@@ -4,8 +4,10 @@ from typing import Dict, Optional, Union
|
||||
|
||||
from pydantic import BaseModel, Field, PrivateAttr, model_validator
|
||||
|
||||
"""Internationalization support for CrewAI prompts and messages."""
|
||||
|
||||
class I18N(BaseModel):
|
||||
"""Handles loading and retrieving internationalized prompts."""
|
||||
_prompts: Dict[str, Dict[str, str]] = PrivateAttr()
|
||||
prompt_file: Optional[str] = Field(
|
||||
default=None,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import warnings
|
||||
from typing import Any, Optional, Type
|
||||
|
||||
|
||||
@@ -25,14 +26,15 @@ class InternalInstructor:
|
||||
if self.agent and not self.llm:
|
||||
self.llm = self.agent.function_calling_llm or self.agent.llm
|
||||
|
||||
# Lazy import
|
||||
import instructor
|
||||
from litellm import completion
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", UserWarning)
|
||||
import instructor
|
||||
from litellm import completion
|
||||
|
||||
self._client = instructor.from_litellm(
|
||||
completion,
|
||||
mode=instructor.Mode.TOOLS,
|
||||
)
|
||||
self._client = instructor.from_litellm(
|
||||
completion,
|
||||
mode=instructor.Mode.TOOLS,
|
||||
)
|
||||
|
||||
def to_json(self):
|
||||
model = self.to_pydantic()
|
||||
|
||||
@@ -3,8 +3,10 @@ from pathlib import Path
|
||||
|
||||
import appdirs
|
||||
|
||||
"""Path management utilities for CrewAI storage and configuration."""
|
||||
|
||||
def db_storage_path():
|
||||
"""Returns the path for database storage."""
|
||||
app_name = get_project_directory_name()
|
||||
app_author = "CrewAI"
|
||||
|
||||
@@ -14,6 +16,7 @@ def db_storage_path():
|
||||
|
||||
|
||||
def get_project_directory_name():
|
||||
"""Returns the current project directory name."""
|
||||
project_directory_name = os.environ.get("CREWAI_STORAGE_DIR")
|
||||
|
||||
if project_directory_name:
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import logging
|
||||
from typing import Any, List, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
@@ -5,8 +6,11 @@ from pydantic import BaseModel, Field
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
|
||||
"""Handles planning and coordination of crew tasks."""
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class PlanPerTask(BaseModel):
|
||||
"""Represents a plan for a specific task."""
|
||||
task: str = Field(..., description="The task for which the plan is created")
|
||||
plan: str = Field(
|
||||
...,
|
||||
@@ -15,6 +19,7 @@ class PlanPerTask(BaseModel):
|
||||
|
||||
|
||||
class PlannerTaskPydanticOutput(BaseModel):
|
||||
"""Output format for task planning results."""
|
||||
list_of_plans_per_task: List[PlanPerTask] = Field(
|
||||
...,
|
||||
description="Step by step plan on how the agents can execute their tasks using the available tools with mastery",
|
||||
@@ -22,6 +27,7 @@ class PlannerTaskPydanticOutput(BaseModel):
|
||||
|
||||
|
||||
class CrewPlanner:
|
||||
"""Plans and coordinates the execution of crew tasks."""
|
||||
def __init__(self, tasks: List[Task], planning_agent_llm: Optional[Any] = None):
|
||||
self.tasks = tasks
|
||||
|
||||
@@ -68,19 +74,39 @@ class CrewPlanner:
|
||||
output_pydantic=PlannerTaskPydanticOutput,
|
||||
)
|
||||
|
||||
def _get_agent_knowledge(self, task: Task) -> List[str]:
|
||||
"""
|
||||
Safely retrieve knowledge source content from the task's agent.
|
||||
|
||||
Args:
|
||||
task: The task containing an agent with potential knowledge sources
|
||||
|
||||
Returns:
|
||||
List[str]: A list of knowledge source strings
|
||||
"""
|
||||
try:
|
||||
if task.agent and task.agent.knowledge_sources:
|
||||
return [source.content for source in task.agent.knowledge_sources]
|
||||
except AttributeError:
|
||||
logger.warning("Error accessing agent knowledge sources")
|
||||
return []
|
||||
|
||||
def _create_tasks_summary(self) -> str:
|
||||
"""Creates a summary of all tasks."""
|
||||
tasks_summary = []
|
||||
for idx, task in enumerate(self.tasks):
|
||||
tasks_summary.append(
|
||||
f"""
|
||||
knowledge_list = self._get_agent_knowledge(task)
|
||||
task_summary = f"""
|
||||
Task Number {idx + 1} - {task.description}
|
||||
"task_description": {task.description}
|
||||
"task_expected_output": {task.expected_output}
|
||||
"agent": {task.agent.role if task.agent else "None"}
|
||||
"agent_goal": {task.agent.goal if task.agent else "None"}
|
||||
"task_tools": {task.tools}
|
||||
"agent_tools": {task.agent.tools if task.agent else "None"}
|
||||
"""
|
||||
)
|
||||
"agent_tools": %s%s""" % (
|
||||
f"[{', '.join(str(tool) for tool in task.agent.tools)}]" if task.agent and task.agent.tools else '"agent has no tools"',
|
||||
f',\n "agent_knowledge": "[\\"{knowledge_list[0]}\\"]"' if knowledge_list and str(knowledge_list) != "None" else ""
|
||||
)
|
||||
|
||||
tasks_summary.append(task_summary)
|
||||
return " ".join(tasks_summary)
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
"""Utility for colored console output."""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class Printer:
|
||||
"""Handles colored console output formatting."""
|
||||
|
||||
def print(self, content: str, color: Optional[str] = None):
|
||||
if color == "purple":
|
||||
self._print_purple(content)
|
||||
|
||||
@@ -6,8 +6,10 @@ from pydantic import BaseModel, Field, PrivateAttr, model_validator
|
||||
|
||||
from crewai.utilities.logger import Logger
|
||||
|
||||
"""Controls request rate limiting for API calls."""
|
||||
|
||||
class RPMController(BaseModel):
|
||||
"""Manages requests per minute limiting."""
|
||||
max_rpm: Optional[int] = Field(default=None)
|
||||
logger: Logger = Field(default_factory=lambda: Logger(verbose=False))
|
||||
_current_rpm: int = PrivateAttr(default=0)
|
||||
|
||||
@@ -8,8 +8,10 @@ from crewai.memory.storage.kickoff_task_outputs_storage import (
|
||||
)
|
||||
from crewai.task import Task
|
||||
|
||||
"""Handles storage and retrieval of task execution outputs."""
|
||||
|
||||
class ExecutionLog(BaseModel):
|
||||
"""Represents a log entry for task execution."""
|
||||
task_id: str
|
||||
expected_output: Optional[str] = None
|
||||
output: Dict[str, Any]
|
||||
@@ -22,6 +24,8 @@ class ExecutionLog(BaseModel):
|
||||
return getattr(self, key)
|
||||
|
||||
|
||||
"""Manages storage and retrieval of task outputs."""
|
||||
|
||||
class TaskOutputStorageHandler:
|
||||
def __init__(self) -> None:
|
||||
self.storage = KickoffTaskOutputsSQLiteStorage()
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import warnings
|
||||
|
||||
from litellm.integrations.custom_logger import CustomLogger
|
||||
from litellm.types.utils import Usage
|
||||
|
||||
@@ -12,11 +14,13 @@ class TokenCalcHandler(CustomLogger):
|
||||
if self.token_cost_process is None:
|
||||
return
|
||||
|
||||
usage: Usage = response_obj["usage"]
|
||||
self.token_cost_process.sum_successful_requests(1)
|
||||
self.token_cost_process.sum_prompt_tokens(usage.prompt_tokens)
|
||||
self.token_cost_process.sum_completion_tokens(usage.completion_tokens)
|
||||
if usage.prompt_tokens_details:
|
||||
self.token_cost_process.sum_cached_prompt_tokens(
|
||||
usage.prompt_tokens_details.cached_tokens
|
||||
)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", UserWarning)
|
||||
usage: Usage = response_obj["usage"]
|
||||
self.token_cost_process.sum_successful_requests(1)
|
||||
self.token_cost_process.sum_prompt_tokens(usage.prompt_tokens)
|
||||
self.token_cost_process.sum_completion_tokens(usage.completion_tokens)
|
||||
if usage.prompt_tokens_details:
|
||||
self.token_cost_process.sum_cached_prompt_tokens(
|
||||
usage.prompt_tokens_details.cached_tokens
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user