Compare commits

..

1 Commits

Author SHA1 Message Date
Devin AI
7c70cb493c Fix: Add connect_timeout parameter to get_mcp_tools method
- Add Optional[int] connect_timeout parameter with default value of 30 seconds
- Pass connect_timeout to MCPServerAdapter constructor to prevent timeouts
- Add comprehensive tests for timeout functionality
- Maintain backward compatibility with existing code
- Fixes issue #3326 where get_mcp_tools hits timeout with multiple MCP servers

Co-Authored-By: João <joao@crewai.com>
2025-08-15 18:38:10 +00:00
41 changed files with 4340 additions and 3790 deletions

1
.gitignore vendored
View File

@@ -21,6 +21,7 @@ crew_tasks_output.json
.mypy_cache
.ruff_cache
.venv
agentops.log
test_flow.html
crewairules.mdc
plan.md

View File

@@ -226,6 +226,7 @@
"group": "Observability",
"pages": [
"en/observability/overview",
"en/observability/agentops",
"en/observability/arize-phoenix",
"en/observability/langdb",
"en/observability/langfuse",
@@ -565,6 +566,7 @@
"group": "Observabilidade",
"pages": [
"pt-BR/observability/overview",
"pt-BR/observability/agentops",
"pt-BR/observability/arize-phoenix",
"pt-BR/observability/langdb",
"pt-BR/observability/langfuse",
@@ -912,6 +914,7 @@
"group": "오브저버빌리티",
"pages": [
"ko/observability/overview",
"ko/observability/agentops",
"ko/observability/arize-phoenix",
"ko/observability/langdb",
"ko/observability/langfuse",

View File

@@ -177,7 +177,14 @@ class MyCustomCrew:
# Your crew implementation...
```
This is how third-party event listeners are registered in the CrewAI codebase.
This is exactly how CrewAI's built-in `agentops_listener` is registered. In the CrewAI codebase, you'll find:
```python
# src/crewai/utilities/events/third_party/__init__.py
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
## Available Event Types
@@ -273,6 +280,77 @@ The structure of the event object depends on the event type, but all events inhe
Additional fields vary by event type. For example, `CrewKickoffCompletedEvent` includes `crew_name` and `output` fields.
## Real-World Example: Integration with AgentOps
CrewAI includes an example of a third-party integration with [AgentOps](https://github.com/AgentOps-AI/agentops), a monitoring and observability platform for AI agents. Here's how it's implemented:
```python
from typing import Optional
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
```
This listener initializes an AgentOps session when a Crew starts, registers agents with AgentOps, tracks tool usage, and ends the session when the Crew completes.
The AgentOps listener is registered in CrewAI's event system through the import in `src/crewai/utilities/events/third_party/__init__.py`:
```python
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
## Advanced Usage: Scoped Handlers

View File

@@ -0,0 +1,126 @@
---
title: AgentOps Integration
description: Understanding and logging your agent performance with AgentOps.
icon: paperclip
---
# Introduction
Observability is a key aspect of developing and deploying conversational AI agents. It allows developers to understand how their agents are performing,
how their agents are interacting with users, and how their agents use external tools and APIs.
AgentOps is a product independent of CrewAI that provides a comprehensive observability solution for agents.
## AgentOps
[AgentOps](https://agentops.ai/?=crew) provides session replays, metrics, and monitoring for agents.
At a high level, AgentOps gives you the ability to monitor cost, token usage, latency, agent failures, session-wide statistics, and more.
For more info, check out the [AgentOps Repo](https://github.com/AgentOps-AI/agentops).
### Overview
AgentOps provides monitoring for agents in development and production.
It provides a dashboard for tracking agent performance, session replays, and custom reporting.
Additionally, AgentOps provides session drilldowns for viewing Crew agent interactions, LLM calls, and tool usage in real-time.
This feature is useful for debugging and understanding how agents interact with users as well as other agents.
![Overview of a select series of agent session runs](/images/agentops-overview.png)
![Overview of session drilldowns for examining agent runs](/images/agentops-session.png)
![Viewing a step-by-step agent replay execution graph](/images/agentops-replay.png)
### Features
- **LLM Cost Management and Tracking**: Track spend with foundation model providers.
- **Replay Analytics**: Watch step-by-step agent execution graphs.
- **Recursive Thought Detection**: Identify when agents fall into infinite loops.
- **Custom Reporting**: Create custom analytics on agent performance.
- **Analytics Dashboard**: Monitor high-level statistics about agents in development and production.
- **Public Model Testing**: Test your agents against benchmarks and leaderboards.
- **Custom Tests**: Run your agents against domain-specific tests.
- **Time Travel Debugging**: Restart your sessions from checkpoints.
- **Compliance and Security**: Create audit logs and detect potential threats such as profanity and PII leaks.
- **Prompt Injection Detection**: Identify potential code injection and secret leaks.
### Using AgentOps
<Steps>
<Step title="Create an API Key">
Create a user API key here: [Create API Key](https://app.agentops.ai/account)
</Step>
<Step title="Configure Your Environment">
Add your API key to your environment variables:
```bash
AGENTOPS_API_KEY=<YOUR_AGENTOPS_API_KEY>
```
</Step>
<Step title="Install AgentOps">
Install AgentOps with:
```bash
pip install 'crewai[agentops]'
```
or
```bash
pip install agentops
```
</Step>
<Step title="Initialize AgentOps">
Before using `Crew` in your script, include these lines:
```python
import agentops
agentops.init()
```
This will initiate an AgentOps session as well as automatically track Crew agents. For further info on how to outfit more complex agentic systems,
check out the [AgentOps documentation](https://docs.agentops.ai) or join the [Discord](https://discord.gg/j4f3KbeH).
</Step>
</Steps>
### Crew + AgentOps Examples
<CardGroup cols={3}>
<Card
title="Job Posting"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/job-posting"
icon="briefcase"
iconType="solid"
>
Example of a Crew agent that generates job posts.
</Card>
<Card
title="Markdown Validator"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/markdown_validator"
icon="markdown"
iconType="solid"
>
Example of a Crew agent that validates Markdown files.
</Card>
<Card
title="Instagram Post"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/instagram_post"
icon="square-instagram"
iconType="brands"
>
Example of a Crew agent that generates Instagram posts.
</Card>
</CardGroup>
### Further Information
To get started, create an [AgentOps account](https://agentops.ai/?=crew).
For feature requests or bug reports, please reach out to the AgentOps team on the [AgentOps Repo](https://github.com/AgentOps-AI/agentops).
#### Extra links
<a href="https://twitter.com/agentopsai/">🐦 Twitter</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://discord.gg/JHPt4C7r">📢 Discord</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://app.agentops.ai/?=crew">🖇️ AgentOps Dashboard</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://docs.agentops.ai/introduction">📙 Documentation</a>

View File

@@ -21,6 +21,9 @@ Observability is crucial for understanding how your CrewAI agents perform, ident
### Monitoring & Tracing Platforms
<CardGroup cols={2}>
<Card title="AgentOps" icon="paperclip" href="/en/observability/agentops">
Session replays, metrics, and monitoring for agent development and production.
</Card>
<Card title="LangDB" icon="database" href="/en/observability/langdb">
End-to-end tracing for CrewAI workflows with automatic agent interaction capture.

Binary file not shown.

After

Width:  |  Height:  |  Size: 288 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 419 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 263 KiB

View File

@@ -177,7 +177,14 @@ class MyCustomCrew:
# Your crew implementation...
```
이것이 CrewAI 코드베이스에서 서드파티 이벤트 리스너가 등록되는 방식입니다.
이것이 바로 CrewAI의 내장 `agentops_listener`가 등록되는 방식과 동일합니다. CrewAI 코드베이스에서는 다음과 같이 되어 있습니다:
```python
# src/crewai/utilities/events/third_party/__init__.py
from .agentops_listener import agentops_listener
```
이렇게 하면 `crewai.utilities.events` 패키지가 임포트될 때 `agentops_listener`가 자동으로 로드됩니다.
## 사용 가능한 이벤트 유형
@@ -273,6 +280,77 @@ CrewAI는 여러분이 청취할 수 있는 다양한 이벤트를 제공합니
추가 필드는 이벤트 타입에 따라 다릅니다. 예를 들어, `CrewKickoffCompletedEvent`에는 `crew_name`과 `output` 필드가 포함됩니다.
## 실제 예시: AgentOps와의 통합
CrewAI는 AI 에이전트를 위한 모니터링 및 관찰 플랫폼인 [AgentOps](https://github.com/AgentOps-AI/agentops)와의 서드파티 통합 예시를 포함하고 있습니다. 구현 방식은 다음과 같습니다:
```python
from typing import Optional
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
```
이 listener는 crew가 시작될 때 AgentOps 세션을 초기화하고, agent를 AgentOps에 등록하며, 도구 사용을 추적하고, crew가 완료되면 세션을 종료합니다.
AgentOps listener는 `src/crewai/utilities/events/third_party/__init__.py` 파일의 import를 통해 CrewAI 이벤트 시스템에 등록됩니다:
```python
from .agentops_listener import agentops_listener
```
이렇게 하면 `crewai.utilities.events` 패키지가 import될 때 `agentops_listener`가 로드되는 것이 보장됩니다.
## 고급 사용법: Scoped Handlers

View File

@@ -0,0 +1,124 @@
---
title: AgentOps 통합
description: AgentOps를 사용하여 에이전트 성능을 이해하고 로깅하기
icon: paperclip
---
# 소개
Observability는 대화형 AI 에이전트를 개발하고 배포하는 데 있어 핵심적인 요소입니다. 이는 개발자가 에이전트의 성능을 이해하고, 에이전트가 사용자와 어떻게 상호작용하는지, 그리고 에이전트가 외부 도구와 API를 어떻게 사용하는지를 파악할 수 있게 해줍니다.
AgentOps는 CrewAI와 독립적인 제품으로, 에이전트를 위한 종합적인 observability 솔루션을 제공합니다.
## AgentOps
[AgentOps](https://agentops.ai/?=crew)은 에이전트에 대한 세션 리플레이, 메트릭, 모니터링을 제공합니다.
AgentOps는 높은 수준에서 비용, 토큰 사용량, 대기 시간, 에이전트 실패, 세션 전체 통계 등 다양한 항목을 모니터링할 수 있는 기능을 제공합니다.
더 자세한 내용은 [AgentOps Repo](https://github.com/AgentOps-AI/agentops)를 확인하세요.
### 개요
AgentOps는 개발 및 프로덕션 환경에서 에이전트에 대한 모니터링을 제공합니다.
에이전트 성능, 세션 리플레이, 맞춤형 리포팅을 추적할 수 있는 대시보드를 제공합니다.
또한, AgentOps는 Crew 에이전트 상호작용, LLM 호출, 툴 사용을 실시간으로 볼 수 있는 세션 드릴다운 기능을 제공합니다.
이 기능은 에이전트가 사용자 및 다른 에이전트와 어떻게 상호작용하는지 디버깅하고 이해하는 데 유용합니다.
![선택된 에이전트 세션 실행 시리즈의 개요](/images/agentops-overview.png)
![에이전트 실행을 조사하기 위한 세션 드릴다운 개요](/images/agentops-session.png)
![단계별 에이전트 리플레이 실행 그래프 보기](/images/agentops-replay.png)
### 특징
- **LLM 비용 관리 및 추적**: 기반 모델 공급자와의 지출을 추적합니다.
- **재생 분석**: 단계별 에이전트 실행 그래프를 시청할 수 있습니다.
- **재귀적 사고 감지**: 에이전트가 무한 루프에 빠졌는지 식별합니다.
- **맞춤형 보고서**: 에이전트 성능에 대한 맞춤형 분석을 생성합니다.
- **분석 대시보드**: 개발 및 운영 중인 에이전트에 대한 상위 수준 통계를 모니터링합니다.
- **공개 모델 테스트**: 벤치마크 및 리더보드를 통해 에이전트를 테스트할 수 있습니다.
- **맞춤형 테스트**: 도메인별 테스트로 에이전트를 실행합니다.
- **타임 트래블 디버깅**: 체크포인트에서 세션을 재시작합니다.
- **컴플라이언스 및 보안**: 감사 로그를 생성하고 욕설 및 PII 유출과 같은 잠재적 위협을 감지합니다.
- **프롬프트 인젝션 감지**: 잠재적 코드 인젝션 및 시크릿 유출을 식별합니다.
### AgentOps 사용하기
<Steps>
<Step title="API 키 생성">
사용자 API 키를 여기서 생성하세요: [API 키 생성](https://app.agentops.ai/account)
</Step>
<Step title="환경 설정">
API 키를 환경 변수에 추가하세요:
```bash
AGENTOPS_API_KEY=<YOUR_AGENTOPS_API_KEY>
```
</Step>
<Step title="AgentOps 설치">
다음 명령어로 AgentOps를 설치하세요:
```bash
pip install 'crewai[agentops]'
```
또는
```bash
pip install agentops
```
</Step>
<Step title="AgentOps 초기화">
스크립트에서 `Crew`를 사용하기 전에 다음 코드를 포함하세요:
```python
import agentops
agentops.init()
```
이렇게 하면 AgentOps 세션이 시작되고 Crew 에이전트가 자동으로 추적됩니다. 더 복잡한 agentic 시스템을 구성하는 방법에 대한 자세한 정보는 [AgentOps 문서](https://docs.agentops.ai) 또는 [Discord](https://discord.gg/j4f3KbeH)를 참조하세요.
</Step>
</Steps>
### Crew + AgentOps 예시
<CardGroup cols={3}>
<Card
title="Job Posting"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/job-posting"
icon="briefcase"
iconType="solid"
>
채용 공고를 생성하는 Crew agent의 예시입니다.
</Card>
<Card
title="Markdown Validator"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/markdown_validator"
icon="markdown"
iconType="solid"
>
Markdown 파일을 검증하는 Crew agent의 예시입니다.
</Card>
<Card
title="Instagram Post"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/instagram_post"
icon="square-instagram"
iconType="brands"
>
Instagram 게시물을 생성하는 Crew agent의 예시입니다.
</Card>
</CardGroup>
### 추가 정보
시작하려면 [AgentOps 계정](https://agentops.ai/?=crew)을 생성하세요.
기능 요청이나 버그 보고가 필요하시면 [AgentOps Repo](https://github.com/AgentOps-AI/agentops)에서 AgentOps 팀에 문의해 주세요.
#### 추가 링크
<a href="https://twitter.com/agentopsai/">🐦 트위터</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://discord.gg/JHPt4C7r">📢 디스코드</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://app.agentops.ai/?=crew">🖇️ AgentOps 대시보드</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://docs.agentops.ai/introduction">📙 문서화</a>

View File

@@ -21,6 +21,9 @@ icon: "face-smile"
### 모니터링 & 트레이싱 플랫폼
<CardGroup cols={2}>
<Card title="AgentOps" icon="paperclip" href="/ko/observability/agentops">
에이전트 개발 및 운영을 위한 세션 리플레이, 메트릭, 모니터링 제공.
</Card>
<Card title="LangDB" icon="database" href="/ko/observability/langdb">
자동 에이전트 상호작용 캡처를 포함한 CrewAI 워크플로의 엔드-투-엔드 트레이싱.

View File

@@ -177,7 +177,14 @@ class MyCustomCrew:
# Sua implementação do crew...
```
É assim que listeners de eventos de terceiros são registrados no código do CrewAI.
É exatamente assim que o `agentops_listener` integrado do CrewAI é registrado. No código-fonte do CrewAI, você encontrará:
```python
# src/crewai/utilities/events/third_party/__init__.py
from .agentops_listener import agentops_listener
```
Isso garante que o `agentops_listener` seja carregado quando o pacote `crewai.utilities.events` for importado.
## Tipos de Eventos Disponíveis
@@ -262,6 +269,77 @@ A estrutura do objeto de evento depende do tipo do evento, mas todos herdam de `
Campos adicionais variam pelo tipo de evento. Por exemplo, `CrewKickoffCompletedEvent` inclui os campos `crew_name` e `output`.
## Exemplo Real: Integração com AgentOps
O CrewAI inclui um exemplo de integração com [AgentOps](https://github.com/AgentOps-AI/agentops), uma plataforma de monitoramento e observabilidade para agentes de IA. Veja como é implementado:
```python
from typing import Optional
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
```
Esse listener inicializa uma sessão do AgentOps quando um Crew inicia, cadastra agentes no AgentOps, rastreia o uso de ferramentas e finaliza a sessão quando o Crew é concluído.
O listener AgentOps é registrado no sistema de eventos do CrewAI via importação em `src/crewai/utilities/events/third_party/__init__.py`:
```python
from .agentops_listener import agentops_listener
```
Isso garante que o `agentops_listener` seja carregado quando o pacote `crewai.utilities.events` for importado.
## Uso Avançado: Handlers Escopados

View File

@@ -0,0 +1,126 @@
---
title: Integração com AgentOps
description: Entendendo e registrando a performance do seu agente com AgentOps.
icon: paperclip
---
# Introdução
Observabilidade é um aspecto fundamental no desenvolvimento e implantação de agentes de IA conversacional. Ela permite que desenvolvedores compreendam como seus agentes estão performando,
como eles estão interagindo com os usuários e como utilizam ferramentas externas e APIs.
AgentOps é um produto independente do CrewAI que fornece uma solução completa de observabilidade para agentes.
## AgentOps
[AgentOps](https://agentops.ai/?=crew) oferece replay de sessões, métricas e monitoramento para agentes.
Em um alto nível, o AgentOps oferece a capacidade de monitorar custos, uso de tokens, latência, falhas do agente, estatísticas de sessão e muito mais.
Para mais informações, confira o [Repositório do AgentOps](https://github.com/AgentOps-AI/agentops).
### Visão Geral
AgentOps fornece monitoramento para agentes em desenvolvimento e produção.
Disponibiliza um dashboard para acompanhamento de performance dos agentes, replay de sessões e relatórios personalizados.
Além disso, o AgentOps traz análises detalhadas das sessões para visualizar interações do agente Crew, chamadas LLM e uso de ferramentas em tempo real.
Esse recurso é útil para depuração e entendimento de como os agentes interagem com usuários e entre si.
![Visão geral de uma série selecionada de execuções de sessões do agente](/images/agentops-overview.png)
![Visão geral das análises detalhadas de sessões para examinar execuções de agentes](/images/agentops-session.png)
![Visualizando um gráfico de execução passo a passo do replay do agente](/images/agentops-replay.png)
### Funcionalidades
- **Gerenciamento e Rastreamento de Custos de LLM**: Acompanhe gastos com provedores de modelos fundamentais.
- **Análises de Replay**: Assista gráficos de execução do agente, passo a passo.
- **Detecção de Pensamento Recursivo**: Identifique quando agentes entram em loops infinitos.
- **Relatórios Personalizados**: Crie análises customizadas sobre a performance dos agentes.
- **Dashboard Analítico**: Monitore estatísticas gerais de agentes em desenvolvimento e produção.
- **Teste de Modelos Públicos**: Teste seus agentes em benchmarks e rankings.
- **Testes Personalizados**: Execute seus agentes em testes específicos de domínio.
- **Depuração com Viagem no Tempo**: Reinicie suas sessões a partir de checkpoints.
- **Conformidade e Segurança**: Crie registros de auditoria e detecte possíveis ameaças como uso de palavrões e vazamento de dados pessoais.
- **Detecção de Prompt Injection**: Identifique possíveis injeções de código e vazamentos de segredos.
### Utilizando o AgentOps
<Steps>
<Step title="Crie uma Chave de API">
Crie uma chave de API de usuário aqui: [Create API Key](https://app.agentops.ai/account)
</Step>
<Step title="Configure seu Ambiente">
Adicione sua chave API nas variáveis de ambiente:
```bash
AGENTOPS_API_KEY=<YOUR_AGENTOPS_API_KEY>
```
</Step>
<Step title="Instale o AgentOps">
Instale o AgentOps com:
```bash
pip install 'crewai[agentops]'
```
ou
```bash
pip install agentops
```
</Step>
<Step title="Inicialize o AgentOps">
Antes de utilizar o `Crew` no seu script, inclua estas linhas:
```python
import agentops
agentops.init()
```
Isso irá iniciar uma sessão do AgentOps e também rastrear automaticamente os agentes Crew. Para mais detalhes sobre como adaptar sistemas de agentes mais complexos,
confira a [documentação do AgentOps](https://docs.agentops.ai) ou participe do [Discord](https://discord.gg/j4f3KbeH).
</Step>
</Steps>
### Exemplos de Crew + AgentOps
<CardGroup cols={3}>
<Card
title="Vaga de Emprego"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/job-posting"
icon="briefcase"
iconType="solid"
>
Exemplo de um agente Crew que gera vagas de emprego.
</Card>
<Card
title="Validador de Markdown"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/markdown_validator"
icon="markdown"
iconType="solid"
>
Exemplo de um agente Crew que valida arquivos Markdown.
</Card>
<Card
title="Post no Instagram"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/instagram_post"
icon="square-instagram"
iconType="brands"
>
Exemplo de um agente Crew que gera posts para Instagram.
</Card>
</CardGroup>
### Mais Informações
Para começar, crie uma [conta AgentOps](https://agentops.ai/?=crew).
Para sugestões de funcionalidades ou relatos de bugs, entre em contato com o time do AgentOps pelo [Repositório do AgentOps](https://github.com/AgentOps-AI/agentops).
#### Links Extras
<a href="https://twitter.com/agentopsai/">🐦 Twitter</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://discord.gg/JHPt4C7r">📢 Discord</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://app.agentops.ai/?=crew">🖇️ Dashboard AgentOps</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://docs.agentops.ai/introduction">📙 Documentação</a>

View File

@@ -21,6 +21,9 @@ A observabilidade é fundamental para entender como seus agentes CrewAI estão d
### Plataformas de Monitoramento e Rastreamento
<CardGroup cols={2}>
<Card title="AgentOps" icon="paperclip" href="/pt-BR/observability/agentops">
Replays de sessões, métricas e monitoramento para desenvolvimento e produção de agentes.
</Card>
<Card title="LangDB" icon="database" href="/pt-BR/observability/langdb">
Rastreamento ponta a ponta para fluxos de trabalho CrewAI com captura automática de interações de agentes.

View File

@@ -52,6 +52,7 @@ tools = ["crewai-tools~=0.62.0"]
embeddings = [
"tiktoken~=0.8.0"
]
agentops = ["agentops==0.3.18"]
pdfplumber = [
"pdfplumber>=0.11.4",
]

View File

@@ -1,7 +1,7 @@
import shutil
import subprocess
import time
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Type, Union, cast
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Type, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -399,13 +399,8 @@ class Agent(BaseAgent):
),
)
if tools is not None:
agent_tools: List[Union[BaseTool, dict]] = cast(List[Union[BaseTool, dict]], tools)
elif self.tools is not None:
agent_tools = cast(List[Union[BaseTool, dict]], self.tools)
else:
agent_tools = []
self.create_agent_executor(tools=agent_tools, task=task)
tools = tools or self.tools or []
self.create_agent_executor(tools=tools, task=task)
if self.crew and self.crew._train:
task_prompt = self._training_handler(task_prompt=task_prompt)
@@ -542,14 +537,14 @@ class Agent(BaseAgent):
)["output"]
def create_agent_executor(
self, tools: Optional[List[Union[BaseTool, dict]]] = None, task=None
self, tools: Optional[List[BaseTool]] = None, task=None
) -> None:
"""Create an agent executor for the agent.
Returns:
An instance of the CrewAgentExecutor class.
"""
raw_tools: List[Union[BaseTool, dict]] = tools or self.tools or []
raw_tools: List[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
prompt = Prompts(
@@ -808,7 +803,7 @@ class Agent(BaseAgent):
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
tools=[tool for tool in (self.tools or []) if isinstance(tool, BaseTool)],
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
respect_context_window=self.respect_context_window,
@@ -846,7 +841,7 @@ class Agent(BaseAgent):
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
tools=[tool for tool in (self.tools or []) if isinstance(tool, BaseTool)],
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
respect_context_window=self.respect_context_window,

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional
from pydantic import PrivateAttr
@@ -25,11 +25,11 @@ class BaseAgentAdapter(BaseAgent, ABC):
self._agent_config = agent_config
@abstractmethod
def configure_tools(self, tools: Optional[List[Union[BaseTool, dict]]] = None) -> None:
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
"""Configure and adapt tools for the specific agent implementation.
Args:
tools: Optional list of BaseTool instances and raw tool definitions to be configured
tools: Optional list of BaseTool instances to be configured
"""
pass

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional, Union
from typing import Any, AsyncIterable, Dict, List, Optional
from pydantic import Field, PrivateAttr
@@ -22,6 +22,7 @@ from crewai.utilities.events.agent_events import (
)
try:
from langchain_core.messages import ToolMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
@@ -127,8 +128,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute a task using the LangGraph workflow."""
mixed_tools: Optional[List[Union[BaseTool, dict]]] = tools # type: ignore[assignment]
self.create_agent_executor(mixed_tools)
self.create_agent_executor(tools)
self.configure_structured_output(task)
@@ -198,20 +198,17 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
)
raise
def create_agent_executor(self, tools: Optional[List[Union[BaseTool, dict]]] = None) -> None:
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
"""Configure the LangGraph agent for execution."""
self.configure_tools(tools)
def configure_tools(self, tools: Optional[List[Union[BaseTool, dict]]] = None) -> None:
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
"""Configure tools for the LangGraph agent."""
if tools:
base_tools = [tool for tool in tools if isinstance(tool, BaseTool)]
existing_base_tools = [tool for tool in (self.tools or []) if isinstance(tool, BaseTool)]
all_tools = existing_base_tools + base_tools
if all_tools:
self._tool_adapter.configure_tools(all_tools)
available_tools = self._tool_adapter.tools()
self._graph.tools = available_tools
all_tools = list(self.tools or []) + list(tools or [])
self._tool_adapter.configure_tools(all_tools)
available_tools = self._tool_adapter.tools()
self._graph.tools = available_tools
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
"""Implement delegation tools support for LangGraph."""

View File

@@ -1,4 +1,4 @@
from typing import Any, List, Optional, Union
from typing import Any, List, Optional
from pydantic import Field, PrivateAttr
@@ -152,12 +152,10 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
self.agent_executor = Runner
def configure_tools(self, tools: Optional[List[Union[BaseTool, dict]]] = None) -> None:
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
"""Configure tools for the OpenAI Assistant"""
if tools:
base_tools = [tool for tool in tools if isinstance(tool, BaseTool)]
if base_tools:
self._tool_adapter.configure_tools(base_tools)
self._tool_adapter.configure_tools(tools)
if self._tool_adapter.converted_tools:
self._openai_agent.tools = self._tool_adapter.converted_tools

View File

@@ -2,7 +2,7 @@ import uuid
from abc import ABC, abstractmethod
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, TypeVar, Union
from typing import Any, Callable, Dict, List, Optional, TypeVar
from pydantic import (
UUID4,
@@ -25,6 +25,7 @@ from crewai.security.security_config import SecurityConfig
from crewai.tools.base_tool import BaseTool, Tool
from crewai.utilities import I18N, Logger, RPMController
from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter
from crewai.utilities.string_utils import interpolate_only
T = TypeVar("T", bound="BaseAgent")
@@ -107,7 +108,7 @@ class BaseAgent(ABC, BaseModel):
default=False,
description="Enable agent to delegate and ask questions among each other.",
)
tools: Optional[List[Union[BaseTool, dict]]] = Field(
tools: Optional[List[BaseTool]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
max_iter: int = Field(
@@ -167,34 +168,29 @@ class BaseAgent(ABC, BaseModel):
@field_validator("tools")
@classmethod
def validate_tools(cls, tools: List[Any]) -> List[Union[BaseTool, dict]]:
def validate_tools(cls, tools: List[Any]) -> List[BaseTool]:
"""Validate and process the tools provided to the agent.
This method ensures that each tool is either an instance of BaseTool,
an object with 'name', 'func', and 'description' attributes, or a
raw tool definition (dict) for hosted/server-side tools.
This method ensures that each tool is either an instance of BaseTool
or an object with 'name', 'func', and 'description' attributes. If the
tool meets these criteria, it is processed and added to the list of
tools. Otherwise, a ValueError is raised.
"""
if not tools:
return []
processed_tools: List[Union[BaseTool, dict]] = []
processed_tools = []
required_attrs = ["name", "func", "description"]
for tool in tools:
if isinstance(tool, BaseTool):
processed_tools.append(tool)
elif isinstance(tool, dict):
if "name" not in tool:
raise ValueError(
f"Raw tool definition must have a 'name' field: {tool}"
)
processed_tools.append(tool)
elif all(hasattr(tool, attr) for attr in required_attrs):
# Tool has the required attributes, create a Tool instance
processed_tools.append(Tool.from_langchain(tool))
else:
raise ValueError(
f"Invalid tool type: {type(tool)}. "
"Tool must be an instance of BaseTool, a dict for hosted tools, or "
"Tool must be an instance of BaseTool or "
"an object with 'name', 'func', and 'description' attributes."
)
return processed_tools

View File

@@ -48,7 +48,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
agent: BaseAgent,
prompt: dict[str, str],
max_iter: int,
tools: List[Union[CrewStructuredTool, dict]],
tools: List[CrewStructuredTool],
tools_names: str,
stop_words: List[str],
tools_description: str,
@@ -84,8 +84,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.messages: List[Dict[str, str]] = []
self.iterations = 0
self.log_error_after = 3
self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool, dict]] = {
tool.name if hasattr(tool, 'name') else tool.get('name', 'unknown'): tool for tool in self.tools
self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = {
tool.name: tool for tool in self.tools
}
existing_stop = self.llm.stop or []
self.llm.stop = list(

View File

@@ -1,13 +1,9 @@
from .utils import TokenManager
class AuthError(Exception):
pass
def get_auth_token() -> str:
"""Get the authentication token."""
access_token = TokenManager().get_token()
if not access_token:
raise AuthError("No token found, make sure you are logged in")
raise Exception("No token found, make sure you are logged in")
return access_token

View File

@@ -18,7 +18,6 @@ class PlusAPI:
CREWS_RESOURCE = "/crewai_plus/api/v1/crews"
AGENTS_RESOURCE = "/crewai_plus/api/v1/agents"
TRACING_RESOURCE = "/crewai_plus/api/v1/tracing"
EPHEMERAL_TRACING_RESOURCE = "/crewai_plus/api/v1/tracing/ephemeral"
def __init__(self, api_key: str) -> None:
self.api_key = api_key
@@ -125,11 +124,6 @@ class PlusAPI:
"POST", f"{self.TRACING_RESOURCE}/batches", json=payload
)
def initialize_ephemeral_trace_batch(self, payload) -> requests.Response:
return self._make_request(
"POST", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches", json=payload
)
def send_trace_events(self, trace_batch_id: str, payload) -> requests.Response:
return self._make_request(
"POST",
@@ -137,27 +131,9 @@ class PlusAPI:
json=payload,
)
def send_ephemeral_trace_events(
self, trace_batch_id: str, payload
) -> requests.Response:
return self._make_request(
"POST",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/events",
json=payload,
)
def finalize_trace_batch(self, trace_batch_id: str, payload) -> requests.Response:
return self._make_request(
"PATCH",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
)
def finalize_ephemeral_trace_batch(
self, trace_batch_id: str, payload
) -> requests.Response:
return self._make_request(
"PATCH",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
)

View File

@@ -77,10 +77,7 @@ from crewai.utilities.events.listeners.tracing.trace_listener import (
)
from crewai.utilities.events.listeners.tracing.utils import (
is_tracing_enabled,
on_first_execution_tracing_confirmation,
)
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -286,11 +283,8 @@ class Crew(FlowTrackable, BaseModel):
self._cache_handler = CacheHandler()
event_listener = EventListener()
if on_first_execution_tracing_confirmation():
self.tracing = True
if is_tracing_enabled() or self.tracing:
trace_listener = TraceCollectionListener()
trace_listener = TraceCollectionListener(tracing=self.tracing)
trace_listener.setup_listeners(crewai_event_bus)
event_listener.verbose = self.verbose
event_listener.formatter.verbose = self.verbose

View File

@@ -43,9 +43,7 @@ class ToolSelectionEvaluator(BaseEvaluator):
available_tools_info = ""
if agent.tools:
for tool in agent.tools:
tool_name = tool.name if hasattr(tool, 'name') else tool.get('name', 'unknown')
tool_desc = tool.description if hasattr(tool, 'description') else tool.get('description', 'No description')
available_tools_info += f"- {tool_name}: {tool_desc}\n"
available_tools_info += f"- {tool.name}: {tool.description}\n"
else:
available_tools_info = "No tools available"

View File

@@ -38,10 +38,7 @@ from crewai.utilities.events.flow_events import (
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.events.listeners.tracing.utils import (
is_tracing_enabled,
on_first_execution_tracing_confirmation,
)
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.printer import Printer
logger = logging.getLogger(__name__)
@@ -479,12 +476,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Initialize state with initial values
self._state = self._create_initial_state()
self.tracing = tracing
if (
on_first_execution_tracing_confirmation()
or is_tracing_enabled()
or self.tracing
):
trace_listener = TraceCollectionListener()
if is_tracing_enabled() or tracing:
trace_listener = TraceCollectionListener(tracing=tracing)
trace_listener.setup_listeners(crewai_event_bus)
# Apply any additional kwargs
if kwargs:

View File

@@ -564,10 +564,9 @@ class LiteAgent(FlowTrackable, BaseModel):
if isinstance(formatted_answer, AgentAction):
try:
from typing import cast
tool_result = execute_tool_and_check_finality(
agent_action=formatted_answer,
tools=cast(List[Union[CrewStructuredTool, dict]], self._parsed_tools),
tools=self._parsed_tools,
i18n=self.i18n,
agent_key=self.key,
agent_role=self.role,

View File

@@ -1,7 +1,7 @@
import inspect
import logging
from pathlib import Path
from typing import Any, Callable, Dict, TypeVar, cast, List
from typing import Any, Callable, Dict, TypeVar, cast, List, Optional
from crewai.tools import BaseTool
import yaml
@@ -86,7 +86,7 @@ def CrewBase(cls: T) -> T:
import types
return types.MethodType(_close_mcp_server, self)
def get_mcp_tools(self, *tool_names: list[str]) -> List[BaseTool]:
def get_mcp_tools(self, *tool_names: list[str], connect_timeout: Optional[int] = 30) -> List[BaseTool]:
if not self.mcp_server_params:
return []
@@ -94,7 +94,7 @@ def CrewBase(cls: T) -> T:
adapter = getattr(self, '_mcp_server_adapter', None)
if not adapter:
self._mcp_server_adapter = MCPServerAdapter(self.mcp_server_params)
self._mcp_server_adapter = MCPServerAdapter(self.mcp_server_params, connect_timeout=connect_timeout)
return self._mcp_server_adapter.tools.filter_by_names(tool_names or None)

View File

@@ -11,6 +11,7 @@ from crewai.agents.parser import (
)
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.tools import BaseTool as CrewAITool
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
@@ -24,28 +25,26 @@ from crewai.cli.config import Settings
console = Console()
def parse_tools(tools: List[Union[BaseTool, dict]]) -> List[Union[CrewStructuredTool, dict]]:
def parse_tools(tools: List[BaseTool]) -> List[CrewStructuredTool]:
"""Parse tools to be used for the task."""
tools_list: List[Union[CrewStructuredTool, dict]] = []
tools_list = []
for tool in tools:
if isinstance(tool, dict):
tools_list.append(tool)
elif isinstance(tool, BaseTool):
if isinstance(tool, CrewAITool):
tools_list.append(tool.to_structured_tool())
else:
raise ValueError(f"Tool is not a CrewStructuredTool, BaseTool, or raw tool definition (dict): {type(tool)}")
raise ValueError("Tool is not a CrewStructuredTool or BaseTool")
return tools_list
def get_tool_names(tools: Sequence[Union[CrewStructuredTool, BaseTool, dict]]) -> str:
def get_tool_names(tools: Sequence[Union[CrewStructuredTool, BaseTool]]) -> str:
"""Get the names of the tools."""
return ", ".join([t.name if hasattr(t, 'name') else t.get('name', 'unknown') for t in tools])
return ", ".join([t.name for t in tools])
def render_text_description_and_args(
tools: Sequence[Union[CrewStructuredTool, BaseTool, dict]],
tools: Sequence[Union[CrewStructuredTool, BaseTool]],
) -> str:
"""Render the tool name, description, and args in plain text.
@@ -55,12 +54,7 @@ def render_text_description_and_args(
"""
tool_strings = []
for tool in tools:
if hasattr(tool, 'description'):
tool_strings.append(tool.description)
elif isinstance(tool, dict) and 'description' in tool:
tool_strings.append(f"Tool name: {tool.get('name', 'unknown')}\nTool description:\n{tool['description']}")
else:
tool_strings.append(f"Tool name: {tool.get('name', 'unknown') if isinstance(tool, dict) else 'unknown'}")
tool_strings.append(tool.description)
return "\n".join(tool_strings)

View File

@@ -67,9 +67,11 @@ from .memory_events import (
# events
from .event_listener import EventListener
from .third_party.agentops_listener import agentops_listener
__all__ = [
"EventListener",
"agentops_listener",
"CrewAIEventsBus",
"crewai_event_bus",
"AgentExecutionStartedEvent",
@@ -103,6 +105,7 @@ __all__ = [
"MemoryRetrievalStartedEvent",
"MemoryRetrievalCompletedEvent",
"EventListener",
"agentops_listener",
"CrewKickoffStartedEvent",
"CrewKickoffCompletedEvent",
"CrewKickoffFailedEvent",

View File

@@ -4,7 +4,7 @@ from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from crewai.utilities.constants import CREWAI_BASE_URL
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.cli.plus_api import PlusAPI
@@ -41,21 +41,14 @@ class TraceBatchManager:
"""Single responsibility: Manage batches and event buffering"""
def __init__(self):
try:
self.plus_api = PlusAPI(api_key=get_auth_token())
except AuthError:
self.plus_api = PlusAPI(api_key="")
self.plus_api = PlusAPI(api_key=get_auth_token())
self.trace_batch_id: Optional[str] = None # Backend ID
self.current_batch: Optional[TraceBatch] = None
self.event_buffer: List[TraceEvent] = []
self.execution_start_times: Dict[str, datetime] = {}
def initialize_batch(
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
use_ephemeral: bool = False,
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
) -> TraceBatch:
"""Initialize a new trace batch"""
self.current_batch = TraceBatch(
@@ -64,15 +57,13 @@ class TraceBatchManager:
self.event_buffer.clear()
self.record_start_time("execution")
self._initialize_backend_batch(user_context, execution_metadata, use_ephemeral)
self._initialize_backend_batch(user_context, execution_metadata)
return self.current_batch
def _initialize_backend_batch(
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
use_ephemeral: bool = False,
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
):
"""Send batch initialization to backend"""
@@ -83,7 +74,6 @@ class TraceBatchManager:
payload = {
"trace_id": self.current_batch.batch_id,
"execution_type": execution_metadata.get("execution_type", "crew"),
"user_identifier": execution_metadata.get("user_context", None),
"execution_context": {
"crew_fingerprint": execution_metadata.get("crew_fingerprint"),
"crew_name": execution_metadata.get("crew_name", None),
@@ -101,22 +91,12 @@ class TraceBatchManager:
"execution_started_at": datetime.now(timezone.utc).isoformat(),
},
}
if use_ephemeral:
payload["ephemeral_trace_id"] = self.current_batch.batch_id
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
response = self.plus_api.initialize_trace_batch(payload)
if response.status_code == 201 or response.status_code == 200:
response_data = response.json()
self.trace_batch_id = (
response_data["trace_id"]
if not use_ephemeral
else response_data["ephemeral_trace_id"]
)
self.trace_batch_id = response_data["trace_id"]
console = Console()
panel = Panel(
f"✅ Trace batch initialized with session ID: {self.trace_batch_id}",
@@ -136,7 +116,7 @@ class TraceBatchManager:
"""Add event to buffer"""
self.event_buffer.append(trace_event)
def _send_events_to_backend(self, ephemeral: bool = True):
def _send_events_to_backend(self):
"""Send buffered events to backend"""
if not self.plus_api or not self.trace_batch_id or not self.event_buffer:
return
@@ -154,11 +134,7 @@ class TraceBatchManager:
if not self.trace_batch_id:
raise Exception("❌ Trace batch ID not found")
response = (
self.plus_api.send_ephemeral_trace_events(self.trace_batch_id, payload)
if ephemeral
else self.plus_api.send_trace_events(self.trace_batch_id, payload)
)
response = self.plus_api.send_trace_events(self.trace_batch_id, payload)
if response.status_code == 200 or response.status_code == 201:
self.event_buffer.clear()
@@ -170,15 +146,15 @@ class TraceBatchManager:
except Exception as e:
logger.error(f"❌ Error sending events to backend: {str(e)}")
def finalize_batch(self, ephemeral: bool = True) -> Optional[TraceBatch]:
def finalize_batch(self) -> Optional[TraceBatch]:
"""Finalize batch and return it for sending"""
if not self.current_batch:
return None
if self.event_buffer:
self._send_events_to_backend(ephemeral)
self._send_events_to_backend()
self._finalize_backend_batch(ephemeral)
self._finalize_backend_batch()
self.current_batch.events = self.event_buffer.copy()
@@ -192,7 +168,7 @@ class TraceBatchManager:
return finalized_batch
def _finalize_backend_batch(self, ephemeral: bool = True):
def _finalize_backend_batch(self):
"""Send batch finalization to backend"""
if not self.plus_api or not self.trace_batch_id:
return
@@ -206,24 +182,12 @@ class TraceBatchManager:
"final_event_count": total_events,
}
response = (
self.plus_api.finalize_ephemeral_trace_batch(
self.trace_batch_id, payload
)
if ephemeral
else self.plus_api.finalize_trace_batch(self.trace_batch_id, payload)
)
response = self.plus_api.finalize_trace_batch(self.trace_batch_id, payload)
if response.status_code == 200:
access_code = response.json().get("access_code", None)
console = Console()
return_link = (
f"{CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}"
if not ephemeral and access_code
else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}"
)
panel = Panel(
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}",
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}",
title="Trace Batch Finalization",
border_style="green",
)

View File

@@ -13,6 +13,7 @@ from crewai.utilities.events.agent_events import (
AgentExecutionErrorEvent,
)
from crewai.utilities.events.listeners.tracing.types import TraceEvent
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.events.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
@@ -66,7 +67,7 @@ from crewai.utilities.events.memory_events import (
MemorySaveFailedEvent,
)
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.version import get_crewai_version
@@ -75,12 +76,13 @@ class TraceCollectionListener(BaseEventListener):
Trace collection listener that orchestrates trace collection
"""
trace_enabled: Optional[bool] = False
complex_events = ["task_started", "llm_call_started", "llm_call_completed"]
_instance = None
_initialized = False
def __new__(cls, batch_manager=None):
def __new__(cls, batch_manager=None, tracing: Optional[bool] = False):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@@ -88,22 +90,25 @@ class TraceCollectionListener(BaseEventListener):
def __init__(
self,
batch_manager: Optional[TraceBatchManager] = None,
tracing: Optional[bool] = False,
):
if self._initialized:
return
super().__init__()
self.batch_manager = batch_manager or TraceBatchManager()
self.tracing = tracing or False
self.trace_enabled = self._check_trace_enabled()
self._initialized = True
def _check_authenticated(self) -> bool:
def _check_trace_enabled(self) -> bool:
"""Check if tracing should be enabled"""
try:
res = bool(get_auth_token())
return res
except AuthError:
auth_token = get_auth_token()
if not auth_token:
return False
return is_tracing_enabled() or self.tracing
def _get_user_context(self) -> Dict[str, str]:
"""Extract user context for tracing"""
return {
@@ -115,6 +120,8 @@ class TraceCollectionListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
"""Setup event listeners - delegates to specific handlers"""
if not self.trace_enabled:
return
self._register_flow_event_handlers(crewai_event_bus)
self._register_context_event_handlers(crewai_event_bus)
@@ -160,13 +167,13 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event):
if not self.batch_manager.is_batch_initialized():
self._initialize_crew_batch(source, event)
self._initialize_batch(source, event)
self._handle_trace_event("crew_kickoff_started", source, event)
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
self._handle_trace_event("crew_kickoff_completed", source, event)
self.batch_manager.finalize_batch(ephemeral=True)
self.batch_manager.finalize_batch()
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event):
@@ -280,7 +287,7 @@ class TraceCollectionListener(BaseEventListener):
def on_agent_reasoning_failed(source, event):
self._handle_action_event("agent_reasoning_failed", source, event)
def _initialize_crew_batch(self, source: Any, event: Any):
def _initialize_batch(self, source: Any, event: Any):
"""Initialize trace batch"""
user_context = self._get_user_context()
execution_metadata = {
@@ -289,7 +296,7 @@ class TraceCollectionListener(BaseEventListener):
"crewai_version": get_crewai_version(),
}
self._initialize_batch(user_context, execution_metadata)
self.batch_manager.initialize_batch(user_context, execution_metadata)
def _initialize_flow_batch(self, source: Any, event: Any):
"""Initialize trace batch for Flow execution"""
@@ -301,20 +308,7 @@ class TraceCollectionListener(BaseEventListener):
"execution_type": "flow",
}
self._initialize_batch(user_context, execution_metadata)
def _initialize_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
):
"""Initialize trace batch if ephemeral"""
if not self._check_authenticated():
self.batch_manager.initialize_batch(
user_context, execution_metadata, use_ephemeral=True
)
else:
self.batch_manager.initialize_batch(
user_context, execution_metadata, use_ephemeral=False
)
self.batch_manager.initialize_batch(user_context, execution_metadata)
def _handle_trace_event(self, event_type: str, source: Any, event: Any):
"""Generic handler for context end events"""

View File

@@ -1,153 +1,5 @@
import os
import platform
import uuid
import hashlib
import subprocess
import getpass
from pathlib import Path
from datetime import datetime
import re
import json
import click
from crewai.utilities.paths import db_storage_path
def is_tracing_enabled() -> bool:
return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true"
def on_first_execution_tracing_confirmation() -> bool:
if _is_test_environment():
return False
if is_first_execution():
mark_first_execution_done()
return click.confirm(
"This is the first execution of CrewAI. Do you want to enable tracing?",
default=True,
show_default=True,
)
return False
def _is_test_environment() -> bool:
"""Detect if we're running in a test environment."""
return os.environ.get("CREWAI_TESTING", "").lower() == "true"
def _get_machine_id() -> str:
"""Stable, privacy-preserving machine fingerprint (cross-platform)."""
parts = []
try:
mac = ":".join(
["{:02x}".format((uuid.getnode() >> b) & 0xFF) for b in range(0, 12, 2)][
::-1
]
)
parts.append(mac)
except Exception:
pass
sysname = platform.system()
parts.append(sysname)
try:
if sysname == "Darwin":
res = subprocess.run(
["system_profiler", "SPHardwareDataType"],
capture_output=True,
text=True,
timeout=2,
)
m = re.search(r"Hardware UUID:\s*([A-Fa-f0-9\-]+)", res.stdout)
if m:
parts.append(m.group(1))
elif sysname == "Linux":
try:
parts.append(Path("/etc/machine-id").read_text().strip())
except Exception:
parts.append(Path("/sys/class/dmi/id/product_uuid").read_text().strip())
elif sysname == "Windows":
res = subprocess.run(
["wmic", "csproduct", "get", "UUID"],
capture_output=True,
text=True,
timeout=2,
)
lines = [line.strip() for line in res.stdout.splitlines() if line.strip()]
if len(lines) >= 2:
parts.append(lines[1])
except Exception:
pass
return hashlib.sha256("".join(parts).encode()).hexdigest()
def _user_data_file() -> Path:
base = Path(db_storage_path())
base.mkdir(parents=True, exist_ok=True)
return base / ".crewai_user.json"
def _load_user_data() -> dict:
p = _user_data_file()
if p.exists():
try:
return json.loads(p.read_text())
except Exception:
pass
return {}
def _save_user_data(data: dict) -> None:
try:
p = _user_data_file()
p.write_text(json.dumps(data, indent=2))
except Exception:
pass
def get_user_id() -> str:
"""Stable, anonymized user identifier with caching."""
data = _load_user_data()
if "user_id" in data:
return data["user_id"]
try:
username = getpass.getuser()
except Exception:
username = "unknown"
seed = f"{username}|{_get_machine_id()}"
uid = hashlib.sha256(seed.encode()).hexdigest()
data["user_id"] = uid
_save_user_data(data)
return uid
def is_first_execution() -> bool:
"""True if this is the first execution for this user."""
data = _load_user_data()
return not data.get("first_execution_done", False)
def mark_first_execution_done() -> None:
"""Mark that the first execution has been completed."""
data = _load_user_data()
if data.get("first_execution_done", False):
return
data.update(
{
"first_execution_done": True,
"first_execution_at": datetime.now().timestamp(),
"user_id": get_user_id(),
"machine_id": _get_machine_id(),
}
)
_save_user_data(data)

View File

@@ -0,0 +1 @@
from .agentops_listener import agentops_listener

View File

@@ -0,0 +1,67 @@
from typing import Optional
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
@crewai_event_bus.on(TaskEvaluationEvent)
def on_task_evaluation(source, event: TaskEvaluationEvent):
if self.session:
self.session.create_agent(
name="Task Evaluator", agent_id=str(source.original_agent.id)
)
agentops_listener = AgentOpsListener()

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional
from crewai.agents.parser import AgentAction
from crewai.security import Fingerprint
@@ -10,7 +10,7 @@ from crewai.utilities.i18n import I18N
def execute_tool_and_check_finality(
agent_action: AgentAction,
tools: List[Union[CrewStructuredTool, dict]],
tools: List[CrewStructuredTool],
i18n: I18N,
agent_key: Optional[str] = None,
agent_role: Optional[str] = None,
@@ -37,8 +37,7 @@ def execute_tool_and_check_finality(
ToolResult containing the execution result and whether it should be treated as a final answer
"""
try:
executable_tools = [tool for tool in tools if hasattr(tool, 'name') and hasattr(tool, 'result_as_answer')]
tool_name_to_tool_map = {tool.name: tool for tool in executable_tools}
tool_name_to_tool_map = {tool.name: tool for tool in tools}
if agent_key and agent_role and agent:
fingerprint_context = fingerprint_context or {}
@@ -53,12 +52,10 @@ def execute_tool_and_check_finality(
except Exception as e:
raise ValueError(f"Failed to set fingerprint: {e}")
# Create tool usage instance - filter to only CrewStructuredTool instances for ToolUsage
from typing import cast
crew_structured_tools = [tool for tool in tools if hasattr(tool, 'name') and hasattr(tool, 'result_as_answer') and not isinstance(tool, dict)]
# Create tool usage instance
tool_usage = ToolUsage(
tools_handler=tools_handler,
tools=cast(List[CrewStructuredTool], crew_structured_tools),
tools=tools,
function_calling_llm=function_calling_llm,
task=task,
agent=agent,
@@ -85,7 +82,7 @@ def execute_tool_and_check_finality(
# Handle invalid tool name
tool_result = i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in executable_tools]),
tools=", ".join([tool.name.casefold() for tool in tools]),
)
return ToolResult(tool_result, False)

View File

@@ -34,11 +34,11 @@ def setup_test_environment():
f"Test storage directory {storage_dir} is not writable: {e}"
)
# Set environment variable to point to the test storage directory
os.environ["CREWAI_STORAGE_DIR"] = str(storage_dir)
os.environ["CREWAI_TESTING"] = "true"
yield
os.environ.pop("CREWAI_TESTING", None)
# Cleanup is handled automatically when tempfile context exits

View File

@@ -270,4 +270,34 @@ def test_internal_crew_with_mcp():
assert crew.reporting_analyst().tools == [simple_tool, another_simple_tool]
assert crew.researcher().tools == [simple_tool]
adapter_mock.assert_called_once_with({"host": "localhost", "port": 8000})
adapter_mock.assert_called_once_with({"host": "localhost", "port": 8000}, connect_timeout=30)
def test_internal_crew_with_mcp_connect_timeout():
from crewai_tools import MCPServerAdapter
from crewai_tools.adapters.mcp_adapter import ToolCollection
@CrewBase
class TestCrewWithMCP:
mcp_server_params = {"host": "localhost", "port": 8000}
agents_config = None
tasks_config = None
mock = Mock(spec=MCPServerAdapter)
mock.tools = ToolCollection([simple_tool, another_simple_tool])
with patch("crewai_tools.MCPServerAdapter", return_value=mock) as adapter_mock:
crew1 = TestCrewWithMCP()
crew1.get_mcp_tools()
adapter_mock.assert_called_with({"host": "localhost", "port": 8000}, connect_timeout=30)
adapter_mock.reset_mock()
crew2 = TestCrewWithMCP()
crew2.get_mcp_tools(connect_timeout=60)
adapter_mock.assert_called_with({"host": "localhost", "port": 8000}, connect_timeout=60)
adapter_mock.reset_mock()
crew3 = TestCrewWithMCP()
crew3.get_mcp_tools("simple_tool", connect_timeout=45)
adapter_mock.assert_called_with({"host": "localhost", "port": 8000}, connect_timeout=45)

View File

@@ -1,237 +0,0 @@
import pytest
from crewai import Agent
from crewai.tools import BaseTool
class MockTool(BaseTool):
name: str = "mock_tool"
description: str = "A mock tool for testing"
def _run(self, query: str) -> str:
return f"Mock result for: {query}"
def test_agent_with_crewai_tools_only():
"""Test backward compatibility with CrewAI tools only."""
mock_tool = MockTool()
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[mock_tool]
)
assert len(agent.tools) == 1
assert isinstance(agent.tools[0], BaseTool)
def test_agent_with_raw_tools_only():
"""Test agent with raw tool definitions only."""
raw_tool = {
"name": "hosted_search",
"description": "Search the web using hosted search",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"}
},
"required": ["query"]
}
}
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[raw_tool]
)
assert len(agent.tools) == 1
assert isinstance(agent.tools[0], dict)
assert agent.tools[0]["name"] == "hosted_search"
def test_agent_with_mixed_tools():
"""Test agent with both CrewAI tools and raw tool definitions."""
mock_tool = MockTool()
raw_tool = {
"name": "hosted_calculator",
"description": "Hosted calculator tool",
"parameters": {
"type": "object",
"properties": {
"expression": {"type": "string"}
}
}
}
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[mock_tool, raw_tool]
)
assert len(agent.tools) == 2
assert isinstance(agent.tools[0], BaseTool)
assert isinstance(agent.tools[1], dict)
def test_invalid_raw_tool_definition():
"""Test error handling for invalid raw tool definitions."""
invalid_tool = {"description": "Missing name field"}
with pytest.raises(ValueError, match="Raw tool definition must have a 'name' field"):
Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[invalid_tool]
)
def test_parse_tools_with_mixed_types():
"""Test parse_tools function with mixed tool types."""
from crewai.utilities.agent_utils import parse_tools
mock_tool = MockTool()
raw_tool = {
"name": "hosted_tool",
"description": "A hosted tool",
"parameters": {"type": "object"}
}
parsed = parse_tools([mock_tool, raw_tool])
assert len(parsed) == 2
assert hasattr(parsed[0], 'name')
assert parsed[0].name == "mock_tool"
assert isinstance(parsed[1], dict)
assert parsed[1]["name"] == "hosted_tool"
def test_get_tool_names_with_mixed_types():
"""Test get_tool_names function with mixed tool types."""
from crewai.utilities.agent_utils import get_tool_names
mock_tool = MockTool()
raw_tool = {"name": "hosted_tool", "description": "A hosted tool"}
names = get_tool_names([mock_tool, raw_tool])
assert "mock_tool" in names
assert "hosted_tool" in names
def test_render_text_description_with_mixed_types():
"""Test render_text_description_and_args function with mixed tool types."""
from crewai.utilities.agent_utils import render_text_description_and_args
mock_tool = MockTool()
raw_tool = {"name": "hosted_tool", "description": "A hosted tool"}
description = render_text_description_and_args([mock_tool, raw_tool])
assert "A mock tool for testing" in description
assert "A hosted tool" in description
def test_agent_executor_with_mixed_tools():
"""Test CrewAgentExecutor initialization with mixed tool types."""
mock_tool = MockTool()
raw_tool = {"name": "hosted_tool", "description": "A hosted tool"}
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[mock_tool, raw_tool]
)
agent.create_agent_executor()
assert len(agent.agent_executor.tool_name_to_tool_map) == 2
assert "mock_tool" in agent.agent_executor.tool_name_to_tool_map
assert "hosted_tool" in agent.agent_executor.tool_name_to_tool_map
def test_empty_tools_list():
"""Test agent with empty tools list."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[]
)
assert len(agent.tools) == 0
def test_none_tools():
"""Test agent with None tools."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=None
)
assert agent.tools == []
def test_raw_tool_without_description():
"""Test raw tool definition without description field."""
raw_tool = {"name": "minimal_tool"}
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[raw_tool]
)
assert len(agent.tools) == 1
assert isinstance(agent.tools[0], dict)
assert agent.tools[0]["name"] == "minimal_tool"
def test_complex_raw_tool_definition():
"""Test complex raw tool definition with nested parameters."""
raw_tool = {
"name": "complex_search",
"description": "Advanced search with multiple parameters",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"filters": {
"type": "object",
"properties": {
"date_range": {"type": "string"},
"category": {"type": "string"}
}
},
"limit": {
"type": "integer",
"minimum": 1,
"maximum": 100,
"default": 10
}
},
"required": ["query"]
}
}
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[raw_tool]
)
assert len(agent.tools) == 1
assert isinstance(agent.tools[0], dict)
assert agent.tools[0]["name"] == "complex_search"
assert "parameters" in agent.tools[0]
assert agent.tools[0]["parameters"]["type"] == "object"

View File

@@ -2,8 +2,8 @@ import os
import pytest
from unittest.mock import patch, MagicMock
# Remove the module-level patch
from crewai import Agent, Task, Crew
from crewai.flow.flow import Flow, start
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
@@ -284,42 +284,29 @@ class TestTraceListenerSetup:
f"Found {len(trace_handlers)} trace handlers when tracing should be disabled"
)
def test_trace_listener_setup_correctly_for_crew(self):
def test_trace_listener_setup_correctly(self):
"""Test that trace listener is set up correctly when enabled"""
with patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}):
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
llm="gpt-4o-mini",
)
task = Task(
description="Say hello to the world",
expected_output="hello world",
agent=agent,
)
with patch.object(
TraceCollectionListener, "setup_listeners"
) as mock_listener_setup:
Crew(agents=[agent], tasks=[task], verbose=True)
assert mock_listener_setup.call_count >= 1
trace_listener = TraceCollectionListener()
def test_trace_listener_setup_correctly_for_flow(self):
assert trace_listener.trace_enabled is True
assert trace_listener.batch_manager is not None
@pytest.mark.vcr(filter_headers=["authorization"])
def test_trace_listener_setup_correctly_with_tracing_flag(self):
"""Test that trace listener is set up correctly when enabled"""
with patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}):
class FlowExample(Flow):
@start()
def start(self):
pass
with patch.object(
TraceCollectionListener, "setup_listeners"
) as mock_listener_setup:
FlowExample()
assert mock_listener_setup.call_count >= 1
agent = Agent(role="Test Agent", goal="Test goal", backstory="Test backstory")
task = Task(
description="Say hello to the world",
expected_output="hello world",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=True, tracing=True)
crew.kickoff()
trace_listener = TraceCollectionListener(tracing=True)
assert trace_listener.trace_enabled is True
assert trace_listener.batch_manager is not None
# Helper method to ensure cleanup
def teardown_method(self):

6656
uv.lock generated

File diff suppressed because it is too large Load Diff