Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
519f8ce0eb chore: re-trigger CI checks
Co-Authored-By: João <joao@crewai.com>
2026-01-10 21:12:38 +00:00
Devin AI
802ca92e42 fix: make PickleHandler thread-safe with portalocker and atomic writes
- Add threading lock for same-process thread safety
- Use atomic write operations (write to temp file, then rename) for data integrity
- Use portalocker for cross-process read locking
- Add comprehensive thread-safety tests covering concurrent reads, writes, and mixed operations

Fixes #4215

Co-Authored-By: João <joao@crewai.com>
2026-01-10 21:09:02 +00:00
17 changed files with 304 additions and 502 deletions

View File

@@ -574,10 +574,6 @@ When you run this Flow, the output will change based on the random boolean value
### Human in the Loop (human feedback)
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>
The `@human_feedback` decorator enables human-in-the-loop workflows by pausing flow execution to collect feedback from a human. This is useful for approval gates, quality review, and decision points that require human judgment.
```python Code

View File

@@ -91,10 +91,6 @@ The `A2AConfig` class accepts the following parameters:
Update mechanism for receiving task status. Options: `StreamingConfig`, `PollingConfig`, or `PushNotificationConfig`.
</ParamField>
<ParamField path="transport_protocol" type="Literal['JSONRPC', 'GRPC', 'HTTP+JSON']" default="JSONRPC">
Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`.
</ParamField>
## Authentication
For A2A agents that require authentication, use one of the provided auth schemes:

View File

@@ -7,10 +7,6 @@ mode: "wide"
## Overview
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**. Make sure to update your installation before using this feature.
</Note>
The `@human_feedback` decorator enables human-in-the-loop (HITL) workflows directly within CrewAI Flows. It allows you to pause flow execution, present output to a human for review, collect their feedback, and optionally route to different listeners based on the feedback outcome.
This is particularly valuable for:

View File

@@ -11,10 +11,10 @@ Human-in-the-Loop (HITL) is a powerful approach that combines artificial intelli
CrewAI offers two main approaches for implementing human-in-the-loop workflows:
| Approach | Best For | Integration | Version |
|----------|----------|-------------|---------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide | - |
| Approach | Best For | Integration |
|----------|----------|-------------|
| **Flow-based** (`@human_feedback` decorator) | Local development, console-based review, synchronous workflows | [Human Feedback in Flows](/en/learn/human-feedback-in-flows) |
| **Webhook-based** (Enterprise) | Production deployments, async workflows, external integrations (Slack, Teams, etc.) | This guide |
<Tip>
If you're building flows and want to add human review steps with routing based on feedback, check out the [Human Feedback in Flows](/en/learn/human-feedback-in-flows) guide for the `@human_feedback` decorator.

View File

