mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-14 05:28:12 +00:00
Compare commits
9 Commits
1.14.3a3
...
feat/agent
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aee571b775 | ||
|
|
cb46a1c4ba | ||
|
|
d9046b98dd | ||
|
|
b0e2fda105 | ||
|
|
69d777ca50 | ||
|
|
77b2835a1d | ||
|
|
c77f1632dd | ||
|
|
69461076df | ||
|
|
55937d7523 |
@@ -4,6 +4,54 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="25 أبريل 2026">
|
||||
## v1.14.3
|
||||
|
||||
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.3)
|
||||
|
||||
## ما الذي تغير
|
||||
|
||||
### الميزات
|
||||
- إضافة أحداث دورة الحياة لعمليات نقطة التحقق
|
||||
- إضافة دعم لـ e2b
|
||||
- الرجوع إلى DefaultAzureCredential عند عدم توفير مفتاح API في تكامل Azure
|
||||
- إضافة دعم Bedrock V4
|
||||
- إضافة أدوات Daytona sandbox لوظائف محسّنة
|
||||
- إضافة دعم نقطة التحقق والتفرع للوكلاء المستقلين
|
||||
|
||||
### إصلاحات الأخطاء
|
||||
- إصلاح execution_id ليكون منفصلًا عن state.id
|
||||
- حل مشكلة إعادة تشغيل أحداث الطريقة المسجلة عند استئناف نقطة التحقق
|
||||
- إصلاح تسلسل مراجع class initial_state كـ JSON schema
|
||||
- الحفاظ على مهارات الوكلاء التي تحتوي على بيانات وصفية فقط
|
||||
- تمرير أسماء @CrewBase الضمنية إلى أحداث الطاقم
|
||||
- دمج بيانات التنفيذ عند تهيئة دفعة مكررة
|
||||
- إصلاح تسلسل حقول مراجع class Task لنقاط التحقق
|
||||
- التعامل مع نتيجة BaseModel في حلقة إعادة المحاولة guardrail
|
||||
- الحفاظ على thought_signature في استدعاءات أدوات Gemini للبث
|
||||
- إصدار task_started عند استئناف التفرع وإعادة تصميم واجهة المستخدم النصية لنقطة التحقق
|
||||
- استخدام تواريخ مستقبلية في اختبارات تقليم نقطة التحقق لمنع الفشل المعتمد على الوقت
|
||||
- إصلاح ترتيب التشغيل الجاف والتعامل مع الفرع القديم الذي تم التحقق منه في إصدار أدوات التطوير
|
||||
- ترقية lxml إلى >=6.1.0 لرقعة الأمان
|
||||
- رفع python-dotenv إلى >=1.2.2 لرقعة الأمان
|
||||
|
||||
### الوثائق
|
||||
- تحديث سجل التغييرات والإصدار لـ v1.14.3
|
||||
- إضافة صفحة "بناء باستخدام الذكاء الاصطناعي" وتحديث التنقل لجميع اللغات
|
||||
- إزالة الأسئلة الشائعة حول التسعير من صفحة البناء باستخدام الذكاء الاصطناعي عبر جميع المواقع
|
||||
|
||||
### الأداء
|
||||
- تحسين MCP SDK وأنواع الأحداث لتقليل بدء التشغيل البارد بنسبة ~29%
|
||||
|
||||
### إعادة الهيكلة
|
||||
- إعادة هيكلة مساعدي نقطة التحقق للقضاء على التكرار وتشديد تلميحات نوع الحالة
|
||||
|
||||
## المساهمون
|
||||
|
||||
@MatthiasHowellYopp, @akaKuruma, @alex-clawd, @github-actions[bot], @github-advanced-security[bot], @greysonlalonde, @iris-clawd, @lorenzejay, @mattatcha, @renatonitta
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="23 أبريل 2026">
|
||||
## v1.14.3a3
|
||||
|
||||
|
||||
1885
docs/docs.json
1885
docs/docs.json
File diff suppressed because it is too large
Load Diff
@@ -4,6 +4,54 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="Apr 25, 2026">
|
||||
## v1.14.3
|
||||
|
||||
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.3)
|
||||
|
||||
## What's Changed
|
||||
|
||||
### Features
|
||||
- Add lifecycle events for checkpoint operations
|
||||
- Add support for e2b
|
||||
- Fall back to DefaultAzureCredential when no API key is provided in Azure integration
|
||||
- Add Bedrock V4 support
|
||||
- Add Daytona sandbox tools for enhanced functionality
|
||||
- Add checkpoint and fork support to standalone agents
|
||||
|
||||
### Bug Fixes
|
||||
- Fix execution_id to be separate from state.id
|
||||
- Resolve replay of recorded method events on checkpoint resume
|
||||
- Fix serialization of initial_state class references as JSON schema
|
||||
- Preserve metadata-only agent skills
|
||||
- Propagate implicit @CrewBase names to crew events
|
||||
- Merge execution metadata on duplicate batch initialization
|
||||
- Fix serialization of Task class-reference fields for checkpointing
|
||||
- Handle BaseModel result in guardrail retry loop
|
||||
- Preserve thought_signature in Gemini streaming tool calls
|
||||
- Emit task_started on fork resume and redesign checkpoint TUI
|
||||
- Use future dates in checkpoint prune tests to prevent time-dependent failures
|
||||
- Fix dry-run order and handle checked-out stale branch in devtools release
|
||||
- Upgrade lxml to >=6.1.0 for security patch
|
||||
- Bump python-dotenv to >=1.2.2 for security patch
|
||||
|
||||
### Documentation
|
||||
- Update changelog and version for v1.14.3
|
||||
- Add 'Build with AI' page and update navigation for all languages
|
||||
- Remove pricing FAQ from build-with-ai page across all locales
|
||||
|
||||
### Performance
|
||||
- Optimize MCP SDK and event types to reduce cold start by ~29%
|
||||
|
||||
### Refactoring
|
||||
- Refactor checkpoint helpers to eliminate duplication and tighten state type hints
|
||||
|
||||
## Contributors
|
||||
|
||||
@MatthiasHowellYopp, @akaKuruma, @alex-clawd, @github-actions[bot], @github-advanced-security[bot], @greysonlalonde, @iris-clawd, @lorenzejay, @mattatcha, @renatonitta
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="Apr 23, 2026">
|
||||
## v1.14.3a3
|
||||
|
||||
|
||||
@@ -4,6 +4,54 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="2026년 4월 25일">
|
||||
## v1.14.3
|
||||
|
||||
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.3)
|
||||
|
||||
## 변경 사항
|
||||
|
||||
### 기능
|
||||
- 체크포인트 작업을 위한 생명주기 이벤트 추가
|
||||
- e2b 지원 추가
|
||||
- Azure 통합에서 API 키가 제공되지 않을 경우 DefaultAzureCredential로 대체
|
||||
- Bedrock V4 지원 추가
|
||||
- 향상된 기능을 위한 Daytona 샌드박스 도구 추가
|
||||
- 독립형 에이전트에 체크포인트 및 포크 지원 추가
|
||||
|
||||
### 버그 수정
|
||||
- execution_id를 state.id와 분리되도록 수정
|
||||
- 체크포인트 재개 시 기록된 메서드 이벤트 재생 문제 해결
|
||||
- initial_state 클래스 참조의 JSON 스키마 직렬화 수정
|
||||
- 메타데이터 전용 에이전트 기술 보존
|
||||
- 암묵적인 @CrewBase 이름을 크루 이벤트로 전파
|
||||
- 중복 배치 초기화 시 실행 메타데이터 병합
|
||||
- 체크포인트를 위한 Task 클래스 참조 필드의 직렬화 수정
|
||||
- 가드레일 재시도 루프에서 BaseModel 결과 처리
|
||||
- Gemini 스트리밍 도구 호출에서 thought_signature 보존
|
||||
- 포크 재개 시 task_started 방출 및 체크포인트 TUI 재설계
|
||||
- 체크포인트 가지치기 테스트에서 미래 날짜 사용하여 시간 의존적 실패 방지
|
||||
- 드라이 런 주문 수정 및 devtools 릴리스에서 체크아웃된 오래된 브랜치 처리
|
||||
- 보안 패치를 위해 lxml을 >=6.1.0으로 업그레이드
|
||||
- 보안 패치를 위해 python-dotenv를 >=1.2.2로 업그레이드
|
||||
|
||||
### 문서
|
||||
- v1.14.3에 대한 변경 로그 및 버전 업데이트
|
||||
- 'AI로 빌드하기' 페이지 추가 및 모든 언어에 대한 내비게이션 업데이트
|
||||
- 모든 로케일에서 build-with-ai 페이지의 가격 FAQ 제거
|
||||
|
||||
### 성능
|
||||
- MCP SDK 및 이벤트 유형 최적화하여 콜드 스타트를 약 29% 감소
|
||||
|
||||
### 리팩토링
|
||||
- 중복 제거 및 상태 유형 힌트를 강화하기 위해 체크포인트 헬퍼 리팩토링
|
||||
|
||||
## 기여자
|
||||
|
||||
@MatthiasHowellYopp, @akaKuruma, @alex-clawd, @github-actions[bot], @github-advanced-security[bot], @greysonlalonde, @iris-clawd, @lorenzejay, @mattatcha, @renatonitta
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="2026년 4월 23일">
|
||||
## v1.14.3a3
|
||||
|
||||
|
||||
@@ -4,6 +4,54 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
|
||||
icon: "clock"
|
||||
mode: "wide"
|
||||
---
|
||||
<Update label="25 abr 2026">
|
||||
## v1.14.3
|
||||
|
||||
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.3)
|
||||
|
||||
## O que Mudou
|
||||
|
||||
### Recursos
|
||||
- Adicionar eventos de ciclo de vida para operações de checkpoint
|
||||
- Adicionar suporte para e2b
|
||||
- Reverter para DefaultAzureCredential quando nenhuma chave de API for fornecida na integração com o Azure
|
||||
- Adicionar suporte ao Bedrock V4
|
||||
- Adicionar ferramentas de sandbox Daytona para funcionalidade aprimorada
|
||||
- Adicionar suporte a checkpoint e fork para agentes autônomos
|
||||
|
||||
### Correções de Bugs
|
||||
- Corrigir execution_id para ser separado de state.id
|
||||
- Resolver a reprodução de eventos de método gravados na retomada do checkpoint
|
||||
- Corrigir a serialização de referências de classe initial_state como esquema JSON
|
||||
- Preservar habilidades de agente somente de metadados
|
||||
- Propagar nomes implícitos @CrewBase para eventos da equipe
|
||||
- Mesclar metadados de execução na inicialização de lote duplicado
|
||||
- Corrigir a serialização de campos de referência de classe Task para checkpointing
|
||||
- Lidar com o resultado BaseModel no loop de retry do guardrail
|
||||
- Preservar thought_signature em chamadas de ferramentas de streaming Gemini
|
||||
- Emitir task_started na retomada do fork e redesenhar TUI de checkpoint
|
||||
- Usar datas futuras em testes de poda de checkpoint para evitar falhas dependentes do tempo
|
||||
- Corrigir a ordem de dry-run e lidar com branch obsoleta verificada na liberação do devtools
|
||||
- Atualizar lxml para >=6.1.0 para patch de segurança
|
||||
- Aumentar python-dotenv para >=1.2.2 para patch de segurança
|
||||
|
||||
### Documentação
|
||||
- Atualizar changelog e versão para v1.14.3
|
||||
- Adicionar página 'Construir com IA' e atualizar navegação para todos os idiomas
|
||||
- Remover FAQ de preços da página construir-com-ia em todos os locais
|
||||
|
||||
### Desempenho
|
||||
- Otimizar MCP SDK e tipos de eventos para reduzir o tempo de inicialização a frio em ~29%
|
||||
|
||||
### Refatoração
|
||||
- Refatorar auxiliares de checkpoint para eliminar duplicação e apertar dicas de tipo de estado
|
||||
|
||||
## Contribuidores
|
||||
|
||||
@MatthiasHowellYopp, @akaKuruma, @alex-clawd, @github-actions[bot], @github-advanced-security[bot], @greysonlalonde, @iris-clawd, @lorenzejay, @mattatcha, @renatonitta
|
||||
|
||||
</Update>
|
||||
|
||||
<Update label="23 abr 2026">
|
||||
## v1.14.3a3
|
||||
|
||||
|
||||
@@ -152,4 +152,4 @@ __all__ = [
|
||||
"wrap_file_source",
|
||||
]
|
||||
|
||||
__version__ = "1.14.3a3"
|
||||
__version__ = "1.14.3"
|
||||
|
||||
@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
|
||||
dependencies = [
|
||||
"pytube~=15.0.0",
|
||||
"requests>=2.33.0,<3",
|
||||
"crewai==1.14.3a3",
|
||||
"crewai==1.14.3",
|
||||
"tiktoken~=0.8.0",
|
||||
"beautifulsoup4~=4.13.4",
|
||||
"python-docx~=1.2.0",
|
||||
|
||||
@@ -321,4 +321,4 @@ __all__ = [
|
||||
"ZapierActionTools",
|
||||
]
|
||||
|
||||
__version__ = "1.14.3a3"
|
||||
__version__ = "1.14.3"
|
||||
|
||||
@@ -55,7 +55,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
|
||||
|
||||
[project.optional-dependencies]
|
||||
tools = [
|
||||
"crewai-tools==1.14.3a3",
|
||||
"crewai-tools==1.14.3",
|
||||
]
|
||||
embeddings = [
|
||||
"tiktoken~=0.8.0"
|
||||
|
||||
@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
|
||||
|
||||
_suppress_pydantic_deprecation_warnings()
|
||||
|
||||
__version__ = "1.14.3a3"
|
||||
__version__ = "1.14.3"
|
||||
|
||||
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
|
||||
"Memory": ("crewai.memory.unified_memory", "Memory"),
|
||||
@@ -94,10 +94,16 @@ try:
|
||||
}
|
||||
|
||||
from crewai.tools.base_tool import BaseTool as _BaseTool
|
||||
from crewai.tools.flow_tool import (
|
||||
FlowTool as _FlowTool,
|
||||
create_flow_tools as _create_flow_tools,
|
||||
)
|
||||
from crewai.tools.structured_tool import CrewStructuredTool as _CrewStructuredTool
|
||||
|
||||
_base_namespace["BaseTool"] = _BaseTool
|
||||
_base_namespace["CrewStructuredTool"] = _CrewStructuredTool
|
||||
_base_namespace["FlowTool"] = _FlowTool
|
||||
_base_namespace["create_flow_tools"] = _create_flow_tools # type: ignore[assignment]
|
||||
|
||||
try:
|
||||
from crewai.a2a.config import (
|
||||
|
||||
@@ -85,6 +85,7 @@ from crewai.skills.loader import activate_skill, discover_skills
|
||||
from crewai.skills.models import INSTRUCTIONS, Skill as SkillModel
|
||||
from crewai.state.checkpoint_config import CheckpointConfig, apply_checkpoint
|
||||
from crewai.tools.agent_tools.agent_tools import AgentTools
|
||||
from crewai.tools.flow_tool import create_flow_tools
|
||||
from crewai.types.callback import SerializableCallable
|
||||
from crewai.utilities.agent_utils import (
|
||||
get_tool_names,
|
||||
@@ -305,6 +306,10 @@ class Agent(BaseAgent):
|
||||
Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig.
|
||||
""",
|
||||
)
|
||||
flows: list[Any] | None = Field(
|
||||
default=None,
|
||||
description="Flow classes that the agent can invoke as tools. Each entry is a Flow subclass (not an instance).",
|
||||
)
|
||||
agent_executor: CrewAgentExecutor | AgentExecutor | None = Field(
|
||||
default=None, description="An instance of the CrewAgentExecutor class."
|
||||
)
|
||||
@@ -347,6 +352,7 @@ class Agent(BaseAgent):
|
||||
)
|
||||
|
||||
self.set_skills()
|
||||
self._set_flow_tools()
|
||||
|
||||
if self.reasoning and self.planning_config is None:
|
||||
warnings.warn(
|
||||
@@ -394,15 +400,17 @@ class Agent(BaseAgent):
|
||||
self,
|
||||
resolved_crew_skills: list[SkillModel] | None = None,
|
||||
) -> None:
|
||||
"""Resolve skill paths and activate skills to INSTRUCTIONS level.
|
||||
"""Resolve skill paths while preserving explicit disclosure levels.
|
||||
|
||||
Path entries trigger discovery and activation. Pre-loaded Skill objects
|
||||
below INSTRUCTIONS level are activated. Crew-level skills are merged in
|
||||
with event emission so observability is consistent regardless of origin.
|
||||
Path entries trigger discovery and activation because directory-based
|
||||
skills opt into eager loading. Pre-loaded Skill objects keep their
|
||||
current disclosure level so callers can attach METADATA-only skills and
|
||||
progressively activate them later. Crew-level skills are merged in with
|
||||
event emission so observability is consistent regardless of origin.
|
||||
|
||||
Args:
|
||||
resolved_crew_skills: Pre-resolved crew skills (already discovered
|
||||
and activated). When provided, avoids redundant discovery per agent.
|
||||
resolved_crew_skills: Pre-resolved crew skills. When provided,
|
||||
avoids redundant discovery per agent.
|
||||
"""
|
||||
from crewai.crew import Crew
|
||||
|
||||
@@ -443,8 +451,7 @@ class Agent(BaseAgent):
|
||||
elif isinstance(item, SkillModel):
|
||||
if item.name not in seen:
|
||||
seen.add(item.name)
|
||||
activated = activate_skill(item, source=self)
|
||||
if activated is item and item.disclosure_level >= INSTRUCTIONS:
|
||||
if item.disclosure_level >= INSTRUCTIONS:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=SkillActivatedEvent(
|
||||
@@ -454,10 +461,20 @@ class Agent(BaseAgent):
|
||||
disclosure_level=item.disclosure_level,
|
||||
),
|
||||
)
|
||||
resolved.append(activated)
|
||||
resolved.append(item)
|
||||
|
||||
self.skills = resolved if resolved else None
|
||||
|
||||
def _set_flow_tools(self) -> None:
|
||||
"""Convert Flow classes in ``self.flows`` to tools and merge them."""
|
||||
if not self.flows:
|
||||
return
|
||||
flow_tools = create_flow_tools(self.flows)
|
||||
if flow_tools:
|
||||
if self.tools is None:
|
||||
self.tools = []
|
||||
self.tools.extend(flow_tools)
|
||||
|
||||
def _is_any_available_memory(self) -> bool:
|
||||
"""Check if unified memory is available (agent or crew)."""
|
||||
if getattr(self, "memory", None):
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.3a3"
|
||||
"crewai[tools]==1.14.3"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
|
||||
authors = [{ name = "Your Name", email = "you@example.com" }]
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.3a3"
|
||||
"crewai[tools]==1.14.3"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"crewai[tools]==1.14.3a3"
|
||||
"crewai[tools]==1.14.3"
|
||||
]
|
||||
|
||||
[tool.crewai]
|
||||
|
||||
@@ -21,6 +21,7 @@ from crewai.events.depends import Depends
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.handler_graph import CircularDependencyError
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.events.types.agent_events import (
|
||||
AgentEvaluationCompletedEvent,
|
||||
@@ -33,6 +34,20 @@ if TYPE_CHECKING:
|
||||
LiteAgentExecutionErrorEvent,
|
||||
LiteAgentExecutionStartedEvent,
|
||||
)
|
||||
from crewai.events.types.checkpoint_events import (
|
||||
CheckpointBaseEvent,
|
||||
CheckpointCompletedEvent,
|
||||
CheckpointFailedEvent,
|
||||
CheckpointForkBaseEvent,
|
||||
CheckpointForkCompletedEvent,
|
||||
CheckpointForkStartedEvent,
|
||||
CheckpointPrunedEvent,
|
||||
CheckpointRestoreBaseEvent,
|
||||
CheckpointRestoreCompletedEvent,
|
||||
CheckpointRestoreFailedEvent,
|
||||
CheckpointRestoreStartedEvent,
|
||||
CheckpointStartedEvent,
|
||||
)
|
||||
from crewai.events.types.crew_events import (
|
||||
CrewKickoffCompletedEvent,
|
||||
CrewKickoffFailedEvent,
|
||||
@@ -141,6 +156,19 @@ _LAZY_EVENT_MAPPING: dict[str, str] = {
|
||||
"LiteAgentExecutionCompletedEvent": "crewai.events.types.agent_events",
|
||||
"LiteAgentExecutionErrorEvent": "crewai.events.types.agent_events",
|
||||
"LiteAgentExecutionStartedEvent": "crewai.events.types.agent_events",
|
||||
# checkpoint_events
|
||||
"CheckpointBaseEvent": "crewai.events.types.checkpoint_events",
|
||||
"CheckpointCompletedEvent": "crewai.events.types.checkpoint_events",
|
||||
"CheckpointFailedEvent": "crewai.events.types.checkpoint_events",
|
||||
"CheckpointForkBaseEvent": "crewai.events.types.checkpoint_events",
|
||||
"CheckpointForkCompletedEvent": "crewai.events.types.checkpoint_events",
|
||||
"CheckpointForkStartedEvent": "crewai.events.types.checkpoint_events",
|
||||
"CheckpointPrunedEvent": "crewai.events.types.checkpoint_events",
|
||||
"CheckpointRestoreBaseEvent": "crewai.events.types.checkpoint_events",
|
||||
"CheckpointRestoreCompletedEvent": "crewai.events.types.checkpoint_events",
|
||||
"CheckpointRestoreFailedEvent": "crewai.events.types.checkpoint_events",
|
||||
"CheckpointRestoreStartedEvent": "crewai.events.types.checkpoint_events",
|
||||
"CheckpointStartedEvent": "crewai.events.types.checkpoint_events",
|
||||
# crew_events
|
||||
"CrewKickoffCompletedEvent": "crewai.events.types.crew_events",
|
||||
"CrewKickoffFailedEvent": "crewai.events.types.crew_events",
|
||||
@@ -265,6 +293,18 @@ __all__ = [
|
||||
"AgentReasoningFailedEvent",
|
||||
"AgentReasoningStartedEvent",
|
||||
"BaseEventListener",
|
||||
"CheckpointBaseEvent",
|
||||
"CheckpointCompletedEvent",
|
||||
"CheckpointFailedEvent",
|
||||
"CheckpointForkBaseEvent",
|
||||
"CheckpointForkCompletedEvent",
|
||||
"CheckpointForkStartedEvent",
|
||||
"CheckpointPrunedEvent",
|
||||
"CheckpointRestoreBaseEvent",
|
||||
"CheckpointRestoreCompletedEvent",
|
||||
"CheckpointRestoreFailedEvent",
|
||||
"CheckpointRestoreStartedEvent",
|
||||
"CheckpointStartedEvent",
|
||||
"CircularDependencyError",
|
||||
"CrewKickoffCompletedEvent",
|
||||
"CrewKickoffFailedEvent",
|
||||
|
||||
@@ -64,6 +64,22 @@ P = ParamSpec("P")
|
||||
R = TypeVar("R")
|
||||
|
||||
|
||||
_replaying: contextvars.ContextVar[bool] = contextvars.ContextVar(
|
||||
"crewai_event_replaying", default=False
|
||||
)
|
||||
|
||||
|
||||
def is_replaying() -> bool:
|
||||
"""Return True if the current context is dispatching a replayed event.
|
||||
|
||||
Listeners with side effects (checkpoint writes, external API calls that
|
||||
should not be repeated) should early-return when this is true. Listeners
|
||||
whose purpose is reconstructing timeline state (trace batch, console
|
||||
formatter) should ignore the flag and process replayed events normally.
|
||||
"""
|
||||
return _replaying.get()
|
||||
|
||||
|
||||
class CrewAIEventsBus:
|
||||
"""Singleton event bus for handling events in CrewAI.
|
||||
|
||||
@@ -261,6 +277,11 @@ class CrewAIEventsBus:
|
||||
self._runtime_state = state
|
||||
self._registered_entity_ids = {id(e) for e in state.root}
|
||||
|
||||
@property
|
||||
def runtime_state(self) -> RuntimeState | None:
|
||||
"""The RuntimeState currently attached to the bus, if any."""
|
||||
return self._runtime_state
|
||||
|
||||
def register_entity(self, entity: Any) -> None:
|
||||
"""Add an entity to the RuntimeState, creating it if needed.
|
||||
|
||||
@@ -568,6 +589,87 @@ class CrewAIEventsBus:
|
||||
|
||||
return None
|
||||
|
||||
async def _acall_handlers_replaying(
|
||||
self,
|
||||
source: Any,
|
||||
event: BaseEvent,
|
||||
handlers: AsyncHandlerSet,
|
||||
) -> None:
|
||||
"""Call async handlers with the replaying flag set on the loop thread."""
|
||||
token = _replaying.set(True)
|
||||
try:
|
||||
await self._acall_handlers(source, event, handlers)
|
||||
finally:
|
||||
_replaying.reset(token)
|
||||
|
||||
async def _emit_with_dependencies_replaying(
|
||||
self, source: Any, event: BaseEvent
|
||||
) -> None:
|
||||
"""Dependency-aware dispatch with the replaying flag set."""
|
||||
token = _replaying.set(True)
|
||||
try:
|
||||
await self._emit_with_dependencies(source, event)
|
||||
finally:
|
||||
_replaying.reset(token)
|
||||
|
||||
def replay(self, source: Any, event: BaseEvent) -> Future[None] | None:
|
||||
"""Dispatch a previously-recorded event without mutating its fields.
|
||||
|
||||
Unlike :meth:`emit`, this does not run ``_prepare_event`` (so stored
|
||||
event ids and ``emission_sequence`` are preserved) and does not
|
||||
re-record the event. Listeners can call :func:`is_replaying` to
|
||||
opt out of side-effectful processing.
|
||||
|
||||
Args:
|
||||
source: The emitting object.
|
||||
event: The previously-recorded event to dispatch.
|
||||
|
||||
Returns:
|
||||
Future that completes when handlers finish, or None if no handlers.
|
||||
"""
|
||||
event_type = type(event)
|
||||
|
||||
with self._rwlock.r_locked():
|
||||
if self._shutting_down:
|
||||
return None
|
||||
has_dependencies = event_type in self._handler_dependencies
|
||||
sync_handlers = self._sync_handlers.get(event_type, frozenset())
|
||||
async_handlers = self._async_handlers.get(event_type, frozenset())
|
||||
|
||||
if not sync_handlers and not async_handlers:
|
||||
return None
|
||||
|
||||
self._ensure_executor_initialized()
|
||||
self._has_pending_events = True
|
||||
|
||||
token = _replaying.set(True)
|
||||
try:
|
||||
if has_dependencies:
|
||||
return self._track_future(
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._emit_with_dependencies_replaying(source, event),
|
||||
self._loop,
|
||||
)
|
||||
)
|
||||
|
||||
if sync_handlers:
|
||||
ctx = contextvars.copy_context()
|
||||
sync_future = self._sync_executor.submit(
|
||||
ctx.run, self._call_handlers, source, event, sync_handlers
|
||||
)
|
||||
self._track_future(sync_future)
|
||||
if not async_handlers:
|
||||
return sync_future
|
||||
|
||||
return self._track_future(
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._acall_handlers_replaying(source, event, async_handlers),
|
||||
self._loop,
|
||||
)
|
||||
)
|
||||
finally:
|
||||
_replaying.reset(token)
|
||||
|
||||
def flush(self, timeout: float | None = 30.0) -> bool:
|
||||
"""Block until all pending event handlers complete.
|
||||
|
||||
|
||||
@@ -30,6 +30,17 @@ from crewai.events.types.agent_events import (
|
||||
AgentExecutionStartedEvent,
|
||||
LiteAgentExecutionCompletedEvent,
|
||||
)
|
||||
from crewai.events.types.checkpoint_events import (
|
||||
CheckpointCompletedEvent,
|
||||
CheckpointFailedEvent,
|
||||
CheckpointForkCompletedEvent,
|
||||
CheckpointForkStartedEvent,
|
||||
CheckpointPrunedEvent,
|
||||
CheckpointRestoreCompletedEvent,
|
||||
CheckpointRestoreFailedEvent,
|
||||
CheckpointRestoreStartedEvent,
|
||||
CheckpointStartedEvent,
|
||||
)
|
||||
from crewai.events.types.crew_events import (
|
||||
CrewKickoffCompletedEvent,
|
||||
CrewKickoffFailedEvent,
|
||||
@@ -183,4 +194,13 @@ EventTypes = (
|
||||
| MCPToolExecutionCompletedEvent
|
||||
| MCPToolExecutionFailedEvent
|
||||
| MCPConfigFetchFailedEvent
|
||||
| CheckpointStartedEvent
|
||||
| CheckpointCompletedEvent
|
||||
| CheckpointFailedEvent
|
||||
| CheckpointForkStartedEvent
|
||||
| CheckpointForkCompletedEvent
|
||||
| CheckpointRestoreStartedEvent
|
||||
| CheckpointRestoreCompletedEvent
|
||||
| CheckpointRestoreFailedEvent
|
||||
| CheckpointPrunedEvent
|
||||
)
|
||||
|
||||
97
lib/crewai/src/crewai/events/types/checkpoint_events.py
Normal file
97
lib/crewai/src/crewai/events/types/checkpoint_events.py
Normal file
@@ -0,0 +1,97 @@
|
||||
"""Event family for automatic state checkpointing and forking."""
|
||||
|
||||
from typing import Literal
|
||||
|
||||
from crewai.events.base_events import BaseEvent
|
||||
|
||||
|
||||
class CheckpointBaseEvent(BaseEvent):
|
||||
"""Base event for checkpoint lifecycle operations."""
|
||||
|
||||
type: str
|
||||
location: str
|
||||
provider: str
|
||||
trigger: str | None = None
|
||||
branch: str | None = None
|
||||
parent_id: str | None = None
|
||||
|
||||
|
||||
class CheckpointStartedEvent(CheckpointBaseEvent):
|
||||
"""Event emitted immediately before a checkpoint is written."""
|
||||
|
||||
type: Literal["checkpoint_started"] = "checkpoint_started"
|
||||
|
||||
|
||||
class CheckpointCompletedEvent(CheckpointBaseEvent):
|
||||
"""Event emitted when a checkpoint has been written successfully."""
|
||||
|
||||
type: Literal["checkpoint_completed"] = "checkpoint_completed"
|
||||
checkpoint_id: str
|
||||
duration_ms: float
|
||||
|
||||
|
||||
class CheckpointFailedEvent(CheckpointBaseEvent):
|
||||
"""Event emitted when a checkpoint write fails."""
|
||||
|
||||
type: Literal["checkpoint_failed"] = "checkpoint_failed"
|
||||
error: str
|
||||
|
||||
|
||||
class CheckpointPrunedEvent(CheckpointBaseEvent):
|
||||
"""Event emitted after pruning old checkpoints from a branch."""
|
||||
|
||||
type: Literal["checkpoint_pruned"] = "checkpoint_pruned"
|
||||
removed_count: int
|
||||
max_checkpoints: int
|
||||
|
||||
|
||||
class CheckpointForkBaseEvent(BaseEvent):
|
||||
"""Base event for fork lifecycle operations on a RuntimeState."""
|
||||
|
||||
type: str
|
||||
branch: str
|
||||
parent_branch: str | None = None
|
||||
parent_checkpoint_id: str | None = None
|
||||
|
||||
|
||||
class CheckpointForkStartedEvent(CheckpointForkBaseEvent):
|
||||
"""Event emitted immediately before a fork relabels the branch."""
|
||||
|
||||
type: Literal["checkpoint_fork_started"] = "checkpoint_fork_started"
|
||||
|
||||
|
||||
class CheckpointForkCompletedEvent(CheckpointForkBaseEvent):
|
||||
"""Event emitted after a fork has established the new branch."""
|
||||
|
||||
type: Literal["checkpoint_fork_completed"] = "checkpoint_fork_completed"
|
||||
|
||||
|
||||
class CheckpointRestoreBaseEvent(BaseEvent):
|
||||
"""Base event for checkpoint restore lifecycle operations."""
|
||||
|
||||
type: str
|
||||
location: str
|
||||
provider: str | None = None
|
||||
|
||||
|
||||
class CheckpointRestoreStartedEvent(CheckpointRestoreBaseEvent):
|
||||
"""Event emitted immediately before a checkpoint restore begins."""
|
||||
|
||||
type: Literal["checkpoint_restore_started"] = "checkpoint_restore_started"
|
||||
|
||||
|
||||
class CheckpointRestoreCompletedEvent(CheckpointRestoreBaseEvent):
|
||||
"""Event emitted when a checkpoint has been restored successfully."""
|
||||
|
||||
type: Literal["checkpoint_restore_completed"] = "checkpoint_restore_completed"
|
||||
checkpoint_id: str
|
||||
branch: str | None = None
|
||||
parent_id: str | None = None
|
||||
duration_ms: float
|
||||
|
||||
|
||||
class CheckpointRestoreFailedEvent(CheckpointRestoreBaseEvent):
|
||||
"""Event emitted when a checkpoint restore fails."""
|
||||
|
||||
type: Literal["checkpoint_restore_failed"] = "checkpoint_restore_failed"
|
||||
error: str
|
||||
@@ -45,6 +45,7 @@ from pydantic import (
|
||||
BeforeValidator,
|
||||
ConfigDict,
|
||||
Field,
|
||||
PlainSerializer,
|
||||
PrivateAttr,
|
||||
SerializeAsAny,
|
||||
ValidationError,
|
||||
@@ -58,6 +59,7 @@ from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.event_context import (
|
||||
get_current_parent_id,
|
||||
reset_last_event_id,
|
||||
restore_event_scope,
|
||||
triggered_by_scope,
|
||||
)
|
||||
from crewai.events.listeners.tracing.trace_listener import (
|
||||
@@ -157,6 +159,37 @@ def _resolve_persistence(value: Any) -> Any:
|
||||
return value
|
||||
|
||||
|
||||
_INITIAL_STATE_CLASS_MARKER = "__crewai_pydantic_class_schema__"
|
||||
|
||||
|
||||
def _serialize_initial_state(value: Any) -> Any:
|
||||
"""Make ``initial_state`` safe for JSON checkpoint serialization.
|
||||
|
||||
``BaseModel`` class refs are emitted as their JSON schema under a sentinel
|
||||
marker key so deserialization can round-trip them back to a class.
|
||||
``BaseModel`` instances are dumped to JSON (round-trip as plain dicts,
|
||||
which ``_create_initial_state`` accepts). Bare ``type`` values that are
|
||||
not ``BaseModel`` subclasses (e.g. ``dict``) are dropped since they
|
||||
can't be represented in JSON.
|
||||
"""
|
||||
if isinstance(value, type):
|
||||
if issubclass(value, BaseModel):
|
||||
return {_INITIAL_STATE_CLASS_MARKER: value.model_json_schema()}
|
||||
return None
|
||||
if isinstance(value, BaseModel):
|
||||
return value.model_dump(mode="json")
|
||||
return value
|
||||
|
||||
|
||||
def _deserialize_initial_state(value: Any) -> Any:
|
||||
"""Rehydrate a class ref serialized by :func:`_serialize_initial_state`."""
|
||||
if isinstance(value, dict) and _INITIAL_STATE_CLASS_MARKER in value:
|
||||
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
|
||||
|
||||
return create_model_from_schema(value[_INITIAL_STATE_CLASS_MARKER])
|
||||
return value
|
||||
|
||||
|
||||
class FlowState(BaseModel):
|
||||
"""Base model for all flow states, ensuring each state has a unique ID."""
|
||||
|
||||
@@ -908,7 +941,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
|
||||
entity_type: Literal["flow"] = "flow"
|
||||
|
||||
initial_state: Any = Field(default=None)
|
||||
initial_state: Annotated[ # type: ignore[type-arg]
|
||||
type[BaseModel] | type[dict] | dict[str, Any] | BaseModel | None,
|
||||
BeforeValidator(_deserialize_initial_state),
|
||||
PlainSerializer(_serialize_initial_state, return_type=Any, when_used="json"),
|
||||
] = Field(default=None)
|
||||
name: str | None = Field(default=None)
|
||||
tracing: bool | None = Field(default=None)
|
||||
stream: bool = Field(default=False)
|
||||
@@ -980,13 +1017,18 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
A Flow instance on the new branch. Call kickoff() to run.
|
||||
"""
|
||||
flow = cls.from_checkpoint(config)
|
||||
state = crewai_event_bus._runtime_state
|
||||
state = crewai_event_bus.runtime_state
|
||||
if state is None:
|
||||
raise RuntimeError(
|
||||
"Cannot fork: no runtime state on the event bus. "
|
||||
"Ensure from_checkpoint() succeeded before calling fork()."
|
||||
)
|
||||
state.fork(branch)
|
||||
new_id = str(uuid4())
|
||||
if isinstance(flow._state, dict):
|
||||
flow._state["id"] = new_id
|
||||
else:
|
||||
object.__setattr__(flow._state, "id", new_id)
|
||||
return flow
|
||||
|
||||
checkpoint_completed_methods: set[str] | None = Field(default=None)
|
||||
@@ -1008,6 +1050,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
}
|
||||
if self.checkpoint_state is not None:
|
||||
self._restore_state(self.checkpoint_state)
|
||||
restore_event_scope(())
|
||||
reset_last_event_id()
|
||||
|
||||
_methods: dict[FlowMethodName, FlowMethod[Any, Any]] = PrivateAttr(
|
||||
default_factory=dict
|
||||
@@ -1030,6 +1074,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
_human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict)
|
||||
_input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list)
|
||||
_state: Any = PrivateAttr(default=None)
|
||||
_execution_id: str = PrivateAttr(default_factory=lambda: str(uuid4()))
|
||||
|
||||
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
|
||||
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
|
||||
@@ -1820,6 +1865,27 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
except (AttributeError, TypeError):
|
||||
return "" # Safely handle any unexpected attribute access issues
|
||||
|
||||
@property
|
||||
def execution_id(self) -> str:
|
||||
"""Stable identifier for this flow execution.
|
||||
|
||||
Separate from ``flow_id`` / ``state.id``, which consumers may
|
||||
override via ``kickoff(inputs={"id": ...})`` to resume a persisted
|
||||
flow. ``execution_id`` is never affected by ``inputs`` and stays
|
||||
stable for the lifetime of a single run, so it is the correct key
|
||||
for telemetry, tracing, and any external correlation that must
|
||||
uniquely identify a single execution even when callers pass an
|
||||
``id`` in ``inputs``.
|
||||
|
||||
Defaults to a fresh ``uuid4`` per ``Flow`` instance; assign to
|
||||
override when an outer system already has an execution identity.
|
||||
"""
|
||||
return self._execution_id
|
||||
|
||||
@execution_id.setter
|
||||
def execution_id(self, value: str) -> None:
|
||||
self._execution_id = value
|
||||
|
||||
def _initialize_state(self, inputs: dict[str, Any]) -> None:
|
||||
"""Initialize or update flow state with new inputs.
|
||||
|
||||
@@ -2133,9 +2199,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
flow_id_token = None
|
||||
request_id_token = None
|
||||
if current_flow_id.get() is None:
|
||||
flow_id_token = current_flow_id.set(self.flow_id)
|
||||
flow_id_token = current_flow_id.set(self.execution_id)
|
||||
if current_flow_request_id.get() is None:
|
||||
request_id_token = current_flow_request_id.set(self.flow_id)
|
||||
request_id_token = current_flow_request_id.set(self.execution_id)
|
||||
|
||||
try:
|
||||
# Reset flow state for fresh execution unless restoring from persistence
|
||||
@@ -2214,6 +2280,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
if inputs is not None and "id" not in inputs:
|
||||
self._initialize_state(inputs)
|
||||
|
||||
if self._is_execution_resuming:
|
||||
await self._replay_recorded_events()
|
||||
|
||||
try:
|
||||
# Determine which start methods to execute at kickoff
|
||||
# Conditional start methods (with __trigger_methods__) are only triggered by their conditions
|
||||
@@ -2361,6 +2430,44 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
"""
|
||||
return await self.kickoff_async(inputs, input_files, from_checkpoint)
|
||||
|
||||
async def _replay_recorded_events(self) -> None:
|
||||
"""Dispatch recorded ``MethodExecution*`` events from the event record."""
|
||||
state = crewai_event_bus.runtime_state
|
||||
if state is None:
|
||||
return
|
||||
record = state.event_record
|
||||
if len(record) == 0:
|
||||
return
|
||||
|
||||
replayable = (
|
||||
MethodExecutionStartedEvent,
|
||||
MethodExecutionFinishedEvent,
|
||||
MethodExecutionFailedEvent,
|
||||
)
|
||||
flow_name = self.name or self.__class__.__name__
|
||||
nodes = sorted(
|
||||
(
|
||||
n
|
||||
for n in record.all_nodes()
|
||||
if isinstance(n.event, replayable)
|
||||
and n.event.flow_name == flow_name
|
||||
and n.event.method_name in self._completed_methods
|
||||
),
|
||||
key=lambda n: n.event.emission_sequence or 0,
|
||||
)
|
||||
|
||||
for node in nodes:
|
||||
future = crewai_event_bus.replay(self, node.event)
|
||||
if future is not None:
|
||||
try:
|
||||
await asyncio.wrap_future(future)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Replayed event handler failed: %s",
|
||||
node.event.type,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
async def _execute_start_method(self, start_method_name: FlowMethodName) -> None:
|
||||
"""Executes a flow's start method and its triggered listeners.
|
||||
|
||||
|
||||
@@ -10,12 +10,22 @@ from __future__ import annotations
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.crew import Crew
|
||||
from crewai.events.base_events import BaseEvent
|
||||
from crewai.events.event_bus import CrewAIEventsBus, crewai_event_bus
|
||||
from crewai.events.event_bus import CrewAIEventsBus, crewai_event_bus, is_replaying
|
||||
from crewai.events.types.checkpoint_events import (
|
||||
CheckpointBaseEvent,
|
||||
CheckpointCompletedEvent,
|
||||
CheckpointFailedEvent,
|
||||
CheckpointForkBaseEvent,
|
||||
CheckpointPrunedEvent,
|
||||
CheckpointRestoreBaseEvent,
|
||||
CheckpointStartedEvent,
|
||||
)
|
||||
from crewai.flow.flow import Flow
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
from crewai.state.runtime import RuntimeState, _prepare_entities
|
||||
@@ -53,12 +63,26 @@ def _resolve(value: CheckpointConfig | bool | None) -> CheckpointConfig | None |
|
||||
if isinstance(value, CheckpointConfig):
|
||||
_ensure_handlers_registered()
|
||||
return value
|
||||
if value is True:
|
||||
if value:
|
||||
_ensure_handlers_registered()
|
||||
return CheckpointConfig()
|
||||
if value is False:
|
||||
return _SENTINEL
|
||||
return None # None = inherit
|
||||
return None
|
||||
|
||||
|
||||
def _resolve_from_agent(agent: BaseAgent) -> CheckpointConfig | None:
|
||||
"""Resolve a checkpoint config starting from an agent, walking to its crew."""
|
||||
result = _resolve(agent.checkpoint)
|
||||
if isinstance(result, CheckpointConfig):
|
||||
return result
|
||||
if result is _SENTINEL:
|
||||
return None
|
||||
crew = agent.crew
|
||||
if isinstance(crew, Crew):
|
||||
crew_result = _resolve(crew.checkpoint)
|
||||
return crew_result if isinstance(crew_result, CheckpointConfig) else None
|
||||
return None
|
||||
|
||||
|
||||
def _find_checkpoint(source: Any) -> CheckpointConfig | None:
|
||||
@@ -77,28 +101,11 @@ def _find_checkpoint(source: Any) -> CheckpointConfig | None:
|
||||
result = _resolve(source.checkpoint)
|
||||
return result if isinstance(result, CheckpointConfig) else None
|
||||
if isinstance(source, BaseAgent):
|
||||
result = _resolve(source.checkpoint)
|
||||
if isinstance(result, CheckpointConfig):
|
||||
return result
|
||||
if result is _SENTINEL:
|
||||
return None
|
||||
crew = source.crew
|
||||
if isinstance(crew, Crew):
|
||||
result = _resolve(crew.checkpoint)
|
||||
return result if isinstance(result, CheckpointConfig) else None
|
||||
return None
|
||||
return _resolve_from_agent(source)
|
||||
if isinstance(source, Task):
|
||||
agent = source.agent
|
||||
if isinstance(agent, BaseAgent):
|
||||
result = _resolve(agent.checkpoint)
|
||||
if isinstance(result, CheckpointConfig):
|
||||
return result
|
||||
if result is _SENTINEL:
|
||||
return None
|
||||
crew = agent.crew
|
||||
if isinstance(crew, Crew):
|
||||
result = _resolve(crew.checkpoint)
|
||||
return result if isinstance(result, CheckpointConfig) else None
|
||||
return _resolve_from_agent(agent)
|
||||
return None
|
||||
return None
|
||||
|
||||
@@ -107,27 +114,106 @@ def _do_checkpoint(
|
||||
state: RuntimeState, cfg: CheckpointConfig, event: BaseEvent | None = None
|
||||
) -> None:
|
||||
"""Write a checkpoint and prune old ones if configured."""
|
||||
_prepare_entities(state.root)
|
||||
payload = state.model_dump(mode="json")
|
||||
if event is not None:
|
||||
payload["trigger"] = event.type
|
||||
data = json.dumps(payload)
|
||||
location = cfg.provider.checkpoint(
|
||||
data,
|
||||
cfg.location,
|
||||
parent_id=state._parent_id,
|
||||
branch=state._branch,
|
||||
)
|
||||
state._chain_lineage(cfg.provider, location)
|
||||
provider_name: str = type(cfg.provider).__name__
|
||||
trigger: str | None = event.type if event is not None else None
|
||||
context: dict[str, Any] = {
|
||||
"task_id": event.task_id if event is not None else None,
|
||||
"task_name": event.task_name if event is not None else None,
|
||||
"agent_id": event.agent_id if event is not None else None,
|
||||
"agent_role": event.agent_role if event is not None else None,
|
||||
}
|
||||
|
||||
checkpoint_id: str = cfg.provider.extract_id(location)
|
||||
parent_id_snapshot: str | None = state._parent_id
|
||||
branch_snapshot: str = state._branch
|
||||
|
||||
crewai_event_bus.emit(
|
||||
cfg,
|
||||
CheckpointStartedEvent(
|
||||
location=cfg.location,
|
||||
provider=provider_name,
|
||||
trigger=trigger,
|
||||
branch=branch_snapshot,
|
||||
parent_id=parent_id_snapshot,
|
||||
**context,
|
||||
),
|
||||
)
|
||||
|
||||
start: float = time.perf_counter()
|
||||
try:
|
||||
_prepare_entities(state.root)
|
||||
payload = state.model_dump(mode="json")
|
||||
if event is not None:
|
||||
payload["trigger"] = event.type
|
||||
data = json.dumps(payload)
|
||||
location = cfg.provider.checkpoint(
|
||||
data,
|
||||
cfg.location,
|
||||
parent_id=parent_id_snapshot,
|
||||
branch=branch_snapshot,
|
||||
)
|
||||
state._chain_lineage(cfg.provider, location)
|
||||
checkpoint_id: str = cfg.provider.extract_id(location)
|
||||
except Exception as exc:
|
||||
crewai_event_bus.emit(
|
||||
cfg,
|
||||
CheckpointFailedEvent(
|
||||
location=cfg.location,
|
||||
provider=provider_name,
|
||||
trigger=trigger,
|
||||
branch=branch_snapshot,
|
||||
parent_id=parent_id_snapshot,
|
||||
error=str(exc),
|
||||
**context,
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
duration_ms: float = (time.perf_counter() - start) * 1000.0
|
||||
msg: str = (
|
||||
f"Checkpoint saved. Resume with: crewai checkpoint resume {checkpoint_id}"
|
||||
)
|
||||
logger.info(msg)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
cfg,
|
||||
CheckpointCompletedEvent(
|
||||
location=location,
|
||||
provider=provider_name,
|
||||
trigger=trigger,
|
||||
branch=branch_snapshot,
|
||||
parent_id=parent_id_snapshot,
|
||||
checkpoint_id=checkpoint_id,
|
||||
duration_ms=duration_ms,
|
||||
**context,
|
||||
),
|
||||
)
|
||||
|
||||
if cfg.max_checkpoints is not None:
|
||||
cfg.provider.prune(cfg.location, cfg.max_checkpoints, branch=state._branch)
|
||||
try:
|
||||
removed_count: int = cfg.provider.prune(
|
||||
cfg.location, cfg.max_checkpoints, branch=branch_snapshot
|
||||
)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Checkpoint prune failed for %s (branch=%s)",
|
||||
cfg.location,
|
||||
branch_snapshot,
|
||||
exc_info=True,
|
||||
)
|
||||
return
|
||||
crewai_event_bus.emit(
|
||||
cfg,
|
||||
CheckpointPrunedEvent(
|
||||
location=cfg.location,
|
||||
provider=provider_name,
|
||||
trigger=trigger,
|
||||
branch=branch_snapshot,
|
||||
parent_id=parent_id_snapshot,
|
||||
removed_count=removed_count,
|
||||
max_checkpoints=cfg.max_checkpoints,
|
||||
**context,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _should_checkpoint(source: Any, event: BaseEvent) -> CheckpointConfig | None:
|
||||
@@ -142,6 +228,13 @@ def _should_checkpoint(source: Any, event: BaseEvent) -> CheckpointConfig | None
|
||||
|
||||
def _on_any_event(source: Any, event: BaseEvent, state: Any) -> None:
|
||||
"""Sync handler registered on every event class."""
|
||||
if is_replaying():
|
||||
return
|
||||
if isinstance(
|
||||
event,
|
||||
(CheckpointBaseEvent, CheckpointForkBaseEvent, CheckpointRestoreBaseEvent),
|
||||
):
|
||||
return
|
||||
cfg = _should_checkpoint(source, event)
|
||||
if cfg is None:
|
||||
return
|
||||
@@ -161,7 +254,8 @@ def _register_all_handlers(event_bus: CrewAIEventsBus) -> None:
|
||||
seen: set[type] = set()
|
||||
|
||||
def _collect(cls: type[BaseEvent]) -> None:
|
||||
for sub in cls.__subclasses__():
|
||||
subclasses: list[type[BaseEvent]] = cls.__subclasses__()
|
||||
for sub in subclasses:
|
||||
if sub not in seen:
|
||||
seen.add(sub)
|
||||
type_field = sub.model_fields.get("type")
|
||||
|
||||
@@ -39,7 +39,8 @@ def _build_event_type_map() -> None:
|
||||
"""Populate _event_type_map from all BaseEvent subclasses."""
|
||||
|
||||
def _collect(cls: type[BaseEvent]) -> None:
|
||||
for sub in cls.__subclasses__():
|
||||
subclasses: list[type[BaseEvent]] = cls.__subclasses__()
|
||||
for sub in subclasses:
|
||||
type_field = sub.model_fields.get("type")
|
||||
if type_field and type_field.default:
|
||||
_event_type_map[type_field.default] = sub
|
||||
@@ -196,6 +197,21 @@ class EventRecord(BaseModel):
|
||||
node for node in self.nodes.values() if not node.neighbors("parent")
|
||||
]
|
||||
|
||||
def all_nodes(self) -> list[EventNode]:
|
||||
"""Return a snapshot of every node under the read lock.
|
||||
|
||||
Returns:
|
||||
A list copy of the current nodes, safe to iterate without holding
|
||||
the lock.
|
||||
"""
|
||||
with self._lock.r_locked():
|
||||
return list(self.nodes.values())
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Remove all nodes from the record under the write lock."""
|
||||
with self._lock.w_locked():
|
||||
self.nodes.clear()
|
||||
|
||||
def __len__(self) -> int:
|
||||
with self._lock.r_locked():
|
||||
return len(self.nodes)
|
||||
|
||||
@@ -61,13 +61,16 @@ class BaseProvider(BaseModel, ABC):
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None:
|
||||
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> int:
|
||||
"""Remove old checkpoints, keeping at most *max_keep* per branch.
|
||||
|
||||
Args:
|
||||
location: The storage destination passed to ``checkpoint``.
|
||||
max_keep: Maximum number of checkpoints to retain.
|
||||
branch: Only prune checkpoints on this branch.
|
||||
|
||||
Returns:
|
||||
The number of checkpoints removed.
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
@@ -95,17 +95,20 @@ class JsonProvider(BaseProvider):
|
||||
await f.write(data)
|
||||
return str(file_path)
|
||||
|
||||
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None:
|
||||
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> int:
|
||||
"""Remove oldest checkpoint files beyond *max_keep* on a branch."""
|
||||
_safe_branch(location, branch)
|
||||
branch_dir = os.path.join(location, branch)
|
||||
pattern = os.path.join(branch_dir, "*.json")
|
||||
files = sorted(glob.glob(pattern), key=os.path.getmtime)
|
||||
removed = 0
|
||||
for path in files if max_keep == 0 else files[:-max_keep]:
|
||||
try:
|
||||
os.remove(path)
|
||||
removed += 1
|
||||
except OSError: # noqa: PERF203
|
||||
logger.debug("Failed to remove %s", path, exc_info=True)
|
||||
return removed
|
||||
|
||||
def extract_id(self, location: str) -> str:
|
||||
"""Extract the checkpoint ID from a file path.
|
||||
|
||||
@@ -111,11 +111,13 @@ class SqliteProvider(BaseProvider):
|
||||
await db.commit()
|
||||
return f"{location}#{checkpoint_id}"
|
||||
|
||||
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> None:
|
||||
def prune(self, location: str, max_keep: int, *, branch: str = "main") -> int:
|
||||
"""Remove oldest checkpoint rows beyond *max_keep* on a branch."""
|
||||
with sqlite3.connect(location) as conn:
|
||||
conn.execute(_PRUNE, (branch, branch, max_keep))
|
||||
cursor = conn.execute(_PRUNE, (branch, branch, max_keep))
|
||||
removed: int = cursor.rowcount
|
||||
conn.commit()
|
||||
return max(removed, 0)
|
||||
|
||||
def extract_id(self, location: str) -> str:
|
||||
"""Extract the checkpoint ID from a ``db_path#id`` string."""
|
||||
|
||||
@@ -10,6 +10,7 @@ via ``RuntimeState.model_rebuild()``.
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Any
|
||||
import uuid
|
||||
|
||||
@@ -23,6 +24,17 @@ from pydantic import (
|
||||
)
|
||||
|
||||
from crewai.context import capture_execution_context
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.checkpoint_events import (
|
||||
CheckpointCompletedEvent,
|
||||
CheckpointFailedEvent,
|
||||
CheckpointForkCompletedEvent,
|
||||
CheckpointForkStartedEvent,
|
||||
CheckpointRestoreCompletedEvent,
|
||||
CheckpointRestoreFailedEvent,
|
||||
CheckpointRestoreStartedEvent,
|
||||
CheckpointStartedEvent,
|
||||
)
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
from crewai.state.event_record import EventRecord
|
||||
from crewai.state.provider.core import BaseProvider
|
||||
@@ -89,7 +101,7 @@ def _migrate(data: dict[str, Any]) -> dict[str, Any]:
|
||||
"""
|
||||
raw = data.get("crewai_version")
|
||||
current = Version(get_crewai_version())
|
||||
stored = Version(raw) if raw else Version("0.0.0")
|
||||
stored = Version(raw) if isinstance(raw, str) and raw else Version("0.0.0")
|
||||
|
||||
if raw is None:
|
||||
logger.warning("Checkpoint has no crewai_version — treating as 0.0.0")
|
||||
@@ -159,6 +171,63 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
|
||||
self._checkpoint_id = provider.extract_id(location)
|
||||
self._parent_id = self._checkpoint_id
|
||||
|
||||
def _begin_checkpoint(self, location: str) -> tuple[str, str | None, str, float]:
|
||||
"""Emit the start event and return the invariant context for a checkpoint."""
|
||||
provider_name: str = type(self._provider).__name__
|
||||
parent_id_snapshot: str | None = self._parent_id
|
||||
branch_snapshot: str = self._branch
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
CheckpointStartedEvent(
|
||||
location=location,
|
||||
provider=provider_name,
|
||||
branch=branch_snapshot,
|
||||
parent_id=parent_id_snapshot,
|
||||
),
|
||||
)
|
||||
return provider_name, parent_id_snapshot, branch_snapshot, time.perf_counter()
|
||||
|
||||
def _emit_checkpoint_failed(
|
||||
self,
|
||||
location: str,
|
||||
provider_name: str,
|
||||
branch_snapshot: str,
|
||||
parent_id_snapshot: str | None,
|
||||
exc: Exception,
|
||||
) -> None:
|
||||
"""Emit the failure event for a checkpoint write."""
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
CheckpointFailedEvent(
|
||||
location=location,
|
||||
provider=provider_name,
|
||||
branch=branch_snapshot,
|
||||
parent_id=parent_id_snapshot,
|
||||
error=str(exc),
|
||||
),
|
||||
)
|
||||
|
||||
def _emit_checkpoint_completed(
|
||||
self,
|
||||
result: str,
|
||||
provider_name: str,
|
||||
branch_snapshot: str,
|
||||
parent_id_snapshot: str | None,
|
||||
start: float,
|
||||
) -> None:
|
||||
"""Emit the completion event for a successful checkpoint write."""
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
CheckpointCompletedEvent(
|
||||
location=result,
|
||||
provider=provider_name,
|
||||
branch=branch_snapshot,
|
||||
parent_id=parent_id_snapshot,
|
||||
checkpoint_id=self._provider.extract_id(result),
|
||||
duration_ms=(time.perf_counter() - start) * 1000.0,
|
||||
),
|
||||
)
|
||||
|
||||
def checkpoint(self, location: str) -> str:
|
||||
"""Write a checkpoint.
|
||||
|
||||
@@ -169,14 +238,27 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
|
||||
Returns:
|
||||
A location identifier for the saved checkpoint.
|
||||
"""
|
||||
_prepare_entities(self.root)
|
||||
result = self._provider.checkpoint(
|
||||
self.model_dump_json(),
|
||||
location,
|
||||
parent_id=self._parent_id,
|
||||
branch=self._branch,
|
||||
provider_name, parent_id_snapshot, branch_snapshot, start = (
|
||||
self._begin_checkpoint(location)
|
||||
)
|
||||
try:
|
||||
_prepare_entities(self.root)
|
||||
result = self._provider.checkpoint(
|
||||
self.model_dump_json(),
|
||||
location,
|
||||
parent_id=parent_id_snapshot,
|
||||
branch=branch_snapshot,
|
||||
)
|
||||
self._chain_lineage(self._provider, result)
|
||||
except Exception as exc:
|
||||
self._emit_checkpoint_failed(
|
||||
location, provider_name, branch_snapshot, parent_id_snapshot, exc
|
||||
)
|
||||
raise
|
||||
|
||||
self._emit_checkpoint_completed(
|
||||
result, provider_name, branch_snapshot, parent_id_snapshot, start
|
||||
)
|
||||
self._chain_lineage(self._provider, result)
|
||||
return result
|
||||
|
||||
async def acheckpoint(self, location: str) -> str:
|
||||
@@ -189,14 +271,27 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
|
||||
Returns:
|
||||
A location identifier for the saved checkpoint.
|
||||
"""
|
||||
_prepare_entities(self.root)
|
||||
result = await self._provider.acheckpoint(
|
||||
self.model_dump_json(),
|
||||
location,
|
||||
parent_id=self._parent_id,
|
||||
branch=self._branch,
|
||||
provider_name, parent_id_snapshot, branch_snapshot, start = (
|
||||
self._begin_checkpoint(location)
|
||||
)
|
||||
try:
|
||||
_prepare_entities(self.root)
|
||||
result = await self._provider.acheckpoint(
|
||||
self.model_dump_json(),
|
||||
location,
|
||||
parent_id=parent_id_snapshot,
|
||||
branch=branch_snapshot,
|
||||
)
|
||||
self._chain_lineage(self._provider, result)
|
||||
except Exception as exc:
|
||||
self._emit_checkpoint_failed(
|
||||
location, provider_name, branch_snapshot, parent_id_snapshot, exc
|
||||
)
|
||||
raise
|
||||
|
||||
self._emit_checkpoint_completed(
|
||||
result, provider_name, branch_snapshot, parent_id_snapshot, start
|
||||
)
|
||||
self._chain_lineage(self._provider, result)
|
||||
return result
|
||||
|
||||
def fork(self, branch: str | None = None) -> None:
|
||||
@@ -211,11 +306,32 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
|
||||
times without collisions.
|
||||
"""
|
||||
if branch:
|
||||
self._branch = branch
|
||||
new_branch = branch
|
||||
elif self._checkpoint_id:
|
||||
self._branch = f"fork/{self._checkpoint_id}_{uuid.uuid4().hex[:6]}"
|
||||
new_branch = f"fork/{self._checkpoint_id}_{uuid.uuid4().hex[:6]}"
|
||||
else:
|
||||
self._branch = f"fork/{uuid.uuid4().hex[:8]}"
|
||||
new_branch = f"fork/{uuid.uuid4().hex[:8]}"
|
||||
|
||||
parent_branch: str | None = self._branch
|
||||
parent_checkpoint_id: str | None = self._checkpoint_id
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
CheckpointForkStartedEvent(
|
||||
branch=new_branch,
|
||||
parent_branch=parent_branch,
|
||||
parent_checkpoint_id=parent_checkpoint_id,
|
||||
),
|
||||
)
|
||||
self._branch = new_branch
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
CheckpointForkCompletedEvent(
|
||||
branch=new_branch,
|
||||
parent_branch=parent_branch,
|
||||
parent_checkpoint_id=parent_checkpoint_id,
|
||||
),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_checkpoint(cls, config: CheckpointConfig, **kwargs: Any) -> RuntimeState:
|
||||
@@ -233,13 +349,41 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
|
||||
if config.restore_from is None:
|
||||
raise ValueError("CheckpointConfig.restore_from must be set")
|
||||
location = str(config.restore_from)
|
||||
provider = detect_provider(location)
|
||||
raw = provider.from_checkpoint(location)
|
||||
state = cls.model_validate_json(raw, **kwargs)
|
||||
state._provider = provider
|
||||
checkpoint_id = provider.extract_id(location)
|
||||
state._checkpoint_id = checkpoint_id
|
||||
state._parent_id = checkpoint_id
|
||||
|
||||
crewai_event_bus.emit(config, CheckpointRestoreStartedEvent(location=location))
|
||||
start: float = time.perf_counter()
|
||||
provider_name: str | None = None
|
||||
try:
|
||||
provider = detect_provider(location)
|
||||
provider_name = type(provider).__name__
|
||||
raw = provider.from_checkpoint(location)
|
||||
state = cls.model_validate_json(raw, **kwargs)
|
||||
state._provider = provider
|
||||
checkpoint_id = provider.extract_id(location)
|
||||
state._checkpoint_id = checkpoint_id
|
||||
state._parent_id = checkpoint_id
|
||||
except Exception as exc:
|
||||
crewai_event_bus.emit(
|
||||
config,
|
||||
CheckpointRestoreFailedEvent(
|
||||
location=location,
|
||||
provider=provider_name,
|
||||
error=str(exc),
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
crewai_event_bus.emit(
|
||||
config,
|
||||
CheckpointRestoreCompletedEvent(
|
||||
location=location,
|
||||
provider=provider_name,
|
||||
checkpoint_id=checkpoint_id,
|
||||
branch=state._branch,
|
||||
parent_id=state._parent_id,
|
||||
duration_ms=(time.perf_counter() - start) * 1000.0,
|
||||
),
|
||||
)
|
||||
return state
|
||||
|
||||
@classmethod
|
||||
@@ -260,13 +404,41 @@ class RuntimeState(RootModel): # type: ignore[type-arg]
|
||||
if config.restore_from is None:
|
||||
raise ValueError("CheckpointConfig.restore_from must be set")
|
||||
location = str(config.restore_from)
|
||||
provider = detect_provider(location)
|
||||
raw = await provider.afrom_checkpoint(location)
|
||||
state = cls.model_validate_json(raw, **kwargs)
|
||||
state._provider = provider
|
||||
checkpoint_id = provider.extract_id(location)
|
||||
state._checkpoint_id = checkpoint_id
|
||||
state._parent_id = checkpoint_id
|
||||
|
||||
crewai_event_bus.emit(config, CheckpointRestoreStartedEvent(location=location))
|
||||
start: float = time.perf_counter()
|
||||
provider_name: str | None = None
|
||||
try:
|
||||
provider = detect_provider(location)
|
||||
provider_name = type(provider).__name__
|
||||
raw = await provider.afrom_checkpoint(location)
|
||||
state = cls.model_validate_json(raw, **kwargs)
|
||||
state._provider = provider
|
||||
checkpoint_id = provider.extract_id(location)
|
||||
state._checkpoint_id = checkpoint_id
|
||||
state._parent_id = checkpoint_id
|
||||
except Exception as exc:
|
||||
crewai_event_bus.emit(
|
||||
config,
|
||||
CheckpointRestoreFailedEvent(
|
||||
location=location,
|
||||
provider=provider_name,
|
||||
error=str(exc),
|
||||
),
|
||||
)
|
||||
raise
|
||||
|
||||
crewai_event_bus.emit(
|
||||
config,
|
||||
CheckpointRestoreCompletedEvent(
|
||||
location=location,
|
||||
provider=provider_name,
|
||||
checkpoint_id=checkpoint_id,
|
||||
branch=state._branch,
|
||||
parent_id=state._parent_id,
|
||||
duration_ms=(time.perf_counter() - start) * 1000.0,
|
||||
),
|
||||
)
|
||||
return state
|
||||
|
||||
|
||||
|
||||
82
lib/crewai/src/crewai/tools/flow_tool.py
Normal file
82
lib/crewai/src/crewai/tools/flow_tool.py
Normal file
@@ -0,0 +1,82 @@
|
||||
"""Wrap Flow classes as callable tools so agents can invoke them."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.utilities.string_utils import sanitize_tool_name
|
||||
|
||||
|
||||
class FlowToolInputSchema(BaseModel):
|
||||
"""Default input schema for a FlowTool."""
|
||||
|
||||
inputs: str = Field(
|
||||
default="{}",
|
||||
description=(
|
||||
"JSON string of key-value pairs to pass as inputs to the flow. "
|
||||
"Use '{}' if the flow requires no inputs."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class FlowTool(BaseTool):
|
||||
"""Wraps a Flow class as a BaseTool so an agent can invoke it.
|
||||
|
||||
The tool instantiates the Flow, calls ``kickoff(inputs=...)`` and returns
|
||||
the result as a string.
|
||||
"""
|
||||
|
||||
name: str = ""
|
||||
description: str = ""
|
||||
flow_class: Any = Field(
|
||||
default=None,
|
||||
description="The Flow class (not instance) to wrap.",
|
||||
exclude=True,
|
||||
)
|
||||
args_schema: Any = FlowToolInputSchema
|
||||
|
||||
def _run(self, inputs: str = "{}") -> str:
|
||||
"""Instantiate the Flow, run kickoff, and return the result."""
|
||||
try:
|
||||
parsed_inputs = json.loads(inputs) if isinstance(inputs, str) else inputs
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
parsed_inputs = {}
|
||||
|
||||
if not isinstance(parsed_inputs, dict):
|
||||
parsed_inputs = {}
|
||||
|
||||
flow_instance = self.flow_class()
|
||||
result = flow_instance.kickoff(inputs=parsed_inputs if parsed_inputs else None)
|
||||
return str(result)
|
||||
|
||||
|
||||
def create_flow_tools(flows: list[type] | None) -> list[BaseTool]:
|
||||
"""Convert a list of Flow classes into BaseTool wrappers.
|
||||
|
||||
Args:
|
||||
flows: Flow classes (not instances) to wrap as tools.
|
||||
|
||||
Returns:
|
||||
A list of FlowTool instances ready for agent use.
|
||||
"""
|
||||
if not flows:
|
||||
return []
|
||||
|
||||
tools: list[BaseTool] = []
|
||||
for flow_cls in flows:
|
||||
name = sanitize_tool_name(flow_cls.__name__)
|
||||
docstring = (flow_cls.__doc__ or "").strip()
|
||||
description = docstring if docstring else f"Run the {flow_cls.__name__} flow."
|
||||
|
||||
tools.append(
|
||||
FlowTool(
|
||||
name=name,
|
||||
description=description,
|
||||
flow_class=flow_cls,
|
||||
)
|
||||
)
|
||||
return tools
|
||||
165
lib/crewai/tests/events/test_event_replay.py
Normal file
165
lib/crewai/tests/events/test_event_replay.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""Tests for event bus replay dispatch and is_replaying flag."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
from unittest.mock import patch
|
||||
|
||||
from crewai.events.event_bus import _replaying, crewai_event_bus, is_replaying
|
||||
from crewai.events.types.flow_events import (
|
||||
MethodExecutionFinishedEvent,
|
||||
MethodExecutionStartedEvent,
|
||||
)
|
||||
|
||||
|
||||
def _make_started(method: str, event_id: str, sequence: int) -> MethodExecutionStartedEvent:
|
||||
"""Build a MethodExecutionStartedEvent with explicit ids/sequence."""
|
||||
ev = MethodExecutionStartedEvent(
|
||||
method_name=method,
|
||||
flow_name="F",
|
||||
params={},
|
||||
state={},
|
||||
)
|
||||
ev.event_id = event_id
|
||||
ev.emission_sequence = sequence
|
||||
return ev
|
||||
|
||||
|
||||
class TestReplayPreservesFields:
|
||||
"""replay() must not overwrite event_id, parent_event_id, or emission_sequence."""
|
||||
|
||||
def test_preserves_ids_and_sequence(self) -> None:
|
||||
captured: list[MethodExecutionStartedEvent] = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionStartedEvent)
|
||||
def _capture(_: Any, event: MethodExecutionStartedEvent) -> None:
|
||||
captured.append(event)
|
||||
|
||||
ev = _make_started("outline", "orig-id-1", 42)
|
||||
ev.parent_event_id = "parent-abc"
|
||||
|
||||
future = crewai_event_bus.replay(object(), ev)
|
||||
if future is not None:
|
||||
future.result(timeout=5.0)
|
||||
|
||||
assert len(captured) == 1
|
||||
assert captured[0].event_id == "orig-id-1"
|
||||
assert captured[0].parent_event_id == "parent-abc"
|
||||
assert captured[0].emission_sequence == 42
|
||||
|
||||
|
||||
class TestIsReplayingFlag:
|
||||
"""is_replaying() must be True inside handlers dispatched via replay()."""
|
||||
|
||||
def test_flag_true_during_replay(self) -> None:
|
||||
seen: list[bool] = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionStartedEvent)
|
||||
def _capture(_: Any, __: MethodExecutionStartedEvent) -> None:
|
||||
seen.append(is_replaying())
|
||||
|
||||
ev = _make_started("m", "id-1", 1)
|
||||
future = crewai_event_bus.replay(object(), ev)
|
||||
if future is not None:
|
||||
future.result(timeout=5.0)
|
||||
|
||||
assert seen == [True]
|
||||
assert is_replaying() is False
|
||||
|
||||
def test_flag_false_during_emit(self) -> None:
|
||||
seen: list[bool] = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionStartedEvent)
|
||||
def _capture(_: Any, __: MethodExecutionStartedEvent) -> None:
|
||||
seen.append(is_replaying())
|
||||
|
||||
ev = _make_started("m", "id-1", 1)
|
||||
future = crewai_event_bus.emit(object(), ev)
|
||||
if future is not None:
|
||||
future.result(timeout=5.0)
|
||||
|
||||
assert seen == [False]
|
||||
|
||||
|
||||
class TestCheckpointListenerOptsOut:
|
||||
"""CheckpointListener must early-return during replay."""
|
||||
|
||||
def test_checkpoint_not_written_on_replay(self) -> None:
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
from crewai.state.checkpoint_listener import _on_any_event
|
||||
|
||||
class FlowLike:
|
||||
entity_type = "flow"
|
||||
checkpoint = CheckpointConfig(trigger_all=True)
|
||||
|
||||
ev = _make_started("m", "id-1", 1)
|
||||
|
||||
with patch("crewai.state.checkpoint_listener._do_checkpoint") as do_cp:
|
||||
token = _replaying.set(True)
|
||||
try:
|
||||
_on_any_event(FlowLike(), ev, state=None)
|
||||
finally:
|
||||
_replaying.reset(token)
|
||||
assert do_cp.call_count == 0
|
||||
|
||||
|
||||
class TestFlowResumeReplaysEvents:
|
||||
"""End-to-end: a resumed flow emits MethodExecution* events for completed methods."""
|
||||
|
||||
def test_resume_dispatches_completed_method_events(self, tmp_path) -> None:
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from crewai.flow.persistence.sqlite import SQLiteFlowPersistence
|
||||
|
||||
db_path = tmp_path / "flows.db"
|
||||
persistence = SQLiteFlowPersistence(str(db_path))
|
||||
|
||||
class ThreeStepFlow(Flow[dict]):
|
||||
@start()
|
||||
def step_a(self) -> str:
|
||||
return "a"
|
||||
|
||||
@listen(step_a)
|
||||
def step_b(self) -> str:
|
||||
return "b"
|
||||
|
||||
@listen(step_b)
|
||||
def step_c(self) -> str:
|
||||
return "c"
|
||||
|
||||
if crewai_event_bus.runtime_state is not None:
|
||||
crewai_event_bus.runtime_state.event_record.clear()
|
||||
|
||||
flow1 = ThreeStepFlow(persistence=persistence)
|
||||
flow1.kickoff()
|
||||
flow_id = flow1.state["id"]
|
||||
|
||||
captured_started: list[str] = []
|
||||
captured_finished: list[str] = []
|
||||
|
||||
flow2 = ThreeStepFlow(persistence=persistence)
|
||||
flow2._completed_methods = {"step_a", "step_b"}
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionStartedEvent)
|
||||
def _cs(_: Any, event: MethodExecutionStartedEvent) -> None:
|
||||
captured_started.append(event.method_name)
|
||||
|
||||
@crewai_event_bus.on(MethodExecutionFinishedEvent)
|
||||
def _cf(_: Any, event: MethodExecutionFinishedEvent) -> None:
|
||||
captured_finished.append(event.method_name)
|
||||
|
||||
flow2.kickoff(inputs={"id": flow_id})
|
||||
|
||||
assert captured_started.count("step_a") == 1
|
||||
assert captured_started.count("step_b") == 1
|
||||
assert captured_started.count("step_c") == 1
|
||||
assert captured_finished.count("step_a") == 1
|
||||
assert captured_finished.count("step_b") == 1
|
||||
assert captured_finished.count("step_c") == 1
|
||||
@@ -4,6 +4,8 @@ from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai import Agent
|
||||
from crewai.agent.utils import append_skill_context
|
||||
from crewai.skills.loader import activate_skill, discover_skills, format_skill_context
|
||||
from crewai.skills.models import INSTRUCTIONS, METADATA
|
||||
|
||||
@@ -76,3 +78,23 @@ class TestSkillDiscoveryAndActivation:
|
||||
all_skills.extend(discover_skills(search_path))
|
||||
names = {s.name for s in all_skills}
|
||||
assert names == {"skill-a", "skill-b"}
|
||||
|
||||
def test_agent_preserves_metadata_for_discovered_skills(self, tmp_path: Path) -> None:
|
||||
_create_skill_dir(tmp_path, "travel", body="Use this skill for travel planning.")
|
||||
discovered = discover_skills(tmp_path)
|
||||
|
||||
agent = Agent(
|
||||
role="Travel Advisor",
|
||||
goal="Provide personalized travel suggestions.",
|
||||
backstory="An experienced travel consultant.",
|
||||
skills=discovered,
|
||||
)
|
||||
|
||||
assert agent.skills is not None
|
||||
assert agent.skills[0].disclosure_level == METADATA
|
||||
assert agent.skills[0].instructions is None
|
||||
|
||||
prompt = append_skill_context(agent, "Plan a 10-day Japan itinerary.")
|
||||
assert "## Skill: travel" in prompt
|
||||
assert "Skill travel" in prompt
|
||||
assert "Use this skill for travel planning." not in prompt
|
||||
|
||||
@@ -11,11 +11,12 @@ from typing import Any
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.agent.core import Agent
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.crew import Crew
|
||||
from crewai.flow.flow import Flow, start
|
||||
from crewai.flow.flow import _INITIAL_STATE_CLASS_MARKER, Flow, start
|
||||
from crewai.state.checkpoint_config import CheckpointConfig
|
||||
from crewai.state.checkpoint_listener import (
|
||||
_find_checkpoint,
|
||||
@@ -310,6 +311,65 @@ class TestRuntimeStateLineage:
|
||||
assert state._branch != first
|
||||
|
||||
|
||||
class TestFlowInitialStateSerialization:
|
||||
"""Regression tests for checkpoint serialization of ``Flow.initial_state``."""
|
||||
|
||||
def test_class_ref_serializes_as_schema(self) -> None:
|
||||
class MyState(BaseModel):
|
||||
id: str = "x"
|
||||
foo: str = "bar"
|
||||
|
||||
flow = Flow(initial_state=MyState)
|
||||
state = RuntimeState(root=[flow])
|
||||
dumped = json.loads(state.model_dump_json())
|
||||
entity = dumped["entities"][0]
|
||||
wrapped = entity["initial_state"]
|
||||
assert isinstance(wrapped, dict)
|
||||
assert _INITIAL_STATE_CLASS_MARKER in wrapped
|
||||
assert wrapped[_INITIAL_STATE_CLASS_MARKER].get("title") == "MyState"
|
||||
|
||||
def test_class_ref_round_trips_to_basemodel_subclass(self) -> None:
|
||||
class MyState(BaseModel):
|
||||
id: str = "x"
|
||||
foo: str = "bar"
|
||||
|
||||
flow = Flow(initial_state=MyState)
|
||||
raw = RuntimeState(root=[flow]).model_dump_json()
|
||||
restored = RuntimeState.model_validate_json(
|
||||
raw, context={"from_checkpoint": True}
|
||||
)
|
||||
rehydrated = restored.root[0].initial_state
|
||||
assert isinstance(rehydrated, type)
|
||||
assert issubclass(rehydrated, BaseModel)
|
||||
assert set(rehydrated.model_fields.keys()) == {"id", "foo"}
|
||||
|
||||
def test_instance_serializes_as_values(self) -> None:
|
||||
class MyState(BaseModel):
|
||||
id: str = "x"
|
||||
foo: str = "bar"
|
||||
|
||||
flow = Flow(initial_state=MyState(foo="baz"))
|
||||
state = RuntimeState(root=[flow])
|
||||
dumped = json.loads(state.model_dump_json())
|
||||
entity = dumped["entities"][0]
|
||||
assert entity["initial_state"] == {"id": "x", "foo": "baz"}
|
||||
|
||||
def test_dict_passthrough(self) -> None:
|
||||
flow = Flow(initial_state={"id": "x", "foo": "bar"})
|
||||
state = RuntimeState(root=[flow])
|
||||
dumped = json.loads(state.model_dump_json())
|
||||
entity = dumped["entities"][0]
|
||||
assert entity["initial_state"] == {"id": "x", "foo": "bar"}
|
||||
|
||||
def test_dict_round_trips_as_dict(self) -> None:
|
||||
flow = Flow(initial_state={"id": "x", "foo": "bar"})
|
||||
raw = RuntimeState(root=[flow]).model_dump_json()
|
||||
restored = RuntimeState.model_validate_json(
|
||||
raw, context={"from_checkpoint": True}
|
||||
)
|
||||
assert restored.root[0].initial_state == {"id": "x", "foo": "bar"}
|
||||
|
||||
|
||||
# ---------- JsonProvider forking ----------
|
||||
|
||||
|
||||
|
||||
@@ -4519,8 +4519,8 @@ def test_sets_flow_context_when_using_crewbase_pattern_inside_flow():
|
||||
flow.kickoff()
|
||||
|
||||
assert captured_crew is not None
|
||||
assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined]
|
||||
assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined]
|
||||
assert captured_crew._flow_id == flow.execution_id # type: ignore[attr-defined]
|
||||
assert captured_crew._request_id == flow.execution_id # type: ignore[attr-defined]
|
||||
|
||||
|
||||
def test_sets_flow_context_when_outside_flow(researcher, writer):
|
||||
@@ -4554,8 +4554,8 @@ def test_sets_flow_context_when_inside_flow(researcher, writer):
|
||||
|
||||
flow = MyFlow()
|
||||
result = flow.kickoff()
|
||||
assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
|
||||
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
|
||||
assert result._flow_id == flow.execution_id # type: ignore[attr-defined]
|
||||
assert result._request_id == flow.execution_id # type: ignore[attr-defined]
|
||||
|
||||
|
||||
def test_reset_knowledge_with_no_crew_knowledge(researcher, writer):
|
||||
|
||||
185
lib/crewai/tests/test_flow_as_tool.py
Normal file
185
lib/crewai/tests/test_flow_as_tool.py
Normal file
@@ -0,0 +1,185 @@
|
||||
"""Tests for Flow-as-tool functionality."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from crewai.flow.flow import Flow, start
|
||||
from crewai.tools.flow_tool import FlowTool, create_flow_tools
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test Flow classes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class SimpleFlow(Flow):
|
||||
"""A simple flow that greets the user."""
|
||||
|
||||
@start()
|
||||
def greet(self) -> str:
|
||||
return "Hello from SimpleFlow!"
|
||||
|
||||
|
||||
class MathFlow(Flow):
|
||||
"""Performs basic math operations."""
|
||||
|
||||
@start()
|
||||
def compute(self) -> str:
|
||||
return "42"
|
||||
|
||||
|
||||
class NoDocFlow(Flow):
|
||||
@start()
|
||||
def run_it(self) -> str:
|
||||
return "no doc"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FlowTool unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFlowTool:
|
||||
def test_wrap_simple_flow(self) -> None:
|
||||
tool = FlowTool(
|
||||
name="simple_flow",
|
||||
description="A simple flow that greets the user.",
|
||||
flow_class=SimpleFlow,
|
||||
)
|
||||
assert tool.name == "simple_flow"
|
||||
assert "greets the user" in tool.description
|
||||
|
||||
def test_run_invokes_kickoff(self) -> None:
|
||||
mock_flow = MagicMock()
|
||||
mock_flow.return_value = mock_flow # __init__ returns self
|
||||
mock_flow.kickoff.return_value = "mocked result"
|
||||
|
||||
tool = FlowTool(
|
||||
name="test_flow",
|
||||
description="test",
|
||||
flow_class=mock_flow,
|
||||
)
|
||||
result = tool._run(inputs="{}")
|
||||
assert result == "mocked result"
|
||||
mock_flow.kickoff.assert_called_once()
|
||||
|
||||
def test_run_with_json_inputs(self) -> None:
|
||||
mock_flow = MagicMock()
|
||||
mock_flow.return_value = mock_flow
|
||||
mock_flow.kickoff.return_value = "result with inputs"
|
||||
|
||||
tool = FlowTool(
|
||||
name="test_flow",
|
||||
description="test",
|
||||
flow_class=mock_flow,
|
||||
)
|
||||
result = tool._run(inputs='{"key": "value"}')
|
||||
assert result == "result with inputs"
|
||||
mock_flow.kickoff.assert_called_once_with(inputs={"key": "value"})
|
||||
|
||||
def test_run_with_invalid_json_defaults_to_empty(self) -> None:
|
||||
mock_flow = MagicMock()
|
||||
mock_flow.return_value = mock_flow
|
||||
mock_flow.kickoff.return_value = "ok"
|
||||
|
||||
tool = FlowTool(
|
||||
name="test_flow",
|
||||
description="test",
|
||||
flow_class=mock_flow,
|
||||
)
|
||||
result = tool._run(inputs="not valid json")
|
||||
assert result == "ok"
|
||||
mock_flow.kickoff.assert_called_once_with(inputs=None)
|
||||
|
||||
def test_run_returns_string(self) -> None:
|
||||
mock_flow = MagicMock()
|
||||
mock_flow.return_value = mock_flow
|
||||
mock_flow.kickoff.return_value = 42
|
||||
|
||||
tool = FlowTool(
|
||||
name="test_flow",
|
||||
description="test",
|
||||
flow_class=mock_flow,
|
||||
)
|
||||
result = tool._run()
|
||||
assert result == "42"
|
||||
assert isinstance(result, str)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# create_flow_tools tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCreateFlowTools:
|
||||
def test_creates_tools_from_flow_classes(self) -> None:
|
||||
tools = create_flow_tools([SimpleFlow, MathFlow])
|
||||
assert len(tools) == 2
|
||||
names = {t.name for t in tools}
|
||||
assert "simple_flow" in names
|
||||
assert "math_flow" in names
|
||||
|
||||
def test_description_from_docstring(self) -> None:
|
||||
tools = create_flow_tools([SimpleFlow])
|
||||
assert len(tools) == 1
|
||||
assert "greets the user" in tools[0].description
|
||||
|
||||
def test_description_fallback_when_no_docstring(self) -> None:
|
||||
tools = create_flow_tools([NoDocFlow])
|
||||
assert len(tools) == 1
|
||||
assert "NoDocFlow" in tools[0].description
|
||||
|
||||
def test_empty_list_returns_empty(self) -> None:
|
||||
assert create_flow_tools([]) == []
|
||||
|
||||
def test_none_returns_empty(self) -> None:
|
||||
assert create_flow_tools(None) == []
|
||||
|
||||
def test_tools_are_base_tool_instances(self) -> None:
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
|
||||
tools = create_flow_tools([SimpleFlow])
|
||||
for tool in tools:
|
||||
assert isinstance(tool, BaseTool)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent integration tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAgentFlowIntegration:
|
||||
def test_agent_with_flows_has_flow_tools(self) -> None:
|
||||
from crewai.agent.core import Agent
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test flows",
|
||||
backstory="I test things",
|
||||
flows=[SimpleFlow, MathFlow],
|
||||
)
|
||||
tool_names = {t.name for t in (agent.tools or [])}
|
||||
assert "simple_flow" in tool_names
|
||||
assert "math_flow" in tool_names
|
||||
|
||||
def test_agent_without_flows_no_extra_tools(self) -> None:
|
||||
from crewai.agent.core import Agent
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test",
|
||||
backstory="I test things",
|
||||
)
|
||||
# Should not have any flow tools
|
||||
flow_tool_names = {
|
||||
t.name for t in (agent.tools or []) if isinstance(t, FlowTool)
|
||||
}
|
||||
assert len(flow_tool_names) == 0
|
||||
|
||||
def test_flow_tool_executes_real_flow(self) -> None:
|
||||
"""Test that a FlowTool actually runs the Flow's kickoff."""
|
||||
tools = create_flow_tools([SimpleFlow])
|
||||
tool = tools[0]
|
||||
result = tool.run(inputs="{}")
|
||||
assert "Hello from SimpleFlow" in result
|
||||
127
lib/crewai/tests/test_flow_execution_id.py
Normal file
127
lib/crewai/tests/test_flow_execution_id.py
Normal file
@@ -0,0 +1,127 @@
|
||||
"""Regression tests for ``Flow.execution_id``.
|
||||
|
||||
``execution_id`` is the stable tracking identifier for a single flow run.
|
||||
It must stay independent of ``state.id`` so that consumers passing an
|
||||
``id`` in ``inputs`` (used for persistence restore) cannot destabilize
|
||||
the identity used by telemetry, tracing, and external correlation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from crewai.flow.flow import Flow, FlowState, start
|
||||
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
|
||||
|
||||
|
||||
class _CaptureState(FlowState):
|
||||
captured_flow_id: str = ""
|
||||
captured_state_id: str = ""
|
||||
captured_current_flow_id: str = ""
|
||||
captured_execution_id: str = ""
|
||||
|
||||
|
||||
class _IdentityCaptureFlow(Flow[_CaptureState]):
|
||||
initial_state = _CaptureState
|
||||
|
||||
@start()
|
||||
def capture(self) -> None:
|
||||
self.state.captured_flow_id = self.flow_id
|
||||
self.state.captured_state_id = self.state.id
|
||||
self.state.captured_current_flow_id = current_flow_id.get() or ""
|
||||
self.state.captured_execution_id = self.execution_id
|
||||
|
||||
|
||||
def test_execution_id_defaults_to_fresh_uuid_per_instance() -> None:
|
||||
a = _IdentityCaptureFlow()
|
||||
b = _IdentityCaptureFlow()
|
||||
|
||||
assert a.execution_id
|
||||
assert b.execution_id
|
||||
assert a.execution_id != b.execution_id
|
||||
|
||||
|
||||
def test_execution_id_survives_consumer_id_in_inputs() -> None:
|
||||
flow = _IdentityCaptureFlow()
|
||||
original_execution_id = flow.execution_id
|
||||
|
||||
flow.kickoff(inputs={"id": "consumer-supplied-id"})
|
||||
|
||||
assert flow.state.id == "consumer-supplied-id"
|
||||
assert flow.flow_id == "consumer-supplied-id"
|
||||
assert flow.execution_id == original_execution_id
|
||||
assert flow.execution_id != "consumer-supplied-id"
|
||||
|
||||
|
||||
def test_two_runs_with_same_consumer_id_have_distinct_execution_ids() -> None:
|
||||
flow_a = _IdentityCaptureFlow()
|
||||
flow_b = _IdentityCaptureFlow()
|
||||
|
||||
colliding_id = "shared-consumer-id"
|
||||
flow_a.kickoff(inputs={"id": colliding_id})
|
||||
flow_b.kickoff(inputs={"id": colliding_id})
|
||||
|
||||
assert flow_a.state.id == colliding_id
|
||||
assert flow_b.state.id == colliding_id
|
||||
assert flow_a.execution_id != flow_b.execution_id
|
||||
|
||||
|
||||
def test_execution_id_is_writable() -> None:
|
||||
flow = _IdentityCaptureFlow()
|
||||
flow.execution_id = "external-task-id"
|
||||
|
||||
assert flow.execution_id == "external-task-id"
|
||||
|
||||
flow.kickoff(inputs={"id": "consumer-supplied-id"})
|
||||
assert flow.execution_id == "external-task-id"
|
||||
assert flow.state.id == "consumer-supplied-id"
|
||||
|
||||
|
||||
def test_current_flow_id_context_var_matches_execution_id() -> None:
|
||||
flow = _IdentityCaptureFlow()
|
||||
flow.execution_id = "external-task-id"
|
||||
|
||||
flow.kickoff(inputs={"id": "consumer-supplied-id"})
|
||||
|
||||
assert flow.state.captured_current_flow_id == "external-task-id"
|
||||
assert flow.state.captured_flow_id == "consumer-supplied-id"
|
||||
assert flow.state.captured_execution_id == "external-task-id"
|
||||
|
||||
|
||||
def test_execution_id_not_included_in_serialized_state() -> None:
|
||||
flow = _IdentityCaptureFlow()
|
||||
flow.execution_id = "external-task-id"
|
||||
flow.kickoff()
|
||||
|
||||
dumped = flow.state.model_dump()
|
||||
assert "execution_id" not in dumped
|
||||
assert "_execution_id" not in dumped
|
||||
assert dumped["id"] == flow.state.id
|
||||
|
||||
|
||||
def test_dict_state_flow_also_exposes_stable_execution_id() -> None:
|
||||
class DictFlow(Flow[dict[str, Any]]):
|
||||
initial_state = dict # type: ignore[assignment]
|
||||
|
||||
@start()
|
||||
def noop(self) -> None:
|
||||
pass
|
||||
|
||||
flow = DictFlow()
|
||||
original = flow.execution_id
|
||||
flow.kickoff(inputs={"id": "consumer-supplied-id"})
|
||||
|
||||
assert flow.state["id"] == "consumer-supplied-id"
|
||||
assert flow.execution_id == original
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_flow_context_vars():
|
||||
yield
|
||||
for var in (current_flow_id, current_flow_request_id):
|
||||
try:
|
||||
var.set(None)
|
||||
except LookupError:
|
||||
# ContextVar was never set in this context; nothing to reset.
|
||||
pass
|
||||
@@ -1,3 +1,3 @@
|
||||
"""CrewAI development tools."""
|
||||
|
||||
__version__ = "1.14.3a3"
|
||||
__version__ = "1.14.3"
|
||||
|
||||
Reference in New Issue
Block a user