mirror of https://github.com/crewAIInc/crewAI.git
synced 2026-01-16 19:48:30 +00:00

Compare commits (2 commits): gl/feat/de...devin/1768

Commits: 519f8ce0eb, 802ca92e42
@@ -574,10 +574,6 @@ When you run this Flow, the output will change based on the random boolean value

### Human in the Loop (human feedback)

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>

The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.

```python Code
@@ -91,10 +91,6 @@ The `A2AConfig` class accepts the following parameters:
Update mechanism for receiving task status. Options: `StreamingConfig`, `PollingConfig`, or `PushNotificationConfig`.
</ParamField>

<ParamField path="transport_protocol" type="Literal['JSONRPC', 'GRPC', 'HTTP+JSON']" default="JSONRPC">
Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`.
</ParamField>
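To make the three transport options concrete, here is a minimal, self-contained sketch. `TransportSettings` is a hypothetical stand-in for illustration only, not the real `A2AConfig` (which is a Pydantic model), but it mirrors the documented field: `JSONRPC` by default, with `GRPC` and `HTTP+JSON` as the only other accepted values.

```python
from dataclasses import dataclass
from typing import Literal, get_args

# The same three options the docs list for `transport_protocol`.
TransportProtocol = Literal["JSONRPC", "GRPC", "HTTP+JSON"]


@dataclass
class TransportSettings:
    """Hypothetical stand-in for the documented field, not the real A2AConfig."""

    transport_protocol: TransportProtocol = "JSONRPC"

    def __post_init__(self) -> None:
        # Reject anything outside the Literal's allowed values at runtime.
        if self.transport_protocol not in get_args(TransportProtocol):
            raise ValueError(f"unsupported transport: {self.transport_protocol}")


default = TransportSettings()                        # JSONRPC by default
grpc = TransportSettings(transport_protocol="GRPC")  # explicit override
```

In the real config, the same constraint is enforced by Pydantic validating the `Literal` field.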

## Authentication

For A2A agents that require authentication, use one of the provided auth schemes:
@@ -7,10 +7,6 @@ mode: "wide"

## Overview

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Make sure to update your installation before using this feature.
</Note>

The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It allows you to pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.

This is particularly valuable for:
@@ -11,10 +11,10 @@ Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelli

CrewAI offers two main approaches for implementing human-in-the-loop workflows:

| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |
| Approach | Best For | Integration |
|----------|----------|-------------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide |

<Tip>
If you're building flows and want to add human review steps with routing based on feedback, check out the [Human Feedback in Flows](/en/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.
@@ -567,10 +567,6 @@ Fourth method running

### Human in the Loop (human feedback)

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>

The `@human_feedback` decorator enables human-in-the-loop workflows that pause flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.

```python Code
@@ -7,10 +7,6 @@ mode: "wide"

## Overview

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Update your installation before using this feature.
</Note>

The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It lets you pause Flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.

This is particularly valuable for:
@@ -5,22 +5,9 @@ icon: "user-check"
mode: "wide"
---

Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. CrewAI offers several ways to implement HITL depending on your needs.
Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. This guide walks you through implementing HITL within CrewAI.

## Choosing Your HITL Approach

CrewAI offers two main approaches for implementing human-in-the-loop workflows:

| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/ko/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |

<Tip>
If you're building flows and want to add human review steps with routing based on feedback, see the [Human Feedback in Flows](/ko/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.
</Tip>

## Setting Up Webhook-Based HITL Workflows
## Setting Up HITL Workflows

<Steps>
<Step title="Configure Your Task">
@@ -309,10 +309,6 @@ When you run this Flow, the output will differ depending on the random boolean value

### Human in the Loop (human feedback)

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>

The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.

```python Code
@@ -7,10 +7,6 @@ mode: "wide"

## Overview

<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Make sure to update your installation before using this feature.
</Note>

The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It lets you pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.

This is particularly valuable for:
@@ -5,22 +5,9 @@ icon: "user-check"
mode: "wide"
---

Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. CrewAI offers several ways to implement HITL depending on your needs.
Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. This guide shows how to implement HITL within CrewAI.

## Choosing Your HITL Approach

CrewAI offers two main approaches for implementing human-in-the-loop workflows:

| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/pt-BR/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |

<Tip>
If you're building flows and want to add human review steps with feedback-based routing, check out the [Human Feedback in Flows](/pt-BR/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.
</Tip>

## Setting Up Webhook-Based HITL Workflows
## Setting Up HITL Workflows

<Steps>
<Step title="Configure Your Task">
||||
@@ -5,7 +5,7 @@ This module is separate from experimental.a2a to avoid circular imports.

from __future__ import annotations

from typing import Annotated, Any, ClassVar, Literal
from typing import Annotated, Any, ClassVar

from pydantic import (
    BaseModel,
@@ -53,7 +53,6 @@ class A2AConfig(BaseModel):
        fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
        trust_remote_completion_status: If True, return A2A agent's result directly when completed.
        updates: Update mechanism config.
        transport_protocol: A2A transport protocol (grpc, jsonrpc, http+json).
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@@ -83,7 +82,3 @@ class A2AConfig(BaseModel):
        default_factory=_get_default_update_config,
        description="Update mechanism config",
    )
    transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
        default="JSONRPC",
        description="Specified mode of A2A transport protocol",
    )

@@ -7,7 +7,7 @@ from collections.abc import AsyncIterator, MutableMapping
from contextlib import asynccontextmanager
from functools import lru_cache
import time
from typing import TYPE_CHECKING, Any, Literal
from typing import TYPE_CHECKING, Any
import uuid

from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory
@@ -18,6 +18,7 @@ from a2a.types import (
    PushNotificationConfig as A2APushNotificationConfig,
    Role,
    TextPart,
    TransportProtocol,
)
from aiocache import cached  # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer  # type: ignore[import-untyped]
@@ -258,7 +259,6 @@ async def _afetch_agent_card_impl(

def execute_a2a_delegation(
    endpoint: str,
    transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
    auth: AuthScheme | None,
    timeout: int,
    task_description: str,
@@ -282,23 +282,6 @@
    use aexecute_a2a_delegation directly.

    Args:
        endpoint: A2A agent endpoint URL (AgentCard URL)
        transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
        auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest)
        timeout: Request timeout in seconds
        task_description: The task to delegate
        context: Optional context information
        context_id: Context ID for correlating messages/tasks
        task_id: Specific task identifier
        reference_task_ids: List of related task IDs
        metadata: Additional metadata (external_id, request_id, etc.)
        extensions: Protocol extensions for custom fields
        conversation_history: Previous Message objects from conversation
        agent_id: Agent identifier for logging
        agent_role: Role of the CrewAI agent delegating the task
        agent_branch: Optional agent tree branch for logging
        response_model: Optional Pydantic model for structured outputs
        turn_number: Optional turn number for multi-turn conversations
        endpoint: A2A agent endpoint URL.
        auth: Optional AuthScheme for authentication.
        timeout: Request timeout in seconds.
@@ -340,7 +323,6 @@
        agent_role=agent_role,
        agent_branch=agent_branch,
        response_model=response_model,
        transport_protocol=transport_protocol,
        turn_number=turn_number,
        updates=updates,
    )
@@ -351,7 +333,6 @@

async def aexecute_a2a_delegation(
    endpoint: str,
    transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
    auth: AuthScheme | None,
    timeout: int,
    task_description: str,
@@ -375,23 +356,6 @@
    in an async context (e.g., with Crew.akickoff() or agent.aexecute_task()).

    Args:
        endpoint: A2A agent endpoint URL
        transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
        auth: Optional AuthScheme for authentication
        timeout: Request timeout in seconds
        task_description: Task to delegate
        context: Optional context
        context_id: Context ID for correlation
        task_id: Specific task identifier
        reference_task_ids: Related task IDs
        metadata: Additional metadata
        extensions: Protocol extensions
        conversation_history: Previous Message objects
        turn_number: Current turn number
        agent_branch: Agent tree branch for logging
        agent_id: Agent identifier for logging
        agent_role: Agent role for logging
        response_model: Optional Pydantic model for structured outputs
        endpoint: A2A agent endpoint URL.
        auth: Optional AuthScheme for authentication.
        timeout: Request timeout in seconds.
@@ -450,7 +414,6 @@
        agent_role=agent_role,
        response_model=response_model,
        updates=updates,
        transport_protocol=transport_protocol,
    )

    crewai_event_bus.emit(
@@ -468,7 +431,6 @@

async def _aexecute_a2a_delegation_impl(
    endpoint: str,
    transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
    auth: AuthScheme | None,
    timeout: int,
    task_description: str,
@@ -562,6 +524,7 @@
        extensions=extensions,
    )

    transport_protocol = TransportProtocol("JSONRPC")
    new_messages: list[Message] = [*conversation_history, message]
    crewai_event_bus.emit(
        None,
@@ -633,7 +596,7 @@
@asynccontextmanager
async def _create_a2a_client(
    agent_card: AgentCard,
    transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
    transport_protocol: TransportProtocol,
    timeout: int,
    headers: MutableMapping[str, str],
    streaming: bool,
@@ -677,7 +640,7 @@

    config = ClientConfig(
        httpx_client=httpx_client,
        supported_transports=[transport_protocol],
        supported_transports=[str(transport_protocol.value)],
        streaming=streaming and not use_polling,
        polling=use_polling,
        accepted_output_modes=["application/json"],

@@ -771,7 +771,6 @@
        response_model=agent_config.response_model,
        turn_number=turn_num + 1,
        updates=agent_config.updates,
        transport_protocol=agent_config.transport_protocol,
    )

    conversation_history = a2a_result.get("history", [])
@@ -1086,7 +1085,6 @@
        agent_branch=agent_branch,
        response_model=agent_config.response_model,
        turn_number=turn_num + 1,
        transport_protocol=agent_config.transport_protocol,
        updates=agent_config.updates,
    )


@@ -209,9 +209,10 @@ class EventListener(BaseEventListener):
        @crewai_event_bus.on(TaskCompletedEvent)
        def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
            # Handle telemetry
            span = self.execution_spans.pop(source, None)
            span = self.execution_spans.get(source)
            if span:
                self._telemetry.task_ended(span, source, source.agent.crew)
                self.execution_spans[source] = None

            # Pass task name if it exists
            task_name = get_task_name(source)
@@ -221,10 +222,11 @@ class EventListener(BaseEventListener):

        @crewai_event_bus.on(TaskFailedEvent)
        def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
            span = self.execution_spans.pop(source, None)
            span = self.execution_spans.get(source)
            if span:
                if source.agent and source.agent.crew:
                    self._telemetry.task_ended(span, source, source.agent.crew)
                self.execution_spans[source] = None

            # Pass task name if it exists
            task_name = get_task_name(source)

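The listener change above swaps `execution_spans.pop(source, None)` for `get(source)` plus resetting the entry to `None`. The behavioral difference is visible on a plain dict: `pop` forgets the key entirely, while `get` followed by a `None` assignment keeps a tombstone, so later events can distinguish "never started" from "already ended". A small stdlib-only illustration (the dict contents and key names here are made up):

```python
spans = {"task-1": "span-object"}

# Old behavior: pop removes the key entirely.
old = dict(spans)
old_span = old.pop("task-1", None)

# New behavior: get leaves the key in place, and the entry is
# overwritten with None once the span has been ended.
new = dict(spans)
new_span = new.get("task-1")
new["task-1"] = None
```

Both variants hand the same span object to the telemetry call; only the bookkeeping left behind differs.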
@@ -1,370 +0,0 @@
"""Lazy loader for Python packages.

Makes it easy to load subpackages and functions on demand.

Pulled from https://github.com/scientific-python/lazy-loader/blob/main/src/lazy_loader/__init__.py,
modernized a little.
"""

import ast
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
import importlib
import importlib.metadata
import importlib.util
import inspect
import os
from pathlib import Path
import sys
import threading
import types
from typing import Any, NoReturn
import warnings

import packaging.requirements


_threadlock = threading.Lock()


@dataclass(frozen=True, slots=True)
class _FrameData:
    """Captured stack frame information for delayed error reporting."""

    filename: str
    lineno: int
    function: str
    code_context: Sequence[str] | None


def attach(
    package_name: str,
    submodules: set[str] | None = None,
    submod_attrs: dict[str, list[str]] | None = None,
) -> tuple[Callable[[str], Any], Callable[[], list[str]], list[str]]:
    """Attach lazily loaded submodules, functions, or other attributes.

    Replaces a package's `__getattr__`, `__dir__`, and `__all__` such that
    imports work normally but occur upon first use.

    Example:
        __getattr__, __dir__, __all__ = lazy.attach(
            __name__, ["mysubmodule"], {"foo": ["someattr"]}
        )

    Args:
        package_name: The package name, typically ``__name__``.
        submodules: Set of submodule names to attach.
        submod_attrs: Mapping of submodule names to lists of attributes.
            These attributes are imported as they are used.

    Returns:
        A tuple of (__getattr__, __dir__, __all__) to assign in the package.
    """
    submod_attrs = submod_attrs or {}
    submodules = set(submodules) if submodules else set()
    attr_to_modules = {
        attr: mod for mod, attrs in submod_attrs.items() for attr in attrs
    }
    __all__ = sorted(submodules | attr_to_modules.keys())

    def __getattr__(name: str) -> Any:  # noqa: N807
        if name in submodules:
            return importlib.import_module(f"{package_name}.{name}")
        if name in attr_to_modules:
            submod_path = f"{package_name}.{attr_to_modules[name]}"
            submod = importlib.import_module(submod_path)
            attr = getattr(submod, name)

            # If the attribute lives in a file (module) with the same
            # name as the attribute, ensure that the attribute and *not*
            # the module is accessible on the package.
            if name == attr_to_modules[name]:
                pkg = sys.modules[package_name]
                pkg.__dict__[name] = attr

            return attr
        raise AttributeError(f"No {package_name} attribute {name}")

    def __dir__() -> list[str]:  # noqa: N807
        return __all__.copy()

    if os.environ.get("EAGER_IMPORT"):
        for attr in set(attr_to_modules.keys()) | submodules:
            __getattr__(attr)

    return __getattr__, __dir__, __all__.copy()


class DelayedImportErrorModule(types.ModuleType):
    """Module type that delays raising ModuleNotFoundError until attribute access.

    Captures stack frame data to provide helpful error messages showing where
    the original import was attempted.
    """

    def __init__(
        self,
        frame_data: _FrameData,
        *args: Any,
        message: str,
        **kwargs: Any,
    ) -> None:
        """Initialize the delayed error module.

        Args:
            frame_data: Captured frame information for error reporting.
            *args: Positional arguments passed to ModuleType.
            message: The error message to display when accessed.
            **kwargs: Keyword arguments passed to ModuleType.
        """
        self._frame_data = frame_data
        self._message = message
        super().__init__(*args, **kwargs)

    def __getattr__(self, name: str) -> NoReturn:
        """Raise ModuleNotFoundError with detailed context on any attribute access."""
        frame = self._frame_data
        code = "".join(frame.code_context) if frame.code_context else ""
        raise ModuleNotFoundError(
            f"{self._message}\n\n"
            "This error is lazily reported, having originally occurred in\n"
            f"  File {frame.filename}, line {frame.lineno}, in {frame.function}\n\n"
            f"----> {code.strip()}"
        )


def load(
    fullname: str,
    *,
    require: str | None = None,
    error_on_import: bool = False,
    suppress_warning: bool = False,
) -> types.ModuleType:
    """Return a lazily imported proxy for a module.

    The proxy module delays actual import until first attribute access.

    Example:
        np = lazy.load("numpy")

        def myfunc():
            np.norm(...)

    Warning:
        Lazily loading subpackages causes the parent package to be eagerly
        loaded. Use `lazy_loader.attach` instead for subpackages.

    Args:
        fullname: The full name of the module to import (e.g., "scipy").
        require: A PEP-508 dependency requirement (e.g., "numpy >=1.24").
            If specified, raises an error if the installed version doesn't match.
        error_on_import: If True, raise import errors immediately.
            If False (default), delay errors until module is accessed.
        suppress_warning: If True, suppress the warning when loading subpackages.

    Returns:
        A proxy module that loads on first attribute access.
    """
    with _threadlock:
        module = sys.modules.get(fullname)

        # Most common, short-circuit
        if module is not None and require is None:
            return module

        have_module = module is not None

        if not suppress_warning and "." in fullname:
            msg = (
                "subpackages can technically be lazily loaded, but it causes the "
                "package to be eagerly loaded even if it is already lazily loaded. "
                "So, you probably shouldn't use subpackages with this lazy feature."
            )
            warnings.warn(msg, RuntimeWarning, stacklevel=2)

        spec = None

        if not have_module:
            spec = importlib.util.find_spec(fullname)
            have_module = spec is not None

        if not have_module:
            not_found_message = f"No module named '{fullname}'"
        elif require is not None:
            try:
                have_module = _check_requirement(require)
            except ModuleNotFoundError as e:
                raise ValueError(
                    f"Found module '{fullname}' but cannot test "
                    f"requirement '{require}'. "
                    "Requirements must match distribution name, not module name."
                ) from e

            not_found_message = f"No distribution can be found matching '{require}'"

        if not have_module:
            if error_on_import:
                raise ModuleNotFoundError(not_found_message)

            parent = inspect.stack()[1]
            frame_data = _FrameData(
                filename=parent.filename,
                lineno=parent.lineno,
                function=parent.function,
                code_context=parent.code_context,
            )
            del parent
            return DelayedImportErrorModule(
                frame_data,
                "DelayedImportErrorModule",
                message=not_found_message,
            )

        if spec is not None:
            module = importlib.util.module_from_spec(spec)
            sys.modules[fullname] = module

            if spec.loader is not None:
                loader = importlib.util.LazyLoader(spec.loader)
                loader.exec_module(module)

        if module is None:
            raise ModuleNotFoundError(f"No module named '{fullname}'")

        return module


def _check_requirement(require: str) -> bool:
    """Verify that a package requirement is satisfied.

    Args:
        require: A dependency requirement as defined in PEP-508.

    Returns:
        True if the installed version matches the requirement, False otherwise.

    Raises:
        ModuleNotFoundError: If the package is not installed.
    """
    req = packaging.requirements.Requirement(require)
    return req.specifier.contains(
        importlib.metadata.version(req.name),
        prereleases=True,
    )


@dataclass
class _StubVisitor(ast.NodeVisitor):
    """AST visitor to parse a stub file for submodules and submod_attrs."""

    _submodules: set[str] = field(default_factory=set)
    _submod_attrs: dict[str, list[str]] = field(default_factory=dict)

    def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
        """Visit an ImportFrom node and extract submodule/attribute information.

        Args:
            node: The AST ImportFrom node to visit.

        Raises:
            ValueError: If the import is not a relative import or uses star import.
        """
        if node.level != 1:
            raise ValueError(
                "Only within-module imports are supported (`from .* import`)"
            )
        names = [alias.name for alias in node.names]
        if node.module:
            if "*" in names:
                raise ValueError(
                    f"lazy stub loader does not support star import "
                    f"`from {node.module} import *`"
                )
            self._submod_attrs.setdefault(node.module, []).extend(names)
        else:
            self._submodules.update(names)


def attach_stub(
    package_name: str,
    filename: str,
) -> tuple[Callable[[str], Any], Callable[[], list[str]], list[str]]:
    """Attach lazily loaded submodules and functions from a type stub.

    Parses a `.pyi` stub file to infer submodules and attributes. This allows
    static type checkers to find imports while providing lazy loading at runtime.

    Args:
        package_name: The package name, typically ``__name__``.
        filename: Path to `.py` file with an adjacent `.pyi` file.
            Typically use ``__file__``.

    Returns:
        A tuple of (__getattr__, __dir__, __all__) to assign in the package.

    Raises:
        ValueError: If stub file is not found or contains invalid imports.
    """
    path = Path(filename)
    stubfile = path if path.suffix == ".pyi" else path.with_suffix(".pyi")

    if not stubfile.exists():
        raise ValueError(f"Cannot load imports from non-existent stub {stubfile!r}")

    visitor = _StubVisitor()
    visitor.visit(ast.parse(stubfile.read_text()))
    return attach(package_name, visitor._submodules, visitor._submod_attrs)


def lazy_exports_stub(package_name: str, filename: str) -> None:
    """Install lazy loading on a module based on its .pyi stub file.

    Parses the adjacent `.pyi` stub file to determine what to export lazily.
    Type checkers see the stub, runtime gets lazy loading.

    Example:
        # __init__.py
        from crewai.utilities.lazy import lazy_exports_stub
        lazy_exports_stub(__name__, __file__)

        # __init__.pyi
        from .config import ChromaDBConfig, ChromaDBSettings
        from .types import EmbeddingType

    Args:
        package_name: The package name, typically ``__name__``.
        filename: Path to the module file, typically ``__file__``.
    """
    __getattr__, __dir__, __all__ = attach_stub(package_name, filename)
    module = sys.modules[package_name]
    module.__getattr__ = __getattr__  # type: ignore[method-assign]
    module.__dir__ = __dir__  # type: ignore[method-assign]
    module.__dict__["__all__"] = __all__


def lazy_exports(
    package_name: str,
    submod_attrs: dict[str, list[str]],
    submodules: set[str] | None = None,
) -> None:
    """Install lazy loading on a module.

    Example:
        from crewai.utilities.lazy import lazy_exports

        lazy_exports(__name__, {
            'config': ['ChromaDBConfig', 'ChromaDBSettings'],
            'types': ['EmbeddingType'],
        })

    Args:
        package_name: The package name, typically ``__name__``.
        submod_attrs: Mapping of submodule names to lists of attributes.
        submodules: Optional set of submodule names to expose directly.
    """
    __getattr__, __dir__, __all__ = attach(package_name, submodules, submod_attrs)
    module = sys.modules[package_name]
    module.__getattr__ = __getattr__  # type: ignore[method-assign]
    module.__dir__ = __dir__  # type: ignore[method-assign]
    module.__dict__["__all__"] = __all__
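The deleted module's core trick is a module-level `__getattr__` (PEP 562) that performs the import only on first attribute access. A stripped-down, stdlib-only sketch of that mechanism, exercised against the stdlib `collections` package purely for illustration:

```python
from collections.abc import Callable
import importlib
import sys
from typing import Any


def make_lazy_getattr(package_name: str, submodules: set[str]) -> Callable[[str], Any]:
    """Build a module-level __getattr__ that imports submodules on first use."""

    def __getattr__(name: str) -> Any:
        if name in submodules:
            # Imported only now, on first attribute access.
            return importlib.import_module(f"{package_name}.{name}")
        raise AttributeError(f"No {package_name} attribute {name}")

    return __getattr__


# Using the stdlib 'collections' package purely as a demonstration target.
lazy_getattr = make_lazy_getattr("collections", {"abc"})
mod = lazy_getattr("abc")
```

Assigning such a function to a package's `__getattr__` (as `lazy_exports` does above) defers the cost of heavy imports until they are actually needed.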
@@ -2,8 +2,11 @@ from datetime import datetime
import json
import os
import pickle
import tempfile
import threading
from typing import Any, TypedDict

import portalocker
from typing_extensions import Unpack


@@ -123,10 +126,15 @@ class FileHandler:


class PickleHandler:
    """Handler for saving and loading data using pickle.
    """Thread-safe handler for saving and loading data using pickle.

    This class provides thread-safe file operations using portalocker for
    cross-process file locking and atomic write operations to prevent
    data corruption during concurrent access.

    Attributes:
        file_path: The path to the pickle file.
        _lock: Threading lock for thread-safe operations within the same process.
    """

    def __init__(self, file_name: str) -> None:
@@ -141,34 +149,62 @@ class PickleHandler:
            file_name += ".pkl"

        self.file_path = os.path.join(os.getcwd(), file_name)
        self._lock = threading.Lock()

    def initialize_file(self) -> None:
        """Initialize the file with an empty dictionary and overwrite any existing data."""
        self.save({})

    def save(self, data: Any) -> None:
        """
        Save the data to the specified file using pickle.
        """Save the data to the specified file using pickle with thread-safe atomic writes.

        This method uses a two-phase approach for thread safety:
        1. Threading lock for same-process thread safety
        2. Atomic write (write to temp file, then rename) for cross-process safety
           and data integrity

        Args:
            data: The data to be saved to the file.
            data: The data to be saved to the file.
        """
        with open(self.file_path, "wb") as f:
            pickle.dump(obj=data, file=f)
        with self._lock:
            dir_name = os.path.dirname(self.file_path) or os.getcwd()
            fd, temp_path = tempfile.mkstemp(
                suffix=".pkl.tmp", prefix="pickle_", dir=dir_name
            )
            try:
                with os.fdopen(fd, "wb") as f:
                    pickle.dump(obj=data, file=f)
                    f.flush()
                    os.fsync(f.fileno())
                os.replace(temp_path, self.file_path)
            except Exception:
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
                raise

    def load(self) -> Any:
        """Load the data from the specified file using pickle.
        """Load the data from the specified file using pickle with thread-safe locking.

        This method uses portalocker for cross-process read locking to ensure
        data consistency when multiple processes may be accessing the file.

        Returns:
            The data loaded from the file.
            The data loaded from the file, or an empty dictionary if the file
            does not exist or is empty.
        """
        if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
            return {}  # Return an empty dictionary if the file does not exist or is empty
        with self._lock:
            if (
                not os.path.exists(self.file_path)
                or os.path.getsize(self.file_path) == 0
            ):
                return {}

        with open(self.file_path, "rb") as file:
            try:
                return pickle.load(file)  # noqa: S301
            except EOFError:
                return {}  # Return an empty dictionary if the file is empty or corrupted
            except Exception:
                raise  # Raise any other exceptions that occur during loading
            with portalocker.Lock(
                self.file_path, "rb", flags=portalocker.LOCK_SH
            ) as file:
                try:
                    return pickle.load(file)  # noqa: S301
                except EOFError:
                    return {}
                except Exception:
                    raise
|
||||
|
||||
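The rewritten `save` relies on `os.replace` being atomic, so a concurrent reader sees either the old file or the new one, never a mixture. A minimal standalone sketch of the same write-to-temp-then-rename pattern, using only the standard library (the function name here is illustrative, not part of CrewAI's API):

```python
import os
import pickle
import tempfile


def atomic_pickle_dump(data, file_path):
    """Write pickled data so readers never observe a partially written file."""
    dir_name = os.path.dirname(file_path) or os.getcwd()
    # Create the temp file in the same directory so os.replace stays on one
    # filesystem (a cross-device rename would not be atomic).
    fd, temp_path = tempfile.mkstemp(suffix=".tmp", dir=dir_name)
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # force the bytes to disk before the rename
        os.replace(temp_path, file_path)  # atomic swap: old content or new, never a mix
    except Exception:
        if os.path.exists(temp_path):
            os.unlink(temp_path)  # clean up the orphaned temp file on failure
        raise
```

The same-directory detail is why the diff computes `dir_name` from `self.file_path` rather than using the default temp location.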
@@ -1,6 +1,8 @@
 import os
+import threading
 import unittest
 import uuid
+from concurrent.futures import ThreadPoolExecutor, as_completed

 import pytest
 from crewai.utilities.file_handler import PickleHandler
@@ -8,7 +10,6 @@ from crewai.utilities.file_handler import PickleHandler

 class TestPickleHandler(unittest.TestCase):
     def setUp(self):
         # Use a unique file name for each test to avoid race conditions in parallel test execution
         unique_id = str(uuid.uuid4())
         self.file_name = f"test_data_{unique_id}.pkl"
         self.file_path = os.path.join(os.getcwd(), self.file_name)
@@ -47,3 +48,234 @@ class TestPickleHandler(unittest.TestCase):

         assert str(exc.value) == "pickle data was truncated"
         assert "<class '_pickle.UnpicklingError'>" == str(exc.type)
+
+
+class TestPickleHandlerThreadSafety(unittest.TestCase):
+    """Tests for thread-safety of PickleHandler operations."""
+
+    def setUp(self):
+        unique_id = str(uuid.uuid4())
+        self.file_name = f"test_thread_safe_{unique_id}.pkl"
+        self.file_path = os.path.join(os.getcwd(), self.file_name)
+        self.handler = PickleHandler(self.file_name)
+
+    def tearDown(self):
+        if os.path.exists(self.file_path):
+            os.remove(self.file_path)
+
+    def test_concurrent_writes_same_handler(self):
+        """Test that concurrent writes from multiple threads using the same handler don't corrupt data."""
+        num_threads = 10
+        num_writes_per_thread = 20
+        errors: list[Exception] = []
+        write_count = 0
+        count_lock = threading.Lock()
+
+        def write_data(thread_id: int) -> None:
+            nonlocal write_count
+            for i in range(num_writes_per_thread):
+                try:
+                    data = {"thread": thread_id, "iteration": i, "data": f"value_{thread_id}_{i}"}
+                    self.handler.save(data)
+                    with count_lock:
+                        write_count += 1
+                except Exception as e:
+                    errors.append(e)
+
+        threads = []
+        for i in range(num_threads):
+            t = threading.Thread(target=write_data, args=(i,))
+            threads.append(t)
+            t.start()
+
+        for t in threads:
+            t.join()
+
+        assert len(errors) == 0, f"Errors occurred during concurrent writes: {errors}"
+        assert write_count == num_threads * num_writes_per_thread
+        loaded_data = self.handler.load()
+        assert isinstance(loaded_data, dict)
+        assert "thread" in loaded_data
+        assert "iteration" in loaded_data
+
+    def test_concurrent_reads_same_handler(self):
+        """Test that concurrent reads from multiple threads don't cause issues."""
+        test_data = {"key": "value", "nested": {"a": 1, "b": 2}}
+        self.handler.save(test_data)
+
+        num_threads = 20
+        results: list[dict] = []
+        errors: list[Exception] = []
+        results_lock = threading.Lock()
+
+        def read_data() -> None:
+            try:
+                data = self.handler.load()
+                with results_lock:
+                    results.append(data)
+            except Exception as e:
+                errors.append(e)
+
+        threads = []
+        for _ in range(num_threads):
+            t = threading.Thread(target=read_data)
+            threads.append(t)
+            t.start()
+
+        for t in threads:
+            t.join()
+
+        assert len(errors) == 0, f"Errors occurred during concurrent reads: {errors}"
+        assert len(results) == num_threads
+        for result in results:
+            assert result == test_data
+
+    def test_concurrent_read_write_same_handler(self):
+        """Test that concurrent reads and writes don't corrupt data or cause errors."""
+        initial_data = {"counter": 0}
+        self.handler.save(initial_data)
+
+        num_writers = 5
+        num_readers = 10
+        writes_per_thread = 10
+        reads_per_thread = 20
+        write_errors: list[Exception] = []
+        read_errors: list[Exception] = []
+        read_results: list[dict] = []
+        results_lock = threading.Lock()
+
+        def writer(thread_id: int) -> None:
+            for i in range(writes_per_thread):
+                try:
+                    data = {"writer": thread_id, "write_num": i}
+                    self.handler.save(data)
+                except Exception as e:
+                    write_errors.append(e)
+
+        def reader() -> None:
+            for _ in range(reads_per_thread):
+                try:
+                    data = self.handler.load()
+                    with results_lock:
+                        read_results.append(data)
+                except Exception as e:
+                    read_errors.append(e)
+
+        threads = []
+        for i in range(num_writers):
+            t = threading.Thread(target=writer, args=(i,))
+            threads.append(t)
+
+        for _ in range(num_readers):
+            t = threading.Thread(target=reader)
+            threads.append(t)
+
+        for t in threads:
+            t.start()
+
+        for t in threads:
+            t.join()
+
+        assert len(write_errors) == 0, f"Write errors: {write_errors}"
+        assert len(read_errors) == 0, f"Read errors: {read_errors}"
+        for result in read_results:
+            assert isinstance(result, dict)
+
+    def test_atomic_write_no_partial_data(self):
+        """Test that atomic writes prevent partial/corrupted data from being read."""
+        large_data = {"key": "x" * 100000, "numbers": list(range(10000))}
+        num_iterations = 50
+        errors: list[Exception] = []
+        corruption_detected = False
+        corruption_lock = threading.Lock()
+
+        def writer() -> None:
+            for _ in range(num_iterations):
+                try:
+                    self.handler.save(large_data)
+                except Exception as e:
+                    errors.append(e)
+
+        def reader() -> None:
+            nonlocal corruption_detected
+            for _ in range(num_iterations * 2):
+                try:
+                    data = self.handler.load()
+                    if data and data != {} and data != large_data:
+                        with corruption_lock:
+                            corruption_detected = True
+                except Exception as e:
+                    errors.append(e)
+
+        writer_thread = threading.Thread(target=writer)
+        reader_thread = threading.Thread(target=reader)
+
+        writer_thread.start()
+        reader_thread.start()
+
+        writer_thread.join()
+        reader_thread.join()
+
+        assert len(errors) == 0, f"Errors occurred: {errors}"
+        assert not corruption_detected, "Partial/corrupted data was read"
+
+    def test_thread_pool_concurrent_operations(self):
+        """Test thread safety using ThreadPoolExecutor for more realistic concurrent access."""
+        num_operations = 100
+        errors: list[Exception] = []
+
+        def operation(op_id: int) -> str:
+            try:
+                if op_id % 3 == 0:
+                    self.handler.save({"op_id": op_id, "type": "write"})
+                    return f"write_{op_id}"
+                else:
+                    data = self.handler.load()
+                    return f"read_{op_id}_{type(data).__name__}"
+            except Exception as e:
+                errors.append(e)
+                return f"error_{op_id}"
+
+        with ThreadPoolExecutor(max_workers=20) as executor:
+            futures = [executor.submit(operation, i) for i in range(num_operations)]
+            results = [f.result() for f in as_completed(futures)]
+
+        assert len(errors) == 0, f"Errors occurred: {errors}"
+        assert len(results) == num_operations
+
+    def test_multiple_handlers_same_file(self):
+        """Test that multiple PickleHandler instances for the same file work correctly."""
+        handler1 = PickleHandler(self.file_name)
+        handler2 = PickleHandler(self.file_name)
+
+        num_operations = 50
+        errors: list[Exception] = []
+
+        def use_handler1() -> None:
+            for i in range(num_operations):
+                try:
+                    handler1.save({"handler": 1, "iteration": i})
+                except Exception as e:
+                    errors.append(e)
+
+        def use_handler2() -> None:
+            for i in range(num_operations):
+                try:
+                    handler2.save({"handler": 2, "iteration": i})
+                except Exception as e:
+                    errors.append(e)
+
+        t1 = threading.Thread(target=use_handler1)
+        t2 = threading.Thread(target=use_handler2)
+
+        t1.start()
+        t2.start()
+
+        t1.join()
+        t2.join()
+
+        assert len(errors) == 0, f"Errors occurred: {errors}"
+        final_data = self.handler.load()
+        assert isinstance(final_data, dict)
+        assert "handler" in final_data
+        assert "iteration" in final_data
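The tests above assert a last-writer-wins property: after concurrent saves, the file holds one writer's complete record, never an interleaving of two. A self-contained sketch of the same check against a plain atomic-write helper, using only the standard library (the helper name and thread counts are illustrative, not CrewAI's API):

```python
import os
import pickle
import tempfile
import threading


def atomic_save(path, data):
    # Same temp-file-plus-os.replace pattern as the PickleHandler.save diff above.
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path))
    with os.fdopen(fd, "wb") as f:
        pickle.dump(data, f)
    os.replace(tmp, path)


def run_demo(num_threads=8, writes_per_thread=25):
    workdir = tempfile.mkdtemp()
    path = os.path.join(workdir, "state.pkl")

    def writer(thread_id):
        for i in range(writes_per_thread):
            atomic_save(path, {"thread": thread_id, "iteration": i})

    threads = [threading.Thread(target=writer, args=(t,)) for t in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # The chronologically last os.replace is some thread's final write, so the
    # file always contains a complete record with iteration == writes_per_thread - 1.
    with open(path, "rb") as f:
        return pickle.load(f)
```

Because every `os.replace` is atomic, no lock is needed here for correctness of the file contents; the `_lock` in the diff additionally serializes same-process writers so the temp-file churn stays bounded.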