@@ -567,10 +567,6 @@ Fourth method running
### Human in the Loop (인간 피드백)
<Note>
`@human_feedback` 데코레이터는 **CrewAI 버전 1.8.0 이상**이 필요합니다.
</Note>
`@human_feedback` 데코레이터는 인간의 피드백을 수집하기 위해 플로우 실행을 일시 중지하는 human-in-the-loop 워크플로우를 가능하게 합니다. 이는 승인 게이트, 품질 검토, 인간의 판단이 필요한 결정 지점에 유용합니다.
```python Code

View File

@@ -7,10 +7,6 @@ mode: "wide"
## 개요
<Note>
`@human_feedback` 데코레이터는 **CrewAI 버전 1.8.0 이상**이 필요합니다. 이 기능을 사용하기 전에 설치를 업데이트하세요.
</Note>
`@human_feedback` 데코레이터는 CrewAI Flow 내에서 직접 human-in-the-loop(HITL) 워크플로우를 가능하게 합니다. Flow 실행을 일시 중지하고, 인간에게 검토를 위해 출력을 제시하고, 피드백을 수집하고, 선택적으로 피드백 결과에 따라 다른 리스너로 라우팅할 수 있습니다.
이는 특히 다음과 같은 경우에 유용합니다:

View File

@@ -5,22 +5,9 @@ icon: "user-check"
mode: "wide"
---
휴먼 인 더 루프(HITL, Human-in-the-Loop)는 인공지능과 인간의 전문 지식을 결합하여 의사결정을 강화하고 작업 결과를 향상시키는 강력한 접근 방식입니다. CrewAI는 필요에 따라 HITL을 구현하는 여러 가지 방법을 제공합니다.
휴먼 인 더 루프(HITL, Human-in-the-Loop)는 인공지능과 인간의 전문 지식을 결합하여 의사결정을 강화하고 작업 결과를 향상시키는 강력한 접근 방식입니다. 이 가이드에서는 CrewAI 내에서 HITL을 구현하는 방법을 안내합니다.
## HITL 접근 방식 선택
CrewAI는 human-in-the-loop 워크플로우를 구현하기 위한 두 가지 주요 접근 방식을 제공합니다:
| 접근 방식 | 적합한 용도 | 통합 | 버전 |
|----------|----------|-------------|---------|
| **Flow 기반** (`@human_feedback` 데코레이터) | 로컬 개발, 콘솔 기반 검토, 동기식 워크플로우 | [Flow에서 인간 피드백](/ko/learn/human-feedback-in-flows) | **1.8.0+** |
| **Webhook 기반** (Enterprise) | 프로덕션 배포, 비동기 워크플로우, 외부 통합 (Slack, Teams 등) | 이 가이드 | - |
<Tip>
Flow를 구축하면서 피드백을 기반으로 라우팅하는 인간 검토 단계를 추가하려면 `@human_feedback` 데코레이터에 대한 [Flow에서 인간 피드백](/ko/learn/human-feedback-in-flows) 가이드를 참조하세요.
</Tip>
## Webhook 기반 HITL 워크플로우 설정
## HITL 워크플로우 설정
<Steps>
<Step title="작업 구성">

View File

@@ -309,10 +309,6 @@ Ao executar esse Flow, a saída será diferente dependendo do valor booleano ale
### Human in the Loop (feedback humano)
<Note>
O decorador `@human_feedback` requer **CrewAI versão 1.8.0 ou superior**.
</Note>
O decorador `@human_feedback` permite fluxos de trabalho human-in-the-loop, pausando a execução do flow para coletar feedback de um humano. Isso é útil para portões de aprovação, revisão de qualidade e pontos de decisão que requerem julgamento humano.
```python Code

View File

@@ -7,10 +7,6 @@ mode: "wide"
## Visão Geral
<Note>
O decorador `@human_feedback` requer **CrewAI versão 1.8.0 ou superior**. Certifique-se de atualizar sua instalação antes de usar este recurso.
</Note>
O decorador `@human_feedback` permite fluxos de trabalho human-in-the-loop (HITL) diretamente nos CrewAI Flows. Ele permite pausar a execução do flow, apresentar a saída para um humano revisar, coletar seu feedback e, opcionalmente, rotear para diferentes listeners com base no resultado do feedback.
Isso é particularmente valioso para:

View File

@@ -5,22 +5,9 @@ icon: "user-check"
mode: "wide"
---
Human-in-the-Loop (HITL) é uma abordagem poderosa que combina a inteligência artificial com a experiência humana para aprimorar a tomada de decisões e melhorar os resultados das tarefas. CrewAI oferece várias maneiras de implementar HITL dependendo das suas necessidades.
Human-in-the-Loop (HITL) é uma abordagem poderosa que combina a inteligência artificial com a experiência humana para aprimorar a tomada de decisões e melhorar os resultados das tarefas. Este guia mostra como implementar HITL dentro da CrewAI.
## Escolhendo Sua Abordagem HITL
CrewAI oferece duas abordagens principais para implementar workflows human-in-the-loop:
| Abordagem | Melhor Para | Integração | Versão |
|----------|----------|-------------|---------|
| **Baseada em Flow** (decorador `@human_feedback`) | Desenvolvimento local, revisão via console, workflows síncronos | [Feedback Humano em Flows](/pt-BR/learn/human-feedback-in-flows) | **1.8.0+** |
| **Baseada em Webhook** (Enterprise) | Deployments em produção, workflows assíncronos, integrações externas (Slack, Teams, etc.) | Este guia | - |
<Tip>
Se você está construindo flows e deseja adicionar etapas de revisão humana com roteamento baseado em feedback, confira o guia [Feedback Humano em Flows](/pt-BR/learn/human-feedback-in-flows) para o decorador `@human_feedback`.
</Tip>
## Configurando Workflows HITL Baseados em Webhook
## Configurando Workflows HITL
<Steps>
<Step title="Configure sua Tarefa">

