From 9b8b37c9f4e36ac268bdef2d72d1f1a1942cbe8d Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Thu, 17 Oct 2024 09:55:03 -0400 Subject: [PATCH] WIP --- src/crewai/crew.py | 31 +++- src/crewai/telemetry/telemetry.py | 221 ++++++-------------------- src/crewai/utilities/event_helpers.py | 44 +++-- 3 files changed, 102 insertions(+), 194 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index c8ae9860d..f66ff3250 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -40,7 +40,11 @@ from crewai.utilities.constants import ( ) from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator from crewai.utilities.evaluators.task_evaluator import TaskEvaluator -from crewai.utilities.event_helpers import emit_crew_start +from crewai.utilities.event_helpers import ( + emit_crew_start, + emit_task_finish, + emit_task_start, +) from crewai.utilities.formatter import ( aggregate_raw_outputs_from_task_outputs, aggregate_raw_outputs_from_tasks, @@ -87,9 +91,9 @@ class Crew(BaseModel): """ __hash__ = object.__hash__ # type: ignore - _execution_span: Any = PrivateAttr() _rpm_controller: RPMController = PrivateAttr() _logger: Logger = PrivateAttr() + # TODO: MAKE THIS ALSO USE EVENT EMITTER _file_handler: FileHandler = PrivateAttr() _cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler()) _short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr() @@ -101,6 +105,7 @@ class Crew(BaseModel): _logging_color: str = PrivateAttr( default="bold_purple", ) + # TODO: Figure out how to make this reference event emitter. _task_output_handler: TaskOutputStorageHandler = PrivateAttr( default_factory=TaskOutputStorageHandler ) @@ -458,8 +463,7 @@ class Crew(BaseModel): inputs: Optional[Dict[str, Any]] = None, ) -> CrewOutput: """Starts the crew to work on its assigned tasks.""" - emit_crew_start(self) - self._execution_span = self._telemetry.crew_execution_span(self, inputs) + emit_crew_start(self, inputs) self._task_output_handler.reset() self._logging_color = "bold_purple" @@ -506,6 +510,8 @@ class Crew(BaseModel): for metric in metrics: self.usage_metrics.add_usage_metrics(metric) + # TODO: ADD CREW FINISH EVENT + return result def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]: @@ -669,7 +675,9 @@ class Crew(BaseModel): ) self._prepare_agent_tools(task) - self._log_task_start(task, agent_to_use.role) + emit_task_start(task, agent_to_use.role) + # TODO: ADD ELSEWHERE + # self._log_task_start(task, agent_to_use.role) if isinstance(task, ConditionalTask): skipped_task_output = self._handle_conditional_task( @@ -700,8 +708,17 @@ class Crew(BaseModel): tools=agent_to_use.tools, ) task_outputs = [task_output] - self._process_task_result(task, task_output) - self._store_execution_log(task, task_output, task_index, was_replayed) + + emit_task_finish( + task, + self._inputs if self._inputs else {}, + task_output, + task_index, + was_replayed, + ) + # TODO: ADD ELSEWHERE + # self._process_task_result(task, task_output) + # self._store_execution_log(task, task_output, task_index, was_replayed) if futures: task_outputs = self._process_async_tasks(futures, was_replayed) diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py index f6a018f27..96df8f6e1 100644 --- a/src/crewai/telemetry/telemetry.py +++ b/src/crewai/telemetry/telemetry.py @@ -6,7 +6,7 @@ import os import platform import warnings from contextlib import contextmanager -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional @contextmanager @@ -21,12 +21,16 @@ with suppress_warnings(): from opentelemetry import trace # noqa: E402 -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter # noqa: E402 +from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter, # noqa: E402 +) from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402 from opentelemetry.sdk.trace import TracerProvider # noqa: E402 from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402 from opentelemetry.trace import Span, Status, StatusCode # noqa: E402 +from crewai.utilities.event_emitter import CrewEvents, crew_events + if TYPE_CHECKING: from crewai.crew import Crew from crewai.task import Task @@ -83,87 +87,37 @@ class Telemetry: self.ready = False self.trace_set = False - def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None): + def crew_creation( + self, crew_data: Dict[str, Any], inputs: Optional[Dict[str, Any]] = None + ): """Records the creation of a crew.""" if self.ready: try: tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Crew Created") + + # Accessing data from the serialized crew dictionary self._add_attribute( - span, - "crewai_version", - pkg_resources.get_distribution("crewai").version, + span, "crewai_version", crew_data.get("crewai_version") ) self._add_attribute(span, "python_version", platform.python_version()) - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) - self._add_attribute(span, "crew_process", crew.process) - self._add_attribute(span, "crew_memory", crew.memory) - self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks)) - self._add_attribute(span, "crew_number_of_agents", len(crew.agents)) - if crew.share_crew: + self._add_attribute(span, "crew_key", crew_data.get("key")) + self._add_attribute(span, "crew_id", crew_data.get("id")) + self._add_attribute(span, "crew_process", crew_data.get("process")) + self._add_attribute(span, "crew_memory", crew_data.get("memory")) + self._add_attribute( + span, "crew_number_of_tasks", len(crew_data.get("tasks", [])) + ) + self._add_attribute( + span, "crew_number_of_agents", len(crew_data.get("agents", [])) + ) + + if crew_data.get("share_crew"): self._add_attribute( - span, - "crew_agents", - json.dumps( - [ - { - "key": agent.key, - "id": str(agent.id), - "role": agent.role, - "goal": agent.goal, - "backstory": agent.backstory, - "verbose?": agent.verbose, - "max_iter": agent.max_iter, - "max_rpm": agent.max_rpm, - "i18n": agent.i18n.prompt_file, - "function_calling_llm": ( - agent.function_calling_llm.model - if agent.function_calling_llm - else "" - ), - "llm": agent.llm.model, - "delegation_enabled?": agent.allow_delegation, - "allow_code_execution?": agent.allow_code_execution, - "max_retry_limit": agent.max_retry_limit, - "tools_names": [ - tool.name.casefold() - for tool in agent.tools or [] - ], - } - for agent in crew.agents - ] - ), + span, "crew_agents", json.dumps(crew_data.get("agents", [])) ) self._add_attribute( - span, - "crew_tasks", - json.dumps( - [ - { - "key": task.key, - "id": str(task.id), - "description": task.description, - "expected_output": task.expected_output, - "async_execution?": task.async_execution, - "human_input?": task.human_input, - "agent_role": ( - task.agent.role if task.agent else "None" - ), - "agent_key": task.agent.key if task.agent else None, - "context": ( - [task.description for task in task.context] - if task.context - else None - ), - "tools_names": [ - tool.name.casefold() - for tool in task.tools or [] - ], - } - for task in crew.tasks - ] - ), + span, "crew_tasks", json.dumps(crew_data.get("tasks", [])) ) self._add_attribute(span, "platform", platform.platform()) self._add_attribute(span, "platform_release", platform.release()) @@ -174,59 +128,10 @@ class Telemetry: span, "crew_inputs", json.dumps(inputs) if inputs else None ) else: - self._add_attribute( - span, - "crew_agents", - json.dumps( - [ - { - "key": agent.key, - "id": str(agent.id), - "role": agent.role, - "verbose?": agent.verbose, - "max_iter": agent.max_iter, - "max_rpm": agent.max_rpm, - "function_calling_llm": ( - agent.function_calling_llm.model - if agent.function_calling_llm - else "" - ), - "llm": agent.llm.model, - "delegation_enabled?": agent.allow_delegation, - "allow_code_execution?": agent.allow_code_execution, - "max_retry_limit": agent.max_retry_limit, - "tools_names": [ - tool.name.casefold() - for tool in agent.tools or [] - ], - } - for agent in crew.agents - ] - ), - ) - self._add_attribute( - span, - "crew_tasks", - json.dumps( - [ - { - "key": task.key, - "id": str(task.id), - "async_execution?": task.async_execution, - "human_input?": task.human_input, - "agent_role": ( - task.agent.role if task.agent else "None" - ), - "agent_key": task.agent.key if task.agent else None, - "tools_names": [ - tool.name.casefold() - for tool in task.tools or [] - ], - } - for task in crew.tasks - ] - ), - ) + # Handle the case where share_crew is False + # You might want to add limited data here + pass + span.set_status(Status(StatusCode.OK)) span.end() except Exception: @@ -461,77 +366,39 @@ class Telemetry: except Exception: pass - def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None): + def crew_execution_span( + self, crew_data: Dict[str, Any], inputs: Optional[Dict[str, Any]] = None + ): """Records the complete execution of a crew. This is only collected if the user has opted-in to share the crew. """ - self.crew_creation(crew, inputs) - - if (self.ready) and (crew.share_crew): + if self.ready and crew_data.get("share_crew"): try: tracer = trace.get_tracer("crewai.telemetry") span = tracer.start_span("Crew Execution") + self._add_attribute( span, "crewai_version", pkg_resources.get_distribution("crewai").version, ) - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) + self._add_attribute(span, "crew_key", crew_data.get("key")) + self._add_attribute(span, "crew_id", crew_data.get("id")) self._add_attribute( span, "crew_inputs", json.dumps(inputs) if inputs else None ) self._add_attribute( span, "crew_agents", - json.dumps( - [ - { - "key": agent.key, - "id": str(agent.id), - "role": agent.role, - "goal": agent.goal, - "backstory": agent.backstory, - "verbose?": agent.verbose, - "max_iter": agent.max_iter, - "max_rpm": agent.max_rpm, - "i18n": agent.i18n.prompt_file, - "llm": agent.llm.model, - "delegation_enabled?": agent.allow_delegation, - "tools_names": [ - tool.name.casefold() for tool in agent.tools or [] - ], - } - for agent in crew.agents - ] - ), + json.dumps(crew_data.get("agents", [])), ) self._add_attribute( span, "crew_tasks", - json.dumps( - [ - { - "id": str(task.id), - "description": task.description, - "expected_output": task.expected_output, - "async_execution?": task.async_execution, - "human_input?": task.human_input, - "agent_role": task.agent.role if task.agent else "None", - "agent_key": task.agent.key if task.agent else None, - "context": ( - [task.description for task in task.context] - if task.context - else None - ), - "tools_names": [ - tool.name.casefold() for tool in task.tools or [] - ], - } - for task in crew.tasks - ] - ), + json.dumps(crew_data.get("tasks", [])), ) + span.set_status(Status(StatusCode.OK)) + span.end() return span except Exception: pass @@ -607,3 +474,9 @@ class Telemetry: span.end() except Exception: pass + + +telemetry = Telemetry() + + +crew_events.on(CrewEvents.CREW_START, telemetry.crew_execution_span) diff --git a/src/crewai/utilities/event_helpers.py b/src/crewai/utilities/event_helpers.py index b15cf9978..00787a76f 100644 --- a/src/crewai/utilities/event_helpers.py +++ b/src/crewai/utilities/event_helpers.py @@ -7,6 +7,8 @@ from crewai.utilities.event_emitter import CrewEvents, emit if TYPE_CHECKING: from crewai.crew import Crew + from crewai.task import Task + from crewai.tasks.task_output import TaskOutput def emit_crew_start( @@ -16,7 +18,10 @@ def emit_crew_start( serialized_crew = crew.serialize() emit( CrewEvents.CREW_START, - {**serialized_crew, "inputs": inputs}, + { + **serialized_crew, + }, + inputs=inputs, ) @@ -50,30 +55,43 @@ def emit_crew_failure( ) -def emit_task_start(crew_id: str, task_id: str, task_name: str) -> None: +def emit_task_start( + task: Task, + agent_role: str = "None", +) -> None: + serialized_task = task.serialize() emit( CrewEvents.TASK_START, { - "crew_id": crew_id, - "task_id": task_id, - "task_name": task_name, - "start_time": datetime.now().isoformat(), + **serialized_task, }, + agent_role=agent_role, ) def emit_task_finish( - crew_id: str, task_id: str, task_name: str, result: Any, duration: float + task: Task, + inputs: Dict[str, Any], + output: TaskOutput, + task_index: int, + was_replayed: bool = False, ) -> None: emit( CrewEvents.TASK_FINISH, { - "crew_id": crew_id, - "task_id": task_id, - "task_name": task_name, - "finish_time": datetime.now().isoformat(), - "result": result, - "duration": duration, + "task": task.serialize(), + "output": { + "description": output.description, + "summary": output.summary, + "raw": output.raw, + "pydantic": output.pydantic, + "json_dict": output.json_dict, + "output_format": output.output_format, + "agent": output.agent, + }, + "task_index": task_index, + "inputs": inputs, + "was_replayed": was_replayed, }, )