Compare commits

...

8 Commits

Author SHA1 Message Date
Greyson LaLonde
243c9edc1c docs: update changelog and version for v1.14.7rc1
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Check Documentation Broken Links / Check broken links (push) Waiting to run
Vulnerability Scan / pip-audit (push) Waiting to run
2026-06-10 18:56:52 -07:00
Greyson LaLonde
68910b70c0 feat: bump versions to 1.14.7rc1 2026-06-10 18:50:54 -07:00
Greyson LaLonde
299782765c ci: ignore GHSA-rrmf-rvhw-rf47 (torch alias of PYSEC-2025-194)
* ci: ignore GHSA-rrmf-rvhw-rf47 (torch alias of PYSEC-2025-194)

pip-audit reports CVE-2025-3000 under its GHSA id, which the existing
PYSEC-2025-194 ignore does not match. Same advisory: memory corruption
in torch.jit.script, CVSS 1.9, local-only, no fix for torch 2.11.0.

* ci: sync GHSA-rrmf-rvhw-rf47 ignore into pre-commit pip-audit
2026-06-10 18:45:42 -07:00
Greyson LaLonde
a1f44eb272 fix(events): scope runtime state per run to bound growth and isolate concurrent runs 2026-06-10 18:39:05 -07:00
Lorenze Jay
036b032ab6 handle supporting both custom prompts (#6108)
* handle supporting both custom prompts

* handle translations

* handle deprecation warnings better
2026-06-10 17:52:53 -07:00
Lorenze Jay
f88ae54f96 fix telemetry setup on crewai-login (#6106)
* fix telemetry setup on crewai-login

* type check fix
2026-06-10 17:03:25 -07:00
Lorenze Jay
b6e5d632c1 improve convo routing cycle with one less route (#6102)
* improve one less route

* flows in flows, new agent executor causing early trace batch finalization

* addressing comments

* addressing comments pt2

* lint and typecheck fix
2026-06-10 16:49:16 -07:00
Greyson LaLonde
0d971e5bc5 feat(events): add reset_runtime_state to release accumulated bus state 2026-06-10 16:12:28 -07:00
42 changed files with 884 additions and 110 deletions

View File

@@ -64,6 +64,7 @@ jobs:
--ignore-vuln PYSEC-2025-197 \
--ignore-vuln PYSEC-2025-210 \
--ignore-vuln PYSEC-2026-139 \
--ignore-vuln GHSA-rrmf-rvhw-rf47 \
--ignore-vuln PYSEC-2025-211 \
--ignore-vuln PYSEC-2025-212 \
--ignore-vuln PYSEC-2025-213 \
@@ -81,6 +82,7 @@ jobs:
# PYSEC-2025-183 - pyjwt 2.12.1: disputed weak-encryption claim; key length is application-chosen
# PYSEC-2025-189..197 - torch 2.11.0: memory-corruption/DoS in functions only reachable via untrusted models; no fix available
# PYSEC-2025-210, PYSEC-2026-139 - torch 2.11.0: profiler/deserialization issues; no fix available
# GHSA-rrmf-rvhw-rf47 - torch 2.11.0 (CVE-2025-3000, alias of PYSEC-2025-194): memory corruption in torch.jit.script, CVSS 1.9, local-only; affected <=2.12.0, no fix available. pip-audit reports it under the GHSA id so the PYSEC ignore above does not catch it.
# PYSEC-2025-211..218 - transformers 5.5.4: deserialization/code injection via malicious model checkpoints; no fix available
# GHSA-f4j7-r4q5-qw2c - chromadb 1.1.1 (CVE-2026-45829): pre-auth RCE via /api/v2/tenants/{tenant}/databases/{db}/collections when trust_remote_code=true.
# Advisory: vulnerable >=1.0.0,<=1.5.9, firstPatchedVersion=none. We only use chromadb.PersistentClient (lib/crewai/src/crewai/rag/chromadb/factory.py)

View File

@@ -47,6 +47,7 @@ repos:
--ignore-vuln PYSEC-2025-197
--ignore-vuln PYSEC-2025-210
--ignore-vuln PYSEC-2026-139
--ignore-vuln GHSA-rrmf-rvhw-rf47
--ignore-vuln PYSEC-2025-211
--ignore-vuln PYSEC-2025-212
--ignore-vuln PYSEC-2025-213

View File

@@ -4,6 +4,38 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="10 يونيو 2026">
## v1.14.7rc1
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## ما الذي تغير
### الميزات
- إضافة `reset_runtime_state` لإطلاق حالة الحافلة المتراكمة
- التعامل مع دعم كل من الموجهات المخصصة
- فصل منطق المحادثة عن وقت التشغيل وإضافة `conversational_definition`
### إصلاحات الأخطاء
- إصلاح نطاق حالة وقت التشغيل لكل تشغيل للحد من النمو وعزل التشغيلات المتزامنة
- إصلاح إعدادات القياس عن بُعد على `crewai-login`
- إصلاح احترام `suppress_flow_events` لفعاليات تنفيذ الأساليب
### الوثائق
- تحديث صور OpenTelemetry
- تحديث الوثائق لتعكس الحالة الجديدة لجمع بيانات OpenTelemetry
- تحديث سجل التغييرات والإصدار لـ v1.14.7a4
### إعادة الهيكلة
- تبسيط تقييم شرط التدفق ليكون بلا حالة لكل حدث
- تحسين دورة توجيه المحادثة مع تقليل مسار واحد
## المساهمون
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="9 يونيو 2026">
## v1.14.7a4

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
يُحتفظ بـ `agent.i18n` للتوافق مع الإصدارات السابقة فقط، وقد تم إهماله. لتخصيص المطالبات أثناء التشغيل، مرّر `prompt_file` إلى `Crew`. وللوصول البرمجي المباشر إلى شرائح المطالبات، استخدم أداة i18n مباشرة:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### الخيار 3: تعطيل مطالبات النظام لنماذج o1
```python
agent = Agent(
@@ -208,6 +220,8 @@ agent = Agent(
يدمج CrewAI بعد ذلك تخصيصاتك مع الإعدادات الافتراضية، فلا تحتاج لإعادة تعريف كل مطالبة. إليك الطريقة:
بالنسبة للكود الذي يحتاج إلى قراءة شرائح المطالبات مباشرة، استخدم `crewai.utilities.i18n.get_i18n()` مع ملف المطالبات نفسه بدلًا من قراءة `agent.i18n`.
### مثال: تخصيص أساسي للمطالبات
أنشئ ملف `custom_prompts.json` بالمطالبات التي تريد تعديلها. تأكد من إدراج جميع المطالبات عالية المستوى التي يجب أن يحتويها، وليس فقط تغييراتك:

View File

@@ -4,6 +4,38 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Jun 10, 2026">
## v1.14.7rc1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## What's Changed
### Features
- Add `reset_runtime_state` to release accumulated bus state
- Handle supporting both custom prompts
- Decouple conversation logic from runtime and add a `conversational_definition`
### Bug Fixes
- Fix scope of runtime state per run to bound growth and isolate concurrent runs
- Fix telemetry setup on `crewai-login`
- Fix respect for `suppress_flow_events` for method-execution events
### Documentation
- Update OpenTelemetry images
- Update documentation to reflect new state of OpenTelemetry collector
- Update changelog and version for v1.14.7a4
### Refactoring
- Simplify flow condition evaluation to be stateless per event
- Improve conversation routing cycle with one less route
## Contributors
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="Jun 09, 2026">
## v1.14.7a4

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
`agent.i18n` is maintained only for backward compatibility and is deprecated. For runtime prompt customization, pass `prompt_file` to `Crew`. For programmatic access to prompt slices, use the i18n utility directly:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### Option 3: Disable System Prompts for o1 Models
```python
agent = Agent(
@@ -208,6 +220,8 @@ One straightforward approach is to create a JSON file for the prompts you want t
CrewAI then merges your customizations with the defaults, so you don't have to redefine every prompt. Here's how:
For code that needs to read prompt slices directly, use `crewai.utilities.i18n.get_i18n()` with the same prompt file instead of reading `agent.i18n`.
### Example: Basic Prompt Customization
Create a `custom_prompts.json` file with the prompts you want to modify. Ensure you list all top-level prompts it should contain, not just your changes:

View File

@@ -4,6 +4,38 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 6월 10일">
## v1.14.7rc1
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## 변경 사항
### 기능
- 누적된 버스 상태를 해제하기 위해 `reset_runtime_state` 추가
- 사용자 정의 프롬프트를 모두 지원하도록 처리
- 대화 논리를 런타임과 분리하고 `conversational_definition` 추가
### 버그 수정
- 실행당 런타임 상태의 범위를 수정하여 성장 제한 및 동시 실행 격리
- `crewai-login`에서 원격 측정 설정 수정
- 메서드 실행 이벤트에 대한 `suppress_flow_events` 존중 수정
### 문서
- OpenTelemetry 이미지 업데이트
- OpenTelemetry 수집기의 새로운 상태를 반영하도록 문서 업데이트
- v1.14.7a4에 대한 변경 로그 및 버전 업데이트
### 리팩토링
- 이벤트당 상태 비저장 방식으로 흐름 조건 평가 단순화
- 경로를 하나 줄여 대화 라우팅 사이클 개선
## 기여자
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="2026년 6월 9일">
## v1.14.7a4

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
`agent.i18n`은 이전 버전과의 호환성을 위해서만 유지되며 사용이 중단될 예정입니다. 런타임 프롬프트 커스터마이징에는 `Crew`에 `prompt_file`을 전달하세요. 프롬프트 슬라이스를 코드에서 직접 읽어야 한다면 i18n 유틸리티를 직접 사용하세요:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### 옵션 3: o1 모델에 대한 시스템 프롬프트 비활성화
```python
agent = Agent(
@@ -208,6 +220,8 @@ agent = Agent(
그러면 CrewAI가 기본값과 사용자가 지정한 내용을 병합하므로, 모든 프롬프트를 다시 정의할 필요가 없습니다. 방법은 다음과 같습니다:
프롬프트 슬라이스를 코드에서 직접 읽어야 하는 경우에는 `agent.i18n`을 읽는 대신 동일한 프롬프트 파일로 `crewai.utilities.i18n.get_i18n()`을 사용하세요.
### 예시: 기본 프롬프트 커스터마이징
수정하고 싶은 프롬프트를 포함하는 `custom_prompts.json` 파일을 생성하세요. 변경 사항만이 아니라 포함해야 하는 모든 최상위 프롬프트를 반드시 나열해야 합니다:
@@ -314,4 +328,4 @@ CrewAI에서의 저수준 prompt 커스터마이제이션은 매우 맞춤화되
<Check>
이제 CrewAI에서 고급 prompt 커스터마이징을 위한 기초를 갖추었습니다. 모델별 구조나 도메인별 제약에 맞춰 적용하든, 이러한 저수준 접근 방식은 agent 상호작용을 매우 전문적으로 조정할 수 있게 해줍니다.
</Check>
</Check>

View File

@@ -4,6 +4,38 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="10 jun 2026">
## v1.14.7rc1
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.7rc1)
## O que Mudou
### Recursos
- Adicionar `reset_runtime_state` para liberar o estado acumulado do barramento
- Lidar com suporte a ambos os prompts personalizados
- Desacoplar a lógica de conversa do tempo de execução e adicionar uma `conversational_definition`
### Correções de Bugs
- Corrigir o escopo do estado de tempo de execução por execução para limitar o crescimento e isolar execuções concorrentes
- Corrigir a configuração de telemetria em `crewai-login`
- Corrigir o respeito a `suppress_flow_events` para eventos de execução de método
### Documentação
- Atualizar imagens do OpenTelemetry
- Atualizar a documentação para refletir o novo estado do coletor OpenTelemetry
- Atualizar o changelog e a versão para v1.14.7a4
### Refatoração
- Simplificar a avaliação da condição de fluxo para ser sem estado por evento
- Melhorar o ciclo de roteamento de conversas com uma rota a menos
## Contribuidores
@greysonlalonde, @lorenzejay, @lucasgomide, @vinibrsl
</Update>
<Update label="09 jun 2026">
## v1.14.7a4

View File

@@ -161,6 +161,18 @@ crew = Crew(
)
```
<Note>
`agent.i18n` é mantido apenas para compatibilidade retroativa e está obsoleto. Para customização de prompts em tempo de execução, passe `prompt_file` para `Crew`. Para acesso programático aos slices de prompt, use diretamente o utilitário de i18n:
</Note>
```python
from crewai.utilities.i18n import get_i18n
i18n = get_i18n("custom_prompts.json")
format_slice = i18n.slice("format")
tool_prompt = i18n.tools("ask_question")
```
#### Opção 3: Desativar Prompts de Sistema para Modelos o1
```python
agent = Agent(
@@ -208,6 +220,8 @@ Uma abordagem direta é criar um arquivo JSON para os prompts que deseja sobresc
O CrewAI então mescla suas customizações com os padrões, assim você não precisa redefinir todos os prompts. Veja como:
Para código que precisa ler slices de prompt diretamente, use `crewai.utilities.i18n.get_i18n()` com o mesmo arquivo de prompts em vez de ler `agent.i18n`.
### Exemplo: Customização Básica de Prompt
Crie um arquivo `custom_prompts.json` com os prompts que deseja modificar. Certifique-se de listar todos os prompts de nível superior que ele deve conter, não apenas suas alterações:

View File

@@ -8,7 +8,7 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7a4",
"crewai-core==1.14.7rc1",
"click>=8.1.7,<9",
"pydantic>=2.11.9,<2.13",
"pydantic-settings~=2.10.1",

View File

@@ -1 +1 @@
__version__ = "1.14.7a4"
__version__ = "1.14.7rc1"

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a4"
"crewai[tools]==1.14.7rc1"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a4"
"crewai[tools]==1.14.7rc1"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.7a4"
"crewai[tools]==1.14.7rc1"
]
[tool.crewai]

View File

@@ -1 +1 @@
__version__ = "1.14.7a4"
__version__ = "1.14.7rc1"

View File

@@ -17,7 +17,7 @@ import contextlib
import logging
import os
import threading
from typing import Any, Final
from typing import Any, ClassVar, Final
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
@@ -27,7 +27,7 @@ from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import Span, Status, StatusCode
from opentelemetry.trace import ProxyTracerProvider, Span, Status, StatusCode
from typing_extensions import Self
@@ -72,8 +72,8 @@ class Telemetry:
and event-bus signal handlers (see ``crewai.telemetry.telemetry``).
"""
_instance = None
_lock = threading.Lock()
_instance: ClassVar[Self | None] = None
_lock: ClassVar[threading.Lock] = threading.Lock()
def __new__(cls) -> Self:
if cls._instance is None:
@@ -149,6 +149,10 @@ class Telemetry:
if self.ready and not self.trace_set:
try:
with suppress_warnings():
existing_provider = trace.get_tracer_provider()
if not isinstance(existing_provider, ProxyTracerProvider):
self.trace_set = True
return
trace.set_tracer_provider(self.provider)
self.trace_set = True
except Exception as e:

View File

@@ -14,6 +14,7 @@ from crewai_core import (
version,
)
import pytest
from opentelemetry.sdk.trace import TracerProvider
def test_version_returns_string() -> None:
@@ -94,3 +95,36 @@ def test_user_data_decline_blocks(
def test_unused_var_warning_silenced() -> None:
# Touch os to keep the import (used by env-var fixtures above)
assert os.environ is not None
def test_core_telemetry_skips_duplicate_tracer_provider(
monkeypatch: pytest.MonkeyPatch,
) -> None:
from crewai_core.telemetry import Telemetry
Telemetry._instance = None
monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False)
monkeypatch.delenv("CREWAI_DISABLE_TELEMETRY", raising=False)
monkeypatch.delenv("CREWAI_DISABLE_TRACKING", raising=False)
monkeypatch.setattr(
"crewai_core.telemetry.trace.get_tracer_provider",
lambda: TracerProvider(),
)
called = False
def fail_if_called(provider: object) -> None:
nonlocal called
called = True
monkeypatch.setattr(
"crewai_core.telemetry.trace.set_tracer_provider",
fail_if_called,
)
telemetry = Telemetry()
telemetry.set_tracer()
assert called is False
assert telemetry.trace_set is True

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.7a4"
__version__ = "1.14.7rc1"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.7a4",
"crewai==1.14.7rc1",
"tiktoken>=0.8.0,<0.13",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -330,4 +330,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.7a4"
__version__ = "1.14.7rc1"

View File

@@ -8,8 +8,8 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.7a4",
"crewai-cli==1.14.7a4",
"crewai-core==1.14.7rc1",
"crewai-cli==1.14.7rc1",
# Core Dependencies
"pydantic>=2.11.9,<2.13",
"openai>=2.30.0,<3",
@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.7a4",
"crewai-tools==1.14.7rc1",
]
embeddings = [
"tiktoken>=0.8.0,<0.13"

View File

@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.7a4"
__version__ = "1.14.7rc1"
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),

View File

@@ -46,6 +46,7 @@ from crewai.state.checkpoint_config import CheckpointConfig, _coerce_checkpoint
from crewai.tools.base_tool import BaseTool, Tool
from crewai.types.callback import SerializableCallable
from crewai.utilities.config import process_config
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.logger import Logger
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.string_utils import interpolate_only
@@ -186,6 +187,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
tools (list[Any] | None): Tools at the agent's disposal.
max_iter (int): Maximum iterations for an agent to execute a task.
agent_executor: An instance of the CrewAgentExecutor class.
i18n (I18N): Internationalization settings.
llm (Any): Language model that will run the agent.
crew (Any): Crew to which the agent belongs.
@@ -265,6 +267,14 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
_serialize_executor_ref, return_type=dict | None, when_used="json"
),
] = Field(default=None, description="An instance of the CrewAgentExecutor class.")
i18n: I18N = Field(
default_factory=get_i18n,
description="Internationalization settings.",
deprecated=(
"Agent.i18n is deprecated and will be removed in a future release. "
"Use crewai.utilities.i18n.get_i18n() or Crew(prompt_file=...) instead."
),
)
llm: Annotated[
str | BaseLLM | None,

View File

@@ -117,8 +117,10 @@ def capture_execution_context(
)
def apply_execution_context(ctx: ExecutionContext) -> None:
def apply_execution_context(ctx: ExecutionContext | dict[str, Any]) -> None:
"""Write an ExecutionContext back into the ContextVars."""
if isinstance(ctx, dict):
ctx = ExecutionContext.model_validate(ctx)
_current_task_id.set(ctx.current_task_id)
current_flow_request_id.set(ctx.flow_request_id)
current_flow_id.set(ctx.flow_id)

View File

@@ -1013,6 +1013,7 @@ class Crew(FlowTrackable, BaseModel):
)
token = attach(baggage_ctx)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
inputs = prepare_kickoff(self, inputs, input_files)
@@ -1048,6 +1049,7 @@ class Crew(FlowTrackable, BaseModel):
self._memory.drain_writes()
clear_files(self.id)
detach(token)
crewai_event_bus._exit_runtime_scope(runtime_scope)
def _post_kickoff(self, result: CrewOutput) -> CrewOutput:
return result
@@ -1223,6 +1225,7 @@ class Crew(FlowTrackable, BaseModel):
)
token = attach(baggage_ctx)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
inputs = prepare_kickoff(self, inputs, input_files)
@@ -1256,6 +1259,7 @@ class Crew(FlowTrackable, BaseModel):
finally:
clear_files(self.id)
detach(token)
crewai_event_bus._exit_runtime_scope(runtime_scope)
async def akickoff_for_each(
self,

View File

@@ -80,6 +80,17 @@ def is_replaying() -> bool:
return _replaying.get()
_runtime_state_var: contextvars.ContextVar[RuntimeState | None] = (
contextvars.ContextVar("crewai_runtime_state", default=None)
)
_registered_entity_ids_var: contextvars.ContextVar[set[int] | None] = (
contextvars.ContextVar("crewai_registered_entity_ids", default=None)
)
_runtime_scope_depth: contextvars.ContextVar[int] = contextvars.ContextVar(
"crewai_runtime_scope_depth", default=0
)
class CrewAIEventsBus:
"""Singleton event bus for handling events in CrewAI.
@@ -116,7 +127,6 @@ class CrewAIEventsBus:
_futures_lock: threading.Lock
_executor_initialized: bool
_has_pending_events: bool
_runtime_state: RuntimeState | None
def __new__(cls) -> Self:
"""Create or return the singleton instance.
@@ -151,8 +161,6 @@ class CrewAIEventsBus:
self._console = ConsoleFormatter()
self._executor_initialized = False
self._has_pending_events = False
self._runtime_state: RuntimeState | None = None
self._registered_entity_ids: set[int] = set()
def _ensure_executor_initialized(self) -> None:
"""Lazily initialize the thread pool executor and event loop.
@@ -281,6 +289,51 @@ class CrewAIEventsBus:
"""The RuntimeState currently attached to the bus, if any."""
return self._runtime_state
@property
def _runtime_state(self) -> RuntimeState | None:
return _runtime_state_var.get()
@_runtime_state.setter
def _runtime_state(self, value: RuntimeState | None) -> None:
_runtime_state_var.set(value)
@property
def _registered_entity_ids(self) -> set[int]:
ids = _registered_entity_ids_var.get()
if ids is None:
ids = set()
_registered_entity_ids_var.set(ids)
return ids
@_registered_entity_ids.setter
def _registered_entity_ids(self, value: set[int]) -> None:
_registered_entity_ids_var.set(value)
def reset_runtime_state(self) -> None:
"""Detach the RuntimeState and clear the entity registry."""
self._runtime_state = None
self._registered_entity_ids = set()
def _enter_runtime_scope(self) -> bool:
depth = _runtime_scope_depth.get()
_runtime_scope_depth.set(depth + 1)
if depth != 0:
return False
if _runtime_state_var.get() is None:
from crewai import RuntimeState
if RuntimeState is not None:
_runtime_state_var.set(RuntimeState(root=[]))
_registered_entity_ids_var.set(set())
return True
def _exit_runtime_scope(self, outermost: bool) -> None:
depth = _runtime_scope_depth.get()
_runtime_scope_depth.set(depth - 1 if depth > 0 else 0)
if outermost:
_runtime_state_var.set(None)
_registered_entity_ids_var.set(None)
def register_entity(self, entity: Any) -> None:
"""Add an entity to the RuntimeState, creating it if needed.
@@ -349,6 +402,7 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: SyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Call provided synchronous handlers.
@@ -356,8 +410,8 @@ class CrewAIEventsBus:
source: The emitting object
event: The event instance
handlers: Frozenset of sync handlers to call
state: The RuntimeState captured on the emitting context
"""
state = self._runtime_state
errors: list[tuple[SyncHandler, Exception]] = [
(handler, error)
for handler in handlers
@@ -376,6 +430,7 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: AsyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Asynchronously call provided async handlers.
@@ -383,8 +438,8 @@ class CrewAIEventsBus:
source: The object that emitted the event
event: The event instance
handlers: Frozenset of async handlers to call
state: The RuntimeState captured on the emitting context
"""
state = self._runtime_state
async def _call(handler: AsyncHandler) -> Any:
if _get_param_count(handler) >= 3:
@@ -399,7 +454,9 @@ class CrewAIEventsBus:
f"[CrewAIEventsBus] Async handler error in {getattr(handler, '__name__', handler)}: {result}"
)
async def _emit_with_dependencies(self, source: Any, event: BaseEvent) -> None:
async def _emit_with_dependencies(
self, source: Any, event: BaseEvent, state: RuntimeState | None
) -> None:
"""Emit an event with dependency-aware handler execution.
Handlers are grouped into execution levels based on their dependencies.
@@ -450,18 +507,18 @@ class CrewAIEventsBus:
if level_sync:
if event_type is LLMStreamChunkEvent:
self._call_handlers(source, event, level_sync)
self._call_handlers(source, event, level_sync, state)
else:
ctx = contextvars.copy_context()
future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, level_sync
ctx.run, self._call_handlers, source, event, level_sync, state
)
await asyncio.get_running_loop().run_in_executor(
None, future.result
)
if level_async:
await self._acall_handlers(source, event, level_async)
await self._acall_handlers(source, event, level_async, state)
def _register_source(self, source: Any) -> None:
"""Register the source entity in RuntimeState if applicable."""
@@ -556,21 +613,23 @@ class CrewAIEventsBus:
self._ensure_executor_initialized()
self._has_pending_events = True
state = self._runtime_state
if has_dependencies:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies(source, event),
self._emit_with_dependencies(source, event, state),
self._loop,
)
)
if sync_handlers:
if event_type is LLMStreamChunkEvent:
self._call_handlers(source, event, sync_handlers)
self._call_handlers(source, event, sync_handlers, state)
else:
ctx = contextvars.copy_context()
sync_future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, sync_handlers
ctx.run, self._call_handlers, source, event, sync_handlers, state
)
if not async_handlers:
return self._track_future(sync_future)
@@ -578,7 +637,7 @@ class CrewAIEventsBus:
if async_handlers:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers(source, event, async_handlers),
self._acall_handlers(source, event, async_handlers, state),
self._loop,
)
)
@@ -590,21 +649,22 @@ class CrewAIEventsBus:
source: Any,
event: BaseEvent,
handlers: AsyncHandlerSet,
state: RuntimeState | None,
) -> None:
"""Call async handlers with the replaying flag set on the loop thread."""
token = _replaying.set(True)
try:
await self._acall_handlers(source, event, handlers)
await self._acall_handlers(source, event, handlers, state)
finally:
_replaying.reset(token)
async def _emit_with_dependencies_replaying(
self, source: Any, event: BaseEvent
self, source: Any, event: BaseEvent, state: RuntimeState | None
) -> None:
"""Dependency-aware dispatch with the replaying flag set."""
token = _replaying.set(True)
try:
await self._emit_with_dependencies(source, event)
await self._emit_with_dependencies(source, event, state)
finally:
_replaying.reset(token)
@@ -638,12 +698,13 @@ class CrewAIEventsBus:
self._ensure_executor_initialized()
self._has_pending_events = True
state = self._runtime_state
token = _replaying.set(True)
try:
if has_dependencies:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies_replaying(source, event),
self._emit_with_dependencies_replaying(source, event, state),
self._loop,
)
)
@@ -651,7 +712,7 @@ class CrewAIEventsBus:
if sync_handlers:
ctx = contextvars.copy_context()
sync_future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, sync_handlers
ctx.run, self._call_handlers, source, event, sync_handlers, state
)
self._track_future(sync_future)
if not async_handlers:
@@ -659,7 +720,9 @@ class CrewAIEventsBus:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers_replaying(source, event, async_handlers),
self._acall_handlers_replaying(
source, event, async_handlers, state
),
self._loop,
)
)
@@ -727,7 +790,9 @@ class CrewAIEventsBus:
async_handlers = self._async_handlers.get(event_type, frozenset())
if async_handlers:
await self._acall_handlers(source, event, async_handlers)
await self._acall_handlers(
source, event, async_handlers, self._runtime_state
)
def register_handler(
self,

View File

@@ -292,7 +292,7 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
self._handle_trace_event("crew_kickoff_completed", source, event)
if self.batch_manager.defer_session_finalization:
if self._should_defer_session_finalization():
return
if self._nested_in_flow_execution():
return
@@ -306,7 +306,7 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None:
self._handle_trace_event("crew_kickoff_failed", source, event)
if self.batch_manager.defer_session_finalization:
if self._should_defer_session_finalization():
return
if self._nested_in_flow_execution():
return
@@ -734,7 +734,7 @@ class TraceCollectionListener(BaseEventListener):
if not self.batch_manager.is_batch_initialized():
return
# Multi-turn flows defer batch finalization to finalize_session_traces().
if self.batch_manager.defer_session_finalization:
if self._should_defer_session_finalization():
return
self.batch_manager.finalize_batch()
@@ -745,6 +745,15 @@ class TraceCollectionListener(BaseEventListener):
return current_flow_id.get() is not None
def _should_defer_session_finalization(self) -> bool:
"""True when the active trace belongs to a deferred flow session."""
from crewai.flow.flow_context import current_flow_defer_trace_finalization
return (
self.batch_manager.defer_session_finalization
or current_flow_defer_trace_finalization.get()
)
def _flow_owns_trace_batch(self) -> bool:
"""True when an in-flight conversational flow already owns the trace batch."""
if self.batch_manager.batch_owner_type == "flow":
@@ -786,7 +795,11 @@ class TraceCollectionListener(BaseEventListener):
(``current_flow_id``) to keep LLM/tool events from falling back to an
implicit crew batch.
"""
from crewai.flow.flow_context import current_flow_id, current_flow_name
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
current_flow_name,
)
flow_id = current_flow_id.get()
if flow_id is None:
@@ -802,6 +815,8 @@ class TraceCollectionListener(BaseEventListener):
}
self.batch_manager.batch_owner_type = "flow"
self.batch_manager.batch_owner_id = flow_id
if current_flow_defer_trace_finalization.get():
self.batch_manager.defer_session_finalization = True
self._initialize_batch(user_context, execution_metadata)
return True

View File

@@ -1,6 +1,6 @@
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, field_serializer
from crewai.events.base_events import BaseEvent
@@ -57,6 +57,10 @@ class MethodExecutionFailedEvent(FlowEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_serializer("error")
def _serialize_error(self, error: Exception) -> str:
return str(error)
class MethodExecutionPausedEvent(FlowEvent):
"""Event emitted when a flow method is paused waiting for human feedback.

View File

@@ -46,7 +46,9 @@ from crewai.flow.conversation import (
get_conversation_messages,
receive_user_message as _receive_user_message,
)
from crewai.flow.dsl import listen, router, start
from crewai.flow.dsl import listen, start
from crewai.flow.dsl._utils import _set_flow_method_definition
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.utilities.types import LLMMessage
@@ -72,6 +74,15 @@ def _iter_condition_labels(condition: Any) -> set[str]:
return set()
def _conversation_start_router(func: Callable[..., Any]) -> Any:
wrapper = start()(func)
_set_flow_method_definition(
cast(Any, wrapper),
FlowMethodDefinition(start=True, router=True),
)
return wrapper
class _ConversationalMixin:
"""Experimental conversational graph for ``Flow``.
@@ -85,10 +96,7 @@ class _ConversationalMixin:
conversational: ClassVar[bool] = False
conversational_config: ClassVar[ConversationConfig | None] = None
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
internal_routes: ClassVar[tuple[str, ...]] = (
"answer_from_history",
"conversation_start",
)
internal_routes: ClassVar[tuple[str, ...]] = ("answer_from_history",)
builtin_route_descriptions: ClassVar[dict[str, str]] = {
"converse": (
"Ordinary chat, follow-ups, summaries, clarifications, and "
@@ -138,23 +146,24 @@ class _ConversationalMixin:
def kickoff(self, *args: Any, **kwargs: Any) -> Any:
pass
@start()
@_conversational_only
def conversation_start(self) -> str | None:
"""Internal Flow entrypoint that hands the user message to the router.
"""Return the current user message for conversational route selection.
In conversational mode, ``Flow.kickoff_async`` runs all ``@start``
methods sequentially and this one is registered last, so any user
``@start`` methods (e.g. permission loading) have already finished
before the returned value triggers ``route_conversation``.
This remains as a plain overridable helper for compatibility. It is not
registered as a Flow method; ``route_conversation`` is the synthetic
built-in start/router that begins a conversational turn.
"""
state = cast(ConversationState, self.state)
return state.current_user_message
@router(conversation_start)
@_conversation_start_router
@_conversational_only
def route_conversation(self) -> str:
"""Route the current turn to a listener label."""
if "conversation_start" not in {
str(method_name) for method_name in self._completed_methods
}:
self.conversation_start()
state = cast(ConversationState, self.state)
context = self.build_router_context()
previous_intent = state.last_intent
@@ -651,16 +660,16 @@ class _ConversationalMixin:
if not type(self)._is_conversational():
return start_methods, False
conversation_start = "conversation_start"
if conversation_start not in {str(method) for method in start_methods}:
route_conversation = "route_conversation"
if route_conversation not in {str(method) for method in start_methods}:
return start_methods, False
ordered_starts = [
method for method in start_methods if str(method) != conversation_start
method for method in start_methods if str(method) != route_conversation
]
ordered_starts.append(
next(
method for method in start_methods if str(method) == conversation_start
method for method in start_methods if str(method) == route_conversation
)
)
return ordered_starts, True
@@ -1047,12 +1056,15 @@ class _ConversationalMixin:
trace_listener = TraceCollectionListener()
batch_manager = trace_listener.batch_manager
if batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
batch_manager.finalize_batch()
try:
if batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
batch_manager.finalize_batch()
finally:
batch_manager.defer_session_finalization = False
__all__ = ["_ConversationalMixin"]

View File

@@ -39,9 +39,7 @@ class FlowConversationalDefinition(BaseModel):
visible_agent_outputs: list[str] | Literal["all"] | None = None
defer_trace_finalization: bool = True
builtin_routes: list[str] = Field(default_factory=lambda: ["converse", "end"])
internal_routes: list[str] = Field(
default_factory=lambda: ["answer_from_history", "conversation_start"]
)
internal_routes: list[str] = Field(default_factory=lambda: ["answer_from_history"])
__all__ = [

View File

@@ -313,7 +313,7 @@ def _build_conversational_definition(
internal_routes = getattr(
flow_class,
"internal_routes",
("answer_from_history", "conversation_start"),
("answer_from_history",),
)
if config is None:
return FlowConversationalDefinition(

View File

@@ -15,6 +15,10 @@ current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_id", default=None
)
current_flow_defer_trace_finalization: contextvars.ContextVar[bool] = (
contextvars.ContextVar("flow_defer_trace_finalization", default=False)
)
current_flow_method_name: contextvars.ContextVar[str] = contextvars.ContextVar(
"flow_method_name", default="unknown"
)

View File

@@ -85,7 +85,12 @@ from crewai.events.types.flow_events import (
MethodExecutionStartedEvent,
)
from crewai.flow.dsl._utils import build_flow_definition
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
current_flow_name,
current_flow_request_id,
)
from crewai.flow.flow_definition import (
FlowDefinition,
FlowDefinitionCondition,
@@ -1514,7 +1519,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
self._event_futures.clear()
if not self.suppress_flow_events:
if (
not self.suppress_flow_events
and not self._should_defer_trace_finalization()
):
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
@@ -1531,7 +1539,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
logger.warning("FlowFinishedEvent handler failed", exc_info=True)
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if (
trace_listener.batch_manager.batch_owner_type == "flow"
and current_flow_id.get() == self.flow_id
and not trace_listener.batch_manager.defer_session_finalization
and not current_flow_defer_trace_finalization.get()
):
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
@@ -1922,13 +1935,17 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
restore_from_state_id=restore_from_state_id,
)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
asyncio.get_running_loop()
ctx = contextvars.copy_context()
with ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(ctx.run, asyncio.run, _run_flow()).result()
except RuntimeError:
return asyncio.run(_run_flow())
try:
asyncio.get_running_loop()
ctx = contextvars.copy_context()
with ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(ctx.run, asyncio.run, _run_flow()).result()
except RuntimeError:
return asyncio.run(_run_flow())
finally:
crewai_event_bus._exit_runtime_scope(runtime_scope)
async def kickoff_async(
self,
@@ -2020,12 +2037,23 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
flow_token = attach(ctx)
flow_id_token = None
flow_name_token = None
flow_defer_trace_finalization_token = None
request_id_token = None
if current_flow_id.get() is None:
flow_id_token = current_flow_id.set(self.flow_id)
flow_name_token = current_flow_name.set(
self.name or self.__class__.__name__
)
flow_defer_trace_finalization_token = (
current_flow_defer_trace_finalization.set(
self._should_defer_trace_finalization()
)
)
if current_flow_request_id.get() is None:
request_id_token = current_flow_request_id.set(self.flow_id)
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
# Reset flow state for fresh execution unless restoring from persistence
is_restoring = (
@@ -2117,6 +2145,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
should_emit_flow_started = not (
defer_trace_finalization and deferred_started_event_id
)
if current_flow_id.get() == self.flow_id:
TraceCollectionListener().batch_manager.defer_session_finalization = (
defer_trace_finalization
)
if (
defer_trace_finalization
@@ -2290,7 +2322,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if (
trace_listener.batch_manager.batch_owner_type == "flow"
and current_flow_id.get() == self.flow_id
and not trace_listener.batch_manager.defer_session_finalization
and not current_flow_defer_trace_finalization.get()
):
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
@@ -2304,9 +2341,16 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
self.memory.drain_writes()
if request_id_token is not None:
current_flow_request_id.reset(request_id_token)
if flow_defer_trace_finalization_token is not None:
current_flow_defer_trace_finalization.reset(
flow_defer_trace_finalization_token
)
if flow_name_token is not None:
current_flow_name.reset(flow_name_token)
if flow_id_token is not None:
current_flow_id.reset(flow_id_token)
detach(flow_token)
crewai_event_bus._exit_runtime_scope(runtime_scope)
async def akickoff(
self,

View File

@@ -30,7 +30,7 @@ from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import Span
from opentelemetry.trace import ProxyTracerProvider, Span
from typing_extensions import Self
from crewai.events.event_bus import crewai_event_bus
@@ -162,6 +162,10 @@ class Telemetry:
if self.ready and not self.trace_set:
try:
with suppress_warnings():
existing_provider = trace.get_tracer_provider()
if not isinstance(existing_provider, ProxyTracerProvider):
self.trace_set = True
return
trace.set_tracer_provider(self.provider)
self.trace_set = True
except Exception as e:

View File

@@ -4,6 +4,7 @@ import os
import threading
from unittest import mock
from unittest.mock import MagicMock, patch
import warnings
from crewai.agents.crew_agent_executor import AgentFinish, CrewAgentExecutor
from crewai.constants import DEFAULT_LLM_MODEL
@@ -77,6 +78,51 @@ def test_agent_creation():
assert agent.backstory == "test backstory"
def test_agent_exposes_i18n_for_backward_compatibility():
from crewai.utilities.i18n import I18N_DEFAULT
agent = Agent(role="test role", goal="test goal", backstory="test backstory")
with pytest.warns(DeprecationWarning, match="Agent.i18n is deprecated"):
i18n = agent.i18n
assert i18n is I18N_DEFAULT
assert isinstance(i18n.slice("role_playing"), str)
def test_agent_accepts_custom_i18n():
from crewai.utilities.i18n import I18N
prompt_file = os.path.join(
os.path.dirname(__file__), "..", "utilities", "prompts.json"
)
i18n = I18N(prompt_file=prompt_file)
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
i18n=i18n,
)
with pytest.warns(DeprecationWarning, match="Agent.i18n is deprecated"):
agent_i18n = agent.i18n
assert agent_i18n is i18n
assert agent_i18n.slice("role_playing") == "Lorem ipsum dolor sit amet"
def test_agent_copy_does_not_emit_i18n_deprecation_warning():
agent = Agent(role="test role", goal="test goal", backstory="test backstory")
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter("always", DeprecationWarning)
agent.copy()
assert not any(
"Agent.i18n is deprecated" in str(w.message) for w in caught_warnings
)
def test_agent_with_only_system_template():
"""Test that an agent with only system_template works without errors."""
agent = Agent(

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import threading
from typing import Any
from unittest.mock import patch
@@ -109,10 +110,79 @@ class TestCheckpointListenerOptsOut:
assert do_cp.call_count == 0
class TestFlowResumeReplaysEvents:
"""End-to-end: a resumed flow emits MethodExecution* events for completed methods."""
class TestCheckpointResumeReplaysEvents:
"""A flow resumed from a checkpoint replays MethodExecution* events for
completed methods and executes the pending ones. The checkpoint persists
the event record, which is reloaded into the per-run runtime state.
def test_resume_dispatches_completed_method_events(self, tmp_path) -> None:
``step_c`` is gated on a threading.Event so the flow is frozen with exactly
``step_a`` and ``step_b`` completed when the checkpoint is written — the
mid-run snapshot is deterministic rather than dependent on write timing.
"""
def test_resume_replays_completed_and_executes_pending(self, tmp_path) -> None:
from crewai.flow.flow import Flow, listen, start
from crewai.state.checkpoint_config import CheckpointConfig
at_step_c = threading.Event()
release = threading.Event()
captured: list[Any] = []
class ThreeStepFlow(Flow[dict]):
@start()
def step_a(self) -> str:
return "a"
@listen(step_a)
def step_b(self) -> str:
return "b"
@listen(step_b)
def step_c(self) -> str:
captured.append(crewai_event_bus.runtime_state)
at_step_c.set()
release.wait(timeout=10)
return "c"
runner = threading.Thread(target=ThreeStepFlow().kickoff)
runner.start()
try:
assert at_step_c.wait(timeout=10)
location = captured[0].checkpoint(str(tmp_path / "cp"))
finally:
release.set()
runner.join(timeout=10)
captured_started: list[str] = []
captured_finished: list[str] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(MethodExecutionStartedEvent)
def _cs(_: Any, event: MethodExecutionStartedEvent) -> None:
captured_started.append(event.method_name)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def _cf(_: Any, event: MethodExecutionFinishedEvent) -> None:
captured_finished.append(event.method_name)
ThreeStepFlow().kickoff(
from_checkpoint=CheckpointConfig(restore_from=location)
)
assert captured_started == ["step_a", "step_b", "step_c"]
assert captured_finished == ["step_a", "step_b", "step_c"]
class TestPersistResumeDoesNotReplayCompletedEvents:
"""A @persist resume continues from pending methods only.
@persist stores flow state, not the event record, so completed-method
events have no persisted source to replay from. Runtime state is scoped
per run, so flow1's events are not visible to flow2.
"""
def test_persist_resume_executes_only_pending_methods(self, tmp_path) -> None:
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
@@ -132,9 +202,6 @@ class TestFlowResumeReplaysEvents:
def step_c(self) -> str:
return "c"
if crewai_event_bus.runtime_state is not None:
crewai_event_bus.runtime_state.event_record.clear()
flow1 = ThreeStepFlow(persistence=persistence)
flow1.kickoff()
flow_id = flow1.state["id"]
@@ -157,9 +224,5 @@ class TestFlowResumeReplaysEvents:
flow2.kickoff(inputs={"id": flow_id})
assert captured_started.count("step_a") == 1
assert captured_started.count("step_b") == 1
assert captured_started.count("step_c") == 1
assert captured_finished.count("step_a") == 1
assert captured_finished.count("step_b") == 1
assert captured_finished.count("step_c") == 1
assert captured_started == ["step_c"]
assert captured_finished == ["step_c"]

View File

@@ -6,6 +6,7 @@ import pytest
from crewai import Agent, Crew, Task
from crewai.telemetry import Telemetry
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
@pytest.fixture(autouse=True)
@@ -53,6 +54,23 @@ def test_telemetry_enabled_by_default():
assert telemetry.ready is True
def test_set_tracer_skips_when_provider_already_configured():
"""A second telemetry instance must not re-install the global provider."""
with (
patch.dict(os.environ, {}, clear=True),
patch(
"crewai.telemetry.telemetry.trace.get_tracer_provider",
return_value=TracerProvider(),
),
patch("crewai.telemetry.telemetry.trace.set_tracer_provider") as mock_set,
):
telemetry = Telemetry()
telemetry.set_tracer()
mock_set.assert_not_called()
assert telemetry.trace_set is True
@patch("crewai.telemetry.telemetry.logger.error")
@patch(
"opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export",

View File

@@ -409,4 +409,31 @@ class TestRuntimeStateIntegration:
old_json, context={"from_checkpoint": True}
)
assert len(restored.root) == 1
assert len(restored.event_record) == 0
assert len(restored.event_record) == 0
def test_reset_runtime_state_clears_state_and_registry(self):
from crewai import Agent, Crew, RuntimeState
from crewai.events.event_bus import crewai_event_bus
if RuntimeState is None:
pytest.skip("RuntimeState unavailable (model_rebuild failed)")
agent = Agent(role="test", goal="test", backstory="test", llm="gpt-4o-mini")
crew = Crew(agents=[agent], tasks=[], verbose=False)
previous_state = crewai_event_bus._runtime_state
previous_ids = crewai_event_bus._registered_entity_ids
crewai_event_bus._runtime_state = None
crewai_event_bus._registered_entity_ids = set()
try:
crewai_event_bus.register_entity(crew)
assert crewai_event_bus.runtime_state is not None
assert crewai_event_bus._registered_entity_ids
crewai_event_bus.reset_runtime_state()
assert crewai_event_bus.runtime_state is None
assert crewai_event_bus._registered_entity_ids == set()
finally:
crewai_event_bus._runtime_state = previous_state
crewai_event_bus._registered_entity_ids = previous_ids

View File

@@ -26,7 +26,11 @@ from crewai.experimental import (
RouterConfig,
)
from crewai.flow import Flow, ChatState, listen, start
from crewai.flow.flow_context import current_flow_id, current_flow_name
from crewai.flow.flow_context import (
current_flow_defer_trace_finalization,
current_flow_id,
current_flow_name,
)
from crewai.flow.conversation import (
append_message,
get_conversation_messages,
@@ -598,9 +602,9 @@ class TestConversationalFlow:
"""Conversational flows: user ``@start`` methods finish before router fires.
Non-chat flows run ``@start`` methods in parallel via ``asyncio.gather``,
which would race with ``conversation_start`` and let the router fire
which would race with ``route_conversation`` and let the router fire
before user setup finished. In conversational mode the framework runs
them sequentially, with ``conversation_start`` last.
them sequentially, with ``route_conversation`` last.
"""
order: list[str] = []
@@ -643,15 +647,10 @@ class TestConversationalFlow:
assert "attach_bus" in order # still fires every turn
assert "route_turn" in order
def test_subclass_can_override_conversation_start_without_redecorating(
def test_subclass_can_override_conversation_start_helper(
self,
) -> None:
"""Overriding an inherited ``@start`` method must not unregister it.
Before the metaclass fix, subclasses had to re-apply ``@start()`` on
every override or the parent's ``conversation_start`` would silently
drop out of the start registry — leaving the flow with nothing to fire.
"""
"""The compatibility helper remains overridable without adding a Flow node."""
bootstrap_calls: list[str] = []
@@ -672,6 +671,38 @@ class TestConversationalFlow:
flow = BootstrapFlow()
flow.handle_turn("hi")
assert bootstrap_calls == ["ran"]
assert "conversation_start" not in BootstrapFlow.flow_definition().methods
route_definition = BootstrapFlow.flow_definition().methods["route_conversation"]
assert route_definition.start is True
assert route_definition.router is True
assert flow.state.messages[-1].content == "worked"
def test_legacy_decorated_conversation_start_runs_once_per_turn(
self,
) -> None:
"""Legacy ``@start`` overrides are not invoked again by the router."""
bootstrap_calls: list[str] = []
@ConversationConfig()
class BootstrapFlow(ConversationalFlow):
@start()
def conversation_start(self) -> str | None:
bootstrap_calls.append("ran")
return super().conversation_start()
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
self.append_assistant_message("worked")
return "worked"
flow = BootstrapFlow()
flow.handle_turn("hi")
assert bootstrap_calls == ["ran"]
assert flow.state.messages[-1].content == "worked"
@@ -1170,6 +1201,40 @@ class TestConversationalFlow:
"finalize_session_traces must finalize the trace batch once"
)
def test_deferred_resume_skips_per_resume_flow_finished_event(self) -> None:
"""Deferred sessions do not emit terminal events while resuming."""
from crewai.events.types.flow_events import FlowFinishedEvent
from crewai.flow.async_feedback.types import PendingFeedbackContext
class DeferredResumeFlow(Flow[ChatState]):
defer_trace_finalization = True
@start()
def begin(self) -> str:
return "started"
flow = DeferredResumeFlow()
flow._pending_feedback_context = PendingFeedbackContext(
flow_id=flow.flow_id,
flow_class="DeferredResumeFlow",
method_name="begin",
method_output="started",
message="Review",
)
finished_events: list[FlowFinishedEvent] = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowFinishedEvent)
def capture(_: Any, event: FlowFinishedEvent) -> None:
finished_events.append(event)
flow.resume("approved")
crewai_event_bus.flush()
assert finished_events == []
def test_finalize_session_traces_restores_event_scope(self, capsys) -> None:
"""No ``empty scope stack`` warning when deferred ``flow_finished`` fires.
@@ -1471,6 +1536,44 @@ class TestDeferredFlowLifecycleEvents:
listener.batch_manager.finalize_batch()
mock_finalize.assert_not_called()
def test_deferred_flow_kickoff_marks_trace_manager_session_deferred(
self,
) -> None:
class DeferredTraceFlow(Flow[ChatState]):
@start()
def begin(self) -> str:
return "done"
listener = TraceCollectionListener()
listener.batch_manager.defer_session_finalization = False
flow = DeferredTraceFlow()
flow.defer_trace_finalization = True
with patch.object(listener.batch_manager, "finalize_batch"):
flow.kickoff()
assert listener.batch_manager.defer_session_finalization is True
flow.finalize_session_traces()
assert listener.batch_manager.defer_session_finalization is False
def test_non_deferred_flow_kickoff_clears_stale_trace_manager_flag(
self,
) -> None:
class PlainTraceFlow(Flow[ChatState]):
@start()
def begin(self) -> str:
return "done"
listener = TraceCollectionListener()
listener.batch_manager.defer_session_finalization = True
PlainTraceFlow().kickoff()
assert listener.batch_manager.defer_session_finalization is False
class TestNestedCrewTracing:
def test_is_inside_active_flow_context_when_kickoff_running(self) -> None:
@@ -1524,3 +1627,130 @@ class TestNestedCrewTracing:
elif listener.batch_manager.batch_owner_type == "crew":
listener.batch_manager.finalize_batch()
mock_finalize.assert_not_called()
def test_lazy_flow_batch_from_context_preserves_deferred_parent(self) -> None:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
listener = TraceCollectionListener()
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
flow_id_token = current_flow_id.set("parent-flow-id")
flow_name_token = current_flow_name.set("ParentChatFlow")
defer_token = current_flow_defer_trace_finalization.set(True)
try:
initialized = listener._try_initialize_flow_batch_from_context(
type("Event", (), {"timestamp": None})()
)
assert initialized is True
assert listener.batch_manager.batch_owner_type == "flow"
assert listener.batch_manager.batch_owner_id == "parent-flow-id"
assert listener.batch_manager.defer_session_finalization is True
assert listener.batch_manager.current_batch is not None
assert (
listener.batch_manager.current_batch.execution_metadata[
"execution_type"
]
== "flow"
)
assert (
listener.batch_manager.current_batch.execution_metadata["flow_name"]
== "ParentChatFlow"
)
finally:
current_flow_defer_trace_finalization.reset(defer_token)
current_flow_name.reset(flow_name_token)
current_flow_id.reset(flow_id_token)
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.trace_batch_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
def test_nested_agent_executor_flow_does_not_finalize_parent_batch(
self,
) -> None:
from crewai import Agent, Crew, Task
from crewai.llms.base_llm import BaseLLM
class StaticLLM(BaseLLM):
def __init__(self) -> None:
super().__init__(model="debug-static-llm", provider="debug")
def call(
self,
messages: Any,
tools: Any = None,
callbacks: Any = None,
available_functions: Any = None,
from_task: Any = None,
from_agent: Any = None,
response_model: Any = None,
) -> str:
return (
"Thought: I can answer directly.\n"
"Final Answer: nested crew result"
)
class NestedCrewFlow(Flow[ChatState]):
defer_trace_finalization = True
tracing = True
@start()
def begin(self) -> str:
return "run_nested_crew"
@listen(begin)
def run_nested_crew(self, _: str) -> str:
agent = Agent(
role="Debug Agent",
goal="Return a short deterministic result",
backstory="Used only for trace finalization debugging.",
llm=StaticLLM(),
verbose=False,
)
task = Task(
description="Return the deterministic nested crew result.",
expected_output="nested crew result",
agent=agent,
)
return Crew(agents=[agent], tasks=[task], verbose=False).kickoff().raw
listener = TraceCollectionListener()
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
listener.batch_manager.trace_batch_id = None
listener.batch_manager.defer_session_finalization = False
listener.batch_manager.event_buffer.clear()
listener.first_time_handler.is_first_time = False
def initialize_backend_batch(*_: Any, **__: Any) -> None:
listener.batch_manager.trace_batch_id = "debug-trace-batch"
flow = NestedCrewFlow()
with (
patch.object(
listener.batch_manager,
"_initialize_backend_batch",
side_effect=initialize_backend_batch,
),
patch.object(listener.batch_manager, "finalize_batch") as mock_finalize,
):
flow.kickoff()
crewai_event_bus.flush()
flow.kickoff()
crewai_event_bus.flush()
assert mock_finalize.call_count == 0, (
"nested AgentExecutor flows inside a deferred parent Flow must "
"not finalize the parent trace batch"
)

View File

@@ -223,10 +223,11 @@ def test_flow_definition_includes_conversational_builtins_when_enabled():
assert definition.conversational.enabled is True
assert definition.conversational.defer_trace_finalization is True
assert definition.conversational.builtin_routes == ["converse", "end"]
assert "conversation_start" in methods
assert "conversation_start" not in methods
assert "route_conversation" in methods
assert "converse_turn" in methods
assert methods["conversation_start"].start is True
assert methods["route_conversation"].start is True
assert methods["route_conversation"].router is True
def test_flow_definition_serializes_conversational_config():
@@ -260,7 +261,7 @@ def test_flow_definition_serializes_conversational_config():
assert conversational.router.fallback_intent == "end"
def test_flow_definition_preserves_undecorated_conversational_override():
def test_flow_definition_uses_collapsed_conversational_router_start():
class ChatFlow(Flow):
conversational = True
@@ -269,8 +270,10 @@ def test_flow_definition_preserves_undecorated_conversational_override():
methods = ChatFlow.flow_definition().methods
assert methods["conversation_start"].start is True
assert "conversation_start" not in methods
assert "route_conversation" in methods
assert methods["route_conversation"].start is True
assert methods["route_conversation"].router is True
def test_flow_definition_serializes_human_feedback_metadata():

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.14.7a4"
__version__ = "1.14.7rc1"