Compare commits

...

3 Commits

Author SHA1 Message Date
Gabe
4637754cf1 feat: lazy import chromadb 2026-01-12 18:49:49 -03:00
João Moura
46846bcace fix: improve error handling for HumanFeedbackPending in flow execution (#4203)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* fix: handle HumanFeedbackPending in flow error management

Updated the flow error handling to treat HumanFeedbackPending as expected control flow rather than an error. This change ensures that the flow can appropriately manage human feedback scenarios without signaling an error, improving the robustness of the flow execution.

* fix: improve error handling for HumanFeedbackPending in flow execution

Refined the flow error management to emit a paused event for HumanFeedbackPending exceptions instead of treating them as failures. This enhancement allows the flow to better manage human feedback scenarios, ensuring that the execution state is preserved and appropriately handled without signaling an error. Regular failure events are still emitted for other exceptions, maintaining robust error reporting.
2026-01-08 03:40:02 -03:00
João Moura
d71e91e8f2 fix: handle HumanFeedbackPending in flow error management (#4200)
Updated the flow error handling to treat HumanFeedbackPending as expected control flow rather than an error. This change ensures that the flow can appropriately manage human feedback scenarios without signaling an error, improving the robustness of the flow execution.
2026-01-08 00:52:38 -03:00
5 changed files with 84 additions and 33 deletions

View File

@@ -1203,7 +1203,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
result = self.kickoff(inputs=inputs)
result_holder.append(result)
except Exception as e:
signal_error(state, e)
# HumanFeedbackPending is expected control flow, not an error
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
signal_error(state, e)
finally:
self.stream = True
signal_end(state)
@@ -1258,7 +1264,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
result = await self.kickoff_async(inputs=inputs)
result_holder.append(result)
except Exception as e:
signal_error(state, e, is_async=True)
# HumanFeedbackPending is expected control flow, not an error
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
signal_error(state, e, is_async=True)
finally:
self.stream = True
signal_end(state, is_async=True)
@@ -1590,29 +1602,45 @@ class Flow(Generic[T], metaclass=FlowMeta):
return result
except Exception as e:
if not self.suppress_flow_events:
# Check if this is a HumanFeedbackPending exception (paused, not failed)
from crewai.flow.async_feedback.types import HumanFeedbackPending
# Check if this is a HumanFeedbackPending exception (paused, not failed)
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed)
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed)
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
self._persistence = SQLiteFlowPersistence()
self._persistence = SQLiteFlowPersistence()
# Regular failure
future = crewai_event_bus.emit(
self,
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
if future:
self._event_futures.append(future)
# Emit paused event (not failed)
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
MethodExecutionPausedEvent(
type="method_execution_paused",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
state=self._copy_and_serialize_state(),
flow_id=e.context.flow_id,
message=e.context.message,
emit=e.context.emit,
),
)
if future:
self._event_futures.append(future)
elif not self.suppress_flow_events:
# Regular failure - emit failed event
future = crewai_event_bus.emit(
self,
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
if future:
self._event_futures.append(future)
raise e
def _copy_and_serialize_state(self) -> dict[str, Any]:

View File

@@ -1,21 +1,24 @@
from __future__ import annotations
import logging
import traceback
from typing import Any, cast
from typing import TYPE_CHECKING, Any, cast
import warnings
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.rag.chromadb.config import ChromaDBConfig
from crewai.rag.chromadb.types import ChromaEmbeddingFunctionWrapper
from crewai.rag.config.utils import get_rag_client
from crewai.rag.core.base_client import BaseClient
from crewai.rag.core.base_embeddings_provider import BaseEmbeddingsProvider
from crewai.rag.embeddings.factory import build_embedder
from crewai.rag.embeddings.types import ProviderSpec
from crewai.rag.factory import create_client
from crewai.rag.types import BaseRecord, SearchResult
from crewai.utilities.logger import Logger
if TYPE_CHECKING:
from crewai.rag.embeddings.types import ProviderSpec
class KnowledgeStorage(BaseKnowledgeStorage):
"""
Extends Storage to handle embeddings for memory entries, improving
@@ -30,6 +33,9 @@ class KnowledgeStorage(BaseKnowledgeStorage):
| None = None,
collection_name: str | None = None,
) -> None:
from crewai.rag.chromadb.config import ChromaDBConfig
from crewai.rag.chromadb.types import ChromaEmbeddingFunctionWrapper
self.collection_name = collection_name
self._client: BaseClient | None = None

View File

@@ -5,8 +5,6 @@ import traceback
from typing import TYPE_CHECKING, Any, cast
import warnings
from crewai.rag.chromadb.config import ChromaDBConfig
from crewai.rag.chromadb.types import ChromaEmbeddingFunctionWrapper
from crewai.rag.config.utils import get_rag_client
from crewai.rag.embeddings.factory import build_embedder
from crewai.rag.factory import create_client
@@ -37,6 +35,9 @@ class RAGStorage(BaseRAGStorage):
crew: Crew | None = None,
path: str | None = None,
) -> None:
from crewai.rag.chromadb.config import ChromaDBConfig
from crewai.rag.chromadb.types import ChromaEmbeddingFunctionWrapper
super().__init__(type, allow_reset, embedder_config, crew)
crew_agents = crew.agents if crew else []
sanitized_roles = [self._sanitize_role(agent.role) for agent in crew_agents]

View File

@@ -1,11 +1,12 @@
"""ChromaDB configuration model."""
from __future__ import annotations
from dataclasses import field
import os
from typing import Literal, cast
from typing import TYPE_CHECKING, Literal, cast
import warnings
from chromadb.config import Settings
from pydantic.dataclasses import dataclass as pyd_dataclass
from crewai.rag.chromadb.constants import (
@@ -13,10 +14,15 @@ from crewai.rag.chromadb.constants import (
DEFAULT_STORAGE_PATH,
DEFAULT_TENANT,
)
from crewai.rag.chromadb.types import ChromaEmbeddingFunctionWrapper
from crewai.rag.config.base import BaseRagConfig
if TYPE_CHECKING:
from chromadb.config import Settings
from crewai.rag.chromadb.types import ChromaEmbeddingFunctionWrapper
warnings.filterwarnings(
"ignore",
message=".*Mixing V1 models and V2 models.*",
@@ -37,6 +43,8 @@ def _default_settings() -> Settings:
Returns:
Settings with persistent storage and reset enabled.
"""
from chromadb.config import Settings
return Settings(
persist_directory=DEFAULT_STORAGE_PATH,
allow_reset=True,
@@ -54,6 +62,8 @@ def _default_embedding_function() -> ChromaEmbeddingFunctionWrapper:
OpenAIEmbeddingFunction,
)
from crewai.rag.chromadb.types import ChromaEmbeddingFunctionWrapper
return cast(
ChromaEmbeddingFunctionWrapper,
OpenAIEmbeddingFunction(

View File

@@ -1,13 +1,18 @@
"""Factory functions for creating ChromaDB clients."""
from __future__ import annotations
from hashlib import md5
import os
from typing import TYPE_CHECKING
from chromadb import PersistentClient
import portalocker
from crewai.rag.chromadb.client import ChromaDBClient
from crewai.rag.chromadb.config import ChromaDBConfig
if TYPE_CHECKING:
from crewai.rag.chromadb.client import ChromaDBClient
from crewai.rag.chromadb.config import ChromaDBConfig
def create_client(config: ChromaDBConfig) -> ChromaDBClient:
@@ -22,6 +27,7 @@ def create_client(config: ChromaDBConfig) -> ChromaDBClient:
Notes:
Need to update to use chromadb.Client to support more client types in the near future.
"""
from crewai.rag.chromadb.client import ChromaDBClient
persist_dir = config.settings.persist_directory
os.makedirs(persist_dir, exist_ok=True)