Merge branch 'main' into feat/poetry-to-uv-migration

This commit is contained in:
Eduardo Chiarotti
2024-10-07 16:17:59 -03:00
21 changed files with 1238 additions and 378 deletions

View File

@@ -1,5 +1,4 @@
import warnings
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.flow.flow import Flow
@@ -15,5 +14,6 @@ warnings.filterwarnings(
category=UserWarning,
module="pydantic.main",
)
__version__ = "0.65.2"
__all__ = ["Agent", "Crew", "Process", "Task", "Pipeline", "Router", "LLM", "Flow"]

View File

@@ -1,18 +1,19 @@
import os
from inspect import signature
from typing import Any, List, Optional, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
from crewai.agents import CacheHandler
from crewai.utilities import Converter, Prompts
from crewai.tools.agent_tools import AgentTools
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
def mock_agent_ops_provider():
@@ -292,9 +293,9 @@ class Agent(BaseAgent):
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=self._rpm_controller.check_or_wait
if self._rpm_controller
else None,
request_within_rpm_limit=(
self._rpm_controller.check_or_wait if self._rpm_controller else None
),
callbacks=[TokenCalcHandler(self._token_process)],
)

View File

@@ -276,12 +276,13 @@ def tool_install(handle: str):
@tool.command(name="publish")
@click.option("--force", is_flag=True, show_default=True, default=False, help="Bypasses Git remote validations")
@click.option("--public", "is_public", flag_value=True, default=False)
@click.option("--private", "is_public", flag_value=False)
def tool_publish(is_public: bool):
def tool_publish(is_public: bool, force: bool):
tool_cmd = ToolCommand()
tool_cmd.login()
tool_cmd.publish(is_public)
tool_cmd.publish(is_public, force)
@crewai.group()

View File

@@ -2,6 +2,8 @@ from pathlib import Path
import click
from crewai.telemetry import Telemetry
def create_flow(name):
"""Create a new flow."""
@@ -15,6 +17,10 @@ def create_flow(name):
click.secho(f"Error: Folder {folder_name} already exists.", fg="red")
return
# Initialize telemetry
telemetry = Telemetry()
telemetry.flow_creation_span(class_name)
# Create directory structure
(project_root / "src" / folder_name).mkdir(parents=True)
(project_root / "src" / folder_name / "crews").mkdir(parents=True)

View File

