This commit is contained in:
Brandon Hancock
2024-10-16 10:14:01 -04:00
parent 725d159e44
commit 1090a789ff
10 changed files with 316 additions and 7 deletions

View File

@@ -29,6 +29,7 @@ dependencies = [
"pyvis>=0.3.2",
"uv>=0.4.18",
"tomli-w>=1.1.0",
"blinker>=1.8.2",
]
[project.urls]

View File

@@ -212,6 +212,36 @@ class BaseAgent(ABC, BaseModel):
"""Get the converter class for the agent to create json/pydantic outputs."""
pass
def serialize(self) -> Dict[str, Any]:
"""Serialize the BaseAgent into a dictionary excluding complex objects."""
# Define attributes to exclude from serialization
exclude = {
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"agent_executor",
"cache_handler",
"tools_handler",
"llm",
"crew",
# Add any other complex attributes that should be excluded
}
# Use model_dump or similar to get a dictionary representation
serialized_data = self.model_dump(exclude=exclude)
# Add any additional serialization logic if needed
serialized_data["role"] = self.role
serialized_data["goal"] = self.goal
serialized_data["backstory"] = self.backstory
serialized_data["tools"] = (
[str(tool) for tool in self.tools] if self.tools else None
)
return serialized_data
def copy(self: T) -> T: # type: ignore # Signature of "copy" incompatible with supertype "BaseModel"
"""Create a deep copy of the Agent."""
exclude = {

View File

@@ -40,6 +40,7 @@ from crewai.utilities.constants import (
)
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.event_helpers import emit_crew_start
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -457,6 +458,7 @@ class Crew(BaseModel):
inputs: Optional[Dict[str, Any]] = None,
) -> CrewOutput:
"""Starts the crew to work on its assigned tasks."""
emit_crew_start(self)
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
self._task_output_handler.reset()
self._logging_color = "bold_purple"
@@ -774,7 +776,9 @@ class Crew(BaseModel):
def _log_task_start(self, task: Task, role: str = "None"):
if self.output_log_file:
self._file_handler.log(task_name=task.name, task=task.description, agent=role, status="started")
self._file_handler.log(
task_name=task.name, task=task.description, agent=role, status="started"
)
def _update_manager_tools(self, task: Task):
if self.manager_agent:
@@ -796,7 +800,13 @@ class Crew(BaseModel):
def _process_task_result(self, task: Task, output: TaskOutput) -> None:
role = task.agent.role if task.agent is not None else "None"
if self.output_log_file:
self._file_handler.log(task_name=task.name, task=task.description, agent=role, status="completed", output=output.raw)
self._file_handler.log(
task_name=task.name,
task=task.description,
agent=role,
status="completed",
output=output.raw,
)
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
if len(task_outputs) != 1:
@@ -884,6 +894,37 @@ class Crew(BaseModel):
result = self._execute_tasks(self.tasks, start_index, True)
return result
def serialize(self) -> Dict[str, Any]:
"""Serialize the Crew into a dictionary excluding complex objects."""
exclude = {
"_rpm_controller",
"_logger",
"_execution_span",
"_file_handler",
"_cache_handler",
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_telemetry",
"agents",
"tasks",
}
# Serialize agents and tasks to a simpler form if needed
serialized_agents = [agent.serialize() for agent in self.agents]
serialized_tasks = [task.serialize() for task in self.tasks]
serialized_data = self.model_dump(exclude=exclude)
serialized_data = {k: v for k, v in serialized_data.items() if v is not None}
# Add serialized agents and tasks
serialized_data["agents"] = serialized_agents
serialized_data["tasks"] = serialized_tasks
return serialized_data
# TODO: Come back and use the new _serialize method
def copy(self):
"""Create a deep copy of the Crew."""

View File

@@ -0,0 +1 @@
from .event_logger import Event, EventLogger

View File

