mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-03-25 21:28:14 +00:00
Compare commits
1 Commits
1.12.0a3
...
docs/amp-t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0cb97e2bc2 |
@@ -4,26 +4,6 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="26 مارس 2026">
|
||||
## v1.12.0a3
|
||||
|
||||
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
|
||||
|
||||
## ما الذي تغير
|
||||
|
||||
### إصلاحات الأخطاء
|
||||
- إصلاح بيانات الاعتماد الخاطئة لدفع دفعات التتبع (404)
|
||||
- حل العديد من الأخطاء في نظام تدفق HITL
|
||||
|
||||
### الوثائق
|
||||
- تحديث سجل التغييرات والإصدار لـ v1.12.0a2
|
||||
|
||||
## المساهمون
|
||||
|
||||
@akaKuruma, @greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="25 مارس 2026">
|
||||
## v1.12.0a2
|
||||
|
||||
|
||||
@@ -462,6 +462,7 @@
|
||||
"en/enterprise/guides/deploy-to-amp",
|
||||
"en/enterprise/guides/private-package-registry",
|
||||
"en/enterprise/guides/kickoff-crew",
|
||||
"en/enterprise/guides/training-crews",
|
||||
"en/enterprise/guides/update-crew",
|
||||
"en/enterprise/guides/enable-crew-studio",
|
||||
"en/enterprise/guides/capture_telemetry_logs",
|
||||
@@ -930,6 +931,7 @@
|
||||
"en/enterprise/guides/deploy-to-amp",
|
||||
"en/enterprise/guides/private-package-registry",
|
||||
"en/enterprise/guides/kickoff-crew",
|
||||
"en/enterprise/guides/training-crews",
|
||||
"en/enterprise/guides/update-crew",
|
||||
"en/enterprise/guides/enable-crew-studio",
|
||||
"en/enterprise/guides/capture_telemetry_logs",
|
||||
@@ -1398,6 +1400,7 @@
|
||||
"en/enterprise/guides/deploy-to-amp",
|
||||
"en/enterprise/guides/private-package-registry",
|
||||
"en/enterprise/guides/kickoff-crew",
|
||||
"en/enterprise/guides/training-crews",
|
||||
"en/enterprise/guides/update-crew",
|
||||
"en/enterprise/guides/enable-crew-studio",
|
||||
"en/enterprise/guides/capture_telemetry_logs",
|
||||
@@ -1867,6 +1870,7 @@
|
||||
"en/enterprise/guides/deploy-to-amp",
|
||||
"en/enterprise/guides/private-package-registry",
|
||||
"en/enterprise/guides/kickoff-crew",
|
||||
"en/enterprise/guides/training-crews",
|
||||
"en/enterprise/guides/update-crew",
|
||||
"en/enterprise/guides/enable-crew-studio",
|
||||
"en/enterprise/guides/capture_telemetry_logs",
|
||||
|
||||
@@ -4,26 +4,6 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="Mar 26, 2026">
|
||||
## v1.12.0a3
|
||||
|
||||
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
|
||||
|
||||
## What's Changed
|
||||
|
||||
### Bug Fixes
|
||||
- Fix bad credentials for traces batch push (404)
|
||||
- Resolve multiple bugs in HITL flow system
|
||||
|
||||
### Documentation
|
||||
- Update changelog and version for v1.12.0a2
|
||||
|
||||
## Contributors
|
||||
|
||||
@akaKuruma, @greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="Mar 25, 2026">
|
||||
## v1.12.0a2
|
||||
|
||||
|
||||
132
docs/en/enterprise/guides/training-crews.mdx
Normal file
132
docs/en/enterprise/guides/training-crews.mdx
Normal file
@@ -0,0 +1,132 @@
|
||||
---
|
||||
title: "Training Crews"
|
||||
description: "Train your deployed crews directly from the CrewAI AMP platform to improve agent performance over time"
|
||||
icon: "dumbbell"
|
||||
mode: "wide"
|
||||
---
|
||||
|
||||
Training lets you improve crew performance by running iterative training sessions directly from the **Training** tab in CrewAI AMP. The platform uses **auto-train mode** — it handles the iterative process automatically, unlike CLI training which requires interactive human feedback per iteration.
|
||||
|
||||
After training completes, CrewAI evaluates agent outputs and consolidates feedback into actionable suggestions for each agent. These suggestions are then applied to future crew runs to improve output quality.
|
||||
|
||||
<Tip>
|
||||
For details on how CrewAI training works under the hood, see the [Training Concepts](/en/concepts/training) page.
|
||||
</Tip>
|
||||
|
||||
## Prerequisites
|
||||
|
||||
<CardGroup cols={2}>
|
||||
<Card title="Active deployment" icon="rocket">
|
||||
You need a CrewAI AMP account with an active deployment in **Ready** status (Crew type).
|
||||
</Card>
|
||||
<Card title="Run permission" icon="key">
|
||||
Your account must have run permission for the deployment you want to train.
|
||||
</Card>
|
||||
</CardGroup>
|
||||
|
||||
## How to train a crew
|
||||
|
||||
<Steps>
|
||||
<Step title="Open the Training tab">
|
||||
Navigate to **Deployments**, click your deployment, then select the **Training** tab.
|
||||
</Step>
|
||||
|
||||
<Step title="Enter a training name">
|
||||
Provide a **Training Name** — this becomes the `.pkl` filename used to store training results. For example, "Expert Mode Training" produces `expert_mode_training.pkl`.
|
||||
</Step>
|
||||
|
||||
<Step title="Fill in the crew inputs">
|
||||
Enter the crew's input fields. These are the same inputs you'd provide for a normal kickoff — they're dynamically loaded based on your crew's configuration.
|
||||
</Step>
|
||||
|
||||
<Step title="Start training">
|
||||
Click **Train Crew**. The button changes to "Training..." with a spinner while the process runs.
|
||||
|
||||
Behind the scenes:
|
||||
- A training record is created for your deployment
|
||||
- The platform calls the deployment's auto-train endpoint
|
||||
- The crew runs its iterations automatically — no manual feedback required
|
||||
</Step>
|
||||
|
||||
<Step title="Monitor progress">
|
||||
The **Current Training Status** panel displays:
|
||||
- **Status** — Current state of the training run
|
||||
- **Nº Iterations** — Number of training iterations configured
|
||||
- **Filename** — The `.pkl` file being generated
|
||||
- **Started At** — When training began
|
||||
- **Training Inputs** — The inputs you provided
|
||||
</Step>
|
||||
</Steps>
|
||||
|
||||
## Understanding training results
|
||||
|
||||
Once training completes, you'll see per-agent result cards with the following information:
|
||||
|
||||
- **Agent Role** — The name/role of the agent in your crew
|
||||
- **Final Quality** — A score from 0 to 10 evaluating the agent's output quality
|
||||
- **Final Summary** — A summary of the agent's performance during training
|
||||
- **Suggestions** — Actionable recommendations for improving the agent's behavior
|
||||
|
||||
### Editing suggestions
|
||||
|
||||
You can refine the suggestions for any agent:
|
||||
|
||||
<Steps>
|
||||
<Step title="Click Edit">
|
||||
On any agent's result card, click the **Edit** button next to the suggestions.
|
||||
</Step>
|
||||
|
||||
<Step title="Modify suggestions">
|
||||
Update the suggestions text to better reflect the improvements you want.
|
||||
</Step>
|
||||
|
||||
<Step title="Save changes">
|
||||
Click **Save**. The edited suggestions sync back to the deployment and are used in all future runs.
|
||||
</Step>
|
||||
</Steps>
|
||||
|
||||
## Using trained data
|
||||
|
||||
To apply training results to your crew:
|
||||
|
||||
1. Note the **Training Filename** (the `.pkl` file) from your completed training session.
|
||||
2. Specify this filename in your deployment's kickoff or run configuration.
|
||||
3. The crew automatically loads the training file and applies the stored suggestions to each agent.
|
||||
|
||||
This means agents benefit from the feedback generated during training on every subsequent run.
|
||||
|
||||
## Previous trainings
|
||||
|
||||
The bottom of the Training tab displays a **history of all past training sessions** for the deployment. Use this to review previous training runs, compare results, or select a different training file to use.
|
||||
|
||||
## Error handling
|
||||
|
||||
If a training run fails, the status panel shows an error state along with a message describing what went wrong.
|
||||
|
||||
Common causes of training failures:
|
||||
- **Deployment runtime not updated** — Ensure your deployment is running the latest version
|
||||
- **Crew execution errors** — Issues within the crew's task logic or agent configuration
|
||||
- **Network issues** — Connectivity problems between the platform and the deployment
|
||||
|
||||
## Limitations
|
||||
|
||||
<Info>
|
||||
Keep these constraints in mind when planning your training workflow:
|
||||
- **One active training at a time** per deployment — wait for the current run to finish before starting another
|
||||
- **Auto-train mode only** — the platform does not support interactive per-iteration feedback like the CLI does
|
||||
- **Training data is deployment-specific** — training results are tied to the specific deployment instance and version
|
||||
</Info>
|
||||
|
||||
## Related resources
|
||||
|
||||
<CardGroup cols={3}>
|
||||
<Card title="Training Concepts" icon="book" href="/en/concepts/training">
|
||||
Learn how CrewAI training works under the hood.
|
||||
</Card>
|
||||
<Card title="Kickoff Crew" icon="play" href="/en/enterprise/guides/kickoff-crew">
|
||||
Run your deployed crew from the AMP platform.
|
||||
</Card>
|
||||
<Card title="Deploy to AMP" icon="cloud-arrow-up" href="/en/enterprise/guides/deploy-to-amp">
|
||||
Get your crew deployed and ready for training.
|
||||
</Card>
|
||||
</CardGroup>
|
||||
@@ -4,26 +4,6 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="2026년 3월 26일">
|
||||
## v1.12.0a3
|
||||
|
||||
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
|
||||
|
||||
## 변경 사항
|
||||
|
||||
### 버그 수정
|
||||
- 트레이스 배치 푸시에 대한 잘못된 자격 증명 수정 (404)
|
||||
- HITL 흐름 시스템의 여러 버그 해결
|
||||
|
||||
### 문서
|
||||
- v1.12.0a2에 대한 변경 로그 및 버전 업데이트
|
||||
|
||||
## 기여자
|
||||
|
||||
@akaKuruma, @greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="2026년 3월 25일">
|
||||
## v1.12.0a2
|
||||
|
||||
|
||||
@@ -4,26 +4,6 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="26 mar 2026">
|
||||
## v1.12.0a3
|
||||
|
||||
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.12.0a3)
|
||||
|
||||
## O que Mudou
|
||||
|
||||
### Correções de Bugs
|
||||
- Corrigir credenciais inválidas para envio em lote de rastros (404)
|
||||
- Resolver múltiplos bugs no sistema de fluxo HITL
|
||||
|
||||
### Documentação
|
||||
- Atualizar changelog e versão para v1.12.0a2
|
||||
|
||||
## Contributors
|
||||
|
||||
@akaKuruma, @greysonlalonde
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="25 mar 2026">
|
||||
## v1.12.0a2
|
||||
|
||||
|
||||
@@ -152,4 +152,4 @@ __all__ = [
|
||||
"wrap_file_source",
|
||||
]
|
||||
|
||||
__version__ = "1.12.0a3"
|
||||
__version__ = "1.12.0a2"
|
||||
|
||||
@@ -11,7 +11,7 @@ dependencies = [
|
||||
"pytube~=15.0.0",
|
||||
"requests~=2.32.5",
|
||||
"docker~=7.1.0",
|
||||
"crewai==1.12.0a3",
|
||||
"crewai==1.12.0a2",
|
||||
"tiktoken~=0.8.0",
|
||||
"beautifulsoup4~=4.13.4",
|
||||
"python-docx~=1.2.0",
|
||||
|
||||
@@ -309,4 +309,4 @@ __all__ = [
|
||||
"ZapierActionTools",
|
||||
]
|
||||
|
||||
__version__ = "1.12.0a3"
|
||||
__version__ = "1.12.0a2"
|
||||
|
||||
@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
|
||||
|
||||
[project.optional-dependencies]
|
||||
tools = [
|
||||
"crewai-tools==1.12.0a3",
|
||||
"crewai-tools==1.12.0a2",
|
||||
]
|
||||
embeddings = [
|
||||
"tiktoken~=0.8.0"
|
||||
|
||||
@@ -42,7 +42,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
|
||||
|
||||
_suppress_pydantic_deprecation_warnings()
|
||||
|
||||
__version__ = "1.12.0a3"
|
||||
__version__ = "1.12.0a2"
|
||||
_telemetry_submitted = False
|
||||
|
||||
|
||||
|
||||
@@ -196,16 +196,6 @@ class PlusAPI:
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
def mark_ephemeral_trace_batch_as_failed(
|
||||
self, trace_batch_id: str, error_message: str
|
||||
) -> httpx.Response:
|
||||
return self._make_request(
|
||||
"PATCH",
|
||||
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}",
|
||||
json={"status": "failed", "failure_reason": error_message},
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
def get_mcp_configs(self, slugs: list[str]) -> httpx.Response:
|
||||
"""Get MCP server configurations for the given slugs."""
|
||||
return self._make_request(
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.12.0a3"
|
||||
"crewai[tools]==1.12.0a2"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.12.0a3"
|
||||
"crewai[tools]==1.12.0a2"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.12.0a3"
|
||||
"crewai[tools]==1.12.0a2"
|
||||
]
|
||||
|
||||
[tool.crewai]
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from datetime import datetime, timezone
|
||||
import logging
|
||||
import uuid
|
||||
import webbrowser
|
||||
@@ -101,50 +100,20 @@ class FirstTimeTraceHandler:
|
||||
user_context=user_context,
|
||||
execution_metadata=execution_metadata,
|
||||
use_ephemeral=True,
|
||||
skip_context_check=True,
|
||||
)
|
||||
|
||||
if not self.batch_manager.trace_batch_id:
|
||||
self._gracefully_fail(
|
||||
"Backend batch creation failed, cannot send events."
|
||||
)
|
||||
self._reset_batch_state()
|
||||
return
|
||||
|
||||
self.batch_manager.backend_initialized = True
|
||||
|
||||
# Capture values before send/finalize consume them
|
||||
events_count = len(self.batch_manager.event_buffer)
|
||||
batch_id = self.batch_manager.trace_batch_id
|
||||
# Read duration non-destructively — _finalize_backend_batch will consume it
|
||||
start_time = self.batch_manager.execution_start_times.get("execution")
|
||||
duration_ms = (
|
||||
int((datetime.now(timezone.utc) - start_time).total_seconds() * 1000)
|
||||
if start_time
|
||||
else 0
|
||||
)
|
||||
|
||||
if self.batch_manager.event_buffer:
|
||||
send_status = self.batch_manager._send_events_to_backend()
|
||||
if send_status == 500 and self.batch_manager.trace_batch_id:
|
||||
self.batch_manager._mark_batch_as_failed(
|
||||
self.batch_manager.trace_batch_id,
|
||||
"Error sending events to backend",
|
||||
)
|
||||
self._reset_batch_state()
|
||||
return
|
||||
self.batch_manager._send_events_to_backend()
|
||||
|
||||
self.batch_manager._finalize_backend_batch(events_count)
|
||||
self.batch_manager.finalize_batch()
|
||||
self.ephemeral_url = self.batch_manager.ephemeral_trace_url
|
||||
|
||||
if not self.ephemeral_url:
|
||||
self._show_local_trace_message(events_count, duration_ms, batch_id)
|
||||
|
||||
self._reset_batch_state()
|
||||
self._show_local_trace_message()
|
||||
|
||||
except Exception as e:
|
||||
self._gracefully_fail(f"Backend initialization failed: {e}")
|
||||
self._reset_batch_state()
|
||||
|
||||
def _display_ephemeral_trace_link(self) -> None:
|
||||
"""Display the ephemeral trace link to the user and automatically open browser."""
|
||||
@@ -216,19 +185,6 @@ To enable tracing later, do any one of these:
|
||||
console.print(panel)
|
||||
console.print()
|
||||
|
||||
def _reset_batch_state(self) -> None:
|
||||
"""Reset batch manager state to allow future executions to re-initialize."""
|
||||
if not self.batch_manager:
|
||||
return
|
||||
self.batch_manager.batch_owner_type = None
|
||||
self.batch_manager.batch_owner_id = None
|
||||
self.batch_manager.current_batch = None
|
||||
self.batch_manager.event_buffer.clear()
|
||||
self.batch_manager.trace_batch_id = None
|
||||
self.batch_manager.is_current_batch_ephemeral = False
|
||||
self.batch_manager.backend_initialized = False
|
||||
self.batch_manager._cleanup_batch_data()
|
||||
|
||||
def _gracefully_fail(self, error_message: str) -> None:
|
||||
"""Handle errors gracefully without disrupting user experience."""
|
||||
console = Console()
|
||||
@@ -236,9 +192,7 @@ To enable tracing later, do any one of these:
|
||||
|
||||
logger.debug(f"First-time trace error: {error_message}")
|
||||
|
||||
def _show_local_trace_message(
|
||||
self, events_count: int = 0, duration_ms: int = 0, batch_id: str | None = None
|
||||
) -> None:
|
||||
def _show_local_trace_message(self) -> None:
|
||||
"""Show message when traces were collected locally but couldn't be uploaded."""
|
||||
if self.batch_manager is None:
|
||||
return
|
||||
@@ -249,9 +203,9 @@ To enable tracing later, do any one of these:
|
||||
📊 Your execution traces were collected locally!
|
||||
|
||||
Unfortunately, we couldn't upload them to the server right now, but here's what we captured:
|
||||
• {events_count} trace events
|
||||
• Execution duration: {duration_ms}ms
|
||||
• Batch ID: {batch_id}
|
||||
• {len(self.batch_manager.event_buffer)} trace events
|
||||
• Execution duration: {self.batch_manager.calculate_duration("execution")}ms
|
||||
• Batch ID: {self.batch_manager.trace_batch_id}
|
||||
|
||||
✅ Tracing has been enabled for future runs!
|
||||
Your preference has been saved. Future Crew/Flow executions will automatically collect traces.
|
||||
|
||||
@@ -2,7 +2,6 @@ from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from logging import getLogger
|
||||
from threading import Condition, Lock
|
||||
import time
|
||||
from typing import Any
|
||||
import uuid
|
||||
|
||||
@@ -99,7 +98,7 @@ class TraceBatchManager:
|
||||
self._initialize_backend_batch(
|
||||
user_context, execution_metadata, use_ephemeral
|
||||
)
|
||||
self.backend_initialized = self.trace_batch_id is not None
|
||||
self.backend_initialized = True
|
||||
|
||||
self._batch_ready_cv.notify_all()
|
||||
return self.current_batch
|
||||
@@ -109,15 +108,14 @@ class TraceBatchManager:
|
||||
user_context: dict[str, str],
|
||||
execution_metadata: dict[str, Any],
|
||||
use_ephemeral: bool = False,
|
||||
skip_context_check: bool = False,
|
||||
) -> None:
|
||||
"""Send batch initialization to backend"""
|
||||
|
||||
if not skip_context_check and not is_tracing_enabled_in_context():
|
||||
return None
|
||||
if not is_tracing_enabled_in_context():
|
||||
return
|
||||
|
||||
if not self.plus_api or not self.current_batch:
|
||||
return None
|
||||
return
|
||||
|
||||
try:
|
||||
payload = {
|
||||
@@ -144,53 +142,19 @@ class TraceBatchManager:
|
||||
payload["ephemeral_trace_id"] = self.current_batch.batch_id
|
||||
payload["user_identifier"] = get_user_id()
|
||||
|
||||
max_retries = 1
|
||||
response = None
|
||||
|
||||
try:
|
||||
for attempt in range(max_retries + 1):
|
||||
response = (
|
||||
self.plus_api.initialize_ephemeral_trace_batch(payload)
|
||||
if use_ephemeral
|
||||
else self.plus_api.initialize_trace_batch(payload)
|
||||
)
|
||||
if response is not None and response.status_code < 500:
|
||||
break
|
||||
if attempt < max_retries:
|
||||
logger.debug(
|
||||
f"Trace batch init attempt {attempt + 1} failed "
|
||||
f"(status={response.status_code if response else 'None'}), retrying..."
|
||||
)
|
||||
time.sleep(0.2)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Error initializing trace batch: {e}. Continuing without tracing."
|
||||
)
|
||||
self.trace_batch_id = None
|
||||
return None
|
||||
response = (
|
||||
self.plus_api.initialize_ephemeral_trace_batch(payload)
|
||||
if use_ephemeral
|
||||
else self.plus_api.initialize_trace_batch(payload)
|
||||
)
|
||||
|
||||
if response is None:
|
||||
logger.warning(
|
||||
"Trace batch initialization failed gracefully. Continuing without tracing."
|
||||
)
|
||||
self.trace_batch_id = None
|
||||
return None
|
||||
|
||||
# Fall back to ephemeral on auth failure (expired/revoked token)
|
||||
if response.status_code in [401, 403] and not use_ephemeral:
|
||||
logger.warning(
|
||||
"Auth rejected by server, falling back to ephemeral tracing."
|
||||
)
|
||||
self.is_current_batch_ephemeral = True
|
||||
return self._initialize_backend_batch(
|
||||
user_context,
|
||||
execution_metadata,
|
||||
use_ephemeral=True,
|
||||
skip_context_check=skip_context_check,
|
||||
)
|
||||
return
|
||||
|
||||
if response.status_code in [201, 200]:
|
||||
self.is_current_batch_ephemeral = use_ephemeral
|
||||
response_data = response.json()
|
||||
self.trace_batch_id = (
|
||||
response_data["trace_id"]
|
||||
@@ -201,22 +165,11 @@ class TraceBatchManager:
|
||||
logger.warning(
|
||||
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
|
||||
)
|
||||
self.trace_batch_id = None
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Error initializing trace batch: {e}. Continuing without tracing."
|
||||
)
|
||||
self.trace_batch_id = None
|
||||
|
||||
def _mark_batch_as_failed(self, trace_batch_id: str, error_message: str) -> None:
|
||||
"""Mark a trace batch as failed, routing to the correct endpoint."""
|
||||
if self.is_current_batch_ephemeral:
|
||||
self.plus_api.mark_ephemeral_trace_batch_as_failed(
|
||||
trace_batch_id, error_message
|
||||
)
|
||||
else:
|
||||
self.plus_api.mark_trace_batch_as_failed(trace_batch_id, error_message)
|
||||
|
||||
def begin_event_processing(self) -> None:
|
||||
"""Mark that an event handler started processing (for synchronization)."""
|
||||
@@ -307,7 +260,7 @@ class TraceBatchManager:
|
||||
logger.error(
|
||||
"Event handler timeout - marking batch as failed due to incomplete events"
|
||||
)
|
||||
self._mark_batch_as_failed(
|
||||
self.plus_api.mark_trace_batch_as_failed(
|
||||
self.trace_batch_id,
|
||||
"Timeout waiting for event handlers - events incomplete",
|
||||
)
|
||||
@@ -331,7 +284,7 @@ class TraceBatchManager:
|
||||
events_sent_to_backend_status = self._send_events_to_backend()
|
||||
self.event_buffer = original_buffer
|
||||
if events_sent_to_backend_status == 500 and self.trace_batch_id:
|
||||
self._mark_batch_as_failed(
|
||||
self.plus_api.mark_trace_batch_as_failed(
|
||||
self.trace_batch_id, "Error sending events to backend"
|
||||
)
|
||||
return None
|
||||
@@ -411,16 +364,13 @@ class TraceBatchManager:
|
||||
logger.error(
|
||||
f"❌ Failed to finalize trace batch: {response.status_code} - {response.text}"
|
||||
)
|
||||
self._mark_batch_as_failed(self.trace_batch_id, response.text)
|
||||
self.plus_api.mark_trace_batch_as_failed(
|
||||
self.trace_batch_id, response.text
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Error finalizing trace batch: {e}")
|
||||
try:
|
||||
self._mark_batch_as_failed(self.trace_batch_id, str(e))
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"Could not mark trace batch as failed (network unavailable)"
|
||||
)
|
||||
self.plus_api.mark_trace_batch_as_failed(self.trace_batch_id, str(e))
|
||||
|
||||
def _cleanup_batch_data(self) -> None:
|
||||
"""Clean up batch data after successful finalization to free memory"""
|
||||
|
||||
@@ -235,11 +235,8 @@ class TraceCollectionListener(BaseEventListener):
|
||||
|
||||
@event_bus.on(FlowStartedEvent)
|
||||
def on_flow_started(source: Any, event: FlowStartedEvent) -> None:
|
||||
# Always call _initialize_flow_batch to claim ownership.
|
||||
# If batch was already initialized by a concurrent action event
|
||||
# (race condition), initialize_batch() returns early but
|
||||
# batch_owner_type is still correctly set to "flow".
|
||||
self._initialize_flow_batch(source, event)
|
||||
if not self.batch_manager.is_batch_initialized():
|
||||
self._initialize_flow_batch(source, event)
|
||||
self._handle_trace_event("flow_started", source, event)
|
||||
|
||||
@event_bus.on(MethodExecutionStartedEvent)
|
||||
@@ -269,12 +266,7 @@ class TraceCollectionListener(BaseEventListener):
|
||||
|
||||
@event_bus.on(CrewKickoffStartedEvent)
|
||||
def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None:
|
||||
if self.batch_manager.batch_owner_type != "flow":
|
||||
# Always call _initialize_crew_batch to claim ownership.
|
||||
# If batch was already initialized by a concurrent action event
|
||||
# (race condition with DefaultEnvEvent), initialize_batch() returns
|
||||
# early but batch_owner_type is still correctly set to "crew".
|
||||
# Skip only when a parent flow already owns the batch.
|
||||
if not self.batch_manager.is_batch_initialized():
|
||||
self._initialize_crew_batch(source, event)
|
||||
self._handle_trace_event("crew_kickoff_started", source, event)
|
||||
|
||||
@@ -780,7 +772,7 @@ class TraceCollectionListener(BaseEventListener):
|
||||
"crew_name": getattr(source, "name", "Unknown Crew"),
|
||||
"crewai_version": get_crewai_version(),
|
||||
}
|
||||
self._initialize_batch(user_context, execution_metadata)
|
||||
self.batch_manager.initialize_batch(user_context, execution_metadata)
|
||||
|
||||
self.batch_manager.begin_event_processing()
|
||||
try:
|
||||
|
||||
@@ -127,9 +127,6 @@ To update, run: uv sync --upgrade-package crewai"""
|
||||
|
||||
def _show_tracing_disabled_message_if_needed(self) -> None:
|
||||
"""Show tracing disabled message if tracing is not enabled."""
|
||||
from crewai.events.listeners.tracing.trace_listener import (
|
||||
TraceCollectionListener,
|
||||
)
|
||||
from crewai.events.listeners.tracing.utils import (
|
||||
has_user_declined_tracing,
|
||||
is_tracing_enabled_in_context,
|
||||
@@ -139,12 +136,6 @@ To update, run: uv sync --upgrade-package crewai"""
|
||||
if should_suppress_tracing_messages():
|
||||
return
|
||||
|
||||
# Don't show "disabled" message when the first-time handler will show
|
||||
# the trace prompt after execution completes (avoids confusing mid-flow messages)
|
||||
listener = TraceCollectionListener._instance # type: ignore[misc]
|
||||
if listener and listener.first_time_handler.is_first_time:
|
||||
return
|
||||
|
||||
if not is_tracing_enabled_in_context():
|
||||
if has_user_declined_tracing():
|
||||
message = """Info: Tracing is disabled.
|
||||
|
||||
@@ -182,7 +182,7 @@ class ConsoleProvider:
|
||||
console.print(message, style="yellow")
|
||||
console.print()
|
||||
|
||||
response = input(">>> ").strip()
|
||||
response = input(">>> \n").strip()
|
||||
else:
|
||||
response = input(f"{message} ").strip()
|
||||
|
||||
|
||||
@@ -63,32 +63,6 @@ class PendingFeedbackContext:
|
||||
llm: dict[str, Any] | str | None = None
|
||||
requested_at: datetime = field(default_factory=datetime.now)
|
||||
|
||||
@staticmethod
|
||||
def _make_json_safe(value: Any) -> Any:
|
||||
"""Convert a value to a JSON-serializable form.
|
||||
|
||||
Handles Pydantic models, dataclasses, and arbitrary objects by
|
||||
progressively falling back to string representation.
|
||||
"""
|
||||
if value is None or isinstance(value, (str, int, float, bool)):
|
||||
return value
|
||||
if isinstance(value, (list, tuple)):
|
||||
return [PendingFeedbackContext._make_json_safe(v) for v in value]
|
||||
if isinstance(value, dict):
|
||||
return {
|
||||
k: PendingFeedbackContext._make_json_safe(v) for k, v in value.items()
|
||||
}
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
if isinstance(value, BaseModel):
|
||||
return value.model_dump(mode="json")
|
||||
import dataclasses
|
||||
|
||||
if dataclasses.is_dataclass(value) and not isinstance(value, type):
|
||||
return PendingFeedbackContext._make_json_safe(dataclasses.asdict(value))
|
||||
return str(value)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Serialize context to a dictionary for persistence.
|
||||
|
||||
@@ -99,11 +73,11 @@ class PendingFeedbackContext:
|
||||
"flow_id": self.flow_id,
|
||||
"flow_class": self.flow_class,
|
||||
"method_name": self.method_name,
|
||||
"method_output": self._make_json_safe(self.method_output),
|
||||
"method_output": self.method_output,
|
||||
"message": self.message,
|
||||
"emit": self.emit,
|
||||
"default_outcome": self.default_outcome,
|
||||
"metadata": self._make_json_safe(self.metadata),
|
||||
"metadata": self.metadata,
|
||||
"llm": self.llm,
|
||||
"requested_at": self.requested_at.isoformat(),
|
||||
}
|
||||
|
||||
@@ -1223,6 +1223,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
# Mark that we're resuming execution
|
||||
instance._is_execution_resuming = True
|
||||
|
||||
# Mark the method as completed (it ran before pausing)
|
||||
instance._completed_methods.add(FlowMethodName(pending_context.method_name))
|
||||
|
||||
return instance
|
||||
|
||||
@property
|
||||
@@ -1377,8 +1380,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self.human_feedback_history.append(result)
|
||||
self.last_human_feedback = result
|
||||
|
||||
self._completed_methods.add(FlowMethodName(context.method_name))
|
||||
|
||||
# Clear pending context after processing
|
||||
self._pending_feedback_context = None
|
||||
|
||||
# Clear pending feedback from persistence
|
||||
@@ -1401,10 +1403,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
|
||||
self._is_execution_resuming = False
|
||||
|
||||
if emit and collapsed_outcome is None:
|
||||
collapsed_outcome = default_outcome or emit[0]
|
||||
result.outcome = collapsed_outcome
|
||||
|
||||
final_result: Any = result
|
||||
try:
|
||||
if emit and collapsed_outcome:
|
||||
self._method_outputs.append(collapsed_outcome)
|
||||
@@ -1422,8 +1421,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
from crewai.flow.async_feedback.types import HumanFeedbackPending
|
||||
|
||||
if isinstance(e, HumanFeedbackPending):
|
||||
self._pending_feedback_context = e.context
|
||||
|
||||
# Auto-save pending feedback (create default persistence if needed)
|
||||
if self._persistence is None:
|
||||
from crewai.flow.persistence import SQLiteFlowPersistence
|
||||
|
||||
@@ -1457,8 +1455,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return e
|
||||
raise
|
||||
|
||||
final_result = self._method_outputs[-1] if self._method_outputs else result
|
||||
|
||||
# Emit flow finished
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
@@ -2318,6 +2314,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
if isinstance(e, HumanFeedbackPending):
|
||||
e.context.method_name = method_name
|
||||
|
||||
# Auto-save pending feedback (create default persistence if needed)
|
||||
if self._persistence is None:
|
||||
from crewai.flow.persistence import SQLiteFlowPersistence
|
||||
|
||||
@@ -3136,16 +3133,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
if outcome.lower() == response_clean.lower():
|
||||
return outcome
|
||||
|
||||
# Partial match (longest wins, first on length ties)
|
||||
response_lower = response_clean.lower()
|
||||
best_outcome: str | None = None
|
||||
best_len = -1
|
||||
# Partial match
|
||||
for outcome in outcomes:
|
||||
if outcome.lower() in response_lower and len(outcome) > best_len:
|
||||
best_outcome = outcome
|
||||
best_len = len(outcome)
|
||||
if best_outcome is not None:
|
||||
return best_outcome
|
||||
if outcome.lower() in response_clean.lower():
|
||||
return outcome
|
||||
|
||||
# Fallback to first outcome
|
||||
logger.warning(
|
||||
|
||||
@@ -116,11 +116,10 @@ def _deserialize_llm_from_context(
|
||||
return LLM(model=llm_data)
|
||||
|
||||
if isinstance(llm_data, dict):
|
||||
data = dict(llm_data)
|
||||
model = data.pop("model", None)
|
||||
model = llm_data.pop("model", None)
|
||||
if not model:
|
||||
return None
|
||||
return LLM(model=model, **data)
|
||||
return LLM(model=model, **llm_data)
|
||||
return None
|
||||
|
||||
|
||||
@@ -451,12 +450,12 @@ def human_feedback(
|
||||
|
||||
# -- Core feedback helpers ------------------------------------
|
||||
|
||||
def _build_feedback_context(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> tuple[Any, Any]:
|
||||
"""Build the PendingFeedbackContext and resolve the effective provider."""
|
||||
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
|
||||
"""Request feedback using provider or default console."""
|
||||
from crewai.flow.async_feedback.types import PendingFeedbackContext
|
||||
|
||||
# Build context for provider
|
||||
# Use flow_id property which handles both dict and BaseModel states
|
||||
context = PendingFeedbackContext(
|
||||
flow_id=flow_instance.flow_id or "unknown",
|
||||
flow_class=f"{flow_instance.__class__.__module__}.{flow_instance.__class__.__name__}",
|
||||
@@ -469,53 +468,15 @@ def human_feedback(
|
||||
llm=llm if isinstance(llm, str) else _serialize_llm_for_context(llm),
|
||||
)
|
||||
|
||||
# Determine effective provider:
|
||||
effective_provider = provider
|
||||
if effective_provider is None:
|
||||
from crewai.flow.flow_config import flow_config
|
||||
|
||||
effective_provider = flow_config.hitl_provider
|
||||
|
||||
return context, effective_provider
|
||||
|
||||
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
|
||||
"""Request feedback using provider or default console (sync)."""
|
||||
context, effective_provider = _build_feedback_context(
|
||||
flow_instance, method_output
|
||||
)
|
||||
|
||||
if effective_provider is not None:
|
||||
feedback_result = effective_provider.request_feedback(
|
||||
context, flow_instance
|
||||
)
|
||||
if asyncio.iscoroutine(feedback_result):
|
||||
raise TypeError(
|
||||
f"Provider {type(effective_provider).__name__}.request_feedback() "
|
||||
"returned a coroutine in a sync flow method. Use an async flow "
|
||||
"method or a synchronous provider."
|
||||
)
|
||||
return str(feedback_result)
|
||||
return flow_instance._request_human_feedback(
|
||||
message=message,
|
||||
output=method_output,
|
||||
metadata=metadata,
|
||||
emit=emit,
|
||||
)
|
||||
|
||||
async def _request_feedback_async(
|
||||
flow_instance: Flow[Any], method_output: Any
|
||||
) -> str:
|
||||
"""Request feedback, awaiting the provider if it returns a coroutine."""
|
||||
context, effective_provider = _build_feedback_context(
|
||||
flow_instance, method_output
|
||||
)
|
||||
|
||||
if effective_provider is not None:
|
||||
feedback_result = effective_provider.request_feedback(
|
||||
context, flow_instance
|
||||
)
|
||||
if asyncio.iscoroutine(feedback_result):
|
||||
return str(await feedback_result)
|
||||
return str(feedback_result)
|
||||
return effective_provider.request_feedback(context, flow_instance)
|
||||
return flow_instance._request_human_feedback(
|
||||
message=message,
|
||||
output=method_output,
|
||||
@@ -563,11 +524,10 @@ def human_feedback(
|
||||
flow_instance.human_feedback_history.append(result)
|
||||
flow_instance.last_human_feedback = result
|
||||
|
||||
# Return based on mode
|
||||
if emit:
|
||||
if collapsed_outcome is None:
|
||||
collapsed_outcome = default_outcome or emit[0]
|
||||
result.outcome = collapsed_outcome
|
||||
return collapsed_outcome
|
||||
# Return outcome for routing
|
||||
return collapsed_outcome # type: ignore[return-value]
|
||||
return result
|
||||
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
@@ -580,7 +540,7 @@ def human_feedback(
|
||||
if learn and getattr(self, "memory", None) is not None:
|
||||
method_output = _pre_review_with_lessons(self, method_output)
|
||||
|
||||
raw_feedback = await _request_feedback_async(self, method_output)
|
||||
raw_feedback = _request_feedback(self, method_output)
|
||||
result = _process_feedback(self, method_output, raw_feedback)
|
||||
|
||||
# Distill: extract lessons from output + feedback, store in memory
|
||||
|
||||
@@ -7,7 +7,6 @@ from crewai.events.listeners.tracing.first_time_trace_handler import (
|
||||
FirstTimeTraceHandler,
|
||||
)
|
||||
from crewai.events.listeners.tracing.trace_batch_manager import (
|
||||
TraceBatch,
|
||||
TraceBatchManager,
|
||||
)
|
||||
from crewai.events.listeners.tracing.trace_listener import (
|
||||
@@ -658,16 +657,6 @@ class TestTraceListenerSetup:
|
||||
|
||||
trace_listener.first_time_handler.collected_events = True
|
||||
|
||||
mock_batch_response = MagicMock()
|
||||
mock_batch_response.status_code = 201
|
||||
mock_batch_response.json.return_value = {
|
||||
"trace_id": "mock-trace-id",
|
||||
"ephemeral_trace_id": "mock-ephemeral-trace-id",
|
||||
"access_code": "TRACE-mock",
|
||||
}
|
||||
mock_events_response = MagicMock()
|
||||
mock_events_response.status_code = 200
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
trace_listener.first_time_handler,
|
||||
@@ -677,40 +666,6 @@ class TestTraceListenerSetup:
|
||||
patch.object(
|
||||
trace_listener.first_time_handler, "_display_ephemeral_trace_link"
|
||||
) as mock_display_link,
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"initialize_trace_batch",
|
||||
return_value=mock_batch_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=mock_batch_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"send_trace_events",
|
||||
return_value=mock_events_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"send_ephemeral_trace_events",
|
||||
return_value=mock_events_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"finalize_trace_batch",
|
||||
return_value=mock_events_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager.plus_api,
|
||||
"finalize_ephemeral_trace_batch",
|
||||
return_value=mock_events_response,
|
||||
),
|
||||
patch.object(
|
||||
trace_listener.batch_manager,
|
||||
"_cleanup_batch_data",
|
||||
),
|
||||
):
|
||||
crew.kickoff()
|
||||
wait_for_event_handlers()
|
||||
@@ -963,676 +918,3 @@ class TestTraceListenerSetup:
|
||||
mock_init.assert_called_once()
|
||||
payload = mock_init.call_args[0][0]
|
||||
assert "user_identifier" not in payload
|
||||
|
||||
|
||||
class TestTraceBatchIdClearedOnFailure:
|
||||
"""Tests: trace_batch_id is cleared when _initialize_backend_batch fails."""
|
||||
|
||||
def _make_batch_manager(self):
|
||||
"""Create a TraceBatchManager with a pre-set trace_batch_id (simulating first-time user)."""
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id # simulate line 96
|
||||
bm.is_current_batch_ephemeral = True
|
||||
return bm
|
||||
|
||||
def test_trace_batch_id_cleared_on_exception(self):
|
||||
"""trace_batch_id must be None when the API call raises an exception."""
|
||||
bm = self._make_batch_manager()
|
||||
assert bm.trace_batch_id is not None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
side_effect=ConnectionError("network down"),
|
||||
),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
|
||||
def test_trace_batch_id_set_on_success(self):
|
||||
"""trace_batch_id must be set from the server response on success."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "server-ephemeral-trace-id-999"
|
||||
|
||||
mock_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=mock_response,
|
||||
),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
|
||||
def test_send_events_skipped_when_trace_batch_id_none(self):
|
||||
"""_send_events_to_backend must return early when trace_batch_id is None."""
|
||||
bm = self._make_batch_manager()
|
||||
bm.trace_batch_id = None
|
||||
bm.event_buffer = [MagicMock()] # has events
|
||||
|
||||
with patch.object(
|
||||
bm.plus_api, "send_ephemeral_trace_events"
|
||||
) as mock_send:
|
||||
result = bm._send_events_to_backend()
|
||||
|
||||
assert result == 500
|
||||
mock_send.assert_not_called()
|
||||
|
||||
|
||||
class TestInitializeBackendBatchRetry:
|
||||
"""Tests for retry logic in _initialize_backend_batch."""
|
||||
|
||||
def _make_batch_manager(self):
|
||||
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id
|
||||
bm.is_current_batch_ephemeral = True
|
||||
return bm
|
||||
|
||||
def test_retries_on_none_response_then_succeeds(self):
|
||||
"""Retries when API returns None, succeeds on second attempt."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "server-id-after-retry"
|
||||
|
||||
success_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
side_effect=[None, success_response],
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert mock_init.call_count == 2
|
||||
mock_sleep.assert_called_once_with(0.2)
|
||||
|
||||
def test_retries_on_5xx_then_succeeds(self):
|
||||
"""Retries on 500 server error, succeeds on second attempt."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "server-id-after-5xx"
|
||||
|
||||
error_response = MagicMock(status_code=500, text="Internal Server Error")
|
||||
success_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
side_effect=[error_response, success_response],
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert mock_init.call_count == 2
|
||||
|
||||
def test_no_retry_on_exception(self):
|
||||
"""Exceptions (e.g. timeout, connection error) abort immediately without retry."""
|
||||
bm = self._make_batch_manager()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
side_effect=ConnectionError("network down"),
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
assert mock_init.call_count == 1
|
||||
mock_sleep.assert_not_called()
|
||||
|
||||
def test_no_retry_on_4xx(self):
|
||||
"""Does NOT retry on 422 — client error is not transient."""
|
||||
bm = self._make_batch_manager()
|
||||
|
||||
error_response = MagicMock(status_code=422, text="Unprocessable Entity")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=error_response,
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep") as mock_sleep,
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
assert mock_init.call_count == 1
|
||||
mock_sleep.assert_not_called()
|
||||
|
||||
def test_exhausts_retries_then_clears_batch_id(self):
|
||||
"""After all retries fail, trace_batch_id is None."""
|
||||
bm = self._make_batch_manager()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=None,
|
||||
) as mock_init,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
assert mock_init.call_count == 2 # initial + 1 retry
|
||||
|
||||
|
||||
class TestFirstTimeHandlerBackendInitGuard:
|
||||
"""Tests: backend_initialized gated on actual batch creation success."""
|
||||
|
||||
def _make_handler_with_manager(self):
|
||||
"""Create a FirstTimeTraceHandler wired to a TraceBatchManager."""
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id
|
||||
bm.is_current_batch_ephemeral = True
|
||||
|
||||
handler = FirstTimeTraceHandler()
|
||||
handler.is_first_time = True
|
||||
handler.collected_events = True
|
||||
handler.batch_manager = bm
|
||||
return handler, bm
|
||||
|
||||
def test_backend_initialized_true_on_success(self):
|
||||
"""Events are sent when batch creation succeeds, then state is cleaned up."""
|
||||
handler, bm = self._make_handler_with_manager()
|
||||
server_id = "server-id-abc"
|
||||
|
||||
mock_init_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
mock_send_response = MagicMock(status_code=200)
|
||||
|
||||
trace_batch_id_during_send = None
|
||||
|
||||
def capture_send(*args, **kwargs):
|
||||
nonlocal trace_batch_id_during_send
|
||||
trace_batch_id_during_send = bm.trace_batch_id
|
||||
return mock_send_response
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=mock_init_response,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"send_ephemeral_trace_events",
|
||||
side_effect=capture_send,
|
||||
),
|
||||
patch.object(bm, "_finalize_backend_batch"),
|
||||
):
|
||||
bm.event_buffer = [MagicMock(to_dict=MagicMock(return_value={}))]
|
||||
handler._initialize_backend_and_send_events()
|
||||
|
||||
# trace_batch_id was set correctly during send
|
||||
assert trace_batch_id_during_send == server_id
|
||||
# State cleaned up after completion (singleton reuse)
|
||||
assert bm.backend_initialized is False
|
||||
assert bm.trace_batch_id is None
|
||||
assert bm.current_batch is None
|
||||
|
||||
def test_backend_initialized_false_on_failure(self):
|
||||
"""backend_initialized stays False and events are NOT sent when batch creation fails."""
|
||||
handler, bm = self._make_handler_with_manager()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=None, # server call fails
|
||||
),
|
||||
patch.object(bm, "_send_events_to_backend") as mock_send,
|
||||
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
|
||||
patch.object(handler, "_gracefully_fail") as mock_fail,
|
||||
):
|
||||
bm.event_buffer = [MagicMock()]
|
||||
handler._initialize_backend_and_send_events()
|
||||
|
||||
assert bm.backend_initialized is False
|
||||
assert bm.trace_batch_id is None
|
||||
mock_send.assert_not_called()
|
||||
mock_finalize.assert_not_called()
|
||||
mock_fail.assert_called_once()
|
||||
|
||||
def test_backend_initialized_false_on_non_2xx(self):
|
||||
"""backend_initialized stays False when server returns non-2xx."""
|
||||
handler, bm = self._make_handler_with_manager()
|
||||
|
||||
mock_response = MagicMock(status_code=500, text="Internal Server Error")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=mock_response,
|
||||
),
|
||||
patch.object(bm, "_send_events_to_backend") as mock_send,
|
||||
patch.object(bm, "_finalize_backend_batch") as mock_finalize,
|
||||
patch.object(handler, "_gracefully_fail") as mock_fail,
|
||||
):
|
||||
bm.event_buffer = [MagicMock()]
|
||||
handler._initialize_backend_and_send_events()
|
||||
|
||||
assert bm.backend_initialized is False
|
||||
assert bm.trace_batch_id is None
|
||||
mock_send.assert_not_called()
|
||||
mock_finalize.assert_not_called()
|
||||
mock_fail.assert_called_once()
|
||||
|
||||
|
||||
class TestFirstTimeHandlerAlwaysEphemeral:
|
||||
"""Tests that first-time handler always uses ephemeral with skip_context_check."""
|
||||
|
||||
def _make_handler_with_manager(self):
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id
|
||||
bm.is_current_batch_ephemeral = True
|
||||
|
||||
handler = FirstTimeTraceHandler()
|
||||
handler.is_first_time = True
|
||||
handler.collected_events = True
|
||||
handler.batch_manager = bm
|
||||
return handler, bm
|
||||
|
||||
def test_deferred_init_uses_ephemeral_and_skip_context_check(self):
|
||||
"""Deferred backend init always uses ephemeral=True and skip_context_check=True."""
|
||||
handler, bm = self._make_handler_with_manager()
|
||||
|
||||
with (
|
||||
patch.object(bm, "_initialize_backend_batch") as mock_init,
|
||||
patch.object(bm, "_send_events_to_backend"),
|
||||
patch.object(bm, "_finalize_backend_batch"),
|
||||
):
|
||||
mock_init.side_effect = lambda **kwargs: None
|
||||
bm.event_buffer = [MagicMock()]
|
||||
handler._initialize_backend_and_send_events()
|
||||
|
||||
mock_init.assert_called_once()
|
||||
assert mock_init.call_args.kwargs["use_ephemeral"] is True
|
||||
assert mock_init.call_args.kwargs["skip_context_check"] is True
|
||||
|
||||
|
||||
class TestAuthFailbackToEphemeral:
|
||||
"""Tests for ephemeral fallback when server rejects auth (401/403)."""
|
||||
|
||||
def _make_batch_manager(self):
|
||||
"""Create a TraceBatchManager with a pre-set trace_batch_id."""
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.current_batch = TraceBatch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew", "crew_name": "test"},
|
||||
)
|
||||
bm.trace_batch_id = bm.current_batch.batch_id
|
||||
bm.is_current_batch_ephemeral = False # authenticated path
|
||||
return bm
|
||||
|
||||
def test_401_non_ephemeral_falls_back_to_ephemeral(self):
|
||||
"""A 401 on the non-ephemeral endpoint should retry as ephemeral."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "ephemeral-fallback-id"
|
||||
|
||||
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
|
||||
ephemeral_success = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_trace_batch",
|
||||
return_value=auth_rejected,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=ephemeral_success,
|
||||
) as mock_ephemeral,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=False,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert bm.is_current_batch_ephemeral is True
|
||||
mock_ephemeral.assert_called_once()
|
||||
|
||||
def test_403_non_ephemeral_falls_back_to_ephemeral(self):
|
||||
"""A 403 on the non-ephemeral endpoint should also fall back."""
|
||||
bm = self._make_batch_manager()
|
||||
server_id = "ephemeral-fallback-403"
|
||||
|
||||
forbidden = MagicMock(status_code=403, text="Forbidden")
|
||||
ephemeral_success = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"ephemeral_trace_id": server_id}),
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_trace_batch",
|
||||
return_value=forbidden,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=ephemeral_success,
|
||||
),
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=False,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id == server_id
|
||||
assert bm.is_current_batch_ephemeral is True
|
||||
|
||||
def test_401_on_ephemeral_does_not_recurse(self):
|
||||
"""A 401 on the ephemeral endpoint should NOT try to fall back again."""
|
||||
bm = self._make_batch_manager()
|
||||
bm.is_current_batch_ephemeral = True
|
||||
|
||||
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=auth_rejected,
|
||||
) as mock_ephemeral,
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=True,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
# Called only once — no recursive fallback
|
||||
mock_ephemeral.assert_called()
|
||||
|
||||
def test_401_fallback_ephemeral_also_fails(self):
|
||||
"""If ephemeral fallback also fails, trace_batch_id is cleared."""
|
||||
bm = self._make_batch_manager()
|
||||
|
||||
auth_rejected = MagicMock(status_code=401, text="Bad credentials")
|
||||
ephemeral_fail = MagicMock(status_code=422, text="Validation failed")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_trace_batch",
|
||||
return_value=auth_rejected,
|
||||
),
|
||||
patch.object(
|
||||
bm.plus_api,
|
||||
"initialize_ephemeral_trace_batch",
|
||||
return_value=ephemeral_fail,
|
||||
),
|
||||
patch("crewai.events.listeners.tracing.trace_batch_manager.time.sleep"),
|
||||
):
|
||||
bm._initialize_backend_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
use_ephemeral=False,
|
||||
)
|
||||
|
||||
assert bm.trace_batch_id is None
|
||||
|
||||
|
||||
class TestMarkBatchAsFailedRouting:
|
||||
"""Tests: _mark_batch_as_failed routes to the correct endpoint."""
|
||||
|
||||
def _make_batch_manager(self, ephemeral: bool = False):
|
||||
with patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
bm.is_current_batch_ephemeral = ephemeral
|
||||
return bm
|
||||
|
||||
def test_routes_to_ephemeral_endpoint_when_ephemeral(self):
|
||||
"""Ephemeral batches must use mark_ephemeral_trace_batch_as_failed."""
|
||||
bm = self._make_batch_manager(ephemeral=True)
|
||||
|
||||
with patch.object(
|
||||
bm.plus_api, "mark_ephemeral_trace_batch_as_failed"
|
||||
) as mock_ephemeral, patch.object(
|
||||
bm.plus_api, "mark_trace_batch_as_failed"
|
||||
) as mock_non_ephemeral:
|
||||
bm._mark_batch_as_failed("batch-123", "some error")
|
||||
|
||||
mock_ephemeral.assert_called_once_with("batch-123", "some error")
|
||||
mock_non_ephemeral.assert_not_called()
|
||||
|
||||
def test_routes_to_non_ephemeral_endpoint_when_not_ephemeral(self):
|
||||
"""Non-ephemeral batches must use mark_trace_batch_as_failed."""
|
||||
bm = self._make_batch_manager(ephemeral=False)
|
||||
|
||||
with patch.object(
|
||||
bm.plus_api, "mark_ephemeral_trace_batch_as_failed"
|
||||
) as mock_ephemeral, patch.object(
|
||||
bm.plus_api, "mark_trace_batch_as_failed"
|
||||
) as mock_non_ephemeral:
|
||||
bm._mark_batch_as_failed("batch-456", "another error")
|
||||
|
||||
mock_non_ephemeral.assert_called_once_with("batch-456", "another error")
|
||||
mock_ephemeral.assert_not_called()
|
||||
|
||||
|
||||
class TestBackendInitializedGatedOnSuccess:
|
||||
"""Tests: backend_initialized reflects actual init success on non-first-time path."""
|
||||
|
||||
def test_backend_initialized_true_on_success(self):
|
||||
"""backend_initialized is True when _initialize_backend_batch succeeds."""
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
|
||||
return_value=False,
|
||||
),
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
),
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
mock_response = MagicMock(
|
||||
status_code=201,
|
||||
json=MagicMock(return_value={"trace_id": "server-id"}),
|
||||
)
|
||||
with patch.object(
|
||||
bm.plus_api, "initialize_trace_batch", return_value=mock_response
|
||||
):
|
||||
bm.initialize_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
)
|
||||
|
||||
assert bm.backend_initialized is True
|
||||
assert bm.trace_batch_id == "server-id"
|
||||
|
||||
def test_backend_initialized_false_on_failure(self):
|
||||
"""backend_initialized is False when _initialize_backend_batch fails."""
|
||||
with (
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
|
||||
return_value=True,
|
||||
),
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.should_auto_collect_first_time_traces",
|
||||
return_value=False,
|
||||
),
|
||||
patch(
|
||||
"crewai.events.listeners.tracing.trace_batch_manager.get_auth_token",
|
||||
return_value="mock_token",
|
||||
),
|
||||
):
|
||||
bm = TraceBatchManager()
|
||||
with patch.object(
|
||||
bm.plus_api, "initialize_trace_batch", return_value=None
|
||||
):
|
||||
bm.initialize_batch(
|
||||
user_context={"privacy_level": "standard"},
|
||||
execution_metadata={"execution_type": "crew"},
|
||||
)
|
||||
|
||||
assert bm.backend_initialized is False
|
||||
assert bm.trace_batch_id is None
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
"""CrewAI development tools."""
|
||||
|
||||
__version__ = "1.12.0a3"
|
||||
__version__ = "1.12.0a2"
|
||||
|
||||
Reference in New Issue
Block a user