mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 08:38:30 +00:00
WIP
This commit is contained in:
@@ -40,7 +40,11 @@ from crewai.utilities.constants import (
|
|||||||
)
|
)
|
||||||
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
|
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
|
||||||
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
|
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
|
||||||
from crewai.utilities.event_helpers import emit_crew_start
|
from crewai.utilities.event_helpers import (
|
||||||
|
emit_crew_start,
|
||||||
|
emit_task_finish,
|
||||||
|
emit_task_start,
|
||||||
|
)
|
||||||
from crewai.utilities.formatter import (
|
from crewai.utilities.formatter import (
|
||||||
aggregate_raw_outputs_from_task_outputs,
|
aggregate_raw_outputs_from_task_outputs,
|
||||||
aggregate_raw_outputs_from_tasks,
|
aggregate_raw_outputs_from_tasks,
|
||||||
@@ -87,9 +91,9 @@ class Crew(BaseModel):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
__hash__ = object.__hash__ # type: ignore
|
__hash__ = object.__hash__ # type: ignore
|
||||||
_execution_span: Any = PrivateAttr()
|
|
||||||
_rpm_controller: RPMController = PrivateAttr()
|
_rpm_controller: RPMController = PrivateAttr()
|
||||||
_logger: Logger = PrivateAttr()
|
_logger: Logger = PrivateAttr()
|
||||||
|
# TODO: MAKE THIS ALSO USE EVENT EMITTER
|
||||||
_file_handler: FileHandler = PrivateAttr()
|
_file_handler: FileHandler = PrivateAttr()
|
||||||
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
|
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
|
||||||
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
|
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
|
||||||
@@ -101,6 +105,7 @@ class Crew(BaseModel):
|
|||||||
_logging_color: str = PrivateAttr(
|
_logging_color: str = PrivateAttr(
|
||||||
default="bold_purple",
|
default="bold_purple",
|
||||||
)
|
)
|
||||||
|
# TODO: Figure out how to make this reference event emitter.
|
||||||
_task_output_handler: TaskOutputStorageHandler = PrivateAttr(
|
_task_output_handler: TaskOutputStorageHandler = PrivateAttr(
|
||||||
default_factory=TaskOutputStorageHandler
|
default_factory=TaskOutputStorageHandler
|
||||||
)
|
)
|
||||||
@@ -458,8 +463,7 @@ class Crew(BaseModel):
|
|||||||
inputs: Optional[Dict[str, Any]] = None,
|
inputs: Optional[Dict[str, Any]] = None,
|
||||||
) -> CrewOutput:
|
) -> CrewOutput:
|
||||||
"""Starts the crew to work on its assigned tasks."""
|
"""Starts the crew to work on its assigned tasks."""
|
||||||
emit_crew_start(self)
|
emit_crew_start(self, inputs)
|
||||||
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
|
|
||||||
self._task_output_handler.reset()
|
self._task_output_handler.reset()
|
||||||
self._logging_color = "bold_purple"
|
self._logging_color = "bold_purple"
|
||||||
|
|
||||||
@@ -506,6 +510,8 @@ class Crew(BaseModel):
|
|||||||
for metric in metrics:
|
for metric in metrics:
|
||||||
self.usage_metrics.add_usage_metrics(metric)
|
self.usage_metrics.add_usage_metrics(metric)
|
||||||
|
|
||||||
|
# TODO: ADD CREW FINISH EVENT
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
|
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
|
||||||
@@ -669,7 +675,9 @@ class Crew(BaseModel):
|
|||||||
)
|
)
|
||||||
|
|
||||||
self._prepare_agent_tools(task)
|
self._prepare_agent_tools(task)
|
||||||
self._log_task_start(task, agent_to_use.role)
|
emit_task_start(task, agent_to_use.role)
|
||||||
|
# TODO: ADD ELSEWHERE
|
||||||
|
# self._log_task_start(task, agent_to_use.role)
|
||||||
|
|
||||||
if isinstance(task, ConditionalTask):
|
if isinstance(task, ConditionalTask):
|
||||||
skipped_task_output = self._handle_conditional_task(
|
skipped_task_output = self._handle_conditional_task(
|
||||||
@@ -700,8 +708,17 @@ class Crew(BaseModel):
|
|||||||
tools=agent_to_use.tools,
|
tools=agent_to_use.tools,
|
||||||
)
|
)
|
||||||
task_outputs = [task_output]
|
task_outputs = [task_output]
|
||||||
self._process_task_result(task, task_output)
|
|
||||||
self._store_execution_log(task, task_output, task_index, was_replayed)
|
emit_task_finish(
|
||||||
|
task,
|
||||||
|
self._inputs if self._inputs else {},
|
||||||
|
task_output,
|
||||||
|
task_index,
|
||||||
|
was_replayed,
|
||||||
|
)
|
||||||
|
# TODO: ADD ELSEWHERE
|
||||||
|
# self._process_task_result(task, task_output)
|
||||||
|
# self._store_execution_log(task, task_output, task_index, was_replayed)
|
||||||
|
|
||||||
if futures:
|
if futures:
|
||||||
task_outputs = self._process_async_tasks(futures, was_replayed)
|
task_outputs = self._process_async_tasks(futures, was_replayed)
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import os
|
|||||||
import platform
|
import platform
|
||||||
import warnings
|
import warnings
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from typing import TYPE_CHECKING, Any, Optional
|
from typing import TYPE_CHECKING, Any, Dict, Optional
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
@@ -21,12 +21,16 @@ with suppress_warnings():
|
|||||||
|
|
||||||
|
|
||||||
from opentelemetry import trace # noqa: E402
|
from opentelemetry import trace # noqa: E402
|
||||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter # noqa: E402
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
|
||||||
|
OTLPSpanExporter, # noqa: E402
|
||||||
|
)
|
||||||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
|
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
|
||||||
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
|
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
|
||||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
|
||||||
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
|
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
|
||||||
|
|
||||||
|
from crewai.utilities.event_emitter import CrewEvents, crew_events
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from crewai.crew import Crew
|
from crewai.crew import Crew
|
||||||
from crewai.task import Task
|
from crewai.task import Task
|
||||||
@@ -83,87 +87,37 @@ class Telemetry:
|
|||||||
self.ready = False
|
self.ready = False
|
||||||
self.trace_set = False
|
self.trace_set = False
|
||||||
|
|
||||||
def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
|
def crew_creation(
|
||||||
|
self, crew_data: Dict[str, Any], inputs: Optional[Dict[str, Any]] = None
|
||||||
|
):
|
||||||
"""Records the creation of a crew."""
|
"""Records the creation of a crew."""
|
||||||
if self.ready:
|
if self.ready:
|
||||||
try:
|
try:
|
||||||
tracer = trace.get_tracer("crewai.telemetry")
|
tracer = trace.get_tracer("crewai.telemetry")
|
||||||
span = tracer.start_span("Crew Created")
|
span = tracer.start_span("Crew Created")
|
||||||
|
|
||||||
|
# Accessing data from the serialized crew dictionary
|
||||||
self._add_attribute(
|
self._add_attribute(
|
||||||
span,
|
span, "crewai_version", crew_data.get("crewai_version")
|
||||||
"crewai_version",
|
|
||||||
pkg_resources.get_distribution("crewai").version,
|
|
||||||
)
|
)
|
||||||
self._add_attribute(span, "python_version", platform.python_version())
|
self._add_attribute(span, "python_version", platform.python_version())
|
||||||
self._add_attribute(span, "crew_key", crew.key)
|
self._add_attribute(span, "crew_key", crew_data.get("key"))
|
||||||
self._add_attribute(span, "crew_id", str(crew.id))
|
self._add_attribute(span, "crew_id", crew_data.get("id"))
|
||||||
self._add_attribute(span, "crew_process", crew.process)
|
self._add_attribute(span, "crew_process", crew_data.get("process"))
|
||||||
self._add_attribute(span, "crew_memory", crew.memory)
|
self._add_attribute(span, "crew_memory", crew_data.get("memory"))
|
||||||
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
|
|
||||||
self._add_attribute(span, "crew_number_of_agents", len(crew.agents))
|
|
||||||
if crew.share_crew:
|
|
||||||
self._add_attribute(
|
self._add_attribute(
|
||||||
span,
|
span, "crew_number_of_tasks", len(crew_data.get("tasks", []))
|
||||||
"crew_agents",
|
|
||||||
json.dumps(
|
|
||||||
[
|
|
||||||
{
|
|
||||||
"key": agent.key,
|
|
||||||
"id": str(agent.id),
|
|
||||||
"role": agent.role,
|
|
||||||
"goal": agent.goal,
|
|
||||||
"backstory": agent.backstory,
|
|
||||||
"verbose?": agent.verbose,
|
|
||||||
"max_iter": agent.max_iter,
|
|
||||||
"max_rpm": agent.max_rpm,
|
|
||||||
"i18n": agent.i18n.prompt_file,
|
|
||||||
"function_calling_llm": (
|
|
||||||
agent.function_calling_llm.model
|
|
||||||
if agent.function_calling_llm
|
|
||||||
else ""
|
|
||||||
),
|
|
||||||
"llm": agent.llm.model,
|
|
||||||
"delegation_enabled?": agent.allow_delegation,
|
|
||||||
"allow_code_execution?": agent.allow_code_execution,
|
|
||||||
"max_retry_limit": agent.max_retry_limit,
|
|
||||||
"tools_names": [
|
|
||||||
tool.name.casefold()
|
|
||||||
for tool in agent.tools or []
|
|
||||||
],
|
|
||||||
}
|
|
||||||
for agent in crew.agents
|
|
||||||
]
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
self._add_attribute(
|
self._add_attribute(
|
||||||
span,
|
span, "crew_number_of_agents", len(crew_data.get("agents", []))
|
||||||
"crew_tasks",
|
)
|
||||||
json.dumps(
|
|
||||||
[
|
if crew_data.get("share_crew"):
|
||||||
{
|
self._add_attribute(
|
||||||
"key": task.key,
|
span, "crew_agents", json.dumps(crew_data.get("agents", []))
|
||||||
"id": str(task.id),
|
)
|
||||||
"description": task.description,
|
self._add_attribute(
|
||||||
"expected_output": task.expected_output,
|
span, "crew_tasks", json.dumps(crew_data.get("tasks", []))
|
||||||
"async_execution?": task.async_execution,
|
|
||||||
"human_input?": task.human_input,
|
|
||||||
"agent_role": (
|
|
||||||
task.agent.role if task.agent else "None"
|
|
||||||
),
|
|
||||||
"agent_key": task.agent.key if task.agent else None,
|
|
||||||
"context": (
|
|
||||||
[task.description for task in task.context]
|
|
||||||
if task.context
|
|
||||||
else None
|
|
||||||
),
|
|
||||||
"tools_names": [
|
|
||||||
tool.name.casefold()
|
|
||||||
for tool in task.tools or []
|
|
||||||
],
|
|
||||||
}
|
|
||||||
for task in crew.tasks
|
|
||||||
]
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
self._add_attribute(span, "platform", platform.platform())
|
self._add_attribute(span, "platform", platform.platform())
|
||||||
self._add_attribute(span, "platform_release", platform.release())
|
self._add_attribute(span, "platform_release", platform.release())
|
||||||
@@ -174,59 +128,10 @@ class Telemetry:
|
|||||||
span, "crew_inputs", json.dumps(inputs) if inputs else None
|
span, "crew_inputs", json.dumps(inputs) if inputs else None
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self._add_attribute(
|
# Handle the case where share_crew is False
|
||||||
span,
|
# You might want to add limited data here
|
||||||
"crew_agents",
|
pass
|
||||||
json.dumps(
|
|
||||||
[
|
|
||||||
{
|
|
||||||
"key": agent.key,
|
|
||||||
"id": str(agent.id),
|
|
||||||
"role": agent.role,
|
|
||||||
"verbose?": agent.verbose,
|
|
||||||
"max_iter": agent.max_iter,
|
|
||||||
"max_rpm": agent.max_rpm,
|
|
||||||
"function_calling_llm": (
|
|
||||||
agent.function_calling_llm.model
|
|
||||||
if agent.function_calling_llm
|
|
||||||
else ""
|
|
||||||
),
|
|
||||||
"llm": agent.llm.model,
|
|
||||||
"delegation_enabled?": agent.allow_delegation,
|
|
||||||
"allow_code_execution?": agent.allow_code_execution,
|
|
||||||
"max_retry_limit": agent.max_retry_limit,
|
|
||||||
"tools_names": [
|
|
||||||
tool.name.casefold()
|
|
||||||
for tool in agent.tools or []
|
|
||||||
],
|
|
||||||
}
|
|
||||||
for agent in crew.agents
|
|
||||||
]
|
|
||||||
),
|
|
||||||
)
|
|
||||||
self._add_attribute(
|
|
||||||
span,
|
|
||||||
"crew_tasks",
|
|
||||||
json.dumps(
|
|
||||||
[
|
|
||||||
{
|
|
||||||
"key": task.key,
|
|
||||||
"id": str(task.id),
|
|
||||||
"async_execution?": task.async_execution,
|
|
||||||
"human_input?": task.human_input,
|
|
||||||
"agent_role": (
|
|
||||||
task.agent.role if task.agent else "None"
|
|
||||||
),
|
|
||||||
"agent_key": task.agent.key if task.agent else None,
|
|
||||||
"tools_names": [
|
|
||||||
tool.name.casefold()
|
|
||||||
for tool in task.tools or []
|
|
||||||
],
|
|
||||||
}
|
|
||||||
for task in crew.tasks
|
|
||||||
]
|
|
||||||
),
|
|
||||||
)
|
|
||||||
span.set_status(Status(StatusCode.OK))
|
span.set_status(Status(StatusCode.OK))
|
||||||
span.end()
|
span.end()
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -461,77 +366,39 @@ class Telemetry:
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None):
|
def crew_execution_span(
|
||||||
|
self, crew_data: Dict[str, Any], inputs: Optional[Dict[str, Any]] = None
|
||||||
|
):
|
||||||
"""Records the complete execution of a crew.
|
"""Records the complete execution of a crew.
|
||||||
This is only collected if the user has opted-in to share the crew.
|
This is only collected if the user has opted-in to share the crew.
|
||||||
"""
|
"""
|
||||||
self.crew_creation(crew, inputs)
|
if self.ready and crew_data.get("share_crew"):
|
||||||
|
|
||||||
if (self.ready) and (crew.share_crew):
|
|
||||||
try:
|
try:
|
||||||
tracer = trace.get_tracer("crewai.telemetry")
|
tracer = trace.get_tracer("crewai.telemetry")
|
||||||
span = tracer.start_span("Crew Execution")
|
span = tracer.start_span("Crew Execution")
|
||||||
|
|
||||||
self._add_attribute(
|
self._add_attribute(
|
||||||
span,
|
span,
|
||||||
"crewai_version",
|
"crewai_version",
|
||||||
pkg_resources.get_distribution("crewai").version,
|
pkg_resources.get_distribution("crewai").version,
|
||||||
)
|
)
|
||||||
self._add_attribute(span, "crew_key", crew.key)
|
self._add_attribute(span, "crew_key", crew_data.get("key"))
|
||||||
self._add_attribute(span, "crew_id", str(crew.id))
|
self._add_attribute(span, "crew_id", crew_data.get("id"))
|
||||||
self._add_attribute(
|
self._add_attribute(
|
||||||
span, "crew_inputs", json.dumps(inputs) if inputs else None
|
span, "crew_inputs", json.dumps(inputs) if inputs else None
|
||||||
)
|
)
|
||||||
self._add_attribute(
|
self._add_attribute(
|
||||||
span,
|
span,
|
||||||
"crew_agents",
|
"crew_agents",
|
||||||
json.dumps(
|
json.dumps(crew_data.get("agents", [])),
|
||||||
[
|
|
||||||
{
|
|
||||||
"key": agent.key,
|
|
||||||
"id": str(agent.id),
|
|
||||||
"role": agent.role,
|
|
||||||
"goal": agent.goal,
|
|
||||||
"backstory": agent.backstory,
|
|
||||||
"verbose?": agent.verbose,
|
|
||||||
"max_iter": agent.max_iter,
|
|
||||||
"max_rpm": agent.max_rpm,
|
|
||||||
"i18n": agent.i18n.prompt_file,
|
|
||||||
"llm": agent.llm.model,
|
|
||||||
"delegation_enabled?": agent.allow_delegation,
|
|
||||||
"tools_names": [
|
|
||||||
tool.name.casefold() for tool in agent.tools or []
|
|
||||||
],
|
|
||||||
}
|
|
||||||
for agent in crew.agents
|
|
||||||
]
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
self._add_attribute(
|
self._add_attribute(
|
||||||
span,
|
span,
|
||||||
"crew_tasks",
|
"crew_tasks",
|
||||||
json.dumps(
|
json.dumps(crew_data.get("tasks", [])),
|
||||||
[
|
|
||||||
{
|
|
||||||
"id": str(task.id),
|
|
||||||
"description": task.description,
|
|
||||||
"expected_output": task.expected_output,
|
|
||||||
"async_execution?": task.async_execution,
|
|
||||||
"human_input?": task.human_input,
|
|
||||||
"agent_role": task.agent.role if task.agent else "None",
|
|
||||||
"agent_key": task.agent.key if task.agent else None,
|
|
||||||
"context": (
|
|
||||||
[task.description for task in task.context]
|
|
||||||
if task.context
|
|
||||||
else None
|
|
||||||
),
|
|
||||||
"tools_names": [
|
|
||||||
tool.name.casefold() for tool in task.tools or []
|
|
||||||
],
|
|
||||||
}
|
|
||||||
for task in crew.tasks
|
|
||||||
]
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
span.set_status(Status(StatusCode.OK))
|
||||||
|
span.end()
|
||||||
return span
|
return span
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
@@ -607,3 +474,9 @@ class Telemetry:
|
|||||||
span.end()
|
span.end()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
telemetry = Telemetry()
|
||||||
|
|
||||||
|
|
||||||
|
crew_events.on(CrewEvents.CREW_START, telemetry.crew_execution_span)
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ from crewai.utilities.event_emitter import CrewEvents, emit
|
|||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from crewai.crew import Crew
|
from crewai.crew import Crew
|
||||||
|
from crewai.task import Task
|
||||||
|
from crewai.tasks.task_output import TaskOutput
|
||||||
|
|
||||||
|
|
||||||
def emit_crew_start(
|
def emit_crew_start(
|
||||||
@@ -16,7 +18,10 @@ def emit_crew_start(
|
|||||||
serialized_crew = crew.serialize()
|
serialized_crew = crew.serialize()
|
||||||
emit(
|
emit(
|
||||||
CrewEvents.CREW_START,
|
CrewEvents.CREW_START,
|
||||||
{**serialized_crew, "inputs": inputs},
|
{
|
||||||
|
**serialized_crew,
|
||||||
|
},
|
||||||
|
inputs=inputs,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -50,30 +55,43 @@ def emit_crew_failure(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def emit_task_start(crew_id: str, task_id: str, task_name: str) -> None:
|
def emit_task_start(
|
||||||
|
task: Task,
|
||||||
|
agent_role: str = "None",
|
||||||
|
) -> None:
|
||||||
|
serialized_task = task.serialize()
|
||||||
emit(
|
emit(
|
||||||
CrewEvents.TASK_START,
|
CrewEvents.TASK_START,
|
||||||
{
|
{
|
||||||
"crew_id": crew_id,
|
**serialized_task,
|
||||||
"task_id": task_id,
|
|
||||||
"task_name": task_name,
|
|
||||||
"start_time": datetime.now().isoformat(),
|
|
||||||
},
|
},
|
||||||
|
agent_role=agent_role,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def emit_task_finish(
|
def emit_task_finish(
|
||||||
crew_id: str, task_id: str, task_name: str, result: Any, duration: float
|
task: Task,
|
||||||
|
inputs: Dict[str, Any],
|
||||||
|
output: TaskOutput,
|
||||||
|
task_index: int,
|
||||||
|
was_replayed: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
emit(
|
emit(
|
||||||
CrewEvents.TASK_FINISH,
|
CrewEvents.TASK_FINISH,
|
||||||
{
|
{
|
||||||
"crew_id": crew_id,
|
"task": task.serialize(),
|
||||||
"task_id": task_id,
|
"output": {
|
||||||
"task_name": task_name,
|
"description": output.description,
|
||||||
"finish_time": datetime.now().isoformat(),
|
"summary": output.summary,
|
||||||
"result": result,
|
"raw": output.raw,
|
||||||
"duration": duration,
|
"pydantic": output.pydantic,
|
||||||
|
"json_dict": output.json_dict,
|
||||||
|
"output_format": output.output_format,
|
||||||
|
"agent": output.agent,
|
||||||
|
},
|
||||||
|
"task_index": task_index,
|
||||||
|
"inputs": inputs,
|
||||||
|
"was_replayed": was_replayed,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user