Compare commits

...

7 Commits

Author SHA1 Message Date
João Moura
371e6cfd11 docs: update changelog and version for v1.12.0 (#5091) 2026-03-25 22:07:28 -03:00
João Moura
6fd70ce6e5 chore: bump version to 1.14.0 across all modules (#5090)
* chore: bump version to 1.14.0 across all modules

* chore: downgrade version to 1.12.0 across all modules
2026-03-25 22:03:37 -03:00
alex-clawd
c183b77991 fix: address Copilot review on OpenAI-compatible providers (#5042) (#5089)
Some checks failed
Build uv cache / build-cache (3.10) (push) Waiting to run
Build uv cache / build-cache (3.11) (push) Waiting to run
Build uv cache / build-cache (3.12) (push) Waiting to run
Build uv cache / build-cache (3.13) (push) Waiting to run
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
- Delegate supports_function_calling() to parent (handles o1 models via OpenRouter)
- Guard empty env vars in base_url resolution
- Fix misleading comment about model validation rules
- Remove unused MagicMock import
- Use 'is not None' for env var restoration in tests

Co-authored-by: Joao Moura <joao@crewai.com>
2026-03-25 18:22:13 -03:00
Greyson LaLonde
b5a0d6e709 docs: update changelog and version for v1.12.0a3 2026-03-26 04:17:37 +08:00
Greyson LaLonde
454156cff9 feat: bump versions to 1.12.0a3 2026-03-26 04:12:49 +08:00
Tiago Freire
d86707da3d Fix: bad credentials for traces batch push (404) (#4947)
## Summary

### Core fixes

<details>
<summary><b>Fix silent 404 cascade on trace event send</b></summary>

When `_initialize_backend_batch` failed, `trace_batch_id` was left populated with a client-generated UUID never registered server-side. All subsequent event sends hit a non-existent batch endpoint and returned 404. Now all three failure paths (None response, non-2xx status, exception) clear `trace_batch_id`.
</details>

<details>
<summary><b>Fix first-time deferred batch init silently skipped</b></summary>

First-time users have `is_tracing_enabled_in_context() = False` by design. This caused `_initialize_backend_batch` to return early without creating the batch, and `finalize_batch` to skip finalization (same guard). The first-time handler now passes `skip_context_check=True` to bypass both guards, calls `_finalize_backend_batch` directly, gates `backend_initialized` on actual success, checks `_send_events_to_backend` return status (marking batch as failed on 500), captures event count/duration/batch ID before they're consumed by send/finalize, and cleans up all singleton state via `_reset_batch_state()` on every exit path.
</details>

<details>
<summary><b>Sync <code>is_current_batch_ephemeral</code> on batch creation success</b></summary>

When the batch is successfully created on the server, `is_current_batch_ephemeral` is now synced with the actual `use_ephemeral` value used. This prevents endpoint mismatches where the batch was created on one endpoint but events and finalization were sent to a different one, resulting in 404.
</details>

<details>
<summary><b>Route <code>mark_trace_batch_as_failed</code> to correct endpoint for ephemeral batches</b></summary>

`mark_trace_batch_as_failed` always routed to the non-ephemeral endpoint (`/tracing/batches/{id}`), causing 404s when called on ephemeral batches — the same class of endpoint mismatch this PR aims to fix. Added `mark_ephemeral_trace_batch_as_failed` to `PlusAPI` and a `_mark_batch_as_failed` helper on `TraceBatchManager` that routes based on `is_current_batch_ephemeral`.
</details>

<details>
<summary><b>Gate <code>backend_initialized</code> on actual init success (non-first-time path)</b></summary>

On the non-first-time path, `backend_initialized` was set to `True` unconditionally after `_initialize_backend_batch` returned. With the new failure-path cleanup that clears `trace_batch_id`, this created an inconsistent state: `backend_initialized=True` + `trace_batch_id=None`. Now set via `self.trace_batch_id is not None`.
</details>

### Resilience improvements

<details>
<summary><b>Retry transient failures on batch creation</b></summary>

`_initialize_backend_batch` now retries up to 2 times with 200ms backoff on transient failures (None response, 5xx, network errors). Non-transient 4xx errors are not retried. The short backoff minimizes lock hold time on the non-first-time path where `_batch_ready_cv` is held.
</details>

<details>
<summary><b>Fall back to ephemeral on server auth rejection</b></summary>

When the non-ephemeral endpoint returns 401/403 (expired token, revoked credentials, key rotation), the client automatically switches to ephemeral tracing instead of losing traces. The fallback forwards `skip_context_check` and is guarded against infinite recursion — if ephemeral also fails, `trace_batch_id` is cleared normally.
</details>

<details>
<summary><b>Fix action-event race initializing batch as non-ephemeral</b></summary>

`_handle_action_event` called `batch_manager.initialize_batch()` directly, defaulting `use_ephemeral=False`. When a `DefaultEnvEvent` or `LLMCallStartedEvent` fired before `CrewKickoffStartedEvent` in the thread pool, the batch was locked in as non-ephemeral. Now routes through `_initialize_batch()` which computes `use_ephemeral` from `_check_authenticated()`.
</details>

<details>
<summary><b>Guard <code>_mark_batch_as_failed</code> against cascading network errors</b></summary>

When `_finalize_backend_batch` failed with a network error (e.g. `[Errno 54] Connection reset by peer`), the exception handler called `_mark_batch_as_failed` — which also makes an HTTP request on the same dead connection. That second failure was unhandled. Now wrapped in a try/except so it logs at debug level instead of propagating.
</details>

<details>
<summary><b>Design decision: first-time users always use ephemeral</b></summary>

First-time trace collection **always creates ephemeral batches**, regardless of authentication status. This is intentional:

1. **The first-time handler UX is built around ephemeral traces** — it displays an access code, a 24-hour expiry link, and opens the browser to the ephemeral trace viewer. Non-ephemeral batches don't produce these artifacts, so the handler would fall through to the "Local Traces Collected" fallback even when traces were successfully sent.

2. **The server handles account linking automatically** — `LinkEphemeralTracesJob` runs on user signup and migrates ephemeral traces to permanent records. Logged-in users can access their traces via their dashboard regardless.

3. **Checking auth during batch setup broke event collection** — moving `_check_authenticated()` into `_initialize_batch` caused the batch initialization to fail silently during the flow/crew start event handler, preventing all event collection. Keeping the first-time path fast and side-effect-free preserves event collection.

The auth check is deferred to the non-first-time path (second run onwards), where `is_tracing_enabled_in_context()` is `True` and the normal tracing pipeline handles everything — including the 401/403 ephemeral fallback.
</details>


### Manual tests


<details>
<summary><b>Matrix</b></summary>

| Scenario | First run | Second run |
|----------|-----------|------------|
| Logged out, fresh `.crewai_user.json` | Ephemeral trace created, URL returned | Ephemeral trace created, URL returned |
| Logged in, fresh `.crewai_user.json` | Ephemeral trace created, URL returned | Trace batch finalized, URL returned |
| Flow execution | Tested with `poem_flow` | Tested with `poem_flow` |
| Crew execution | Tested with `hitl_crew` | Tested with `hitl_crew` |
</details>
2026-03-25 16:00:05 -04:00
Greyson LaLonde
1956471086 fix: resolve multiple bugs in HITL flow system 2026-03-26 03:33:03 +08:00
27 changed files with 3108 additions and 74 deletions

View File

@@ -4,6 +4,71 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="25 مارس 2026">
## v1.12.0
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0)
## ما الذي تغير
### الميزات
- إضافة واجهة تخزين Qdrant Edge لنظام الذاكرة
- إضافة أمر docs-check لتحليل التغييرات وتوليد الوثائق مع الترجمات
- إضافة دعم اللغة العربية لسجل التغييرات وأدوات الإصدار
- إضافة ترجمة اللغة العربية الفصحى لجميع الوثائق
- إضافة أمر تسجيل الخروج في واجهة سطر الأوامر
- تنفيذ مهارات الوكيل
- تنفيذ نطاق الجذر التلقائي لعزل الذاكرة الهرمية
- تنفيذ موفري خدمات متوافقين مع OpenAI (OpenRouter، DeepSeek، Ollama، vLLM، Cerebras، Dashscope)
### إصلاح الأخطاء
- إصلاح بيانات الاعتماد السيئة لدفع دفعات التتبع (404)
- حل العديد من الأخطاء في نظام تدفق HITL
- حل أخطاء mypy في crewai-files وإضافة جميع الحزم إلى فحوصات نوع CI
- حل جميع أخطاء mypy الصارمة عبر حزمة crewai-tools
- حل جميع أخطاء mypy عبر حزمة crewai
- إصلاح حفظ الذاكرة في الوكيل
- إصلاح استخدام __router_paths__ لطرق المستمع + الموجه في FlowMeta
- رفع خطأ القيمة عند عدم دعم الملفات
- تصحيح صياغة الحجر الصحي لـ litellm في الوثائق
- استخدام فحص None بدلاً من isinstance للذاكرة في تعلم التغذية الراجعة البشرية
- تثبيت الحد الأعلى لـ litellm على آخر إصدار تم اختباره (1.82.6)
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.12.0
- إضافة CONTRIBUTING.md
- إضافة دليل لاستخدام CrewAI بدون LiteLLM
### إعادة الهيكلة
- إعادة هيكلة لتجنب تكرار تنفيذ المهام المتزامنة / غير المتزامنة وبدء التشغيل في الوكيل
- تبسيط الأنابيب الداخلية من litellm (عد الرموز، ردود النداء، اكتشاف الميزات، الأخطاء)
## المساهمون
@akaKuruma، @alex-clawd، @greysonlalonde، @iris-clawd، @joaomdmoura، @lorenzejay، @nicoferdi96
</Update>
<Update label="26 مارس 2026">
## v1.12.0a3
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
## ما الذي تغير
### إصلاحات الأخطاء
- إصلاح بيانات الاعتماد الخاطئة لدفع دفعات التتبع (404)
- حل العديد من الأخطاء في نظام تدفق HITL
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.12.0a2
## المساهمون
@akaKuruma, @greysonlalonde
</Update>
<Update label="25 مارس 2026">
## v1.12.0a2

File diff suppressed because it is too large Load Diff

View File

@@ -4,6 +4,71 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Mar 25, 2026">
## v1.12.0
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0)
## What's Changed
### Features
- Add Qdrant Edge storage backend for memory system
- Add docs-check command to analyze changes and generate docs with translations
- Add Arabic language support to changelog and release tooling
- Add modern standard Arabic translation of all documentation
- Add logout command in CLI
- Implement agent skills
- Implement automatic root_scope for hierarchical memory isolation
- Implement native OpenAI-compatible providers (OpenRouter, DeepSeek, Ollama, vLLM, Cerebras, Dashscope)
### Bug Fixes
- Fix bad credentials for traces batch push (404)
- Resolve multiple bugs in HITL flow system
- Resolve mypy errors in crewai-files and add all packages to CI type checks
- Resolve all strict mypy errors across crewai-tools package
- Resolve all mypy errors across crewai package
- Fix memory saving in agent
- Fix usage of __router_paths__ for listener+router methods in FlowMeta
- Raise value error on no file support
- Correct litellm quarantine wording in docs
- Use None check instead of isinstance for memory in human feedback learn
- Pin litellm upper bound to last tested version (1.82.6)
### Documentation
- Update changelog and version for v1.12.0
- Add CONTRIBUTING.md
- Add guide for using CrewAI without LiteLLM
### Refactoring
- Refactor to deduplicate sync/async task execution and kickoff in agent
- Simplify internal plumbing from litellm (token counting, callbacks, feature detection, errors)
## Contributors
@akaKuruma, @alex-clawd, @greysonlalonde, @iris-clawd, @joaomdmoura, @lorenzejay, @nicoferdi96
</Update>
<Update label="Mar 26, 2026">
## v1.12.0a3
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
## What's Changed
### Bug Fixes
- Fix bad credentials for traces batch push (404)
- Resolve multiple bugs in HITL flow system
### Documentation
- Update changelog and version for v1.12.0a2
## Contributors
@akaKuruma, @greysonlalonde
</Update>
<Update label="Mar 25, 2026">
## v1.12.0a2

View File

@@ -4,6 +4,71 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 3월 25일">
## v1.12.0
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0)
## 변경 사항
### 기능
- 메모리 시스템을 위한 Qdrant Edge 스토리지 백엔드 추가
- 변경 사항을 분석하고 번역된 문서와 함께 문서를 생성하는 docs-check 명령어 추가
- 변경 로그 및 릴리스 도구에 아랍어 지원 추가
- 모든 문서의 현대 표준 아랍어 번역 추가
- CLI에 로그아웃 명령어 추가
- 에이전트 기술 구현
- 계층적 메모리 격리를 위한 자동 root_scope 구현
- OpenAI 호환 네이티브 제공자 구현 (OpenRouter, DeepSeek, Ollama, vLLM, Cerebras, Dashscope)
### 버그 수정
- 트레이스 배치 푸시에 대한 잘못된 자격 증명 수정 (404)
- HITL 흐름 시스템의 여러 버그 해결
- crewai-files의 mypy 오류 해결 및 모든 패키지를 CI 타입 검사에 추가
- crewai-tools 패키지 전반의 모든 엄격한 mypy 오류 해결
- crewai 패키지 전반의 모든 mypy 오류 해결
- 에이전트의 메모리 절약 수정
- FlowMeta에서 listener+router 메서드의 __router_paths__ 사용 수정
- 파일 지원이 없을 때 값 오류 발생
- 문서에서 litellm 격리 단어 수정
- 인간 피드백 학습에서 메모리에 대한 isinstance 대신 None 체크 사용
- litellm의 상한을 마지막 테스트된 버전(1.82.6)으로 고정
### 문서
- v1.12.0에 대한 변경 로그 및 버전 업데이트
- CONTRIBUTING.md 추가
- LiteLLM 없이 CrewAI를 사용하는 가이드 추가
### 리팩토링
- 에이전트에서 동기/비동기 작업 실행 및 시작을 중복 제거하도록 리팩토링
- litellm의 내부 플러밍 단순화 (토큰 카운팅, 콜백, 기능 감지, 오류)
## 기여자
@akaKuruma, @alex-clawd, @greysonlalonde, @iris-clawd, @joaomdmoura, @lorenzejay, @nicoferdi96
</Update>
<Update label="2026년 3월 26일">
## v1.12.0a3
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
## 변경 사항
### 버그 수정
- 트레이스 배치 푸시에 대한 잘못된 자격 증명 수정 (404)
- HITL 흐름 시스템의 여러 버그 해결
### 문서
- v1.12.0a2에 대한 변경 로그 및 버전 업데이트
## 기여자
@akaKuruma, @greysonlalonde
</Update>
<Update label="2026년 3월 25일">
## v1.12.0a2

View File

@@ -4,6 +4,71 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="25 mar 2026">
## v1.12.0
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0)
## O que Mudou
### Funcionalidades
- Adicionar backend de armazenamento Qdrant Edge para sistema de memória
- Adicionar comando docs-check para analisar mudanças e gerar documentos com traduções
- Adicionar suporte ao idioma árabe para changelog e ferramentas de lançamento
- Adicionar tradução em árabe padrão moderno de toda a documentação
- Adicionar comando de logout na CLI
- Implementar habilidades de agente
- Implementar root_scope automático para isolamento hierárquico de memória
- Implementar provedores nativos compatíveis com OpenAI (OpenRouter, DeepSeek, Ollama, vLLM, Cerebras, Dashscope)
### Correções de Bugs
- Corrigir credenciais inválidas para envio em lote de rastros (404)
- Resolver múltiplos bugs no sistema de fluxo HITL
- Resolver erros do mypy em crewai-files e adicionar todos os pacotes às verificações de tipo do CI
- Resolver todos os erros estritos do mypy no pacote crewai-tools
- Resolver todos os erros do mypy no pacote crewai
- Corrigir economia de memória no agente
- Corrigir uso de __router_paths__ para métodos listener+router em FlowMeta
- Levantar erro de valor em caso de suporte a arquivos inexistente
- Corrigir a redação da quarentena do litellm na documentação
- Usar verificação de None em vez de isinstance para memória no aprendizado de feedback humano
- Fixar limite superior do litellm na última versão testada (1.82.6)
### Documentação
- Atualizar changelog e versão para v1.12.0
- Adicionar CONTRIBUTING.md
- Adicionar guia para usar CrewAI sem LiteLLM
### Refatoração
- Refatorar para desduplicar execução de tarefas síncronas/assíncronas e início no agente
- Simplificar a infraestrutura interna do litellm (contagem de tokens, callbacks, detecção de recursos, erros)
## Contribuidores
@akaKuruma, @alex-clawd, @greysonlalonde, @iris-clawd, @joaomdmoura, @lorenzejay, @nicoferdi96
</Update>
<Update label="26 mar 2026">
## v1.12.0a3
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
## O que Mudou
### Correções de Bugs
- Corrigir credenciais inválidas para envio em lote de rastros (404)
- Resolver múltiplos bugs no sistema de fluxo HITL
### Documentação
- Atualizar changelog e versão para v1.12.0a2
## Contributors
@akaKuruma, @greysonlalonde
</Update>
<Update label="25 mar 2026">
## v1.12.0a2

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.12.0a2"
__version__ = "1.12.0"

