Compare commits

...

7 Commits

Author SHA1 Message Date
Devin AI
95d91d2561 fix: resolve lint and type-checker issues in AgentOps integration
- Fix event handler signatures to match expected (source, event) pattern
- Change 'arguments' to 'tool_args' for tool usage events
- Update TaskEvaluationEvent to use correct attributes (evaluation_type, task)
- Fix corrupted tests/__init__.py file
- Add explicit re-export in src third_party __init__.py

Co-Authored-By: João <joao@crewai.com>
2025-08-18 17:58:59 +00:00
Devin AI
5afe3921d2 feat: restore AgentOps documentation and integration
- Add AgentOps optional dependency to pyproject.toml
- Implement AgentOpsListener with event handling for crew kickoff, tool usage, and task evaluation
- Add comprehensive AgentOps documentation in English, Portuguese, and Korean
- Update docs.json navigation to include AgentOps in observability sections
- Add agentops.log to .gitignore
- Include comprehensive tests for AgentOps integration with mocking
- Ensure graceful handling when AgentOps package is not installed

Addresses issue #3348 by restoring AgentOps integration removed in PR #3334

Co-Authored-By: João <joao@crewai.com>
2025-08-18 17:47:12 +00:00
Greyson LaLonde
1d3d7ebf5e fix: convert XMLSearchTool config values to strings for configparser compatibility (#3344) 2025-08-18 13:23:58 -04:00
Gabe Milani
2c2196f415 fix: flaky test with PytestUnraisableExceptionWarning (#3346) 2025-08-18 14:07:51 -03:00
Gabe Milani
c9f30b175c chore: ignore deprecation warning from chromadb (#3328)
* chore: ignore deprecation warning from chromadb

* adding TODO: in the comment
2025-08-18 13:24:11 -03:00
Greyson LaLonde
a17b93a7f8 Mock telemetry in pytest tests (#3340)
* Add telemetry mocking for pytest tests

- Mock telemetry by default for all tests except telemetry-specific tests
- Add @pytest.mark.telemetry marker for real telemetry tests
- Reduce test overhead and improve isolation

* Fix telemetry test isolation

- Properly isolate telemetry tests from mocking environment
- Preserve API keys and other necessary environment variables
- Ensure telemetry tests can run with real telemetry instances
2025-08-18 11:55:30 -04:00
namho kim
0d3e462791 fix: Revised Korean translation and sentence structure improvement (#3337)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
2025-08-18 10:46:13 -04:00
19 changed files with 1129 additions and 150 deletions

3
.gitignore vendored
View File

@@ -27,3 +27,6 @@ plan.md
conceptual_plan.md
build_image
chromadb-*.lock
# AgentOps
agentops.log

View File

@@ -226,6 +226,7 @@
"group": "Observability",
"pages": [
"en/observability/overview",
"en/observability/agentops",
"en/observability/arize-phoenix",
"en/observability/langdb",
"en/observability/langfuse",
@@ -565,6 +566,7 @@
"group": "Observabilidade",
"pages": [
"pt-BR/observability/overview",
"pt-BR/observability/agentops",
"pt-BR/observability/arize-phoenix",
"pt-BR/observability/langdb",
"pt-BR/observability/langfuse",
@@ -709,7 +711,7 @@
"icon": "globe"
},
{
"anchor": "법정",
"anchor": "포럼",
"href": "https://community.crewai.com",
"icon": "discourse"
},
@@ -719,7 +721,7 @@
"icon": "robot"
},
{
"anchor": "출시",
"anchor": "릴리스",
"href": "https://github.com/crewAIInc/crewAI/releases",
"icon": "tag"
}
@@ -734,22 +736,22 @@
"pages": ["ko/introduction", "ko/installation", "ko/quickstart"]
},
{
"group": "안내서",
"group": "가이드",
"pages": [
{
"group": "전략",
"pages": ["ko/guides/concepts/evaluating-use-cases"]
},
{
"group": "Agents",
"group": "에이전트 (Agents)",
"pages": ["ko/guides/agents/crafting-effective-agents"]
},
{
"group": "Crews",
"group": "크루 (Crews)",
"pages": ["ko/guides/crews/first-crew"]
},
{
"group": "Flows",
"group": "플로우 (Flows)",
"pages": [
"ko/guides/flows/first-flow",
"ko/guides/flows/mastering-flow-state"
@@ -797,7 +799,7 @@
]
},
{
"group": "도구",
"group": "도구 (Tools)",
"pages": [
"ko/tools/overview",
{
@@ -887,7 +889,7 @@
]
},
{
"group": "클라우드 & 저장",
"group": "클라우드 & 스토리지",
"pages": [
"ko/tools/cloud-storage/overview",
"ko/tools/cloud-storage/s3readertool",
@@ -909,9 +911,10 @@
]
},
{
"group": "오브저버빌리티",
"group": "Observability",
"pages": [
"ko/observability/overview",
"ko/observability/agentops",
"ko/observability/arize-phoenix",
"ko/observability/langdb",
"ko/observability/langfuse",
@@ -927,7 +930,7 @@
]
},
{
"group": "익히다",
"group": "학습",
"pages": [
"ko/learn/overview",
"ko/learn/llm-selection-guide",
@@ -951,13 +954,13 @@
]
},
{
"group": "원격측정",
"group": "Telemetry",
"pages": ["ko/telemetry"]
}
]
},
{
"tab": "기업",
"tab": "엔터프라이즈",
"groups": [
{
"group": "시작 안내",
@@ -997,7 +1000,7 @@
]
},
{
"group": "사용 안내서",
"group": "How-To Guides",
"pages": [
"ko/enterprise/guides/build-crew",
"ko/enterprise/guides/deploy-crew",

View File

@@ -0,0 +1,184 @@
---
title: "AgentOps Integration"
description: "Monitor and analyze your CrewAI agents with AgentOps observability platform"
---
# AgentOps Integration
AgentOps is a powerful observability platform designed specifically for AI agents. It provides comprehensive monitoring, analytics, and debugging capabilities for your CrewAI crews.
## Features
- **Real-time Monitoring**: Track agent performance and behavior in real-time
- **Session Replay**: Review complete agent sessions with detailed execution traces
- **Performance Analytics**: Analyze crew efficiency, tool usage, and task completion rates
- **Error Tracking**: Identify and debug issues in agent workflows
- **Cost Tracking**: Monitor LLM usage and associated costs
- **Team Collaboration**: Share insights and collaborate on agent optimization
## Installation
Install AgentOps alongside CrewAI:
```bash
pip install crewai[agentops]
```
Or install AgentOps separately:
```bash
pip install agentops
```
## Setup
1. **Get your API Key**: Sign up at [AgentOps](https://agentops.ai) and get your API key
2. **Configure your environment**: Set your AgentOps API key as an environment variable:
```bash
export AGENTOPS_API_KEY="your-api-key-here"
```
3. **Initialize AgentOps**: Add this to your CrewAI script:
```python
import agentops
from crewai import Agent, Task, Crew
# Initialize AgentOps
agentops.init()
# Your CrewAI code here
agent = Agent(
role="Data Analyst",
goal="Analyze data and provide insights",
backstory="You are an expert data analyst...",
)
task = Task(
description="Analyze the sales data and provide insights",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task],
)
# Run your crew
result = crew.kickoff()
# End the AgentOps session
agentops.end_session("Success")
```
## Automatic Integration
CrewAI automatically integrates with AgentOps when the library is installed. The integration captures:
- **Crew Kickoff Events**: Start and completion of crew executions
- **Tool Usage**: All tool calls and their results
- **Task Evaluations**: Task performance metrics and feedback
- **Error Events**: Any errors that occur during execution
## Configuration Options
You can customize the AgentOps integration:
```python
import agentops
# Configure AgentOps with custom settings
agentops.init(
api_key="your-api-key",
tags=["production", "data-analysis"],
auto_start_session=True,
instrument_llm_calls=True,
)
```
## Viewing Your Data
1. **Dashboard**: Visit the AgentOps dashboard to view your agent sessions
2. **Session Details**: Click on any session to see detailed execution traces
3. **Analytics**: Use the analytics tab to identify performance trends
4. **Errors**: Monitor the errors tab for debugging information
## Best Practices
- **Tag Your Sessions**: Use meaningful tags to organize your agent runs
- **Monitor Costs**: Keep track of LLM usage and associated costs
- **Review Errors**: Regularly check for and address any errors
- **Optimize Performance**: Use analytics to identify bottlenecks and optimization opportunities
## Troubleshooting
### AgentOps Not Recording Data
1. Verify your API key is set correctly
2. Check that AgentOps is properly initialized
3. Ensure you're calling `agentops.end_session()` at the end of your script
### Missing Events
If some events aren't being captured:
1. Make sure you have the latest version of both CrewAI and AgentOps
2. Check that the AgentOps listener is properly registered
3. Review the logs for any error messages
## Example: Complete Integration
```python
import os
import agentops
from crewai import Agent, Task, Crew, Process
# Initialize AgentOps
agentops.init(
api_key=os.getenv("AGENTOPS_API_KEY"),
tags=["example", "tutorial"],
)
# Define your agents
researcher = Agent(
role="Research Specialist",
goal="Conduct thorough research on given topics",
backstory="You are an expert researcher with access to various tools...",
)
writer = Agent(
role="Content Writer",
goal="Create engaging content based on research",
backstory="You are a skilled writer who can transform research into compelling content...",
)
# Define your tasks
research_task = Task(
description="Research the latest trends in AI and machine learning",
agent=researcher,
)
writing_task = Task(
description="Write a blog post about AI trends based on the research",
agent=writer,
)
# Create and run your crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
process=Process.sequential,
)
try:
result = crew.kickoff()
print(result)
agentops.end_session("Success")
except Exception as e:
print(f"Error: {e}")
agentops.end_session("Fail")
```
This integration provides comprehensive observability for your CrewAI agents, helping you monitor, debug, and optimize your AI workflows.

View File

@@ -1,65 +1,65 @@
---
title: 소개
description: 함께 협력하여 복잡한 작업을 해결하는 AI 에이전트 팀 구축
description: 함께 협력하여 복잡한 작업을 해결하는 AI agent 팀 구축
icon: handshake
---
# CrewAI란 무엇인가?
**CrewAI는 완전히 독립적으로, LangChain이나 기타 agent 프레임워크에 의존하지 않고 처음부터 스크래치로 개발된 가볍고 매우 빠른 Python 프레임워크입니다.**
**CrewAI는 LangChain이나 기타 agent 프레임워크에 의존하지 않고, 완전히 독립적으로 처음부터 스크래치로 개발된 가볍고 매우 빠른 Python 프레임워크입니다.**
CrewAI는 고수준의 간편함과 정밀한 저수준 제어를 모두 제공하여, 어떤 시나리오에도 맞춤화된 자율 AI agent를 만드는 데 이상적입니다:
- **[CrewAI Crews](/ko/guides/crews/first-crew)**: 자율성과 협업 지능을 극대화하여, 각 agent가 특정 역할, 도구, 목표를 가진 AI 팀을 만들 수 있습니다.
- **[CrewAI Flows](/ko/guides/flows/first-flow)**: 세밀한 이벤트 기반 제어와 단일 LLM 호출을 통한 정확한 작업 오케스트레이션을 가능하게 하며 Crews 네이티브로 지원합니다.
- **[CrewAI Flows](/ko/guides/flows/first-flow)**: 이벤트 기반의 세밀한 제어와 단일 LLM 호출을 통한 정확한 작업 orchestration을 지원하며, Crews 네이티브로 통합됩니다.
10만 명이 넘는 개발자가 커뮤니티 과정을 통해 인증을 받았으며, CrewAI는 기업용 AI 자동화의 표준으로 빠르게 자리잡고 있습니다.
## 크루 작동 방식
## Crew의 작동 방식
<Note>
회사가 비즈니스 목표를 달성하기 위해 여러 부서(영업, 엔지니어링, 마케팅 등)가 리더십 아래에서 함께 일하는 것처럼, CrewAI는 복잡한 작업을 달성하기 위해 전문화된 역할의 AI 에이전트들이 협력하는 조직을 만들 수 있도록 도와줍니다.
회사가 비즈니스 목표를 달성하기 위해 여러 부서(영업, 엔지니어링, 마케팅 등)가 리더십 아래에서 함께 일하는 것처럼, CrewAI는 복잡한 작업을 달성하기 위해 전문화된 역할의 AI agent들이 협력하는 조직을 만들 수 있도록 도와줍니다.
</Note>
<Frame caption="CrewAI 프레임워크 개요">
<Frame caption="CrewAI Framework Overview">
<img src="/images/crews.png" alt="CrewAI Framework Overview" />
</Frame>
| 구성 요소 | 설명 | 주요 특징 |
|:--------------|:---------------------:|:----------|
| **크루** | 최상위 조직 | • AI 에이전트 팀 관리<br/>• 워크플로우 감독<br/>• 협업 보장<br/>• 결과 전달 |
| **AI 에이전트** | 전문 팀원 | • 특정 역할 보유(연구원, 작가 등)<br/>• 지정된 도구 사용<br/>• 작업 위임 가능<br/>• 자율적 의사결정 가능 |
| **프로세스** | 워크플로우 관리 시스템 | • 협업 패턴 정의<br/>• 작업 할당 제어<br/>• 상호작용 관리<br/>• 효율적 실행 보장 |
| **작업** | 개별 할당 | • 명확한 목표 보유<br/>• 특정 도구 사용<br/>• 더 큰 프로세스에 기여<br/>• 실행 가능한 결과 도출 |
| 구성 요소 | 설명 | 주요 특징 |
|:----------|:----:|:----------|
| **Crew** | 최상위 조직 | • AI agent 팀 관리<br/>• workflow 감독<br/>• 협업 보장<br/>• 결과 전달 |
| **AI agents** | 전문 팀원 | • 특정 역할 보유(Researcher, Writer 등)<br/>• 지정된 도구 사용<br/>• 작업 위임 가능<br/>• 자율적 의사결정 가능 |
| **Process** | workflow 관리 시스템 | • 협업 패턴 정의<br/>• 작업 할당 제어<br/>• 상호작용 관리<br/>• 효율적 실행 보장 |
| **Task** | 개별 할당 | • 명확한 목표 보유<br/>• 특정 도구 사용<br/>• 더 큰 프로세스에 기여<br/>• 실행 가능한 결과 도출 |
### 어떻게 모두 함께 작동하는가
### 전체 구조의 동작 방식
1. **Crew**가 전체 운영을 조직합니다
2. **AI Agents**가 자신들의 전문 작업을 수행합니다
2. **AI agents**가 자신들의 전문 작업을 수행합니다
3. **Process**가 원활한 협업을 보장합니다
4. **Tasks**가 완료되어 목표를 달성합니다
## 주요 기능
<CardGroup cols={2}>
<Card title="역할 기반 에이전트" icon="users">
연구원, 분석가, 작가 등 다양한 역할, 전문성, 목표를 가진 전문 에이전트를 생성할 수 있습니다
<Card title="역할 기반 agent" icon="users">
Researcher, Analyst, Writer 등 다양한 역할 전문성, 목표를 가진 agent를 생성할 수 있습니다
</Card>
<Card title="유연한 도구" icon="screwdriver-wrench">
에이전트에게 외부 서비스 및 데이터 소스와 상호작용할 수 있는 맞춤형 도구와 API를 제공합니다
agent에게 외부 서비스 및 데이터 소스와 상호작용할 수 있는 맞춤형 도구와 API를 제공합니다
</Card>
<Card title="지능형 협업" icon="people-arrows">
에이전트가 함께 작업하며, 인사이트를 공유하고 작업을 조율하여 복잡한 목표를 달성합니다
agent들이 함께 작업하며, 인사이트를 공유하고 작업을 조율하여 복잡한 목표를 달성합니다
</Card>
<Card title="작업 관리" icon="list-check">
순차적 또는 병렬 워크플로우를 정의할 수 있으며, 에이전트가 작업 의존성을 자동으로 처리합니다
순차적 또는 병렬 workflow를 정의할 수 있으며, agent가 작업 의존성을 자동으로 처리합니다
</Card>
</CardGroup>
## 플로우의 작동 원리
## Flow의 작동 원리
<Note>
crew 자율 협업에 탁월한 반면, 플로우는 구조화된 자동화를 제공하여 워크플로우 실행에 대한 세밀한 제어를 제공합니다. 플로우는 조건부 로직, 반복문, 동적 상태 관리를 정확하게 처리하면서 작업이 신뢰성 있게, 안전하게, 효율적으로 실행되도록 보장합니다. 플로우crew와 원활하게 통합되어 높은 자율성과 엄격한 제어의 균형을 이룰 수 있게 해줍니다.
Crew 자율 협업에 탁월하다면, Flow는 구조화된 자동화를 제공하여 workflow 실행에 대한 세밀한 제어를 제공합니다. Flow는 조건부 로직, 반복문, 동적 상태 관리를 정확하게 처리하면서 작업이 신뢰성 있게, 안전하게, 효율적으로 실행되도록 보장합니다. FlowCrew와 원활하게 통합되어 높은 자율성과 엄격한 제어의 균형을 이룰 수 있게 해줍니다.
</Note>
<Frame caption="CrewAI Framework Overview">
@@ -68,41 +68,41 @@ CrewAI는 고수준의 간편함과 정밀한 저수준 제어를 모두 제공
| 구성 요소 | 설명 | 주요 기능 |
|:----------|:-----------:|:------------|
| **Flow** | 구조화된 워크플로우 오케스트레이션 | • 실행 경로 관리<br/>• 상태 전환 처리<br/>• 작업 순서 제어<br/>• 신뢰성 있는 실행 보장 |
| **Events** | 워크플로우 액션 트리거 | • 특정 프로세스 시작<br/>• 동적 응답 가능<br/>• 조건부 분기 지원<br/>• 실시간 적응 허용 |
| **States** | 워크플로우 실행 컨텍스트 | • 실행 데이터 유지<br/>• 데이터 영속성 지원<br/>• 재개 가능성 보장<br/>• 실행 무결성 확보 |
| **Crew Support** | 워크플로우 자동화 강화 | • 필요할 때 agency 삽입<br/>• 구조화된 워크플로우 보완<br/>• 자동화와 인텔리전스의 균형<br/>• 적응적 의사결정 지원 |
| **Flow** | 구조화된 workflow orchestration | • 실행 경로 관리<br/>• 상태 전환 처리<br/>• 작업 순서 제어<br/>• 신뢰성 있는 실행 보장 |
| **Events** | workflow 액션 트리거 | • 특정 프로세스 시작<br/>• 동적 응답 가능<br/>• 조건부 분기 지원<br/>• 실시간 적응 허용 |
| **States** | workflow 실행 컨텍스트 | • 실행 데이터 유지<br/>• 데이터 영속성 지원<br/>• 재개 가능성 보장<br/>• 실행 무결성 확보 |
| **Crew Support** | workflow 자동화 강화 | • 필요할 때 agency 삽입<br/>• 구조화된 workflow 보완<br/>• 자동화와 인텔리전스의 균형<br/>• 적응적 의사결정 지원 |
### 주요 기능
<CardGroup cols={2}>
<Card title="이벤트 기반 오케스트레이션" icon="bolt">
이벤트에 동적으로 반응하여 정밀한 실행 경로 정의
<Card title="이벤트 기반 orchestration" icon="bolt">
이벤트에 동적으로 반응하여 정밀한 실행 경로 정의합니다
</Card>
<Card title="세밀한 제어" icon="sliders">
워크플로우 상태와 조건부 실행을 안전하고 효율적으로 관리
workflow 상태와 조건부 실행을 안전하고 효율적으로 관리합니다
</Card>
<Card title="네이티브 Crew 통합" icon="puzzle-piece">
Crews와 손쉽게 결합하여 자율성과 지능 강화
Crews와 손쉽게 결합하여 자율성과 지능 강화합니다
</Card>
<Card title="결정론적 실행" icon="route">
명시적 제어 흐름과 오류 처리로 예측 가능한 결과 보장
명시적 제어 흐름과 오류 처리로 예측 가능한 결과 보장합니다
</Card>
</CardGroup>
## 크루(Crews)와 플로우(Flows)를 언제 사용할까
## CrewFlow를 언제 사용할까
<Note>
[크루](/ko/guides/crews/first-crew)와 [플로우](/ko/guides/flows/first-flow)를 언제 사용할지 이해하는 것은 CrewAI의 잠재력을 애플리케이션에서 극대화하는 데 핵심적입니다.
[Crew](/ko/guides/crews/first-crew)와 [Flow](/ko/guides/flows/first-flow)를 언제 사용할지 이해하는 것은 CrewAI의 잠재력을 애플리케이션에서 극대화하는 데 핵심적입니다.
</Note>
| 사용 사례 | 권장 접근 방식 | 이유 |
|:---------|:---------------------|:-----|
| **개방형 연구** | [크루](/ko/guides/crews/first-crew) | 과제가 창의적 사고, 탐색, 적응이 필요할 때 |
| **콘텐츠 생성** | [크루](/ko/guides/crews/first-crew) | 기사, 보고서, 마케팅 자료 등 협업형 생성 |
| **의사결정 워크플로우** | [플로우](/ko/guides/flows/first-flow) | 예측 가능하고 감사 가능한 의사결정 경로 및 정밀 제어가 필요할 때 |
| **API 오케스트레이션** | [플로우](/ko/guides/flows/first-flow) | 특정 순서로 여러 외부 서비스에 신뢰성 있게 통합할 때 |
| **하이브리드 애플리케이션** | 혼합 접근 방식 | [플로우](/ko/guides/flows/first-flow)로 전체 프로세스를 오케스트레이션하고, [크루](/ko/guides/crews/first-crew)로 복잡한 하위 작업을 처리 |
| **개방형 연구** | [Crew](/ko/guides/crews/first-crew) | 창의적 사고, 탐색, 적응이 필요한 작업에 적합 |
| **콘텐츠 생성** | [Crew](/ko/guides/crews/first-crew) | 기사, 보고서, 마케팅 자료 등 협업형 생성에 적합 |
| **의사결정 workflow** | [Flow](/ko/guides/flows/first-flow) | 예측 가능하고 감사 가능한 의사결정 경로 및 정밀 제어가 필요할 때 |
| **API orchestration** | [Flow](/ko/guides/flows/first-flow) | 특정 순서로 여러 외부 서비스에 신뢰성 있게 통합할 때 |
| **하이브리드 애플리케이션** | 혼합 접근 방식 | [Flow](/ko/guides/flows/first-flow)로 전체 프로세스를 orchestration하고, [Crew](/ko/guides/crews/first-crew)로 복잡한 하위 작업을 처리 |
### 의사결정 프레임워크
@@ -112,8 +112,8 @@ CrewAI는 고수준의 간편함과 정밀한 저수준 제어를 모두 제공
## CrewAI를 선택해야 하는 이유?
- 🧠 **자율적 운영**: 에이전트가 자신의 역할과 사용 가능한 도구를 바탕으로 지능적인 결정을 내립니다
- 📝 **자연스러운 상호작용**: 에이전트가 인간 팀원처럼 소통하고 협업합니다
- 🧠 **자율적 운영**: agent가 자신의 역할과 사용 가능한 도구를 바탕으로 지능적인 결정을 내립니다
- 📝 **자연스러운 상호작용**: agent가 인간 팀원처럼 소통하고 협업합니다
- 🛠️ **확장 가능한 설계**: 새로운 도구, 역할, 기능을 쉽게 추가할 수 있습니다
- 🚀 **프로덕션 준비 완료**: 실제 환경에서의 신뢰성과 확장성을 고려하여 구축되었습니다
- 🔒 **보안 중심**: 엔터프라이즈 보안 요구 사항을 고려하여 설계되었습니다
@@ -134,7 +134,7 @@ CrewAI는 고수준의 간편함과 정밀한 저수준 제어를 모두 제공
icon="diagram-project"
href="/ko/guides/flows/first-flow"
>
실행을 정밀하게 제어할 수 있는 구조화된, 이벤트 기반 워크플로우를 만드는 방법을 배워보세요.
실행을 정밀하게 제어할 수 있는 구조화된, 이벤트 기반 workflow를 만드는 방법을 배워보세요.
</Card>
</CardGroup>
@@ -151,7 +151,7 @@ CrewAI는 고수준의 간편함과 정밀한 저수준 제어를 모두 제공
icon="bolt"
href="ko/quickstart"
>
빠른 시작 가이드를 따라 첫 번째 CrewAI 에이전트를 만들고 직접 경험해 보세요.
빠른 시작 가이드를 따라 첫 번째 CrewAI agent를 만들고 직접 경험해 보세요.
</Card>
<Card
title="커뮤니티 가입하기"

View File

@@ -0,0 +1,131 @@
---
title: "AgentOps 통합"
description: "AgentOps 관찰 가능성 플랫폼으로 CrewAI 에이전트를 모니터링하고 분석하세요"
---
# AgentOps 통합
AgentOps는 AI 에이전트를 위해 특별히 설계된 강력한 관찰 가능성 플랫폼입니다. CrewAI 크루를 위한 포괄적인 모니터링, 분석 및 디버깅 기능을 제공합니다.
## 기능
- **실시간 모니터링**: 에이전트 성능과 동작을 실시간으로 추적
- **세션 재생**: 상세한 실행 추적과 함께 완전한 에이전트 세션 검토
- **성능 분석**: 크루 효율성, 도구 사용량 및 작업 완료율 분석
- **오류 추적**: 에이전트 워크플로우의 문제 식별 및 디버그
- **비용 추적**: LLM 사용량 및 관련 비용 모니터링
- **팀 협업**: 인사이트 공유 및 에이전트 최적화 협업
## 설치
CrewAI와 함께 AgentOps 설치:
```bash
pip install crewai[agentops]
```
또는 AgentOps를 별도로 설치:
```bash
pip install agentops
```
## 설정
1. **API 키 받기**: [AgentOps](https://agentops.ai)에 가입하고 API 키를 받으세요
2. **환경 구성**: AgentOps API 키를 환경 변수로 설정:
```bash
export AGENTOPS_API_KEY="여기에-api-키-입력"
```
3. **AgentOps 초기화**: CrewAI 스크립트에 다음을 추가:
```python
import agentops
from crewai import Agent, Task, Crew
# AgentOps 초기화
agentops.init()
# 여기에 CrewAI 코드
agent = Agent(
role="데이터 분석가",
goal="데이터를 분석하고 인사이트 제공",
backstory="당신은 전문 데이터 분석가입니다...",
)
task = Task(
description="판매 데이터를 분석하고 인사이트를 제공하세요",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task],
)
# 크루 실행
result = crew.kickoff()
# AgentOps 세션 종료
agentops.end_session("Success")
```
## 자동 통합
CrewAI는 라이브러리가 설치되면 AgentOps와 자동으로 통합됩니다. 통합은 다음을 캡처합니다:
- **크루 킥오프 이벤트**: 크루 실행의 시작과 완료
- **도구 사용**: 모든 도구 호출과 결과
- **작업 평가**: 작업 성능 메트릭과 피드백
- **오류 이벤트**: 실행 중 발생하는 모든 오류
## 구성 옵션
AgentOps 통합을 사용자 정의할 수 있습니다:
```python
import agentops
# 사용자 정의 설정으로 AgentOps 구성
agentops.init(
api_key="당신의-api-키",
tags=["프로덕션", "데이터-분석"],
auto_start_session=True,
instrument_llm_calls=True,
)
```
## 데이터 보기
1. **대시보드**: AgentOps 대시보드를 방문하여 에이전트 세션 보기
2. **세션 세부사항**: 세션을 클릭하여 상세한 실행 추적 보기
3. **분석**: 분석 탭을 사용하여 성능 트렌드 식별
4. **오류**: 디버깅 정보를 위해 오류 탭 모니터링
## 모범 사례
- **세션 태그 지정**: 의미 있는 태그를 사용하여 에이전트 실행 정리
- **비용 모니터링**: LLM 사용량과 관련 비용 추적
- **오류 검토**: 정기적으로 오류 확인 및 해결
- **성능 최적화**: 분석을 사용하여 병목 현상과 최적화 기회 식별
## 문제 해결
### AgentOps가 데이터를 기록하지 않음
1. API 키가 올바르게 설정되었는지 확인
2. AgentOps가 제대로 초기화되었는지 확인
3. 스크립트 끝에서 `agentops.end_session()`을 호출하는지 확인
### 누락된 이벤트
일부 이벤트가 캡처되지 않는 경우:
1. CrewAI와 AgentOps의 최신 버전이 있는지 확인
2. AgentOps 리스너가 제대로 등록되었는지 확인
3. 오류 메시지에 대한 로그 검토
이 통합은 CrewAI 에이전트에 대한 포괄적인 관찰 가능성을 제공하여 AI 워크플로우를 모니터링, 디버그 및 최적화하는 데 도움이 됩니다.

View File

@@ -0,0 +1,131 @@
---
title: "Integração AgentOps"
description: "Monitore e analise seus agentes CrewAI com a plataforma de observabilidade AgentOps"
---
# Integração AgentOps
AgentOps é uma poderosa plataforma de observabilidade projetada especificamente para agentes de IA. Ela fornece capacidades abrangentes de monitoramento, análise e depuração para suas crews CrewAI.
## Recursos
- **Monitoramento em Tempo Real**: Acompanhe o desempenho e comportamento dos agentes em tempo real
- **Replay de Sessão**: Revise sessões completas de agentes com rastreamentos detalhados de execução
- **Análise de Desempenho**: Analise eficiência da crew, uso de ferramentas e taxas de conclusão de tarefas
- **Rastreamento de Erros**: Identifique e depure problemas em fluxos de trabalho de agentes
- **Rastreamento de Custos**: Monitore o uso de LLM e custos associados
- **Colaboração em Equipe**: Compartilhe insights e colabore na otimização de agentes
## Instalação
Instale o AgentOps junto com o CrewAI:
```bash
pip install crewai[agentops]
```
Ou instale o AgentOps separadamente:
```bash
pip install agentops
```
## Configuração
1. **Obtenha sua Chave API**: Cadastre-se no [AgentOps](https://agentops.ai) e obtenha sua chave API
2. **Configure seu ambiente**: Defina sua chave API do AgentOps como uma variável de ambiente:
```bash
export AGENTOPS_API_KEY="sua-chave-api-aqui"
```
3. **Inicialize o AgentOps**: Adicione isso ao seu script CrewAI:
```python
import agentops
from crewai import Agent, Task, Crew
# Inicializar AgentOps
agentops.init()
# Seu código CrewAI aqui
agent = Agent(
role="Analista de Dados",
goal="Analisar dados e fornecer insights",
backstory="Você é um analista de dados especialista...",
)
task = Task(
description="Analise os dados de vendas e forneça insights",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task],
)
# Execute sua crew
result = crew.kickoff()
# Finalize a sessão AgentOps
agentops.end_session("Success")
```
## Integração Automática
O CrewAI se integra automaticamente com o AgentOps quando a biblioteca está instalada. A integração captura:
- **Eventos de Kickoff da Crew**: Início e conclusão de execuções da crew
- **Uso de Ferramentas**: Todas as chamadas de ferramentas e seus resultados
- **Avaliações de Tarefas**: Métricas de desempenho de tarefas e feedback
- **Eventos de Erro**: Quaisquer erros que ocorram durante a execução
## Opções de Configuração
Você pode personalizar a integração do AgentOps:
```python
import agentops
# Configure AgentOps com configurações personalizadas
agentops.init(
api_key="sua-chave-api",
tags=["producao", "analise-dados"],
auto_start_session=True,
instrument_llm_calls=True,
)
```
## Visualizando Seus Dados
1. **Dashboard**: Visite o dashboard do AgentOps para ver suas sessões de agentes
2. **Detalhes da Sessão**: Clique em qualquer sessão para ver rastreamentos detalhados de execução
3. **Análises**: Use a aba de análises para identificar tendências de desempenho
4. **Erros**: Monitore a aba de erros para informações de depuração
## Melhores Práticas
- **Marque Suas Sessões**: Use tags significativas para organizar suas execuções de agentes
- **Monitore Custos**: Acompanhe o uso de LLM e custos associados
- **Revise Erros**: Verifique e resolva regularmente quaisquer erros
- **Otimize Desempenho**: Use análises para identificar gargalos e oportunidades de otimização
## Solução de Problemas
### AgentOps Não Está Gravando Dados
1. Verifique se sua chave API está definida corretamente
2. Verifique se o AgentOps está inicializado adequadamente
3. Certifique-se de estar chamando `agentops.end_session()` no final do seu script
### Eventos Ausentes
Se alguns eventos não estão sendo capturados:
1. Certifique-se de ter a versão mais recente do CrewAI e AgentOps
2. Verifique se o listener do AgentOps está registrado adequadamente
3. Revise os logs para quaisquer mensagens de erro
Esta integração fornece observabilidade abrangente para seus agentes CrewAI, ajudando você a monitorar, depurar e otimizar seus fluxos de trabalho de IA.

View File

@@ -68,6 +68,7 @@ docling = [
aisuite = [
"aisuite>=0.1.10",
]
agentops = ["agentops==0.3.18"]
[tool.uv]
dev-dependencies = [
@@ -98,6 +99,11 @@ exclude = ["cli/templates"]
[tool.bandit]
exclude_dirs = ["src/crewai/cli/templates"]
[tool.pytest.ini_options]
markers = [
"telemetry: mark test as a telemetry test (don't mock telemetry)",
]
# PyTorch index configuration, since torch 2.5.0 is not compatible with python 3.13
[[tool.uv.index]]
name = "pytorch-nightly"

View File

@@ -11,6 +11,8 @@ import chromadb.errors
from chromadb.api import ClientAPI
from chromadb.api.types import OneOrMany
from chromadb.config import Settings
from pydantic.warnings import PydanticDeprecatedSince211
import warnings
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
@@ -85,6 +87,15 @@ class KnowledgeStorage(BaseKnowledgeStorage):
raise Exception("Collection not initialized")
def initialize_knowledge_storage(self):
# Suppress deprecation warnings from chromadb, which are not relevant to us
# TODO: Remove this once we upgrade chromadb to at least 1.0.8.
warnings.filterwarnings(
"ignore",
category=PydanticDeprecatedSince211,
message=r".*'model_fields'.*is deprecated.*",
module=r"^chromadb(\.|$)",
)
self.app = create_persistent_client(
path=os.path.join(db_storage_path(), "knowledge"),
settings=Settings(allow_reset=True),

View File

@@ -12,6 +12,8 @@ from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities.chromadb import create_persistent_client
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path
import warnings
from pydantic.warnings import PydanticDeprecatedSince211
@contextlib.contextmanager
@@ -62,6 +64,15 @@ class RAGStorage(BaseRAGStorage):
def _initialize_app(self):
from chromadb.config import Settings
# Suppress deprecation warnings from chromadb, which are not relevant to us
# TODO: Remove this once we upgrade chromadb to at least 1.0.8.
warnings.filterwarnings(
"ignore",
category=PydanticDeprecatedSince211,
message=r".*'model_fields'.*is deprecated.*",
module=r"^chromadb(\.|$)",
)
self._set_embedder_config()
self.app = create_persistent_client(

View File

@@ -67,6 +67,7 @@ from .memory_events import (
# events
from .event_listener import EventListener
from .third_party.agentops_listener import agentops_listener
__all__ = [
"EventListener",
@@ -121,4 +122,5 @@ __all__ = [
"ToolSelectionErrorEvent",
"ToolUsageEvent",
"ToolValidateInputErrorEvent",
"agentops_listener",
]

View File

@@ -0,0 +1 @@
from .agentops_listener import agentops_listener as agentops_listener

View File

@@ -0,0 +1,137 @@
import logging
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
from crewai.utilities.events.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffStartedEvent,
)
from crewai.utilities.events.task_events import TaskEvaluationEvent
from crewai.utilities.events.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
logger = logging.getLogger(__name__)
class AgentOpsListener(BaseEventListener):
def __init__(self):
self.agentops = None
try:
import agentops
self.agentops = agentops
logger.info("AgentOps integration enabled")
except ImportError:
logger.debug("AgentOps not installed, skipping AgentOps integration")
super().__init__()
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus):
if self.agentops is None:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event):
self._handle_crew_kickoff_started(source, event)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event):
self._handle_crew_kickoff_completed(source, event)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event):
self._handle_tool_usage_started(source, event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event):
self._handle_tool_usage_error(source, event)
@crewai_event_bus.on(TaskEvaluationEvent)
def on_task_evaluation(source, event):
self._handle_task_evaluation(source, event)
def _handle_crew_kickoff_started(self, source, event: CrewKickoffStartedEvent):
if self.agentops is None:
return
try:
self.agentops.start_session(
tags=["crewai", "crew_kickoff"],
config=self.agentops.Configuration(
auto_start_session=False,
instrument_llm_calls=True,
),
)
logger.debug("AgentOps session started for crew kickoff")
except Exception as e:
logger.warning(f"Failed to start AgentOps session: {e}")
def _handle_crew_kickoff_completed(self, source, event: CrewKickoffCompletedEvent):
if self.agentops is None:
return
try:
self.agentops.end_session("Success")
logger.debug("AgentOps session ended for crew kickoff completion")
except Exception as e:
logger.warning(f"Failed to end AgentOps session: {e}")
def _handle_tool_usage_started(self, source, event: ToolUsageStartedEvent):
if self.agentops is None:
return
try:
self.agentops.record(
self.agentops.ActionEvent(
action_type="tool_usage",
params={
"tool_name": event.tool_name,
"tool_args": event.tool_args,
},
)
)
logger.debug(f"AgentOps recorded tool usage: {event.tool_name}")
except Exception as e:
logger.warning(f"Failed to record tool usage in AgentOps: {e}")
def _handle_tool_usage_error(self, source, event: ToolUsageErrorEvent):
if self.agentops is None:
return
try:
self.agentops.record(
self.agentops.ErrorEvent(
message=f"Tool usage error: {event.error}",
error_type="ToolUsageError",
details={
"tool_name": event.tool_name,
"tool_args": event.tool_args,
},
)
)
logger.debug(f"AgentOps recorded tool usage error: {event.tool_name}")
except Exception as e:
logger.warning(f"Failed to record tool usage error in AgentOps: {e}")
def _handle_task_evaluation(self, source, event: TaskEvaluationEvent):
if self.agentops is None:
return
try:
self.agentops.record(
self.agentops.ActionEvent(
action_type="task_evaluation",
params={
"evaluation_type": event.evaluation_type,
"task": str(event.task) if event.task else None,
},
)
)
logger.debug(f"AgentOps recorded task evaluation: {event.evaluation_type}")
except Exception as e:
logger.warning(f"Failed to record task evaluation in AgentOps: {e}")
agentops_listener = AgentOpsListener()

View File

@@ -1896,7 +1896,7 @@ def test_agent_with_knowledge_sources_generate_search_query():
assert "red" in result.raw.lower()
@pytest.mark.vcr(record_mode='none', filter_headers=["authorization"])
@pytest.mark.vcr(record_mode="none", filter_headers=["authorization"])
def test_agent_with_knowledge_with_no_crewai_knowledge():
mock_knowledge = MagicMock(spec=Knowledge)
@@ -1904,8 +1904,11 @@ def test_agent_with_knowledge_with_no_crewai_knowledge():
role="Information Agent",
goal="Provide information based on knowledge sources",
backstory="You have access to specific knowledge sources.",
llm=LLM(model="openrouter/openai/gpt-4o-mini",api_key=os.getenv('OPENROUTER_API_KEY')),
knowledge=mock_knowledge
llm=LLM(
model="openrouter/openai/gpt-4o-mini",
api_key=os.getenv("OPENROUTER_API_KEY"),
),
knowledge=mock_knowledge,
)
# Create a task that requires the agent to use the knowledge
@@ -1920,7 +1923,7 @@ def test_agent_with_knowledge_with_no_crewai_knowledge():
mock_knowledge.query.assert_called_once()
@pytest.mark.vcr(record_mode='none', filter_headers=["authorization"])
@pytest.mark.vcr(record_mode="none", filter_headers=["authorization"])
def test_agent_with_only_crewai_knowledge():
mock_knowledge = MagicMock(spec=Knowledge)
@@ -1928,33 +1931,10 @@ def test_agent_with_only_crewai_knowledge():
role="Information Agent",
goal="Provide information based on knowledge sources",
backstory="You have access to specific knowledge sources.",
llm=LLM(model="openrouter/openai/gpt-4o-mini",api_key=os.getenv('OPENROUTER_API_KEY'))
)
# Create a task that requires the agent to use the knowledge
task = Task(
description="What is Vidit's favorite color?",
expected_output="Vidit's favorclearite color.",
agent=agent
)
crew = Crew(agents=[agent], tasks=[task],knowledge=mock_knowledge)
crew.kickoff()
mock_knowledge.query.assert_called_once()
@pytest.mark.vcr(record_mode='none', filter_headers=["authorization"])
def test_agent_knowledege_with_crewai_knowledge():
crew_knowledge = MagicMock(spec=Knowledge)
agent_knowledge = MagicMock(spec=Knowledge)
agent = Agent(
role="Information Agent",
goal="Provide information based on knowledge sources",
backstory="You have access to specific knowledge sources.",
llm=LLM(model="openrouter/openai/gpt-4o-mini",api_key=os.getenv('OPENROUTER_API_KEY')),
knowledge=agent_knowledge
llm=LLM(
model="openrouter/openai/gpt-4o-mini",
api_key=os.getenv("OPENROUTER_API_KEY"),
),
)
# Create a task that requires the agent to use the knowledge
@@ -1964,7 +1944,35 @@ def test_agent_knowledege_with_crewai_knowledge():
agent=agent,
)
crew = Crew(agents=[agent],tasks=[task],knowledge=crew_knowledge)
crew = Crew(agents=[agent], tasks=[task], knowledge=mock_knowledge)
crew.kickoff()
mock_knowledge.query.assert_called_once()
@pytest.mark.vcr(record_mode="none", filter_headers=["authorization"])
def test_agent_knowledege_with_crewai_knowledge():
crew_knowledge = MagicMock(spec=Knowledge)
agent_knowledge = MagicMock(spec=Knowledge)
agent = Agent(
role="Information Agent",
goal="Provide information based on knowledge sources",
backstory="You have access to specific knowledge sources.",
llm=LLM(
model="openrouter/openai/gpt-4o-mini",
api_key=os.getenv("OPENROUTER_API_KEY"),
),
knowledge=agent_knowledge,
)
# Create a task that requires the agent to use the knowledge
task = Task(
description="What is Vidit's favorite color?",
expected_output="Vidit's favorclearite color.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], knowledge=crew_knowledge)
crew.kickoff()
agent_knowledge.query.assert_called_once()
crew_knowledge.query.assert_called_once()
@@ -2164,7 +2172,12 @@ def mock_get_auth_token():
@patch("crewai.cli.plus_api.PlusAPI.get_agent")
def test_agent_from_repository(mock_get_agent, mock_get_auth_token):
from crewai_tools import SerperDevTool, XMLSearchTool, CSVSearchTool, EnterpriseActionTool
from crewai_tools import (
SerperDevTool,
XMLSearchTool,
CSVSearchTool,
EnterpriseActionTool,
)
mock_get_response = MagicMock()
mock_get_response.status_code = 200
@@ -2173,12 +2186,23 @@ def test_agent_from_repository(mock_get_agent, mock_get_auth_token):
"goal": "test goal",
"backstory": "test backstory",
"tools": [
{"module": "crewai_tools", "name": "SerperDevTool", "init_params": {"n_results": 30}},
{"module": "crewai_tools", "name": "XMLSearchTool", "init_params": {"summarize": True}},
{
"module": "crewai_tools",
"name": "SerperDevTool",
"init_params": {"n_results": "30"},
},
{
"module": "crewai_tools",
"name": "XMLSearchTool",
"init_params": {"summarize": "true"},
},
{"module": "crewai_tools", "name": "CSVSearchTool", "init_params": {}},
# using a tools that returns a list of BaseTools
{"module": "crewai_tools", "name": "CrewaiEnterpriseTools", "init_params": {"actions_list": [], "enterprise_token": "test_key"}},
{
"module": "crewai_tools",
"name": "CrewaiEnterpriseTools",
"init_params": {"actions_list": [], "enterprise_token": "test_key"},
},
],
}
mock_get_agent.return_value = mock_get_response
@@ -2221,7 +2245,9 @@ def test_agent_from_repository_override_attributes(mock_get_agent, mock_get_auth
"role": "test role",
"goal": "test goal",
"backstory": "test backstory",
"tools": [{"name": "SerperDevTool", "module": "crewai_tools", "init_params": {}}],
"tools": [
{"name": "SerperDevTool", "module": "crewai_tools", "init_params": {}}
],
}
mock_get_agent.return_value = mock_get_response
agent = Agent(from_repository="test_agent", role="Custom Role")

View File

@@ -2,6 +2,7 @@
import os
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch
import pytest
from dotenv import load_dotenv
@@ -42,6 +43,122 @@ def setup_test_environment():
# Cleanup is handled automatically when tempfile context exits
def pytest_configure(config):
config.addinivalue_line(
"markers", "telemetry: mark test as a telemetry test (don't mock telemetry)"
)
@pytest.fixture(autouse=True)
def auto_mock_telemetry(request):
if request.node.get_closest_marker("telemetry"):
telemetry_env = {
key: value
for key, value in os.environ.items()
if key not in ["CREWAI_DISABLE_TELEMETRY", "OTEL_SDK_DISABLED"]
}
with patch.dict(os.environ, telemetry_env, clear=True):
yield
return
if "telemetry" in str(request.fspath):
telemetry_env = {
key: value
for key, value in os.environ.items()
if key not in ["CREWAI_DISABLE_TELEMETRY", "OTEL_SDK_DISABLED"]
}
with patch.dict(os.environ, telemetry_env, clear=True):
yield
return
with patch.dict(
os.environ, {"CREWAI_DISABLE_TELEMETRY": "true", "OTEL_SDK_DISABLED": "true"}
):
with patch("crewai.telemetry.Telemetry") as mock_telemetry_class:
mock_instance = create_mock_telemetry_instance()
mock_telemetry_class.return_value = mock_instance
with (
patch(
"crewai.utilities.events.event_listener.Telemetry",
mock_telemetry_class,
),
patch("crewai.tools.tool_usage.Telemetry", mock_telemetry_class),
patch("crewai.cli.command.Telemetry", mock_telemetry_class),
patch("crewai.cli.create_flow.Telemetry", mock_telemetry_class),
):
yield mock_instance
def create_mock_telemetry_instance():
mock_instance = Mock()
mock_instance.ready = False
mock_instance.trace_set = False
mock_instance._initialized = True
mock_instance._is_telemetry_disabled.return_value = True
mock_instance._should_execute_telemetry.return_value = False
telemetry_methods = [
"set_tracer",
"crew_creation",
"task_started",
"task_ended",
"tool_usage",
"tool_repeated_usage",
"tool_usage_error",
"crew_execution_span",
"end_crew",
"flow_creation_span",
"flow_execution_span",
"individual_test_result_span",
"test_execution_span",
"deploy_signup_error_span",
"start_deployment_span",
"create_crew_deployment_span",
"get_crew_logs_span",
"remove_crew_span",
"flow_plotting_span",
"_add_attribute",
"_safe_telemetry_operation",
]
for method in telemetry_methods:
setattr(mock_instance, method, Mock(return_value=None))
mock_instance.task_started.return_value = None
return mock_instance
@pytest.fixture
def mock_opentelemetry_components():
with (
patch("opentelemetry.trace.get_tracer") as mock_get_tracer,
patch("opentelemetry.trace.set_tracer_provider") as mock_set_provider,
patch("opentelemetry.baggage.set_baggage") as mock_set_baggage,
patch("opentelemetry.baggage.get_baggage") as mock_get_baggage,
patch("opentelemetry.context.attach") as mock_attach,
patch("opentelemetry.context.detach") as mock_detach,
):
mock_tracer = Mock()
mock_span = Mock()
mock_tracer.start_span.return_value = mock_span
mock_get_tracer.return_value = mock_tracer
yield {
"get_tracer": mock_get_tracer,
"set_tracer_provider": mock_set_provider,
"tracer": mock_tracer,
"span": mock_span,
"set_baggage": mock_set_baggage,
"get_baggage": mock_get_baggage,
"attach": mock_attach,
"detach": mock_detach,
}
@pytest.fixture(scope="module")
def vcr_config(request) -> dict:
return {

View File

@@ -39,6 +39,7 @@ def test_short_term_memory_search_events(short_term_memory):
events = defaultdict(list)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemoryQueryStartedEvent)
def on_search_started(source, event):
events["MemoryQueryStartedEvent"].append(event)
@@ -59,33 +60,34 @@ def test_short_term_memory_search_events(short_term_memory):
assert len(events["MemoryQueryFailedEvent"]) == 0
assert dict(events["MemoryQueryStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_started',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'query': 'test value',
'limit': 3,
'score_threshold': 0.35
"timestamp": ANY,
"type": "memory_query_started",
"source_fingerprint": None,
"source_type": "short_term_memory",
"fingerprint_metadata": None,
"query": "test value",
"limit": 3,
"score_threshold": 0.35,
}
assert dict(events["MemoryQueryCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_query_completed',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'query': 'test value',
'results': [],
'limit': 3,
'score_threshold': 0.35,
'query_time_ms': ANY
"timestamp": ANY,
"type": "memory_query_completed",
"source_fingerprint": None,
"source_type": "short_term_memory",
"fingerprint_metadata": None,
"query": "test value",
"results": [],
"limit": 3,
"score_threshold": 0.35,
"query_time_ms": ANY,
}
def test_short_term_memory_save_events(short_term_memory):
events = defaultdict(list)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_save_started(source, event):
events["MemorySaveStartedEvent"].append(event)
@@ -105,28 +107,29 @@ def test_short_term_memory_save_events(short_term_memory):
assert len(events["MemorySaveFailedEvent"]) == 0
assert dict(events["MemorySaveStartedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_save_started',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'value': 'test value',
'metadata': {'task': 'test_task'},
'agent_role': "test_agent"
"timestamp": ANY,
"type": "memory_save_started",
"source_fingerprint": None,
"source_type": "short_term_memory",
"fingerprint_metadata": None,
"value": "test value",
"metadata": {"task": "test_task"},
"agent_role": "test_agent",
}
assert dict(events["MemorySaveCompletedEvent"][0]) == {
'timestamp': ANY,
'type': 'memory_save_completed',
'source_fingerprint': None,
'source_type': 'short_term_memory',
'fingerprint_metadata': None,
'value': 'test value',
'metadata': {'task': 'test_task', 'agent': 'test_agent'},
'agent_role': "test_agent",
'save_time_ms': ANY
"timestamp": ANY,
"type": "memory_save_completed",
"source_fingerprint": None,
"source_type": "short_term_memory",
"fingerprint_metadata": None,
"value": "test value",
"metadata": {"task": "test_task", "agent": "test_agent"},
"agent_role": "test_agent",
"save_time_ms": ANY,
}
def test_save_and_search(short_term_memory):
memory = ShortTermMemoryItem(
data="""test value test value test value test value test value test value

View File

@@ -1,4 +1,5 @@
import os
import threading
from unittest.mock import patch
import pytest
@@ -11,12 +12,16 @@ from opentelemetry import trace
@pytest.fixture(autouse=True)
def cleanup_telemetry():
"""Automatically clean up Telemetry singleton between tests."""
Telemetry._instance = None
if hasattr(Telemetry, "_lock"):
Telemetry._lock = threading.Lock()
yield
Telemetry._instance = None
if hasattr(Telemetry, "_lock"):
Telemetry._lock = threading.Lock()
@pytest.mark.telemetry
@pytest.mark.parametrize(
"env_var,value,expected_ready",
[
@@ -36,6 +41,7 @@ def test_telemetry_environment_variables(env_var, value, expected_ready):
assert telemetry.ready is expected_ready
@pytest.mark.telemetry
def test_telemetry_enabled_by_default():
"""Test that telemetry is enabled by default."""
with patch.dict(os.environ, {}, clear=True):
@@ -44,6 +50,7 @@ def test_telemetry_enabled_by_default():
assert telemetry.ready is True
@pytest.mark.telemetry
@patch("crewai.telemetry.telemetry.logger.error")
@patch(
"opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export",
@@ -76,6 +83,7 @@ def test_telemetry_fails_due_connect_timeout(export_mock, logger_mock):
logger_mock.assert_called_once_with(error)
@pytest.mark.telemetry
def test_telemetry_singleton_pattern():
"""Test that Telemetry uses the singleton pattern correctly."""
Telemetry._instance = None

View File

@@ -14,14 +14,18 @@ def cleanup_telemetry():
Telemetry._instance = None
@pytest.mark.parametrize("env_var,value,expected_ready", [
("OTEL_SDK_DISABLED", "true", False),
("OTEL_SDK_DISABLED", "TRUE", False),
("CREWAI_DISABLE_TELEMETRY", "true", False),
("CREWAI_DISABLE_TELEMETRY", "TRUE", False),
("OTEL_SDK_DISABLED", "false", True),
("CREWAI_DISABLE_TELEMETRY", "false", True),
])
@pytest.mark.telemetry
@pytest.mark.parametrize(
"env_var,value,expected_ready",
[
("OTEL_SDK_DISABLED", "true", False),
("OTEL_SDK_DISABLED", "TRUE", False),
("CREWAI_DISABLE_TELEMETRY", "true", False),
("CREWAI_DISABLE_TELEMETRY", "TRUE", False),
("OTEL_SDK_DISABLED", "false", True),
("CREWAI_DISABLE_TELEMETRY", "false", True),
],
)
def test_telemetry_environment_variables(env_var, value, expected_ready):
"""Test telemetry state with different environment variable configurations."""
with patch.dict(os.environ, {env_var: value}):
@@ -30,6 +34,7 @@ def test_telemetry_environment_variables(env_var, value, expected_ready):
assert telemetry.ready is expected_ready
@pytest.mark.telemetry
def test_telemetry_enabled_by_default():
"""Test that telemetry is enabled by default."""
with patch.dict(os.environ, {}, clear=True):
@@ -38,57 +43,60 @@ def test_telemetry_enabled_by_default():
assert telemetry.ready is True
@pytest.mark.telemetry
def test_telemetry_disable_after_singleton_creation():
"""Test that telemetry operations are disabled when env var is set after singleton creation."""
with patch.dict(os.environ, {}, clear=True):
with patch("crewai.telemetry.telemetry.TracerProvider"):
telemetry = Telemetry()
assert telemetry.ready is True
mock_operation = MagicMock()
telemetry._safe_telemetry_operation(mock_operation)
mock_operation.assert_called_once()
mock_operation.reset_mock()
os.environ['CREWAI_DISABLE_TELEMETRY'] = 'true'
os.environ["CREWAI_DISABLE_TELEMETRY"] = "true"
telemetry._safe_telemetry_operation(mock_operation)
mock_operation.assert_not_called()
@pytest.mark.telemetry
def test_telemetry_disable_with_multiple_instances():
"""Test that multiple telemetry instances respect dynamically changed env vars."""
with patch.dict(os.environ, {}, clear=True):
with patch("crewai.telemetry.telemetry.TracerProvider"):
telemetry1 = Telemetry()
assert telemetry1.ready is True
os.environ['CREWAI_DISABLE_TELEMETRY'] = 'true'
os.environ["CREWAI_DISABLE_TELEMETRY"] = "true"
telemetry2 = Telemetry()
assert telemetry2 is telemetry1
assert telemetry2.ready is True
mock_operation = MagicMock()
telemetry2._safe_telemetry_operation(mock_operation)
mock_operation.assert_not_called()
@pytest.mark.telemetry
def test_telemetry_otel_sdk_disabled_after_creation():
"""Test that OTEL_SDK_DISABLED also works when set after singleton creation."""
with patch.dict(os.environ, {}, clear=True):
with patch("crewai.telemetry.telemetry.TracerProvider"):
telemetry = Telemetry()
assert telemetry.ready is True
mock_operation = MagicMock()
telemetry._safe_telemetry_operation(mock_operation)
mock_operation.assert_called_once()
mock_operation.reset_mock()
os.environ['OTEL_SDK_DISABLED'] = 'true'
os.environ["OTEL_SDK_DISABLED"] = "true"
telemetry._safe_telemetry_operation(mock_operation)
mock_operation.assert_not_called()

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,196 @@
from unittest.mock import Mock, patch
from crewai.utilities.events.third_party.agentops_listener import AgentOpsListener
from crewai.utilities.events.crew_events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
)
from crewai.utilities.events.task_events import TaskEvaluationEvent
from crewai.utilities.events.tool_usage_events import (
ToolUsageStartedEvent,
ToolUsageErrorEvent,
)
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
class TestAgentOpsListener:
def test_agentops_listener_initialization_with_agentops_installed(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops"):
listener = AgentOpsListener()
assert listener.agentops is not None
def test_agentops_listener_initialization_without_agentops_installed(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops", side_effect=ImportError):
listener = AgentOpsListener()
assert listener.agentops is None
def test_setup_listeners_with_agentops_installed(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops"):
listener = AgentOpsListener()
mock_event_bus = Mock(spec=CrewAIEventsBus)
listener.setup_listeners(mock_event_bus)
assert mock_event_bus.register_handler.call_count == 5
mock_event_bus.register_handler.assert_any_call(
CrewKickoffStartedEvent, listener._handle_crew_kickoff_started
)
mock_event_bus.register_handler.assert_any_call(
CrewKickoffCompletedEvent, listener._handle_crew_kickoff_completed
)
mock_event_bus.register_handler.assert_any_call(
ToolUsageStartedEvent, listener._handle_tool_usage_started
)
mock_event_bus.register_handler.assert_any_call(
ToolUsageErrorEvent, listener._handle_tool_usage_error
)
mock_event_bus.register_handler.assert_any_call(
TaskEvaluationEvent, listener._handle_task_evaluation
)
def test_setup_listeners_without_agentops_installed(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops", side_effect=ImportError):
listener = AgentOpsListener()
mock_event_bus = Mock(spec=CrewAIEventsBus)
listener.setup_listeners(mock_event_bus)
mock_event_bus.register_handler.assert_not_called()
def test_handle_crew_kickoff_started_with_agentops(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops") as mock_agentops:
listener = AgentOpsListener()
event = CrewKickoffStartedEvent(crew_id="test-crew")
listener._handle_crew_kickoff_started(event)
mock_agentops.start_session.assert_called_once()
call_args = mock_agentops.start_session.call_args
assert call_args[1]["tags"] == ["crewai", "crew_kickoff"]
def test_handle_crew_kickoff_started_without_agentops(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops", side_effect=ImportError):
listener = AgentOpsListener()
event = CrewKickoffStartedEvent(crew_id="test-crew")
listener._handle_crew_kickoff_started(event)
def test_handle_crew_kickoff_completed_with_agentops(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops") as mock_agentops:
listener = AgentOpsListener()
event = CrewKickoffCompletedEvent(crew_id="test-crew", crew_output=Mock())
listener._handle_crew_kickoff_completed(event)
mock_agentops.end_session.assert_called_once_with("Success")
def test_handle_crew_kickoff_completed_without_agentops(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops", side_effect=ImportError):
listener = AgentOpsListener()
event = CrewKickoffCompletedEvent(crew_id="test-crew", crew_output=Mock())
listener._handle_crew_kickoff_completed(event)
def test_handle_tool_usage_started_with_agentops(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops") as mock_agentops:
listener = AgentOpsListener()
event = ToolUsageStartedEvent(
tool_name="test_tool",
arguments={"arg1": "value1"},
agent_id="test-agent",
task_id="test-task"
)
listener._handle_tool_usage_started(event)
mock_agentops.record.assert_called_once()
call_args = mock_agentops.record.call_args[0][0]
assert hasattr(call_args, "action_type")
def test_handle_tool_usage_error_with_agentops(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops") as mock_agentops:
listener = AgentOpsListener()
event = ToolUsageErrorEvent(
tool_name="test_tool",
arguments={"arg1": "value1"},
error="Test error",
agent_id="test-agent",
task_id="test-task"
)
listener._handle_tool_usage_error(event)
mock_agentops.record.assert_called_once()
def test_handle_task_evaluation_with_agentops(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops") as mock_agentops:
listener = AgentOpsListener()
event = TaskEvaluationEvent(
task_id="test-task",
score=0.85,
feedback="Good performance"
)
listener._handle_task_evaluation(event)
mock_agentops.record.assert_called_once()
def test_handle_crew_kickoff_started_with_exception(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops") as mock_agentops:
mock_agentops.start_session.side_effect = Exception("Test exception")
listener = AgentOpsListener()
event = CrewKickoffStartedEvent(crew_id="test-crew")
listener._handle_crew_kickoff_started(event)
def test_handle_crew_kickoff_completed_with_exception(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops") as mock_agentops:
mock_agentops.end_session.side_effect = Exception("Test exception")
listener = AgentOpsListener()
event = CrewKickoffCompletedEvent(crew_id="test-crew", crew_output=Mock())
listener._handle_crew_kickoff_completed(event)
def test_handle_tool_usage_started_with_exception(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops") as mock_agentops:
mock_agentops.record.side_effect = Exception("Test exception")
listener = AgentOpsListener()
event = ToolUsageStartedEvent(
tool_name="test_tool",
arguments={"arg1": "value1"},
agent_id="test-agent",
task_id="test-task"
)
listener._handle_tool_usage_started(event)
def test_handle_tool_usage_error_with_exception(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops") as mock_agentops:
mock_agentops.record.side_effect = Exception("Test exception")
listener = AgentOpsListener()
event = ToolUsageErrorEvent(
tool_name="test_tool",
arguments={"arg1": "value1"},
error="Test error",
agent_id="test-agent",
task_id="test-task"
)
listener._handle_tool_usage_error(event)
def test_handle_task_evaluation_with_exception(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops") as mock_agentops:
mock_agentops.record.side_effect = Exception("Test exception")
listener = AgentOpsListener()
event = TaskEvaluationEvent(
task_id="test-task",
score=0.85,
feedback="Good performance"
)
listener._handle_task_evaluation(event)
def test_agentops_listener_instance_creation(self):
with patch("crewai.utilities.events.third_party.agentops_listener.agentops"):
from crewai.utilities.events.third_party.agentops_listener import agentops_listener
assert agentops_listener is not None
assert isinstance(agentops_listener, AgentOpsListener)