@@ -0,0 +1,53 @@
import os
from datetime import datetime, timezone
from typing import Any
from dotenv import load_dotenv
from sqlalchemy import JSON, Column, DateTime, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, sessionmaker
load_dotenv()
Base = declarative_base()
class Event(Base):
__tablename__ = "events"
id = Column(Integer, primary_key=True)
event_name = Column(String)
timestamp = Column(DateTime, default=datetime.now(timezone.utc))
crew_id = Column(String)
data = Column(JSON)
error_type = Column(String)
error_message = Column(String)
traceback = Column(String)
DATABASE_URL = os.getenv("CREWAI_DATABASE_URL", "sqlite:///crew_events.db")
engine = create_engine(DATABASE_URL)
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine) # Use a different name to avoid confusion
# Create a session instance
session: Session = SessionLocal()
class EventLogger:
def __init__(self, session: Session):
self.session = session
def log_event(self, event_name: str, *args: Any, **kwargs: Any) -> None:
event = Event(
event_name=event_name,
crew_id=kwargs.get("crew_id", ""),
data=kwargs,
error_type=kwargs.get("error_type", ""),
error_message=kwargs.get("error_message", ""),
traceback=kwargs.get("traceback", ""),
)
self.session.add(event)
self.session.commit()
event_logger = EventLogger(session)

View File

@@ -317,6 +317,35 @@ class Task(BaseModel):
self.processed_by_agents.add(agent_name)
self.delegations += 1
def serialize(self) -> Dict[str, Any]:
"""Serialize the Task into a dictionary excluding complex objects."""
# Define attributes to exclude from serialization
exclude = {
"_telemetry",
"_execution_span",
"_thread",
"_execution_time",
"agent",
"context",
"tools",
"output",
}
# Use model_dump or similar to get a dictionary representation
serialized_data = self.model_dump(exclude=exclude)
# Add any additional serialization logic if needed
serialized_data["id"] = str(self.id)
serialized_data["name"] = self.name
serialized_data["description"] = self.description
serialized_data["expected_output"] = self.expected_output
serialized_data["output_file"] = self.output_file
serialized_data["human_input"] = self.human_input
serialized_data["processed_by_agents"] = list(self.processed_by_agents)
return serialized_data
def copy(
self, agents: List["BaseAgent"], task_mapping: Dict[str, "Task"]
) -> "Task":

View File

@@ -0,0 +1,44 @@
from enum import Enum
from typing import Any, Callable
from blinker import signal
class CrewEvents(Enum):
CREW_START = "crew_start"
CREW_FINISH = "crew_finish"
CREW_FAILURE = "crew_failure"
TASK_START = "task_start"
TASK_FINISH = "task_finish"
TASK_FAILURE = "task_failure"
AGENT_ACTION = "agent_action"
TOOL_USE = "tool_use"
TOKEN_USAGE = "token_usage"
class CrewEventEmitter:
def __init__(self):
self._all_signal = signal("all")
def on(self, event_name: CrewEvents, callback: Callable) -> None:
if event_name == "*":
self._all_signal.connect(callback)
else:
signal(event_name.value).connect(callback)
def emit(self, event_name: CrewEvents, *args: Any, **kwargs: Any) -> None:
signal(event_name.value).send(*args, **kwargs)
self._all_signal.send(event_name, *args, **kwargs)
crew_events = CrewEventEmitter()
def emit(event_name: CrewEvents, *args: Any, **kwargs: Any) -> None:
try:
crew_events.emit(event_name, *args, **kwargs)
except Exception as e:
if kwargs.get("raise_on_error", False):
raise e
else:
print(f"Error emitting event: {e}")

View File

