Compare commits

...

4 Commits

Author SHA1 Message Date
Vini Brasil
906cd9769d feat(flow): type DSL triggers as route-aware decorators (#6042)
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Check Documentation Broken Links / Check broken links (push) Waiting to run
Vulnerability Scan / pip-audit (push) Waiting to run
Centralize FlowTrigger and FlowMethodDecorator so start/listen/router and the boolean trigger helpers share one authoring contract. This preserves decorated method signatures for static checking while allowing route-label strings in nested FlowCondition data.

Export the shared typing helpers for static analyzers, use an explicit Protocol body, align condition validation with Sequence-backed condition data, and drop the stale call-arg ignore exposed by the signature-preserving decorators.

Update the flow guide to use or_(...) for multi-label listeners.
2026-06-04 18:07:49 -03:00
Lorenze Jay
14ce97d787 chat api for convo flows (#6034)
* Add conversational Flow chat helper

* Document conversational flow chat APIs in translations

* Stringify conversational chat REPL output
2026-06-04 13:36:48 -07:00
Matt Aitchison
f3a15a4f07 feat(lock_store): make locking backend overridable (#6015)
* feat(lock_store): make locking backend overridable

Allow the centralised lock factory to use a pluggable backend instead of
the hardcoded Redis/file selection. Backends are resolved with precedence
override > CREWAI_LOCK_FACTORY env > built-in default:

- set_lock_backend()/reset_lock_backend() and a scoped lock_backend()
  context manager for programmatic overrides
- CREWAI_LOCK_FACTORY="module:callable" env import-path, resolved lazily
  and cached, with clear errors on malformed or non-callable specs
- LockBackend Protocol documenting the contract (raw name in, context
  manager out; backend owns its namespacing)

Default Redis/file behavior is unchanged when nothing is overridden.

* refactor(lock_store): use explicit body for LockBackend protocol method

Replace the no-op `...` body with `raise NotImplementedError` to satisfy
the CodeQL ineffectual-statement check while keeping the Protocol
structural-typing only.

* refactor(lock_store): drop scoped lock_backend context manager

Keep the backend overridable via set_lock_backend/reset_lock_backend and
the CREWAI_LOCK_FACTORY env path, but remove the scoped lock_backend()
context manager. It was speculative surface and the only thread-unsafe
piece (racy save/restore of the module global); nothing depends on it.

* refactor(lock_store): drop reset_lock_backend alias

reset_lock_backend() was just set_lock_backend(None); callers use that
directly. Clearing the override is documented on set_lock_backend.

* style(lock_store): apply ruff format

* refactor(lock_store): simplify overridable backend to a single setter

Reduce the override surface to just set_lock_backend(): lock() uses the
custom backend when one is set, otherwise the unchanged Redis/file default.

Drop the CREWAI_LOCK_FACTORY env import-path, the runtime_checkable
Protocol, the precedence resolver, and the getter — a custom backend is
now any callable(name, *, timeout) -> context manager, registered in
process.

* fix(lock_store): snapshot backend to avoid check-then-call race

Read the module-global backend once into a local before the None check
and the call, so a concurrent set_lock_backend(None) cannot make lock()
invoke None.

* docs(lock_store): clarify name handling for custom backends

The default namespaces the lock name; custom backends receive it
verbatim. Correct the lock() docstring which implied namespacing always
happens.

* docs(lock_store): note set_lock_backend is for one-time startup setup
2026-06-04 13:28:31 -05:00
Vini Brasil
75dad212a2 Split flow DSL monolith into focused decorator modules (#6040)
The Flow DSL lived in one 1033-line `dsl.py` that mixed every decorator
(`@start`/`@listen`/`@router`), the `human_feedback` decorator,
condition combinators, and FlowDefinition extraction helpers in a single
file.

Split it into a `dsl/` package where each decorator gets its own module
(`start.py` 68 lines, `listen.py` 55, `router.py` 164,
`human_feedback.py` 98) and the shared extraction/condition helpers stay
in `utils.py`. The public API is re-exported from `dsl/__init__.py`, so
import paths are unchanged.

This is simpler because each decorator is now read and changed in
isolation instead of scanning a 1000-line file to find one of them, and
router-specific annotation parsing no longer sits next to unrelated
start/listen logic.
2026-06-04 15:02:06 -03:00
24 changed files with 1229 additions and 743 deletions

View File

@@ -7,7 +7,7 @@ mode: "wide"
## نظرة عامة
تعامل التطبيقات المحادثية مع كل سطر من المستخدم كـ **تشغيل flow جديد** بنفس **معرّف الجلسة**. توفر CrewAI مساعدات لسجل الرسائل وتصنيف النية الاختياري وتأجيل التتبع وجسور الواجهة — دون API منفصل `chat()` على `Flow`.
تعامل التطبيقات المحادثية مع كل سطر من المستخدم كـ **تشغيل flow جديد** بنفس **معرّف الجلسة**. توفر CrewAI مساعدات لسجل الرسائل وتصنيف النية الاختياري وتأجيل التتبع وجسور الواجهة، إضافة إلى REPL محلي `flow.chat()` للتدفقات المحادثية.
| المفهوم | التنفيذ |
|---------|---------|
@@ -16,13 +16,15 @@ mode: "wide"
| اكتمال الجولة | `FlowFinished` لهذا **التشغيل** فقط؛ تستمر المحادثة في `kickoff` التالي |
| تتبع الجلسة | `ConversationalConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
## نقطة دخول واحدة: `kickoff`
## واجهات الجولات
استخدم **`flow.kickoff(user_message=..., session_id=...)`** لكل رسالة مستخدم (REST أو WebSocket أو CLI). لا تنشئ غلاف `chat()` مخصصاً على `Flow`.
استخدم **`flow.kickoff(user_message=..., session_id=...)`** أو **`flow.handle_turn(...)`** لكل رسالة مستخدم من REST أو WebSocket أو الاختبارات أو الواجهات المخصصة. استخدم **`flow.chat()`** عندما تريد حلقة دردشة محلية في الطرفية لـ `Flow` محادثي.
| API | الاستخدام |
|-----|-----------|
| `kickoff(user_message=..., session_id=...)` | كل رسالة مستخدم |
| `handle_turn(message, session_id=...)` | غلاف مريح لجولة واحدة في `Flow` محادثي |
| `chat()` | REPL محلي في الطرفية لـ `Flow` محادثي |
| `kickoff_async(...)` | نفس المعاملات؛ دخول async أصلي |
| `ask()` | مطالبة حاجزة **داخل** خطوة واحدة |
| `@human_feedback` | الموافقة/الرفض على **مخرجات خطوة** — وليس السطر التالي |
@@ -290,6 +292,15 @@ finally:
flow.finalize_session_traces()
```
للدردشة المحلية في الطرفية، استخدم `chat()`:
```python
def kickoff() -> None:
SupportFlow().chat()
```
يلف `chat()` استدعاءات `handle_turn()` داخل REPL، ويخرج عند `exit` / `quit`، ويتجاهل الأسطر الفارغة افتراضياً، ويستدعي `finalize_session_traces()` عند انتهاء الجلسة.
### `ConversationConfig`
مزخرف صنف يُلحق افتراضيات الدردشة على مستوى الصنف.
@@ -373,6 +384,36 @@ Routes:
يمكنك أيضاً استدعاء `flow.kickoff(user_message=..., session_id=...)` مباشرةً — نفس منطق الإعادة والتشغيل يعمل. `handle_turn` هو الغلاف المريح.
### `chat()` للـ REPL المحلي
`flow.chat()` هو غلاف الطرفية الجاهز فوق `handle_turn()`:
```python
flow = SupportFlow()
flow.chat()
```
يتولى الحلقة المحلية الشائعة:
1. يطلب رسالة من المستخدم.
2. يتوقف عند `exit` / `quit` أو `EOFError` أو `KeyboardInterrupt`.
3. يستدعي `handle_turn(message, session_id=...)`.
4. يطبع نتيجة المساعد.
5. ينهي traces الجلسة المؤجلة داخل كتلة `finally`.
خصص سلوك الطرفية عبر I/O قابل للحقن:
```python
flow.chat(
session_id="demo-session",
prompt="You: ",
assistant_prefix="Assistant: ",
exit_commands=("exit", "quit", "bye"),
)
```
لتطبيقات الويب والـ workers الخلفية والاختبارات ووسائط النقل المخصصة، استمر في استخدام `handle_turn()` مباشرةً.
### سلوك موجّه مخصص
لتشغيل آثار جانبية (إعداد ناقل أحداث، قياس عن بُعد) في كل قرار توجيه، تجاوز `route_turn`:
@@ -407,17 +448,10 @@ class SupportFlow(Flow[ConversationState]):
- **العمل المتداخل** (`Agent.kickoff()`, crews, Exa) يُلحق بدفعة **الأب**؛ flow داخلي من `AgentExecutor` لا يغلق دفعة الجلسة مبكراً.
```python
try:
while True:
line = input("You: ").strip()
if not line:
break
flow.kickoff(user_message=line, session_id=session_id)
finally:
flow.finalize_session_traces()
flow.chat(session_id=session_id)
```
`ChatSession.close()` يستدعي `finalize_session_traces()` عند التأجيل.
`flow.chat()` يستدعي `finalize_session_traces()` نيابةً عنك. عندما تملك الحلقة عبر `handle_turn()` أو `kickoff(...)`، استدعِ `finalize_session_traces()` عند انتهاء الجلسة.
`suppress_flow_events=True` يخفي لوحات Rich فقط؛ أحداث trace والـ methods تُصدر.

View File

@@ -172,7 +172,7 @@ Flows are ideal when:
```python
# Example: Customer Support Flow with structured processing
from crewai.flow.flow import Flow, listen, router, start
from crewai.flow.flow import Flow, listen, or_, router, start
from pydantic import BaseModel
from typing import List, Dict
@@ -238,7 +238,7 @@ class CustomerSupportFlow(Flow[SupportTicketState]):
# Additional category handlers...
@listen("billing", "account_access", "technical_issue", "feature_request", "other")
@listen(or_("billing", "account_access", "technical_issue", "feature_request", "other"))
def resolve_ticket(self, resolution_info):
# Final resolution step
self.state.resolution = f"Issue resolved: {resolution_info}"

View File

@@ -7,7 +7,7 @@ mode: "wide"
## Overview
Conversational apps treat each user line as a **new flow run** with the **same session id**. CrewAI adds helpers for message history, optional intent classification, deferred tracing, and UI bridges — without a separate `chat()` API on `Flow`.
Conversational apps treat each user line as a **new flow run** with the **same session id**. CrewAI adds helpers for message history, optional intent classification, deferred tracing, UI bridges, and a local `flow.chat()` REPL for conversational flows.
| Concept | Implementation |
|---------|----------------|
@@ -16,13 +16,15 @@ Conversational apps treat each user line as a **new flow run** with the **same s
| Turn complete | `FlowFinished` for **this run** only; chat continues on the next `kickoff` |
| Full-session trace | `ConversationalConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
## One entry point: `kickoff`
## Turn APIs
Use **`flow.kickoff(user_message=..., session_id=...)`** for every user message (REST, WebSocket, CLI). Do not add a custom `chat()` wrapper on `Flow`.
Use **`flow.kickoff(user_message=..., session_id=...)`** or **`flow.handle_turn(...)`** for every user message from REST, WebSocket, tests, and custom UIs. Use **`flow.chat()`** when you want a local terminal chat loop for a conversational `Flow`.
| API | Use for |
|-----|---------|
| `kickoff(user_message=..., session_id=...)` | Each user message |
| `handle_turn(message, session_id=...)` | Ergonomic one-turn wrapper for conversational `Flow` |
| `chat()` | Local terminal REPL for conversational `Flow` |
| `kickoff_async(...)` | Same parameters; native async entry |
| `ask()` | Blocking prompt **inside** one step (wizard, clarification) |
| `@human_feedback` | Approve/reject **a step output** — not the next chat line |
@@ -293,6 +295,15 @@ finally:
flow.finalize_session_traces()
```
For a local terminal chat, use `chat()`:
```python
def kickoff() -> None:
SupportFlow().chat()
```
`chat()` wraps `handle_turn()` in a REPL, exits on `exit` / `quit`, skips blank lines by default, and calls `finalize_session_traces()` when the session ends.
### `ConversationConfig`
Class decorator that attaches per-class chat defaults.
@@ -376,6 +387,36 @@ You can override any of these by defining a same-named handler in your subclass.
You can also call `flow.kickoff(user_message=..., session_id=...)` directly — the same reset/run logic fires. `handle_turn` is the ergonomic wrapper.
### `chat()` for local REPLs
`flow.chat()` is the batteries-included terminal wrapper around `handle_turn()`:
```python
flow = SupportFlow()
flow.chat()
```
It handles the common local loop:
1. Prompts for a user message.
2. Stops on `exit` / `quit`, `EOFError`, or `KeyboardInterrupt`.
3. Calls `handle_turn(message, session_id=...)`.
4. Prints the assistant result.
5. Finalizes deferred session traces in a `finally` block.
Customize the terminal behavior with injectable I/O:
```python
flow.chat(
session_id="demo-session",
prompt="You: ",
assistant_prefix="Assistant: ",
exit_commands=("exit", "quit", "bye"),
)
```
For web apps, background workers, tests, and custom transports, keep using `handle_turn()` directly.
### Custom router behavior
To run side effects (event bus setup, telemetry) on every routing decision, override `route_turn`:
@@ -410,17 +451,12 @@ With `defer_trace_finalization=True` (default in `ConversationalConfig`):
- **Nested work** (`Agent.kickoff()`, crews, Exa tools) appends to the **parent** batch; inner `AgentExecutor` flows do not close the session batch early.
```python
try:
while True:
line = input("You: ").strip()
if not line:
break
flow.kickoff(user_message=line, session_id=session_id)
finally:
flow.finalize_session_traces()
flow.chat(session_id=session_id)
```
`ChatSession.close()` calls `finalize_session_traces()` when deferral is enabled.
`flow.chat()` calls `finalize_session_traces()` for you. When you own the loop
with `handle_turn()` or `kickoff(...)`, call `finalize_session_traces()` when
the session ends.
`suppress_flow_events=True` only hides Rich console panels; trace and method events still emit for observability.

View File

@@ -7,7 +7,7 @@ mode: "wide"
## 개요
대화형 앱은 각 사용자 입력을 **동일한 세션 id**로 **새 flow 실행**으로 처리합니다. CrewAI는 메시지 기록, 선택적 의도 분류, 지연 트레이싱, UI 브리지를 제공하며, `Flow`에 별도 `chat()` API는 없습니다.
대화형 앱은 각 사용자 입력을 **동일한 세션 id**로 **새 flow 실행**으로 처리합니다. CrewAI는 메시지 기록, 선택적 의도 분류, 지연 트레이싱, UI 브리지, 그리고 대화형 flow용 로컬 `flow.chat()` REPL을 제공합니다.
| 개념 | 구현 |
|------|------|
@@ -16,13 +16,15 @@ mode: "wide"
| 턴 완료 | `FlowFinished`는 **이번 실행**만 의미; 다음 `kickoff`로 대화 계속 |
| 세션 전체 트레이스 | `ConversationalConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
## 단일 진입점: `kickoff`
## 턴 API
모든 사용자 메시지에 **`flow.kickoff(user_message=..., session_id=...)`**를 사용하세요 (REST, WebSocket, CLI). `Flow`에 커스텀 `chat()` 래퍼를 만들지 마세요.
REST, WebSocket, 테스트, 커스텀 UI에서 오는 모든 사용자 메시지에 **`flow.kickoff(user_message=..., session_id=...)`** 또는 **`flow.handle_turn(...)`**를 사용하세요. 대화형 `Flow`를 로컬 터미널 채팅 루프로 실행하고 싶을 때는 **`flow.chat()`**을 사용하세요.
| API | 용도 |
|-----|------|
| `kickoff(user_message=..., session_id=...)` | 각 사용자 메시지 |
| `handle_turn(message, session_id=...)` | 대화형 `Flow`용 한 턴 편의 래퍼 |
| `chat()` | 대화형 `Flow`용 로컬 터미널 REPL |
| `kickoff_async(...)` | 동일 파라미터; 네이티브 async 진입 |
| `ask()` | 한 스텝 **내부** 블로킹 프롬프트 (마법사, 확인) |
| `@human_feedback` | **스텝 출력** 승인/거부 — 다음 채팅 줄이 아님 |
@@ -292,6 +294,15 @@ finally:
flow.finalize_session_traces()
```
로컬 터미널 채팅에는 `chat()`을 사용하세요:
```python
def kickoff() -> None:
SupportFlow().chat()
```
`chat()`은 `handle_turn()`을 REPL로 감싸고, `exit` / `quit`에서 종료하며, 기본적으로 빈 줄을 건너뛰고, 세션이 끝날 때 `finalize_session_traces()`를 호출합니다.
### `ConversationConfig`
클래스 단위의 챗 기본값을 부착하는 클래스 데코레이터입니다.
@@ -375,6 +386,36 @@ Routes:
`flow.kickoff(user_message=..., session_id=...)`를 직접 호출해도 동일한 reset/run 로직이 동작합니다. `handle_turn`은 그 위에 얹은 편의 래퍼입니다.
### 로컬 REPL용 `chat()`
`flow.chat()`은 `handle_turn()` 위에 얹은 바로 쓸 수 있는 터미널 래퍼입니다:
```python
flow = SupportFlow()
flow.chat()
```
일반적인 로컬 루프를 처리합니다:
1. 사용자 메시지를 입력받습니다.
2. `exit` / `quit`, `EOFError`, `KeyboardInterrupt`에서 멈춥니다.
3. `handle_turn(message, session_id=...)`를 호출합니다.
4. 어시스턴트 결과를 출력합니다.
5. `finally` 블록에서 지연된 세션 trace를 finalize합니다.
주입 가능한 I/O로 터미널 동작을 커스터마이즈할 수 있습니다:
```python
flow.chat(
session_id="demo-session",
prompt="You: ",
assistant_prefix="Assistant: ",
exit_commands=("exit", "quit", "bye"),
)
```
웹 앱, 백그라운드 worker, 테스트, 커스텀 transport에서는 계속 `handle_turn()`을 직접 사용하세요.
### 커스텀 router 동작
매 라우팅 결정마다 사이드 이펙트(이벤트 버스 셋업, 텔레메트리)를 실행하려면 `route_turn`을 오버라이드하세요:
@@ -409,17 +450,10 @@ LLM router를 우회해 프로그램적으로 라우트를 선택하려면 `rout
- **중첩 작업** (`Agent.kickoff()`, crew, Exa tool)은 **부모** batch에 추가; 내부 `AgentExecutor` flow가 세션 batch를 조기 종료하지 않음.
```python
try:
while True:
line = input("You: ").strip()
if not line:
break
flow.kickoff(user_message=line, session_id=session_id)
finally:
flow.finalize_session_traces()
flow.chat(session_id=session_id)
```
지연 활성화 시 `ChatSession.close()`가 `finalize_session_traces()`를 호출합니다.
`flow.chat()`이 `finalize_session_traces()`를 대신 호출합니다. `handle_turn()`이나 `kickoff(...)`로 직접 루프를 소유하는 경우, 세션이 끝날 때 `finalize_session_traces()`를 호출하세요.
`suppress_flow_events=True`는 Rich 콘솔 패널만 숨깁니다. trace 및 method 이벤트는 계속 발생합니다.

View File

@@ -7,7 +7,7 @@ mode: "wide"
## Visão geral
Apps conversacionais tratam cada linha do usuário como uma **nova execução do flow** com o **mesmo id de sessão**. A CrewAI oferece helpers para histórico de mensagens, classificação opcional de intenção, tracing adiado e pontes para UI — sem uma API `chat()` separada em `Flow`.
Apps conversacionais tratam cada linha do usuário como uma **nova execução do flow** com o **mesmo id de sessão**. A CrewAI oferece helpers para histórico de mensagens, classificação opcional de intenção, tracing adiado, pontes para UI e um REPL local `flow.chat()` para flows conversacionais.
| Conceito | Implementação |
|---------|----------------|
@@ -16,13 +16,15 @@ Apps conversacionais tratam cada linha do usuário como uma **nova execução do
| Fim do turno | `FlowFinished` só para **esta execução**; o chat segue no próximo `kickoff` |
| Trace da sessão | `ConversationalConfig(defer_trace_finalization=True)` + `finalize_session_traces()` |
## Um ponto de entrada: `kickoff`
## APIs de turno
Use **`flow.kickoff(user_message=..., session_id=...)`** para cada mensagem (REST, WebSocket, CLI). Não crie um wrapper `chat()` customizado em `Flow`.
Use **`flow.kickoff(user_message=..., session_id=...)`** ou **`flow.handle_turn(...)`** para cada mensagem de usuário em REST, WebSocket, testes e UIs customizadas. Use **`flow.chat()`** quando quiser um loop de chat local no terminal para um `Flow` conversacional.
| API | Uso |
|-----|-----|
| `kickoff(user_message=..., session_id=...)` | Cada mensagem do usuário |
| `handle_turn(message, session_id=...)` | Wrapper ergonômico de um turno para `Flow` conversacional |
| `chat()` | REPL local no terminal para `Flow` conversacional |
| `kickoff_async(...)` | Mesmos parâmetros; entrada async nativa |
| `ask()` | Prompt bloqueante **dentro** de um passo (wizard, esclarecimento) |
| `@human_feedback` | Aprovar/rejeitar **saída de um passo** — não a próxima linha do chat |
@@ -293,6 +295,15 @@ finally:
flow.finalize_session_traces()
```
Para um chat local no terminal, use `chat()`:
```python
def kickoff() -> None:
SupportFlow().chat()
```
`chat()` envolve `handle_turn()` em um REPL, sai com `exit` / `quit`, ignora linhas em branco por padrão e chama `finalize_session_traces()` quando a sessão termina.
### `ConversationConfig`
Decorador de classe que anexa os defaults de chat por classe.
@@ -376,6 +387,36 @@ Você pode sobrescrever qualquer uma definindo um handler com o mesmo nome na su
Você também pode chamar `flow.kickoff(user_message=..., session_id=...)` diretamente — a mesma lógica de reset/run é acionada. `handle_turn` é o wrapper ergonômico.
### `chat()` para REPLs locais
`flow.chat()` é o wrapper de terminal pronto para uso em cima de `handle_turn()`:
```python
flow = SupportFlow()
flow.chat()
```
Ele cobre o loop local comum:
1. Solicita uma mensagem do usuário.
2. Para com `exit` / `quit`, `EOFError` ou `KeyboardInterrupt`.
3. Chama `handle_turn(message, session_id=...)`.
4. Imprime o resultado do assistente.
5. Finaliza traces de sessão adiados em um bloco `finally`.
Customize o comportamento do terminal com I/O injetável:
```python
flow.chat(
session_id="demo-session",
prompt="You: ",
assistant_prefix="Assistant: ",
exit_commands=("exit", "quit", "bye"),
)
```
Para apps web, workers em background, testes e transportes customizados, continue usando `handle_turn()` diretamente.
### Comportamento customizado do router
Para rodar efeitos colaterais (setup de event bus, telemetria) em toda decisão de routing, sobrescreva `route_turn`:
@@ -410,17 +451,10 @@ Com `defer_trace_finalization=True` (padrão em `ConversationalConfig`):
- **Trabalho aninhado** (`Agent.kickoff()`, crews, tools Exa) acrescenta ao batch **pai**; flows internos de `AgentExecutor` não fecham o batch da sessão cedo.
```python
try:
while True:
line = input("You: ").strip()
if not line:
break
flow.kickoff(user_message=line, session_id=session_id)
finally:
flow.finalize_session_traces()
flow.chat(session_id=session_id)
```
`ChatSession.close()` chama `finalize_session_traces()` quando o adiamento está habilitado.
`flow.chat()` chama `finalize_session_traces()` para você. Quando você controla o loop com `handle_turn()` ou `kickoff(...)`, chame `finalize_session_traces()` quando a sessão terminar.
`suppress_flow_events=True` só oculta painéis do console; eventos de trace e método ainda são emitidos.

View File

@@ -1,14 +1,18 @@
"""Centralised lock factory.
If ``REDIS_URL`` is set and the ``redis`` package is installed, locks are
distributed via ``portalocker.RedisLock``. Otherwise, falls back to the
standard file-based ``portalocker.Lock`` in the system temp dir.
By default, if ``REDIS_URL`` is set and the ``redis`` package is installed,
locks are distributed via ``portalocker.RedisLock``. Otherwise, falls back to
the standard file-based ``portalocker.Lock`` in the system temp dir.
The backend can be replaced via :func:`set_lock_backend` to plug in a custom
locking strategy (e.g. a different distributed lock service, or an in-process
lock for tests).
"""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from collections.abc import Callable, Iterator
from contextlib import AbstractContextManager, contextmanager
from functools import lru_cache
from hashlib import md5
import logging
@@ -30,6 +34,25 @@ _REDIS_URL: str | None = os.environ.get("REDIS_URL")
_DEFAULT_TIMEOUT: Final[int] = 120
# A backend is called as ``backend(name, timeout=...)`` and returns a context
# manager that holds the lock while the ``with`` block runs.
LockBackend = Callable[..., AbstractContextManager[None]]
# ``None`` means use the built-in Redis/file selection.
_backend: LockBackend | None = None
def set_lock_backend(backend: LockBackend | None) -> None:
"""Replace the process-wide locking backend used by :func:`lock`.
Intended for one-time setup at startup. Pass ``None`` to restore the
built-in Redis/file default. In-flight :func:`lock` calls keep the backend
they started with, but swapping backends while other threads acquire locks
is otherwise unsynchronised.
"""
global _backend
_backend = backend
def _redis_available() -> bool:
"""Return True if redis is installed and REDIS_URL is set."""
@@ -58,10 +81,19 @@ def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]:
"""Acquire a named lock, yielding while it is held.
Args:
name: A human-readable lock name (e.g. ``"chromadb_init"``).
Automatically namespaced to avoid collisions.
name: A human-readable lock name (e.g. ``"chromadb_init"``). The
built-in default namespaces it to avoid collisions; a custom
backend receives it verbatim.
timeout: Maximum seconds to wait for the lock before raising.
"""
# Snapshot the global once: a concurrent set_lock_backend() must not turn
# the check-then-call into calling ``None``.
backend = _backend
if backend is not None:
with backend(name, timeout=timeout):
yield
return
channel = f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}"
if _redis_available():

View File

@@ -16,7 +16,7 @@ Import surface:
from __future__ import annotations
from collections.abc import Mapping, Sequence
from collections.abc import Callable, Mapping, Sequence
from enum import Enum
import json
import logging
@@ -243,6 +243,59 @@ class _ConversationalMixin:
self.append_assistant_message(self._stringify_result(result))
return result
def chat(
self,
*,
session_id: str | None = None,
prompt: str = "\nYou: ",
assistant_prefix: str = "\nAssistant: ",
exit_commands: Sequence[str] = ("exit", "quit"),
input_fn: Callable[[str], str] = input,
output_fn: Callable[[str], None] = print,
skip_empty: bool = True,
defer_trace_finalization: bool = True,
**handle_turn_kwargs: Any,
) -> None:
"""Run an interactive terminal chat loop for a conversational Flow.
``chat()`` is a convenience wrapper around ``handle_turn()`` for local
REPLs. For web apps, tests, and custom transports, call
``handle_turn()`` directly. The input/output callables are injectable so
callers can customize prompts or exercise the loop without patching
builtins.
"""
if not getattr(type(self), "conversational", False):
raise ValueError("Flow.chat() is only available on conversational flows")
exit_set = {command.lower() for command in exit_commands}
previous_defer = getattr(self, "defer_trace_finalization", False)
if defer_trace_finalization:
self.defer_trace_finalization = True
try:
while True:
try:
message = input_fn(prompt).strip()
except (EOFError, KeyboardInterrupt):
output_fn("")
break
if message.lower() in exit_set:
break
if skip_empty and not message:
continue
result = self.handle_turn(
message,
session_id=session_id,
**handle_turn_kwargs,
)
output_fn(f"{assistant_prefix}{self._stringify_result(result)}")
finally:
self.finalize_session_traces()
if defer_trace_finalization:
self.defer_trace_finalization = previous_defer
def build_router_context(self) -> dict[str, Any]:
"""Build context used by the routing policy for the current turn."""
state = cast(ConversationState, self.state)

View File

@@ -9,9 +9,9 @@ from crewai.flow.conversation import (
ConversationalConfig,
ConversationalInputs,
)
from crewai.flow.dsl import HumanFeedbackResult, human_feedback
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_config import flow_config
from crewai.flow.human_feedback import HumanFeedbackResult, human_feedback
from crewai.flow.input_provider import InputProvider, InputResponse
from crewai.flow.persistence import persist
from crewai.flow.visualization import (

View File

@@ -0,0 +1,32 @@
"""Flow DSL: the Python authoring layer for Flows.
Provides the ``@start`` / ``@listen`` / ``@router`` decorators and the
``or_`` / ``and_`` condition combinators used to write Flow classes in
Python. The DSL is one way to produce a Flow Structure: this package
extracts a :class:`~crewai.flow.flow_definition.FlowDefinition` from a
Python Flow class. Execution is handled by ``runtime``.
"""
from crewai.flow.dsl._conditions import and_, or_
from crewai.flow.dsl._human_feedback import (
HumanFeedbackResult,
human_feedback,
)
from crewai.flow.dsl._listen import listen
from crewai.flow.dsl._router import router
from crewai.flow.dsl._start import start
from crewai.flow.dsl._utils import (
build_flow_definition as build_flow_definition,
extract_flow_definition as extract_flow_definition,
)
__all__ = [
"HumanFeedbackResult",
"and_",
"human_feedback",
"listen",
"or_",
"router",
"start",
]

View File

@@ -0,0 +1,287 @@
"""Flow DSL condition primitives.
Type guards, the public ``or_`` / ``and_`` combinators, and the conversions
between runtime conditions, normalized conditions, and the
``FlowDefinitionCondition`` shape stored on a :class:`FlowDefinition`. These are
the lower layer of the DSL: the decorators and the definition builder
(``_utils``) build on top of them, so this module imports nothing from its
siblings.
"""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from typing_extensions import TypeIs
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.dsl._types import FlowTrigger
from crewai.flow.flow_definition import FlowDefinitionCondition
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowConditions,
SimpleFlowCondition,
)
from crewai.flow.types import FlowMethodName
def _is_non_string_sequence(value: Any) -> bool:
return isinstance(value, Sequence) and not isinstance(value, (str, bytes))
def is_simple_flow_condition(obj: Any) -> TypeIs[SimpleFlowCondition]:
"""Check if the object is a ``(condition_type, methods)`` tuple."""
return (
isinstance(obj, tuple)
and len(obj) == 2
and isinstance(obj[0], str)
and isinstance(obj[1], list)
)
def is_flow_condition_dict(obj: Any) -> TypeIs[FlowCondition]:
"""Check if the object matches the FlowCondition structure."""
if not isinstance(obj, dict):
return False
type_value = obj.get("type")
if type_value not in ("AND", "OR"):
return False
if "conditions" in obj:
conditions = obj["conditions"]
if not _is_non_string_sequence(conditions):
return False
for cond in conditions:
if not (
isinstance(cond, str)
or (isinstance(cond, dict) and is_flow_condition_dict(cond))
):
return False
if "methods" in obj:
methods = obj["methods"]
if not (
_is_non_string_sequence(methods)
and all(isinstance(m, str) for m in methods)
):
return False
allowed_keys = {"type", "conditions", "methods"}
if not set(obj).issubset(allowed_keys):
return False
return True
def _method_reference_name(value: Any) -> FlowMethodName | None:
name = getattr(value, "__name__", None)
if callable(value) and isinstance(name, str):
return FlowMethodName(name)
return None
def _normalize_condition(
condition: FlowConditions | FlowCondition | str,
) -> FlowCondition:
if isinstance(condition, str):
return {"type": OR_CONDITION, "conditions": [FlowMethodName(condition)]}
if is_flow_condition_dict(condition):
if "conditions" in condition:
return condition
if "methods" in condition:
normalized_methods: list[str | FlowMethodName | FlowCondition] = list(
condition["methods"]
)
return {"type": condition["type"], "conditions": normalized_methods}
return condition
if _is_non_string_sequence(condition) and all(
isinstance(item, str) or is_flow_condition_dict(item) for item in condition
):
return {"type": OR_CONDITION, "conditions": condition}
raise ValueError(f"Cannot normalize condition: {condition}")
def _extract_all_methods_recursive(
condition: str | FlowCondition | dict[str, Any] | list[Any],
flow: Any | None = None,
) -> list[FlowMethodName]:
if isinstance(condition, str):
if flow is not None:
if condition in flow._methods:
return [FlowMethodName(condition)]
return []
return [FlowMethodName(condition)]
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
methods = []
for sub_cond in normalized.get("conditions", []):
methods.extend(_extract_all_methods_recursive(sub_cond, flow))
return methods
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods_recursive(item, flow))
return methods
return []
def _extract_all_methods(
condition: str | FlowCondition | dict[str, Any] | list[Any],
) -> list[FlowMethodName]:
if isinstance(condition, str):
return [FlowMethodName(condition)]
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
cond_type = normalized.get("type", OR_CONDITION)
if cond_type == AND_CONDITION:
return [
FlowMethodName(sub_cond)
for sub_cond in normalized.get("conditions", [])
if isinstance(sub_cond, str)
]
return []
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods(item))
return methods
return []
def _condition_trigger(condition: FlowTrigger) -> FlowMethodName | FlowCondition:
if isinstance(condition, str):
return FlowMethodName(condition)
if is_flow_condition_dict(condition):
return condition
method_name = _method_reference_name(condition)
if method_name is not None:
return method_name
raise ValueError("Invalid condition")
def _condition_triggers(
conditions: Sequence[FlowTrigger],
error_message: str,
) -> FlowConditions:
try:
return [_condition_trigger(condition) for condition in conditions]
except ValueError as exc:
raise ValueError(error_message) from exc
def _definition_condition_from_runtime(condition: Any) -> FlowDefinitionCondition:
if isinstance(condition, str):
return str(condition)
method_name = _method_reference_name(condition)
if method_name is not None:
return str(method_name)
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
key = "and" if normalized.get("type") == AND_CONDITION else "or"
return {
key: [
_definition_condition_from_runtime(sub_condition)
for sub_condition in normalized.get("conditions", [])
]
}
if isinstance(condition, list):
return {"or": [_definition_condition_from_runtime(item) for item in condition]}
return str(condition)
def or_(*triggers: FlowTrigger) -> FlowCondition:
"""Combine multiple triggers with OR logic for flow control.
Creates a condition that is satisfied when any of the specified triggers
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
triggers: Route labels, method references, or existing conditions
returned by or_() / and_().
Returns:
A condition dictionary with format {"type": "OR", "conditions": list_of_triggers}.
Raises:
ValueError: If a trigger format is invalid.
Examples:
>>> @listen(or_("success", "timeout"))
>>> def handle_completion(self):
... pass
>>> @listen(or_(and_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_triggers = _condition_triggers(triggers, "Invalid trigger in or_()")
return {"type": OR_CONDITION, "conditions": processed_triggers}
def and_(*triggers: FlowTrigger) -> FlowCondition:
"""Combine multiple triggers with AND logic for flow control.
Creates a condition that is satisfied only when all specified triggers
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
triggers: Route labels, method references, or existing conditions
returned by or_() / and_().
Returns:
A condition dictionary with format {"type": "AND", "conditions": list_of_conditions}
where each condition can be a route label, method name, or nested condition.
Raises:
ValueError: If any trigger is invalid.
Examples:
>>> @listen(and_("validated", "processed"))
>>> def handle_complete_data(self):
... pass
>>> @listen(and_(or_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_triggers = _condition_triggers(triggers, "Invalid trigger in and_()")
return {"type": AND_CONDITION, "conditions": processed_triggers}
def _runtime_condition_from_definition(
condition: FlowDefinitionCondition,
) -> FlowMethodName | FlowCondition:
if isinstance(condition, str):
return FlowMethodName(condition)
if is_flow_condition_dict(condition):
return condition
if "and" in condition:
return {
"type": AND_CONDITION,
"conditions": [
_runtime_condition_from_definition(item)
for item in condition.get("and", [])
],
}
return {
"type": OR_CONDITION,
"conditions": [
_runtime_condition_from_definition(item) for item in condition.get("or", [])
],
}
def _runtime_listener_condition_from_definition(
condition: FlowDefinitionCondition,
) -> SimpleFlowCondition | FlowCondition:
runtime_condition = _runtime_condition_from_definition(condition)
if isinstance(runtime_condition, str):
return (OR_CONDITION, [FlowMethodName(str(runtime_condition))])
return runtime_condition

View File

@@ -0,0 +1,98 @@
from __future__ import annotations
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Any, TypeVar
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.human_feedback import (
HumanFeedbackConfig,
HumanFeedbackResult,
_build_human_feedback_runtime_decorator,
)
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import HumanFeedbackProvider
from crewai.llms.base_llm import BaseLLM
F = TypeVar("F", bound=Callable[..., Any])
__all__ = ["HumanFeedbackResult", "human_feedback"]
def _stamp_human_feedback_metadata(
wrapper: Any,
func: Callable[..., Any],
config: HumanFeedbackConfig,
) -> None:
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__trigger_condition__",
"__is_flow_method__",
"__flow_persistence_config__",
"__is_router__",
"__router_emit__",
"__flow_method_definition__",
]:
if hasattr(func, attr):
setattr(wrapper, attr, getattr(func, attr))
wrapper.__human_feedback_config__ = config
wrapper.__is_flow_method__ = True
if config.emit:
wrapper.__is_router__ = True
wrapper.__router_emit__ = list(config.emit)
fragment = getattr(wrapper, "__flow_method_definition__", None)
if isinstance(fragment, FlowMethodDefinition):
wrapper.__flow_method_definition__ = fragment.model_copy(
update={"router": True, "emit": list(config.emit)}
)
wrapper._human_feedback_llm = config.llm
def human_feedback(
message: str,
emit: Sequence[str] | None = None,
llm: str | BaseLLM | None = "gpt-4o-mini",
default_outcome: str | None = None,
metadata: dict[str, Any] | None = None,
provider: HumanFeedbackProvider | None = None,
learn: bool = False,
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
"""Decorator for Flow methods that require human feedback."""
runtime_decorator = _build_human_feedback_runtime_decorator(
message=message,
emit=emit,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
)
config = HumanFeedbackConfig(
message=message,
emit=emit,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
)
def decorator(func: F) -> F:
wrapper = runtime_decorator(func)
_stamp_human_feedback_metadata(wrapper, func, config)
return wrapper
return decorator

View File

@@ -0,0 +1,55 @@
from __future__ import annotations
from collections.abc import Callable
from typing import cast
from crewai.flow.dsl._conditions import _definition_condition_from_runtime
from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_set_flow_method_definition,
_set_trigger_metadata,
)
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import ListenMethod
def listen(condition: FlowTrigger) -> FlowMethodDecorator:
"""Creates a listener that executes when specified conditions are met.
This decorator sets up a method to execute in response to other method
executions in the flow. It supports both simple and complex triggering
conditions.
Args:
condition: Route label, method reference, or condition returned by
or_() / and_() that triggers the listener.
Returns:
A flow method decorator that preserves the decorated method's static signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @listen("process_data")
>>> def handle_processed_data(self):
... pass
>>> @listen("method_name")
>>> def handle_completion(self):
... pass
"""
def decorator(func: Callable[P, R]) -> ListenMethod[P, R]:
wrapper = ListenMethod(func)
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(listen=_definition_condition_from_runtime(condition)),
)
_set_trigger_metadata(wrapper, condition)
return wrapper
return cast(FlowMethodDecorator, decorator)

View File

@@ -0,0 +1,166 @@
from __future__ import annotations
from collections.abc import Callable, Sequence
from enum import Enum
import inspect
from types import UnionType
from typing import (
Any,
Literal,
Union,
cast,
get_args,
get_origin,
get_type_hints,
)
from crewai.flow.dsl._conditions import _definition_condition_from_runtime
from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_set_flow_method_definition,
_set_trigger_metadata,
)
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import RouterMethod
def _unwrap_function(function: Any) -> Any:
if hasattr(function, "__func__"):
function = function.__func__
if hasattr(function, "__wrapped__"):
wrapped = function.__wrapped__
if hasattr(wrapped, "unwrap"):
return wrapped.unwrap()
return wrapped
if hasattr(function, "unwrap"):
return function.unwrap()
return function
def _string_values_from_annotation(annotation: Any) -> list[str]:
if annotation is inspect.Signature.empty or isinstance(annotation, str):
return []
if isinstance(annotation, type) and issubclass(annotation, Enum):
return [member.value for member in annotation if isinstance(member.value, str)]
origin = get_origin(annotation)
if origin is None:
return []
args = get_args(annotation)
if origin is Literal or getattr(origin, "__name__", "") == "Literal":
return [arg for arg in args if isinstance(arg, str)]
if not (
origin is Union
or origin is UnionType
or getattr(origin, "__name__", "") == "Annotated"
):
return []
values: list[str] = []
for arg in args:
values.extend(_string_values_from_annotation(arg))
return values
def _return_annotation(function: Any) -> Any:
unwrapped = _unwrap_function(function)
try:
return get_type_hints(unwrapped, include_extras=True).get(
"return", inspect.Signature.empty
)
except (NameError, TypeError, ValueError):
try:
return inspect.signature(unwrapped).return_annotation
except (TypeError, ValueError):
return inspect.Signature.empty
def _get_router_return_events(function: Any) -> list[str] | None:
values = _string_values_from_annotation(_return_annotation(function))
return list(dict.fromkeys(values)) if values else None
def _normalize_router_emit(value: Sequence[Any] | str) -> list[str]:
if isinstance(value, str):
return [str(value)]
return list(dict.fromkeys(str(item) for item in value))
def router(
condition: FlowTrigger,
*,
emit: Sequence[str] | str | None = None,
) -> FlowMethodDecorator:
"""Creates a routing method that directs flow execution based on conditions.
This decorator marks a method as a router, which can dynamically determine
the next steps in the flow based on its return value. Routers are triggered
by specified conditions and can return constants that emit downstream events.
Args:
condition: Specifies when the router should execute. Can be:
- str: Route label or method name that triggers this router
- FlowCondition: Result from or_() or and_(), including nested conditions
- Flow method reference: A method whose completion triggers this router
emit: Optional explicit router output events for static FlowDefinition
and visualization. If omitted, Literal/Enum return annotations are
used when available.
Returns:
A flow method decorator that preserves the decorated method's static signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @router("check_status")
>>> def route_based_on_status(self):
... if self.state.status == "success":
... return "SUCCESS"
... return "FAILURE"
>>> @router(and_("validate", "process"))
>>> def complex_routing(self):
... if all([self.state.valid, self.state.processed]):
... return "CONTINUE"
... return "STOP"
>>> @router("check_status", emit=["SUCCESS", "FAILURE"])
>>> def explicit_routing(self):
... return "SUCCESS"
"""
def decorator(func: Callable[P, R]) -> RouterMethod[P, R]:
wrapper = RouterMethod(func)
if emit is not None:
router_events = _normalize_router_emit(emit)
else:
router_events = _get_router_return_events(func) or []
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
listen=_definition_condition_from_runtime(condition),
router=True,
emit=router_events or None,
),
)
_set_trigger_metadata(wrapper, condition)
if emit is not None:
wrapper.__router_emit__ = router_events
elif router_events:
wrapper.__router_emit__ = router_events
return wrapper
return cast(FlowMethodDecorator, decorator)

View File

@@ -0,0 +1,69 @@
from __future__ import annotations
from collections.abc import Callable
from typing import cast
from crewai.flow.dsl._conditions import _definition_condition_from_runtime
from crewai.flow.dsl._types import FlowMethodDecorator, FlowTrigger
from crewai.flow.dsl._utils import (
P,
R,
_set_flow_method_definition,
_set_trigger_metadata,
)
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import StartMethod
def start(
condition: FlowTrigger | None = None,
) -> FlowMethodDecorator:
"""Marks a method as a flow's starting point.
This decorator designates a method as an entry point for the flow execution.
It can optionally specify conditions that trigger the start based on other
method executions.
Args:
condition: Defines when the start method should execute. Can be:
- str: Route label or method name that triggers this start
- FlowCondition: Result from or_() or and_(), including nested conditions
- Flow method reference: A method whose completion triggers this start
Default is None, meaning unconditional start.
Returns:
A flow method decorator that preserves the decorated method's static signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @start() # Unconditional start
>>> def begin_flow(self):
... pass
>>> @start("method_name") # Start after specific method
>>> def conditional_start(self):
... pass
>>> @start(and_("method1", "method2")) # Start after multiple methods
>>> def complex_start(self):
... pass
"""
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
wrapper = StartMethod(func)
if condition is not None:
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
start=_definition_condition_from_runtime(condition)
),
)
_set_trigger_metadata(wrapper, condition)
else:
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
return wrapper
return cast(FlowMethodDecorator, decorator)

View File

@@ -0,0 +1,27 @@
"""Private typing helpers for the Python Flow DSL."""
from __future__ import annotations
from collections.abc import Callable
from typing import Any, Protocol, TypeAlias, TypeVar
from crewai.flow.flow_wrappers import FlowCondition
from crewai.flow.types import FlowMethodCallable
__all__ = ["FlowMethodDecorator", "FlowTrigger"]
F = TypeVar("F", bound=Callable[..., Any])
FlowTrigger: TypeAlias = str | FlowMethodCallable[..., Any] | FlowCondition
class FlowMethodDecorator(Protocol):
"""Decorator returned by Flow DSL authoring helpers.
The runtime wraps methods in FlowMethod subclasses, but the authoring
contract preserves the decorated method's static callable type.
"""
def __call__(self, func: F) -> F:
raise NotImplementedError

View File

@@ -1,35 +1,22 @@
"""Flow DSL: the Python authoring layer for Flows.
Provides the ``@start`` / ``@listen`` / ``@router`` decorators and the
``or_`` / ``and_`` condition combinators used to write Flow classes in
Python. The DSL is one way to produce a Flow Structure: this module
extracts a :class:`~crewai.flow.flow_definition.FlowDefinition` from a
Python Flow class. Execution is handled by ``runtime``.
"""
from __future__ import annotations
from collections.abc import Callable, Sequence
from enum import Enum
import inspect
from collections.abc import Sequence
import json
import logging
from types import UnionType
from typing import (
Any,
Literal,
ParamSpec,
TypeVar,
Union,
get_args,
get_origin,
get_type_hints,
)
from typing import Any, ParamSpec, TypeVar
from pydantic import BaseModel
from typing_extensions import TypeIs
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.dsl._conditions import (
_definition_condition_from_runtime,
_extract_all_methods,
_method_reference_name,
_runtime_listener_condition_from_definition,
is_flow_condition_dict,
)
from crewai.flow.dsl._types import FlowTrigger
from crewai.flow.flow_definition import (
FlowConfigDefinition,
FlowDefinition,
@@ -41,12 +28,9 @@ from crewai.flow.flow_definition import (
FlowStateDefinition,
)
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowConditions,
FlowMethod,
ListenMethod,
RouterMethod,
SimpleFlowCondition,
StartMethod,
)
from crewai.flow.types import FlowMethodName
@@ -57,21 +41,9 @@ R = TypeVar("R")
logger = logging.getLogger(__name__)
__all__ = ["and_", "listen", "or_", "router", "start"]
_FLOW_METHOD_DEFINITION_ATTR = "__flow_method_definition__"
def is_simple_flow_condition(obj: Any) -> TypeIs[SimpleFlowCondition]:
"""Check if the object is a ``(condition_type, methods)`` tuple."""
return (
isinstance(obj, tuple)
and len(obj) == 2
and isinstance(obj[0], str)
and isinstance(obj[1], list)
)
def is_flow_method(obj: Any) -> TypeIs[FlowMethod[Any, Any]]:
"""Check if the object carries Flow method wrapper metadata."""
return (
@@ -89,187 +61,13 @@ def _should_include_flow_method(flow_class: type, method: Any) -> bool:
return True
def is_flow_condition_dict(obj: Any) -> TypeIs[FlowCondition]:
"""Check if the object matches the FlowCondition structure."""
if not isinstance(obj, dict):
return False
type_value = obj.get("type")
if type_value not in ("AND", "OR"):
return False
if "conditions" in obj:
conditions = obj["conditions"]
if not isinstance(conditions, list):
return False
for cond in conditions:
if not (
isinstance(cond, str)
or (isinstance(cond, dict) and is_flow_condition_dict(cond))
):
return False
if "methods" in obj:
methods = obj["methods"]
if not (isinstance(methods, list) and all(isinstance(m, str) for m in methods)):
return False
allowed_keys = {"type", "conditions", "methods"}
if not set(obj).issubset(allowed_keys):
return False
return True
def _method_reference_name(value: Any) -> FlowMethodName | None:
name = getattr(value, "__name__", None)
if callable(value) and isinstance(name, str):
return FlowMethodName(name)
return None
def _flow_method_names(values: Sequence[Any]) -> list[FlowMethodName]:
return [FlowMethodName(str(value)) for value in values]
def _extract_all_methods_recursive(
condition: str | FlowCondition | dict[str, Any] | list[Any],
flow: Any | None = None,
) -> list[FlowMethodName]:
if isinstance(condition, str):
if flow is not None:
if condition in flow._methods:
return [FlowMethodName(condition)]
return []
return [FlowMethodName(condition)]
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
methods = []
for sub_cond in normalized.get("conditions", []):
methods.extend(_extract_all_methods_recursive(sub_cond, flow))
return methods
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods_recursive(item, flow))
return methods
return []
def _normalize_condition(
condition: FlowConditions | FlowCondition | str,
) -> FlowCondition:
if isinstance(condition, str):
return {"type": OR_CONDITION, "conditions": [FlowMethodName(condition)]}
if is_flow_condition_dict(condition):
if "conditions" in condition:
return condition
if "methods" in condition:
return {"type": condition["type"], "conditions": condition["methods"]}
return condition
if isinstance(condition, list) and all(
isinstance(item, str) or is_flow_condition_dict(item) for item in condition
):
return {"type": OR_CONDITION, "conditions": condition}
raise ValueError(f"Cannot normalize condition: {condition}")
def _extract_all_methods(
condition: str | FlowCondition | dict[str, Any] | list[Any],
) -> list[FlowMethodName]:
if isinstance(condition, str):
return [FlowMethodName(condition)]
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
cond_type = normalized.get("type", OR_CONDITION)
if cond_type == AND_CONDITION:
return [
FlowMethodName(sub_cond)
for sub_cond in normalized.get("conditions", [])
if isinstance(sub_cond, str)
]
return []
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods(item))
return methods
return []
def _unwrap_function(function: Any) -> Any:
if hasattr(function, "__func__"):
function = function.__func__
if hasattr(function, "__wrapped__"):
wrapped = function.__wrapped__
if hasattr(wrapped, "unwrap"):
return wrapped.unwrap()
return wrapped
if hasattr(function, "unwrap"):
return function.unwrap()
return function
def _string_values_from_annotation(annotation: Any) -> list[str]:
if annotation is inspect.Signature.empty or isinstance(annotation, str):
return []
if isinstance(annotation, type) and issubclass(annotation, Enum):
return [member.value for member in annotation if isinstance(member.value, str)]
origin = get_origin(annotation)
if origin is None:
return []
args = get_args(annotation)
if origin is Literal or getattr(origin, "__name__", "") == "Literal":
return [arg for arg in args if isinstance(arg, str)]
if not (
origin is Union
or origin is UnionType
or getattr(origin, "__name__", "") == "Annotated"
):
return []
values: list[str] = []
for arg in args:
values.extend(_string_values_from_annotation(arg))
return values
def _return_annotation(function: Any) -> Any:
unwrapped = _unwrap_function(function)
try:
return get_type_hints(unwrapped, include_extras=True).get(
"return", inspect.Signature.empty
)
except (NameError, TypeError, ValueError):
try:
return inspect.signature(unwrapped).return_annotation
except (TypeError, ValueError):
return inspect.Signature.empty
def _get_router_return_events(function: Any) -> list[str] | None:
values = _string_values_from_annotation(_return_annotation(function))
return list(dict.fromkeys(values)) if values else None
def _normalize_router_emit(value: Sequence[Any] | str) -> list[str]:
if isinstance(value, str):
return [str(value)]
return list(dict.fromkeys(str(item) for item in value))
def _set_trigger_metadata(
wrapper: StartMethod[P, R] | ListenMethod[P, R] | RouterMethod[P, R],
condition: str | FlowCondition | Callable[..., Any],
condition: FlowTrigger,
) -> None:
if isinstance(condition, str):
wrapper.__trigger_methods__ = [FlowMethodName(condition)]
@@ -299,29 +97,6 @@ def _set_trigger_metadata(
)
def _condition_trigger(
condition: str | FlowCondition | Callable[..., Any],
) -> FlowMethodName | FlowCondition:
if isinstance(condition, str):
return FlowMethodName(condition)
if is_flow_condition_dict(condition):
return condition
method_name = _method_reference_name(condition)
if method_name is not None:
return method_name
raise ValueError("Invalid condition")
def _condition_triggers(
conditions: Sequence[str | FlowCondition | Callable[..., Any]],
error_message: str,
) -> FlowConditions:
try:
return [_condition_trigger(condition) for condition in conditions]
except ValueError as exc:
raise ValueError(error_message) from exc
def _set_flow_method_definition(
wrapper: StartMethod[P, R] | ListenMethod[P, R] | RouterMethod[P, R],
definition: FlowMethodDefinition,
@@ -338,232 +113,6 @@ def _get_flow_method_definition(method: Any) -> FlowMethodDefinition | None:
return None
def start(
condition: str | FlowCondition | Callable[..., Any] | None = None,
) -> Callable[[Callable[P, R]], StartMethod[P, R]]:
"""Marks a method as a flow's starting point.
This decorator designates a method as an entry point for the flow execution.
It can optionally specify conditions that trigger the start based on other
method executions.
Args:
condition: Defines when the start method should execute. Can be:
- str: Name of a method that triggers this start
- FlowCondition: Result from or_() or and_(), including nested conditions
- Callable[..., Any]: A method reference that triggers this start
Default is None, meaning unconditional start.
Returns:
A decorator function that wraps the method as a flow start point and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @start() # Unconditional start
>>> def begin_flow(self):
... pass
>>> @start("method_name") # Start after specific method
>>> def conditional_start(self):
... pass
>>> @start(and_("method1", "method2")) # Start after multiple methods
>>> def complex_start(self):
... pass
"""
def decorator(func: Callable[P, R]) -> StartMethod[P, R]:
wrapper = StartMethod(func)
if condition is not None:
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
start=_definition_condition_from_runtime(condition)
),
)
_set_trigger_metadata(wrapper, condition)
else:
_set_flow_method_definition(wrapper, FlowMethodDefinition(start=True))
return wrapper
return decorator
def listen(
condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], ListenMethod[P, R]]:
"""Creates a listener that executes when specified conditions are met.
This decorator sets up a method to execute in response to other method
executions in the flow. It supports both simple and complex triggering
conditions.
Args:
condition: Specifies when the listener should execute.
Returns:
A decorator function that wraps the method as a flow listener and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @listen("process_data")
>>> def handle_processed_data(self):
... pass
>>> @listen("method_name")
>>> def handle_completion(self):
... pass
"""
def decorator(func: Callable[P, R]) -> ListenMethod[P, R]:
wrapper = ListenMethod(func)
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(listen=_definition_condition_from_runtime(condition)),
)
_set_trigger_metadata(wrapper, condition)
return wrapper
return decorator
def router(
condition: str | FlowCondition | Callable[..., Any],
*,
emit: Sequence[str] | str | None = None,
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
"""Creates a routing method that directs flow execution based on conditions.
This decorator marks a method as a router, which can dynamically determine
the next steps in the flow based on its return value. Routers are triggered
by specified conditions and can return constants that emit downstream events.
Args:
condition: Specifies when the router should execute. Can be:
- str: Name of a method that triggers this router
- FlowCondition: Result from or_() or and_(), including nested conditions
- Callable[..., Any]: A method reference that triggers this router
emit: Optional explicit router output events for static FlowDefinition
and visualization. If omitted, Literal/Enum return annotations are
used when available.
Returns:
A decorator function that wraps the method as a router and preserves its signature.
Raises:
ValueError: If the condition format is invalid.
Examples:
>>> @router("check_status")
>>> def route_based_on_status(self):
... if self.state.status == "success":
... return "SUCCESS"
... return "FAILURE"
>>> @router(and_("validate", "process"))
>>> def complex_routing(self):
... if all([self.state.valid, self.state.processed]):
... return "CONTINUE"
... return "STOP"
>>> @router("check_status", emit=["SUCCESS", "FAILURE"])
>>> def explicit_routing(self):
... return "SUCCESS"
"""
def decorator(func: Callable[P, R]) -> RouterMethod[P, R]:
wrapper = RouterMethod(func)
if emit is not None:
router_events = _normalize_router_emit(emit)
else:
router_events = _get_router_return_events(func) or []
_set_flow_method_definition(
wrapper,
FlowMethodDefinition(
listen=_definition_condition_from_runtime(condition),
router=True,
emit=router_events or None,
),
)
_set_trigger_metadata(wrapper, condition)
if emit is not None:
wrapper.__router_emit__ = router_events
elif router_events:
wrapper.__router_emit__ = router_events
return wrapper
return decorator
def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
"""Combines multiple conditions with OR logic for flow control.
Creates a condition that is satisfied when any of the specified conditions
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references.
Returns:
A condition dictionary with format {"type": "OR", "conditions": list_of_conditions} where each condition can be a string (method name) or a nested dict
Raises:
ValueError: If condition format is invalid.
Examples:
>>> @listen(or_("success", "timeout"))
>>> def handle_completion(self):
... pass
>>> @listen(or_(and_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_triggers = _condition_triggers(conditions, "Invalid condition in or_()")
return {"type": OR_CONDITION, "conditions": processed_triggers}
def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
"""Combines multiple conditions with AND logic for flow control.
Creates a condition that is satisfied only when all specified conditions
are met. This is used with @start, @listen, or @router decorators to create
complex triggering conditions.
Args:
*conditions: Variable number of conditions that can be method names, existing condition dictionaries, or method references.
Returns:
A condition dictionary with format {"type": "AND", "conditions": list_of_conditions}
where each condition can be a string (method name) or a nested dict
Raises:
ValueError: If any condition is invalid.
Examples:
>>> @listen(and_("validated", "processed"))
>>> def handle_complete_data(self):
... pass
>>> @listen(and_(or_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_triggers = _condition_triggers(conditions, "Invalid condition in and_()")
return {"type": AND_CONDITION, "conditions": processed_triggers}
def _object_ref(value: Any) -> str:
target = value if isinstance(value, type) else type(value)
module = getattr(target, "__module__", "")
@@ -689,26 +238,6 @@ def _build_config_definition(
return FlowConfigDefinition(**values)
def _definition_condition_from_runtime(condition: Any) -> FlowDefinitionCondition:
if isinstance(condition, str):
return str(condition)
method_name = _method_reference_name(condition)
if method_name is not None:
return str(method_name)
if is_flow_condition_dict(condition):
normalized = _normalize_condition(condition)
key = "and" if normalized.get("type") == AND_CONDITION else "or"
return {
key: [
_definition_condition_from_runtime(sub_condition)
for sub_condition in normalized.get("conditions", [])
]
}
if isinstance(condition, list):
return {"or": [_definition_condition_from_runtime(item) for item in condition]}
return str(condition)
def _condition_from_method_metadata(method: Any) -> FlowDefinitionCondition | None:
trigger_condition = getattr(method, "__trigger_condition__", None)
if trigger_condition is not None:
@@ -760,39 +289,6 @@ def _definition_trigger_condition(
return None
def _runtime_condition_from_definition(
condition: FlowDefinitionCondition,
) -> FlowMethodName | FlowCondition:
if isinstance(condition, str):
return FlowMethodName(condition)
if is_flow_condition_dict(condition):
return condition
if "and" in condition:
return {
"type": AND_CONDITION,
"conditions": [
_runtime_condition_from_definition(item)
for item in condition.get("and", [])
],
}
return {
"type": OR_CONDITION,
"conditions": [
_runtime_condition_from_definition(item) for item in condition.get("or", [])
],
}
def _runtime_listener_condition_from_definition(
condition: FlowDefinitionCondition,
) -> SimpleFlowCondition | FlowCondition:
runtime_condition = _runtime_condition_from_definition(condition)
if isinstance(runtime_condition, str):
return (OR_CONDITION, [FlowMethodName(str(runtime_condition))])
return runtime_condition
def _build_human_feedback_definition(
method: Any,
diagnostics: list[FlowDefinitionDiagnostic],

View File

@@ -37,16 +37,16 @@ class FlowCondition(TypedDict, total=False):
Attributes:
type: The type of the condition.
conditions: A list of conditions types.
methods: A list of methods.
conditions: A sequence of route labels, method names, or nested conditions.
methods: A legacy sequence of route labels or method names.
"""
type: Required[FlowConditionType]
conditions: Sequence[FlowMethodName | FlowCondition]
methods: list[FlowMethodName]
conditions: Sequence[str | FlowMethodName | FlowCondition]
methods: Sequence[str | FlowMethodName]
FlowConditions: TypeAlias = list[FlowMethodName | FlowCondition]
FlowConditions: TypeAlias = Sequence[str | FlowMethodName | FlowCondition]
class FlowMethod(Generic[P, R]):

View File

@@ -65,7 +65,6 @@ from typing import TYPE_CHECKING, Any, TypeVar
from pydantic import BaseModel, Field
from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import FlowMethod
@@ -222,7 +221,7 @@ class DistilledLessons(BaseModel):
)
def human_feedback(
def _build_human_feedback_runtime_decorator(
message: str,
emit: Sequence[str] | None = None,
llm: str | BaseLLM | None = "gpt-4o-mini",
@@ -233,102 +232,6 @@ def human_feedback(
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
"""Decorator for Flow methods that require human feedback.
This decorator wraps a Flow method to:
1. Execute the method and capture its output
2. Display the output to the human with a feedback request
3. Collect the human's free-form feedback
4. Optionally collapse the feedback to a predefined outcome using an LLM
5. Store the result for access by downstream methods
When `emit` is specified, the decorator acts as a router, and the
collapsed outcome triggers the appropriate @listen decorated method.
Supports both synchronous (blocking) and asynchronous (non-blocking)
feedback collection through the `provider` parameter. If no provider
is specified, defaults to synchronous console input.
Args:
message: The message shown to the human when requesting feedback.
This should clearly explain what kind of feedback is expected.
emit: Optional sequence of outcome strings. When provided, the
human's feedback will be collapsed to one of these outcomes
using the specified LLM. The outcome then triggers @listen
methods that match.
llm: The LLM model to use for collapsing feedback to outcomes.
Required when emit is specified. Can be a model string
like "gpt-4o-mini" or a BaseLLM instance.
default_outcome: The outcome to use when the human provides no
feedback (empty input). Must be one of the emit values
if emit is specified.
metadata: Optional metadata for enterprise integrations. This is
passed through to the HumanFeedbackResult and can be used
by enterprise forks for features like Slack/Teams integration.
provider: Optional HumanFeedbackProvider for custom feedback
collection. Use this for async workflows that integrate with
external systems like Slack, Teams, or webhooks. When the
provider raises HumanFeedbackPending, the flow pauses and
can be resumed later with Flow.resume().
learn: Enable HITL learning. Recall past lessons to pre-review
output before the human sees it, and distill new lessons
from feedback after.
learn_source: Memory source tag for stored/recalled lessons.
learn_strict: When True, re-raise exceptions from the pre-review
and distillation steps instead of falling back to raw output.
Default False preserves graceful degradation; failures are
always logged via ``logger.warning`` regardless of this flag.
Returns:
A decorator function that wraps the method with human feedback
collection logic.
Raises:
ValueError: If emit is specified but llm is not provided.
ValueError: If default_outcome is specified but emit is not.
ValueError: If default_outcome is not in the emit list.
HumanFeedbackPending: When an async provider pauses execution.
Example:
Basic feedback without routing:
```python
@start()
@human_feedback(message="Please review this output:")
def generate_content(self):
return "Generated content..."
```
With routing based on feedback:
```python
@start()
@human_feedback(
message="Review and approve or reject:",
emit=["approved", "rejected", "needs_revision"],
llm="gpt-4o-mini",
default_outcome="needs_revision",
)
def review_document(self):
return document_content
@listen("approved")
def publish(self):
print(f"Publishing: {self.last_human_feedback.output}")
```
Async feedback with custom provider:
```python
@start()
@human_feedback(
message="Review this content:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
provider=SlackProvider(channel="#reviews"),
)
def generate_content(self):
return "Content to review..."
```
"""
if emit is not None:
if not llm:
raise ValueError(
@@ -631,55 +534,33 @@ def human_feedback(
wrapper = sync_wrapper
for attr in [
"__is_start_method__",
"__trigger_methods__",
"__condition_type__",
"__trigger_condition__",
"__is_flow_method__",
"__flow_persistence_config__",
"__is_router__",
"__router_emit__",
"__flow_method_definition__",
]:
if hasattr(func, attr):
setattr(wrapper, attr, getattr(func, attr))
# Create config inline to avoid race conditions
wrapper.__human_feedback_config__ = HumanFeedbackConfig(
message=message,
emit=emit,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
)
wrapper.__is_flow_method__ = True
if emit:
wrapper.__is_router__ = True
wrapper.__router_emit__ = list(emit)
# Keep the definition fragment in sync: emit promotes the method to
# a router and the feedback outcomes replace any emit recorded by an
# inner @router. Copy before updating so the wrapped method's own
# fragment (shared by reference) is left untouched.
fragment = getattr(wrapper, "__flow_method_definition__", None)
if isinstance(fragment, FlowMethodDefinition):
wrapper.__flow_method_definition__ = fragment.model_copy(
update={"router": True, "emit": list(emit)}
)
# Stash the live LLM object for HITL resume to retrieve.
# When a flow pauses for human feedback and later resumes (possibly in a
# different process), the serialized context only contains a model string.
# By storing the original LLM on the wrapper, resume_async can retrieve
# the fully-configured LLM (with credentials, project, safety_settings, etc.)
# instead of creating a bare LLM from just the model string.
wrapper._human_feedback_llm = llm
return wrapper # type: ignore[no-any-return]
return decorator
def human_feedback(
message: str,
emit: Sequence[str] | None = None,
llm: str | BaseLLM | None = "gpt-4o-mini",
default_outcome: str | None = None,
metadata: dict[str, Any] | None = None,
provider: HumanFeedbackProvider | None = None,
learn: bool = False,
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
"""Compatibility import path for the Flow human-feedback DSL decorator."""
from crewai.flow.dsl._human_feedback import human_feedback as dsl_human_feedback
return dsl_human_feedback(
message=message,
emit=emit,
llm=llm,
default_outcome=default_outcome,
metadata=metadata,
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
)

View File

@@ -90,15 +90,17 @@ from crewai.experimental.conversational import (
)
from crewai.experimental.conversational_mixin import _ConversationalMixin
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.dsl import (
from crewai.flow.dsl._conditions import (
_extract_all_methods,
_extract_all_methods_recursive,
_normalize_condition,
is_flow_condition_dict,
is_simple_flow_condition,
)
from crewai.flow.dsl._utils import (
build_flow_definition,
extract_flow_definition,
is_flow_condition_dict,
is_flow_method,
is_simple_flow_condition,
)
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_definition import FlowDefinition
@@ -704,16 +706,16 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
# When ``conversational = True`` on a subclass, the built-in conversational
# graph (``conversation_start`` -> ``route_conversation`` -> ``converse_turn``
# / ``end_conversation`` / ``answer_from_history_turn``) registers and
# ``handle_turn`` becomes the chat entry point. When ``False`` (default),
# the methods exist as inert attributes and never register or fire —
# non-chat flows pay no runtime cost.
# ``handle_turn`` / ``chat`` become the chat entry points. When ``False``
# (default), the methods exist as inert attributes and never register or
# fire — non-chat flows pay no runtime cost.
#
# ⚠ EXPERIMENTAL FEATURE. The whole conversational surface
# (``conversational`` ClassVar, ``handle_turn``, ``ConversationConfig``,
# ``RouterConfig``, ``ConversationState``, the built-in graph + helpers)
# lives under ``crewai.experimental`` and may change shape before
# graduating. Pin your CrewAI version if you depend on specific
# behavior, and watch the changelog for breaking updates.
# (``conversational`` ClassVar, ``handle_turn``, ``chat``,
# ``ConversationConfig``, ``RouterConfig``, ``ConversationState``, the
# built-in graph + helpers) lives under ``crewai.experimental`` and may
# change shape before graduating. Pin your CrewAI version if you depend on
# specific behavior, and watch the changelog for breaking updates.
conversational: ClassVar[bool] = False
conversational_config: ClassVar[ConversationConfig | None] = None
builtin_routes: ClassVar[tuple[str, ...]] = ("converse", "end")
@@ -2830,7 +2832,7 @@ class Flow(_ConversationalMixin, BaseModel, Generic[T], metaclass=FlowMeta):
def _evaluate_condition(
self,
condition: FlowMethodName | FlowCondition,
condition: str | FlowMethodName | FlowCondition,
trigger_method: FlowMethodName,
listener_name: FlowMethodName,
) -> bool:

View File

@@ -31,7 +31,7 @@ PendingListenerKey = NewType(
class FlowMethodCallable(Protocol[P, R]):
"""A callable that can be used as a flow method reference."""
__name__: FlowMethodName
__name__: str
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R: ...

View File

@@ -337,7 +337,7 @@ class RecallFlow(Flow[RecallState]):
@router(re_search)
def re_decide_depth(self) -> str:
"""Re-evaluate depth after re-search. Same logic as decide_depth."""
return self.decide_depth() # type: ignore[call-arg]
return self.decide_depth()
@listen("synthesize")
def synthesize_results(self) -> list[MemoryMatch]:

View File

@@ -858,6 +858,86 @@ class TestConversationalFlow:
flow.handle_turn("anything")
assert flow.state.messages[-1].content == "worked"
def test_chat_runs_repl_over_handle_turn_and_finalizes(self) -> None:
@ConversationConfig(defer_trace_finalization=False)
class MyChat(ConversationalFlow):
turns: int = 0
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> str:
self.turns += 1
reply = f"worked: {self.state.current_user_message}"
self.append_assistant_message(reply)
return reply
flow = MyChat()
inputs = iter(["first", "", "second", "quit"])
prompts: list[str] = []
outputs: list[str] = []
def input_fn(prompt: str) -> str:
prompts.append(prompt)
return next(inputs)
with patch.object(flow, "finalize_session_traces") as mock_finalize:
flow.chat(
session_id="session-1",
input_fn=input_fn,
output_fn=outputs.append,
)
assert flow.turns == 2
assert prompts == ["\nYou: ", "\nYou: ", "\nYou: ", "\nYou: "]
assert outputs == [
"\nAssistant: worked: first",
"\nAssistant: worked: second",
]
mock_finalize.assert_called_once_with()
assert flow.defer_trace_finalization is False
def test_chat_stringifies_repl_output_like_conversation_helpers(self) -> None:
class RawResult:
raw = "raw assistant output"
@ConversationConfig(defer_trace_finalization=False)
class MyChat(ConversationalFlow):
def route_turn(self, context: dict[str, Any]) -> str | None:
return "work"
@listen("work")
def do_work(self) -> RawResult:
return RawResult()
flow = MyChat()
inputs = iter(["first", "quit"])
outputs: list[str] = []
with patch.object(flow, "finalize_session_traces"):
flow.chat(
input_fn=lambda _: next(inputs),
output_fn=outputs.append,
)
assert outputs == ["\nAssistant: raw assistant output"]
def test_chat_rejects_non_conversational_flows(self) -> None:
class PlainFlow(Flow):
@start()
def begin(self) -> str:
return "done"
flow = PlainFlow()
try:
flow.chat(input_fn=lambda _: "quit")
except ValueError as exc:
assert "conversational flows" in str(exc)
else:
raise AssertionError("Flow.chat() should reject regular flows")
def test_defer_trace_finalization_skips_per_turn_finalize(self) -> None:
"""``defer_trace_finalization = True`` suppresses per-turn ``finalize_batch``.

View File

@@ -14,6 +14,7 @@ import crewai.flow.dsl as flow_dsl
import crewai.flow.flow_definition as flow_definition
import crewai.flow.visualization.builder as visualization_builder
from crewai.flow import Flow, and_, human_feedback, listen, or_, persist, router, start
from crewai.flow.dsl._conditions import is_flow_condition_dict
def test_flow_public_exports_are_explicit():
@@ -25,7 +26,15 @@ def test_flow_public_exports_are_explicit():
assert "FlowDefinitionDiagnostic" not in flow_package.__all__
assert "build_flow_definition" not in flow_package.__all__
assert "flow_structure" not in flow_package.__all__
assert set(flow_dsl.__all__) == {"and_", "listen", "or_", "router", "start"}
assert set(flow_dsl.__all__) == {
"HumanFeedbackResult",
"and_",
"human_feedback",
"listen",
"or_",
"router",
"start",
}
assert set(flow_definition.__all__) == {
"FlowConfigDefinition",
"FlowDefinition",
@@ -40,6 +49,20 @@ def test_flow_public_exports_are_explicit():
assert "calculate_node_levels" not in flow_visualization.__all__
def test_flow_condition_dict_accepts_non_string_sequences():
condition = {
"type": "OR",
"conditions": (
"approved",
{"type": "AND", "methods": ("validated", "processed")},
),
}
assert is_flow_condition_dict(condition)
assert not is_flow_condition_dict({"type": "OR", "conditions": "approved"})
assert not is_flow_condition_dict({"type": "OR", "methods": b"approved"})
def test_private_flow_helpers_do_not_have_docstrings():
import crewai.flow.flow_wrappers as flow_wrappers
import crewai.flow.human_feedback as human_feedback

View File

@@ -1,11 +1,13 @@
"""Tests for lock_store.
We verify our own logic: the _redis_available guard and which portalocker
backend is selected. We trust portalocker to handle actual locking mechanics.
We verify our own logic: the _redis_available guard, which portalocker
backend is selected, and that a custom backend can be plugged in. We trust
portalocker to handle actual locking mechanics.
"""
from __future__ import annotations
from contextlib import contextmanager
import sys
from unittest import mock
@@ -20,6 +22,14 @@ def no_redis_url(monkeypatch):
monkeypatch.setattr(lock_store, "_REDIS_URL", None)
@pytest.fixture(autouse=True)
def reset_backend():
"""Ensure a custom backend never leaks across tests."""
lock_store.set_lock_backend(None)
yield
lock_store.set_lock_backend(None)
# _redis_available
@@ -64,3 +74,40 @@ def test_uses_redis_lock_when_redis_available(monkeypatch):
kwargs = mock_redis_lock.call_args.kwargs
assert kwargs["channel"].startswith("crewai:")
assert kwargs["connection"] is fake_conn
# custom backend
def test_custom_backend_is_used():
calls = []
@contextmanager
def fake_backend(name, *, timeout):
calls.append((name, timeout))
yield
lock_store.set_lock_backend(fake_backend)
# The default file/redis path must not be touched when overridden.
with mock.patch("portalocker.Lock") as mock_lock:
with lock("custom_test", timeout=5):
pass
mock_lock.assert_not_called()
assert calls == [("custom_test", 5)]
def test_clearing_backend_restores_default():
@contextmanager
def fake_backend(name, *, timeout):
yield
lock_store.set_lock_backend(fake_backend)
lock_store.set_lock_backend(None)
with mock.patch("portalocker.Lock") as mock_lock:
with lock("after_clear"):
pass
mock_lock.assert_called_once()