Properly saving events with session id to local sqlite db

This commit is contained in:
Brandon Hancock
2024-10-16 15:57:37 -04:00
parent 1090a789ff
commit 296965b51d
6 changed files with 79 additions and 18 deletions

View File

@@ -1,8 +1,10 @@
import warnings import warnings
from crewai.agent import Agent from crewai.agent import Agent
from crewai.crew import Crew from crewai.crew import Crew
from crewai.flow.flow import Flow from crewai.flow.flow import Flow
from crewai.llm import LLM from crewai.llm import LLM
from crewai.logging.event_logger import EventLogger
from crewai.pipeline import Pipeline from crewai.pipeline import Pipeline
from crewai.process import Process from crewai.process import Process
from crewai.routers import Router from crewai.routers import Router

View File

@@ -215,7 +215,6 @@ class BaseAgent(ABC, BaseModel):
def serialize(self) -> Dict[str, Any]: def serialize(self) -> Dict[str, Any]:
"""Serialize the BaseAgent into a dictionary excluding complex objects.""" """Serialize the BaseAgent into a dictionary excluding complex objects."""
# Define attributes to exclude from serialization
exclude = { exclude = {
"_logger", "_logger",
"_rpm_controller", "_rpm_controller",
@@ -229,8 +228,8 @@ class BaseAgent(ABC, BaseModel):
# Add any other complex attributes that should be excluded # Add any other complex attributes that should be excluded
} }
# Use model_dump or similar to get a dictionary representation
serialized_data = self.model_dump(exclude=exclude) serialized_data = self.model_dump(exclude=exclude)
serialized_data = {k: v for k, v in serialized_data.items() if v is not None}
# Add any additional serialization logic if needed # Add any additional serialization logic if needed
serialized_data["role"] = self.role serialized_data["role"] = self.role
@@ -240,6 +239,9 @@ class BaseAgent(ABC, BaseModel):
[str(tool) for tool in self.tools] if self.tools else None [str(tool) for tool in self.tools] if self.tools else None
) )
# Include the UUID as a string
serialized_data["id"] = str(self.id)
return serialized_data return serialized_data
def copy(self: T) -> T: # type: ignore # Signature of "copy" incompatible with supertype "BaseModel" def copy(self: T) -> T: # type: ignore # Signature of "copy" incompatible with supertype "BaseModel"

View File

@@ -922,6 +922,9 @@ class Crew(BaseModel):
serialized_data["agents"] = serialized_agents serialized_data["agents"] = serialized_agents
serialized_data["tasks"] = serialized_tasks serialized_data["tasks"] = serialized_tasks
# Include the UUID as a string
serialized_data["id"] = str(self.id)
return serialized_data return serialized_data
# TODO: Come back and use the new _serialize method # TODO: Come back and use the new _serialize method

View File