View File

@@ -5,7 +5,7 @@ This module is separate from experimental.a2a to avoid circular imports.
from __future__ import annotations
from typing import Annotated, Any, ClassVar, Literal
from typing import Annotated, Any, ClassVar
from pydantic import (
BaseModel,
@@ -53,7 +53,6 @@ class A2AConfig(BaseModel):
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
updates: Update mechanism config.
transport_protocol: A2A transport protocol (grpc, jsonrpc, http+json).
"""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@@ -83,7 +82,3 @@ class A2AConfig(BaseModel):
default_factory=_get_default_update_config,
description="Update mechanism config",
)
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
default="JSONRPC",
description="Specified mode of A2A transport protocol",
)

View File

@@ -7,7 +7,7 @@ from collections.abc import AsyncIterator, MutableMapping
from contextlib import asynccontextmanager
from functools import lru_cache
import time
from typing import TYPE_CHECKING, Any, Literal
from typing import TYPE_CHECKING, Any
import uuid
from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory
@@ -18,6 +18,7 @@ from a2a.types import (
PushNotificationConfig as A2APushNotificationConfig,
Role,
TextPart,
TransportProtocol,
)
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
@@ -258,7 +259,6 @@ async def _afetch_agent_card_impl(
def execute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -282,23 +282,6 @@ def execute_a2a_delegation(
use aexecute_a2a_delegation directly.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest)
timeout: Request timeout in seconds
task_description: The task to delegate
context: Optional context information
context_id: Context ID for correlating messages/tasks
task_id: Specific task identifier
reference_task_ids: List of related task IDs
metadata: Additional metadata (external_id, request_id, etc.)
extensions: Protocol extensions for custom fields
conversation_history: Previous Message objects from conversation
agent_id: Agent identifier for logging
agent_role: Role of the CrewAI agent delegating the task
agent_branch: Optional agent tree branch for logging
response_model: Optional Pydantic model for structured outputs
turn_number: Optional turn number for multi-turn conversations
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
@@ -340,7 +323,6 @@ def execute_a2a_delegation(
agent_role=agent_role,
agent_branch=agent_branch,
response_model=response_model,
transport_protocol=transport_protocol,
turn_number=turn_number,
updates=updates,
)
@@ -351,7 +333,6 @@ def execute_a2a_delegation(
async def aexecute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -375,23 +356,6 @@ async def aexecute_a2a_delegation(
in an async context (e.g., with Crew.akickoff() or agent.aexecute_task()).
Args:
endpoint: A2A agent endpoint URL
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication
timeout: Request timeout in seconds
task_description: Task to delegate
context: Optional context
context_id: Context ID for correlation
task_id: Specific task identifier
reference_task_ids: Related task IDs
metadata: Additional metadata
extensions: Protocol extensions
conversation_history: Previous Message objects
turn_number: Current turn number
agent_branch: Agent tree branch for logging
agent_id: Agent identifier for logging
agent_role: Agent role for logging
response_model: Optional Pydantic model for structured outputs
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
@@ -450,7 +414,6 @@ async def aexecute_a2a_delegation(
agent_role=agent_role,
response_model=response_model,
updates=updates,
transport_protocol=transport_protocol,
)
crewai_event_bus.emit(
@@ -468,7 +431,6 @@ async def aexecute_a2a_delegation(
async def _aexecute_a2a_delegation_impl(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -562,6 +524,7 @@ async def _aexecute_a2a_delegation_impl(
extensions=extensions,
)
transport_protocol = TransportProtocol("JSONRPC")
new_messages: list[Message] = [*conversation_history, message]
crewai_event_bus.emit(
None,
@@ -633,7 +596,7 @@ async def _aexecute_a2a_delegation_impl(
@asynccontextmanager
async def _create_a2a_client(
agent_card: AgentCard,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
transport_protocol: TransportProtocol,
timeout: int,
headers: MutableMapping[str, str],
streaming: bool,
@@ -677,7 +640,7 @@ async def _create_a2a_client(
config = ClientConfig(
httpx_client=httpx_client,
supported_transports=[transport_protocol],
supported_transports=[str(transport_protocol.value)],
streaming=streaming and not use_polling,
polling=use_polling,
accepted_output_modes=["application/json"],

View File

@@ -771,7 +771,6 @@ def _delegate_to_a2a(
response_model=agent_config.response_model,
turn_number=turn_num + 1,
updates=agent_config.updates,
transport_protocol=agent_config.transport_protocol,
)
conversation_history = a2a_result.get("history", [])
@@ -1086,7 +1085,6 @@ async def _adelegate_to_a2a(
agent_branch=agent_branch,
response_model=agent_config.response_model,
turn_number=turn_num + 1,
transport_protocol=agent_config.transport_protocol,
updates=agent_config.updates,
)

View File

@@ -209,9 +209,10 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
# Handle telemetry
span = self.execution_spans.pop(source, None)
span = self.execution_spans.get(source)
if span:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
# Pass task name if it exists
task_name = get_task_name(source)
@@ -221,10 +222,11 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
span = self.execution_spans.pop(source, None)
span = self.execution_spans.get(source)
if span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
# Pass task name if it exists
task_name = get_task_name(source)

View File

@@ -1,370 +0,0 @@
"""Lazy loader for Python packages.
Makes it easy to load subpackages and functions on demand.
Pulled from https://github.com/scientific-python/lazy-loader/blob/main/src/lazy_loader/__init__.py,
modernized a little.
"""
import ast
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
import importlib
import importlib.metadata
import importlib.util
import inspect
import os
from pathlib import Path
import sys
import threading
import types
from typing import Any, NoReturn
import warnings
import packaging.requirements
_threadlock = threading.Lock()
@dataclass(frozen=True, slots=True)
class _FrameData:
"""Captured stack frame information for delayed error reporting."""
filename: str
lineno: int
function: str
code_context: Sequence[str] | None
def attach(
package_name: str,
submodules: set[str] | None = None,
submod_attrs: dict[str, list[str]] | None = None,
) -> tuple[Callable[[str], Any], Callable[[], list[str]], list[str]]:
"""Attach lazily loaded submodules, functions, or other attributes.
Replaces a package's `__getattr__`, `__dir__`, and `__all__` such that
imports work normally but occur upon first use.
Example:
__getattr__, __dir__, __all__ = lazy.attach(
__name__, ["mysubmodule"], {"foo": ["someattr"]}
)
Args:
package_name: The package name, typically ``__name__``.
submodules: Set of submodule names to attach.
submod_attrs: Mapping of submodule names to lists of attributes.
These attributes are imported as they are used.
Returns:
A tuple of (__getattr__, __dir__, __all__) to assign in the package.
"""
submod_attrs = submod_attrs or {}
submodules = set(submodules) if submodules else set()
attr_to_modules = {
attr: mod for mod, attrs in submod_attrs.items() for attr in attrs
}
__all__ = sorted(submodules | attr_to_modules.keys())
def __getattr__(name: str) -> Any: # noqa: N807
if name in submodules:
return importlib.import_module(f"{package_name}.{name}")
if name in attr_to_modules:
submod_path = f"{package_name}.{attr_to_modules[name]}"
submod = importlib.import_module(submod_path)
attr = getattr(submod, name)
# If the attribute lives in a file (module) with the same
# name as the attribute, ensure that the attribute and *not*
# the module is accessible on the package.
if name == attr_to_modules[name]:
pkg = sys.modules[package_name]
pkg.__dict__[name] = attr
return attr
raise AttributeError(f"No {package_name} attribute {name}")
def __dir__() -> list[str]: # noqa: N807
return __all__.copy()
if os.environ.get("EAGER_IMPORT"):
for attr in set(attr_to_modules.keys()) | submodules:
__getattr__(attr)
return __getattr__, __dir__, __all__.copy()
class DelayedImportErrorModule(types.ModuleType):
"""Module type that delays raising ModuleNotFoundError until attribute access.
Captures stack frame data to provide helpful error messages showing where
the original import was attempted.
"""
def __init__(
self,
frame_data: _FrameData,
*args: Any,
message: str,
**kwargs: Any,
) -> None:
"""Initialize the delayed error module.
Args:
frame_data: Captured frame information for error reporting.
*args: Positional arguments passed to ModuleType.
message: The error message to display when accessed.
**kwargs: Keyword arguments passed to ModuleType.
"""
self._frame_data = frame_data
self._message = message
super().__init__(*args, **kwargs)
def __getattr__(self, name: str) -> NoReturn:
"""Raise ModuleNotFoundError with detailed context on any attribute access."""
frame = self._frame_data
code = "".join(frame.code_context) if frame.code_context else ""
raise ModuleNotFoundError(
f"{self._message}\n\n"
"This error is lazily reported, having originally occurred in\n"
f" File {frame.filename}, line {frame.lineno}, in {frame.function}\n\n"
f"----> {code.strip()}"
)
def load(
fullname: str,
*,
require: str | None = None,
error_on_import: bool = False,
suppress_warning: bool = False,
) -> types.ModuleType:
"""Return a lazily imported proxy for a module.
The proxy module delays actual import until first attribute access.
Example:
np = lazy.load("numpy")
def myfunc():
np.norm(...)
Warning:
Lazily loading subpackages causes the parent package to be eagerly
loaded. Use `lazy_loader.attach` instead for subpackages.
Args:
fullname: The full name of the module to import (e.g., "scipy").
require: A PEP-508 dependency requirement (e.g., "numpy >=1.24").
If specified, raises an error if the installed version doesn't match.
error_on_import: If True, raise import errors immediately.
If False (default), delay errors until module is accessed.
suppress_warning: If True, suppress the warning when loading subpackages.
Returns:
A proxy module that loads on first attribute access.
"""
with _threadlock:
module = sys.modules.get(fullname)
# Most common, short-circuit
if module is not None and require is None:
return module
have_module = module is not None
if not suppress_warning and "." in fullname:
msg = (
"subpackages can technically be lazily loaded, but it causes the "
"package to be eagerly loaded even if it is already lazily loaded. "
"So, you probably shouldn't use subpackages with this lazy feature."
)
warnings.warn(msg, RuntimeWarning, stacklevel=2)
spec = None
if not have_module:
spec = importlib.util.find_spec(fullname)
have_module = spec is not None
if not have_module:
not_found_message = f"No module named '{fullname}'"
elif require is not None:
try:
have_module = _check_requirement(require)
except ModuleNotFoundError as e:
raise ValueError(
f"Found module '{fullname}' but cannot test "
"requirement '{require}'. "
"Requirements must match distribution name, not module name."
) from e
not_found_message = f"No distribution can be found matching '{require}'"
if not have_module:
if error_on_import:
raise ModuleNotFoundError(not_found_message)
parent = inspect.stack()[1]
frame_data = _FrameData(
filename=parent.filename,
lineno=parent.lineno,
function=parent.function,
code_context=parent.code_context,
)
del parent
return DelayedImportErrorModule(
frame_data,
"DelayedImportErrorModule",
message=not_found_message,
)
if spec is not None:
module = importlib.util.module_from_spec(spec)
sys.modules[fullname] = module
if spec.loader is not None:
loader = importlib.util.LazyLoader(spec.loader)
loader.exec_module(module)
if module is None:
raise ModuleNotFoundError(f"No module named '{fullname}'")
return module
def _check_requirement(require: str) -> bool:
"""Verify that a package requirement is satisfied.
Args:
require: A dependency requirement as defined in PEP-508.
Returns:
True if the installed version matches the requirement, False otherwise.
Raises:
ModuleNotFoundError: If the package is not installed.
"""
req = packaging.requirements.Requirement(require)
return req.specifier.contains(
importlib.metadata.version(req.name),
prereleases=True,
)
@dataclass
class _StubVisitor(ast.NodeVisitor):
"""AST visitor to parse a stub file for submodules and submod_attrs."""
_submodules: set[str] = field(default_factory=set)
_submod_attrs: dict[str, list[str]] = field(default_factory=dict)
def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
"""Visit an ImportFrom node and extract submodule/attribute information.
Args:
node: The AST ImportFrom node to visit.
Raises:
ValueError: If the import is not a relative import or uses star import.
"""
if node.level != 1:
raise ValueError(
"Only within-module imports are supported (`from .* import`)"
)
names = [alias.name for alias in node.names]
if node.module:
if "*" in names:
raise ValueError(
f"lazy stub loader does not support star import "
f"`from {node.module} import *`"
)
self._submod_attrs.setdefault(node.module, []).extend(names)
else:
self._submodules.update(names)
def attach_stub(
package_name: str,
filename: str,
) -> tuple[Callable[[str], Any], Callable[[], list[str]], list[str]]:
"""Attach lazily loaded submodules and functions from a type stub.
Parses a `.pyi` stub file to infer submodules and attributes. This allows
static type checkers to find imports while providing lazy loading at runtime.
Args:
package_name: The package name, typically ``__name__``.
filename: Path to `.py` file with an adjacent `.pyi` file.
Typically use ``__file__``.
Returns:
A tuple of (__getattr__, __dir__, __all__) to assign in the package.
Raises:
ValueError: If stub file is not found or contains invalid imports.
"""
path = Path(filename)
stubfile = path if path.suffix == ".pyi" else path.with_suffix(".pyi")
if not stubfile.exists():
raise ValueError(f"Cannot load imports from non-existent stub {stubfile!r}")
visitor = _StubVisitor()
visitor.visit(ast.parse(stubfile.read_text()))
return attach(package_name, visitor._submodules, visitor._submod_attrs)
def lazy_exports_stub(package_name: str, filename: str) -> None:
"""Install lazy loading on a module based on its .pyi stub file.
Parses the adjacent `.pyi` stub file to determine what to export lazily.
Type checkers see the stub, runtime gets lazy loading.
Example:
# __init__.py
from crewai.utilities.lazy import lazy_exports_stub
lazy_exports_stub(__name__, __file__)
# __init__.pyi
from .config import ChromaDBConfig, ChromaDBSettings
from .types import EmbeddingType
Args:
package_name: The package name, typically ``__name__``.
filename: Path to the module file, typically ``__file__``.
"""
__getattr__, __dir__, __all__ = attach_stub(package_name, filename)
module = sys.modules[package_name]
module.__getattr__ = __getattr__ # type: ignore[method-assign]
module.__dir__ = __dir__ # type: ignore[method-assign]
module.__dict__["__all__"] = __all__
def lazy_exports(
package_name: str,
submod_attrs: dict[str, list[str]],
submodules: set[str] | None = None,
) -> None:
"""Install lazy loading on a module.
Example:
from crewai.utilities.lazy import lazy_exports
lazy_exports(__name__, {
'config': ['ChromaDBConfig', 'ChromaDBSettings'],
'types': ['EmbeddingType'],
})
Args:
package_name: The package name, typically ``__name__``.
submod_attrs: Mapping of submodule names to lists of attributes.
submodules: Optional set of submodule names to expose directly.
"""
__getattr__, __dir__, __all__ = attach(package_name, submodules, submod_attrs)
module = sys.modules[package_name]
module.__getattr__ = __getattr__ # type: ignore[method-assign]
module.__dir__ = __dir__ # type: ignore[method-assign]
module.__dict__["__all__"] = __all__

View File

@@ -2,8 +2,11 @@ from datetime import datetime
import json
import os
import pickle
import tempfile
import threading
from typing import Any, TypedDict
import portalocker
from typing_extensions import Unpack
@@ -123,10 +126,15 @@ class FileHandler:
class PickleHandler:
"""Handler for saving and loading data using pickle.
"""Thread-safe handler for saving and loading data using pickle.
This class provides thread-safe file operations using portalocker for
cross-process file locking and atomic write operations to prevent
data corruption during concurrent access.
Attributes:
file_path: The path to the pickle file.
_lock: Threading lock for thread-safe operations within the same process.
"""
def __init__(self, file_name: str) -> None:
@@ -141,34 +149,62 @@ class PickleHandler:
file_name += ".pkl"
self.file_path = os.path.join(os.getcwd(), file_name)
self._lock = threading.Lock()
def initialize_file(self) -> None:
"""Initialize the file with an empty dictionary and overwrite any existing data."""
self.save({})
def save(self, data: Any) -> None:
"""
Save the data to the specified file using pickle.
"""Save the data to the specified file using pickle with thread-safe atomic writes.
This method uses a two-phase approach for thread safety:
1. Threading lock for same-process thread safety
2. Atomic write (write to temp file, then rename) for cross-process safety
and data integrity
Args:
data: The data to be saved to the file.
data: The data to be saved to the file.
"""
with open(self.file_path, "wb") as f:
pickle.dump(obj=data, file=f)
with self._lock:
dir_name = os.path.dirname(self.file_path) or os.getcwd()
fd, temp_path = tempfile.mkstemp(
suffix=".pkl.tmp", prefix="pickle_", dir=dir_name
)
try:
with os.fdopen(fd, "wb") as f:
pickle.dump(obj=data, file=f)
f.flush()
os.fsync(f.fileno())
os.replace(temp_path, self.file_path)
except Exception:
if os.path.exists(temp_path):
os.unlink(temp_path)
raise
def load(self) -> Any:
"""Load the data from the specified file using pickle.
"""Load the data from the specified file using pickle with thread-safe locking.
This method uses portalocker for cross-process read locking to ensure
data consistency when multiple processes may be accessing the file.
Returns:
The data loaded from the file.
The data loaded from the file, or an empty dictionary if the file
does not exist or is empty.
"""
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
return {} # Return an empty dictionary if the file does not exist or is empty
with self._lock:
if (
not os.path.exists(self.file_path)
or os.path.getsize(self.file_path) == 0
):
return {}
with open(self.file_path, "rb") as file:
try:
return pickle.load(file) # noqa: S301
except EOFError:
return {} # Return an empty dictionary if the file is empty or corrupted
except Exception:
raise # Raise any other exceptions that occur during loading
with portalocker.Lock(
self.file_path, "rb", flags=portalocker.LOCK_SH
) as file:
try:
return pickle.load(file) # noqa: S301
except EOFError:
return {}
except Exception:
raise

View File

@@ -1,6 +1,8 @@
import os
import threading
import unittest
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
import pytest
from crewai.utilities.file_handler import PickleHandler
@@ -8,7 +10,6 @@ from crewai.utilities.file_handler import PickleHandler
class TestPickleHandler(unittest.TestCase):
def setUp(self):
# Use a unique file name for each test to avoid race conditions in parallel test execution
unique_id = str(uuid.uuid4())
self.file_name = f"test_data_{unique_id}.pkl"
self.file_path = os.path.join(os.getcwd(), self.file_name)
@@ -47,3 +48,234 @@ class TestPickleHandler(unittest.TestCase):
assert str(exc.value) == "pickle data was truncated"
assert "<class '_pickle.UnpicklingError'>" == str(exc.type)
class TestPickleHandlerThreadSafety(unittest.TestCase):
"""Tests for thread-safety of PickleHandler operations."""
def setUp(self):
unique_id = str(uuid.uuid4())
self.file_name = f"test_thread_safe_{unique_id}.pkl"
self.file_path = os.path.join(os.getcwd(), self.file_name)
self.handler = PickleHandler(self.file_name)
def tearDown(self):
if os.path.exists(self.file_path):
os.remove(self.file_path)
def test_concurrent_writes_same_handler(self):
"""Test that concurrent writes from multiple threads using the same handler don't corrupt data."""
num_threads = 10
num_writes_per_thread = 20
errors: list[Exception] = []
write_count = 0
count_lock = threading.Lock()
def write_data(thread_id: int) -> None:
nonlocal write_count
for i in range(num_writes_per_thread):
try:
data = {"thread": thread_id, "iteration": i, "data": f"value_{thread_id}_{i}"}
self.handler.save(data)
with count_lock:
write_count += 1
except Exception as e:
errors.append(e)
threads = []
for i in range(num_threads):
t = threading.Thread(target=write_data, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
assert len(errors) == 0, f"Errors occurred during concurrent writes: {errors}"
assert write_count == num_threads * num_writes_per_thread
loaded_data = self.handler.load()
assert isinstance(loaded_data, dict)
assert "thread" in loaded_data
assert "iteration" in loaded_data
def test_concurrent_reads_same_handler(self):
"""Test that concurrent reads from multiple threads don't cause issues."""
test_data = {"key": "value", "nested": {"a": 1, "b": 2}}
self.handler.save(test_data)
num_threads = 20
results: list[dict] = []
errors: list[Exception] = []
results_lock = threading.Lock()
def read_data() -> None:
try:
data = self.handler.load()
with results_lock:
results.append(data)
except Exception as e:
errors.append(e)
threads = []
for _ in range(num_threads):
t = threading.Thread(target=read_data)
threads.append(t)
t.start()
for t in threads:
t.join()
assert len(errors) == 0, f"Errors occurred during concurrent reads: {errors}"
assert len(results) == num_threads
for result in results:
assert result == test_data
def test_concurrent_read_write_same_handler(self):
"""Test that concurrent reads and writes don't corrupt data or cause errors."""
initial_data = {"counter": 0}
self.handler.save(initial_data)
num_writers = 5
num_readers = 10
writes_per_thread = 10
reads_per_thread = 20
write_errors: list[Exception] = []
read_errors: list[Exception] = []
read_results: list[dict] = []
results_lock = threading.Lock()
def writer(thread_id: int) -> None:
for i in range(writes_per_thread):
try:
data = {"writer": thread_id, "write_num": i}
self.handler.save(data)
except Exception as e:
write_errors.append(e)
def reader() -> None:
for _ in range(reads_per_thread):
try:
data = self.handler.load()
with results_lock:
read_results.append(data)
except Exception as e:
read_errors.append(e)
threads = []
for i in range(num_writers):
t = threading.Thread(target=writer, args=(i,))
threads.append(t)
for _ in range(num_readers):
t = threading.Thread(target=reader)
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
assert len(write_errors) == 0, f"Write errors: {write_errors}"
assert len(read_errors) == 0, f"Read errors: {read_errors}"
for result in read_results:
assert isinstance(result, dict)
def test_atomic_write_no_partial_data(self):
"""Test that atomic writes prevent partial/corrupted data from being read."""
large_data = {"key": "x" * 100000, "numbers": list(range(10000))}
num_iterations = 50
errors: list[Exception] = []
corruption_detected = False
corruption_lock = threading.Lock()
def writer() -> None:
for _ in range(num_iterations):
try:
self.handler.save(large_data)
except Exception as e:
errors.append(e)
def reader() -> None:
nonlocal corruption_detected
for _ in range(num_iterations * 2):
try:
data = self.handler.load()
if data and data != {} and data != large_data:
with corruption_lock:
corruption_detected = True
except Exception as e:
errors.append(e)
writer_thread = threading.Thread(target=writer)
reader_thread = threading.Thread(target=reader)
writer_thread.start()
reader_thread.start()
writer_thread.join()
reader_thread.join()
assert len(errors) == 0, f"Errors occurred: {errors}"
assert not corruption_detected, "Partial/corrupted data was read"
def test_thread_pool_concurrent_operations(self):
"""Test thread safety using ThreadPoolExecutor for more realistic concurrent access."""
num_operations = 100
errors: list[Exception] = []
def operation(op_id: int) -> str:
try:
if op_id % 3 == 0:
self.handler.save({"op_id": op_id, "type": "write"})
return f"write_{op_id}"
else:
data = self.handler.load()
return f"read_{op_id}_{type(data).__name__}"
except Exception as e:
errors.append(e)
return f"error_{op_id}"
with ThreadPoolExecutor(max_workers=20) as executor:
futures = [executor.submit(operation, i) for i in range(num_operations)]
results = [f.result() for f in as_completed(futures)]
assert len(errors) == 0, f"Errors occurred: {errors}"
assert len(results) == num_operations
def test_multiple_handlers_same_file(self):
"""Test that multiple PickleHandler instances for the same file work correctly."""
handler1 = PickleHandler(self.file_name)
handler2 = PickleHandler(self.file_name)
num_operations = 50
errors: list[Exception] = []
def use_handler1() -> None:
for i in range(num_operations):
try:
handler1.save({"handler": 1, "iteration": i})
except Exception as e:
errors.append(e)
def use_handler2() -> None:
for i in range(num_operations):
try:
handler2.save({"handler": 2, "iteration": i})
except Exception as e:
errors.append(e)
t1 = threading.Thread(target=use_handler1)
t2 = threading.Thread(target=use_handler2)
t1.start()
t2.start()
t1.join()
t2.join()
assert len(errors) == 0, f"Errors occurred: {errors}"
final_data = self.handler.load()
assert isinstance(final_data, dict)
assert "handler" in final_data
assert "iteration" in final_data