From 1090a789ffe595fd02154190d88e737fe51b66a4 Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Wed, 16 Oct 2024 10:14:01 -0400 Subject: [PATCH] wip --- pyproject.toml | 1 + src/crewai/agents/agent_builder/base_agent.py | 30 ++++++ src/crewai/crew.py | 45 ++++++++- src/crewai/logging/__init__.py | 1 + src/crewai/logging/event_logger.py | 53 ++++++++++ src/crewai/task.py | 29 ++++++ src/crewai/utilities/event_emitter.py | 44 +++++++++ src/crewai/utilities/event_helpers.py | 99 +++++++++++++++++++ src/crewai/utilities/events.py | 4 +- uv.lock | 17 +++- 10 files changed, 316 insertions(+), 7 deletions(-) create mode 100644 src/crewai/logging/__init__.py create mode 100644 src/crewai/logging/event_logger.py create mode 100644 src/crewai/utilities/event_emitter.py create mode 100644 src/crewai/utilities/event_helpers.py diff --git a/pyproject.toml b/pyproject.toml index 2dbcdedc9..126492129 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "pyvis>=0.3.2", "uv>=0.4.18", "tomli-w>=1.1.0", + "blinker>=1.8.2", ] [project.urls] diff --git a/src/crewai/agents/agent_builder/base_agent.py b/src/crewai/agents/agent_builder/base_agent.py index f42ab3172..0e93ed0e8 100644 --- a/src/crewai/agents/agent_builder/base_agent.py +++ b/src/crewai/agents/agent_builder/base_agent.py @@ -212,6 +212,36 @@ class BaseAgent(ABC, BaseModel): """Get the converter class for the agent to create json/pydantic outputs.""" pass + def serialize(self) -> Dict[str, Any]: + """Serialize the BaseAgent into a dictionary excluding complex objects.""" + + # Define attributes to exclude from serialization + exclude = { + "_logger", + "_rpm_controller", + "_request_within_rpm_limit", + "_token_process", + "agent_executor", + "cache_handler", + "tools_handler", + "llm", + "crew", + # Add any other complex attributes that should be excluded + } + + # Use model_dump or similar to get a dictionary representation + serialized_data = self.model_dump(exclude=exclude) + + # Add any additional serialization logic if needed + serialized_data["role"] = self.role + serialized_data["goal"] = self.goal + serialized_data["backstory"] = self.backstory + serialized_data["tools"] = ( + [str(tool) for tool in self.tools] if self.tools else None + ) + + return serialized_data + def copy(self: T) -> T: # type: ignore # Signature of "copy" incompatible with supertype "BaseModel" """Create a deep copy of the Agent.""" exclude = { diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 29baa4499..800362939 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -40,6 +40,7 @@ from crewai.utilities.constants import ( ) from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator from crewai.utilities.evaluators.task_evaluator import TaskEvaluator +from crewai.utilities.event_helpers import emit_crew_start from crewai.utilities.formatter import ( aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_tasks, @@ -457,6 +458,7 @@ class Crew(BaseModel): inputs: Optional[Dict[str, Any]] = None, ) -> CrewOutput: """Starts the crew to work on its assigned tasks.""" + emit_crew_start(self) self._execution_span = self._telemetry.crew_execution_span(self, inputs) self._task_output_handler.reset() self._logging_color = "bold_purple" @@ -774,7 +776,9 @@ class Crew(BaseModel): def _log_task_start(self, task: Task, role: str = "None"): if self.output_log_file: - self._file_handler.log(task_name=task.name, task=task.description, agent=role, status="started") + self._file_handler.log( + task_name=task.name, task=task.description, agent=role, status="started" + ) def _update_manager_tools(self, task: Task): if self.manager_agent: @@ -796,7 +800,13 @@ class Crew(BaseModel): def _process_task_result(self, task: Task, output: TaskOutput) -> None: role = task.agent.role if task.agent is not None else "None" if self.output_log_file: - self._file_handler.log(task_name=task.name, task=task.description, agent=role, status="completed", output=output.raw) + self._file_handler.log( + task_name=task.name, + task=task.description, + agent=role, + status="completed", + output=output.raw, + ) def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput: if len(task_outputs) != 1: @@ -884,6 +894,37 @@ class Crew(BaseModel): result = self._execute_tasks(self.tasks, start_index, True) return result + def serialize(self) -> Dict[str, Any]: + """Serialize the Crew into a dictionary excluding complex objects.""" + + exclude = { + "_rpm_controller", + "_logger", + "_execution_span", + "_file_handler", + "_cache_handler", + "_short_term_memory", + "_long_term_memory", + "_entity_memory", + "_telemetry", + "agents", + "tasks", + } + + # Serialize agents and tasks to a simpler form if needed + serialized_agents = [agent.serialize() for agent in self.agents] + serialized_tasks = [task.serialize() for task in self.tasks] + + serialized_data = self.model_dump(exclude=exclude) + serialized_data = {k: v for k, v in serialized_data.items() if v is not None} + + # Add serialized agents and tasks + serialized_data["agents"] = serialized_agents + serialized_data["tasks"] = serialized_tasks + + return serialized_data + + # TODO: Come back and use the new _serialize method def copy(self): """Create a deep copy of the Crew.""" diff --git a/src/crewai/logging/__init__.py b/src/crewai/logging/__init__.py new file mode 100644 index 000000000..4627b4f25 --- /dev/null +++ b/src/crewai/logging/__init__.py @@ -0,0 +1 @@ +from .event_logger import Event, EventLogger diff --git a/src/crewai/logging/event_logger.py b/src/crewai/logging/event_logger.py new file mode 100644 index 000000000..e958d508b --- /dev/null +++ b/src/crewai/logging/event_logger.py @@ -0,0 +1,53 @@ +import os +from datetime import datetime, timezone +from typing import Any + +from dotenv import load_dotenv +from sqlalchemy import JSON, Column, DateTime, Integer, String, create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import Session, sessionmaker + +load_dotenv() + +Base = declarative_base() + + +class Event(Base): + __tablename__ = "events" + id = Column(Integer, primary_key=True) + event_name = Column(String) + timestamp = Column(DateTime, default=datetime.now(timezone.utc)) + crew_id = Column(String) + data = Column(JSON) + error_type = Column(String) + error_message = Column(String) + traceback = Column(String) + + +DATABASE_URL = os.getenv("CREWAI_DATABASE_URL", "sqlite:///crew_events.db") +engine = create_engine(DATABASE_URL) +Base.metadata.create_all(engine) +SessionLocal = sessionmaker(bind=engine) # Use a different name to avoid confusion + +# Create a session instance +session: Session = SessionLocal() + + +class EventLogger: + def __init__(self, session: Session): + self.session = session + + def log_event(self, event_name: str, *args: Any, **kwargs: Any) -> None: + event = Event( + event_name=event_name, + crew_id=kwargs.get("crew_id", ""), + data=kwargs, + error_type=kwargs.get("error_type", ""), + error_message=kwargs.get("error_message", ""), + traceback=kwargs.get("traceback", ""), + ) + self.session.add(event) + self.session.commit() + + +event_logger = EventLogger(session) diff --git a/src/crewai/task.py b/src/crewai/task.py index 82baa9959..536b1d500 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -317,6 +317,35 @@ class Task(BaseModel): self.processed_by_agents.add(agent_name) self.delegations += 1 + def serialize(self) -> Dict[str, Any]: + """Serialize the Task into a dictionary excluding complex objects.""" + + # Define attributes to exclude from serialization + exclude = { + "_telemetry", + "_execution_span", + "_thread", + "_execution_time", + "agent", + "context", + "tools", + "output", + } + + # Use model_dump or similar to get a dictionary representation + serialized_data = self.model_dump(exclude=exclude) + + # Add any additional serialization logic if needed + serialized_data["id"] = str(self.id) + serialized_data["name"] = self.name + serialized_data["description"] = self.description + serialized_data["expected_output"] = self.expected_output + serialized_data["output_file"] = self.output_file + serialized_data["human_input"] = self.human_input + serialized_data["processed_by_agents"] = list(self.processed_by_agents) + + return serialized_data + def copy( self, agents: List["BaseAgent"], task_mapping: Dict[str, "Task"] ) -> "Task": diff --git a/src/crewai/utilities/event_emitter.py b/src/crewai/utilities/event_emitter.py new file mode 100644 index 000000000..4f64ea89c --- /dev/null +++ b/src/crewai/utilities/event_emitter.py @@ -0,0 +1,44 @@ +from enum import Enum +from typing import Any, Callable + +from blinker import signal + + +class CrewEvents(Enum): + CREW_START = "crew_start" + CREW_FINISH = "crew_finish" + CREW_FAILURE = "crew_failure" + TASK_START = "task_start" + TASK_FINISH = "task_finish" + TASK_FAILURE = "task_failure" + AGENT_ACTION = "agent_action" + TOOL_USE = "tool_use" + TOKEN_USAGE = "token_usage" + + +class CrewEventEmitter: + def __init__(self): + self._all_signal = signal("all") + + def on(self, event_name: CrewEvents, callback: Callable) -> None: + if event_name == "*": + self._all_signal.connect(callback) + else: + signal(event_name.value).connect(callback) + + def emit(self, event_name: CrewEvents, *args: Any, **kwargs: Any) -> None: + signal(event_name.value).send(*args, **kwargs) + self._all_signal.send(event_name, *args, **kwargs) + + +crew_events = CrewEventEmitter() + + +def emit(event_name: CrewEvents, *args: Any, **kwargs: Any) -> None: + try: + crew_events.emit(event_name, *args, **kwargs) + except Exception as e: + if kwargs.get("raise_on_error", False): + raise e + else: + print(f"Error emitting event: {e}") diff --git a/src/crewai/utilities/event_helpers.py b/src/crewai/utilities/event_helpers.py new file mode 100644 index 000000000..647ad8f5b --- /dev/null +++ b/src/crewai/utilities/event_helpers.py @@ -0,0 +1,99 @@ +# event_helpers.py + +from datetime import datetime +from typing import Any, Dict, Optional + +from crewai.crew import Crew +from crewai.utilities.event_emitter import CrewEvents, emit + + +def emit_crew_start( + crew: Crew, + inputs: Optional[Dict[str, Any]] = None, +) -> None: + serialized_crew = crew.serialize() + emit( + CrewEvents.CREW_START, + {**serialized_crew, "inputs": inputs}, + ) + + +def emit_crew_finish(crew_id: str, name: str, result: Any, duration: float) -> None: + emit( + CrewEvents.CREW_FINISH, + { + "crew_id": crew_id, + "name": name, + "finish_time": datetime.now().isoformat(), + "result": result, + "duration": duration, + }, + ) + + +def emit_crew_failure( + crew_id: str, name: str, error: Exception, traceback: str, duration: float +) -> None: + emit( + CrewEvents.CREW_FAILURE, + { + "crew_id": crew_id, + "name": name, + "failure_time": datetime.now().isoformat(), + "error_type": type(error).__name__, + "error_message": str(error), + "traceback": traceback, + "duration": duration, + }, + ) + + +def emit_task_start(crew_id: str, task_id: str, task_name: str) -> None: + emit( + CrewEvents.TASK_START, + { + "crew_id": crew_id, + "task_id": task_id, + "task_name": task_name, + "start_time": datetime.now().isoformat(), + }, + ) + + +def emit_task_finish( + crew_id: str, task_id: str, task_name: str, result: Any, duration: float +) -> None: + emit( + CrewEvents.TASK_FINISH, + { + "crew_id": crew_id, + "task_id": task_id, + "task_name": task_name, + "finish_time": datetime.now().isoformat(), + "result": result, + "duration": duration, + }, + ) + + +def emit_task_failure( + crew_id: str, + task_id: str, + task_name: str, + error: Exception, + traceback: str, + duration: float, +) -> None: + emit( + CrewEvents.TASK_FAILURE, + { + "crew_id": crew_id, + "task_id": task_id, + "task_name": task_name, + "failure_time": datetime.now().isoformat(), + "error_type": type(error).__name__, + "error_message": str(error), + "traceback": traceback, + "duration": duration, + }, + ) diff --git a/src/crewai/utilities/events.py b/src/crewai/utilities/events.py index 75425fca2..11175e0d2 100644 --- a/src/crewai/utilities/events.py +++ b/src/crewai/utilities/events.py @@ -1,7 +1,7 @@ -from typing import Any, Callable, Generic, List, Dict, Type, TypeVar from functools import wraps -from pydantic import BaseModel +from typing import Any, Callable, Dict, Generic, List, Type, TypeVar +from pydantic import BaseModel T = TypeVar("T") EVT = TypeVar("EVT", bound=BaseModel) diff --git a/uv.lock b/uv.lock index 6abd2b4a8..aebba0422 100644 --- a/uv.lock +++ b/uv.lock @@ -290,6 +290,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b1/fe/e8c672695b37eecc5cbf43e1d0638d88d66ba3a44c4d321c796f4e59167f/beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed", size = 147925 }, ] +[[package]] +name = "blinker" +version = "1.8.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/1e/57/a6a1721eff09598fb01f3c7cda070c1b6a0f12d63c83236edf79a440abcc/blinker-1.8.2.tar.gz", hash = "sha256:8f77b09d3bf7c795e969e9486f39c2c5e9c39d4ee07424be2bc594ece9642d83", size = 23161 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bb/2a/10164ed1f31196a2f7f3799368a821765c62851ead0e630ab52b8e14b4d0/blinker-1.8.2-py3-none-any.whl", hash = "sha256:1779309f71bf239144b9399d06ae925637cf6634cf6bd131104184531bf67c01", size = 9456 }, +] + [[package]] name = "boto3" version = "1.35.32" @@ -627,12 +636,13 @@ wheels = [ [[package]] name = "crewai" -version = "0.67.1" +version = "0.70.1" source = { editable = "." } dependencies = [ { name = "agentops" }, { name = "appdirs" }, { name = "auth0-python" }, + { name = "blinker" }, { name = "click" }, { name = "crewai-tools" }, { name = "embedchain" }, @@ -687,6 +697,7 @@ requires-dist = [ { name = "agentops", marker = "extra == 'agentops'", specifier = ">=0.3.0" }, { name = "appdirs", specifier = ">=1.4.4" }, { name = "auth0-python", specifier = ">=4.7.1" }, + { name = "blinker" }, { name = "click", specifier = ">=8.1.7" }, { name = "crewai-tools", specifier = ">=0.12.1" }, { name = "crewai-tools", marker = "extra == 'tools'", specifier = ">=0.12.1" }, @@ -2496,6 +2507,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b2/07/8cbb75d6cfbe8712d8f7f6a5615f083c6e710ab916b748fbb20373ddb142/multiprocess-0.70.17-py311-none-any.whl", hash = "sha256:2884701445d0177aec5bd5f6ee0df296773e4fb65b11903b94c613fb46cfb7d1", size = 144346 }, { url = "https://files.pythonhosted.org/packages/a4/69/d3f343a61a2f86ef10ed7865a26beda7c71554136ce187b0384b1c2c9ca3/multiprocess-0.70.17-py312-none-any.whl", hash = "sha256:2818af14c52446b9617d1b0755fa70ca2f77c28b25ed97bdaa2c69a22c47b46c", size = 147990 }, { url = "https://files.pythonhosted.org/packages/c8/b7/2e9a4fcd871b81e1f2a812cd5c6fb52ad1e8da7bf0d7646c55eaae220484/multiprocess-0.70.17-py313-none-any.whl", hash = "sha256:20c28ca19079a6c879258103a6d60b94d4ffe2d9da07dda93fb1c8bc6243f522", size = 149843 }, + { url = "https://files.pythonhosted.org/packages/ae/d7/fd7a092fc0ab1845a1a97ca88e61b9b7cc2e9d6fcf0ed24e9480590c2336/multiprocess-0.70.17-py38-none-any.whl", hash = "sha256:1d52f068357acd1e5bbc670b273ef8f81d57863235d9fbf9314751886e141968", size = 132635 }, + { url = "https://files.pythonhosted.org/packages/f9/41/0618ac724b8a56254962c143759e04fa01c73b37aa69dd433f16643bd38b/multiprocess-0.70.17-py39-none-any.whl", hash = "sha256:c3feb874ba574fbccfb335980020c1ac631fbf2a3f7bee4e2042ede62558a021", size = 133359 }, ] [[package]] @@ -3179,8 +3192,6 @@ version = "5.9.8" source = { registry = "https://pypi.org/simple" } sdist = { url = "https://files.pythonhosted.org/packages/90/c7/6dc0a455d111f68ee43f27793971cf03fe29b6ef972042549db29eec39a2/psutil-5.9.8.tar.gz", hash = "sha256:6be126e3225486dff286a8fb9a06246a5253f4c7c53b475ea5f5ac934e64194c", size = 503247 } wheels = [ - { url = "https://files.pythonhosted.org/packages/fe/5f/c26deb822fd3daf8fde4bdb658bf87d9ab1ffd3fca483816e89a9a9a9084/psutil-5.9.8-cp27-none-win32.whl", hash = "sha256:36f435891adb138ed3c9e58c6af3e2e6ca9ac2f365efe1f9cfef2794e6c93b4e", size = 248660 }, - { url = "https://files.pythonhosted.org/packages/32/1d/cf66073d74d6146187e2d0081a7616df4437214afa294ee4f16f80a2f96a/psutil-5.9.8-cp27-none-win_amd64.whl", hash = "sha256:bd1184ceb3f87651a67b2708d4c3338e9b10c5df903f2e3776b62303b26cb631", size = 251966 }, { url = "https://files.pythonhosted.org/packages/e7/e3/07ae864a636d70a8a6f58da27cb1179192f1140d5d1da10886ade9405797/psutil-5.9.8-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:aee678c8720623dc456fa20659af736241f575d79429a0e5e9cf88ae0605cc81", size = 248702 }, { url = "https://files.pythonhosted.org/packages/b3/bd/28c5f553667116b2598b9cc55908ec435cb7f77a34f2bff3e3ca765b0f78/psutil-5.9.8-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8cb6403ce6d8e047495a701dc7c5bd788add903f8986d523e3e20b98b733e421", size = 285242 }, { url = "https://files.pythonhosted.org/packages/c5/4f/0e22aaa246f96d6ac87fe5ebb9c5a693fbe8877f537a1022527c47ca43c5/psutil-5.9.8-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d06016f7f8625a1825ba3732081d77c94589dca78b7a3fc072194851e88461a4", size = 288191 },