adding users the option to share all data of their crews

This commit is contained in:
João Moura
2024-02-08 22:23:24 -08:00
parent 5cf61cac22
commit ed0131b46f
5 changed files with 178 additions and 23 deletions

View File

@@ -262,6 +262,8 @@ Data collected includes:
- Roles of agents in a crew
- Tools names available
Users can opt in to sharing the complete telemetry data by setting the `share_crew` attribute to `True` on their Crews.
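
For illustration, a minimal sketch of opting in (the agent and task below are placeholders, not part of this change):

```python
from crewai import Agent, Crew, Task

# Placeholder agent and task; only `share_crew` is the point here.
researcher = Agent(
    role="Researcher",
    goal="Summarize recent AI news",
    backstory="An analyst who tracks AI developments.",
)
summary_task = Task(
    description="Write a short summary of this week's AI news.",
    agent=researcher,
)

# Opt in to sharing the complete crew information and execution data.
crew = Crew(agents=[researcher], tasks=[summary_task], share_crew=True)
```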
## License
CrewAI is released under the MIT License.

View File

@@ -19,6 +19,8 @@ description: Understanding and utilizing crews in the crewAI framework.
| **Config** | Configuration settings for the crew. |
| **Max RPM** | Maximum requests per minute the crew adheres to during execution. |
| **Language** | Language setting for the crew's operation. |
| **Share Crew** | Whether you want to share the complete crew information and execution with the crewAI team to make the library better, and allow us to train models. |
!!! note "Crew Max RPM"
The `max_rpm` attribute sets the maximum number of requests per minute the crew can perform to avoid rate limits, and it will override individual agents' `max_rpm` settings if you set it.
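
A minimal sketch putting `max_rpm` and `share_crew` together (the agent and task here are placeholders):

```python
from crewai import Agent, Crew, Task

writer = Agent(
    role="Writer",
    goal="Draft a blog post",
    backstory="A technical writer.",
    max_rpm=30,  # overridden by the crew-level max_rpm below
)
post_task = Task(
    description="Draft a post about the new telemetry options.",
    agent=writer,
)

crew = Crew(
    agents=[writer],
    tasks=[post_task],
    max_rpm=10,       # caps the whole crew at 10 requests per minute
    share_crew=True,  # share full crew information and execution with the crewAI team
)
```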

View File

@@ -14,4 +14,6 @@ Data collected includes:
- If Tasks are being executed in parallel or sequentially
- Language model being used
- Roles of agents in a crew
- Tools names available
Users can opt in to sharing the complete telemetry data by setting the `share_crew` attribute to `True` on their Crews.

View File

@@ -35,12 +35,14 @@ class Crew(BaseModel):
process: The process flow that the crew will follow (e.g., sequential).
verbose: Indicates the verbosity level for logging during execution.
config: Configuration settings for the crew.
_cache_handler: Handles caching for the crew's operations.
max_rpm: Maximum number of requests per minute for the crew execution to be respected.
id: A unique identifier for the crew instance.
share_crew: Whether you want to share the complete crew information and execution with crewAI to make the library better, and allow us to train models.
_cache_handler: Handles caching for the crew's operations.
"""
__hash__ = object.__hash__ # type: ignore
_execution_span: Any = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr()
_logger: Logger = PrivateAttr()
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
@@ -54,6 +56,7 @@ class Crew(BaseModel):
)
config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
share_crew: bool = Field(default=False)
max_rpm: Optional[int] = Field(
default=None,
description="Maximum number of requests per minute for the crew execution to be respected.",
@@ -157,6 +160,8 @@ class Crew(BaseModel):
def kickoff(self) -> str:
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self)
for agent in self.agents:
agent.i18n = I18N(language=self.language)
@@ -190,9 +195,7 @@ class Crew(BaseModel):
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"[{role}] Task output: {task_output}\n\n")
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
self._finish_execution(task_output)
return task_output
def _run_hierarchical_process(self) -> str:
@@ -221,7 +224,10 @@ class Crew(BaseModel):
"debug", f"[{manager.role}] Task output: {task_output}\n\n"
)
self._finish_execution(task_output)
return task_output
def _finish_execution(self, output) -> None:
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
return task_output
self._telemetry.end_crew(self, output)

View File

@@ -32,6 +32,9 @@ class Telemetry:
- Language model being used
- Roles of agents in a crew
- Tools names available
Users can opt in to sharing more complete data using the `share_crew`
attribute in the Crew class.
"""
def __init__(self):
@@ -49,17 +52,17 @@ class Telemetry:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Created")
self.add_attribute(
self._add_attribute(
span, "crewai_version", pkg_resources.get_distribution("crewai").version
)
self.add_attribute(span, "python_version", platform.python_version())
self.add_attribute(span, "hostname", socket.gethostname())
self.add_attribute(span, "crewid", str(crew.id))
self.add_attribute(span, "crew_process", crew.process)
self.add_attribute(span, "crew_language", crew.language)
self.add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
self.add_attribute(span, "crew_number_of_agents", len(crew.agents))
self.add_attribute(
self._add_attribute(span, "python_version", platform.python_version())
self._add_attribute(span, "hostname", socket.gethostname())
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "crew_process", crew.process)
self._add_attribute(span, "crew_language", crew.language)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
self._add_attribute(span, "crew_number_of_agents", len(crew.agents))
self._add_attribute(
span,
"crew_agents",
json.dumps(
@@ -68,6 +71,10 @@ class Telemetry:
"id": str(agent.id),
"role": agent.role,
"memory_enabled?": agent.memory,
"verbose?": agent.verbose,
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"i18n": agent.i18n.language,
"llm": json.dumps(self._safe_llm_attributes(agent.llm)),
"delegation_enabled?": agent.allow_delegation,
"tools_names": [tool.name for tool in agent.tools],
@@ -76,7 +83,7 @@ class Telemetry:
]
),
)
self.add_attribute(
self._add_attribute(
span,
"crew_tasks",
json.dumps(
@@ -84,23 +91,159 @@ class Telemetry:
{
"id": str(task.id),
"async_execution?": task.async_execution,
"agent_role": task.agent.role if task.agent else "None",
"tools_names": [tool.name for tool in task.tools],
}
for task in crew.tasks
]
),
)
self.add_attribute(span, "platform", platform.platform())
self.add_attribute(span, "platform_release", platform.release())
self.add_attribute(span, "platform_system", platform.system())
self.add_attribute(span, "platform_version", platform.version())
self.add_attribute(span, "cpus", os.cpu_count())
self._add_attribute(span, "platform", platform.platform())
self._add_attribute(span, "platform_release", platform.release())
self._add_attribute(span, "platform_system", platform.system())
self._add_attribute(span, "platform_version", platform.version())
self._add_attribute(span, "cpus", os.cpu_count())
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass
def add_attribute(self, span, key, value):
def crew_execution_span(self, crew):
"""Records the complete execution of a crew.
This is only collected if the user has opted-in to share the crew.
"""
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Execution")
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(
span,
"crew_agents",
json.dumps(
[
{
"id": str(agent.id),
"role": agent.role,
"goal": agent.goal,
"backstory": agent.backstory,
"memory_enabled?": agent.memory,
"verbose?": agent.verbose,
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"i18n": agent.i18n.language,
"llm": json.dumps(self._safe_llm_attributes(agent.llm)),
"delegation_enabled?": agent.allow_delegation,
"tools_names": [tool.name for tool in agent.tools],
}
for agent in crew.agents
]
),
)
self._add_attribute(
span,
"crew_tasks",
json.dumps(
[
{
"id": str(task.id),
"description": task.description,
"async_execution?": task.async_execution,
"output": task.expected_output,
"agent_role": task.agent.role if task.agent else "None",
"context": [task.description for task in task.context]
if task.context
else "None",
"tools_names": [tool.name for tool in task.tools],
}
for task in crew.tasks
]
),
)
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass
def crew_execution_span(self, crew):
"""Records the complete execution of a crew.
This is only collected if the user has opted-in to share the crew.
"""
if crew.share_crew:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Execution")
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(
span,
"crew_agents",
json.dumps(
[
{
"id": str(agent.id),
"role": agent.role,
"goal": agent.goal,
"backstory": agent.backstory,
"memory_enabled?": agent.memory,
"verbose?": agent.verbose,
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"i18n": agent.i18n.language,
"llm": json.dumps(self._safe_llm_attributes(agent.llm)),
"delegation_enabled?": agent.allow_delegation,
"tools_names": [tool.name for tool in agent.tools],
}
for agent in crew.agents
]
),
)
self._add_attribute(
span,
"crew_tasks",
json.dumps(
[
{
"id": str(task.id),
"description": task.description,
"async_execution?": task.async_execution,
"output": task.expected_output,
"agent_role": task.agent.role if task.agent else "None",
"context": [task.description for task in task.context]
if task.context
else "None",
"tools_names": [tool.name for tool in task.tools],
}
for task in crew.tasks
]
),
)
return span
except Exception:
pass
def end_crew(self, crew, output):
if crew.share_crew:
try:
self._add_attribute(crew._execution_span, "crew_output", output)
self._add_attribute(
crew._execution_span,
"crew_tasks_output",
json.dumps(
[
{
"id": str(task.id),
"description": task.description,
"output": task.output.result,
}
for task in crew.tasks
]
),
)
crew._execution_span.set_status(Status(StatusCode.OK))
crew._execution_span.end()
except Exception:
pass
def _add_attribute(self, span, key, value):
"""Add an attribute to a span."""
try:
return span.set_attribute(key, value)