@@ -1,23 +1,39 @@
import json
import os import os
import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any from typing import Any, Optional
from dotenv import load_dotenv from dotenv import load_dotenv
from sqlalchemy import JSON, Column, DateTime, Integer, String, create_engine from sqlalchemy import JSON, Column, DateTime, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, sessionmaker from sqlalchemy.orm import Session as SQLAlchemySession
from sqlalchemy.orm import sessionmaker
from crewai.utilities.event_emitter import crew_events
load_dotenv() load_dotenv()
Base = declarative_base() Base = declarative_base()
class Session:
_session_id: Optional[str] = None
@classmethod
def get_session_id(cls) -> str:
if cls._session_id is None:
cls._session_id = str(uuid.uuid4()) # Generate a new UUID
print(f"Generated new session ID: {cls._session_id}")
return cls._session_id
class Event(Base): class Event(Base):
__tablename__ = "events" __tablename__ = "events"
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
event_name = Column(String) event_name = Column(String)
timestamp = Column(DateTime, default=datetime.now(timezone.utc)) timestamp = Column(DateTime, default=datetime.now(timezone.utc))
crew_id = Column(String) session_id = Column(String)
data = Column(JSON) data = Column(JSON)
error_type = Column(String) error_type = Column(String)
error_message = Column(String) error_message = Column(String)
@@ -30,24 +46,49 @@ Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine) # Use a different name to avoid confusion SessionLocal = sessionmaker(bind=engine) # Use a different name to avoid confusion
# Create a session instance # Create a session instance
session: Session = SessionLocal() session: SQLAlchemySession = SessionLocal()
class EventLogger: class EventLogger:
def __init__(self, session: Session): def __init__(self, session: SQLAlchemySession):
self.session = session self.session = session
def log_event(self, event_name: str, *args: Any, **kwargs: Any) -> None: def log_event(self, *args: Any, **kwargs: Any) -> None:
# Extract event name from kwargs
event_name = kwargs.pop("event", "unknown_event")
print("Logging event:", event_name)
print("Args:", args)
print("Kwargs:", kwargs)
# Check if args is a single dictionary and unpack it
if len(args) == 1 and isinstance(args[0], dict):
args_dict = args[0]
else:
# Convert args to a dictionary with keys like 'arg0', 'arg1', etc.
args_dict = {f"arg{i}": arg for i, arg in enumerate(args)}
# Merge args_dict and kwargs into a single dictionary
data = {**args_dict, **kwargs}
print("Data:", data)
event = Event( event = Event(
event_name=event_name, event_name=event_name,
crew_id=kwargs.get("crew_id", ""), session_id=Session.get_session_id(),
data=kwargs, data=json.dumps(data),
error_type=kwargs.get("error_type", ""), error_type=kwargs.get("error_type", ""),
error_message=kwargs.get("error_message", ""), error_message=kwargs.get("error_message", ""),
traceback=kwargs.get("traceback", ""), traceback=kwargs.get("traceback", ""),
) )
self.session.add(event) self.session.add(event)
self.session.commit() self.session.commit()
print("Successfully logged event:", event_name)
event_logger = EventLogger(session) event_logger = EventLogger(session)
print("Connecting event_logger to all signals")
crew_events.on("*", event_logger.log_event)
print("Connected event_logger to all signals")

View File

@@ -20,21 +20,32 @@ class CrewEventEmitter:
def __init__(self): def __init__(self):
self._all_signal = signal("all") self._all_signal = signal("all")
def on(self, event_name: CrewEvents, callback: Callable) -> None: def on(self, event_name: CrewEvents | str, callback: Callable) -> None:
if event_name == "*": print("Connecting signal:", event_name)
self._all_signal.connect(callback) if event_name == "*" or event_name == "all":
self._all_signal.connect(callback, weak=False)
print("Connected to all_signal")
else: else:
signal(event_name.value).connect(callback) signal(
event_name.value if isinstance(event_name, CrewEvents) else event_name
).connect(callback, weak=False)
def emit(self, event_name: CrewEvents, *args: Any, **kwargs: Any) -> None: def emit(self, event_name: CrewEvents, *args: Any, **kwargs: Any) -> None:
print(f"Emitting signal: {event_name.value}")
print("args", args)
print("kwargs", kwargs)
signal(event_name.value).send(*args, **kwargs) signal(event_name.value).send(*args, **kwargs)
self._all_signal.send(event_name, *args, **kwargs) print(f"Emitting all signal for: {event_name.value}")
self._all_signal.send(*args, event=event_name.value, **kwargs)
crew_events = CrewEventEmitter() crew_events = CrewEventEmitter()
def emit(event_name: CrewEvents, *args: Any, **kwargs: Any) -> None: def emit(event_name: CrewEvents, *args: Any, **kwargs: Any) -> None:
print("Calling emit", event_name)
print("Args:", args)
print("Kwargs:", kwargs)
try: try:
crew_events.emit(event_name, *args, **kwargs) crew_events.emit(event_name, *args, **kwargs)
except Exception as e: except Exception as e:

View File

@@ -1,14 +1,16 @@
# event_helpers.py # event_helpers.py
from datetime import datetime from datetime import datetime
from typing import Any, Dict, Optional from typing import TYPE_CHECKING, Any, Dict, Optional
from crewai.crew import Crew
from crewai.utilities.event_emitter import CrewEvents, emit from crewai.utilities.event_emitter import CrewEvents, emit
if TYPE_CHECKING:
from crewai.crew import Crew
def emit_crew_start( def emit_crew_start(
crew: Crew, crew: "Crew", # Use a forward reference
inputs: Optional[Dict[str, Any]] = None, inputs: Optional[Dict[str, Any]] = None,
) -> None: ) -> None:
serialized_crew = crew.serialize() serialized_crew = crew.serialize()