View File

@@ -11,7 +11,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.12.0a2",
"crewai==1.12.0a3",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -309,4 +309,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.12.0a2"
__version__ = "1.12.0"

View File

@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.12.0a2",
"crewai-tools==1.12.0a3",
]
embeddings = [
"tiktoken~=0.8.0"

View File

@@ -42,7 +42,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.12.0a2"
__version__ = "1.12.0"
_telemetry_submitted = False

View File

@@ -196,6 +196,16 @@ class PlusAPI:
timeout=30,
)
def mark_ephemeral_trace_batch_as_failed(
self, trace_batch_id: str, error_message: str
) -> httpx.Response:
return self._make_request(
"PATCH",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}",
json={"status": "failed", "failure_reason": error_message},
timeout=30,
)
def get_mcp_configs(self, slugs: list[str]) -> httpx.Response:
"""Get MCP server configurations for the given slugs."""
return self._make_request(

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.12.0a2"
"crewai[tools]==1.12.0a3"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.12.0a2"
"crewai[tools]==1.12.0a3"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.12.0a2"
"crewai[tools]==1.12.0a3"
]
[tool.crewai]

View File

@@ -1,3 +1,4 @@
from datetime import datetime, timezone
import logging
import uuid
import webbrowser
@@ -100,20 +101,50 @@ class FirstTimeTraceHandler:
user_context=user_context,
execution_metadata=execution_metadata,
use_ephemeral=True,
skip_context_check=True,
)
if not self.batch_manager.trace_batch_id:
self._gracefully_fail(
"Backend batch creation failed, cannot send events."
)
self._reset_batch_state()
return
self.batch_manager.backend_initialized = True
if self.batch_manager.event_buffer:
self.batch_manager._send_events_to_backend()
# Capture values before send/finalize consume them
events_count = len(self.batch_manager.event_buffer)
batch_id = self.batch_manager.trace_batch_id
# Read duration non-destructively — _finalize_backend_batch will consume it
start_time = self.batch_manager.execution_start_times.get("execution")
duration_ms = (
int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
if start_time
else 0
)
self.batch_manager.finalize_batch()
if self.batch_manager.event_buffer:
send_status = self.batch_manager._send_events_to_backend()
if send_status == 500 and self.batch_manager.trace_batch_id:
self.batch_manager._mark_batch_as_failed(
self.batch_manager.trace_batch_id,
"Error sending events to backend",
)
self._reset_batch_state()
return
self.batch_manager._finalize_backend_batch(events_count)
self.ephemeral_url = self.batch_manager.ephemeral_trace_url
if not self.ephemeral_url:
self._show_local_trace_message()
self._show_local_trace_message(events_count, duration_ms, batch_id)
self._reset_batch_state()
except Exception as e:
self._gracefully_fail(f"Backend initialization failed: {e}")
self._reset_batch_state()
def _display_ephemeral_trace_link(self) -> None:
"""Display the ephemeral trace link to the user and automatically open browser."""
@@ -185,6 +216,19 @@ To enable tracing later, do any one of these:
console.print(panel)
console.print()
def _reset_batch_state(self) -> None:
"""Reset batch manager state to allow future executions to re-initialize."""
if not self.batch_manager:
return
self.batch_manager.batch_owner_type = None
self.batch_manager.batch_owner_id = None
self.batch_manager.current_batch = None
self.batch_manager.event_buffer.clear()
self.batch_manager.trace_batch_id = None
self.batch_manager.is_current_batch_ephemeral = False
self.batch_manager.backend_initialized = False
self.batch_manager._cleanup_batch_data()
def _gracefully_fail(self, error_message: str) -> None:
"""Handle errors gracefully without disrupting user experience."""
console = Console()
@@ -192,7 +236,9 @@ To enable tracing later, do any one of these:
logger.debug(f"First-time trace error: {error_message}")
def _show_local_trace_message(self) -> None:
def _show_local_trace_message(
self, events_count: int = 0, duration_ms: int = 0, batch_id: str | None = None
) -> None:
"""Show message when traces were collected locally but couldn't be uploaded."""
if self.batch_manager is None:
return
@@ -203,9 +249,9 @@ To enable tracing later, do any one of these:
📊 Your execution traces were collected locally!
Unfortunately, we couldn't upload them to the server right now, but here's what we captured:
{len(self.batch_manager.event_buffer)} trace events
• Execution duration: {self.batch_manager.calculate_duration("execution")}ms
• Batch ID: {self.batch_manager.trace_batch_id}
{events_count} trace events
• Execution duration: {duration_ms}ms
• Batch ID: {batch_id}
✅ Tracing has been enabled for future runs!
Your preference has been saved. Future Crew/Flow executions will automatically collect traces.