@@ -0,0 +1,99 @@
# event_helpers.py
from datetime import datetime
from typing import Any, Dict, Optional
from crewai.crew import Crew
from crewai.utilities.event_emitter import CrewEvents, emit
def emit_crew_start(
crew: Crew,
inputs: Optional[Dict[str, Any]] = None,
) -> None:
serialized_crew = crew.serialize()
emit(
CrewEvents.CREW_START,
{**serialized_crew, "inputs": inputs},
)
def emit_crew_finish(crew_id: str, name: str, result: Any, duration: float) -> None:
emit(
CrewEvents.CREW_FINISH,
{
"crew_id": crew_id,
"name": name,
"finish_time": datetime.now().isoformat(),
"result": result,
"duration": duration,
},
)
def emit_crew_failure(
crew_id: str, name: str, error: Exception, traceback: str, duration: float
) -> None:
emit(
CrewEvents.CREW_FAILURE,
{
"crew_id": crew_id,
"name": name,
"failure_time": datetime.now().isoformat(),
"error_type": type(error).__name__,
"error_message": str(error),
"traceback": traceback,
"duration": duration,
},
)
def emit_task_start(crew_id: str, task_id: str, task_name: str) -> None:
emit(
CrewEvents.TASK_START,
{
"crew_id": crew_id,
"task_id": task_id,
"task_name": task_name,
"start_time": datetime.now().isoformat(),
},
)
def emit_task_finish(
crew_id: str, task_id: str, task_name: str, result: Any, duration: float
) -> None:
emit(
CrewEvents.TASK_FINISH,
{
"crew_id": crew_id,
"task_id": task_id,
"task_name": task_name,
"finish_time": datetime.now().isoformat(),
"result": result,
"duration": duration,
},
)
def emit_task_failure(
crew_id: str,
task_id: str,
task_name: str,
error: Exception,
traceback: str,
duration: float,
) -> None:
emit(
CrewEvents.TASK_FAILURE,
{
"crew_id": crew_id,
"task_id": task_id,
"task_name": task_name,
"failure_time": datetime.now().isoformat(),
"error_type": type(error).__name__,
"error_message": str(error),
"traceback": traceback,
"duration": duration,
},
)

View File

@@ -1,7 +1,7 @@
from typing import Any, Callable, Generic, List, Dict, Type, TypeVar
from functools import wraps
from pydantic import BaseModel
from typing import Any, Callable, Dict, Generic, List, Type, TypeVar
from pydantic import BaseModel
T = TypeVar("T")
EVT = TypeVar("EVT", bound=BaseModel)

17
uv.lock generated
View File

