Compare commits

...

4 Commits

Author           SHA1        Message                                                      Date
Brandon Hancock  0f12b701b2  Still making WIP                                             2024-10-17 12:51:38 -04:00
Brandon Hancock  9b8b37c9f4  WIP                                                          2024-10-17 09:55:03 -04:00
Brandon Hancock  296965b51d  Properly saving events with session id to local sqlite db   2024-10-16 15:57:37 -04:00
Brandon Hancock  1090a789ff  wip                                                          2024-10-16 10:14:01 -04:00
14 changed files with 500 additions and 189 deletions

View File

@@ -29,6 +29,7 @@ dependencies = [
"pyvis>=0.3.2",
"uv>=0.4.18",
"tomli-w>=1.1.0",
"blinker>=1.8.2",
]
[project.urls]

View File

@@ -1,8 +1,10 @@
import warnings
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.flow.flow import Flow
from crewai.llm import LLM
from crewai.logging.event_logger import EventLogger
from crewai.pipeline import Pipeline
from crewai.process import Process
from crewai.routers import Router

View File

@@ -212,6 +212,38 @@ class BaseAgent(ABC, BaseModel):
"""Get the converter class for the agent to create json/pydantic outputs."""
pass
def serialize(self) -> Dict[str, Any]:
"""Serialize the BaseAgent into a dictionary excluding complex objects."""
exclude = {
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"agent_executor",
"cache_handler",
"tools_handler",
"llm",
"crew",
# Add any other complex attributes that should be excluded
}
serialized_data = self.model_dump(exclude=exclude)
serialized_data = {k: v for k, v in serialized_data.items() if v is not None}
# Add any additional serialization logic if needed
serialized_data["role"] = self.role
serialized_data["goal"] = self.goal
serialized_data["backstory"] = self.backstory
serialized_data["tools"] = (
[str(tool) for tool in self.tools] if self.tools else None
)
# Include the UUID as a string
serialized_data["id"] = str(self.id)
return serialized_data
def copy(self: T) -> T: # type: ignore # Signature of "copy" incompatible with supertype "BaseModel"
"""Create a deep copy of the Agent."""
exclude = {

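For illustration, a minimal sketch of what the new BaseAgent.serialize() yields; the Agent constructor values below are placeholders, and the top-level crewai.Agent import comes from the __init__ hunk above:

import json
from crewai import Agent

# Placeholder agent purely for the sketch; serialize() drops llm, crew,
# handlers, and the other complex attributes listed in the exclude set above.
agent = Agent(role="Researcher", goal="Summarize findings", backstory="Example only")
payload = agent.serialize()
print(json.dumps(payload, indent=2, default=str))  # plain dict: role, goal, backstory, tools, id, ...
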
View File

@@ -40,6 +40,12 @@ from crewai.utilities.constants import (
)
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.event_helpers import (
emit_crew_finish,
emit_crew_start,
emit_task_finish,
emit_task_start,
)
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -86,9 +92,9 @@ class Crew(BaseModel):
"""
__hash__ = object.__hash__ # type: ignore
_execution_span: Any = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr()
_logger: Logger = PrivateAttr()
# TODO: MAKE THIS ALSO USE EVENT EMITTER
_file_handler: FileHandler = PrivateAttr()
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
@@ -100,6 +106,7 @@ class Crew(BaseModel):
_logging_color: str = PrivateAttr(
default="bold_purple",
)
# TODO: Figure out how to make this reference event emitter.
_task_output_handler: TaskOutputStorageHandler = PrivateAttr(
default_factory=TaskOutputStorageHandler
)
@@ -457,7 +464,7 @@ class Crew(BaseModel):
inputs: Optional[Dict[str, Any]] = None,
) -> CrewOutput:
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
emit_crew_start(self, inputs)
self._task_output_handler.reset()
self._logging_color = "bold_purple"
@@ -504,6 +511,8 @@ class Crew(BaseModel):
for metric in metrics:
self.usage_metrics.add_usage_metrics(metric)
emit_crew_finish(self, result)
return result
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
@@ -667,7 +676,9 @@ class Crew(BaseModel):
)
self._prepare_agent_tools(task)
self._log_task_start(task, agent_to_use.role)
emit_task_start(task, agent_to_use.role)
# TODO: ADD ELSEWHERE
# self._log_task_start(task, agent_to_use.role)
if isinstance(task, ConditionalTask):
skipped_task_output = self._handle_conditional_task(
@@ -698,8 +709,17 @@ class Crew(BaseModel):
tools=agent_to_use.tools,
)
task_outputs = [task_output]
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
emit_task_finish(
task,
self._inputs if self._inputs else {},
task_output,
task_index,
was_replayed,
)
# TODO: ADD ELSEWHERE
# self._process_task_result(task, task_output)
# self._store_execution_log(task, task_output, task_index, was_replayed)
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
@@ -774,7 +794,9 @@ class Crew(BaseModel):
def _log_task_start(self, task: Task, role: str = "None"):
if self.output_log_file:
self._file_handler.log(task_name=task.name, task=task.description, agent=role, status="started")
self._file_handler.log(
task_name=task.name, task=task.description, agent=role, status="started"
)
def _update_manager_tools(self, task: Task):
if self.manager_agent:
@@ -796,7 +818,13 @@ class Crew(BaseModel):
def _process_task_result(self, task: Task, output: TaskOutput) -> None:
role = task.agent.role if task.agent is not None else "None"
if self.output_log_file:
self._file_handler.log(task_name=task.name, task=task.description, agent=role, status="completed", output=output.raw)
self._file_handler.log(
task_name=task.name,
task=task.description,
agent=role,
status="completed",
output=output.raw,
)
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
if len(task_outputs) != 1:
@@ -825,10 +853,19 @@ class Crew(BaseModel):
for future_task, future, task_index in futures:
task_output = future.result()
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
self._store_execution_log(
future_task, task_output, task_index, was_replayed
emit_task_finish(
future_task,
self._inputs if self._inputs else {},
task_output,
task_index,
was_replayed,
)
# TODO: ADD ELSEWHERE
# self._process_task_result(future_task, task_output)
# self._store_execution_log(
# future_task, task_output, task_index, was_replayed
# )
return task_outputs
def _find_task_index(
@@ -884,6 +921,40 @@ class Crew(BaseModel):
result = self._execute_tasks(self.tasks, start_index, True)
return result
def serialize(self) -> Dict[str, Any]:
"""Serialize the Crew into a dictionary excluding complex objects."""
exclude = {
"_rpm_controller",
"_logger",
"_execution_span",
"_file_handler",
"_cache_handler",
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_telemetry",
"agents",
"tasks",
}
# Serialize agents and tasks to a simpler form if needed
serialized_agents = [agent.serialize() for agent in self.agents]
serialized_tasks = [task.serialize() for task in self.tasks]
serialized_data = self.model_dump(exclude=exclude)
serialized_data = {k: v for k, v in serialized_data.items() if v is not None}
# Add serialized agents and tasks
serialized_data["agents"] = serialized_agents
serialized_data["tasks"] = serialized_tasks
# Include the UUID as a string
serialized_data["id"] = str(self.id)
return serialized_data
# TODO: Come back and use the new _serialize method
def copy(self):
"""Create a deep copy of the Crew."""
@@ -979,6 +1050,7 @@ class Crew(BaseModel):
inputs: Optional[Dict[str, Any]] = None,
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
# TODO: Event Emit test execution start
self._test_execution_span = self._telemetry.test_execution_span(
self,
n_iterations,
@@ -992,6 +1064,7 @@ class Crew(BaseModel):
self.kickoff(inputs=inputs)
evaluator.print_crew_evaluation_result()
# TODO:
def __rshift__(self, other: "Crew") -> "Pipeline":
"""

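For illustration, a minimal listener sketch for the lifecycle events this hunk now emits from kickoff(); the callback shape follows blinker's receiver(sender, **kwargs) convention, and the crew construction itself is omitted:

from crewai.utilities.event_emitter import CrewEvents, crew_events

def on_crew_finish(sender, **kwargs):
    # emit_crew_finish() sends the serialized crew dict (including a "result" key)
    # as the positional payload, so blinker delivers it here as `sender`.
    print("crew finished, result keys:", list(sender.get("result", {}).keys()))

crew_events.on(CrewEvents.CREW_FINISH, on_crew_finish)
# crew.kickoff(inputs=...) would now trigger emit_crew_start() and emit_crew_finish() shown above.
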
View File

@@ -41,6 +41,19 @@ class CrewOutput(BaseModel):
output_dict.update(self.pydantic.model_dump())
return output_dict
def serialize(self) -> Dict[str, Any]:
"""Serialize the CrewOutput into a dictionary excluding complex objects."""
serialized_data = {
"raw": self.raw,
"pydantic": self.pydantic.model_dump() if self.pydantic else None,
"json_dict": self.json_dict,
"tasks_output": [
task_output.serialize() for task_output in self.tasks_output
],
"token_usage": self.token_usage.model_dump(),
}
return {k: v for k, v in serialized_data.items() if v is not None}
def __getitem__(self, key):
if self.pydantic and hasattr(self.pydantic, key):
return getattr(self.pydantic, key)

View File

@@ -0,0 +1 @@
from .event_logger import Event, EventLogger

View File

@@ -0,0 +1,94 @@
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
from dotenv import load_dotenv
from sqlalchemy import JSON, Column, DateTime, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session as SQLAlchemySession
from sqlalchemy.orm import sessionmaker
from crewai.utilities.event_emitter import crew_events
load_dotenv()
Base = declarative_base()
class Session:
_session_id: Optional[str] = None
@classmethod
def get_session_id(cls) -> str:
if cls._session_id is None:
cls._session_id = str(uuid.uuid4()) # Generate a new UUID
print(f"Generated new session ID: {cls._session_id}")
return cls._session_id
class Event(Base):
__tablename__ = "events"
id = Column(Integer, primary_key=True)
event_name = Column(String)
timestamp = Column(DateTime, default=datetime.now(timezone.utc))
session_id = Column(String)
data = Column(JSON)
error_type = Column(String)
error_message = Column(String)
traceback = Column(String)
DATABASE_URL = os.getenv("CREWAI_DATABASE_URL", "sqlite:///crew_events.db")
engine = create_engine(DATABASE_URL)
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine) # Use a different name to avoid confusion
# Create a session instance
session: SQLAlchemySession = SessionLocal()
class EventLogger:
def __init__(self, session: SQLAlchemySession):
self.session = session
def log_event(self, *args: Any, **kwargs: Any) -> None:
# Extract event name from kwargs
event_name = kwargs.pop("event", "unknown_event")
print("Logging event:", event_name)
print("Args:", args)
print("Kwargs:", kwargs)
# Check if args is a single dictionary and unpack it
if len(args) == 1 and isinstance(args[0], dict):
args_dict = args[0]
else:
# Convert args to a dictionary with keys like 'arg0', 'arg1', etc.
args_dict = {f"arg{i}": arg for i, arg in enumerate(args)}
# Merge args_dict and kwargs into a single dictionary
data = {**args_dict, **kwargs}
print("Data:", data)
event = Event(
event_name=event_name,
session_id=Session.get_session_id(),
data=json.dumps(data),
error_type=kwargs.get("error_type", ""),
error_message=kwargs.get("error_message", ""),
traceback=kwargs.get("traceback", ""),
)
self.session.add(event)
self.session.commit()
print("Successfully logged event:", event_name)
event_logger = EventLogger(session)
print("Connecting event_logger to all signals")
crew_events.on("*", event_logger.log_event)
print("Connected event_logger to all signals")

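For illustration, a minimal sketch of reading back the rows this logger writes; it reuses the Event model, Session helper, and SessionLocal factory defined in this file (import path taken from the crewai/__init__ hunk above):

from crewai.logging.event_logger import Event, Session, SessionLocal

db = SessionLocal()
rows = (
    db.query(Event)
    .filter(Event.session_id == Session.get_session_id())
    .order_by(Event.timestamp)
    .all()
)
for row in rows:
    # `data` is stored as a JSON string via json.dumps() above.
    print(row.event_name, row.timestamp, row.data)
db.close()
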
View File

@@ -317,6 +317,35 @@ class Task(BaseModel):
self.processed_by_agents.add(agent_name)
self.delegations += 1
def serialize(self) -> Dict[str, Any]:
"""Serialize the Task into a dictionary excluding complex objects."""
# Define attributes to exclude from serialization
exclude = {
"_telemetry",
"_execution_span",
"_thread",
"_execution_time",
"agent",
"context",
"tools",
"output",
}
# Use model_dump or similar to get a dictionary representation
serialized_data = self.model_dump(exclude=exclude)
# Add any additional serialization logic if needed
serialized_data["id"] = str(self.id)
serialized_data["name"] = self.name
serialized_data["description"] = self.description
serialized_data["expected_output"] = self.expected_output
serialized_data["output_file"] = self.output_file
serialized_data["human_input"] = self.human_input
serialized_data["processed_by_agents"] = list(self.processed_by_agents)
return serialized_data
def copy(
self, agents: List["BaseAgent"], task_mapping: Dict[str, "Task"]
) -> "Task":

View File

@@ -56,6 +56,21 @@ class TaskOutput(BaseModel):
output_dict.update(self.pydantic.model_dump())
return output_dict
def serialize(self) -> Dict[str, Any]:
"""Serialize the TaskOutput into a dictionary excluding complex objects."""
serialized_data = {
"description": self.description,
"name": self.name,
"expected_output": self.expected_output,
"summary": self.summary,
"raw": self.raw,
"pydantic": self.pydantic.model_dump() if self.pydantic else None,
"json_dict": self.json_dict,
"agent": self.agent,
"output_format": self.output_format.value,
}
return {k: v for k, v in serialized_data.items() if v is not None}
def __str__(self) -> str:
if self.pydantic:
return str(self.pydantic)

View File

@@ -6,7 +6,7 @@ import os
import platform
import warnings
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional
@contextmanager
@@ -21,12 +21,16 @@ with suppress_warnings():
from opentelemetry import trace # noqa: E402
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter # noqa: E402
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter, # noqa: E402
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
from crewai.utilities.event_emitter import CrewEvents, crew_events
if TYPE_CHECKING:
from crewai.crew import Crew
from crewai.task import Task
@@ -83,87 +87,37 @@ class Telemetry:
self.ready = False
self.trace_set = False
def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
def crew_creation(
self, crew_data: Dict[str, Any], inputs: Optional[Dict[str, Any]] = None
):
"""Records the creation of a crew."""
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Created")
# Accessing data from the serialized crew dictionary
self._add_attribute(
span,
"crewai_version",
pkg_resources.get_distribution("crewai").version,
span, "crewai_version", crew_data.get("crewai_version")
)
self._add_attribute(span, "python_version", platform.python_version())
self._add_attribute(span, "crew_key", crew.key)
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "crew_process", crew.process)
self._add_attribute(span, "crew_memory", crew.memory)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
self._add_attribute(span, "crew_number_of_agents", len(crew.agents))
if crew.share_crew:
self._add_attribute(span, "crew_key", crew_data.get("key"))
self._add_attribute(span, "crew_id", crew_data.get("id"))
self._add_attribute(span, "crew_process", crew_data.get("process"))
self._add_attribute(span, "crew_memory", crew_data.get("memory"))
self._add_attribute(
span, "crew_number_of_tasks", len(crew_data.get("tasks", []))
)
self._add_attribute(
span, "crew_number_of_agents", len(crew_data.get("agents", []))
)
if crew_data.get("share_crew"):
self._add_attribute(
span,
"crew_agents",
json.dumps(
[
{
"key": agent.key,
"id": str(agent.id),
"role": agent.role,
"goal": agent.goal,
"backstory": agent.backstory,
"verbose?": agent.verbose,
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"i18n": agent.i18n.prompt_file,
"function_calling_llm": (
agent.function_calling_llm.model
if agent.function_calling_llm
else ""
),
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"allow_code_execution?": agent.allow_code_execution,
"max_retry_limit": agent.max_retry_limit,
"tools_names": [
tool.name.casefold()
for tool in agent.tools or []
],
}
for agent in crew.agents
]
),
span, "crew_agents", json.dumps(crew_data.get("agents", []))
)
self._add_attribute(
span,
"crew_tasks",
json.dumps(
[
{
"key": task.key,
"id": str(task.id),
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": (
task.agent.role if task.agent else "None"
),
"agent_key": task.agent.key if task.agent else None,
"context": (
[task.description for task in task.context]
if task.context
else None
),
"tools_names": [
tool.name.casefold()
for tool in task.tools or []
],
}
for task in crew.tasks
]
),
span, "crew_tasks", json.dumps(crew_data.get("tasks", []))
)
self._add_attribute(span, "platform", platform.platform())
self._add_attribute(span, "platform_release", platform.release())
@@ -174,59 +128,10 @@ class Telemetry:
span, "crew_inputs", json.dumps(inputs) if inputs else None
)
else:
self._add_attribute(
span,
"crew_agents",
json.dumps(
[
{
"key": agent.key,
"id": str(agent.id),
"role": agent.role,
"verbose?": agent.verbose,
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"function_calling_llm": (
agent.function_calling_llm.model
if agent.function_calling_llm
else ""
),
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"allow_code_execution?": agent.allow_code_execution,
"max_retry_limit": agent.max_retry_limit,
"tools_names": [
tool.name.casefold()
for tool in agent.tools or []
],
}
for agent in crew.agents
]
),
)
self._add_attribute(
span,
"crew_tasks",
json.dumps(
[
{
"key": task.key,
"id": str(task.id),
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": (
task.agent.role if task.agent else "None"
),
"agent_key": task.agent.key if task.agent else None,
"tools_names": [
tool.name.casefold()
for tool in task.tools or []
],
}
for task in crew.tasks
]
),
)
# Handle the case where share_crew is False
# You might want to add limited data here
pass
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
@@ -461,77 +366,39 @@ class Telemetry:
except Exception:
pass
def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None):
def crew_execution_span(
self, crew_data: Dict[str, Any], inputs: Optional[Dict[str, Any]] = None
):
"""Records the complete execution of a crew.
This is only collected if the user has opted-in to share the crew.
"""
self.crew_creation(crew, inputs)
if (self.ready) and (crew.share_crew):
if self.ready and crew_data.get("share_crew"):
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Execution")
self._add_attribute(
span,
"crewai_version",
pkg_resources.get_distribution("crewai").version,
)
self._add_attribute(span, "crew_key", crew.key)
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "crew_key", crew_data.get("key"))
self._add_attribute(span, "crew_id", crew_data.get("id"))
self._add_attribute(
span, "crew_inputs", json.dumps(inputs) if inputs else None
)
self._add_attribute(
span,
"crew_agents",
json.dumps(
[
{
"key": agent.key,
"id": str(agent.id),
"role": agent.role,
"goal": agent.goal,
"backstory": agent.backstory,
"verbose?": agent.verbose,
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"i18n": agent.i18n.prompt_file,
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"tools_names": [
tool.name.casefold() for tool in agent.tools or []
],
}
for agent in crew.agents
]
),
json.dumps(crew_data.get("agents", [])),
)
self._add_attribute(
span,
"crew_tasks",
json.dumps(
[
{
"id": str(task.id),
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None",
"agent_key": task.agent.key if task.agent else None,
"context": (
[task.description for task in task.context]
if task.context
else None
),
"tools_names": [
tool.name.casefold() for tool in task.tools or []
],
}
for task in crew.tasks
]
),
json.dumps(crew_data.get("tasks", [])),
)
span.set_status(Status(StatusCode.OK))
span.end()
return span
except Exception:
pass
@@ -607,3 +474,9 @@ class Telemetry:
span.end()
except Exception:
pass
telemetry = Telemetry()
crew_events.on(CrewEvents.CREW_START, telemetry.crew_execution_span)

View File

@@ -0,0 +1,46 @@
from enum import Enum
from typing import Any, Callable
from blinker import signal
class CrewEvents(Enum):
CREW_START = "crew_start"
CREW_FINISH = "crew_finish"
CREW_FAILURE = "crew_failure"
TASK_START = "task_start"
TASK_FINISH = "task_finish"
TASK_FAILURE = "task_failure"
AGENT_ACTION = "agent_action"
TOOL_USE = "tool_use"
TOKEN_USAGE = "token_usage"
class CrewEventEmitter:
def __init__(self):
self._all_signal = signal("all")
def on(self, event_name: CrewEvents | str, callback: Callable) -> None:
if event_name == "*" or event_name == "all":
self._all_signal.connect(callback, weak=False)
else:
signal(
event_name.value if isinstance(event_name, CrewEvents) else event_name
).connect(callback, weak=False)
def emit(self, event_name: CrewEvents, *args: Any, **kwargs: Any) -> None:
signal(event_name.value).send(*args, **kwargs)
self._all_signal.send(*args, event=event_name.value, **kwargs)
crew_events = CrewEventEmitter()
def emit(event_name: CrewEvents, *args: Any, **kwargs: Any) -> None:
try:
crew_events.emit(event_name, *args, **kwargs)
except Exception as e:
if kwargs.get("raise_on_error", False):
raise e
else:
print(f"Error emitting event: {e}")

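For illustration, a minimal subscribe/emit sketch against the emitter defined above; receivers follow blinker's receiver(sender, **kwargs) calling convention, and the payload values are placeholders:

from crewai.utilities.event_emitter import CrewEvents, crew_events, emit

def on_task_finish(sender, **kwargs):
    # The first positional argument passed to send() arrives as `sender`;
    # remaining keyword arguments arrive as **kwargs.
    print("task finished:", sender, kwargs.get("task_index"))

crew_events.on(CrewEvents.TASK_FINISH, on_task_finish)                      # one event
crew_events.on("*", lambda sender, **kw: print("saw:", kw.get("event")))    # catch-all signal

emit(CrewEvents.TASK_FINISH, {"task": "placeholder"}, task_index=0)
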
View File

@@ -0,0 +1,121 @@
# event_helpers.py
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, Optional
from crewai.utilities.event_emitter import CrewEvents, emit
if TYPE_CHECKING:
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
def emit_crew_start(
crew: "Crew", # Use a forward reference
inputs: Optional[Dict[str, Any]] = None,
) -> None:
serialized_crew = crew.serialize()
emit(
CrewEvents.CREW_START,
{
**serialized_crew,
},
inputs=inputs,
)
def emit_crew_finish(crew: "Crew", result: "CrewOutput") -> None:
serialized_crew = crew.serialize()
serialized_result = result.serialize()
print("emit crew finish")
emit(
CrewEvents.CREW_FINISH,
{
**serialized_crew,
"result": serialized_result,
},
)
def emit_crew_failure(
crew_id: str, name: str, error: Exception, traceback: str, duration: float
) -> None:
emit(
CrewEvents.CREW_FAILURE,
{
"crew_id": crew_id,
"name": name,
"failure_time": datetime.now().isoformat(),
"error_type": type(error).__name__,
"error_message": str(error),
"traceback": traceback,
"duration": duration,
},
)
def emit_task_start(
task: "Task",
agent_role: str = "None",
) -> None:
serialized_task = task.serialize()
emit(
CrewEvents.TASK_START,
{
**serialized_task,
},
agent_role=agent_role,
)
def emit_task_finish(
task: "Task",
inputs: Dict[str, Any],
output: "TaskOutput",
task_index: int,
was_replayed: bool = False,
) -> None:
emit(
CrewEvents.TASK_FINISH,
{
"task": task.serialize(),
"output": {
"description": output.description,
"summary": output.summary,
"raw": output.raw,
"pydantic": output.pydantic,
"json_dict": output.json_dict,
"output_format": output.output_format,
"agent": output.agent,
},
"task_index": task_index,
"inputs": inputs,
"was_replayed": was_replayed,
},
)
def emit_task_failure(
crew_id: str,
task_id: str,
task_name: str,
error: Exception,
traceback: str,
duration: float,
) -> None:
emit(
CrewEvents.TASK_FAILURE,
{
"crew_id": crew_id,
"task_id": task_id,
"task_name": task_name,
"failure_time": datetime.now().isoformat(),
"error_type": type(error).__name__,
"error_message": str(error),
"traceback": traceback,
"duration": duration,
},
)

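For illustration, a minimal call to the failure helper above, which only needs plain values; every identifier passed in here is a placeholder:

import traceback as tb
from crewai.utilities.event_helpers import emit_crew_failure

try:
    raise RuntimeError("boom")
except RuntimeError as exc:
    emit_crew_failure(
        crew_id="crew-123",        # placeholder id
        name="example_crew",       # placeholder name
        error=exc,
        traceback=tb.format_exc(),
        duration=1.25,             # seconds, placeholder
    )
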
View File

@@ -1,7 +1,7 @@
from typing import Any, Callable, Generic, List, Dict, Type, TypeVar
from functools import wraps
from pydantic import BaseModel
from typing import Any, Callable, Dict, Generic, List, Type, TypeVar
from pydantic import BaseModel
T = TypeVar("T")
EVT = TypeVar("EVT", bound=BaseModel)

uv.lock (generated)
View File

@@ -290,6 +290,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b1/fe/e8c672695b37eecc5cbf43e1d0638d88d66ba3a44c4d321c796f4e59167f/beautifulsoup4-4.12.3-py3-none-any.whl", hash = "sha256:b80878c9f40111313e55da8ba20bdba06d8fa3969fc68304167741bbf9e082ed", size = 147925 },
]
[[package]]
name = "blinker"
version = "1.8.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/1e/57/a6a1721eff09598fb01f3c7cda070c1b6a0f12d63c83236edf79a440abcc/blinker-1.8.2.tar.gz", hash = "sha256:8f77b09d3bf7c795e969e9486f39c2c5e9c39d4ee07424be2bc594ece9642d83", size = 23161 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bb/2a/10164ed1f31196a2f7f3799368a821765c62851ead0e630ab52b8e14b4d0/blinker-1.8.2-py3-none-any.whl", hash = "sha256:1779309f71bf239144b9399d06ae925637cf6634cf6bd131104184531bf67c01", size = 9456 },
]
[[package]]
name = "boto3"
version = "1.35.32"
@@ -627,12 +636,13 @@ wheels = [
[[package]]
name = "crewai"
version = "0.67.1"
version = "0.70.1"
source = { editable = "." }
dependencies = [
{ name = "agentops" },
{ name = "appdirs" },
{ name = "auth0-python" },
{ name = "blinker" },
{ name = "click" },
{ name = "crewai-tools" },
{ name = "embedchain" },
@@ -687,6 +697,7 @@ requires-dist = [
{ name = "agentops", marker = "extra == 'agentops'", specifier = ">=0.3.0" },
{ name = "appdirs", specifier = ">=1.4.4" },
{ name = "auth0-python", specifier = ">=4.7.1" },
{ name = "blinker" },
{ name = "click", specifier = ">=8.1.7" },
{ name = "crewai-tools", specifier = ">=0.12.1" },
{ name = "crewai-tools", marker = "extra == 'tools'", specifier = ">=0.12.1" },
@@ -2496,6 +2507,8 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b2/07/8cbb75d6cfbe8712d8f7f6a5615f083c6e710ab916b748fbb20373ddb142/multiprocess-0.70.17-py311-none-any.whl", hash = "sha256:2884701445d0177aec5bd5f6ee0df296773e4fb65b11903b94c613fb46cfb7d1", size = 144346 },
{ url = "https://files.pythonhosted.org/packages/a4/69/d3f343a61a2f86ef10ed7865a26beda7c71554136ce187b0384b1c2c9ca3/multiprocess-0.70.17-py312-none-any.whl", hash = "sha256:2818af14c52446b9617d1b0755fa70ca2f77c28b25ed97bdaa2c69a22c47b46c", size = 147990 },
{ url = "https://files.pythonhosted.org/packages/c8/b7/2e9a4fcd871b81e1f2a812cd5c6fb52ad1e8da7bf0d7646c55eaae220484/multiprocess-0.70.17-py313-none-any.whl", hash = "sha256:20c28ca19079a6c879258103a6d60b94d4ffe2d9da07dda93fb1c8bc6243f522", size = 149843 },
{ url = "https://files.pythonhosted.org/packages/ae/d7/fd7a092fc0ab1845a1a97ca88e61b9b7cc2e9d6fcf0ed24e9480590c2336/multiprocess-0.70.17-py38-none-any.whl", hash = "sha256:1d52f068357acd1e5bbc670b273ef8f81d57863235d9fbf9314751886e141968", size = 132635 },
{ url = "https://files.pythonhosted.org/packages/f9/41/0618ac724b8a56254962c143759e04fa01c73b37aa69dd433f16643bd38b/multiprocess-0.70.17-py39-none-any.whl", hash = "sha256:c3feb874ba574fbccfb335980020c1ac631fbf2a3f7bee4e2042ede62558a021", size = 133359 },
]
[[package]]
@@ -3179,8 +3192,6 @@ version = "5.9.8"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/90/c7/6dc0a455d111f68ee43f27793971cf03fe29b6ef972042549db29eec39a2/psutil-5.9.8.tar.gz", hash = "sha256:6be126e3225486dff286a8fb9a06246a5253f4c7c53b475ea5f5ac934e64194c", size = 503247 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fe/5f/c26deb822fd3daf8fde4bdb658bf87d9ab1ffd3fca483816e89a9a9a9084/psutil-5.9.8-cp27-none-win32.whl", hash = "sha256:36f435891adb138ed3c9e58c6af3e2e6ca9ac2f365efe1f9cfef2794e6c93b4e", size = 248660 },
{ url = "https://files.pythonhosted.org/packages/32/1d/cf66073d74d6146187e2d0081a7616df4437214afa294ee4f16f80a2f96a/psutil-5.9.8-cp27-none-win_amd64.whl", hash = "sha256:bd1184ceb3f87651a67b2708d4c3338e9b10c5df903f2e3776b62303b26cb631", size = 251966 },
{ url = "https://files.pythonhosted.org/packages/e7/e3/07ae864a636d70a8a6f58da27cb1179192f1140d5d1da10886ade9405797/psutil-5.9.8-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:aee678c8720623dc456fa20659af736241f575d79429a0e5e9cf88ae0605cc81", size = 248702 },
{ url = "https://files.pythonhosted.org/packages/b3/bd/28c5f553667116b2598b9cc55908ec435cb7f77a34f2bff3e3ca765b0f78/psutil-5.9.8-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8cb6403ce6d8e047495a701dc7c5bd788add903f8986d523e3e20b98b733e421", size = 285242 },
{ url = "https://files.pythonhosted.org/packages/c5/4f/0e22aaa246f96d6ac87fe5ebb9c5a693fbe8877f537a1022527c47ca43c5/psutil-5.9.8-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d06016f7f8625a1825ba3732081d77c94589dca78b7a3fc072194851e88461a4", size = 288191 },