feat(conversational): implement route permissions and access control

- Added  to  for defining access control on routes.
- Introduced  decorator to specify permissions needed for route access.
- Enhanced  with methods to check route access based on user permissions.
- Updated documentation across multiple languages to reflect changes in route permissions and usage examples.
- Added tests to verify permission handling and redirection to  for unauthorized access.
This commit is contained in:
Lorenze Jay
2026-06-06 15:09:07 -07:00
parent 913a3abead
commit 9b251b1de4
9 changed files with 462 additions and 14 deletions

View File

@@ -316,6 +316,8 @@ RouterConfig(
route_descriptions={
"INTERNET_SEARCH": "تجاوز الـ docstring لهذا المسار فقط.",
},
route_permissions={"INTERNET_SEARCH": "web_search"}, # تجاوز اختياري
permission_denied_route="permission_denied", # الافتراضي
default_intent="converse", # يُستخدم عند فشل LLM أو غيابه
fallback_intent="converse", # يُستخدم عندما يعيد LLM مساراً غير صالح
intent_field="intent",
@@ -350,6 +352,58 @@ Routes:
`RouterConfig.prompt` مخصص لـ **تأطير النطاق** (شخصية المساعد، قواعد العمل، النبرة). فهرس المسارات يُبنى تلقائياً — لا تُدرج المسارات في `prompt`؛ سيختل التزامن لحظة إضافة معالج جديد.
### أذونات المسارات
استخدم `@listen(..., required_permissions=[...])` لحماية المسارات بعد أن يختار الموجّه مساراً صالحاً. يتحقق المفوّض الافتراضي من مجموعة `permissions` في `self.state`؛ وتُعاد الجولات المرفوضة إلى `permission_denied`.
```python
from crewai import Flow
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
RouterConfig,
)
from crewai.flow import listen, start
from pydantic import Field
class AppState(ConversationState):
permissions: set[str] = Field(default_factory=set)
@ConversationConfig(
router=RouterConfig(
prompt="أرسل بحث الويب إلى INTERNET_SEARCH وطلبات التدقيق إلى ADMIN_REPORT.",
),
)
class SupportFlow(Flow[AppState]):
conversational = True
@start()
def load_permissions(self) -> None:
if self.state.session_ready:
return
self.state.permissions = {"web_search"}
self.state.session_ready = True
@listen("INTERNET_SEARCH", required_permissions=["web_search"])
def internet_search(self) -> str:
...
@listen("ADMIN_REPORT", required_permissions=["admin"])
def admin_report(self) -> str:
...
@listen("permission_denied")
def deny(self) -> str:
reply = "ليست لديك صلاحية لتنفيذ هذا الإجراء."
self.append_assistant_message(reply)
return reply
```
استخدم `RouterConfig.route_permissions` عندما تحتاج إلى تكوين الأذونات بعيداً عن المعالجات. للتفويض المخصص، تجاوز `can_access_route(required_permissions)` بدلاً من تجاوز `route_turn`.
### المسارات المدمجة
| المسار | المعالج | الغرض |

View File

@@ -333,6 +333,8 @@ router_config = RouterConfig(
route_descriptions={
"INTERNET_SEARCH": "Override the docstring for this one route.",
},
route_permissions={"INTERNET_SEARCH": "web_search"}, # optional override
permission_denied_route="permission_denied", # default
default_intent="converse", # used when LLM call fails or no LLM available
fallback_intent="converse", # used when LLM returns an invalid route
intent_field="intent",
@@ -370,6 +372,63 @@ Routes:
`RouterConfig.prompt` is for **domain framing** (assistant persona, business rules, voice). The route catalog is auto-built — don't list routes in `prompt`; they'll drift the moment you add a handler.
### Route permissions
Use `@listen(..., required_permissions=[...])` to gate protected routes after the router
selects a valid route. The default authorizer checks for a `permissions`
collection on `self.state`; denied turns redirect to `permission_denied`.
```python
from crewai import Flow
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
RouterConfig,
)
from crewai.flow import listen, start
from pydantic import Field
class AppState(ConversationState):
permissions: set[str] = Field(default_factory=set)
@ConversationConfig(
router=RouterConfig(
prompt="Send web lookups to INTERNET_SEARCH and audits to ADMIN_REPORT.",
),
)
class SupportFlow(Flow[AppState]):
conversational = True
@start()
def load_permissions(self) -> None:
if self.state.session_ready:
return
self.state.permissions = {"web_search"}
self.state.session_ready = True
@listen("INTERNET_SEARCH", required_permissions=["web_search"])
def internet_search(self) -> str:
...
@listen("ADMIN_REPORT", required_permissions=["admin"])
def admin_report(self) -> str:
...
@listen("permission_denied")
def deny(self) -> str:
reply = "You do not have permission for that action."
self.append_assistant_message(reply)
return reply
```
Use `RouterConfig.route_permissions` when permissions need to be configured
away from the handlers. For custom auth, override
`can_access_route(required_permissions)` instead of overriding
`route_turn`.
### Built-in routes
| Route | Handler | Purpose |

View File

@@ -317,6 +317,8 @@ RouterConfig(
route_descriptions={
"INTERNET_SEARCH": "이 라우트만 docstring 대신 사용할 설명.",
},
route_permissions={"INTERNET_SEARCH": "web_search"}, # 선택적 오버라이드
permission_denied_route="permission_denied", # 기본값
default_intent="converse", # LLM 호출 실패 또는 LLM 없음일 때 사용
fallback_intent="converse", # LLM이 잘못된 라우트를 반환할 때 사용
intent_field="intent",
@@ -351,6 +353,58 @@ Routes:
`RouterConfig.prompt`는 **도메인 프레이밍** (어시스턴트 페르소나, 비즈니스 규칙, 톤)을 위한 자리입니다. 라우트 카탈로그는 자동 생성되니 `prompt` 안에 라우트 목록을 넣지 마세요. 핸들러를 추가하는 순간 동기화가 깨집니다.
### 라우트 권한
router가 유효한 라우트를 선택한 뒤 보호된 라우트를 게이트하려면 `@listen(..., required_permissions=[...])`를 사용하세요. 기본 authorizer는 `self.state`의 `permissions` 컬렉션을 확인합니다. 거부된 턴은 `permission_denied`로 리다이렉트됩니다.
```python
from crewai import Flow
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
RouterConfig,
)
from crewai.flow import listen, start
from pydantic import Field
class AppState(ConversationState):
permissions: set[str] = Field(default_factory=set)
@ConversationConfig(
router=RouterConfig(
prompt="웹 조회는 INTERNET_SEARCH로, 감사 요청은 ADMIN_REPORT로 보내세요.",
),
)
class SupportFlow(Flow[AppState]):
conversational = True
@start()
def load_permissions(self) -> None:
if self.state.session_ready:
return
self.state.permissions = {"web_search"}
self.state.session_ready = True
@listen("INTERNET_SEARCH", required_permissions=["web_search"])
def internet_search(self) -> str:
...
@listen("ADMIN_REPORT", required_permissions=["admin"])
def admin_report(self) -> str:
...
@listen("permission_denied")
def deny(self) -> str:
reply = "이 작업을 수행할 권한이 없습니다."
self.append_assistant_message(reply)
return reply
```
권한을 핸들러 밖에서 구성해야 한다면 `RouterConfig.route_permissions`를 사용하세요. 커스텀 인증에는 `route_turn`을 오버라이드하는 대신 `can_access_route(required_permissions)`를 오버라이드하세요.
### 빌트인 라우트
| 라우트 | 핸들러 | 목적 |

View File

@@ -318,6 +318,8 @@ RouterConfig(
route_descriptions={
"INTERNET_SEARCH": "Sobrescreve a docstring só desta rota.",
},
route_permissions={"INTERNET_SEARCH": "web_search"}, # override opcional
permission_denied_route="permission_denied", # padrão
default_intent="converse", # usado quando a chamada ao LLM falha ou não há LLM
fallback_intent="converse", # usado quando o LLM retorna rota inválida
intent_field="intent",
@@ -352,6 +354,58 @@ Routes:
`RouterConfig.prompt` é para **enquadramento de domínio** (persona do assistente, regras de negócio, voz). O catálogo de rotas é auto-gerado — não liste rotas em `prompt`; elas vão sair de sincronia assim que você adicionar um handler.
### Permissões de rota
Use `@listen(..., required_permissions=[...])` para proteger rotas depois que o router selecionar uma rota válida. O autorizador padrão verifica uma coleção `permissions` em `self.state`; turnos negados redirecionam para `permission_denied`.
```python
from crewai import Flow
from crewai.experimental.conversational import (
ConversationConfig,
ConversationState,
RouterConfig,
)
from crewai.flow import listen, start
from pydantic import Field
class AppState(ConversationState):
permissions: set[str] = Field(default_factory=set)
@ConversationConfig(
router=RouterConfig(
prompt="Envie buscas web para INTERNET_SEARCH e auditorias para ADMIN_REPORT.",
),
)
class SupportFlow(Flow[AppState]):
conversational = True
@start()
def load_permissions(self) -> None:
if self.state.session_ready:
return
self.state.permissions = {"web_search"}
self.state.session_ready = True
@listen("INTERNET_SEARCH", required_permissions=["web_search"])
def internet_search(self) -> str:
...
@listen("ADMIN_REPORT", required_permissions=["admin"])
def admin_report(self) -> str:
...
@listen("permission_denied")
def deny(self) -> str:
reply = "Você não tem permissão para essa ação."
self.append_assistant_message(reply)
return reply
```
Use `RouterConfig.route_permissions` quando as permissões precisarem ser configuradas fora dos handlers. Para autorização customizada, sobrescreva `can_access_route(required_permissions)` em vez de sobrescrever `route_turn`.
### Rotas embutidas
| Rota | Handler | Propósito |

View File

@@ -54,8 +54,11 @@ class RouterConfig:
``route_descriptions`` overrides the per-route descriptions used to build
the router LLM's "available routes" catalog. Routes without an entry fall
back to the handler's docstring first line (or, for built-in routes, the
framework's canned description). ``prompt`` is reserved for domain
policy/voice, not the route catalog — that's auto-built.
framework's canned description). ``route_permissions`` maps protected route
labels to one or more permission names; alternatively, pass
``required_permissions`` to ``@listen(...)``. Denied turns redirect to
``permission_denied_route``. ``prompt`` is reserved for domain policy/voice,
not the route catalog — that's auto-built.
"""
prompt: str | None = None
@@ -63,6 +66,8 @@ class RouterConfig:
llm: Any | None = None
routes: Sequence[str] | None = None
route_descriptions: dict[str, str] | None = None
route_permissions: dict[str, str | Sequence[str]] | None = None
permission_denied_route: str = "permission_denied"
default_intent: str | None = "converse"
fallback_intent: str | None = "converse"
intent_field: str = "intent"

View File

@@ -358,8 +358,8 @@ class _ConversationalMixin:
Pass an explicit ``RouterConfig`` only to override the routing prompt,
supply per-route descriptions, or change the default/fallback intent.
Override this method to bypass the LLM router entirely (e.g.,
permission gates before the LLM decision).
Use ``RouterConfig.route_permissions`` for route-level access control.
Override this method to bypass the LLM router entirely.
"""
config = self._conversation_config
if config is None:
@@ -539,6 +539,24 @@ class _ConversationalMixin:
cast("Flow[Any]", self), text, outcomes=outcomes, llm=llm
)
def can_access_route(
self,
required_permissions: Sequence[str],
) -> bool:
"""Return whether the current turn may use a protected route.
By default, this checks for a ``permissions`` collection on state. Apps
with richer auth models can override this method instead of overriding
``route_turn``.
"""
state_permissions = getattr(cast(Any, self.state), "permissions", None)
if state_permissions is None and isinstance(self.state, dict):
state_permissions = self.state.get("permissions")
if state_permissions is None:
return False
return all(permission in state_permissions for permission in required_permissions)
def classify_intent(
self,
text: str,
@@ -659,8 +677,42 @@ class _ConversationalMixin:
if valid_labels and intent not in valid_labels:
return router_config.fallback_intent or router_config.default_intent
if not self._can_access_router_intent(intent, router_config):
return router_config.permission_denied_route
return intent
def _can_access_router_intent(
self,
intent: str,
router_config: RouterConfig,
) -> bool:
required_permissions = self._route_required_permissions(
intent,
router_config,
)
if not required_permissions:
return True
return self.can_access_route(required_permissions)
def _route_required_permissions(
self,
route: str,
router_config: RouterConfig,
) -> tuple[str, ...]:
permissions = (router_config.route_permissions or {}).get(route)
if permissions is None:
handler_name = self._listener_methods_by_route().get(route)
if handler_name is None:
return ()
handler = getattr(type(self), handler_name, None)
permissions = getattr(handler, "__route_permissions__", None)
if permissions is None:
return ()
if isinstance(permissions, str):
return (permissions,)
return tuple(permissions)
def _default_router_llm(self, router_config: RouterConfig) -> Any | None:
config = self._conversation_config
return (
@@ -732,13 +784,7 @@ class _ConversationalMixin:
self,
router_config: RouterConfig | None,
) -> dict[str, str]:
label_to_method: dict[str, str] = {}
for listener_name, condition in self._listeners.items():
if isinstance(condition, tuple):
_, trigger_labels = condition
for trigger_label in trigger_labels:
label_to_method.setdefault(str(trigger_label), str(listener_name))
label_to_method = self._listener_methods_by_route()
routes = self._effective_routes(router_config)
overrides = (
router_config.route_descriptions
@@ -794,6 +840,15 @@ class _ConversationalMixin:
labels.update(str(method) for method in methods)
return labels
def _listener_methods_by_route(self) -> dict[str, str]:
label_to_method: dict[str, str] = {}
for listener_name, condition in self._listeners.items():
if isinstance(condition, tuple):
_, trigger_labels = condition
for trigger_label in trigger_labels:
label_to_method.setdefault(str(trigger_label), str(listener_name))
return label_to_method
def _effective_routes(self, router_config: RouterConfig | None = None) -> set[str]:
custom_routes = set(router_config.routes or ()) if router_config else set()
if not custom_routes:
@@ -802,6 +857,8 @@ class _ConversationalMixin:
- set(self.builtin_routes)
- set(self.internal_routes)
)
if router_config is not None:
custom_routes.discard(router_config.permission_denied_route)
return custom_routes | set(self.builtin_routes)
def _default_conversation_llm(self) -> Any | None:

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from collections.abc import Callable
from collections.abc import Callable, Sequence
from typing import cast
from crewai.flow.dsl._conditions import _definition_condition_from_runtime
@@ -15,7 +15,11 @@ from crewai.flow.flow_definition import FlowMethodDefinition
from crewai.flow.flow_wrappers import ListenMethod
def listen(condition: FlowTrigger) -> FlowMethodDecorator:
def listen(
condition: FlowTrigger,
*,
required_permissions: str | Sequence[str] | None = None,
) -> FlowMethodDecorator:
"""Creates a listener that executes when specified conditions are met.
This decorator sets up a method to execute in response to other method
@@ -25,6 +29,8 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator:
Args:
condition: Route label, method reference, or condition returned by
or_() / and_() that triggers the listener.
required_permissions: Optional permission name or names required to
access this route in conversational flows.
Returns:
A flow method decorator that preserves the decorated method's static signature.
@@ -50,6 +56,15 @@ def listen(condition: FlowTrigger) -> FlowMethodDecorator:
FlowMethodDefinition(listen=_definition_condition_from_runtime(condition)),
)
_set_trigger_metadata(wrapper, condition)
if required_permissions is not None:
permissions = (
(required_permissions,)
if isinstance(required_permissions, str)
else tuple(required_permissions)
)
if not permissions:
raise ValueError("required_permissions must not be empty")
wrapper.__route_permissions__ = permissions
return wrapper
return cast(FlowMethodDecorator, decorator)

View File

@@ -87,6 +87,7 @@ class FlowMethod(Generic[P, R]):
"__router_emit__",
"__human_feedback_config__",
"__conversational_only__", # gates registration on Flow.conversational
"__route_permissions__", # conversational route access control
"__flow_persistence_config__",
"__flow_method_definition__",
"_human_feedback_llm", # Live LLM object for HITL resume

View File

@@ -2,11 +2,12 @@
from __future__ import annotations
from collections.abc import Sequence
from typing import Any, Literal
from unittest.mock import MagicMock, patch
from uuid import uuid4
from pydantic import BaseModel
from pydantic import BaseModel, Field
from crewai.events.event_bus import crewai_event_bus
from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener
@@ -858,6 +859,154 @@ class TestConversationalFlow:
assert result == "researched"
assert flow.state.messages[-1].content == "researched"
def test_router_route_permissions_redirect_denied_intent(self) -> None:
class Route(BaseModel):
intent: Literal["research", "permission_denied", "converse", "end"]
class PermissionState(ConversationState):
permissions: set[str] = Field(default_factory=set)
router_llm = MagicMock()
router_llm.call.return_value = Route(intent="research")
@ConversationConfig(
router=RouterConfig(
response_format=Route,
llm=router_llm,
routes=["research"],
route_permissions={"research": "web_search"},
),
)
class PermissionFlow(Flow[PermissionState]):
conversational = True
@listen("research")
def run_research(self) -> str:
self.append_assistant_message("researched")
return "researched"
@listen("permission_denied")
def deny(self) -> str:
self.append_assistant_message("denied")
return "denied"
flow = PermissionFlow()
result = flow.handle_turn("research CrewAI")
assert result == "denied"
assert flow.state.last_intent == "permission_denied"
assert flow.state.messages[-1].content == "denied"
def test_router_route_permissions_allow_when_state_has_permission(self) -> None:
class Route(BaseModel):
intent: Literal["research", "permission_denied", "converse", "end"]
class PermissionState(ConversationState):
permissions: set[str] = Field(default_factory=lambda: {"web_search"})
router_llm = MagicMock()
router_llm.call.return_value = Route(intent="research")
@ConversationConfig(
router=RouterConfig(
response_format=Route,
llm=router_llm,
routes=["research"],
route_permissions={"research": "web_search"},
),
)
class PermissionFlow(Flow[PermissionState]):
conversational = True
@listen("research")
def run_research(self) -> str:
self.append_assistant_message("researched")
return "researched"
@listen("permission_denied")
def deny(self) -> str:
self.append_assistant_message("denied")
return "denied"
flow = PermissionFlow()
result = flow.handle_turn("research CrewAI")
assert result == "researched"
assert flow.state.last_intent == "research"
assert flow.state.messages[-1].content == "researched"
def test_router_route_permissions_can_use_custom_authorizer(self) -> None:
class Route(BaseModel):
intent: Literal["admin", "permission_denied", "converse", "end"]
router_llm = MagicMock()
router_llm.call.return_value = Route(intent="admin")
@ConversationConfig(
router=RouterConfig(
response_format=Route,
llm=router_llm,
routes=["admin"],
route_permissions={"admin": ("audit", "admin")},
),
)
class PermissionFlow(ConversationalFlow):
def can_access_route(
self,
required_permissions: Sequence[str],
) -> bool:
assert tuple(required_permissions) == ("audit", "admin")
assert self.state.current_user_message == "show audit"
return True
@listen("admin")
def run_admin(self) -> str:
self.append_assistant_message("admin report")
return "admin report"
flow = PermissionFlow()
result = flow.handle_turn("show audit")
assert result == "admin report"
assert flow.state.last_intent == "admin"
def test_router_infers_permissions_from_listener_metadata(self) -> None:
class PermissionState(ConversationState):
permissions: set[str] = Field(default_factory=set)
router_llm = MagicMock()
@ConversationConfig(
router=RouterConfig(
llm=router_llm,
),
)
class PermissionFlow(Flow[PermissionState]):
conversational = True
@listen("research", required_permissions=["web_search"])
def run_research(self) -> str:
"""Fresh web research."""
self.append_assistant_message("researched")
return "researched"
@listen("permission_denied")
def deny(self) -> str:
self.append_assistant_message("denied")
return "denied"
flow = PermissionFlow()
response_format = flow._router_response_format(flow.conversational_config.router)
assert "permission_denied" not in response_format.model_fields[
"intent"
].description
router_llm.call.return_value = response_format(intent="research")
result = flow.handle_turn("research CrewAI")
assert result == "denied"
assert flow.state.last_intent == "permission_denied"
def test_conversational_flow_auto_defaults_to_conversation_state(self) -> None:
"""``class C(Flow): conversational = True`` resolves state to ConversationState.