Compare commits

..

4 Commits

Author SHA1 Message Date
Heitor Sammuel Carvalho
2fd5c46b42 Emit FlowFailedEvent and finalise batch via added event listener 2025-12-18 11:04:46 -03:00
Heitor Sammuel Carvalho
97fed5229f Remove redundant parameter from emit call 2025-12-18 11:03:53 -03:00
Heitor Sammuel Carvalho
0f28d14e61 Move flow trace collection and batch finalisation to event listener 2025-12-18 11:03:10 -03:00
Heitor Sammuel Carvalho
b4b6434480 Add flow failed event 2025-12-18 11:00:16 -03:00
6 changed files with 40 additions and 140 deletions

View File

@@ -131,28 +131,11 @@ class RagTool(BaseTool):
@field_validator("config", mode="before")
@classmethod
def _validate_config(cls, value: Any) -> Any:
"""Validate config with improved error messages for embedding providers.
Also normalizes 'embedder' key to 'embedding_model' for backward compatibility
with documentation examples that use the 'embedder' key.
"""
"""Validate config with improved error messages for embedding providers."""
if not isinstance(value, dict):
return value
embedder = value.get("embedder")
embedding_model = value.get("embedding_model")
if embedder is not None and embedding_model is not None:
raise ValueError(
"Cannot specify both 'embedder' and 'embedding_model' in config. "
"Please use only one of them (they are aliases for the same setting)."
)
if embedder is not None:
value["embedding_model"] = embedder
del value["embedder"]
embedding_model = embedder
if embedding_model:
try:
value["embedding_model"] = _validate_embedding_config(embedding_model)

View File

@@ -65,10 +65,8 @@ class RagToolConfig(TypedDict, total=False):
Attributes:
embedding_model: Embedding model configuration accepted by RAG tools.
embedder: Alias for embedding_model (for backward compatibility with docs).
vectordb: Vector database configuration accepted by RAG tools.
"""
embedding_model: ProviderSpec
embedder: ProviderSpec
vectordb: VectorDbConfig

View File

@@ -299,113 +299,3 @@ def test_rag_tool_config_with_qdrant_and_azure_embeddings(
assert tool.adapter is not None
assert isinstance(tool.adapter, CrewAIRagAdapter)
@patch("crewai_tools.adapters.crewai_rag_adapter.create_client")
def test_rag_tool_with_embedder_key_alias(
mock_create_client: Mock,
) -> None:
"""Test that RagTool accepts 'embedder' key as alias for 'embedding_model'.
This test verifies the fix for GitHub issue #4122 where users following
the documentation examples using 'embedder' key were getting errors.
"""
mock_embedding_func = MagicMock()
mock_embedding_func.return_value = [[0.1] * 1536]
mock_client = MagicMock()
mock_client.get_or_create_collection = MagicMock(return_value=None)
mock_create_client.return_value = mock_client
with patch(
"crewai_tools.tools.rag.rag_tool.build_embedder",
return_value=mock_embedding_func,
):
class MyTool(RagTool):
pass
config = {
"embedder": {
"provider": "openai",
"config": {
"model": "text-embedding-3-small",
"api_key": "sk-test123",
},
}
}
tool = MyTool(config=config)
assert tool.adapter is not None
assert isinstance(tool.adapter, CrewAIRagAdapter)
@patch("crewai_tools.adapters.crewai_rag_adapter.create_client")
def test_rag_tool_with_bedrock_embedder_config(
mock_create_client: Mock,
) -> None:
"""Test RagTool with Amazon Bedrock embedder configuration.
This test verifies the fix for GitHub issue #4122 where users trying
to use Bedrock embeddings were getting OPENAI_API_KEY errors.
"""
mock_embedding_func = MagicMock()
mock_embedding_func.return_value = [[0.1] * 1536]
mock_client = MagicMock()
mock_client.get_or_create_collection = MagicMock(return_value=None)
mock_create_client.return_value = mock_client
with patch(
"crewai_tools.tools.rag.rag_tool.build_embedder",
return_value=mock_embedding_func,
):
class MyTool(RagTool):
pass
config = {
"embedder": {
"provider": "amazon-bedrock",
"config": {
"model_name": "amazon.titan-embed-text-v2:0",
},
}
}
tool = MyTool(config=config)
assert tool.adapter is not None
assert isinstance(tool.adapter, CrewAIRagAdapter)
@patch("crewai_tools.adapters.crewai_rag_adapter.create_client")
def test_rag_tool_rejects_both_embedder_and_embedding_model(
mock_create_client: Mock,
) -> None:
"""Test that RagTool raises error when both 'embedder' and 'embedding_model' are provided."""
import pytest
mock_client = MagicMock()
mock_client.get_or_create_collection = MagicMock(return_value=None)
mock_create_client.return_value = mock_client
class MyTool(RagTool):
pass
config = {
"embedder": {
"provider": "openai",
"config": {"model": "text-embedding-3-small"},
},
"embedding_model": {
"provider": "openai",
"config": {"model": "text-embedding-3-large"},
},
}
with pytest.raises(ValueError) as exc_info:
MyTool(config=config)
assert "Cannot specify both 'embedder' and 'embedding_model'" in str(exc_info.value)

View File

@@ -33,6 +33,7 @@ from crewai.events.types.crew_events import (
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFailedEvent,
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
@@ -194,6 +195,22 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
self._handle_trace_event("flow_finished", source, event)
if self.batch_manager.batch_owner_type == "flow":
if self.first_time_handler.is_first_time:
self.first_time_handler.mark_events_collected()
self.first_time_handler.handle_execution_completion()
else:
self.batch_manager.finalize_batch()
@event_bus.on(FlowFailedEvent)
def on_flow_failed(source: Any, event: FlowFailedEvent) -> None:
self._handle_trace_event("flow_failed", source, event)
if self.batch_manager.batch_owner_type == "flow":
if self.first_time_handler.is_first_time:
self.first_time_handler.mark_events_collected()
self.first_time_handler.handle_execution_completion()
else:
self.batch_manager.finalize_batch()
@event_bus.on(FlowPlotEvent)
def on_flow_plot(source: Any, event: FlowPlotEvent) -> None:

View File

@@ -67,6 +67,16 @@ class FlowFinishedEvent(FlowEvent):
state: dict[str, Any] | BaseModel
class FlowFailedEvent(FlowEvent):
"""Event emitted when a flow fails execution"""
flow_name: str
error: Exception
type: str = "flow_failed"
model_config = ConfigDict(arbitrary_types_allowed=True)
class FlowPlotEvent(FlowEvent):
"""Event emitted when a flow plot is created"""

View File

@@ -40,6 +40,7 @@ from crewai.events.listeners.tracing.utils import (
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFailedEvent,
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
@@ -977,7 +978,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
future = crewai_event_bus.emit(
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
),
@@ -1005,7 +1005,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_output,
state=self._copy_and_serialize_state(),
@@ -1020,15 +1019,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
self._event_futures.clear()
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
return final_output
except Exception as e:
future = crewai_event_bus.emit(
self,
FlowFailedEvent(
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
if future:
self._event_futures.append(future)
raise e
finally:
detach(flow_token)