View File

@@ -2,6 +2,7 @@ from dataclasses import dataclass, field
from datetime import datetime, timezone
from logging import getLogger
from threading import Condition, Lock
import time
from typing import Any
import uuid
@@ -98,7 +99,7 @@ class TraceBatchManager:
self._initialize_backend_batch(
user_context, execution_metadata, use_ephemeral
)
self.backend_initialized = True
self.backend_initialized = self.trace_batch_id is not None
self._batch_ready_cv.notify_all()
return self.current_batch
@@ -108,14 +109,15 @@ class TraceBatchManager:
user_context: dict[str, str],
execution_metadata: dict[str, Any],
use_ephemeral: bool = False,
skip_context_check: bool = False,
) -> None:
"""Send batch initialization to backend"""
if not is_tracing_enabled_in_context():
return
if not skip_context_check and not is_tracing_enabled_in_context():
return None
if not self.plus_api or not self.current_batch:
return
return None
try:
payload = {
@@ -142,19 +144,53 @@ class TraceBatchManager:
payload["ephemeral_trace_id"] = self.current_batch.batch_id
payload["user_identifier"] = get_user_id()
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
max_retries = 1
response = None
try:
for attempt in range(max_retries + 1):
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
if response is not None and response.status_code < 500:
break
if attempt < max_retries:
logger.debug(
f"Trace batch init attempt {attempt + 1} failed "
f"(status={response.status_code if response else 'None'}), retrying..."
)
time.sleep(0.2)
except Exception as e:
logger.warning(
f"Error initializing trace batch: {e}. Continuing without tracing."
)
self.trace_batch_id = None
return None
if response is None:
logger.warning(
"Trace batch initialization failed gracefully. Continuing without tracing."
)
return
self.trace_batch_id = None
return None
# Fall back to ephemeral on auth failure (expired/revoked token)
if response.status_code in [401, 403] and not use_ephemeral:
logger.warning(
"Auth rejected by server, falling back to ephemeral tracing."
)
self.is_current_batch_ephemeral = True
return self._initialize_backend_batch(
user_context,
execution_metadata,
use_ephemeral=True,
skip_context_check=skip_context_check,
)
if response.status_code in [201, 200]:
self.is_current_batch_ephemeral = use_ephemeral
response_data = response.json()
self.trace_batch_id = (
response_data["trace_id"]
@@ -165,11 +201,22 @@ class TraceBatchManager:
logger.warning(
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
)
self.trace_batch_id = None
except Exception as e:
logger.warning(
f"Error initializing trace batch: {e}. Continuing without tracing."
)
self.trace_batch_id = None
def _mark_batch_as_failed(self, trace_batch_id: str, error_message: str) -> None:
"""Mark a trace batch as failed, routing to the correct endpoint."""
if self.is_current_batch_ephemeral:
self.plus_api.mark_ephemeral_trace_batch_as_failed(
trace_batch_id, error_message
)
else:
self.plus_api.mark_trace_batch_as_failed(trace_batch_id, error_message)
def begin_event_processing(self) -> None:
"""Mark that an event handler started processing (for synchronization)."""
@@ -260,7 +307,7 @@ class TraceBatchManager:
logger.error(
"Event handler timeout - marking batch as failed due to incomplete events"
)
self.plus_api.mark_trace_batch_as_failed(
self._mark_batch_as_failed(
self.trace_batch_id,
"Timeout waiting for event handlers - events incomplete",
)
@@ -284,7 +331,7 @@ class TraceBatchManager:
events_sent_to_backend_status = self._send_events_to_backend()
self.event_buffer = original_buffer
if events_sent_to_backend_status == 500 and self.trace_batch_id:
self.plus_api.mark_trace_batch_as_failed(
self._mark_batch_as_failed(
self.trace_batch_id, "Error sending events to backend"
)
return None
@@ -364,13 +411,16 @@ class TraceBatchManager:
logger.error(
f"❌ Failed to finalize trace batch: {response.status_code} - {response.text}"
)
self.plus_api.mark_trace_batch_as_failed(
self.trace_batch_id, response.text
)
self._mark_batch_as_failed(self.trace_batch_id, response.text)
except Exception as e:
logger.error(f"❌ Error finalizing trace batch: {e}")
self.plus_api.mark_trace_batch_as_failed(self.trace_batch_id, str(e))
try:
self._mark_batch_as_failed(self.trace_batch_id, str(e))
except Exception:
logger.debug(
"Could not mark trace batch as failed (network unavailable)"
)
def _cleanup_batch_data(self) -> None:
"""Clean up batch data after successful finalization to free memory"""

View File

@@ -235,8 +235,11 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(FlowStartedEvent)
def on_flow_started(source: Any, event: FlowStartedEvent) -> None:
if not self.batch_manager.is_batch_initialized():
self._initialize_flow_batch(source, event)
# Always call _initialize_flow_batch to claim ownership.
# If batch was already initialized by a concurrent action event
# (race condition), initialize_batch() returns early but
# batch_owner_type is still correctly set to "flow".
self._initialize_flow_batch(source, event)
self._handle_trace_event("flow_started", source, event)
@event_bus.on(MethodExecutionStartedEvent)
@@ -266,7 +269,12 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None:
if not self.batch_manager.is_batch_initialized():
if self.batch_manager.batch_owner_type != "flow":
# Always call _initialize_crew_batch to claim ownership.
# If batch was already initialized by a concurrent action event
# (race condition with DefaultEnvEvent), initialize_batch() returns
# early but batch_owner_type is still correctly set to "crew".
# Skip only when a parent flow already owns the batch.
self._initialize_crew_batch(source, event)
self._handle_trace_event("crew_kickoff_started", source, event)
@@ -772,7 +780,7 @@ class TraceCollectionListener(BaseEventListener):
"crew_name": getattr(source, "name", "Unknown Crew"),
"crewai_version": get_crewai_version(),
}
self.batch_manager.initialize_batch(user_context, execution_metadata)
self._initialize_batch(user_context, execution_metadata)
self.batch_manager.begin_event_processing()
try:

View File

@@ -127,6 +127,9 @@ To update, run: uv sync --upgrade-package crewai"""
def _show_tracing_disabled_message_if_needed(self) -> None:
"""Show tracing disabled message if tracing is not enabled."""
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.events.listeners.tracing.utils import (
has_user_declined_tracing,
is_tracing_enabled_in_context,
@@ -136,6 +139,12 @@ To update, run: uv sync --upgrade-package crewai"""
if should_suppress_tracing_messages():
return
# Don't show "disabled" message when the first-time handler will show
# the trace prompt after execution completes (avoids confusing mid-flow messages)
listener = TraceCollectionListener._instance # type: ignore[misc]
if listener and listener.first_time_handler.is_first_time:
return
if not is_tracing_enabled_in_context():
if has_user_declined_tracing():
message = """Info: Tracing is disabled.

View File

@@ -182,7 +182,7 @@ class ConsoleProvider:
console.print(message, style="yellow")
console.print()
response = input(">>> \n").strip()
response = input(">>> ").strip()
else:
response = input(f"{message} ").strip()

View File

@@ -63,6 +63,32 @@ class PendingFeedbackContext:
llm: dict[str, Any] | str | None = None
requested_at: datetime = field(default_factory=datetime.now)
@staticmethod
def _make_json_safe(value: Any) -> Any:
"""Convert a value to a JSON-serializable form.
Handles Pydantic models, dataclasses, and arbitrary objects by
progressively falling back to string representation.
"""
if value is None or isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, (list, tuple)):
return [PendingFeedbackContext._make_json_safe(v) for v in value]
if isinstance(value, dict):
return {
k: PendingFeedbackContext._make_json_safe(v) for k, v in value.items()
}
from pydantic import BaseModel
if isinstance(value, BaseModel):
return value.model_dump(mode="json")
import dataclasses
if dataclasses.is_dataclass(value) and not isinstance(value, type):
return PendingFeedbackContext._make_json_safe(dataclasses.asdict(value))
return str(value)
def to_dict(self) -> dict[str, Any]:
"""Serialize context to a dictionary for persistence.
@@ -73,11 +99,11 @@ class PendingFeedbackContext:
"flow_id": self.flow_id,
"flow_class": self.flow_class,
"method_name": self.method_name,
"method_output": self.method_output,
"method_output": self._make_json_safe(self.method_output),
"message": self.message,
"emit": self.emit,
"default_outcome": self.default_outcome,
"metadata": self.metadata,
"metadata": self._make_json_safe(self.metadata),
"llm": self.llm,
"requested_at": self.requested_at.isoformat(),
}

View File

@@ -1223,9 +1223,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Mark that we're resuming execution
instance._is_execution_resuming = True
# Mark the method as completed (it ran before pausing)
instance._completed_methods.add(FlowMethodName(pending_context.method_name))
return instance
@property
@@ -1380,7 +1377,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
self.human_feedback_history.append(result)
self.last_human_feedback = result
# Clear pending context after processing
self._completed_methods.add(FlowMethodName(context.method_name))
self._pending_feedback_context = None
# Clear pending feedback from persistence
@@ -1403,7 +1401,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
self._is_execution_resuming = False
final_result: Any = result
if emit and collapsed_outcome is None:
collapsed_outcome = default_outcome or emit[0]
result.outcome = collapsed_outcome
try:
if emit and collapsed_outcome:
self._method_outputs.append(collapsed_outcome)
@@ -1421,7 +1422,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed)
self._pending_feedback_context = e.context
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
@@ -1455,6 +1457,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
return e
raise
final_result = self._method_outputs[-1] if self._method_outputs else result
# Emit flow finished
crewai_event_bus.emit(
self,
@@ -2314,7 +2318,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
if isinstance(e, HumanFeedbackPending):
e.context.method_name = method_name
# Auto-save pending feedback (create default persistence if needed)
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
@@ -3133,10 +3136,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
if outcome.lower() == response_clean.lower():
return outcome
# Partial match
# Partial match (longest wins, first on length ties)
response_lower = response_clean.lower()
best_outcome: str | None = None
best_len = -1
for outcome in outcomes:
if outcome.lower() in response_clean.lower():
return outcome
if outcome.lower() in response_lower and len(outcome) > best_len:
best_outcome = outcome
best_len = len(outcome)
if best_outcome is not None:
return best_outcome
# Fallback to first outcome
logger.warning(

View File

@@ -116,10 +116,11 @@ def _deserialize_llm_from_context(
return LLM(model=llm_data)
if isinstance(llm_data, dict):
model = llm_data.pop("model", None)
data = dict(llm_data)
model = data.pop("model", None)
if not model:
return None
return LLM(model=model, **llm_data)
return LLM(model=model, **data)
return None
@@ -450,12 +451,12 @@ def human_feedback(
# -- Core feedback helpers ------------------------------------
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
"""Request feedback using provider or default console."""
def _build_feedback_context(
flow_instance: Flow[Any], method_output: Any
) -> tuple[Any, Any]:
"""Build the PendingFeedbackContext and resolve the effective provider."""
from crewai.flow.async_feedback.types import PendingFeedbackContext
# Build context for provider
# Use flow_id property which handles both dict and BaseModel states
context = PendingFeedbackContext(
flow_id=flow_instance.flow_id or "unknown",
flow_class=f"{flow_instance.__class__.__module__}.{flow_instance.__class__.__name__}",
@@ -468,15 +469,53 @@ def human_feedback(
llm=llm if isinstance(llm, str) else _serialize_llm_for_context(llm),
)
# Determine effective provider:
effective_provider = provider
if effective_provider is None:
from crewai.flow.flow_config import flow_config
effective_provider = flow_config.hitl_provider
return context, effective_provider
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
"""Request feedback using provider or default console (sync)."""
context, effective_provider = _build_feedback_context(
flow_instance, method_output
)
if effective_provider is not None:
return effective_provider.request_feedback(context, flow_instance)
feedback_result = effective_provider.request_feedback(
context, flow_instance
)
if asyncio.iscoroutine(feedback_result):
raise TypeError(
f"Provider {type(effective_provider).__name__}.request_feedback() "
"returned a coroutine in a sync flow method. Use an async flow "
"method or a synchronous provider."
)
return str(feedback_result)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
metadata=metadata,
emit=emit,
)
async def _request_feedback_async(
flow_instance: Flow[Any], method_output: Any
) -> str:
"""Request feedback, awaiting the provider if it returns a coroutine."""
context, effective_provider = _build_feedback_context(
flow_instance, method_output
)
if effective_provider is not None:
feedback_result = effective_provider.request_feedback(
context, flow_instance
)
if asyncio.iscoroutine(feedback_result):
return str(await feedback_result)
return str(feedback_result)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
@@ -524,10 +563,11 @@ def human_feedback(
flow_instance.human_feedback_history.append(result)
flow_instance.last_human_feedback = result
# Return based on mode
if emit:
# Return outcome for routing
return collapsed_outcome # type: ignore[return-value]
if collapsed_outcome is None:
collapsed_outcome = default_outcome or emit[0]
result.outcome = collapsed_outcome
return collapsed_outcome
return result
if asyncio.iscoroutinefunction(func):
@@ -540,7 +580,7 @@ def human_feedback(
if learn and getattr(self, "memory", None) is not None:
method_output = _pre_review_with_lessons(self, method_output)
raw_feedback = _request_feedback(self, method_output)
raw_feedback = await _request_feedback_async(self, method_output)
result = _process_feedback(self, method_output, raw_feedback)
# Distill: extract lessons from output + feedback, store in memory

View File

@@ -483,8 +483,8 @@ class LLM(BaseLLM):
for prefix in ["gpt-", "gpt-35-", "o1", "o3", "o4", "azure-"]
)
# OpenAI-compatible providers - accept any model name since these
# providers host many different models with varied naming conventions
# OpenAI-compatible providers - most accept any model name, but some
# (DeepSeek, Dashscope) restrict to their own model prefixes
if provider == "deepseek":
return model_lower.startswith("deepseek")

View File

@@ -239,7 +239,8 @@ class OpenAICompatibleCompletion(OpenAICompletion):
if base_url:
resolved = base_url
elif config.base_url_env:
resolved = os.getenv(config.base_url_env, config.base_url)
env_value = os.getenv(config.base_url_env)
resolved = env_value if env_value else config.base_url
else:
resolved = config.base_url
@@ -274,9 +275,11 @@ class OpenAICompatibleCompletion(OpenAICompletion):
def supports_function_calling(self) -> bool:
"""Check if the provider supports function calling.
All modern OpenAI-compatible providers support function calling.
Delegates to the parent OpenAI implementation which handles
edge cases like o1 models (which may be routed through
OpenRouter or other compatible providers).
Returns:
True, as all supported providers have function calling support.
Whether the model supports function calling.
"""
return True
return super().supports_function_calling()

View File

@@ -1,7 +1,7 @@
"""Tests for OpenAI-compatible providers."""
import os
from unittest.mock import MagicMock, patch
from unittest.mock import patch
import pytest
@@ -133,7 +133,7 @@ class TestOpenAICompatibleCompletion:
with pytest.raises(ValueError, match="API key required"):
OpenAICompatibleCompletion(model="deepseek-chat", provider="deepseek")
finally:
if original:
if original is not None:
os.environ[env_key] = original
def test_api_key_from_env(self):

View File

@@ -7,6 +7,7 @@ from crewai.events.listeners.tracing.first_time_trace_handler import (
FirstTimeTraceHandler,
)
from crewai.events.listeners.tracing.trace_batch_manager import (
TraceBatch,
TraceBatchManager,
)
from crewai.events.listeners.tracing.trace_listener import (
@@ -657,6 +658,16 @@ class TestTraceListenerSetup:
trace_listener.first_time_handler.collected_events = True
mock_batch_response = MagicMock()
mock_batch_response.status_code = 201
mock_batch_response.json.return_value = {
"trace_id": "mock-trace-id",
"ephemeral_trace_id": "mock-ephemeral-trace-id",
"access_code": "TRACE-mock",
}
mock_events_response = MagicMock()
mock_events_response.status_code = 200
with (
patch.object(
trace_listener.first_time_handler,
@@ -666,6 +677,40 @@ class TestTraceListenerSetup:
patch.object(
trace_listener.first_time_handler, "_display_ephemeral_trace_link"
) as mock_display_link,
patch.object(
trace_listener.batch_manager.plus_api,
"initialize_trace_batch",
return_value=mock_batch_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_batch_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"send_trace_events",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"send_ephemeral_trace_events",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"finalize_trace_batch",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager.plus_api,
"finalize_ephemeral_trace_batch",
return_value=mock_events_response,
),
patch.object(
trace_listener.batch_manager,
"_cleanup_batch_data",
),
):
crew.kickoff()
wait_for_event_handlers()
@@ -918,3 +963,676 @@ class TestTraceListenerSetup:
mock_init.assert_called_once()
payload = mock_init.call_args[0][0]
assert "user_identifier" not in payload
class TestTraceBatchIdClearedOnFailure:
"""Tests: trace_batch_id is cleared when _initialize_backend_batch fails."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id (simulating first-time user)."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id # simulate line 96
bm.is_current_batch_ephemeral = True
return bm
def test_trace_batch_id_cleared_on_exception(self):
"""trace_batch_id must be None when the API call raises an exception."""
bm = self._make_batch_manager()
assert bm.trace_batch_id is not None
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=ConnectionError("network down"),
),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
def test_trace_batch_id_set_on_success(self):
"""trace_batch_id must be set from the server response on success."""
bm = self._make_batch_manager()
server_id = "server-ephemeral-trace-id-999"
mock_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_response,
),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
def test_send_events_skipped_when_trace_batch_id_none(self):
"""_send_events_to_backend must return early when trace_batch_id is None."""
bm = self._make_batch_manager()
bm.trace_batch_id = None
bm.event_buffer = [MagicMock()] # has events
with patch.object(
bm.plus_api, "send_ephemeral_trace_events"
) as mock_send:
result = bm._send_events_to_backend()
assert result == 500
mock_send.assert_not_called()
class TestInitializeBackendBatchRetry:
"""Tests for retry logic in _initialize_backend_batch."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
return bm
def test_retries_on_none_response_then_succeeds(self):
"""Retries when API returns None, succeeds on second attempt."""
bm = self._make_batch_manager()
server_id = "server-id-after-retry"
success_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=[None, success_response],
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
assert mock_init.call_count == 2
mock_sleep.assert_called_once_with(0.2)
def test_retries_on_5xx_then_succeeds(self):
"""Retries on 500 server error, succeeds on second attempt."""
bm = self._make_batch_manager()
server_id = "server-id-after-5xx"
error_response = MagicMock(status_code=500, text="Internal Server Error")
success_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=[error_response, success_response],
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id == server_id
assert mock_init.call_count == 2
def test_no_retry_on_exception(self):
"""Exceptions (e.g. timeout, connection error) abort immediately without retry."""
bm = self._make_batch_manager()
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
side_effect=ConnectionError("network down"),
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
assert mock_init.call_count == 1
mock_sleep.assert_not_called()
def test_no_retry_on_4xx(self):
"""Does NOT retry on 422 — client error is not transient."""
bm = self._make_batch_manager()
error_response = MagicMock(status_code=422, text="Unprocessable Entity")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=error_response,
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
assert mock_init.call_count == 1
mock_sleep.assert_not_called()
def test_exhausts_retries_then_clears_batch_id(self):
"""After all retries fail, trace_batch_id is None."""
bm = self._make_batch_manager()
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=None,
) as mock_init,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
assert mock_init.call_count == 2 # initial + 1 retry
class TestFirstTimeHandlerBackendInitGuard:
"""Tests: backend_initialized gated on actual batch creation success."""
def _make_handler_with_manager(self):
"""Create a FirstTimeTraceHandler wired to a TraceBatchManager."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
handler = FirstTimeTraceHandler()
handler.is_first_time = True
handler.collected_events = True
handler.batch_manager = bm
return handler, bm
def test_backend_initialized_true_on_success(self):
"""Events are sent when batch creation succeeds, then state is cleaned up."""
handler, bm = self._make_handler_with_manager()
server_id = "server-id-abc"
mock_init_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
mock_send_response = MagicMock(status_code=200)
trace_batch_id_during_send = None
def capture_send(*args, **kwargs):
nonlocal trace_batch_id_during_send
trace_batch_id_during_send = bm.trace_batch_id
return mock_send_response
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_init_response,
),
patch.object(
bm.plus_api,
"send_ephemeral_trace_events",
side_effect=capture_send,
),
patch.object(bm, "_finalize_backend_batch"),
):
bm.event_buffer = [MagicMock(to_dict=MagicMock(return_value={}))]
handler._initialize_backend_and_send_events()
# trace_batch_id was set correctly during send
assert trace_batch_id_during_send == server_id
# State cleaned up after completion (singleton reuse)
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
assert bm.current_batch is None
def test_backend_initialized_false_on_failure(self):
"""backend_initialized stays False and events are NOT sent when batch creation fails."""
handler, bm = self._make_handler_with_manager()
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=None, # server call fails
),
patch.object(bm, "_send_events_to_backend") as mock_send,
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
patch.object(handler, "_gracefully_fail") as mock_fail,
):
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
mock_send.assert_not_called()
mock_finalize.assert_not_called()
mock_fail.assert_called_once()
def test_backend_initialized_false_on_non_2xx(self):
"""backend_initialized stays False when server returns non-2xx."""
handler, bm = self._make_handler_with_manager()
mock_response = MagicMock(status_code=500, text="Internal Server Error")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=mock_response,
),
patch.object(bm, "_send_events_to_backend") as mock_send,
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
patch.object(handler, "_gracefully_fail") as mock_fail,
):
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
assert bm.backend_initialized is False
assert bm.trace_batch_id is None
mock_send.assert_not_called()
mock_finalize.assert_not_called()
mock_fail.assert_called_once()
class TestFirstTimeHandlerAlwaysEphemeral:
"""Tests that first-time handler always uses ephemeral with skip_context_check."""
def _make_handler_with_manager(self):
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = True
handler = FirstTimeTraceHandler()
handler.is_first_time = True
handler.collected_events = True
handler.batch_manager = bm
return handler, bm
def test_deferred_init_uses_ephemeral_and_skip_context_check(self):
"""Deferred backend init always uses ephemeral=True and skip_context_check=True."""
handler, bm = self._make_handler_with_manager()
with (
patch.object(bm, "_initialize_backend_batch") as mock_init,
patch.object(bm, "_send_events_to_backend"),
patch.object(bm, "_finalize_backend_batch"),
):
mock_init.side_effect = lambda **kwargs: None
bm.event_buffer = [MagicMock()]
handler._initialize_backend_and_send_events()
mock_init.assert_called_once()
assert mock_init.call_args.kwargs["use_ephemeral"] is True
assert mock_init.call_args.kwargs["skip_context_check"] is True
class TestAuthFailbackToEphemeral:
"""Tests for ephemeral fallback when server rejects auth (401/403)."""
def _make_batch_manager(self):
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.current_batch = TraceBatch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew", "crew_name": "test"},
)
bm.trace_batch_id = bm.current_batch.batch_id
bm.is_current_batch_ephemeral = False # authenticated path
return bm
def test_401_non_ephemeral_falls_back_to_ephemeral(self):
"""A 401 on the non-ephemeral endpoint should retry as ephemeral."""
bm = self._make_batch_manager()
server_id = "ephemeral-fallback-id"
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
ephemeral_success = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=auth_rejected,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_success,
) as mock_ephemeral,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id == server_id
assert bm.is_current_batch_ephemeral is True
mock_ephemeral.assert_called_once()
def test_403_non_ephemeral_falls_back_to_ephemeral(self):
"""A 403 on the non-ephemeral endpoint should also fall back."""
bm = self._make_batch_manager()
server_id = "ephemeral-fallback-403"
forbidden = MagicMock(status_code=403, text="Forbidden")
ephemeral_success = MagicMock(
status_code=201,
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
)
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=forbidden,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_success,
),
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id == server_id
assert bm.is_current_batch_ephemeral is True
def test_401_on_ephemeral_does_not_recurse(self):
"""A 401 on the ephemeral endpoint should NOT try to fall back again."""
bm = self._make_batch_manager()
bm.is_current_batch_ephemeral = True
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=auth_rejected,
) as mock_ephemeral,
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=True,
)
assert bm.trace_batch_id is None
# Called only once — no recursive fallback
mock_ephemeral.assert_called()
def test_401_fallback_ephemeral_also_fails(self):
"""If ephemeral fallback also fails, trace_batch_id is cleared."""
bm = self._make_batch_manager()
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
ephemeral_fail = MagicMock(status_code=422, text="Validation failed")
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch.object(
bm.plus_api,
"initialize_trace_batch",
return_value=auth_rejected,
),
patch.object(
bm.plus_api,
"initialize_ephemeral_trace_batch",
return_value=ephemeral_fail,
),
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
):
bm._initialize_backend_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
use_ephemeral=False,
)
assert bm.trace_batch_id is None
class TestMarkBatchAsFailedRouting:
"""Tests: _mark_batch_as_failed routes to the correct endpoint."""
def _make_batch_manager(self, ephemeral: bool = False):
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
):
bm = TraceBatchManager()
bm.is_current_batch_ephemeral = ephemeral
return bm
def test_routes_to_ephemeral_endpoint_when_ephemeral(self):
"""Ephemeral batches must use mark_ephemeral_trace_batch_as_failed."""
bm = self._make_batch_manager(ephemeral=True)
with patch.object(
bm.plus_api, "mark_ephemeral_trace_batch_as_failed"
) as mock_ephemeral, patch.object(
bm.plus_api, "mark_trace_batch_as_failed"
) as mock_non_ephemeral:
bm._mark_batch_as_failed("batch-123", "some error")
mock_ephemeral.assert_called_once_with("batch-123", "some error")
mock_non_ephemeral.assert_not_called()
def test_routes_to_non_ephemeral_endpoint_when_not_ephemeral(self):
"""Non-ephemeral batches must use mark_trace_batch_as_failed."""
bm = self._make_batch_manager(ephemeral=False)
with patch.object(
bm.plus_api, "mark_ephemeral_trace_batch_as_failed"
) as mock_ephemeral, patch.object(
bm.plus_api, "mark_trace_batch_as_failed"
) as mock_non_ephemeral:
bm._mark_batch_as_failed("batch-456", "another error")
mock_non_ephemeral.assert_called_once_with("batch-456", "another error")
mock_ephemeral.assert_not_called()
class TestBackendInitializedGatedOnSuccess:
"""Tests: backend_initialized reflects actual init success on non-first-time path."""
def test_backend_initialized_true_on_success(self):
"""backend_initialized is True when _initialize_backend_batch succeeds."""
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch(
"crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
return_value=False,
),
patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
),
):
bm = TraceBatchManager()
mock_response = MagicMock(
status_code=201,
json=MagicMock(return_value={"trace_id": "server-id"}),
)
with patch.object(
bm.plus_api, "initialize_trace_batch", return_value=mock_response
):
bm.initialize_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
)
assert bm.backend_initialized is True
assert bm.trace_batch_id == "server-id"
def test_backend_initialized_false_on_failure(self):
"""backend_initialized is False when _initialize_backend_batch fails."""
with (
patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
),
patch(
"crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
return_value=False,
),
patch(
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
return_value="mock_token",
),
):
bm = TraceBatchManager()
with patch.object(
bm.plus_api, "initialize_trace_batch", return_value=None
):
bm.initialize_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "crew"},
)
assert bm.backend_initialized is False
assert bm.trace_batch_id is None

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.12.0a2"
__version__ = "1.12.0"