@@ -60,8 +60,8 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
finally:
os.chdir(old_directory)
def publish(self, is_public: bool):
if not git.Repository().is_synced():
def publish(self, is_public: bool, force: bool = False):
if not git.Repository().is_synced() and not force:
console.print(
"[bold red]Failed to publish tool.[/bold red]\n"
"Local changes need to be resolved before publishing. Please do the following:\n"

View File

@@ -2,6 +2,7 @@ DARK_GRAY = "#333333"
CREWAI_ORANGE = "#FF5A50"
GRAY = "#666666"
WHITE = "#FFFFFF"
BLACK = "#000000"
COLORS = {
"bg": WHITE,
@@ -16,31 +17,43 @@ COLORS = {
NODE_STYLES = {
"start": {
"color": COLORS["start"],
"color": CREWAI_ORANGE,
"shape": "box",
"font": {"color": COLORS["text"]},
"font": {"color": WHITE},
"margin": {"top": 10, "bottom": 8, "left": 10, "right": 10},
},
"method": {
"color": COLORS["method"],
"color": DARK_GRAY,
"shape": "box",
"font": {"color": COLORS["text"]},
"font": {"color": WHITE},
"margin": {"top": 10, "bottom": 8, "left": 10, "right": 10},
},
"router": {
"color": {
"background": COLORS["router"],
"border": COLORS["router_border"],
"background": DARK_GRAY,
"border": CREWAI_ORANGE,
"highlight": {
"border": COLORS["router_border"],
"background": COLORS["router"],
"border": CREWAI_ORANGE,
"background": DARK_GRAY,
},
},
"shape": "box",
"font": {"color": COLORS["text"]},
"font": {"color": WHITE},
"borderWidth": 3,
"borderWidthSelected": 4,
"shapeProperties": {"borderDashes": [5, 5]},
"margin": {"top": 10, "bottom": 8, "left": 10, "right": 10},
},
"crew": {
"color": {
"background": WHITE,
"border": CREWAI_ORANGE,
},
"shape": "box",
"font": {"color": BLACK},
"borderWidth": 3,
"borderWidthSelected": 4,
"shapeProperties": {"borderDashes": False},
"margin": {"top": 10, "bottom": 8, "left": 10, "right": 10},
},
}

View File

@@ -1,7 +1,5 @@
# flow.py
# flow.py
import asyncio
import inspect
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
@@ -9,6 +7,8 @@ from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
from pydantic import BaseModel
from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.utils import get_possible_return_constants
from crewai.telemetry import Telemetry
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
@@ -63,12 +63,10 @@ def listen(condition):
return decorator
def router(method, paths=None):
def router(method):
def decorator(func):
func.__is_router__ = True
func.__router_for__ = method.__name__
if paths:
func.__router_paths__ = paths
return func
return decorator
@@ -124,10 +122,11 @@ class FlowMeta(type):
listeners[attr_name] = (condition_type, methods)
elif hasattr(attr_value, "__is_router__"):
routers[attr_value.__router_for__] = attr_name
if hasattr(attr_value, "__router_paths__"):
router_paths[attr_name] = attr_value.__router_paths__
possible_returns = get_possible_return_constants(attr_value)
if possible_returns:
router_paths[attr_name] = possible_returns
# **Register router as a listener to its triggering method**
# Register router as a listener to its triggering method
trigger_method_name = attr_value.__router_for__
methods = [trigger_method_name]
condition_type = "OR"
@@ -142,25 +141,30 @@ class FlowMeta(type):
class Flow(Generic[T], metaclass=FlowMeta):
_telemetry = Telemetry()
_start_methods: List[str] = []
_listeners: Dict[str, tuple[str, List[str]]] = {}
_routers: Dict[str, str] = {}
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
def __class_getitem__(cls, item):
def __class_getitem__(cls, item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls):
_initial_state_T = item
_initial_state_T: Type[T] = item
_FlowGeneric.__name__ = f"{cls.__name__}[{item.__name__}]"
return _FlowGeneric
def __init__(self):
def __init__(self) -> None:
self._methods: Dict[str, Callable] = {}
self._state = self._create_initial_state()
self._state: T = self._create_initial_state()
self._completed_methods: Set[str] = set()
self._pending_and_listeners: Dict[str, Set[str]] = {}
self._method_outputs: List[Any] = [] # List to store all method outputs
self._telemetry.flow_creation_span(self.__class__.__name__)
for method_name in dir(self):
if callable(getattr(self, method_name)) and not method_name.startswith(
"__"
@@ -190,6 +194,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
if not self._start_methods:
raise ValueError("No start method defined")
self._telemetry.flow_execution_span(
self.__class__.__name__, list(self._methods.keys())
)
# Create tasks for all start methods
tasks = [
self._execute_start_method(start_method)
@@ -205,11 +213,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
else:
return None # Or raise an exception if no methods were executed
async def _execute_start_method(self, start_method: str):
async def _execute_start_method(self, start_method: str) -> None:
result = await self._execute_method(self._methods[start_method])
await self._execute_listeners(start_method, result)
async def _execute_method(self, method: Callable, *args, **kwargs):
async def _execute_method(self, method: Callable, *args: Any, **kwargs: Any) -> Any:
result = (
await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method)
@@ -218,7 +226,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_outputs.append(result) # Store the output
return result
async def _execute_listeners(self, trigger_method: str, result: Any):
async def _execute_listeners(self, trigger_method: str, result: Any) -> None:
listener_tasks = []
if trigger_method in self._routers:
@@ -246,7 +254,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Run all listener tasks concurrently and wait for them to complete
await asyncio.gather(*listener_tasks)
async def _execute_single_listener(self, listener: str, result: Any):
async def _execute_single_listener(self, listener: str, result: Any) -> None:
try:
method = self._methods[listener]
sig = inspect.signature(method)
@@ -270,5 +278,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
traceback.print_exc()
def plot(self, filename: str = "crewai_flow_graph"):
def plot(self, filename: str = "crewai_flow") -> None:
self._telemetry.flow_plotting_span(
self.__class__.__name__, list(self._methods.keys())
)
plot_flow(self, filename)

View File

@@ -30,6 +30,22 @@ class FlowPlot:
layout=None,
)
# Set options to disable physics
net.set_options(
"""
var options = {
"nodes": {
"font": {
"multi": "html"
}
},
"physics": {
"enabled": false
}
}
"""
)
# Calculate levels for nodes
node_levels = calculate_node_levels(self.flow)
@@ -42,24 +58,13 @@ class FlowPlot:
# Add edges to the network
add_edges(net, self.flow, node_positions, self.colors)
# Set options to disable physics
net.set_options(
"""
var options = {
"physics": {
"enabled": false
}
}
"""
)
network_html = net.generate_html()
final_html_content = self._generate_final_html(network_html)
# Save the final HTML content to the file
with open(f"{filename}.html", "w", encoding="utf-8") as f:
f.write(final_html_content)
print(f"Graph saved as {filename}.html")
print(f"Plot saved as {filename}.html")
self._cleanup_pyvis_lib()
@@ -94,6 +99,6 @@ class FlowPlot:
print(f"Error cleaning up {lib_folder}: {e}")
def plot_flow(flow, filename="flow_graph"):
def plot_flow(flow, filename="flow_plot"):
visualizer = FlowPlot(flow)
visualizer.plot(filename)

View File

@@ -1,5 +1,4 @@
import base64
import os
import re
@@ -48,7 +47,7 @@ class HTMLTemplateHandler:
"""
return legend_items_html
def generate_final_html(self, network_body, legend_items_html, title="Flow Graph"):
def generate_final_html(self, network_body, legend_items_html, title="Flow Plot"):
html_template = self.read_template()
logo_svg_base64 = self.encode_logo()

View File

@@ -2,6 +2,12 @@ def get_legend_items(colors):
return [
{"label": "Start Method", "color": colors["start"]},
{"label": "Method", "color": colors["method"]},
{
"label": "Crew Method",
"color": colors["bg"],
"border": colors["start"],
"dashed": False,
},
{
"label": "Router",
"color": colors["router"],
@@ -22,9 +28,10 @@ def generate_legend_items_html(legend_items):
legend_items_html = ""
for item in legend_items:
if "border" in item:
style = "dashed" if item["dashed"] else "solid"
legend_items_html += f"""
<div class="legend-item">
<div class="legend-color-box" style="background-color: {item['color']}; border: 2px dashed {item['border']};"></div>
<div class="legend-color-box" style="background-color: {item['color']}; border: 2px {style} {item['border']}; border-radius: 5px;"></div>
<div>{item['label']}</div>
</div>
"""
@@ -32,14 +39,14 @@ def generate_legend_items_html(legend_items):
style = "dashed" if item["dashed"] else "solid"
legend_items_html += f"""
<div class="legend-item">
<div class="legend-{style}" style="border-bottom: 2px {style} {item['color']};"></div>
<div class="legend-{style}" style="border-bottom: 2px {style} {item['color']}; border-radius: 5px;"></div>
<div>{item['label']}</div>
</div>
"""
else:
legend_items_html += f"""
<div class="legend-item">
<div class="legend-color-box" style="background-color: {item['color']};"></div>
<div class="legend-color-box" style="background-color: {item['color']}; border-radius: 5px;"></div>
<div>{item['label']}</div>
</div>
"""

View File

@@ -1,3 +1,48 @@
import ast
import inspect
import textwrap
def get_possible_return_constants(function):
try:
source = inspect.getsource(function)
except OSError:
# Can't get source code
return None
except Exception as e:
print(f"Error retrieving source code for function {function.__name__}: {e}")
return None
try:
# Remove leading indentation
source = textwrap.dedent(source)
# Parse the source code into an AST
code_ast = ast.parse(source)
except IndentationError as e:
print(f"IndentationError while parsing source code of {function.__name__}: {e}")
print(f"Source code:\n{source}")
return None
except SyntaxError as e:
print(f"SyntaxError while parsing source code of {function.__name__}: {e}")
print(f"Source code:\n{source}")
return None
except Exception as e:
print(f"Unexpected error while parsing source code of {function.__name__}: {e}")
print(f"Source code:\n{source}")
return None
return_values = []
class ReturnVisitor(ast.NodeVisitor):
def visit_Return(self, node):
# Check if the return value is a constant (Python 3.8+)
if isinstance(node.value, ast.Constant):
return_values.append(node.value.value)
ReturnVisitor().visit(code_ast)
return return_values
def calculate_node_levels(flow):
levels = {}
queue = []

View File

@@ -1,3 +1,6 @@
import ast
import inspect
from .utils import (
build_ancestor_dict,
build_parent_children_dict,
@@ -6,6 +9,70 @@ from .utils import (
)
def method_calls_crew(method):
"""Check if the method calls `.crew()`."""
try:
source = inspect.getsource(method)
source = inspect.cleandoc(source)
tree = ast.parse(source)
except Exception as e:
print(f"Could not parse method {method.__name__}: {e}")
return False
class CrewCallVisitor(ast.NodeVisitor):
def __init__(self):
self.found = False
def visit_Call(self, node):
if isinstance(node.func, ast.Attribute):
if node.func.attr == "crew":
self.found = True
self.generic_visit(node)
visitor = CrewCallVisitor()
visitor.visit(tree)
return visitor.found
def add_nodes_to_network(net, flow, node_positions, node_styles):
def human_friendly_label(method_name):
return method_name.replace("_", " ").title()
for method_name, (x, y) in node_positions.items():
method = flow._methods.get(method_name)
if hasattr(method, "__is_start_method__"):
node_style = node_styles["start"]
elif hasattr(method, "__is_router__"):
node_style = node_styles["router"]
elif method_calls_crew(method):
node_style = node_styles["crew"]
else:
node_style = node_styles["method"]
node_style = node_style.copy()
label = human_friendly_label(method_name)
node_style.update(
{
"label": label,
"shape": "box",
"font": {
"multi": "html",
"color": node_style.get("font", {}).get("color", "#FFFFFF"),
},
}
)
net.add_node(
method_name,
x=x,
y=y,
fixed=True,
physics=False,
**node_style,
)
def compute_positions(flow, node_levels, y_spacing=150, x_spacing=150):
level_nodes = {}
node_positions = {}
@@ -109,24 +176,3 @@ def add_edges(net, flow, node_positions, colors):
"smooth": edge_smooth,
}
net.add_edge(router_method_name, listener_name, **edge_style)
def add_nodes_to_network(net, flow, node_positions, node_styles):
for method_name, (x, y) in node_positions.items():
method = flow._methods.get(method_name)
if hasattr(method, "__is_start_method__"):
node_style = node_styles["start"]
elif hasattr(method, "__is_router__"):
node_style = node_styles["router"]
else:
node_style = node_styles["method"]
net.add_node(
method_name,
label=method_name,
x=x,
y=y,
fixed=True,
physics=False,
**node_style,
)

View File

@@ -5,11 +5,6 @@ import os
import shutil
from typing import Any, Dict, List, Optional
from embedchain import App
from embedchain.llm.base import BaseLlm
from embedchain.models.data_type import DataType
from embedchain.vectordb.chroma import InvalidDimensionException
from crewai.memory.storage.interface import Storage
from crewai.utilities.paths import db_storage_path
@@ -29,10 +24,6 @@ def suppress_logging(
logger.setLevel(original_level)
class FakeLLM(BaseLlm):
pass
class RAGStorage(Storage):
"""
Extends Storage to handle embeddings for memory entries, improving
@@ -74,9 +65,19 @@ class RAGStorage(Storage):
if embedder_config:
config["embedder"] = embedder_config
self.type = type
self.app = App.from_config(config=config)
self.config = config
self.allow_reset = allow_reset
def _initialize_app(self):
from embedchain import App
from embedchain.llm.base import BaseLlm
class FakeLLM(BaseLlm):
pass
self.app = App.from_config(config=self.config)
self.app.llm = FakeLLM()
if allow_reset:
if self.allow_reset:
self.app.reset()
def _sanitize_role(self, role: str) -> str:
@@ -86,6 +87,8 @@ class RAGStorage(Storage):
return role.replace("\n", "").replace(" ", "_").replace("/", "_")
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
if not hasattr(self, "app"):
self._initialize_app()
self._generate_embedding(value, metadata)
def search( # type: ignore # BUG?: Signature of "search" incompatible with supertype "Storage"
@@ -95,6 +98,10 @@ class RAGStorage(Storage):
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Any]:
if not hasattr(self, "app"):
self._initialize_app()
from embedchain.vectordb.chroma import InvalidDimensionException
with suppress_logging():
try:
results = (
@@ -108,6 +115,10 @@ class RAGStorage(Storage):
return [r for r in results if r["metadata"]["score"] >= score_threshold]
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> Any:
if not hasattr(self, "app"):
self._initialize_app()
from embedchain.models.data_type import DataType
self.app.add(text, data_type=DataType.TEXT, metadata=metadata)
def reset(self) -> None:

View File

@@ -5,8 +5,8 @@ import json
import os
import platform
import warnings
from typing import TYPE_CHECKING, Any, Optional
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Optional
@contextmanager
@@ -117,9 +117,11 @@ class Telemetry:
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"i18n": agent.i18n.prompt_file,
"function_calling_llm": agent.function_calling_llm.model
if agent.function_calling_llm
else "",
"function_calling_llm": (
agent.function_calling_llm.model
if agent.function_calling_llm
else ""
),
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"allow_code_execution?": agent.allow_code_execution,
@@ -145,9 +147,9 @@ class Telemetry:
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": task.agent.role
if task.agent
else "None",
"agent_role": (
task.agent.role if task.agent else "None"
),
"agent_key": task.agent.key if task.agent else None,
"context": (
[task.description for task in task.context]
@@ -184,9 +186,11 @@ class Telemetry:
"verbose?": agent.verbose,
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"function_calling_llm": agent.function_calling_llm.model
if agent.function_calling_llm
else "",
"function_calling_llm": (
agent.function_calling_llm.model
if agent.function_calling_llm
else ""
),
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"allow_code_execution?": agent.allow_code_execution,
@@ -210,9 +214,9 @@ class Telemetry:
"id": str(task.id),
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": task.agent.role
if task.agent
else "None",
"agent_role": (
task.agent.role if task.agent else "None"
),
"agent_key": task.agent.key if task.agent else None,
"tools_names": [
tool.name.casefold()
@@ -568,3 +572,38 @@ class Telemetry:
return span.set_attribute(key, value)
except Exception:
pass
def flow_creation_span(self, flow_name: str):
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Creation")
self._add_attribute(span, "flow_name", flow_name)
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass
def flow_plotting_span(self, flow_name: str, node_names: list[str]):
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Plotting")
self._add_attribute(span, "flow_name", flow_name)
self._add_attribute(span, "node_names", json.dumps(node_names))
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass
def flow_execution_span(self, flow_name: str, node_names: list[str]):
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Execution")
self._add_attribute(span, "flow_name", flow_name)
self._add_attribute(span, "node_names", json.dumps(node_names))
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass

View File

@@ -1,4 +1,3 @@
from langchain.tools import StructuredTool
from crewai.agents.agent_builder.utilities.base_agent_tool import BaseAgentTools
@@ -6,6 +5,8 @@ class AgentTools(BaseAgentTools):
"""Default tools around agent delegation"""
def tools(self):
from langchain.tools import StructuredTool
coworkers = ", ".join([f"{agent.role}" for agent in self.agents])
tools = [
StructuredTool.from_function(

View File

@@ -1,4 +1,3 @@
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
from crewai.agents.cache import CacheHandler
@@ -14,6 +13,8 @@ class CacheTools(BaseModel):
)
def tool(self):
from langchain.tools import StructuredTool
return StructuredTool.from_function(
func=self.hit_cache,
name=self.name,

View File

@@ -1,8 +1,5 @@
from typing import Any, Optional, Type
import instructor
from litellm import completion
class InternalInstructor:
"""Class that wraps an agent llm with instructor."""
@@ -28,6 +25,10 @@ class InternalInstructor:
if self.agent and not self.llm:
self.llm = self.agent.function_calling_llm or self.agent.llm
# Lazy import
import instructor
from litellm import completion
self._client = instructor.from_litellm(
completion,
mode=instructor.Mode.TOOLS,

View File

@@ -1,4 +1,5 @@
from litellm.integrations.custom_logger import CustomLogger
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess