Compare commits


15 Commits

Author SHA1 Message Date
Joao Moura
a5321aae92 fix: bump litellm to ~=1.83.7 for GHSA-xqmj-j6mv-4862 + update exclude-newer
litellm 1.83.0 has an MCP stdio command-injection vulnerability
(CVE-2026-30623), fixed in 1.83.7-stable. Also bumps exclude-newer to
2026-04-26 so the resolver can find the newer version.

Note: GHSA-58qw-9mgm-455v (pip) requires a workflow file change to
add --ignore-vuln, which needs the workflow OAuth scope.
2026-04-25 17:24:58 -07:00
Joao Moura
54f5b7db2e fix: ruff format on llm.py and llm_result.py 2026-04-25 16:15:43 -07:00
Joao Moura
c9a6955cd6 fix: auto-fix pre-existing ruff errors in mcp/__init__.py and daytona __init__.py 2026-04-25 16:05:36 -07:00
Joao Moura
086f534d4e fix: add typing.Any import to llm_result.py (mypy needs it) 2026-04-25 15:57:25 -07:00
Joao Moura
fe93dfe64c fix: bare dict type param in llm_result.py (mypy) 2026-04-25 15:48:02 -07:00
Joao Moura
5837f8edb8 fix: type-checker errors, missing imports, lint issues
- LLM.call() return type -> str | Any (keeps callers happy)
- Add type: ignore for runtime-compatible dict -> LLMMessage cast
- Add missing typing.Any import to llm_result.py
- Fix dict -> dict[str, Any] for type params
- Restore unittest.mock imports in tests
- All 17 tests passing
2026-04-25 15:43:14 -07:00
Joao Moura
cdc4b43620 feat(llm): add tool loop support to LLM.call() with structured LLMResult
When LLM.call() is invoked with both tools and available_functions,
it now runs a tool loop — calling the model, executing requested tools,
and feeding results back — until the model responds with text or
max_iterations is reached.

Changes:
- New llm_result.py with LLMResult and ToolCallRecord models
- LLM.call() returns LLMResult (structured) when tools are provided,
  str when not (fully backwards compatible)
- Tool loop with max_iterations parameter (default 10)
- Cost estimation based on model name and token counts
- Comprehensive test suite (17 tests, all mocked)
- Exports LLMResult and ToolCallRecord from crewai.__init__
2026-04-25 15:22:18 -07:00
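
The tool loop described in this commit can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from llm.py: the field names on LLMResult and ToolCallRecord, the shape of the model reply, and the behavior when max_iterations is exhausted (here: an empty text result) are all assumptions, since llm_result.py is not shown in this view.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToolCallRecord:
    # Hypothetical fields; the real model in llm_result.py may differ.
    name: str
    arguments: dict[str, Any]
    output: Any


@dataclass
class LLMResult:
    # Hypothetical fields; the real model in llm_result.py may differ.
    text: str
    tool_calls: list[ToolCallRecord] = field(default_factory=list)
    iterations: int = 0


def run_tool_loop(
    model: Callable[[list[dict[str, Any]]], dict[str, Any]],
    messages: list[dict[str, Any]],
    available_functions: dict[str, Callable[..., Any]],
    max_iterations: int = 10,
) -> LLMResult:
    """Call the model, run any requested tool, feed the result back, repeat."""
    history = list(messages)
    records: list[ToolCallRecord] = []
    for iteration in range(1, max_iterations + 1):
        reply = model(history)
        call = reply.get("tool_call")
        if call is None:
            # Plain text answer: the loop is done.
            return LLMResult(reply["content"], records, iteration)
        output = available_functions[call["name"]](**call["arguments"])
        records.append(ToolCallRecord(call["name"], call["arguments"], output))
        history.append(
            {"role": "tool", "name": call["name"], "content": str(output)}
        )
    # Assumption: when the budget runs out, return what was collected so far.
    return LLMResult("", records, max_iterations)


def fake_model(history: list[dict[str, Any]]) -> dict[str, Any]:
    """Stand-in model: asks for one tool call, then answers with its result."""
    tool_messages = [m for m in history if m["role"] == "tool"]
    if not tool_messages:
        return {"tool_call": {"name": "add", "arguments": {"a": 2, "b": 3}}}
    return {"content": f"The sum is {tool_messages[-1]['content']}"}


result = run_tool_loop(
    fake_model,
    [{"role": "user", "content": "What is 2 + 3?"}],
    {"add": lambda a, b: a + b},
)
```

With the stand-in model, the loop runs twice: one iteration to execute the requested tool and one to receive the final text. A model that never stops requesting tools ends after max_iterations with every call recorded.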
Greyson LaLonde
cb46a1c4ba docs: update changelog and version for v1.14.3
2026-04-25 00:13:43 +08:00
Greyson LaLonde
d9046b98dd feat: bump versions to 1.14.3 2026-04-25 00:04:46 +08:00
Tiago Freire
b0e2fda105 fix(flow): add execution_id separate from state.id
* fix(flow): add execution_id separate from state.id (COR-48)

  When a consumer passes `id` in `kickoff(inputs=...)`, that value
  overwrites the flow's state.id — which was also being used as the
  execution tracking identity for telemetry, tracing, and external
  correlation. Two kickoffs sharing the same consumer id ended up
  with the same tracking id, breaking any downstream system that
  joins on it.

  Introduces `Flow.execution_id`: a stable per-run identifier stored
  as a `PrivateAttr` on the `Flow` model, exposed via property +
  setter. It defaults to a fresh `uuid4` per instance, is never
  touched by `inputs["id"]`, and can be assigned by outer systems
  that already have an execution identity (e.g. a task id).

  Switches the `current_flow_id` / `current_flow_request_id`
  ContextVars to seed from `execution_id` so OTel spans emitted by
  `FlowTrackable` children correlate on the stable tracking key.

  `state.id` keeps its existing override semantics for
  persistence/restore — consumers resuming a persisted flow via
  `inputs["id"]` work exactly as before.

  Adds tests covering default uniqueness per instance, immunity to
  consumer `inputs["id"]`, context-var propagation, absence from
  serialized state, and parity for dict-state flows.

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-04-24 04:48:14 +08:00
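
To make the distinction in this commit concrete, here is a small sketch of the two identifiers. MiniFlow is a hypothetical toy class, not the real Flow model; it only imitates the behavior the message describes (a consumer-supplied id overwrites the state id, while execution_id stays per-instance and can be assigned by an outer system).

```python
from typing import Optional
from uuid import uuid4


class MiniFlow:
    """Toy stand-in for Flow; only the two identifiers are modeled."""

    def __init__(self) -> None:
        self.state = {"id": str(uuid4())}
        self._execution_id = str(uuid4())

    @property
    def execution_id(self) -> str:
        return self._execution_id

    @execution_id.setter
    def execution_id(self, value: str) -> None:
        self._execution_id = value

    def kickoff(self, inputs: Optional[dict] = None) -> str:
        # A consumer-supplied "id" overwrites state["id"] (resume semantics)...
        if inputs and "id" in inputs:
            self.state["id"] = inputs["id"]
        # ...but tracking is keyed on execution_id, which inputs never touch.
        return self.execution_id


first, second = MiniFlow(), MiniFlow()
run_a = first.kickoff(inputs={"id": "customer-42"})
run_b = second.kickoff(inputs={"id": "customer-42"})
```

Both runs end up with the same state id, yet run_a and run_b differ, so a downstream system joining on the tracking key no longer conflates them.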
Greyson LaLonde
69d777ca50 fix(flow): replay recorded method events on checkpoint resume 2026-04-24 03:41:55 +08:00
Greyson LaLonde
77b2835a1d fix(flow): serialize initial_state class refs as JSON schema
2026-04-23 21:55:50 +08:00
Lorenze Jay
c77f1632dd fix: preserve metadata-only agent skills
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-04-23 19:58:12 +08:00
Greyson LaLonde
69461076df refactor: dedupe checkpoint helpers and tighten state type hints 2026-04-23 19:29:04 +08:00
Greyson LaLonde
55937d7523 feat: emit lifecycle events for checkpoint operations
2026-04-23 18:47:50 +08:00
38 changed files with 3979 additions and 111 deletions

View File

@@ -4,6 +4,54 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="25 أبريل 2026">
## v1.14.3
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.3)
## ما الذي تغير
### الميزات
- إضافة أحداث دورة الحياة لعمليات نقطة التحقق
- إضافة دعم لـ e2b
- الرجوع إلى DefaultAzureCredential عند عدم توفير مفتاح API في تكامل Azure
- إضافة دعم Bedrock V4
- إضافة أدوات Daytona sandbox لوظائف محسّنة
- إضافة دعم نقطة التحقق والتفرع للوكلاء المستقلين
### إصلاحات الأخطاء
- إصلاح execution_id ليكون منفصلًا عن state.id
- حل مشكلة إعادة تشغيل أحداث الطريقة المسجلة عند استئناف نقطة التحقق
- إصلاح تسلسل مراجع class initial_state كـ JSON schema
- الحفاظ على مهارات الوكلاء التي تحتوي على بيانات وصفية فقط
- تمرير أسماء @CrewBase الضمنية إلى أحداث الطاقم
- دمج بيانات التنفيذ عند تهيئة دفعة مكررة
- إصلاح تسلسل حقول مراجع class Task لنقاط التحقق
- التعامل مع نتيجة BaseModel في حلقة إعادة المحاولة guardrail
- الحفاظ على thought_signature في استدعاءات أدوات Gemini للبث
- إصدار task_started عند استئناف التفرع وإعادة تصميم واجهة المستخدم النصية لنقطة التحقق
- استخدام تواريخ مستقبلية في اختبارات تقليم نقطة التحقق لمنع الفشل المعتمد على الوقت
- إصلاح ترتيب التشغيل الجاف والتعامل مع الفرع القديم الذي تم التحقق منه في إصدار أدوات التطوير
- ترقية lxml إلى >=6.1.0 لرقعة الأمان
- رفع python-dotenv إلى >=1.2.2 لرقعة الأمان
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.3
- إضافة صفحة "بناء باستخدام الذكاء الاصطناعي" وتحديث التنقل لجميع اللغات
- إزالة الأسئلة الشائعة حول التسعير من صفحة البناء باستخدام الذكاء الاصطناعي عبر جميع المواقع
### الأداء
- تحسين MCP SDK وأنواع الأحداث لتقليل بدء التشغيل البارد بنسبة ~29%
### إعادة الهيكلة
- إعادة هيكلة مساعدي نقطة التحقق للقضاء على التكرار وتشديد تلميحات نوع الحالة
## المساهمون
@MatthiasHowellYopp, @akaKuruma, @alex-clawd, @github-actions[bot], @github-advanced-security[bot], @greysonlalonde, @iris-clawd, @lorenzejay, @mattatcha, @renatonitta
</Update>
<Update label="23 أبريل 2026">
## v1.14.3a3

File diff suppressed because it is too large.

View File

@@ -4,6 +4,54 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Apr 25, 2026">
## v1.14.3
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.3)
## What's Changed
### Features
- Add lifecycle events for checkpoint operations
- Add support for e2b
- Fall back to DefaultAzureCredential when no API key is provided in Azure integration
- Add Bedrock V4 support
- Add Daytona sandbox tools for enhanced functionality
- Add checkpoint and fork support to standalone agents
### Bug Fixes
- Fix execution_id to be separate from state.id
- Resolve replay of recorded method events on checkpoint resume
- Fix serialization of initial_state class references as JSON schema
- Preserve metadata-only agent skills
- Propagate implicit @CrewBase names to crew events
- Merge execution metadata on duplicate batch initialization
- Fix serialization of Task class-reference fields for checkpointing
- Handle BaseModel result in guardrail retry loop
- Preserve thought_signature in Gemini streaming tool calls
- Emit task_started on fork resume and redesign checkpoint TUI
- Use future dates in checkpoint prune tests to prevent time-dependent failures
- Fix dry-run order and handle checked-out stale branch in devtools release
- Upgrade lxml to >=6.1.0 for security patch
- Bump python-dotenv to >=1.2.2 for security patch
### Documentation
- Update changelog and version for v1.14.3
- Add 'Build with AI' page and update navigation for all languages
- Remove pricing FAQ from build-with-ai page across all locales
### Performance
- Optimize MCP SDK and event types to reduce cold start by ~29%
### Refactoring
- Refactor checkpoint helpers to eliminate duplication and tighten state type hints
## Contributors
@MatthiasHowellYopp, @akaKuruma, @alex-clawd, @github-actions[bot], @github-advanced-security[bot], @greysonlalonde, @iris-clawd, @lorenzejay, @mattatcha, @renatonitta
</Update>
<Update label="Apr 23, 2026">
## v1.14.3a3

View File

@@ -4,6 +4,54 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 4월 25일">
## v1.14.3
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.3)
## 변경 사항
### 기능
- 체크포인트 작업을 위한 생명주기 이벤트 추가
- e2b 지원 추가
- Azure 통합에서 API 키가 제공되지 않을 경우 DefaultAzureCredential로 대체
- Bedrock V4 지원 추가
- 향상된 기능을 위한 Daytona 샌드박스 도구 추가
- 독립형 에이전트에 체크포인트 및 포크 지원 추가
### 버그 수정
- execution_id를 state.id와 분리되도록 수정
- 체크포인트 재개 시 기록된 메서드 이벤트 재생 문제 해결
- initial_state 클래스 참조의 JSON 스키마 직렬화 수정
- 메타데이터 전용 에이전트 기술 보존
- 암묵적인 @CrewBase 이름을 크루 이벤트로 전파
- 중복 배치 초기화 시 실행 메타데이터 병합
- 체크포인트를 위한 Task 클래스 참조 필드의 직렬화 수정
- 가드레일 재시도 루프에서 BaseModel 결과 처리
- Gemini 스트리밍 도구 호출에서 thought_signature 보존
- 포크 재개 시 task_started 방출 및 체크포인트 TUI 재설계
- 체크포인트 가지치기 테스트에서 미래 날짜 사용하여 시간 의존적 실패 방지
- 드라이 런 주문 수정 및 devtools 릴리스에서 체크아웃된 오래된 브랜치 처리
- 보안 패치를 위해 lxml을 >=6.1.0으로 업그레이드
- 보안 패치를 위해 python-dotenv를 >=1.2.2로 업그레이드
### 문서
- v1.14.3에 대한 변경 로그 및 버전 업데이트
- 'AI로 빌드하기' 페이지 추가 및 모든 언어에 대한 내비게이션 업데이트
- 모든 로케일에서 build-with-ai 페이지의 가격 FAQ 제거
### 성능
- MCP SDK 및 이벤트 유형 최적화하여 콜드 스타트를 약 29% 감소
### 리팩토링
- 중복 제거 및 상태 유형 힌트를 강화하기 위해 체크포인트 헬퍼 리팩토링
## 기여자
@MatthiasHowellYopp, @akaKuruma, @alex-clawd, @github-actions[bot], @github-advanced-security[bot], @greysonlalonde, @iris-clawd, @lorenzejay, @mattatcha, @renatonitta
</Update>
<Update label="2026년 4월 23일">
## v1.14.3a3

View File

@@ -4,6 +4,54 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="25 abr 2026">
## v1.14.3
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.3)
## O que Mudou
### Recursos
- Adicionar eventos de ciclo de vida para operações de checkpoint
- Adicionar suporte para e2b
- Reverter para DefaultAzureCredential quando nenhuma chave de API for fornecida na integração com o Azure
- Adicionar suporte ao Bedrock V4
- Adicionar ferramentas de sandbox Daytona para funcionalidade aprimorada
- Adicionar suporte a checkpoint e fork para agentes autônomos
### Correções de Bugs
- Corrigir execution_id para ser separado de state.id
- Resolver a reprodução de eventos de método gravados na retomada do checkpoint
- Corrigir a serialização de referências de classe initial_state como esquema JSON
- Preservar habilidades de agente somente de metadados
- Propagar nomes implícitos @CrewBase para eventos da equipe
- Mesclar metadados de execução na inicialização de lote duplicado
- Corrigir a serialização de campos de referência de classe Task para checkpointing
- Lidar com o resultado BaseModel no loop de retry do guardrail
- Preservar thought_signature em chamadas de ferramentas de streaming Gemini
- Emitir task_started na retomada do fork e redesenhar TUI de checkpoint
- Usar datas futuras em testes de poda de checkpoint para evitar falhas dependentes do tempo
- Corrigir a ordem de dry-run e lidar com branch obsoleta verificada na liberação do devtools
- Atualizar lxml para >=6.1.0 para patch de segurança
- Aumentar python-dotenv para >=1.2.2 para patch de segurança
### Documentação
- Atualizar changelog e versão para v1.14.3
- Adicionar página 'Construir com IA' e atualizar navegação para todos os idiomas
- Remover FAQ de preços da página construir-com-ia em todos os locais
### Desempenho
- Otimizar MCP SDK e tipos de eventos para reduzir o tempo de inicialização a frio em ~29%
### Refatoração
- Refatorar auxiliares de checkpoint para eliminar duplicação e apertar dicas de tipo de estado
## Contribuidores
@MatthiasHowellYopp, @akaKuruma, @alex-clawd, @github-actions[bot], @github-advanced-security[bot], @greysonlalonde, @iris-clawd, @lorenzejay, @mattatcha, @renatonitta
</Update>
<Update label="23 abr 2026">
## v1.14.3a3

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.3a3"
__version__ = "1.14.3"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.3a3",
"crewai==1.14.3",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -321,4 +321,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.3a3"
__version__ = "1.14.3"

View File

@@ -5,6 +5,7 @@ from crewai_tools.tools.daytona_sandbox_tool.daytona_python_tool import (
DaytonaPythonTool,
)
__all__ = [
"DaytonaBaseTool",
"DaytonaExecTool",

View File

@@ -55,7 +55,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.3a3",
"crewai-tools==1.14.3",
]
embeddings = [
"tiktoken~=0.8.0"
@@ -84,7 +84,7 @@ voyageai = [
"voyageai~=0.3.5",
]
litellm = [
"litellm~=1.83.0",
"litellm~=1.83.7",
]
bedrock = [
"boto3~=1.42.79",

View File

@@ -13,6 +13,7 @@ from crewai.crews.crew_output import CrewOutput
from crewai.flow.flow import Flow
from crewai.knowledge.knowledge import Knowledge
from crewai.llm import LLM
from crewai.llm_result import LLMResult, ToolCallRecord
from crewai.llms.base_llm import BaseLLM
from crewai.process import Process
from crewai.state.checkpoint_config import CheckpointConfig # noqa: F401
@@ -48,7 +49,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.3a3"
__version__ = "1.14.3"
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),
@@ -195,11 +196,13 @@ __all__ = [
"Flow",
"Knowledge",
"LLMGuardrail",
"LLMResult",
"Memory",
"PlanningConfig",
"Process",
"RuntimeState",
"Task",
"TaskOutput",
"ToolCallRecord",
"__version__",
]

View File

@@ -394,15 +394,17 @@ class Agent(BaseAgent):
self,
resolved_crew_skills: list[SkillModel] | None = None,
) -> None:
"""Resolve skill paths and activate skills to INSTRUCTIONS level.
"""Resolve skill paths while preserving explicit disclosure levels.
Path entries trigger discovery and activation. Pre-loaded Skill objects
below INSTRUCTIONS level are activated. Crew-level skills are merged in
with event emission so observability is consistent regardless of origin.
Path entries trigger discovery and activation because directory-based
skills opt into eager loading. Pre-loaded Skill objects keep their
current disclosure level so callers can attach METADATA-only skills and
progressively activate them later. Crew-level skills are merged in with
event emission so observability is consistent regardless of origin.
Args:
resolved_crew_skills: Pre-resolved crew skills (already discovered
and activated). When provided, avoids redundant discovery per agent.
resolved_crew_skills: Pre-resolved crew skills. When provided,
avoids redundant discovery per agent.
"""
from crewai.crew import Crew
@@ -443,8 +445,7 @@ class Agent(BaseAgent):
elif isinstance(item, SkillModel):
if item.name not in seen:
seen.add(item.name)
activated = activate_skill(item, source=self)
if activated is item and item.disclosure_level >= INSTRUCTIONS:
if item.disclosure_level >= INSTRUCTIONS:
crewai_event_bus.emit(
self,
event=SkillActivatedEvent(
@@ -454,7 +455,7 @@ class Agent(BaseAgent):
disclosure_level=item.disclosure_level,
),
)
resolved.append(activated)
resolved.append(item)
self.skills = resolved if resolved else None

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.3a3"
"crewai[tools]==1.14.3"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.3a3"
"crewai[tools]==1.14.3"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.3a3"
"crewai[tools]==1.14.3"
]
[tool.crewai]

View File

@@ -21,6 +21,7 @@ from crewai.events.depends import Depends
from crewai.events.event_bus import crewai_event_bus
from crewai.events.handler_graph import CircularDependencyError
if TYPE_CHECKING:
from crewai.events.types.agent_events import (
AgentEvaluationCompletedEvent,
@@ -33,6 +34,20 @@ if TYPE_CHECKING:
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.events.types.checkpoint_events import (
CheckpointBaseEvent,
CheckpointCompletedEvent,
CheckpointFailedEvent,
CheckpointForkBaseEvent,
CheckpointForkCompletedEvent,
CheckpointForkStartedEvent,
CheckpointPrunedEvent,
CheckpointRestoreBaseEvent,
CheckpointRestoreCompletedEvent,
CheckpointRestoreFailedEvent,
CheckpointRestoreStartedEvent,
CheckpointStartedEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
@@ -141,6 +156,19 @@ _LAZY_EVENT_MAPPING: dict[str, str] = {
"LiteAgentExecutionCompletedEvent": "crewai.events.types.agent_events",
"LiteAgentExecutionErrorEvent": "crewai.events.types.agent_events",
"LiteAgentExecutionStartedEvent": "crewai.events.types.agent_events",
# checkpoint_events
"CheckpointBaseEvent": "crewai.events.types.checkpoint_events",
"CheckpointCompletedEvent": "crewai.events.types.checkpoint_events",
"CheckpointFailedEvent": "crewai.events.types.checkpoint_events",
"CheckpointForkBaseEvent": "crewai.events.types.checkpoint_events",
"CheckpointForkCompletedEvent": "crewai.events.types.checkpoint_events",
"CheckpointForkStartedEvent": "crewai.events.types.checkpoint_events",
"CheckpointPrunedEvent": "crewai.events.types.checkpoint_events",
"CheckpointRestoreBaseEvent": "crewai.events.types.checkpoint_events",
"CheckpointRestoreCompletedEvent": "crewai.events.types.checkpoint_events",
"CheckpointRestoreFailedEvent": "crewai.events.types.checkpoint_events",
"CheckpointRestoreStartedEvent": "crewai.events.types.checkpoint_events",
"CheckpointStartedEvent": "crewai.events.types.checkpoint_events",
# crew_events
"CrewKickoffCompletedEvent": "crewai.events.types.crew_events",
"CrewKickoffFailedEvent": "crewai.events.types.crew_events",
@@ -265,6 +293,18 @@ __all__ = [
"AgentReasoningFailedEvent",
"AgentReasoningStartedEvent",
"BaseEventListener",
"CheckpointBaseEvent",
"CheckpointCompletedEvent",
"CheckpointFailedEvent",
"CheckpointForkBaseEvent",
"CheckpointForkCompletedEvent",
"CheckpointForkStartedEvent",
"CheckpointPrunedEvent",
"CheckpointRestoreBaseEvent",
"CheckpointRestoreCompletedEvent",
"CheckpointRestoreFailedEvent",
"CheckpointRestoreStartedEvent",
"CheckpointStartedEvent",
"CircularDependencyError",
"CrewKickoffCompletedEvent",
"CrewKickoffFailedEvent",

View File

@@ -64,6 +64,22 @@ P = ParamSpec("P")
R = TypeVar("R")
_replaying: contextvars.ContextVar[bool] = contextvars.ContextVar(
"crewai_event_replaying", default=False
)
def is_replaying() -> bool:
"""Return True if the current context is dispatching a replayed event.
Listeners with side effects (checkpoint writes, external API calls that
should not be repeated) should early-return when this is true. Listeners
whose purpose is reconstructing timeline state (trace batch, console
formatter) should ignore the flag and process replayed events normally.
"""
return _replaying.get()
class CrewAIEventsBus:
"""Singleton event bus for handling events in CrewAI.
@@ -261,6 +277,11 @@ class CrewAIEventsBus:
self._runtime_state = state
self._registered_entity_ids = {id(e) for e in state.root}
@property
def runtime_state(self) -> RuntimeState | None:
"""The RuntimeState currently attached to the bus, if any."""
return self._runtime_state
def register_entity(self, entity: Any) -> None:
"""Add an entity to the RuntimeState, creating it if needed.
@@ -568,6 +589,87 @@ class CrewAIEventsBus:
return None
async def _acall_handlers_replaying(
self,
source: Any,
event: BaseEvent,
handlers: AsyncHandlerSet,
) -> None:
"""Call async handlers with the replaying flag set on the loop thread."""
token = _replaying.set(True)
try:
await self._acall_handlers(source, event, handlers)
finally:
_replaying.reset(token)
async def _emit_with_dependencies_replaying(
self, source: Any, event: BaseEvent
) -> None:
"""Dependency-aware dispatch with the replaying flag set."""
token = _replaying.set(True)
try:
await self._emit_with_dependencies(source, event)
finally:
_replaying.reset(token)
def replay(self, source: Any, event: BaseEvent) -> Future[None] | None:
"""Dispatch a previously-recorded event without mutating its fields.
Unlike :meth:`emit`, this does not run ``_prepare_event`` (so stored
event ids and ``emission_sequence`` are preserved) and does not
re-record the event. Listeners can call :func:`is_replaying` to
opt out of side-effectful processing.
Args:
source: The emitting object.
event: The previously-recorded event to dispatch.
Returns:
Future that completes when handlers finish, or None if no handlers.
"""
event_type = type(event)
with self._rwlock.r_locked():
if self._shutting_down:
return None
has_dependencies = event_type in self._handler_dependencies
sync_handlers = self._sync_handlers.get(event_type, frozenset())
async_handlers = self._async_handlers.get(event_type, frozenset())
if not sync_handlers and not async_handlers:
return None
self._ensure_executor_initialized()
self._has_pending_events = True
token = _replaying.set(True)
try:
if has_dependencies:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies_replaying(source, event),
self._loop,
)
)
if sync_handlers:
ctx = contextvars.copy_context()
sync_future = self._sync_executor.submit(
ctx.run, self._call_handlers, source, event, sync_handlers
)
self._track_future(sync_future)
if not async_handlers:
return sync_future
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers_replaying(source, event, async_handlers),
self._loop,
)
)
finally:
_replaying.reset(token)
def flush(self, timeout: float | None = 30.0) -> bool:
"""Block until all pending event handlers complete.
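
The replay flag added in this file follows a common ContextVar pattern: set the flag, copy the context so worker threads see it, and reset it afterwards. The sketch below reproduces that pattern with the standard library only. The listener names and the dispatch helper are hypothetical and much simpler than the real bus; they only illustrate how a side-effecting listener can opt out while a timeline listener keeps processing.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

_replaying = contextvars.ContextVar("replaying", default=False)


def is_replaying() -> bool:
    """Return True while a replayed event is being dispatched."""
    return _replaying.get()


writes: list[str] = []    # side effects that must not be repeated
timeline: list[str] = []  # reconstructed history, replay included


def checkpoint_listener(event: str) -> None:
    if is_replaying():
        return  # skip the side effect for replayed events
    writes.append(event)


def timeline_listener(event: str) -> None:
    timeline.append(event)  # ignores the flag on purpose


def dispatch(event: str, replay: bool = False) -> None:
    token = _replaying.set(replay)
    try:
        # Copy the context after setting the flag so the worker thread sees it.
        ctx = contextvars.copy_context()
        with ThreadPoolExecutor(max_workers=1) as pool:
            for listener in (checkpoint_listener, timeline_listener):
                pool.submit(ctx.run, listener, event).result()
    finally:
        _replaying.reset(token)


dispatch("method_started", replay=True)
dispatch("method_finished")
```

After both dispatches, the timeline holds both events while the side-effect log holds only the live one, and the flag is back to its default in the calling context.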

View File

@@ -30,6 +30,17 @@ from crewai.events.types.agent_events import (
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
)
from crewai.events.types.checkpoint_events import (
CheckpointCompletedEvent,
CheckpointFailedEvent,
CheckpointForkCompletedEvent,
CheckpointForkStartedEvent,
CheckpointPrunedEvent,
CheckpointRestoreCompletedEvent,
CheckpointRestoreFailedEvent,
CheckpointRestoreStartedEvent,
CheckpointStartedEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
@@ -183,4 +194,13 @@ EventTypes = (
| MCPToolExecutionCompletedEvent
| MCPToolExecutionFailedEvent
| MCPConfigFetchFailedEvent
| CheckpointStartedEvent
| CheckpointCompletedEvent
| CheckpointFailedEvent
| CheckpointForkStartedEvent
| CheckpointForkCompletedEvent
| CheckpointRestoreStartedEvent
| CheckpointRestoreCompletedEvent
| CheckpointRestoreFailedEvent
| CheckpointPrunedEvent
)

View File

@@ -0,0 +1,97 @@
"""Event family for automatic state checkpointing and forking."""
from typing import Literal
from crewai.events.base_events import BaseEvent
class CheckpointBaseEvent(BaseEvent):
"""Base event for checkpoint lifecycle operations."""
type: str
location: str
provider: str
trigger: str | None = None
branch: str | None = None
parent_id: str | None = None
class CheckpointStartedEvent(CheckpointBaseEvent):
"""Event emitted immediately before a checkpoint is written."""
type: Literal["checkpoint_started"] = "checkpoint_started"
class CheckpointCompletedEvent(CheckpointBaseEvent):
"""Event emitted when a checkpoint has been written successfully."""
type: Literal["checkpoint_completed"] = "checkpoint_completed"
checkpoint_id: str
duration_ms: float
class CheckpointFailedEvent(CheckpointBaseEvent):
"""Event emitted when a checkpoint write fails."""
type: Literal["checkpoint_failed"] = "checkpoint_failed"
error: str
class CheckpointPrunedEvent(CheckpointBaseEvent):
"""Event emitted after pruning old checkpoints from a branch."""
type: Literal["checkpoint_pruned"] = "checkpoint_pruned"
removed_count: int
max_checkpoints: int
class CheckpointForkBaseEvent(BaseEvent):
"""Base event for fork lifecycle operations on a RuntimeState."""
type: str
branch: str
parent_branch: str | None = None
parent_checkpoint_id: str | None = None
class CheckpointForkStartedEvent(CheckpointForkBaseEvent):
"""Event emitted immediately before a fork relabels the branch."""
type: Literal["checkpoint_fork_started"] = "checkpoint_fork_started"
class CheckpointForkCompletedEvent(CheckpointForkBaseEvent):
"""Event emitted after a fork has established the new branch."""
type: Literal["checkpoint_fork_completed"] = "checkpoint_fork_completed"
class CheckpointRestoreBaseEvent(BaseEvent):
"""Base event for checkpoint restore lifecycle operations."""
type: str
location: str
provider: str | None = None
class CheckpointRestoreStartedEvent(CheckpointRestoreBaseEvent):
"""Event emitted immediately before a checkpoint restore begins."""
type: Literal["checkpoint_restore_started"] = "checkpoint_restore_started"
class CheckpointRestoreCompletedEvent(CheckpointRestoreBaseEvent):
"""Event emitted when a checkpoint has been restored successfully."""
type: Literal["checkpoint_restore_completed"] = "checkpoint_restore_completed"
checkpoint_id: str
branch: str | None = None
parent_id: str | None = None
duration_ms: float
class CheckpointRestoreFailedEvent(CheckpointRestoreBaseEvent):
"""Event emitted when a checkpoint restore fails."""
type: Literal["checkpoint_restore_failed"] = "checkpoint_restore_failed"
error: str
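
Because each concrete event pins its type field to a single literal string, a consumer can branch on that string without isinstance checks. The following sketch uses plain dataclasses as stand-ins for three of the Pydantic event classes above; the class names, the summarize helper, and the "json" provider value are hypothetical, and only the field names are taken from the diff.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class CompletedStandIn:
    location: str
    provider: str
    checkpoint_id: str
    duration_ms: float
    type: str = "checkpoint_completed"


@dataclass
class FailedStandIn:
    location: str
    provider: str
    error: str
    type: str = "checkpoint_failed"


@dataclass
class PrunedStandIn:
    location: str
    provider: str
    removed_count: int
    max_checkpoints: int
    type: str = "checkpoint_pruned"


CheckpointEvent = Union[CompletedStandIn, FailedStandIn, PrunedStandIn]


def summarize(event: CheckpointEvent) -> str:
    """Turn a checkpoint lifecycle event into a one-line log message."""
    if event.type == "checkpoint_completed":
        return (
            f"saved {event.checkpoint_id} to {event.location} "
            f"in {event.duration_ms:.1f} ms"
        )
    if event.type == "checkpoint_failed":
        return f"checkpoint to {event.location} failed: {event.error}"
    if event.type == "checkpoint_pruned":
        return (
            f"pruned {event.removed_count} checkpoints "
            f"(limit {event.max_checkpoints})"
        )
    return f"unhandled event type: {event.type}"


line = summarize(CompletedStandIn("./ckpt", "json", "abc", 12.0))
```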

View File

@@ -45,6 +45,7 @@ from pydantic import (
BeforeValidator,
ConfigDict,
Field,
PlainSerializer,
PrivateAttr,
SerializeAsAny,
ValidationError,
@@ -58,6 +59,7 @@ from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import (
get_current_parent_id,
reset_last_event_id,
restore_event_scope,
triggered_by_scope,
)
from crewai.events.listeners.tracing.trace_listener import (
@@ -157,6 +159,37 @@ def _resolve_persistence(value: Any) -> Any:
return value
_INITIAL_STATE_CLASS_MARKER = "__crewai_pydantic_class_schema__"
def _serialize_initial_state(value: Any) -> Any:
"""Make ``initial_state`` safe for JSON checkpoint serialization.
``BaseModel`` class refs are emitted as their JSON schema under a sentinel
marker key so deserialization can round-trip them back to a class.
``BaseModel`` instances are dumped to JSON (round-trip as plain dicts,
which ``_create_initial_state`` accepts). Bare ``type`` values that are
not ``BaseModel`` subclasses (e.g. ``dict``) are dropped since they
can't be represented in JSON.
"""
if isinstance(value, type):
if issubclass(value, BaseModel):
return {_INITIAL_STATE_CLASS_MARKER: value.model_json_schema()}
return None
if isinstance(value, BaseModel):
return value.model_dump(mode="json")
return value
def _deserialize_initial_state(value: Any) -> Any:
"""Rehydrate a class ref serialized by :func:`_serialize_initial_state`."""
if isinstance(value, dict) and _INITIAL_STATE_CLASS_MARKER in value:
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
return create_model_from_schema(value[_INITIAL_STATE_CLASS_MARKER])
return value
class FlowState(BaseModel):
"""Base model for all flow states, ensuring each state has a unique ID."""
@@ -908,7 +941,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
entity_type: Literal["flow"] = "flow"
initial_state: Any = Field(default=None)
initial_state: Annotated[ # type: ignore[type-arg]
type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None,
BeforeValidator(_deserialize_initial_state),
PlainSerializer(_serialize_initial_state, return_type=Any, when_used="json"),
] = Field(default=None)
name: str | None = Field(default=None)
tracing: bool | None = Field(default=None)
stream: bool = Field(default=False)
@@ -980,13 +1017,18 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
A Flow instance on the new branch. Call kickoff() to run.
"""
flow = cls.from_checkpoint(config)
state = crewai_event_bus._runtime_state
state = crewai_event_bus.runtime_state
if state is None:
raise RuntimeError(
"Cannot fork: no runtime state on the event bus. "
"Ensure from_checkpoint() succeeded before calling fork()."
)
state.fork(branch)
new_id = str(uuid4())
if isinstance(flow._state, dict):
flow._state["id"] = new_id
else:
object.__setattr__(flow._state, "id", new_id)
return flow
checkpoint_completed_methods: set[str] | None = Field(default=None)
@@ -1008,6 +1050,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
}
if self.checkpoint_state is not None:
self._restore_state(self.checkpoint_state)
restore_event_scope(())
reset_last_event_id()
_methods: dict[FlowMethodName, FlowMethod[Any, Any]] = PrivateAttr(
default_factory=dict
@@ -1030,6 +1074,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict)
_input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list)
_state: Any = PrivateAttr(default=None)
_execution_id: str = PrivateAttr(default_factory=lambda: str(uuid4()))
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
@@ -1820,6 +1865,27 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
except (AttributeError, TypeError):
return "" # Safely handle any unexpected attribute access issues
@property
def execution_id(self) -> str:
"""Stable identifier for this flow execution.
Separate from ``flow_id`` / ``state.id``, which consumers may
override via ``kickoff(inputs={"id": ...})`` to resume a persisted
flow. ``execution_id`` is never affected by ``inputs`` and stays
stable for the lifetime of a single run, so it is the correct key
for telemetry, tracing, and any external correlation that must
uniquely identify a single execution even when callers pass an
``id`` in ``inputs``.
Defaults to a fresh ``uuid4`` per ``Flow`` instance; assign to
override when an outer system already has an execution identity.
"""
return self._execution_id
@execution_id.setter
def execution_id(self, value: str) -> None:
self._execution_id = value
def _initialize_state(self, inputs: dict[str, Any]) -> None:
"""Initialize or update flow state with new inputs.
@@ -2133,9 +2199,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
flow_id_token = None
request_id_token = None
if current_flow_id.get() is None:
flow_id_token = current_flow_id.set(self.flow_id)
flow_id_token = current_flow_id.set(self.execution_id)
if current_flow_request_id.get() is None:
request_id_token = current_flow_request_id.set(self.flow_id)
request_id_token = current_flow_request_id.set(self.execution_id)
try:
# Reset flow state for fresh execution unless restoring from persistence
@@ -2214,6 +2280,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
if self._is_execution_resuming:
await self._replay_recorded_events()
try:
# Determine which start methods to execute at kickoff
# Conditional start methods (with __trigger_methods__) are only triggered by their conditions
@@ -2361,6 +2430,44 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
"""
return await self.kickoff_async(inputs, input_files, from_checkpoint)
async def _replay_recorded_events(self) -> None:
"""Dispatch recorded ``MethodExecution*`` events from the event record."""
state = crewai_event_bus.runtime_state
if state is None:
return
record = state.event_record
if len(record) == 0:
return
replayable = (
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
)
flow_name = self.name or self.__class__.__name__
nodes = sorted(
(
n
for n in record.all_nodes()
if isinstance(n.event, replayable)
and n.event.flow_name == flow_name
and n.event.method_name in self._completed_methods
),
key=lambda n: n.event.emission_sequence or 0,
)
for node in nodes:
future = crewai_event_bus.replay(self, node.event)
if future is not None:
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning(
"Replayed event handler failed: %s",
node.event.type,
exc_info=True,
)
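The selection step in `_replay_recorded_events` can be sketched on its own: keep only events for this flow whose method already completed, then order them by emission sequence. This is a minimal sketch, not the library's code; the `Event` dataclass and `select_replayable` helper are hypothetical stand-ins for the real event nodes.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Event:
    """Hypothetical stand-in for a recorded MethodExecution* event."""

    flow_name: str
    method_name: str
    emission_sequence: int | None


def select_replayable(
    events: list[Event], flow_name: str, completed: set[str]
) -> list[Event]:
    """Filter to this flow's completed methods and sort by emission order."""
    return sorted(
        (
            e
            for e in events
            if e.flow_name == flow_name and e.method_name in completed
        ),
        # Events without a sequence number sort first, as in the hunk above.
        key=lambda e: e.emission_sequence or 0,
    )


events = [
    Event("F", "b", 2),
    Event("F", "a", 1),
    Event("G", "a", 0),  # different flow: skipped
    Event("F", "c", 3),  # method not completed: skipped
]
ordered = select_replayable(events, "F", {"a", "b"})
```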
async def _execute_start_method(self, start_method_name: FlowMethodName) -> None:
"""Executes a flow's start method and its triggered listeners.


@@ -32,6 +32,11 @@ from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.llm_result import (
LLMResult,
ToolCallRecord,
estimate_cost_usd as _estimate_cost_usd,
)
from crewai.llms.base_llm import (
BaseLLM,
JsonResponseFormat,
@@ -1699,6 +1704,7 @@ class LLM(BaseLLM):
from_task: Task | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
max_iterations: int = 10,
) -> str | Any:
"""High-level LLM call method.
@@ -1716,16 +1722,250 @@ class LLM(BaseLLM):
from_task: Optional Task that invoked the LLM
from_agent: Optional Agent that invoked the LLM
response_model: Optional Model that contains a pydantic response model.
max_iterations: Maximum number of tool-loop iterations (default 10).
Only used when both ``tools`` and ``available_functions``
are provided.
Returns:
Union[str, Any]: Either a text response from the LLM (str) or
the result of a tool function call (Any).
Union[str, LLMResult, Any]:
- ``str`` when called without tools (backwards compatible).
- ``LLMResult`` when called with tools and available_functions.
- ``Any`` when a tool call returns a non-string result in legacy mode.
Raises:
TypeError: If messages format is invalid
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
# When tools AND available_functions are both provided, use the tool loop
# which returns an LLMResult with structured metadata.
if tools and available_functions:
return self._call_with_tool_loop(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
max_iterations=max_iterations,
)
# Original single-shot path — returns str (backwards compatible).
return self._call_single(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
def _call_with_tool_loop(
self,
messages: str | list[LLMMessage],
tools: list[dict[str, BaseTool]],
callbacks: list[Any] | None,
available_functions: dict[str, Any],
from_task: Task | None,
from_agent: BaseAgent | None,
response_model: type[BaseModel] | None,
max_iterations: int,
) -> LLMResult:
"""Run an LLM tool loop, returning a structured LLMResult.
Keeps calling the model until it stops requesting tool calls or
``max_iterations`` is reached.
"""
from crewai.types.usage_metrics import UsageMetrics
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# Work on a mutable copy so we can append assistant/tool messages.
conversation: list[dict[str, Any]] = list(messages) # type: ignore[arg-type]
result = LLMResult(
text="",
tool_calls=[],
usage=UsageMetrics(),
cost_usd=0.0,
iterations=0,
)
for iteration in range(max_iterations):
# Call the model WITHOUT available_functions so the internal
# handler returns tool_calls as-is instead of executing them.
raw = self._call_single(
messages=conversation, # type: ignore[arg-type]
tools=tools,
callbacks=callbacks,
available_functions=None, # Don't let inner layer execute
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
result.iterations = iteration + 1
# Accumulate usage from this iteration
self._accumulate_usage(result)
# If we got a string back, the model is done (no tool calls).
if isinstance(raw, str):
result.text = raw
break
# If we got tool_calls (list), execute them and feed results back.
if isinstance(raw, list):
# Append assistant message with tool calls to conversation
assistant_msg: dict[str, Any] = {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": getattr(tc, "id", f"call_{i}"),
"type": "function",
"function": {
"name": getattr(tc.function, "name", "")
if hasattr(tc, "function")
else "",
"arguments": getattr(tc.function, "arguments", "{}")
if hasattr(tc, "function")
else "{}",
},
}
for i, tc in enumerate(raw)
],
}
conversation.append(assistant_msg)
# Execute each tool call
for i, tc in enumerate(raw):
func_name = sanitize_tool_name(
getattr(tc.function, "name", "")
if hasattr(tc, "function")
else ""
)
func_args_str = (
getattr(tc.function, "arguments", "{}")
if hasattr(tc, "function")
else "{}"
)
# Use the same fallback id as the assistant message so the tool
# result can be matched to its tool call.
tool_call_id = getattr(tc, "id", f"call_{i}")
try:
func_args = json.loads(func_args_str)
except (json.JSONDecodeError, TypeError):
func_args = {}
record = ToolCallRecord(
name=func_name,
input=func_args,
)
if func_name in available_functions:
t0 = datetime.now()
started_at = t0
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=func_args,
from_agent=from_agent,
from_task=from_task,
),
)
try:
fn = available_functions[func_name]
tool_output = fn(**func_args)
t1 = datetime.now()
record.output = (
str(tool_output) if tool_output is not None else ""
)
record.duration_ms = (t1 - t0).total_seconds() * 1000
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=tool_output,
tool_name=func_name,
tool_args=func_args,
started_at=started_at,
finished_at=t1,
from_task=from_task,
from_agent=from_agent,
),
)
except Exception as e:
t1 = datetime.now()
record.output = f"Error: {e}"
record.duration_ms = (t1 - t0).total_seconds() * 1000
record.is_error = True
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=func_name,
tool_args=func_args,
error=str(e),
from_task=from_task,
from_agent=from_agent,
),
)
else:
record.output = f"Error: unknown function '{func_name}'"
record.is_error = True
result.tool_calls.append(record)
# Append tool result message for the model
conversation.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": record.output,
}
)
else:
# Unexpected return type — treat as final text
result.text = str(raw)
break
else:
# max_iterations exhausted — use last text or empty
if not result.text and result.tool_calls:
result.text = (
f"Max iterations ({max_iterations}) reached. "
f"Last tool: {result.tool_calls[-1].name}"
)
# Estimate cost
result.cost_usd = _estimate_cost_usd(
self.model,
result.usage.prompt_tokens,
result.usage.completion_tokens,
)
return result
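The control flow of the tool loop can be shown without any crewai types. The following is a minimal sketch under stated assumptions: `run_tool_loop` and `fake_model` are hypothetical, the model is a plain callable that returns either a final string or a list of tool-call dicts, and events, usage tracking, and cost estimation are left out.

```python
from __future__ import annotations

import json
from typing import Any, Callable


def run_tool_loop(
    model: Callable[[list[dict[str, Any]]], Any],
    functions: dict[str, Callable[..., Any]],
    prompt: str,
    max_iterations: int = 10,
) -> tuple[str, list[dict[str, Any]]]:
    """Call the model until it answers with text or the budget runs out."""
    conversation: list[dict[str, Any]] = [{"role": "user", "content": prompt}]
    records: list[dict[str, Any]] = []
    for _ in range(max_iterations):
        reply = model(conversation)
        if isinstance(reply, str):
            return reply, records  # final answer, no more tool calls
        conversation.append(
            {"role": "assistant", "content": None, "tool_calls": reply}
        )
        for call in reply:
            try:
                args = json.loads(call["arguments"])
                output = str(functions[call["name"]](**args))
                is_error = False
            except Exception as exc:  # unknown tool, bad JSON, or tool failure
                output, is_error = f"Error: {exc}", True
            records.append(
                {"name": call["name"], "output": output, "is_error": is_error}
            )
            # Feed the result back, keyed by the id the model supplied.
            conversation.append(
                {"role": "tool", "tool_call_id": call["id"], "content": output}
            )
    return f"Max iterations ({max_iterations}) reached.", records


def fake_model(conversation: list[dict[str, Any]]) -> Any:
    """Hypothetical model: request one tool call, then answer with text."""
    if conversation[-1]["role"] == "tool":
        return f"The sum is {conversation[-1]['content']}."
    return [{"id": "call_0", "name": "add", "arguments": '{"a": 2, "b": 3}'}]


text, records = run_tool_loop(
    fake_model, {"add": lambda a, b: a + b}, "What is 2 + 3?"
)
```

Because each tool message reuses the id from the matching assistant tool call, the conversation stays well-formed for providers that validate that pairing.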
def _accumulate_usage(self, result: LLMResult) -> None:
"""Pull token counts from the internal tracker into the LLMResult."""
tracker = getattr(self, "_token_usage", None)
if tracker and isinstance(tracker, dict):
result.usage.prompt_tokens = tracker.get("prompt_tokens", 0)
result.usage.completion_tokens = tracker.get("completion_tokens", 0)
result.usage.total_tokens = tracker.get("total_tokens", 0)
result.usage.successful_requests += 1
def _call_single(
self,
messages: str | list[LLMMessage],
tools: list[dict[str, BaseTool]] | None = None,
callbacks: list[Any] | None = None,
available_functions: dict[str, Any] | None = None,
from_task: Task | None = None,
from_agent: BaseAgent | None = None,
response_model: type[BaseModel] | None = None,
) -> str | Any:
"""Single-shot LLM call (original call() logic)."""
with llm_call_context() as call_id:
crewai_event_bus.emit(
self,
@@ -1819,7 +2059,7 @@ class LLM(BaseLLM):
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
return self._call_single(
messages,
tools=tools,
callbacks=callbacks,


@@ -0,0 +1,112 @@
"""Structured result types for LLM.call() with tool loop support.
When LLM.call() is invoked with tools and available_functions, it returns
an LLMResult instead of a plain string. This preserves backwards compatibility:
calls without tools still return str.
"""
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, Field
from crewai.types.usage_metrics import UsageMetrics
class ToolCallRecord(BaseModel):
"""Record of a single tool call executed during an LLM tool loop.
Attributes:
name: The tool function name.
input: The arguments passed to the tool.
output: The string result returned by the tool.
duration_ms: Wall-clock time for the tool execution in milliseconds.
is_error: Whether the tool call raised an exception.
"""
name: str
input: dict[str, Any] = Field(default_factory=dict)
output: str = ""
duration_ms: float = 0.0
is_error: bool = False
class LLMResult(BaseModel):
"""Structured result from LLM.call() when tools are used.
Attributes:
text: The final text response from the model.
tool_calls: Ordered list of every tool call made during the loop.
usage: Aggregated token usage across all iterations.
cost_usd: Estimated cost in USD based on model pricing.
iterations: Number of LLM round-trips in the tool loop.
"""
text: str = ""
tool_calls: list[ToolCallRecord] = Field(default_factory=list)
usage: UsageMetrics = Field(default_factory=UsageMetrics)
cost_usd: float = 0.0
iterations: int = 0
# ---------------------------------------------------------------------------
# Simple cost estimation
# ---------------------------------------------------------------------------
# USD per 1M tokens. Covers major models. Inspired by Iris's pricing table.
PRICING: dict[str, dict[str, float]] = {
# Anthropic
"claude-opus-4-7": {"in": 5.00, "out": 25.00},
"claude-sonnet-4-6": {"in": 3.00, "out": 15.00},
"claude-sonnet-4-5": {"in": 3.00, "out": 15.00},
"claude-haiku-4-5": {"in": 1.00, "out": 5.00},
# OpenAI
"gpt-4o": {"in": 2.50, "out": 10.00},
"gpt-4o-mini": {"in": 0.15, "out": 0.60},
"gpt-4.1": {"in": 2.00, "out": 8.00},
"gpt-4.1-mini": {"in": 0.40, "out": 1.60},
"gpt-4.1-nano": {"in": 0.10, "out": 0.40},
"o1": {"in": 15.00, "out": 60.00},
"o1-mini": {"in": 3.00, "out": 12.00},
"o3": {"in": 2.00, "out": 8.00},
"o3-mini": {"in": 1.10, "out": 4.40},
"gpt-5": {"in": 1.25, "out": 10.00},
# Google Gemini
"gemini-2.5-pro": {"in": 1.25, "out": 10.00},
"gemini-2.5-flash": {"in": 0.30, "out": 2.50},
"gemini-2.0-flash": {"in": 0.10, "out": 0.40},
}
def _lookup_pricing(model: str) -> dict[str, float] | None:
"""Resolve a model name to its pricing row.
Handles provider prefixes (``anthropic/claude-sonnet-4-6``) and partial
matches (``claude-sonnet-4-6-20250514`` → ``claude-sonnet-4-6``).
"""
if not model:
return None
# Exact match
if model in PRICING:
return PRICING[model]
# Strip provider prefix
if "/" in model:
suffix = model.rsplit("/", 1)[1]
if suffix in PRICING:
return PRICING[suffix]
model = suffix
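# Prefix / partial match. Try longer keys first so that, for example,
# "gpt-4o-mini-2024-07-18" resolves to "gpt-4o-mini" rather than "gpt-4o".
for key in sorted(PRICING, key=len, reverse=True):
if model.startswith(key) or key.startswith(model):
return PRICING[key]
return None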
def estimate_cost_usd(model: str, prompt_tokens: int, completion_tokens: int) -> float:
"""Estimate the cost in USD for a given model and token counts."""
pricing = _lookup_pricing(model)
if not pricing:
return 0.0
return (
prompt_tokens * pricing["in"] + completion_tokens * pricing["out"]
) / 1_000_000
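The pricing lookup and the per-million-token arithmetic can be checked in isolation. This is a minimal sketch, assuming a two-row excerpt of the table; the `PRICES`, `lookup`, and `estimate` names are hypothetical. It tries longer keys first so that a dated `gpt-4o-mini` model name does not fall through to the `gpt-4o` row.

```python
from __future__ import annotations

# Hypothetical two-row excerpt of a pricing table, USD per 1M tokens.
PRICES: dict[str, dict[str, float]] = {
    "gpt-4o": {"in": 2.50, "out": 10.00},
    "gpt-4o-mini": {"in": 0.15, "out": 0.60},
}


def lookup(model: str) -> dict[str, float] | None:
    """Resolve a model name to a pricing row, longest key first."""
    name = model.rsplit("/", 1)[-1]  # drop a provider prefix such as "openai/"
    if name in PRICES:
        return PRICES[name]
    for key in sorted(PRICES, key=len, reverse=True):
        if name.startswith(key):
            return PRICES[key]
    return None


def estimate(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Estimate cost in USD; unknown models cost 0.0 rather than raising."""
    row = lookup(model)
    if row is None:
        return 0.0
    return (
        prompt_tokens * row["in"] + completion_tokens * row["out"]
    ) / 1_000_000


mini_cost = estimate("openai/gpt-4o-mini-2024-07-18", 1_000_000, 1_000_000)
unknown_cost = estimate("some-unlisted-model", 1000, 1000)
```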


@@ -27,6 +27,7 @@ from crewai.mcp.filters import (
create_static_tool_filter,
)
if TYPE_CHECKING:
from crewai.mcp.client import MCPClient
from crewai.mcp.tool_resolver import MCPToolResolver


@@ -10,12 +10,22 @@ from __future__ import annotations
import json
import logging
import threading
import time
from typing import Any
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.crew import Crew
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import CrewAIEventsBus, crewai_event_bus
from crewai.events.event_bus import CrewAIEventsBus, crewai_event_bus, is_replaying
from crewai.events.types.checkpoint_events import (
CheckpointBaseEvent,
CheckpointCompletedEvent,
CheckpointFailedEvent,
CheckpointForkBaseEvent,
CheckpointPrunedEvent,
CheckpointRestoreBaseEvent,
CheckpointStartedEvent,
)
from crewai.flow.flow import Flow
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.state.runtime import RuntimeState, _prepare_entities
@@ -53,12 +63,26 @@ def _resolve(value: CheckpointConfig | bool | None) -> CheckpointConfig | None |
if isinstance(value, CheckpointConfig):
_ensure_handlers_registered()
return value
if value is True:
if value:
_ensure_handlers_registered()
return CheckpointConfig()
if value is False:
return _SENTINEL
return None # None = inherit
return None
def _resolve_from_agent(agent: BaseAgent) -> CheckpointConfig | None:
"""Resolve a checkpoint config starting from an agent, walking to its crew."""
result = _resolve(agent.checkpoint)
if isinstance(result, CheckpointConfig):
return result
if result is _SENTINEL:
return None
crew = agent.crew
if isinstance(crew, Crew):
crew_result = _resolve(crew.checkpoint)
return crew_result if isinstance(crew_result, CheckpointConfig) else None
return None
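The resolution order in `_resolve_from_agent` is easier to see with plain values: an explicit config wins, `False` disables checkpointing outright, and `None` defers to the crew. This is a minimal sketch, assuming a hypothetical `Config` dataclass and `_DISABLED` sentinel in place of `CheckpointConfig` and `_SENTINEL`, with no handler registration.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class Config:
    """Hypothetical stand-in for a checkpoint configuration."""

    location: str = "./.checkpoints"


_DISABLED = object()  # marks an explicit opt-out (value was False)


def resolve(value: Any) -> Any:
    """Map a user-facing setting to Config, _DISABLED, or None (inherit)."""
    if isinstance(value, Config):
        return value
    if value is True:
        return Config()
    if value is False:
        return _DISABLED
    return None


def resolve_from_agent(agent_value: Any, crew_value: Any) -> Config | None:
    """Resolve the agent's setting first, then fall back to the crew's."""
    result = resolve(agent_value)
    if isinstance(result, Config):
        return result
    if result is _DISABLED:
        return None  # explicit opt-out: do not consult the crew
    crew_result = resolve(crew_value)
    return crew_result if isinstance(crew_result, Config) else None


inherited = resolve_from_agent(None, Config("crew-dir"))
opted_out = resolve_from_agent(False, Config("crew-dir"))
own = resolve_from_agent(True, None)
```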
def _find_checkpoint(source: Any) -> CheckpointConfig | None:
@@ -77,28 +101,11 @@ def _find_checkpoint(source: Any) -> CheckpointConfig | None:
result = _resolve(source.checkpoint)
return result if isinstance(result, CheckpointConfig) else None
if isinstance(source, BaseAgent):
result = _resolve(source.checkpoint)
if isinstance(result, CheckpointConfig):
return result
if result is _SENTINEL:
return None
crew = source.crew
if isinstance(crew, Crew):
result = _resolve(crew.checkpoint)
return result if isinstance(result, CheckpointConfig) else None
return None
return _resolve_from_agent(source)
if isinstance(source, Task):
agent = source.agent
if isinstance(agent, BaseAgent):
result = _resolve(agent.checkpoint)
if isinstance(result, CheckpointConfig):
return result
if result is _SENTINEL:
return None
crew = agent.crew
if isinstance(crew, Crew):
result = _resolve(crew.checkpoint)
return result if isinstance(result, CheckpointConfig) else None
return _resolve_from_agent(agent)
return None
return None
@@ -107,27 +114,106 @@ def _do_checkpoint(
state: RuntimeState, cfg: CheckpointConfig, event: BaseEvent | None = None
) -> None:
"""Write a checkpoint and prune old ones if configured."""
_prepare_entities(state.root)
payload = state.model_dump(mode="json")
if event is not None:
payload["trigger"] = event.type
data = json.dumps(payload)
location = cfg.provider.checkpoint(
data,
cfg.location,
parent_id=state._parent_id,
branch=state._branch,
)
state._chain_lineage(cfg.provider, location)
provider_name: str = type(cfg.provider).__name__
trigger: str | None = event.type if event is not None else None
context: dict[str, Any] = {
"task_id": event.task_id if event is not None else None,
"task_name": event.task_name if event is not None else None,
"agent_id": event.agent_id if event is not None else None,
"agent_role": event.agent_role if event is not None else None,
}
checkpoint_id: str = cfg.provider.extract_id(location)
parent_id_snapshot: str | None = state._parent_id
branch_snapshot: str = state._branch
crewai_event_bus.emit(
cfg,
CheckpointStartedEvent(
location=cfg.location,
provider=provider_name,
trigger=trigger,
branch=branch_snapshot,
parent_id=parent_id_snapshot,
**context,
),
)
start: float = time.perf_counter()
try:
_prepare_entities(state.root)
payload = state.model_dump(mode="json")
if event is not None:
payload["trigger"] = event.type
data = json.dumps(payload)
location = cfg.provider.checkpoint(
data,
cfg.location,
parent_id=parent_id_snapshot,
branch=branch_snapshot,
)
state._chain_lineage(cfg.provider, location)
checkpoint_id: str = cfg.provider.extract_id(location)
except Exception as exc:
crewai_event_bus.emit(
cfg,
CheckpointFailedEvent(
location=cfg.location,
provider=provider_name,
trigger=trigger,
branch=branch_snapshot,
parent_id=parent_id_snapshot,
error=str(exc),
**context,
),
)
raise
duration_ms: float = (time.perf_counter() - start) * 1000.0
msg: str = (
f"Checkpoint saved. Resume with: crewai checkpoint resume {checkpoint_id}"
)
logger.info(msg)
crewai_event_bus.emit(
cfg,
CheckpointCompletedEvent(
location=location,
provider=provider_name,
trigger=trigger,
branch=branch_snapshot,
parent_id=parent_id_snapshot,
checkpoint_id=checkpoint_id,
duration_ms=duration_ms,
**context,
),
)
if cfg.max_checkpoints is not None:
cfg.provider.prune(cfg.location, cfg.max_checkpoints, branch=state._branch)
try:
removed_count: int = cfg.provider.prune(
cfg.location, cfg.max_checkpoints, branch=branch_snapshot
)
except Exception:
logger.warning(
"Checkpoint prune failed for %s (branch=%s)",
cfg.location,
branch_snapshot,
exc_info=True,
)
return
crewai_event_bus.emit(
cfg,
CheckpointPrunedEvent(
location=cfg.location,
provider=provider_name,
trigger=trigger,
branch=branch_snapshot,
parent_id=parent_id_snapshot,
removed_count=removed_count,
max_checkpoints=cfg.max_checkpoints,
**context,
),
)
def _should_checkpoint(source: Any, event: BaseEvent) -> CheckpointConfig | None:
@@ -142,6 +228,13 @@ def _should_checkpoint(source: Any, event: BaseEvent) -> CheckpointConfig | None
def _on_any_event(source: Any, event: BaseEvent, state: Any) -> None:
"""Sync handler registered on every event class."""
if is_replaying():
return
if isinstance(
event,
(CheckpointBaseEvent, CheckpointForkBaseEvent, CheckpointRestoreBaseEvent),
):
return
cfg = _should_checkpoint(source, event)
if cfg is None:
return
@@ -161,7 +254,8 @@ def _register_all_handlers(event_bus: CrewAIEventsBus) -> None:
seen: set[type] = set()
def _collect(cls: type[BaseEvent]) -> None:
for sub in cls.__subclasses__():
subclasses: list[type[BaseEvent]] = cls.__subclasses__()
for sub in subclasses:
if sub not in seen:
seen.add(sub)
type_field = sub.model_fields.get("type")


@@ -39,7 +39,8 @@ def _build_event_type_map() -> None:
"""Populate _event_type_map from all BaseEvent subclasses."""
def _collect(cls: type[BaseEvent]) -> None:
for sub in cls.__subclasses__():
subclasses: list[type[BaseEvent]] = cls.__subclasses__()
for sub in subclasses:
type_field = sub.model_fields.get("type")
if type_field and type_field.default:
_event_type_map[type_field.default] = sub
@@ -196,6 +197,21 @@ class EventRecord(BaseModel):
node for node in self.nodes.values() if not node.neighbors("parent")
]
def all_nodes(self) -> list[EventNode]:
"""Return a snapshot of every node under the read lock.
Returns:
A list copy of the current nodes, safe to iterate without holding
the lock.
"""
with self._lock.r_locked():
return list(self.nodes.values())
def clear(self) -> None:
"""Remove all nodes from the record under the write lock."""
with self._lock.w_locked():
self.nodes.clear()
def __len__(self) -> int:
with self._lock.r_locked():
return len(self.nodes)
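The point of `all_nodes()` returning a list copy is that callers can iterate without holding the lock while writers keep mutating the record. This is a minimal sketch of that snapshot pattern, assuming a plain `threading.Lock` instead of the read/write lock used above; the `Record` class is hypothetical.

```python
import threading
from typing import Any


class Record:
    """Hypothetical record guarding a dict of nodes with a single lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._nodes: dict[str, Any] = {}

    def add(self, key: str, node: Any) -> None:
        with self._lock:
            self._nodes[key] = node

    def all_nodes(self) -> list[Any]:
        # Copy under the lock; the caller iterates the copy lock-free.
        with self._lock:
            return list(self._nodes.values())

    def clear(self) -> None:
        with self._lock:
            self._nodes.clear()

    def __len__(self) -> int:
        with self._lock:
            return len(self._nodes)


record = Record()
record.add("a", 1)
record.add("b", 2)
snapshot = record.all_nodes()
record.clear()  # does not affect the snapshot taken earlier
```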


@@ -61,13 +61,16 @@ class BaseProvider(BaseModel, ABC):
...
@abstractmethod
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None:
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> int:
"""Remove old checkpoints, keeping at most *max_keep* per branch.
Args:
location: The storage destination passed to ``checkpoint``.
max_keep: Maximum number of checkpoints to retain.
branch: Only prune checkpoints on this branch.
Returns:
The number of checkpoints removed.
"""
...


@@ -95,17 +95,20 @@ class JsonProvider(BaseProvider):
await f.write(data)
return str(file_path)
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None:
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> int:
"""Remove oldest checkpoint files beyond *max_keep* on a branch."""
_safe_branch(location, branch)
branch_dir = os.path.join(location, branch)
pattern = os.path.join(branch_dir, "*.json")
files = sorted(glob.glob(pattern), key=os.path.getmtime)
removed = 0
for path in files if max_keep == 0 else files[:-max_keep]:
try:
os.remove(path)
removed += 1
except OSError: # noqa: PERF203
logger.debug("Failed to remove %s", path, exc_info=True)
return removed
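The `max_keep == 0` special case in the loop above exists because `files[:-0]` is the same as `files[:0]`, which is empty, so a plain negative slice would remove nothing when asked to keep nothing. A minimal sketch of the selection logic, with a hypothetical `to_remove` helper and no filesystem access:

```python
def to_remove(files: list[str], max_keep: int) -> list[str]:
    """Return the oldest entries to delete, given files sorted oldest first."""
    # files[:-0] would be [], so keeping zero files needs its own branch.
    return files if max_keep == 0 else files[:-max_keep]


files = ["old.json", "mid.json", "new.json"]
keep_one = to_remove(files, 1)
keep_none = to_remove(files, 0)
keep_many = to_remove(files, 5)
```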
def extract_id(self, location: str) -> str:
"""Extract the checkpoint ID from a file path.


@@ -111,11 +111,13 @@ class SqliteProvider(BaseProvider):
await db.commit()
return f"{location}#{checkpoint_id}"
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None:
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> int:
"""Remove oldest checkpoint rows beyond *max_keep* on a branch."""
with sqlite3.connect(location) as conn:
conn.execute(_PRUNE, (branch, branch, max_keep))
cursor = conn.execute(_PRUNE, (branch, branch, max_keep))
removed: int = cursor.rowcount
conn.commit()
return max(removed, 0)
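Reading `cursor.rowcount` after a `DELETE` is how the removed count is obtained here. The sketch below shows the idea against an in-memory database. The table layout and the `PRUNE` statement are assumptions made for illustration, since the real `_PRUNE` query is not part of this diff.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical schema: one row per checkpoint, tagged with its branch.
conn.execute("CREATE TABLE checkpoints (id INTEGER PRIMARY KEY, branch TEXT)")
conn.executemany(
    "INSERT INTO checkpoints (branch) VALUES (?)",
    [("main",)] * 5 + [("dev",)],
)

# Hypothetical prune statement: keep the newest N rows on one branch.
PRUNE = """
DELETE FROM checkpoints
WHERE branch = ?
  AND id NOT IN (
      SELECT id FROM checkpoints WHERE branch = ? ORDER BY id DESC LIMIT ?
  )
"""

cursor = conn.execute(PRUNE, ("main", "main", 2))
# rowcount can be -1 when the driver cannot determine it, hence the clamp.
removed = max(cursor.rowcount, 0)
conn.commit()
remaining = conn.execute("SELECT COUNT(*) FROM checkpoints").fetchone()[0]
conn.close()
```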
def extract_id(self, location: str) -> str:
"""Extract the checkpoint ID from a ``db_path#id`` string."""


@@ -10,6 +10,7 @@ via ``RuntimeState.model_rebuild()``.
from __future__ import annotations
import logging
import time
from typing import TYPE_CHECKING, Any
import uuid
@@ -23,6 +24,17 @@ from pydantic import (
)
from crewai.context import capture_execution_context
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.checkpoint_events import (
CheckpointCompletedEvent,
CheckpointFailedEvent,
CheckpointForkCompletedEvent,
CheckpointForkStartedEvent,
CheckpointRestoreCompletedEvent,
CheckpointRestoreFailedEvent,
CheckpointRestoreStartedEvent,
CheckpointStartedEvent,
)
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.state.event_record import EventRecord
from crewai.state.provider.core import BaseProvider
@@ -89,7 +101,7 @@ def _migrate(data: dict[str, Any]) -> dict[str, Any]:
"""
raw = data.get("crewai_version")
current = Version(get_crewai_version())
stored = Version(raw) if raw else Version("0.0.0")
stored = Version(raw) if isinstance(raw, str) and raw else Version("0.0.0")
if raw is None:
logger.warning("Checkpoint has no crewai_version — treating as 0.0.0")
@@ -159,6 +171,63 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
self._checkpoint_id = provider.extract_id(location)
self._parent_id = self._checkpoint_id
def _begin_checkpoint(self, location: str) -> tuple[str, str | None, str, float]:
"""Emit the start event and return the invariant context for a checkpoint."""
provider_name: str = type(self._provider).__name__
parent_id_snapshot: str | None = self._parent_id
branch_snapshot: str = self._branch
crewai_event_bus.emit(
self,
CheckpointStartedEvent(
location=location,
provider=provider_name,
branch=branch_snapshot,
parent_id=parent_id_snapshot,
),
)
return provider_name, parent_id_snapshot, branch_snapshot, time.perf_counter()
def _emit_checkpoint_failed(
self,
location: str,
provider_name: str,
branch_snapshot: str,
parent_id_snapshot: str | None,
exc: Exception,
) -> None:
"""Emit the failure event for a checkpoint write."""
crewai_event_bus.emit(
self,
CheckpointFailedEvent(
location=location,
provider=provider_name,
branch=branch_snapshot,
parent_id=parent_id_snapshot,
error=str(exc),
),
)
def _emit_checkpoint_completed(
self,
result: str,
provider_name: str,
branch_snapshot: str,
parent_id_snapshot: str | None,
start: float,
) -> None:
"""Emit the completion event for a successful checkpoint write."""
crewai_event_bus.emit(
self,
CheckpointCompletedEvent(
location=result,
provider=provider_name,
branch=branch_snapshot,
parent_id=parent_id_snapshot,
checkpoint_id=self._provider.extract_id(result),
duration_ms=(time.perf_counter() - start) * 1000.0,
),
)
def checkpoint(self, location: str) -> str:
"""Write a checkpoint.
@@ -169,14 +238,27 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
Returns:
A location identifier for the saved checkpoint.
"""
_prepare_entities(self.root)
result = self._provider.checkpoint(
self.model_dump_json(),
location,
parent_id=self._parent_id,
branch=self._branch,
provider_name, parent_id_snapshot, branch_snapshot, start = (
self._begin_checkpoint(location)
)
try:
_prepare_entities(self.root)
result = self._provider.checkpoint(
self.model_dump_json(),
location,
parent_id=parent_id_snapshot,
branch=branch_snapshot,
)
self._chain_lineage(self._provider, result)
except Exception as exc:
self._emit_checkpoint_failed(
location, provider_name, branch_snapshot, parent_id_snapshot, exc
)
raise
self._emit_checkpoint_completed(
result, provider_name, branch_snapshot, parent_id_snapshot, start
)
self._chain_lineage(self._provider, result)
return result
async def acheckpoint(self, location: str) -> str:
@@ -189,14 +271,27 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
Returns:
A location identifier for the saved checkpoint.
"""
_prepare_entities(self.root)
result = await self._provider.acheckpoint(
self.model_dump_json(),
location,
parent_id=self._parent_id,
branch=self._branch,
provider_name, parent_id_snapshot, branch_snapshot, start = (
self._begin_checkpoint(location)
)
try:
_prepare_entities(self.root)
result = await self._provider.acheckpoint(
self.model_dump_json(),
location,
parent_id=parent_id_snapshot,
branch=branch_snapshot,
)
self._chain_lineage(self._provider, result)
except Exception as exc:
self._emit_checkpoint_failed(
location, provider_name, branch_snapshot, parent_id_snapshot, exc
)
raise
self._emit_checkpoint_completed(
result, provider_name, branch_snapshot, parent_id_snapshot, start
)
self._chain_lineage(self._provider, result)
return result
def fork(self, branch: str | None = None) -> None:
@@ -211,11 +306,32 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
times without collisions.
"""
if branch:
self._branch = branch
new_branch = branch
elif self._checkpoint_id:
self._branch = f"fork/{self._checkpoint_id}_{uuid.uuid4().hex[:6]}"
new_branch = f"fork/{self._checkpoint_id}_{uuid.uuid4().hex[:6]}"
else:
self._branch = f"fork/{uuid.uuid4().hex[:8]}"
new_branch = f"fork/{uuid.uuid4().hex[:8]}"
parent_branch: str | None = self._branch
parent_checkpoint_id: str | None = self._checkpoint_id
crewai_event_bus.emit(
self,
CheckpointForkStartedEvent(
branch=new_branch,
parent_branch=parent_branch,
parent_checkpoint_id=parent_checkpoint_id,
),
)
self._branch = new_branch
crewai_event_bus.emit(
self,
CheckpointForkCompletedEvent(
branch=new_branch,
parent_branch=parent_branch,
parent_checkpoint_id=parent_checkpoint_id,
),
)
@classmethod
def from_checkpoint(cls, config: CheckpointConfig, **kwargs: Any) -> RuntimeState:
@@ -233,13 +349,41 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
if config.restore_from is None:
raise ValueError("CheckpointConfig.restore_from must be set")
location = str(config.restore_from)
provider = detect_provider(location)
raw = provider.from_checkpoint(location)
state = cls.model_validate_json(raw, **kwargs)
state._provider = provider
checkpoint_id = provider.extract_id(location)
state._checkpoint_id = checkpoint_id
state._parent_id = checkpoint_id
crewai_event_bus.emit(config, CheckpointRestoreStartedEvent(location=location))
start: float = time.perf_counter()
provider_name: str | None = None
try:
provider = detect_provider(location)
provider_name = type(provider).__name__
raw = provider.from_checkpoint(location)
state = cls.model_validate_json(raw, **kwargs)
state._provider = provider
checkpoint_id = provider.extract_id(location)
state._checkpoint_id = checkpoint_id
state._parent_id = checkpoint_id
except Exception as exc:
crewai_event_bus.emit(
config,
CheckpointRestoreFailedEvent(
location=location,
provider=provider_name,
error=str(exc),
),
)
raise
crewai_event_bus.emit(
config,
CheckpointRestoreCompletedEvent(
location=location,
provider=provider_name,
checkpoint_id=checkpoint_id,
branch=state._branch,
parent_id=state._parent_id,
duration_ms=(time.perf_counter() - start) * 1000.0,
),
)
return state
@classmethod
@@ -260,13 +404,41 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
if config.restore_from is None:
raise ValueError("CheckpointConfig.restore_from must be set")
location = str(config.restore_from)
provider = detect_provider(location)
raw = await provider.afrom_checkpoint(location)
state = cls.model_validate_json(raw, **kwargs)
state._provider = provider
checkpoint_id = provider.extract_id(location)
state._checkpoint_id = checkpoint_id
state._parent_id = checkpoint_id
crewai_event_bus.emit(config, CheckpointRestoreStartedEvent(location=location))
start: float = time.perf_counter()
provider_name: str | None = None
try:
provider = detect_provider(location)
provider_name = type(provider).__name__
raw = await provider.afrom_checkpoint(location)
state = cls.model_validate_json(raw, **kwargs)
state._provider = provider
checkpoint_id = provider.extract_id(location)
state._checkpoint_id = checkpoint_id
state._parent_id = checkpoint_id
except Exception as exc:
crewai_event_bus.emit(
config,
CheckpointRestoreFailedEvent(
location=location,
provider=provider_name,
error=str(exc),
),
)
raise
crewai_event_bus.emit(
config,
CheckpointRestoreCompletedEvent(
location=location,
provider=provider_name,
checkpoint_id=checkpoint_id,
branch=state._branch,
parent_id=state._parent_id,
duration_ms=(time.perf_counter() - start) * 1000.0,
),
)
return state
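The checkpoint, restore, and prune paths in this diff all follow the same shape: emit a started event, time the operation, then emit either a failed event and re-raise or a completed event carrying `duration_ms`. This is a minimal sketch of that pattern, assuming a hypothetical `with_lifecycle` helper and a plain list in place of the event bus.

```python
import time
from typing import Any, Callable

events: list[tuple[str, dict[str, Any]]] = []


def emit(kind: str, payload: dict[str, Any]) -> None:
    """Hypothetical stand-in for an event bus: just record the event."""
    events.append((kind, payload))


def with_lifecycle(name: str, operation: Callable[[], Any]) -> Any:
    """Run an operation between started and completed/failed events."""
    emit(f"{name}_started", {})
    start = time.perf_counter()
    try:
        result = operation()
    except Exception as exc:
        emit(f"{name}_failed", {"error": str(exc)})
        raise  # observers are notified, but the caller still sees the error
    duration_ms = (time.perf_counter() - start) * 1000.0
    emit(f"{name}_completed", {"duration_ms": duration_ms})
    return result


def boom() -> None:
    raise OSError("disk full")


location = with_lifecycle("checkpoint", lambda: "state.db#1")
try:
    with_lifecycle("checkpoint", boom)
except OSError:
    pass
kinds = [kind for kind, _ in events]
```

`time.perf_counter()` is a monotonic clock, so the measured duration is not affected by wall-clock adjustments.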


@@ -0,0 +1,165 @@
"""Tests for event bus replay dispatch and is_replaying flag."""
from __future__ import annotations
from typing import Any
from unittest.mock import patch
from crewai.events.event_bus import _replaying, crewai_event_bus, is_replaying
from crewai.events.types.flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
def _make_started(method: str, event_id: str, sequence: int) -> MethodExecutionStartedEvent:
"""Build a MethodExecutionStartedEvent with explicit ids/sequence."""
ev = MethodExecutionStartedEvent(
method_name=method,
flow_name="F",
params={},
state={},
)
ev.event_id = event_id
ev.emission_sequence = sequence
return ev


class TestReplayPreservesFields:
    """replay() must not overwrite event_id, parent_event_id, or emission_sequence."""

    def test_preserves_ids_and_sequence(self) -> None:
        captured: list[MethodExecutionStartedEvent] = []

        with crewai_event_bus.scoped_handlers():

            @crewai_event_bus.on(MethodExecutionStartedEvent)
            def _capture(_: Any, event: MethodExecutionStartedEvent) -> None:
                captured.append(event)

            ev = _make_started("outline", "orig-id-1", 42)
            ev.parent_event_id = "parent-abc"
            future = crewai_event_bus.replay(object(), ev)
            if future is not None:
                future.result(timeout=5.0)

        assert len(captured) == 1
        assert captured[0].event_id == "orig-id-1"
        assert captured[0].parent_event_id == "parent-abc"
        assert captured[0].emission_sequence == 42


class TestIsReplayingFlag:
    """is_replaying() must be True inside handlers dispatched via replay()."""

    def test_flag_true_during_replay(self) -> None:
        seen: list[bool] = []

        with crewai_event_bus.scoped_handlers():

            @crewai_event_bus.on(MethodExecutionStartedEvent)
            def _capture(_: Any, __: MethodExecutionStartedEvent) -> None:
                seen.append(is_replaying())

            ev = _make_started("m", "id-1", 1)
            future = crewai_event_bus.replay(object(), ev)
            if future is not None:
                future.result(timeout=5.0)

        assert seen == [True]
        assert is_replaying() is False

    def test_flag_false_during_emit(self) -> None:
        seen: list[bool] = []

        with crewai_event_bus.scoped_handlers():

            @crewai_event_bus.on(MethodExecutionStartedEvent)
            def _capture(_: Any, __: MethodExecutionStartedEvent) -> None:
                seen.append(is_replaying())

            ev = _make_started("m", "id-1", 1)
            future = crewai_event_bus.emit(object(), ev)
            if future is not None:
                future.result(timeout=5.0)

        assert seen == [False]


class TestCheckpointListenerOptsOut:
    """CheckpointListener must early-return during replay."""

    def test_checkpoint_not_written_on_replay(self) -> None:
        from crewai.state.checkpoint_config import CheckpointConfig
        from crewai.state.checkpoint_listener import _on_any_event

        class FlowLike:
            entity_type = "flow"
            checkpoint = CheckpointConfig(trigger_all=True)

        ev = _make_started("m", "id-1", 1)
        with patch("crewai.state.checkpoint_listener._do_checkpoint") as do_cp:
            token = _replaying.set(True)
            try:
                _on_any_event(FlowLike(), ev, state=None)
            finally:
                _replaying.reset(token)
            assert do_cp.call_count == 0


class TestFlowResumeReplaysEvents:
    """End-to-end: a resumed flow emits MethodExecution* events for completed methods."""

    def test_resume_dispatches_completed_method_events(self, tmp_path) -> None:
        from crewai.flow.flow import Flow, listen, start
        from crewai.flow.persistence.sqlite import SQLiteFlowPersistence

        db_path = tmp_path / "flows.db"
        persistence = SQLiteFlowPersistence(str(db_path))

        class ThreeStepFlow(Flow[dict]):
            @start()
            def step_a(self) -> str:
                return "a"

            @listen(step_a)
            def step_b(self) -> str:
                return "b"

            @listen(step_b)
            def step_c(self) -> str:
                return "c"

        if crewai_event_bus.runtime_state is not None:
            crewai_event_bus.runtime_state.event_record.clear()

        flow1 = ThreeStepFlow(persistence=persistence)
        flow1.kickoff()
        flow_id = flow1.state["id"]

        captured_started: list[str] = []
        captured_finished: list[str] = []

        flow2 = ThreeStepFlow(persistence=persistence)
        flow2._completed_methods = {"step_a", "step_b"}

        with crewai_event_bus.scoped_handlers():

            @crewai_event_bus.on(MethodExecutionStartedEvent)
            def _cs(_: Any, event: MethodExecutionStartedEvent) -> None:
                captured_started.append(event.method_name)

            @crewai_event_bus.on(MethodExecutionFinishedEvent)
            def _cf(_: Any, event: MethodExecutionFinishedEvent) -> None:
                captured_finished.append(event.method_name)

            flow2.kickoff(inputs={"id": flow_id})

        assert captured_started.count("step_a") == 1
        assert captured_started.count("step_b") == 1
        assert captured_started.count("step_c") == 1
        assert captured_finished.count("step_a") == 1
        assert captured_finished.count("step_b") == 1
        assert captured_finished.count("step_c") == 1
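
The replay tests above rely on a flag that is true only while a handler runs under `replay()`. A minimal sketch of that pattern with `contextvars` follows. The names `emit`, `replay`, and `checkpoint_listener` here are hypothetical stand-ins, not the event bus's actual implementation, and the sketch dispatches synchronously, whereas the real bus returns futures.

```python
from contextvars import ContextVar
from typing import Any, Callable

# Hypothetical stand-in for the bus's private replay flag.
_replaying: ContextVar[bool] = ContextVar("replaying", default=False)


def is_replaying() -> bool:
    """Return True only while a handler is being driven by replay()."""
    return _replaying.get()


def emit(handler: Callable[[Any], None], event: Any) -> None:
    """Dispatch a live event; the flag keeps its default of False."""
    handler(event)


def replay(handler: Callable[[Any], None], event: Any) -> None:
    """Dispatch a recorded event with the flag set, then restore it."""
    token = _replaying.set(True)
    try:
        handler(event)
    finally:
        # reset() restores the previous value even if the handler raised.
        _replaying.reset(token)


def checkpoint_listener(event: Any, written: list[Any]) -> None:
    """Side-effecting listener that opts out while replaying."""
    if is_replaying():
        return
    written.append(event)
```

Resetting via the token rather than setting the flag back to `False` keeps nested or overlapping replays from clobbering each other's state.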


@@ -4,6 +4,8 @@ from pathlib import Path
import pytest
from crewai import Agent
from crewai.agent.utils import append_skill_context
from crewai.skills.loader import activate_skill, discover_skills, format_skill_context
from crewai.skills.models import INSTRUCTIONS, METADATA
@@ -76,3 +78,23 @@ class TestSkillDiscoveryAndActivation:
all_skills.extend(discover_skills(search_path))
names = {s.name for s in all_skills}
assert names == {"skill-a", "skill-b"}

    def test_agent_preserves_metadata_for_discovered_skills(self, tmp_path: Path) -> None:
        _create_skill_dir(tmp_path, "travel", body="Use this skill for travel planning.")
        discovered = discover_skills(tmp_path)

        agent = Agent(
            role="Travel Advisor",
            goal="Provide personalized travel suggestions.",
            backstory="An experienced travel consultant.",
            skills=discovered,
        )

        assert agent.skills is not None
        assert agent.skills[0].disclosure_level == METADATA
        assert agent.skills[0].instructions is None

        prompt = append_skill_context(agent, "Plan a 10-day Japan itinerary.")
        assert "## Skill: travel" in prompt
        assert "Skill travel" in prompt
        assert "Use this skill for travel planning." not in prompt


@@ -11,11 +11,12 @@ from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from pydantic import BaseModel
from crewai.agent.core import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.crew import Crew
-from crewai.flow.flow import Flow, start
+from crewai.flow.flow import _INITIAL_STATE_CLASS_MARKER, Flow, start
from crewai.state.checkpoint_config import CheckpointConfig
from crewai.state.checkpoint_listener import (
_find_checkpoint,
@@ -310,6 +311,65 @@ class TestRuntimeStateLineage:
assert state._branch != first


class TestFlowInitialStateSerialization:
    """Regression tests for checkpoint serialization of ``Flow.initial_state``."""

    def test_class_ref_serializes_as_schema(self) -> None:
        class MyState(BaseModel):
            id: str = "x"
            foo: str = "bar"

        flow = Flow(initial_state=MyState)
        state = RuntimeState(root=[flow])
        dumped = json.loads(state.model_dump_json())
        entity = dumped["entities"][0]
        wrapped = entity["initial_state"]
        assert isinstance(wrapped, dict)
        assert _INITIAL_STATE_CLASS_MARKER in wrapped
        assert wrapped[_INITIAL_STATE_CLASS_MARKER].get("title") == "MyState"

    def test_class_ref_round_trips_to_basemodel_subclass(self) -> None:
        class MyState(BaseModel):
            id: str = "x"
            foo: str = "bar"

        flow = Flow(initial_state=MyState)
        raw = RuntimeState(root=[flow]).model_dump_json()
        restored = RuntimeState.model_validate_json(
            raw, context={"from_checkpoint": True}
        )
        rehydrated = restored.root[0].initial_state
        assert isinstance(rehydrated, type)
        assert issubclass(rehydrated, BaseModel)
        assert set(rehydrated.model_fields.keys()) == {"id", "foo"}

    def test_instance_serializes_as_values(self) -> None:
        class MyState(BaseModel):
            id: str = "x"
            foo: str = "bar"

        flow = Flow(initial_state=MyState(foo="baz"))
        state = RuntimeState(root=[flow])
        dumped = json.loads(state.model_dump_json())
        entity = dumped["entities"][0]
        assert entity["initial_state"] == {"id": "x", "foo": "baz"}

    def test_dict_passthrough(self) -> None:
        flow = Flow(initial_state={"id": "x", "foo": "bar"})
        state = RuntimeState(root=[flow])
        dumped = json.loads(state.model_dump_json())
        entity = dumped["entities"][0]
        assert entity["initial_state"] == {"id": "x", "foo": "bar"}

    def test_dict_round_trips_as_dict(self) -> None:
        flow = Flow(initial_state={"id": "x", "foo": "bar"})
        raw = RuntimeState(root=[flow]).model_dump_json()
        restored = RuntimeState.model_validate_json(
            raw, context={"from_checkpoint": True}
        )
        assert restored.root[0].initial_state == {"id": "x", "foo": "bar"}


# ---------- JsonProvider forking ----------


@@ -4519,8 +4519,8 @@ def test_sets_flow_context_when_using_crewbase_pattern_inside_flow():
flow.kickoff()
assert captured_crew is not None
-assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined]
-assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined]
+assert captured_crew._flow_id == flow.execution_id # type: ignore[attr-defined]
+assert captured_crew._request_id == flow.execution_id # type: ignore[attr-defined]
def test_sets_flow_context_when_outside_flow(researcher, writer):
@@ -4554,8 +4554,8 @@ def test_sets_flow_context_when_inside_flow(researcher, writer):
flow = MyFlow()
result = flow.kickoff()
-assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
-assert result._request_id == flow.flow_id # type: ignore[attr-defined]
+assert result._flow_id == flow.execution_id # type: ignore[attr-defined]
+assert result._request_id == flow.execution_id # type: ignore[attr-defined]
def test_reset_knowledge_with_no_crew_knowledge(researcher, writer):


@@ -0,0 +1,127 @@
"""Regression tests for ``Flow.execution_id``.
``execution_id`` is the stable tracking identifier for a single flow run.
It must stay independent of ``state.id`` so that consumers passing an
``id`` in ``inputs`` (used for persistence restore) cannot destabilize
the identity used by telemetry, tracing, and external correlation.
"""

from __future__ import annotations

from typing import Any

import pytest

from crewai.flow.flow import Flow, FlowState, start
from crewai.flow.flow_context import current_flow_id, current_flow_request_id


class _CaptureState(FlowState):
    captured_flow_id: str = ""
    captured_state_id: str = ""
    captured_current_flow_id: str = ""
    captured_execution_id: str = ""


class _IdentityCaptureFlow(Flow[_CaptureState]):
    initial_state = _CaptureState

    @start()
    def capture(self) -> None:
        self.state.captured_flow_id = self.flow_id
        self.state.captured_state_id = self.state.id
        self.state.captured_current_flow_id = current_flow_id.get() or ""
        self.state.captured_execution_id = self.execution_id


def test_execution_id_defaults_to_fresh_uuid_per_instance() -> None:
    a = _IdentityCaptureFlow()
    b = _IdentityCaptureFlow()
    assert a.execution_id
    assert b.execution_id
    assert a.execution_id != b.execution_id


def test_execution_id_survives_consumer_id_in_inputs() -> None:
    flow = _IdentityCaptureFlow()
    original_execution_id = flow.execution_id
    flow.kickoff(inputs={"id": "consumer-supplied-id"})
    assert flow.state.id == "consumer-supplied-id"
    assert flow.flow_id == "consumer-supplied-id"
    assert flow.execution_id == original_execution_id
    assert flow.execution_id != "consumer-supplied-id"


def test_two_runs_with_same_consumer_id_have_distinct_execution_ids() -> None:
    flow_a = _IdentityCaptureFlow()
    flow_b = _IdentityCaptureFlow()
    colliding_id = "shared-consumer-id"
    flow_a.kickoff(inputs={"id": colliding_id})
    flow_b.kickoff(inputs={"id": colliding_id})
    assert flow_a.state.id == colliding_id
    assert flow_b.state.id == colliding_id
    assert flow_a.execution_id != flow_b.execution_id


def test_execution_id_is_writable() -> None:
    flow = _IdentityCaptureFlow()
    flow.execution_id = "external-task-id"
    assert flow.execution_id == "external-task-id"
    flow.kickoff(inputs={"id": "consumer-supplied-id"})
    assert flow.execution_id == "external-task-id"
    assert flow.state.id == "consumer-supplied-id"


def test_current_flow_id_context_var_matches_execution_id() -> None:
    flow = _IdentityCaptureFlow()
    flow.execution_id = "external-task-id"
    flow.kickoff(inputs={"id": "consumer-supplied-id"})
    assert flow.state.captured_current_flow_id == "external-task-id"
    assert flow.state.captured_flow_id == "consumer-supplied-id"
    assert flow.state.captured_execution_id == "external-task-id"


def test_execution_id_not_included_in_serialized_state() -> None:
    flow = _IdentityCaptureFlow()
    flow.execution_id = "external-task-id"
    flow.kickoff()
    dumped = flow.state.model_dump()
    assert "execution_id" not in dumped
    assert "_execution_id" not in dumped
    assert dumped["id"] == flow.state.id


def test_dict_state_flow_also_exposes_stable_execution_id() -> None:
    class DictFlow(Flow[dict[str, Any]]):
        initial_state = dict  # type: ignore[assignment]

        @start()
        def noop(self) -> None:
            pass

    flow = DictFlow()
    original = flow.execution_id
    flow.kickoff(inputs={"id": "consumer-supplied-id"})
    assert flow.state["id"] == "consumer-supplied-id"
    assert flow.execution_id == original


@pytest.fixture(autouse=True)
def _reset_flow_context_vars():
    yield
    # ContextVar.set() never raises LookupError (only get() without a default
    # does), so the vars can be cleared unconditionally after each test.
    for var in (current_flow_id, current_flow_request_id):
        var.set(None)
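
The separation these tests pin down can be illustrated with a toy class. This is only a sketch under stated assumptions: `MiniFlow` is a hypothetical stand-in, not CrewAI's `Flow`, and it models state as a plain dict. The point it shows is that the run identifier is fixed at construction and never derived from the state `id` that callers may overwrite through `inputs`.

```python
import uuid
from typing import Any, Optional


class MiniFlow:
    """Hypothetical toy flow: execution_id is independent of state['id']."""

    def __init__(self) -> None:
        # Tracking identity for this run; assigned once, writable by the owner.
        self.execution_id: str = str(uuid.uuid4())
        # Persistence identity; callers may replace it via kickoff(inputs=...).
        self.state: dict[str, Any] = {"id": str(uuid.uuid4())}

    @property
    def flow_id(self) -> str:
        """Mirror the state id, as the tests above expect of flow_id."""
        return self.state["id"]

    def kickoff(self, inputs: Optional[dict[str, Any]] = None) -> None:
        """Merge inputs into state without touching execution_id."""
        if inputs:
            self.state.update(inputs)
```

Two instances kicked off with the same `id` in `inputs` then share a `flow_id` but keep distinct `execution_id` values, which is the collision case the regression tests guard against.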


@@ -0,0 +1,411 @@
"""Tests for LLM.call() tool loop and LLMResult.
All LLM calls are mocked — no real API traffic.
"""

from __future__ import annotations

import json
from types import SimpleNamespace
from typing import Any
from unittest.mock import MagicMock, patch

import pytest

from crewai.llm_result import (
    LLMResult,
    ToolCallRecord,
    _lookup_pricing,
    estimate_cost_usd,
)


def _make_litellm_llm(model: str = "gpt-4o") -> Any:
    """Create an LLM instance that uses the litellm fallback path."""
    from crewai.llm import LLM

    return LLM(model=model, is_litellm=True)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_tool_call(name: str, arguments: dict, call_id: str = "call_1"):
    """Build a tool-call object using litellm's actual types."""
    try:
        from litellm.types.utils import (
            ChatCompletionMessageToolCall,
            Function,
        )

        return ChatCompletionMessageToolCall(
            id=call_id,
            function=Function(name=name, arguments=json.dumps(arguments)),
            type="function",
        )
    except ImportError:
        func = SimpleNamespace(name=name, arguments=json.dumps(arguments))
        return SimpleNamespace(id=call_id, function=func, type="function")


def _make_model_response(content: str | None = None, tool_calls: list | None = None):
    """Build a minimal mock ModelResponse that passes isinstance checks.

    We need it to be an instance of litellm's ModelResponse/ModelResponseBase
    so the internal isinstance() checks work. We import those types when
    litellm is available.
    """
    try:
        from litellm.types.utils import (
            Choices,
            Message,
            ModelResponse,
            Usage,
        )

        message = Message(content=content, tool_calls=tool_calls or None)
        choice = Choices(message=message, finish_reason="stop", index=0)
        resp = ModelResponse(
            choices=[choice],
            usage=Usage(
                prompt_tokens=100,
                completion_tokens=50,
                total_tokens=150,
            ),
        )
        return resp
    except ImportError:
        # Fallback to SimpleNamespace if litellm not installed
        message = SimpleNamespace(content=content, tool_calls=tool_calls or [])
        choice = SimpleNamespace(message=message, finish_reason="stop")
        usage = SimpleNamespace(
            prompt_tokens=100,
            completion_tokens=50,
            total_tokens=150,
        )
        resp = SimpleNamespace(
            choices=[choice],
            model_extra={"usage": usage},
        )
        return resp


DUMMY_TOOL_SCHEMA = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get weather for a city",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {"type": "string"},
                },
                "required": ["city"],
            },
        },
    }
]


# ---------------------------------------------------------------------------
# Unit tests for LLMResult / ToolCallRecord
# ---------------------------------------------------------------------------


class TestLLMResultModels:
    def test_tool_call_record_defaults(self):
        r = ToolCallRecord(name="foo")
        assert r.input == {}
        assert r.output == ""
        assert r.duration_ms == 0.0
        assert r.is_error is False

    def test_llm_result_defaults(self):
        r = LLMResult()
        assert r.text == ""
        assert r.tool_calls == []
        assert r.cost_usd == 0.0
        assert r.iterations == 0
        assert r.usage.total_tokens == 0

    def test_llm_result_with_data(self):
        r = LLMResult(
            text="hello",
            tool_calls=[ToolCallRecord(name="foo", input={"a": 1}, output="bar")],
            iterations=2,
            cost_usd=0.005,
        )
        assert r.text == "hello"
        assert len(r.tool_calls) == 1
        assert r.tool_calls[0].name == "foo"


# ---------------------------------------------------------------------------
# Cost estimation
# ---------------------------------------------------------------------------


class TestCostEstimation:
    def test_known_model(self):
        cost = estimate_cost_usd("gpt-4o", prompt_tokens=1_000_000, completion_tokens=0)
        assert cost == pytest.approx(2.50)

    def test_known_model_output(self):
        cost = estimate_cost_usd("gpt-4o", prompt_tokens=0, completion_tokens=1_000_000)
        assert cost == pytest.approx(10.00)

    def test_unknown_model_returns_zero(self):
        cost = estimate_cost_usd("some-random-model-xyz", 1000, 1000)
        assert cost == 0.0

    def test_provider_prefix_stripped(self):
        cost = estimate_cost_usd("anthropic/claude-sonnet-4-6", 1_000_000, 0)
        assert cost == pytest.approx(3.00)

    def test_partial_match(self):
        # "claude-sonnet-4-6-20250514" should match "claude-sonnet-4-6"
        cost = estimate_cost_usd("claude-sonnet-4-6-20250514", 1_000_000, 0)
        assert cost == pytest.approx(3.00)

    def test_lookup_none(self):
        assert _lookup_pricing("") is None
        assert _lookup_pricing("nonexistent") is None


# ---------------------------------------------------------------------------
# LLM.call() backwards compatibility (no tools → returns str)
# ---------------------------------------------------------------------------


class TestCallBackwardsCompat:
    """LLM.call() without tools must return str exactly as before."""

    @patch("crewai.llm.litellm")
    def test_call_without_tools_returns_str(self, mock_litellm):
        """Plain call without tools should return a string."""
        mock_litellm.completion.return_value = _make_model_response(content="Hello world")
        mock_litellm.drop_params = True
        mock_litellm.suppress_debug_info = True
        mock_litellm.success_callback = []
        mock_litellm._async_success_callback = []
        mock_litellm.callbacks = []

        llm = _make_litellm_llm()
        result = llm.call("Say hello")

        assert isinstance(result, str)
        assert result == "Hello world"


# ---------------------------------------------------------------------------
# LLM.call() with tools → returns LLMResult
# ---------------------------------------------------------------------------


class TestCallWithToolLoop:
    """When tools + available_functions are passed, call() returns LLMResult."""

    @patch("crewai.llm.litellm")
    def test_single_tool_call_then_text(self, mock_litellm):
        """Model calls one tool, then responds with text."""
        mock_litellm.drop_params = True
        mock_litellm.suppress_debug_info = True
        mock_litellm.success_callback = []
        mock_litellm._async_success_callback = []
        mock_litellm.callbacks = []

        # First call: model wants to call get_weather
        tool_call = _make_tool_call("get_weather", {"city": "SF"})
        resp1 = _make_model_response(content=None, tool_calls=[tool_call])
        # Second call: model responds with text
        resp2 = _make_model_response(content="It's sunny in SF!")
        mock_litellm.completion.side_effect = [resp1, resp2]

        llm = _make_litellm_llm()

        def get_weather(city: str) -> str:
            return f"Sunny, 72°F in {city}"

        result = llm.call(
            messages="What's the weather in SF?",
            tools=DUMMY_TOOL_SCHEMA,
            available_functions={"get_weather": get_weather},
        )

        assert isinstance(result, LLMResult)
        assert result.text == "It's sunny in SF!"
        assert len(result.tool_calls) == 1
        assert result.tool_calls[0].name == "get_weather"
        assert result.tool_calls[0].input == {"city": "SF"}
        assert "Sunny" in result.tool_calls[0].output
        assert result.tool_calls[0].is_error is False
        assert result.iterations == 2

    @patch("crewai.llm.litellm")
    def test_multiple_tool_calls_in_sequence(self, mock_litellm):
        """Model calls two tools across two iterations."""
        mock_litellm.drop_params = True
        mock_litellm.suppress_debug_info = True
        mock_litellm.success_callback = []
        mock_litellm._async_success_callback = []
        mock_litellm.callbacks = []

        tc1 = _make_tool_call("get_weather", {"city": "SF"}, "call_1")
        resp1 = _make_model_response(content=None, tool_calls=[tc1])
        tc2 = _make_tool_call("get_weather", {"city": "NYC"}, "call_2")
        resp2 = _make_model_response(content=None, tool_calls=[tc2])
        resp3 = _make_model_response(content="SF is sunny, NYC is rainy.")
        mock_litellm.completion.side_effect = [resp1, resp2, resp3]

        llm = _make_litellm_llm()

        def get_weather(city: str) -> str:
            return f"Weather for {city}: fine"

        result = llm.call(
            messages="Compare SF and NYC weather",
            tools=DUMMY_TOOL_SCHEMA,
            available_functions={"get_weather": get_weather},
        )

        assert isinstance(result, LLMResult)
        assert len(result.tool_calls) == 2
        assert result.tool_calls[0].input["city"] == "SF"
        assert result.tool_calls[1].input["city"] == "NYC"
        assert result.iterations == 3

    @patch("crewai.llm.litellm")
    def test_max_iterations_stops_loop(self, mock_litellm):
        """Loop stops when max_iterations is reached."""
        mock_litellm.drop_params = True
        mock_litellm.suppress_debug_info = True
        mock_litellm.success_callback = []
        mock_litellm._async_success_callback = []
        mock_litellm.callbacks = []

        # Model always wants to call a tool and never stops
        def make_tool_resp():
            tc = _make_tool_call("get_weather", {"city": "SF"})
            return _make_model_response(content=None, tool_calls=[tc])

        mock_litellm.completion.side_effect = [make_tool_resp() for _ in range(5)]

        llm = _make_litellm_llm()
        result = llm.call(
            messages="Loop forever",
            tools=DUMMY_TOOL_SCHEMA,
            available_functions={"get_weather": lambda city: "sunny"},
            max_iterations=3,
        )

        assert isinstance(result, LLMResult)
        assert result.iterations == 3
        assert len(result.tool_calls) == 3
        # Should have a text noting max iterations
        assert "Max iterations" in result.text

    @patch("crewai.llm.litellm")
    def test_tool_error_handling(self, mock_litellm):
        """Tool that raises an exception is captured in the record."""
        mock_litellm.drop_params = True
        mock_litellm.suppress_debug_info = True
        mock_litellm.success_callback = []
        mock_litellm._async_success_callback = []
        mock_litellm.callbacks = []

        tc = _make_tool_call("get_weather", {"city": "SF"})
        resp1 = _make_model_response(content=None, tool_calls=[tc])
        resp2 = _make_model_response(content="Sorry, couldn't get weather.")
        mock_litellm.completion.side_effect = [resp1, resp2]

        llm = _make_litellm_llm()

        def broken_weather(city: str) -> str:
            raise RuntimeError("API down")

        result = llm.call(
            messages="Weather?",
            tools=DUMMY_TOOL_SCHEMA,
            available_functions={"get_weather": broken_weather},
        )

        assert isinstance(result, LLMResult)
        assert len(result.tool_calls) == 1
        assert result.tool_calls[0].is_error is True
        assert "API down" in result.tool_calls[0].output
        assert result.text == "Sorry, couldn't get weather."

    @patch("crewai.llm.litellm")
    def test_unknown_function_error(self, mock_litellm):
        """Tool call for a function not in available_functions."""
        mock_litellm.drop_params = True
        mock_litellm.suppress_debug_info = True
        mock_litellm.success_callback = []
        mock_litellm._async_success_callback = []
        mock_litellm.callbacks = []

        tc = _make_tool_call("nonexistent_tool", {})
        resp1 = _make_model_response(content=None, tool_calls=[tc])
        resp2 = _make_model_response(content="I couldn't find that tool.")
        mock_litellm.completion.side_effect = [resp1, resp2]

        llm = _make_litellm_llm()
        result = llm.call(
            messages="Do something",
            tools=DUMMY_TOOL_SCHEMA,
            available_functions={"get_weather": lambda city: "sunny"},
        )

        assert isinstance(result, LLMResult)
        assert result.tool_calls[0].is_error is True
        assert "unknown function" in result.tool_calls[0].output

    @patch("crewai.llm.litellm")
    def test_cost_estimation_populated(self, mock_litellm):
        """cost_usd is present as a float on the result."""
        mock_litellm.drop_params = True
        mock_litellm.suppress_debug_info = True
        mock_litellm.success_callback = []
        mock_litellm._async_success_callback = []
        mock_litellm.callbacks = []

        resp = _make_model_response(content="Done!")
        mock_litellm.completion.return_value = resp

        llm = _make_litellm_llm()
        result = llm.call(
            messages="Hello",
            tools=DUMMY_TOOL_SCHEMA,
            available_functions={"get_weather": lambda city: "sunny"},
        )

        assert isinstance(result, LLMResult)
        # Only the type is checked here: cost_usd may be 0.0 if usage tracking
        # didn't fire, but the field should exist and be a float.
        assert isinstance(result.cost_usd, float)

    @patch("crewai.llm.litellm")
    def test_immediate_text_response_with_tools(self, mock_litellm):
        """Model responds with text on first call (no tool use)."""
        mock_litellm.drop_params = True
        mock_litellm.suppress_debug_info = True
        mock_litellm.success_callback = []
        mock_litellm._async_success_callback = []
        mock_litellm.callbacks = []

        resp = _make_model_response(content="I know the answer already.")
        mock_litellm.completion.return_value = resp

        llm = _make_litellm_llm()
        result = llm.call(
            messages="What's 2+2?",
            tools=DUMMY_TOOL_SCHEMA,
            available_functions={"get_weather": lambda city: "sunny"},
        )

        assert isinstance(result, LLMResult)
        assert result.text == "I know the answer already."
        assert len(result.tool_calls) == 0
        assert result.iterations == 1
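
The behaviour these tests expect (call the model, run any requested tools, feed results back, stop on text or at `max_iterations`) can be sketched independently of litellm. Everything below is an assumption made for illustration: `run_tool_loop`, `SketchResult`, and `SketchToolCall` are hypothetical names, the model is any callable returning a dict with `content` and `tool_calls`, and the error strings are only chosen to resemble what the assertions above look for. It is not the code in `crewai/llm.py`.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class SketchToolCall:
    """Hypothetical record of one tool invocation."""
    name: str
    input: dict[str, Any]
    output: str
    is_error: bool = False


@dataclass
class SketchResult:
    """Hypothetical structured result of the loop."""
    text: str = ""
    tool_calls: list[SketchToolCall] = field(default_factory=list)
    iterations: int = 0


def run_tool_loop(
    model: Callable[[list[dict[str, Any]]], dict[str, Any]],
    messages: list[dict[str, Any]],
    available_functions: dict[str, Callable[..., Any]],
    max_iterations: int = 10,
) -> SketchResult:
    result = SketchResult()
    for _ in range(max_iterations):
        result.iterations += 1
        reply = model(messages)
        calls = reply.get("tool_calls") or []
        if not calls:
            # A plain text reply ends the loop.
            result.text = reply.get("content") or ""
            return result
        messages.append({"role": "assistant", "tool_calls": calls})
        for call in calls:
            name = call["name"]
            args = json.loads(call["arguments"])
            fn = available_functions.get(name)
            if fn is None:
                output, is_error = f"unknown function: {name}", True
            else:
                try:
                    output, is_error = str(fn(**args)), False
                except Exception as exc:  # tool failures are reported, not raised
                    output, is_error = f"{type(exc).__name__}: {exc}", True
            result.tool_calls.append(SketchToolCall(name, args, output, is_error))
            # Feed the tool output back so the next model call can use it.
            messages.append(
                {"role": "tool", "tool_call_id": call["id"], "content": output}
            )
    result.text = "Max iterations reached without a final text response."
    return result
```

Capturing tool exceptions as records instead of raising lets the model see the failure and respond to it, which matches the error-handling tests above.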


@@ -1,3 +1,3 @@
"""CrewAI development tools."""
-__version__ = "1.14.3a3"
+__version__ = "1.14.3"


@@ -164,7 +164,7 @@ info = "Commits must follow Conventional Commits 1.0.0."
[tool.uv]
# Pinned to include the security patch releases (authlib 1.6.11,
# langchain-text-splitters 1.1.2) uploaded on 2026-04-16.
-exclude-newer = "2026-04-22"
+exclude-newer = "2026-04-26"
# composio-core pins rich<14 but textual requires rich>=14.
# onnxruntime 1.24+ dropped Python 3.10 wheels; cap it so qdrant[fastembed] resolves on 3.10.

uv.lock generated

@@ -13,7 +13,7 @@ resolution-markers = [
]
[options]
-exclude-newer = "2026-04-22T16:00:00Z"
+exclude-newer = "2026-04-23T07:00:00Z"
[manifest]
members = [