Compare commits

..

11 Commits

Author SHA1 Message Date
Tiago Freire
81a8c2efc1 fix: prevent zeroed data in local trace message and ensure cleanup on all paths
- Read execution start time non-destructively before _finalize_backend_batch
    consumes it, so the server receives the real duration and the local
    fallback message also shows the correct value
  - Pass pre-captured events_count, duration_ms, and batch_id to
    _show_local_trace_message instead of reading from batch_manager
    (buffer cleared by send, duration consumed by finalize)
  - Extract _reset_batch_state to reset all singleton state (current_batch,
    event_buffer, trace_batch_id, is_current_batch_ephemeral,
    backend_initialized, batch_owner_type/id) and call it in every exit
    path: success, init failure, send failure, and exception handler
2026-03-18 22:50:48 -03:00
Tiago Freire
4859dc66a5 fix: address PR review findings — forward skip_context_check in
recursive fallback, reduce retry backoff, clean up batch state

  - Forward skip_context_check parameter in the 401/403 ephemeral
    fallback recursive call to prevent silent early return when
    is_tracing_enabled_in_context() is False
  - Reduce retry backoff from 1s to 200ms to minimize lock hold time
    on the non-first-time path (worst case 400ms vs 2s)
  - Add batch state cleanup after _finalize_backend_batch in the
    first-time handler, mirroring finalize_batch: reset current_batch,
    event_buffer, trace_batch_id, is_current_batch_ephemeral,
    batch_owner_type, batch_owner_id, and call _cleanup_batch_data()
2026-03-18 21:29:35 -03:00
Tiago Freire
7f2eda2ac6 refactor: remove redundant tests in TestTraceBatchIdClearedOnFailure
Remove test_trace_batch_id_cleared_on_none_response (covered by
  TestInitializeBackendBatchRetry::test_exhausts_retries_then_clears_batch_id)
  and test_trace_batch_id_cleared_on_non_2xx_response (covered by
  TestInitializeBackendBatchRetry::test_no_retry_on_4xx).
2026-03-18 20:46:20 -03:00
Tiago Freire
6734db4f71 fix: mark batch as failed when event send fails in first-time handler
The return value of _send_events_to_backend() was discarded in
  _initialize_backend_and_send_events, so _finalize_backend_batch was
  called unconditionally with the full event count even when the send
  returned 500. This finalized the batch as "completed" on the server
  while it received 0 events, producing an empty trace URL.

  Now check the return status and call mark_trace_batch_as_failed on
  500, matching the behavior of the regular finalize_batch path.
2026-03-18 20:35:57 -03:00
Tiago Freire
c00544c97d fix: always use ephemeral tracing for first-time users
The first-time handler UX is built around ephemeral traces (access
  code, 24hr expiry link, browser open). Checking auth and creating
  non-ephemeral batches caused the handler to fall through to the
  local traces fallback since ephemeral_trace_url is only set for
  ephemeral batches. The server's LinkEphemeralTracesJob links
  ephemeral traces to user accounts on signup regardless.

  Remove auth check from first-time handler and always pass
  use_ephemeral=True to _initialize_backend_batch.