@@ -290,6 +290,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b1/fe/e8c672695b37eecc5cbf43e1d0638d88d66ba3a44c4d321c796f4e59167f/beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed", size = 147925 },
]
[[package]]
name = "blinker"
version = "1.8.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/1e/57/a6a1721eff09598fb01f3c7cda070c1b6a0f12d63c83236edf79a440abcc/blinker-1.8.2.tar.gz", hash = "sha256:8f77b09d3bf7c795e969e9486f39c2c5e9c39d4ee07424be2bc594ece9642d83", size = 23161 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bb/2a/10164ed1f31196a2f7f3799368a821765c62851ead0e630ab52b8e14b4d0/blinker-1.8.2-py3-none-any.whl", hash = "sha256:1779309f71bf239144b9399d06ae925637cf6634cf6bd131104184531bf67c01", size = 9456 },
]
[[package]]
name = "boto3"
version = "1.35.32"
@@ -627,12 +636,13 @@ wheels = [
[[package]]
name = "crewai"
version = "0.67.1"
version = "0.70.1"
source = { editable = "." }
dependencies = [
{ name = "agentops" },
{ name = "appdirs" },
{ name = "auth0-python" },
{ name = "blinker" },
{ name = "click" },
{ name = "crewai-tools" },
{ name = "embedchain" },
@@ -687,6 +697,7 @@ requires-dist = [
{ name = "agentops", marker = "extra == 'agentops'", specifier = ">=0.3.0" },
{ name = "appdirs", specifier = ">=1.4.4" },
{ name = "auth0-python", specifier = ">=4.7.1" },
{ name = "blinker" },
{ name = "click", specifier = ">=8.1.7" },
{ name = "crewai-tools", specifier = ">=0.12.1" },
{ name = "crewai-tools", marker = "extra == 'tools'", specifier = ">=0.12.1" },
@@ -2496,6 +2507,8 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b2/07/8cbb75d6cfbe8712d8f7f6a5615f083c6e710ab916b748fbb20373ddb142/multiprocess-0.70.17-py311-none-any.whl", hash = "sha256:2884701445d0177aec5bd5f6ee0df296773e4fb65b11903b94c613fb46cfb7d1", size = 144346 },
{ url = "https://files.pythonhosted.org/packages/a4/69/d3f343a61a2f86ef10ed7865a26beda7c71554136ce187b0384b1c2c9ca3/multiprocess-0.70.17-py312-none-any.whl", hash = "sha256:2818af14c52446b9617d1b0755fa70ca2f77c28b25ed97bdaa2c69a22c47b46c", size = 147990 },
{ url = "https://files.pythonhosted.org/packages/c8/b7/2e9a4fcd871b81e1f2a812cd5c6fb52ad1e8da7bf0d7646c55eaae220484/multiprocess-0.70.17-py313-none-any.whl", hash = "sha256:20c28ca19079a6c879258103a6d60b94d4ffe2d9da07dda93fb1c8bc6243f522", size = 149843 },
{ url = "https://files.pythonhosted.org/packages/ae/d7/fd7a092fc0ab1845a1a97ca88e61b9b7cc2e9d6fcf0ed24e9480590c2336/multiprocess-0.70.17-py38-none-any.whl", hash = "sha256:1d52f068357acd1e5bbc670b273ef8f81d57863235d9fbf9314751886e141968", size = 132635 },
{ url = "https://files.pythonhosted.org/packages/f9/41/0618ac724b8a56254962c143759e04fa01c73b37aa69dd433f16643bd38b/multiprocess-0.70.17-py39-none-any.whl", hash = "sha256:c3feb874ba574fbccfb335980020c1ac631fbf2a3f7bee4e2042ede62558a021", size = 133359 },
]
[[package]]
@@ -3179,8 +3192,6 @@ version = "5.9.8"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/90/c7/6dc0a455d111f68ee43f27793971cf03fe29b6ef972042549db29eec39a2/psutil-5.9.8.tar.gz", hash = "sha256:6be126e3225486dff286a8fb9a06246a5253f4c7c53b475ea5f5ac934e64194c", size = 503247 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fe/5f/c26deb822fd3daf8fde4bdb658bf87d9ab1ffd3fca483816e89a9a9a9084/psutil-5.9.8-cp27-none-win32.whl", hash = "sha256:36f435891adb138ed3c9e58c6af3e2e6ca9ac2f365efe1f9cfef2794e6c93b4e", size = 248660 },
{ url = "https://files.pythonhosted.org/packages/32/1d/cf66073d74d6146187e2d0081a7616df4437214afa294ee4f16f80a2f96a/psutil-5.9.8-cp27-none-win_amd64.whl", hash = "sha256:bd1184ceb3f87651a67b2708d4c3338e9b10c5df903f2e3776b62303b26cb631", size = 251966 },
{ url = "https://files.pythonhosted.org/packages/e7/e3/07ae864a636d70a8a6f58da27cb1179192f1140d5d1da10886ade9405797/psutil-5.9.8-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:aee678c8720623dc456fa20659af736241f575d79429a0e5e9cf88ae0605cc81", size = 248702 },
{ url = "https://files.pythonhosted.org/packages/b3/bd/28c5f553667116b2598b9cc55908ec435cb7f77a34f2bff3e3ca765b0f78/psutil-5.9.8-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8cb6403ce6d8e047495a701dc7c5bd788add903f8986d523e3e20b98b733e421", size = 285242 },
{ url = "https://files.pythonhosted.org/packages/c5/4f/0e22aaa246f96d6ac87fe5ebb9c5a693fbe8877f537a1022527c47ca43c5/psutil-5.9.8-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d06016f7f8625a1825ba3732081d77c94589dca78b7a3fc072194851e88461a4", size = 288191 },