2026-03-18 19:54:10 -03:00
Tiago Freire
08c533905e fix: bypass is_tracing_enabled_in_context for first-time deferred batch init
First-time users have is_tracing_enabled_in_context() = False by design
  (it's a prerequisite for should_auto_collect_first_time_traces). This
  caused _initialize_backend_batch to return early without creating the
  batch, and _send_events_to_backend to send to a non-existent batch.

  Add skip_context_check parameter to _initialize_backend_batch so the
  first-time handler can bypass the guard during deferred init. Gate
  backend_initialized on trace_batch_id being set. Call
  _finalize_backend_batch directly instead of finalize_batch (which has
  the same context guard). Sync is_current_batch_ephemeral on success
  to prevent endpoint mismatch between batch creation and event send.
2026-03-18 19:54:10 -03:00
Tiago Freire
6d1546c381 feat: fall back to ephemeral tracing on server auth rejection
When the non-ephemeral batch endpoint returns 401 or 403 (expired
  token, revoked credentials, JWKS rotation), _initialize_backend_batch
  now switches is_current_batch_ephemeral to True and retries via the
  ephemeral endpoint. This preserves traces that would otherwise be
  lost due to the timing gap between client-side token validation and
  server-side JWT decode.

  The fallback only triggers on the non-ephemeral path to prevent
  infinite recursion. If the ephemeral attempt also fails, trace_batch_id
  is cleared normally.

  Addresses the 2M+ failed push attempts in which valid client-side
  tokens were rejected on the server.
2026-03-18 19:54:10 -03:00
Tiago Freire
bcba620a41 fix: respect authentication status for first-time users
Previously, _initialize_batch forced use_ephemeral=True for all
  first-time users, bypassing _check_authenticated() entirely. This
  meant logged-in users in a new project directory were routed to the
  ephemeral endpoint instead of their account's tracing endpoint.

  Now _check_authenticated() runs for all users including first-time.
  Authenticated first-time users get non-ephemeral tracing (traces
  linked to their account); only unauthenticated first-time users
  fall back to ephemeral. The deferred backend init in
  FirstTimeTraceHandler also reads is_current_batch_ephemeral instead
  of hardcoding use_ephemeral=True.
2026-03-18 19:54:10 -03:00
Tiago Freire
4141233a78 feat: add retry logic for ephemeral trace batch creation
Transient failures (None response, 5xx, network errors) during
  _initialize_backend_batch now retry up to 2 times with a 1s backoff.
  Non-transient 4xx errors (422 validation, 401 auth) are not retried
  since the same payload would fail again. If all retries are exhausted,
  trace_batch_id is cleared per the existing safety net.

  This runs post-execution when the user has already answered "y" to
  view traces, so the ~2s worst-case delay is acceptable.
2026-03-18 19:54:10 -03:00
Tiago Freire
c67425d323 fix: gate backend_initialized on actual batch creation success
In first_time_trace_handler._initialize_backend_and_send_events,
  backend_initialized was set to True unconditionally after calling
  _initialize_backend_batch, regardless of whether the server-side
  batch was actually created. This caused _send_events_to_backend and
  finalize_batch to run against a non-existent batch.

  Now check trace_batch_id after _initialize_backend_batch returns;
  if None (batch creation failed), call _gracefully_fail and return
  early, skipping event send and finalization.
2026-03-18 19:54:10 -03:00
Tiago Freire
7f090c664e fix: clear trace_batch_id on backend batch initialization failure
When _initialize_backend_batch fails (None response, non-2xx status,
  or exception), trace_batch_id was left populated with a client-generated
  UUID that was never registered server-side. Subsequent calls to
  _send_events_to_backend would see the stale ID and POST events to
  /ephemeral/batches/{id}/events, resulting in a 404 from the server.

  Nullify trace_batch_id on all three failure paths so downstream methods
  skip event sending instead of hitting a non-existent batch.
2026-03-18 19:54:10 -03:00
27 changed files with 760 additions and 1330 deletions

View File

@@ -115,13 +115,6 @@
"en/guides/flows/mastering-flow-state"
]
},
{
"group": "Tools",
"icon": "wrench",
"pages": [
"en/guides/tools/publish-custom-tools"
]
},
{
"group": "Coding Tools",
"icon": "terminal",
@@ -582,13 +575,6 @@
"en/guides/flows/mastering-flow-state"
]
},
{
"group": "Tools",
"icon": "wrench",
"pages": [
"en/guides/tools/publish-custom-tools"
]
},
{
"group": "Coding Tools",
"icon": "terminal",
@@ -1049,13 +1035,6 @@
"en/guides/flows/mastering-flow-state"
]
},
{
"group": "Tools",
"icon": "wrench",
"pages": [
"en/guides/tools/publish-custom-tools"
]
},
{
"group": "Coding Tools",
"icon": "terminal",
@@ -1546,20 +1525,6 @@
"pt-BR/guides/flows/mastering-flow-state"
]
},
{
"group": "Ferramentas",
"icon": "wrench",
"pages": [
"pt-BR/guides/tools/publish-custom-tools"
]
},
{
"group": "Ferramentas de Codificação",
"icon": "terminal",
"pages": [
"pt-BR/guides/coding-tools/agents-md"
]
},
{
"group": "Avançado",
"icon": "gear",
@@ -1999,20 +1964,6 @@
"pt-BR/guides/flows/mastering-flow-state"
]
},
{
"group": "Ferramentas",
"icon": "wrench",
"pages": [
"pt-BR/guides/tools/publish-custom-tools"
]
},
{
"group": "Ferramentas de Codificação",
"icon": "terminal",
"pages": [
"pt-BR/guides/coding-tools/agents-md"
]
},
{
"group": "Avançado",
"icon": "gear",
@@ -2452,20 +2403,6 @@
"pt-BR/guides/flows/mastering-flow-state"
]
},
{
"group": "Ferramentas",
"icon": "wrench",
"pages": [
"pt-BR/guides/tools/publish-custom-tools"
]
},
{
"group": "Ferramentas de Codificação",
"icon": "terminal",
"pages": [
"pt-BR/guides/coding-tools/agents-md"
]
},
{
"group": "Avançado",
"icon": "gear",
@@ -2935,20 +2872,6 @@
"ko/guides/flows/mastering-flow-state"
]
},
{
"group": "도구",
"icon": "wrench",
"pages": [
"ko/guides/tools/publish-custom-tools"
]
},
{
"group": "코딩 도구",
"icon": "terminal",
"pages": [
"ko/guides/coding-tools/agents-md"
]
},
{
"group": "고급",
"icon": "gear",
@@ -3400,20 +3323,6 @@
"ko/guides/flows/mastering-flow-state"
]
},
{
"group": "도구",
"icon": "wrench",
"pages": [
"ko/guides/tools/publish-custom-tools"
]
},
{
"group": "코딩 도구",
"icon": "terminal",
"pages": [
"ko/guides/coding-tools/agents-md"
]
},
{
"group": "고급",
"icon": "gear",
@@ -3865,20 +3774,6 @@
"ko/guides/flows/mastering-flow-state"
]
},
{
"group": "도구",
"icon": "wrench",
"pages": [
"ko/guides/tools/publish-custom-tools"
]
},
{
"group": "코딩 도구",
"icon": "terminal",
"pages": [
"ko/guides/coding-tools/agents-md"
]
},
{
"group": "고급",
"icon": "gear",

View File

@@ -1,244 +0,0 @@
---
title: Publish Custom Tools
description: How to build, package, and publish your own CrewAI-compatible tools to PyPI so any CrewAI user can install and use them.
icon: box-open
mode: "wide"
---
## Overview
CrewAI's tool system is designed to be extended. If you've built a tool that could benefit others, you can package it as a standalone Python library, publish it to PyPI, and make it available to any CrewAI user — no PR to the CrewAI repo required.
This guide walks through the full process: implementing the tools contract, structuring your package, and publishing to PyPI.
<Note type="info" title="Not looking to publish?">
If you just need a custom tool for your own project, see the [Create Custom Tools](/en/learn/create-custom-tools) guide instead.
</Note>
## The Tools Contract
Every CrewAI tool must satisfy one of two interfaces:
### Option 1: Subclass `BaseTool`
Subclass `crewai.tools.BaseTool` and implement the `_run` method. Define `name`, `description`, and optionally an `args_schema` for input validation.
```python
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class GeolocateInput(BaseModel):
"""Input schema for GeolocateTool."""
address: str = Field(..., description="The street address to geolocate.")
class GeolocateTool(BaseTool):
name: str = "Geolocate"
description: str = "Converts a street address into latitude/longitude coordinates."
args_schema: type[BaseModel] = GeolocateInput
def _run(self, address: str) -> str:
# Your implementation here
return f"40.7128, -74.0060"
```
### Option 2: Use the `@tool` Decorator
For simpler tools, the `@tool` decorator turns a function into a CrewAI tool. The function **must** have a docstring (used as the tool description) and type annotations.
```python
from crewai.tools import tool
@tool("Geolocate")
def geolocate(address: str) -> str:
"""Converts a street address into latitude/longitude coordinates."""
return "40.7128, -74.0060"
```
### Key Requirements
Regardless of which approach you use, your tool must:
- Have a **`name`** — a short, descriptive identifier.
- Have a **`description`** — tells the agent when and how to use the tool. This directly affects how well agents use your tool, so be clear and specific.
- Implement **`_run`** (BaseTool) or provide a **function body** (@tool) — the synchronous execution logic.
- Use **type annotations** on all parameters and return values.
- Return a **string** result (or something that can be meaningfully converted to one).
### Optional: Async Support
If your tool performs I/O-bound work, implement `_arun` for async execution:
```python
class GeolocateTool(BaseTool):
name: str = "Geolocate"
description: str = "Converts a street address into latitude/longitude coordinates."
def _run(self, address: str) -> str:
# Sync implementation
...
async def _arun(self, address: str) -> str:
# Async implementation
...
```
### Optional: Input Validation with `args_schema`
Define a Pydantic model as your `args_schema` to get automatic input validation and clear error messages. If you don't provide one, CrewAI will infer it from your `_run` method's signature.
```python
from pydantic import BaseModel, Field
class TranslateInput(BaseModel):
"""Input schema for TranslateTool."""
text: str = Field(..., description="The text to translate.")
target_language: str = Field(
default="en",
description="ISO 639-1 language code for the target language.",
)
```
Explicit schemas are recommended for published tools — they produce better agent behavior and clearer documentation for your users.
### Optional: Environment Variables
If your tool requires API keys or other configuration, declare them with `env_vars` so users know what to set:
```python
from crewai.tools import BaseTool, EnvVar
class GeolocateTool(BaseTool):
name: str = "Geolocate"
description: str = "Converts a street address into latitude/longitude coordinates."
env_vars: list[EnvVar] = [
EnvVar(
name="GEOCODING_API_KEY",
description="API key for the geocoding service.",
required=True,
),
]
def _run(self, address: str) -> str:
...
```
## Package Structure
Structure your project as a standard Python package. Here's a recommended layout:
```
crewai-geolocate/
├── pyproject.toml
├── LICENSE
├── README.md
└── src/
└── crewai_geolocate/
├── __init__.py
└── tools.py
```
### `pyproject.toml`
```toml
[project]
name = "crewai-geolocate"
version = "0.1.0"
description = "A CrewAI tool for geolocating street addresses."
requires-python = ">=3.10"
dependencies = [
"crewai",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
```
Declare `crewai` as a dependency so users get a compatible version automatically.
### `__init__.py`
Re-export your tool classes so users can import them directly:
```python
from crewai_geolocate.tools import GeolocateTool
__all__ = ["GeolocateTool"]
```
### Naming Conventions
- **Package name**: Use the prefix `crewai-` (e.g., `crewai-geolocate`). This makes your tool discoverable when users search PyPI.
- **Module name**: Use underscores (e.g., `crewai_geolocate`).
- **Tool class name**: Use PascalCase ending in `Tool` (e.g., `GeolocateTool`).
## Testing Your Tool
Before publishing, verify your tool works within a crew:
```python
from crewai import Agent, Crew, Task
from crewai_geolocate import GeolocateTool
agent = Agent(
role="Location Analyst",
goal="Find coordinates for given addresses.",
backstory="An expert in geospatial data.",
tools=[GeolocateTool()],
)
task = Task(
description="Find the coordinates of 1600 Pennsylvania Avenue, Washington, DC.",
expected_output="The latitude and longitude of the address.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
print(result)
```
## Publishing to PyPI
Once your tool is tested and ready:
```bash
# Build the package
uv build
# Publish to PyPI
uv publish
```
If this is your first time publishing, you'll need a [PyPI account](https://pypi.org/account/register/) and an [API token](https://pypi.org/help/#apitoken).
### After Publishing
Users can install your tool with:
```bash
pip install crewai-geolocate
```
Or with uv:
```bash
uv add crewai-geolocate
```
Then use it in their crews:
```python
from crewai_geolocate import GeolocateTool
agent = Agent(
role="Location Analyst",
tools=[GeolocateTool()],
# ...
)
```

View File

@@ -11,10 +11,6 @@ This guide provides detailed instructions on creating custom tools for the CrewA
incorporating the latest functionalities such as tool delegation, error handling, and dynamic tool calling. It also highlights the importance of collaboration tools,
enabling agents to perform a wide range of actions.
<Tip>
**Want to publish your tool for the community?** If you're building a tool that others could benefit from, check out the [Publish Custom Tools](/en/guides/tools/publish-custom-tools) guide to learn how to package and distribute your tool on PyPI.
</Tip>
### Subclassing `BaseTool`
To create a personalized tool, inherit from `BaseTool` and define the necessary attributes, including the `args_schema` for input validation, and the `_run` method.

View File

@@ -1,61 +0,0 @@
---
title: 코딩 도구
description: AGENTS.md를 사용하여 CrewAI 프로젝트 전반에서 코딩 에이전트와 IDE를 안내합니다.
icon: terminal
mode: "wide"
---
## AGENTS.md를 사용하는 이유
`AGENTS.md`는 가벼운 저장소 로컬 지침 파일로, 코딩 에이전트에게 일관되고 프로젝트별 안내를 제공합니다. 프로젝트 루트에 배치하고 어시스턴트가 작업하는 방식(컨벤션, 명령어, 아키텍처 노트, 가드레일)에 대한 신뢰할 수 있는 소스로 활용하세요.
## CLI로 프로젝트 생성
CrewAI CLI를 사용하여 프로젝트를 스캐폴딩하면, `AGENTS.md`가 루트에 자동으로 추가됩니다.
```bash
# Crew
crewai create crew my_crew
# Flow
crewai create flow my_flow
# Tool repository
crewai tool create my_tool
```
## 도구 설정: 어시스턴트에 AGENTS.md 연결
### Codex
Codex는 저장소에 배치된 `AGENTS.md` 파일로 안내할 수 있습니다. 컨벤션, 명령어, 워크플로우 기대치 등 지속적인 프로젝트 컨텍스트를 제공하는 데 사용하세요.
### Claude Code
Claude Code는 프로젝트 메모리를 `CLAUDE.md`에 저장합니다. `/init`으로 부트스트랩하고 `/memory`로 편집할 수 있습니다. Claude Code는 `CLAUDE.md` 내에서 임포트도 지원하므로, `@AGENTS.md`와 같은 한 줄을 추가하여 공유 지침을 중복 없이 가져올 수 있습니다.
간단하게 다음과 같이 사용할 수 있습니다:
```bash
mv AGENTS.md CLAUDE.md
```
### Gemini CLI와 Google Antigravity
Gemini CLI와 Antigravity는 저장소 루트 및 상위 디렉토리에서 프로젝트 컨텍스트 파일(기본값: `GEMINI.md`)을 로드합니다. Gemini CLI 설정에서 `context.fileName`을 설정하여 `AGENTS.md`를 대신(또는 추가로) 읽도록 구성할 수 있습니다. 예를 들어, `AGENTS.md`만 설정하거나 각 도구의 형식을 유지하고 싶다면 `AGENTS.md`와 `GEMINI.md`를 모두 포함할 수 있습니다.
간단하게 다음과 같이 사용할 수 있습니다:
```bash
mv AGENTS.md GEMINI.md
```
### Cursor
Cursor는 `AGENTS.md`를 프로젝트 지침 파일로 지원합니다. 프로젝트 루트에 배치하여 Cursor의 코딩 어시스턴트에 안내를 제공하세요.
### Windsurf
Claude Code는 Windsurf와의 공식 통합을 제공합니다. Windsurf 내에서 Claude Code를 사용하는 경우, 위의 Claude Code 안내를 따르고 `CLAUDE.md`에서 `AGENTS.md`를 임포트하세요.
Windsurf의 네이티브 어시스턴트를 사용하는 경우, 프로젝트 규칙 또는 지침 기능(사용 가능한 경우)을 구성하여 `AGENTS.md`에서 읽거나 내용을 직접 붙여넣으세요.

View File

@@ -1,244 +0,0 @@
---
title: 커스텀 도구 배포하기
description: PyPI에 게시할 수 있는 CrewAI 호환 도구를 빌드, 패키징, 배포하는 방법을 안내합니다.
icon: box-open
mode: "wide"
---
## 개요
CrewAI의 도구 시스템은 확장 가능하도록 설계되었습니다. 다른 사용자에게도 유용한 도구를 만들었다면, 독립적인 Python 라이브러리로 패키징하여 PyPI에 게시하고 모든 CrewAI 사용자가 사용할 수 있도록 할 수 있습니다. CrewAI 저장소에 PR을 보낼 필요가 없습니다.
이 가이드에서는 도구 계약 구현, 패키지 구조화, PyPI 게시까지의 전체 과정을 안내합니다.
<Note type="info" title="배포할 계획이 없으신가요?">
프로젝트 내에서만 사용할 커스텀 도구가 필요하다면 [커스텀 도구 생성](/ko/learn/create-custom-tools) 가이드를 참고하세요.
</Note>
## 도구 계약
모든 CrewAI 도구는 다음 두 가지 인터페이스 중 하나를 충족해야 합니다:
### 옵션 1: `BaseTool` 서브클래싱
`crewai.tools.BaseTool`을 서브클래싱하고 `_run` 메서드를 구현합니다. `name`, `description`, 그리고 선택적으로 입력 검증을 위한 `args_schema`를 정의합니다.
```python
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class GeolocateInput(BaseModel):
"""GeolocateTool의 입력 스키마."""
address: str = Field(..., description="지오코딩할 도로명 주소.")
class GeolocateTool(BaseTool):
name: str = "Geolocate"
description: str = "도로명 주소를 위도/경도 좌표로 변환합니다."
args_schema: type[BaseModel] = GeolocateInput
def _run(self, address: str) -> str:
# 구현 로직
return f"40.7128, -74.0060"
```
### 옵션 2: `@tool` 데코레이터 사용
간단한 도구의 경우, `@tool` 데코레이터로 함수를 CrewAI 도구로 변환할 수 있습니다. 함수에는 반드시 독스트링(도구 설명으로 사용됨)과 타입 어노테이션이 있어야 합니다.
```python
from crewai.tools import tool
@tool("Geolocate")
def geolocate(address: str) -> str:
"""도로명 주소를 위도/경도 좌표로 변환합니다."""
return "40.7128, -74.0060"
```
### 핵심 요구사항
어떤 방식을 사용하든, 도구는 다음을 충족해야 합니다:
- **`name`** — 짧고 설명적인 식별자.
- **`description`** — 에이전트에게 도구를 언제, 어떻게 사용할지 알려줍니다. 에이전트가 도구를 얼마나 잘 활용하는지에 직접적으로 영향을 미치므로 명확하고 구체적으로 작성하세요.
- **`_run`** (BaseTool) 또는 **함수 본문** (@tool) 구현 — 동기 실행 로직.
- 모든 매개변수와 반환 값에 **타입 어노테이션** 사용.
- **문자열** 결과를 반환 (또는 의미 있게 문자열로 변환 가능한 값).
### 선택사항: 비동기 지원
I/O 바운드 작업을 수행하는 도구의 경우 비동기 실행을 위해 `_arun`을 구현합니다:
```python
class GeolocateTool(BaseTool):
name: str = "Geolocate"
description: str = "도로명 주소를 위도/경도 좌표로 변환합니다."
def _run(self, address: str) -> str:
# 동기 구현
...
async def _arun(self, address: str) -> str:
# 비동기 구현
...
```
### 선택사항: `args_schema`를 통한 입력 검증
Pydantic 모델을 `args_schema`로 정의하면 자동 입력 검증과 명확한 에러 메시지를 받을 수 있습니다. 제공하지 않으면 CrewAI가 `_run` 메서드의 시그니처에서 추론합니다.
```python
from pydantic import BaseModel, Field
class TranslateInput(BaseModel):
"""TranslateTool의 입력 스키마."""
text: str = Field(..., description="번역할 텍스트.")
target_language: str = Field(
default="en",
description="대상 언어의 ISO 639-1 언어 코드.",
)
```
배포용 도구에는 명시적 스키마를 권장합니다 — 에이전트 동작이 개선되고 사용자에게 더 명확한 문서를 제공합니다.
### 선택사항: 환경 변수
도구에 API 키나 기타 설정이 필요한 경우, `env_vars`로 선언하여 사용자가 무엇을 설정해야 하는지 알 수 있도록 합니다:
```python
from crewai.tools import BaseTool, EnvVar
class GeolocateTool(BaseTool):
name: str = "Geolocate"
description: str = "도로명 주소를 위도/경도 좌표로 변환합니다."
env_vars: list[EnvVar] = [
EnvVar(
name="GEOCODING_API_KEY",
description="지오코딩 서비스 API 키.",
required=True,
),
]
def _run(self, address: str) -> str:
...
```
## 패키지 구조
프로젝트를 표준 Python 패키지로 구성합니다. 권장 레이아웃:
```
crewai-geolocate/
├── pyproject.toml
├── LICENSE
├── README.md
└── src/
└── crewai_geolocate/
├── __init__.py
└── tools.py
```
### `pyproject.toml`
```toml
[project]
name = "crewai-geolocate"
version = "0.1.0"
description = "도로명 주소를 지오코딩하는 CrewAI 도구."
requires-python = ">=3.10"
dependencies = [
"crewai",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
```
사용자가 자동으로 호환 버전을 받을 수 있도록 `crewai`를 의존성으로 선언합니다.
### `__init__.py`
사용자가 직접 import할 수 있도록 도구 클래스를 re-export합니다:
```python
from crewai_geolocate.tools import GeolocateTool
__all__ = ["GeolocateTool"]
```
### 명명 규칙
- **패키지 이름**: `crewai-` 접두사를 사용합니다 (예: `crewai-geolocate`). PyPI에서 검색할 때 도구를 쉽게 찾을 수 있습니다.
- **모듈 이름**: 밑줄을 사용합니다 (예: `crewai_geolocate`).
- **도구 클래스 이름**: `Tool`로 끝나는 PascalCase를 사용합니다 (예: `GeolocateTool`).
## 도구 테스트
게시 전에 도구가 크루 내에서 작동하는지 확인합니다:
```python
from crewai import Agent, Crew, Task
from crewai_geolocate import GeolocateTool
agent = Agent(
role="Location Analyst",
goal="주어진 주소의 좌표를 찾습니다.",
backstory="지리공간 데이터 전문가.",
tools=[GeolocateTool()],
)
task = Task(
description="1600 Pennsylvania Avenue, Washington, DC의 좌표를 찾으세요.",
expected_output="해당 주소의 위도와 경도.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
print(result)
```
## PyPI에 게시하기
도구 테스트를 완료하고 준비가 되면:
```bash
# 패키지 빌드
uv build
# PyPI에 게시
uv publish
```
처음 게시하는 경우 [PyPI 계정](https://pypi.org/account/register/)과 [API 토큰](https://pypi.org/help/#apitoken)이 필요합니다.
### 게시 후
사용자는 다음과 같이 도구를 설치할 수 있습니다:
```bash
pip install crewai-geolocate
```
또는 uv를 사용하여:
```bash
uv add crewai-geolocate
```
그런 다음 크루에서 사용합니다:
```python
from crewai_geolocate import GeolocateTool
agent = Agent(
role="Location Analyst",
tools=[GeolocateTool()],
# ...
)
```

View File

@@ -9,10 +9,6 @@ mode: "wide"
이 가이드는 CrewAI 프레임워크를 위한 커스텀 툴을 생성하는 방법과 최신 기능(툴 위임, 오류 처리, 동적 툴 호출 등)을 통합하여 이러한 툴을 효율적으로 관리하고 활용하는 방법에 대해 자세히 안내합니다. 또한 협업 툴의 중요성을 강조하며, 에이전트가 다양한 작업을 수행할 수 있도록 지원합니다.
<Tip>
**커뮤니티에 도구를 배포하고 싶으신가요?** 다른 사용자에게도 유용한 도구를 만들고 있다면, [커스텀 도구 배포하기](/ko/guides/tools/publish-custom-tools) 가이드에서 도구를 패키징하고 PyPI에 배포하는 방법을 알아보세요.
</Tip>
### `BaseTool` 서브클래싱
개인화된 툴을 생성하려면 `BaseTool`을 상속받고, 입력 검증을 위한 `args_schema`와 `_run` 메서드를 포함한 필요한 속성들을 정의해야 합니다.

View File

@@ -1,61 +0,0 @@
---
title: Ferramentas de Codificação
description: Use o AGENTS.md para guiar agentes de codificação e IDEs em seus projetos CrewAI.
icon: terminal
mode: "wide"
---
## Por que AGENTS.md
`AGENTS.md` é um arquivo de instruções leve e local do repositório que fornece aos agentes de codificação orientações consistentes e específicas do projeto. Mantenha-o na raiz do projeto e trate-o como a fonte da verdade para como você deseja que os assistentes trabalhem: convenções, comandos, notas de arquitetura e proteções.
## Criar um Projeto com o CLI
Use o CLI do CrewAI para criar a estrutura de um projeto, e o `AGENTS.md` será automaticamente adicionado na raiz.
```bash
# Crew
crewai create crew my_crew
# Flow
crewai create flow my_flow
# Tool repository
crewai tool create my_tool
```
## Configuração de Ferramentas: Direcione Assistentes para o AGENTS.md
### Codex
O Codex pode ser guiado por arquivos `AGENTS.md` colocados no seu repositório. Use-os para fornecer contexto persistente do projeto, como convenções, comandos e expectativas de fluxo de trabalho.
### Claude Code
O Claude Code armazena a memória do projeto em `CLAUDE.md`. Você pode inicializá-lo com `/init` e editá-lo usando `/memory`. O Claude Code também suporta importações dentro do `CLAUDE.md`, então você pode adicionar uma única linha como `@AGENTS.md` para incluir as instruções compartilhadas sem duplicá-las.
Você pode simplesmente usar:
```bash
mv AGENTS.md CLAUDE.md
```
### Gemini CLI e Google Antigravity
O Gemini CLI e o Antigravity carregam um arquivo de contexto do projeto (padrão: `GEMINI.md`) da raiz do repositório e diretórios pais. Você pode configurá-lo para ler o `AGENTS.md` em vez disso (ou além) definindo `context.fileName` nas configurações do Gemini CLI. Por exemplo, defina apenas para `AGENTS.md`, ou inclua tanto `AGENTS.md` quanto `GEMINI.md` se quiser manter o formato de cada ferramenta.
Você pode simplesmente usar:
```bash
mv AGENTS.md GEMINI.md
```
### Cursor
O Cursor suporta `AGENTS.md` como arquivo de instruções do projeto. Coloque-o na raiz do projeto para fornecer orientação ao assistente de codificação do Cursor.
### Windsurf
O Claude Code fornece uma integração oficial com o Windsurf. Se você usa o Claude Code dentro do Windsurf, siga a orientação do Claude Code acima e importe o `AGENTS.md` a partir do `CLAUDE.md`.
Se você está usando o assistente nativo do Windsurf, configure o recurso de regras ou instruções do projeto (se disponível) para ler o `AGENTS.md` ou cole o conteúdo diretamente.

View File

@@ -1,244 +0,0 @@
---
title: Publicar Ferramentas Personalizadas
description: Como construir, empacotar e publicar suas próprias ferramentas compatíveis com CrewAI no PyPI para que qualquer usuário do CrewAI possa instalá-las e usá-las.
icon: box-open
mode: "wide"
---
## Visão Geral
O sistema de ferramentas do CrewAI foi projetado para ser extensível. Se você construiu uma ferramenta que pode beneficiar outros, pode empacotá-la como uma biblioteca Python independente, publicá-la no PyPI e disponibilizá-la para qualquer usuário do CrewAI — sem necessidade de PR para o repositório do CrewAI.
Este guia percorre todo o processo: implementação do contrato de ferramentas, estruturação do pacote e publicação no PyPI.
<Note type="info" title="Não pretende publicar?">
Se você precisa apenas de uma ferramenta personalizada para seu próprio projeto, consulte o guia [Criar Ferramentas Personalizadas](/pt-BR/learn/create-custom-tools).
</Note>
## O Contrato de Ferramentas
Toda ferramenta CrewAI deve satisfazer uma das duas interfaces:
### Opção 1: Subclassificar `BaseTool`
Subclassifique `crewai.tools.BaseTool` e implemente o método `_run`. Defina `name`, `description` e, opcionalmente, um `args_schema` para validação de entrada.
```python
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class GeolocateInput(BaseModel):
"""Esquema de entrada para GeolocateTool."""
address: str = Field(..., description="O endereço para geolocalizar.")
class GeolocateTool(BaseTool):
name: str = "Geolocate"
description: str = "Converte um endereço em coordenadas de latitude/longitude."
args_schema: type[BaseModel] = GeolocateInput
def _run(self, address: str) -> str:
# Sua implementação aqui
return f"40.7128, -74.0060"
```
### Opção 2: Usar o Decorador `@tool`
Para ferramentas mais simples, o decorador `@tool` transforma uma função em uma ferramenta CrewAI. A função **deve** ter uma docstring (usada como descrição da ferramenta) e anotações de tipo.
```python
from crewai.tools import tool
@tool("Geolocate")
def geolocate(address: str) -> str:
"""Converte um endereço em coordenadas de latitude/longitude."""
return "40.7128, -74.0060"
```
### Requisitos Essenciais
Independentemente da abordagem escolhida, sua ferramenta deve:
- Ter um **`name`** — um identificador curto e descritivo.
- Ter uma **`description`** — informa ao agente quando e como usar a ferramenta. Isso afeta diretamente a qualidade do uso da ferramenta pelo agente, então seja claro e específico.
- Implementar **`_run`** (BaseTool) ou fornecer um **corpo de função** (@tool) — a lógica de execução síncrona.
- Usar **anotações de tipo** em todos os parâmetros e valores de retorno.
- Retornar um resultado em **string** (ou algo que possa ser convertido de forma significativa).
### Opcional: Suporte Assíncrono
Se sua ferramenta realiza operações de I/O, implemente `_arun` para execução assíncrona:
```python
class GeolocateTool(BaseTool):
name: str = "Geolocate"
description: str = "Converte um endereço em coordenadas de latitude/longitude."
def _run(self, address: str) -> str:
# Implementação síncrona
...
async def _arun(self, address: str) -> str:
# Implementação assíncrona
...
```
### Opcional: Validação de Entrada com `args_schema`
Defina um modelo Pydantic como seu `args_schema` para obter validação automática de entrada e mensagens de erro claras. Se não fornecer um, o CrewAI irá inferi-lo da assinatura do seu método `_run`.
```python
from pydantic import BaseModel, Field
class TranslateInput(BaseModel):
"""Esquema de entrada para TranslateTool."""
text: str = Field(..., description="O texto a ser traduzido.")
target_language: str = Field(
default="en",
description="Código de idioma ISO 639-1 para o idioma de destino.",
)
```
Esquemas explícitos são recomendados para ferramentas publicadas — produzem melhor comportamento do agente e documentação mais clara para seus usuários.
### Opcional: Variáveis de Ambiente
Se sua ferramenta requer chaves de API ou outra configuração, declare-as com `env_vars` para que os usuários saibam o que configurar:
```python
from crewai.tools import BaseTool, EnvVar
class GeolocateTool(BaseTool):
name: str = "Geolocate"
description: str = "Converte um endereço em coordenadas de latitude/longitude."
env_vars: list[EnvVar] = [
EnvVar(
name="GEOCODING_API_KEY",
description="Chave de API para o serviço de geocodificação.",
required=True,
),
]
def _run(self, address: str) -> str:
...
```
## Estrutura do Pacote
Estruture seu projeto como um pacote Python padrão. Layout recomendado:
```
crewai-geolocate/
├── pyproject.toml
├── LICENSE
├── README.md
└── src/
└── crewai_geolocate/
├── __init__.py
└── tools.py
```
### `pyproject.toml`
```toml
[project]
name = "crewai-geolocate"
version = "0.1.0"
description = "Uma ferramenta CrewAI para geolocalizar endereços."
requires-python = ">=3.10"
dependencies = [
"crewai",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
```
Declare `crewai` como dependência para que os usuários obtenham automaticamente uma versão compatível.
### `__init__.py`
Re-exporte suas classes de ferramenta para que os usuários possam importá-las diretamente:
```python
from crewai_geolocate.tools import GeolocateTool
__all__ = ["GeolocateTool"]
```
### Convenções de Nomenclatura
- **Nome do pacote**: Use o prefixo `crewai-` (ex.: `crewai-geolocate`). Isso torna sua ferramenta fácil de encontrar no PyPI.
- **Nome do módulo**: Use underscores (ex.: `crewai_geolocate`).
- **Nome da classe da ferramenta**: Use PascalCase terminando em `Tool` (ex.: `GeolocateTool`).
## Testando sua Ferramenta
Antes de publicar, verifique se sua ferramenta funciona dentro de uma crew:
```python
from crewai import Agent, Crew, Task
from crewai_geolocate import GeolocateTool
agent = Agent(
role="Analista de Localização",
goal="Encontrar coordenadas para os endereços fornecidos.",
backstory="Um especialista em dados geoespaciais.",
tools=[GeolocateTool()],
)
task = Task(
description="Encontre as coordenadas de 1600 Pennsylvania Avenue, Washington, DC.",
expected_output="A latitude e longitude do endereço.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
print(result)
```
## Publicando no PyPI
Quando sua ferramenta estiver testada e pronta:
```bash
# Construir o pacote
uv build
# Publicar no PyPI
uv publish
```
Se é sua primeira vez publicando, você precisará de uma [conta no PyPI](https://pypi.org/account/register/) e um [token de API](https://pypi.org/help/#apitoken).
### Após a Publicação
Os usuários podem instalar sua ferramenta com:
```bash
pip install crewai-geolocate
```
Ou com uv:
```bash
uv add crewai-geolocate
```
E então usá-la em suas crews:
```python
from crewai_geolocate import GeolocateTool
agent = Agent(
role="Analista de Localização",
tools=[GeolocateTool()],
# ...
)
```

View File

@@ -11,10 +11,6 @@ Este guia traz instruções detalhadas sobre como criar ferramentas personalizad
incorporando funcionalidades recentes, como delegação de ferramentas, tratamento de erros e chamada dinâmica de ferramentas. Destaca também a importância de ferramentas de colaboração,
permitindo que agentes executem uma ampla gama de ações.
<Tip>
**Quer publicar sua ferramenta para a comunidade?** Se você está construindo uma ferramenta que pode beneficiar outros, confira o guia [Publicar Ferramentas Personalizadas](/pt-BR/guides/tools/publish-custom-tools) para aprender como empacotar e distribuir sua ferramenta no PyPI.
</Tip>
### Subclassificando `BaseTool`
Para criar uma ferramenta personalizada, herde de `BaseTool` e defina os atributos necessários, incluindo o `args_schema` para validação de entrada e o método `_run`.

View File

@@ -1,5 +1,4 @@
import os
from pathlib import Path
from typing import Any
from crewai.tools import BaseTool
@@ -31,39 +30,27 @@ class FileWriterTool(BaseTool):
def _run(self, **kwargs: Any) -> str:
try:
directory = kwargs.get("directory") or "./"
filename = kwargs["filename"]
filepath = os.path.join(directory, filename)
# Prevent path traversal: the resolved path must be strictly inside
# the resolved directory. This blocks ../sequences, absolute paths in
# filename, and symlink escapes regardless of how directory is set.
# is_relative_to() does a proper path-component comparison that is
# safe on case-insensitive filesystems and avoids the "// " edge case
# that plagues startswith(real_directory + os.sep).
# We also reject the case where filepath resolves to the directory
# itself, since that is not a valid file target.
real_directory = Path(directory).resolve()
real_filepath = Path(filepath).resolve()
if not real_filepath.is_relative_to(real_directory) or real_filepath == real_directory:
return "Error: Invalid file path — the filename must not escape the target directory."
if kwargs.get("directory"):
os.makedirs(real_directory, exist_ok=True)
os.makedirs(kwargs["directory"], exist_ok=True)
# Construct the full path
filepath = os.path.join(kwargs.get("directory") or "", kwargs["filename"])
# Convert overwrite to boolean
kwargs["overwrite"] = strtobool(kwargs["overwrite"])
if os.path.exists(real_filepath) and not kwargs["overwrite"]:
return f"File {real_filepath} already exists and overwrite option was not passed."
# Check if file exists and overwrite is not allowed
if os.path.exists(filepath) and not kwargs["overwrite"]:
return f"File {filepath} already exists and overwrite option was not passed."
# Write content to the file
mode = "w" if kwargs["overwrite"] else "x"
with open(real_filepath, mode) as file:
with open(filepath, mode) as file:
file.write(kwargs["content"])
return f"Content successfully written to {real_filepath}"
return f"Content successfully written to {filepath}"
except FileExistsError:
return (
f"File {real_filepath} already exists and overwrite option was not passed."
f"File {filepath} already exists and overwrite option was not passed."
)
except KeyError as e:
return f"An error occurred while accessing key: {e!s}"

View File

@@ -135,59 +135,3 @@ def test_file_exists_error_handling(tool, temp_env, overwrite):
assert "already exists and overwrite option was not passed" in result
assert read_file(path) == "Pre-existing content"
# --- Path traversal prevention ---
def test_blocks_traversal_in_filename(tool, temp_env):
# Create a sibling "outside" directory so we can assert nothing was written there.
outside_dir = tempfile.mkdtemp()
outside_file = os.path.join(outside_dir, "outside.txt")
try:
result = tool._run(
filename=f"../{os.path.basename(outside_dir)}/outside.txt",
directory=temp_env["temp_dir"],
content="should not be written",
overwrite=True,
)
assert "Error" in result
assert not os.path.exists(outside_file)
finally:
shutil.rmtree(outside_dir, ignore_errors=True)
def test_blocks_absolute_path_in_filename(tool, temp_env):
# Use a temp file outside temp_dir as the absolute target so we don't
# depend on /etc/passwd existing or being writable on the host.
outside_dir = tempfile.mkdtemp()
outside_file = os.path.join(outside_dir, "target.txt")
try:
result = tool._run(
filename=outside_file,
directory=temp_env["temp_dir"],
content="should not be written",
overwrite=True,
)
assert "Error" in result
assert not os.path.exists(outside_file)
finally:
shutil.rmtree(outside_dir, ignore_errors=True)
def test_blocks_symlink_escape(tool, temp_env):
# Symlink inside temp_dir pointing to a separate temp "outside" directory.
outside_dir = tempfile.mkdtemp()
outside_file = os.path.join(outside_dir, "target.txt")
link = os.path.join(temp_env["temp_dir"], "escape")
os.symlink(outside_dir, link)
try:
result = tool._run(
filename="escape/target.txt",
directory=temp_env["temp_dir"],
content="should not be written",
overwrite=True,
)
assert "Error" in result
assert not os.path.exists(outside_file)
finally:
shutil.rmtree(outside_dir, ignore_errors=True)

View File

@@ -75,7 +75,6 @@ from crewai.utilities.agent_utils import (
)
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.env import get_env_context
from crewai.utilities.guardrail import process_guardrail
from crewai.utilities.guardrail_types import GuardrailType
from crewai.utilities.llm_utils import create_llm
@@ -365,7 +364,6 @@ class Agent(BaseAgent):
ValueError: If the max execution time is not a positive integer.
RuntimeError: If the agent execution fails for other reasons.
"""
get_env_context()
# Only call handle_reasoning for legacy CrewAgentExecutor
# For AgentExecutor, planning is handled in AgentExecutor.generate_plan()
if self.executor_class is not AgentExecutor:

View File

@@ -98,7 +98,6 @@ from crewai.types.streaming import CrewStreamingOutput
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
from crewai.utilities.crew.models import CrewContext
from crewai.utilities.env import get_env_context
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.file_handler import FileHandler
@@ -680,7 +679,6 @@ class Crew(FlowTrackable, BaseModel):
Returns:
CrewOutput or CrewStreamingOutput if streaming is enabled.
"""
get_env_context()
if self.stream:
enable_agent_streaming(self.agents)
ctx = StreamingContext()

View File

@@ -34,12 +34,6 @@ from crewai.events.types.crew_events import (
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from crewai.events.types.env_events import (
CCEnvEvent,
CodexEnvEvent,
CursorEnvEvent,
DefaultEnvEvent,
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
@@ -149,23 +143,6 @@ class EventListener(BaseEventListener):
# ----------- CREW EVENTS -----------
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None:
@crewai_event_bus.on(CCEnvEvent)
def on_cc_env(_: Any, event: CCEnvEvent) -> None:
self._telemetry.env_context_span(event.type)
@crewai_event_bus.on(CodexEnvEvent)
def on_codex_env(_: Any, event: CodexEnvEvent) -> None:
self._telemetry.env_context_span(event.type)
@crewai_event_bus.on(CursorEnvEvent)
def on_cursor_env(_: Any, event: CursorEnvEvent) -> None:
self._telemetry.env_context_span(event.type)
@crewai_event_bus.on(DefaultEnvEvent)
def on_default_env(_: Any, event: DefaultEnvEvent) -> None:
self._telemetry.env_context_span(event.type)
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None:
self.formatter.handle_crew_started(event.crew_name or "Crew", source.id)

View File

@@ -1,6 +1,7 @@
import logging
import uuid
import webbrowser
from datetime import datetime, timezone
from rich.console import Console
from rich.panel import Panel
@@ -100,20 +101,48 @@ class FirstTimeTraceHandler:
user_context=user_context,
execution_metadata=execution_metadata,
use_ephemeral=True,
skip_context_check=True,
)
if not self.batch_manager.trace_batch_id:
self._gracefully_fail("Backend batch creation failed, cannot send events.")
self._reset_batch_state()
return
self.batch_manager.backend_initialized = True
if self.batch_manager.event_buffer:
self.batch_manager._send_events_to_backend()
# Capture values before send/finalize consume them
events_count = len(self.batch_manager.event_buffer)
batch_id = self.batch_manager.trace_batch_id
# Read duration non-destructively — _finalize_backend_batch will consume it
start_time = self.batch_manager.execution_start_times.get("execution")
duration_ms = (
int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
if start_time
else 0
)
self.batch_manager.finalize_batch()
if self.batch_manager.event_buffer:
send_status = self.batch_manager._send_events_to_backend()
if send_status == 500 and self.batch_manager.trace_batch_id:
self.batch_manager.plus_api.mark_trace_batch_as_failed(
self.batch_manager.trace_batch_id,
"Error sending events to backend",
)
self._reset_batch_state()
return
self.batch_manager._finalize_backend_batch(events_count)
self.ephemeral_url = self.batch_manager.ephemeral_trace_url
if not self.ephemeral_url:
self._show_local_trace_message()
self._show_local_trace_message(events_count, duration_ms, batch_id)
self._reset_batch_state()
except Exception as e:
self._gracefully_fail(f"Backend initialization failed: {e}")
self._reset_batch_state()
def _display_ephemeral_trace_link(self):
"""Display the ephemeral trace link to the user and automatically open browser."""
@@ -184,6 +213,19 @@ To enable tracing later, do any one of these:
console.print(panel)
console.print()
def _reset_batch_state(self):
"""Reset batch manager state to allow future executions to re-initialize."""
if not self.batch_manager:
return
self.batch_manager.batch_owner_type = None
self.batch_manager.batch_owner_id = None
self.batch_manager.current_batch = None
self.batch_manager.event_buffer.clear()
self.batch_manager.trace_batch_id = None
self.batch_manager.is_current_batch_ephemeral = False
self.batch_manager.backend_initialized = False
self.batch_manager._cleanup_batch_data()
def _gracefully_fail(self, error_message: str):
"""Handle errors gracefully without disrupting user experience."""
console = Console()
@@ -191,7 +233,7 @@ To enable tracing later, do any one of these:
logger.debug(f"First-time trace error: {error_message}")
def _show_local_trace_message(self):
def _show_local_trace_message(self, events_count: int = 0, duration_ms: int = 0, batch_id: str | None = None):
"""Show message when traces were collected locally but couldn't be uploaded."""
console = Console()
@@ -199,9 +241,9 @@ To enable tracing later, do any one of these:
📊 Your execution traces were collected locally!
Unfortunately, we couldn't upload them to the server right now, but here's what we captured:
{len(self.batch_manager.event_buffer)} trace events
• Execution duration: {self.batch_manager.calculate_duration("execution")}ms
• Batch ID: {self.batch_manager.trace_batch_id}
{events_count} trace events
• Execution duration: {duration_ms}ms
• Batch ID: {batch_id}
✅ Tracing has been enabled for future runs!
Your preference has been saved. Future Crew/Flow executions will automatically collect traces.

View File

@@ -1,3 +1,4 @@
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from logging import getLogger
@@ -108,10 +109,11 @@ class TraceBatchManager:
user_context: dict[str, str],
execution_metadata: dict[str, Any],
use_ephemeral: bool = False,
skip_context_check: bool = False,
) -> None:
"""Send batch initialization to backend"""
if not is_tracing_enabled_in_context():
if not skip_context_check and not is_tracing_enabled_in_context():
return
if not self.plus_api or not self.current_batch:
@@ -142,19 +144,62 @@ class TraceBatchManager:
payload["ephemeral_trace_id"] = self.current_batch.batch_id
payload["user_identifier"] = get_user_id()
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
max_retries = 2
response = None
last_exception = None
for attempt in range(max_retries + 1):
try:
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
if response is not None and response.status_code < 500:
break
if attempt < max_retries:
logger.debug(
f"Trace batch init attempt {attempt + 1} failed "
f"(status={response.status_code if response else 'None'}), retrying..."
)
time.sleep(0.2)
except Exception as e:
last_exception = e
if attempt < max_retries:
logger.debug(
f"Trace batch init attempt {attempt + 1} raised {type(e).__name__}, retrying..."
)
time.sleep(0.2)
if last_exception and response is None:
logger.warning(
f"Error initializing trace batch: {last_exception}. Continuing without tracing."
)
self.trace_batch_id = None
return
if response is None:
logger.warning(
"Trace batch initialization failed gracefully. Continuing without tracing."
)
self.trace_batch_id = None
return
# Fall back to ephemeral on auth failure (expired/revoked token)
if response.status_code in [401, 403] and not use_ephemeral:
logger.warning(
"Auth rejected by server, falling back to ephemeral tracing."
)
self.is_current_batch_ephemeral = True
return self._initialize_backend_batch(
user_context,
execution_metadata,
use_ephemeral=True,
skip_context_check=skip_context_check,
)
if response.status_code in [201, 200]:
self.is_current_batch_ephemeral = use_ephemeral
response_data = response.json()
self.trace_batch_id = (
response_data["trace_id"]
@@ -165,11 +210,13 @@ class TraceBatchManager:
logger.warning(
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
)
self.trace_batch_id = None
except Exception as e:
logger.warning(
f"Error initializing trace batch: {e}. Continuing without tracing."
)
self.trace_batch_id = None
def begin_event_processing(self) -> None:
"""Mark that an event handler started processing (for synchronization)."""

View File

@@ -58,12 +58,6 @@ from crewai.events.types.crew_events import (
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
)
from crewai.events.types.env_events import (
CCEnvEvent,
CodexEnvEvent,
CursorEnvEvent,
DefaultEnvEvent,
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
@@ -198,7 +192,6 @@ class TraceCollectionListener(BaseEventListener):
if self._listeners_setup:
return
self._register_env_event_handlers(crewai_event_bus)
self._register_flow_event_handlers(crewai_event_bus)
self._register_context_event_handlers(crewai_event_bus)
self._register_action_event_handlers(crewai_event_bus)
@@ -207,25 +200,6 @@ class TraceCollectionListener(BaseEventListener):
self._listeners_setup = True
def _register_env_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for environment context events."""
@event_bus.on(CCEnvEvent)
def on_cc_env(source: Any, event: CCEnvEvent) -> None:
self._handle_action_event("cc_env", source, event)
@event_bus.on(CodexEnvEvent)
def on_codex_env(source: Any, event: CodexEnvEvent) -> None:
self._handle_action_event("codex_env", source, event)
@event_bus.on(CursorEnvEvent)
def on_cursor_env(source: Any, event: CursorEnvEvent) -> None:
self._handle_action_event("cursor_env", source, event)
@event_bus.on(DefaultEnvEvent)
def on_default_env(source: Any, event: DefaultEnvEvent) -> None:
self._handle_action_event("default_env", source, event)
def _register_flow_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for flow events."""

View File

@@ -1,36 +0,0 @@
from typing import Annotated, Literal
from pydantic import Field, TypeAdapter
from crewai.events.base_events import BaseEvent
class CCEnvEvent(BaseEvent):
type: Literal["cc_env"] = "cc_env"
class CodexEnvEvent(BaseEvent):
type: Literal["codex_env"] = "codex_env"
class CursorEnvEvent(BaseEvent):
type: Literal["cursor_env"] = "cursor_env"
class DefaultEnvEvent(BaseEvent):
type: Literal["default_env"] = "default_env"
EnvContextEvent = Annotated[
CCEnvEvent | CodexEnvEvent | CursorEnvEvent | DefaultEnvEvent,
Field(discriminator="type"),
]
env_context_event_adapter: TypeAdapter[EnvContextEvent] = TypeAdapter(EnvContextEvent)
ENV_CONTEXT_EVENT_TYPES: tuple[type[BaseEvent], ...] = (
CCEnvEvent,
CodexEnvEvent,
CursorEnvEvent,
DefaultEnvEvent,
)

View File

@@ -110,7 +110,6 @@ if TYPE_CHECKING:
from crewai.flow.visualization import build_flow_structure, render_interactive
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
from crewai.utilities.env import get_env_context
from crewai.utilities.streaming import (
TaskInfo,
create_async_chunk_generator,
@@ -1771,7 +1770,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns:
The final output from the flow or FlowStreamingOutput if streaming.
"""
get_env_context()
if self.stream:
result_holder: list[Any] = []
current_task_info: TaskInfo = {

View File

@@ -986,22 +986,6 @@ class Telemetry:
self._safe_telemetry_operation(_operation)
def env_context_span(self, tool: str) -> None:
"""Records the coding tool environment context."""
def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Environment Context")
self._add_attribute(
span,
"crewai_version",
version("crewai"),
)
self._add_attribute(span, "tool", tool)
close_span(span)
self._safe_telemetry_operation(_operation)
def human_feedback_span(
self,
event_type: str,

View File

@@ -281,7 +281,6 @@ class BaseTool(BaseModel, ABC):
result_as_answer=self.result_as_answer,
max_usage_count=self.max_usage_count,
current_usage_count=self.current_usage_count,
cache_function=self.cache_function,
)
structured_tool._original_tool = self
return structured_tool

View File

@@ -58,7 +58,6 @@ class CrewStructuredTool:
result_as_answer: bool = False,
max_usage_count: int | None = None,
current_usage_count: int = 0,
cache_function: Callable[..., bool] | None = None,
) -> None:
"""Initialize the structured tool.
@@ -70,7 +69,6 @@ class CrewStructuredTool:
result_as_answer: Whether to return the output directly
max_usage_count: Maximum number of times this tool can be used. None means unlimited usage.
current_usage_count: Current number of times this tool has been used.
cache_function: Function to determine if the tool result should be cached.
"""
self.name = name
self.description = description
@@ -80,7 +78,6 @@ class CrewStructuredTool:
self.result_as_answer = result_as_answer
self.max_usage_count = max_usage_count
self.current_usage_count = current_usage_count
self.cache_function = cache_function
self._original_tool: BaseTool | None = None
# Validate the function signature matches the schema
@@ -89,7 +86,7 @@ class CrewStructuredTool:
@classmethod
def from_function(
cls,
func: Callable[..., Any],
func: Callable,
name: str | None = None,
description: str | None = None,
return_direct: bool = False,
@@ -150,7 +147,7 @@ class CrewStructuredTool:
@staticmethod
def _create_schema_from_function(
name: str,
func: Callable[..., Any],
func: Callable,
) -> type[BaseModel]:
"""Create a Pydantic schema from a function's signature.
@@ -185,7 +182,7 @@ class CrewStructuredTool:
# Create model
schema_name = f"{name.title()}Schema"
return create_model(schema_name, **fields) # type: ignore[call-overload, no-any-return]
return create_model(schema_name, **fields) # type: ignore[call-overload]
def _validate_function_signature(self) -> None:
"""Validate that the function signature matches the args schema."""
@@ -213,7 +210,7 @@ class CrewStructuredTool:
f"not found in args_schema"
)
def _parse_args(self, raw_args: str | dict[str, Any]) -> dict[str, Any]:
def _parse_args(self, raw_args: str | dict) -> dict:
"""Parse and validate the input arguments against the schema.
Args:
@@ -237,8 +234,8 @@ class CrewStructuredTool:
async def ainvoke(
self,
input: str | dict[str, Any],
config: dict[str, Any] | None = None,
input: str | dict,
config: dict | None = None,
**kwargs: Any,
) -> Any:
"""Asynchronously invoke the tool.
@@ -272,7 +269,7 @@ class CrewStructuredTool:
except Exception:
raise
def _run(self, *args: Any, **kwargs: Any) -> Any:
def _run(self, *args, **kwargs) -> Any:
"""Legacy method for compatibility."""
# Convert args/kwargs to our expected format
input_dict = dict(zip(self.args_schema.model_fields.keys(), args, strict=False))
@@ -280,10 +277,7 @@ class CrewStructuredTool:
return self.invoke(input_dict)
def invoke(
self,
input: str | dict[str, Any],
config: dict[str, Any] | None = None,
**kwargs: Any,
self, input: str | dict, config: dict | None = None, **kwargs: Any
) -> Any:
"""Main method for tool execution."""
parsed_args = self._parse_args(input)
@@ -319,10 +313,9 @@ class CrewStructuredTool:
self._original_tool.current_usage_count = self.current_usage_count
@property
def args(self) -> dict[str, Any]:
def args(self) -> dict:
"""Get the tool's input arguments schema."""
schema: dict[str, Any] = self.args_schema.model_json_schema()["properties"]
return schema
return self.args_schema.model_json_schema()["properties"]
def __repr__(self) -> str:
return f"CrewStructuredTool(name='{sanitize_tool_name(self.name)}', description='{self.description}')"

View File

@@ -8,21 +8,6 @@ TRAINED_AGENTS_DATA_FILE: Final[str] = "trained_agents_data.pkl"
KNOWLEDGE_DIRECTORY: Final[str] = "knowledge"
MAX_FILE_NAME_LENGTH: Final[int] = 255
EMITTER_COLOR: Final[PrinterColor] = "bold_blue"
CC_ENV_VAR: Final[str] = "CLAUDECODE"
CODEX_ENV_VARS: Final[tuple[str, ...]] = (
"CODEX_CI",
"CODEX_MANAGED_BY_NPM",
"CODEX_SANDBOX",
"CODEX_SANDBOX_NETWORK_DISABLED",
"CODEX_THREAD_ID",
)
CURSOR_ENV_VARS: Final[tuple[str, ...]] = (
"CURSOR_AGENT",
"CURSOR_EXTENSION_HOST_ROLE",
"CURSOR_SANDBOX",
"CURSOR_TRACE_ID",
"CURSOR_WORKSPACE_LABEL",
)
class _NotSpecified:

View File

@@ -1,39 +0,0 @@
import contextvars
import os
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.env_events import (
CCEnvEvent,
CodexEnvEvent,
CursorEnvEvent,
DefaultEnvEvent,
)
from crewai.utilities.constants import CC_ENV_VAR, CODEX_ENV_VARS, CURSOR_ENV_VARS
_env_context_emitted: contextvars.ContextVar[bool] = contextvars.ContextVar(
"_env_context_emitted", default=False
)
def _is_codex_env() -> bool:
return any(os.environ.get(var) for var in CODEX_ENV_VARS)
def _is_cursor_env() -> bool:
return any(os.environ.get(var) for var in CURSOR_ENV_VARS)
def get_env_context() -> None:
if _env_context_emitted.get():
return
_env_context_emitted.set(True)
if os.environ.get(CC_ENV_VAR):
crewai_event_bus.emit(None, CCEnvEvent())
elif _is_codex_env():
crewai_event_bus.emit(None, CodexEnvEvent())
elif _is_cursor_env():
crewai_event_bus.emit(None, CursorEnvEvent())
else:
crewai_event_bus.emit(None, DefaultEnvEvent())

View File

@@ -38,44 +38,6 @@ def test_initialization(basic_function, schema_class):
assert tool.args_schema == schema_class
def test_cache_function_passed_through(basic_function, schema_class):
"""Test that cache_function is stored on CrewStructuredTool."""
def no_cache(_args: dict, _result: str) -> bool:
return False
tool = CrewStructuredTool(
name="test_tool",
description="Test tool description",
func=basic_function,
args_schema=schema_class,
cache_function=no_cache,
)
assert tool.cache_function is no_cache
def test_base_tool_passes_cache_function_to_structured_tool():
"""Test that BaseTool.to_structured_tool propagates cache_function."""
from crewai.tools import BaseTool
def no_cache(_args: dict, _result: str) -> bool:
return False
class MyCacheTool(BaseTool):
name: str = "cache_test"
description: str = "tool for testing cache passthrough"
def _run(self, query: str = "") -> str:
return "result"
my_tool = MyCacheTool()
my_tool.cache_function = no_cache # type: ignore[assignment]
structured = my_tool.to_structured_tool()
assert structured.cache_function is no_cache
def test_from_function(basic_function):
"""Test creating tool from function"""
tool = CrewStructuredTool.from_function(

View File

@@ -7,6 +7,7 @@ from crewai.events.listeners.tracing.first_time_trace_handler import (
FirstTimeTraceHandler,
)
from crewai.events.listeners.tracing.trace_batch_manager import (
TraceBatch,
TraceBatchManager,
)
from crewai.events.listeners.tracing.trace_listener import (
@@ -657,6 +658,16 @@ class TestTraceListenerSetup:
trace_listener.first_time_handler.collected_events = True
mock_batch_response = MagicMock()
mock_batch_response.status_code = 201
mock_batch_response.json.return_value = {
"trace_id": "mock-trace-id",
"ephemeral_trace_id": "mock-ephemeral-trace-id",
"access_code": "TRACE-mock",
}
mock_events_response = MagicMock()
mock_events_response.status_code = 200
with (
patch.object(
trace_listener.first_time_handler,
@@ -666,6 +677,40 @@ class TestTraceListenerSetup:
patch.object(
trace_listener.first_time_handler, "_display_ephemeral_trace_link"
) as mock_display_link,
patch.object(
trace_listener.batch_manager.plus_api,
"initialize_trace_batch",
return_value=mock_batch_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_batch_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"send_trace_events",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"send_ephemeral_trace_events",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"finalize_trace_batch",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"finalize_ephemeral_trace_batch",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager,
"_cleanup_batch_data",
),
):
crew.kickoff()
wait_for_event_handlers()
@@ -918,3 +963,576 @@ class TestTraceListenerSetup:
mock_init.assert_called_once()
payload = mock_init.call_args[0][0]
assert "user_identifier" not in payload
class TestTraceBatchIdClearedOnFailure:
"""Tests: trace_batch_id is cleared when _initialize_backend_batch fails."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id (simulating first-time user)."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id # simulate line 96
bm.is_current_batch_ephemeral = True
return bm
def test_trace_batch_id_cleared_on_exception(self):
"""trace_batch_id must be None when the API call raises an exception."""
bm = self._make_batch_manager()
assert bm.trace_batch_id is not None
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=ConnectionError("network down"),
),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
def test_trace_batch_id_set_on_success(self):
"""trace_batch_id must be set from the server response on success."""
bm = self._make_batch_manager()
server_id = "server-ephemeral-trace-id-999"
mock_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_response,
),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
def test_send_events_skipped_when_trace_batch_id_none(self):
"""_send_events_to_backend must return early when trace_batch_id is None."""
bm = self._make_batch_manager()
bm.trace_batch_id = None
bm.event_buffer = [MagicMock()] # has events
with patch.object(
bm.plus_api, "send_ephemeral_trace_events"
) as mock_send:
result = bm._send_events_to_backend()
assert result == 500
mock_send.assert_not_called()
class TestInitializeBackendBatchRetry:
"""Tests for retry logic in _initialize_backend_batch."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
return bm
def test_retries_on_none_response_then_succeeds(self):
"""Retries when API returns None, succeeds on second attempt."""
bm = self._make_batch_manager()
server_id = "server-id-after-retry"
success_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=[None, success_response],
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
assert mock_init.call_count == 2
mock_sleep.assert_called_once_with(0.2)
def test_retries_on_5xx_then_succeeds(self):
"""Retries on 500 server error, succeeds on second attempt."""
bm = self._make_batch_manager()
server_id = "server-id-after-5xx"
error_response = MagicMock(status_code=500, text="Internal Server Error")
success_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=[error_response, success_response],
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
assert mock_init.call_count == 2
def test_retries_on_exception_then_succeeds(self):
"""Retries on ConnectionError, succeeds on second attempt."""
bm = self._make_batch_manager()
server_id = "server-id-after-exception"
success_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=[ConnectionError("network down"), success_response],
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
assert mock_init.call_count == 2
def test_no_retry_on_4xx(self):
"""Does NOT retry on 422 — client error is not transient."""
bm = self._make_batch_manager()
error_response = MagicMock(status_code=422, text="Unprocessable Entity")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=error_response,
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
assert mock_init.call_count == 1
mock_sleep.assert_not_called()
def test_exhausts_retries_then_clears_batch_id(self):
"""After all retries fail, trace_batch_id is None."""
bm = self._make_batch_manager()
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=None,
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
assert mock_init.call_count == 3 # initial + 2 retries
class TestFirstTimeHandlerBackendInitGuard:
"""Tests: backend_initialized gated on actual batch creation success."""
def _make_handler_with_manager(self):
"""Create a FirstTimeTraceHandler wired to a TraceBatchManager."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
handler = FirstTimeTraceHandler()
handler.is_first_time = True
handler.collected_events = True
handler.batch_manager = bm
return handler, bm
def test_backend_initialized_true_on_success(self):
"""Events are sent when batch creation succeeds, then state is cleaned up."""
handler, bm = self._make_handler_with_manager()
server_id = "server-id-abc"
mock_init_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
mock_send_response = MagicMock(status_code=200)
trace_batch_id_during_send = None
def capture_send(*args, **kwargs):
nonlocal trace_batch_id_during_send
trace_batch_id_during_send = bm.trace_batch_id
return mock_send_response
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_init_response,
),
patch.object(
bm.plus_api,
"send_ephemeral_trace_events",
side_effect=capture_send,
),
patch.object(bm, "_finalize_backend_batch"),
):
bm.event_buffer = [MagicMock(to_dict=MagicMock(return_value={}))]
handler._initialize_backend_and_send_events()
# trace_batch_id was set correctly during send
assert trace_batch_id_during_send == server_id
# State cleaned up after completion (singleton reuse)
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
assert bm.current_batch is None
def test_backend_initialized_false_on_failure(self):
"""backend_initialized stays False and events are NOT sent when batch creation fails."""
handler, bm = self._make_handler_with_manager()
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=None, # server call fails
),
patch.object(bm, "_send_events_to_backend") as mock_send,
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
patch.object(handler, "_gracefully_fail") as mock_fail,
):
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
mock_send.assert_not_called()
mock_finalize.assert_not_called()
mock_fail.assert_called_once()
def test_backend_initialized_false_on_non_2xx(self):
"""backend_initialized stays False when server returns non-2xx."""
handler, bm = self._make_handler_with_manager()
mock_response = MagicMock(status_code=500, text="Internal Server Error")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_response,
),
patch.object(bm, "_send_events_to_backend") as mock_send,
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
patch.object(handler, "_gracefully_fail") as mock_fail,
):
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
mock_send.assert_not_called()
mock_finalize.assert_not_called()
mock_fail.assert_called_once()
class TestFirstTimeHandlerAlwaysEphemeral:
"""Tests that first-time handler always uses ephemeral with skip_context_check."""
def _make_handler_with_manager(self):
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
handler = FirstTimeTraceHandler()
handler.is_first_time = True
handler.collected_events = True
handler.batch_manager = bm
return handler, bm
def test_deferred_init_uses_ephemeral_and_skip_context_check(self):
"""Deferred backend init always uses ephemeral=True and skip_context_check=True."""
handler, bm = self._make_handler_with_manager()
with (
patch.object(bm, "_initialize_backend_batch") as mock_init,
patch.object(bm, "_send_events_to_backend"),
patch.object(bm, "_finalize_backend_batch"),
):
mock_init.side_effect = lambda **kwargs: None
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
mock_init.assert_called_once()
assert mock_init.call_args.kwargs["use_ephemeral"] is True
assert mock_init.call_args.kwargs["skip_context_check"] is True
class TestAuthFailbackToEphemeral:
"""Tests for ephemeral fallback when server rejects auth (401/403)."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = False # authenticated path
return bm
def test_401_non_ephemeral_falls_back_to_ephemeral(self):
"""A 401 on the non-ephemeral endpoint should retry as ephemeral."""
bm = self._make_batch_manager()
server_id = "ephemeral-fallback-id"
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
ephemeral_success = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=auth_rejected,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_success,
) as mock_ephemeral,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id == server_id
assert bm.is_current_batch_ephemeral is True
mock_ephemeral.assert_called_once()
def test_403_non_ephemeral_falls_back_to_ephemeral(self):
"""A 403 on the non-ephemeral endpoint should also fall back."""
bm = self._make_batch_manager()
server_id = "ephemeral-fallback-403"
forbidden = MagicMock(status_code=403, text="Forbidden")
ephemeral_success = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=forbidden,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_success,
),
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id == server_id
assert bm.is_current_batch_ephemeral is True
def test_401_on_ephemeral_does_not_recurse(self):
"""A 401 on the ephemeral endpoint should NOT try to fall back again."""
bm = self._make_batch_manager()
bm.is_current_batch_ephemeral = True
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=auth_rejected,
) as mock_ephemeral,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
# Called only once — no recursive fallback
mock_ephemeral.assert_called()
def test_401_fallback_ephemeral_also_fails(self):
"""If ephemeral fallback also fails, trace_batch_id is cleared."""
bm = self._make_batch_manager()
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
ephemeral_fail = MagicMock(status_code=422, text="Validation failed")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=auth_rejected,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_fail,
),
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id is None

View File

@@ -5,7 +5,6 @@ from pathlib import Path
import subprocess
import sys
import time
from typing import Final, Literal
import click
from dotenv import load_dotenv
@@ -251,9 +250,7 @@ def add_docs_version(docs_json_path: Path, version: str) -> bool:
return True
ChangelogLang = Literal["en", "pt-BR", "ko"]
_PT_BR_MONTHS: Final[dict[int, str]] = {
_PT_BR_MONTHS = {
1: "jan",
2: "fev",
3: "mar",
@@ -268,9 +265,7 @@ _PT_BR_MONTHS: Final[dict[int, str]] = {
12: "dez",
}
_CHANGELOG_LOCALES: Final[
dict[ChangelogLang, dict[Literal["link_text", "language_name"], str]]
] = {
_CHANGELOG_LOCALES: dict[str, dict[str, str]] = {
"en": {
"link_text": "View release on GitHub",
"language_name": "English",
@@ -288,7 +283,7 @@ _CHANGELOG_LOCALES: Final[
def translate_release_notes(
release_notes: str,
lang: ChangelogLang,
lang: str,
client: OpenAI,
) -> str:
"""Translate release notes into the target language using OpenAI.
@@ -331,7 +326,7 @@ def translate_release_notes(
return release_notes
def _format_changelog_date(lang: ChangelogLang) -> str:
def _format_changelog_date(lang: str) -> str:
"""Format today's date for a changelog entry in the given language."""
from datetime import datetime
@@ -347,7 +342,7 @@ def update_changelog(
changelog_path: Path,
version: str,
release_notes: str,
lang: ChangelogLang = "en",
lang: str = "en",
) -> bool:
"""Prepend a new release entry to a docs changelog file.
@@ -480,23 +475,6 @@ def get_packages(lib_dir: Path) -> list[Path]:
return packages
PrereleaseIndicator = Literal["a", "b", "rc", "alpha", "beta", "dev"]
_PRERELEASE_INDICATORS: Final[tuple[PrereleaseIndicator, ...]] = (
"a",
"b",
"rc",
"alpha",
"beta",
"dev",
)
def _is_prerelease(version: str) -> bool:
"""Check if a version string represents a pre-release."""
v = version.lower().lstrip("v")
return any(indicator in v for indicator in _PRERELEASE_INDICATORS)
def get_commits_from_last_tag(tag_name: str, version: str) -> tuple[str, str]:
"""Get commits from the last tag, excluding current version.
@@ -511,9 +489,6 @@ def get_commits_from_last_tag(tag_name: str, version: str) -> tuple[str, str]:
all_tags = run_command(["git", "tag", "--sort=-version:refname"]).split("\n")
prev_tags = [t for t in all_tags if t and t != tag_name and t != f"v{version}"]
if not _is_prerelease(version):
prev_tags = [t for t in prev_tags if not _is_prerelease(t)]
if prev_tags:
last_tag = prev_tags[0]
commit_range = f"{last_tag}..HEAD"
@@ -703,28 +678,20 @@ def _generate_release_notes(
with console.status("[cyan]Generating release notes..."):
try:
prev_bump_output = run_command(
prev_bump_commit = run_command(
[
"git",
"log",
"--grep=^feat: bump versions to",
"--format=%H %s",
"--format=%H",
"-n",
"2",
]
)
bump_entries = [
line for line in prev_bump_output.strip().split("\n") if line.strip()
]
commits_list = prev_bump_commit.strip().split("\n")
is_stable = not _is_prerelease(version)
prev_commit = None
for entry in bump_entries[1:]:
bump_ver = entry.split("feat: bump versions to", 1)[-1].strip()
if is_stable and _is_prerelease(bump_ver):
continue
prev_commit = entry.split()[0]
break
if prev_commit:
if len(commits_list) > 1:
prev_commit = commits_list[1]
commit_range = f"{prev_commit}..HEAD"
commits = run_command(
["git", "log", commit_range, "--pretty=format:%s"]
@@ -810,7 +777,10 @@ def _generate_release_notes(
"\n[green]✓[/green] Using generated release notes without editing"
)
is_prerelease = _is_prerelease(version)
is_prerelease = any(
indicator in version.lower()
for indicator in ["a", "b", "rc", "alpha", "beta", "dev"]
)
return release_notes, openai_client, is_prerelease
@@ -829,7 +799,7 @@ def _update_docs_and_create_pr(
The docs branch name if a PR was created, None otherwise.
"""
docs_json_path = cwd / "docs" / "docs.json"
changelog_langs: list[ChangelogLang] = ["en", "pt-BR", "ko"]
changelog_langs = ["en", "pt-BR", "ko"]
if not dry_run:
docs_files_staged: list[str] = []