Compare commits

...

4 Commits

Author SHA1 Message Date
Devin AI
dd1b812a7f Improve code quality with better import sorting and test organization
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-12 13:27:10 +00:00
Devin AI
8ce05791d9 Refactor Flow class to improve code quality
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-12 13:25:23 +00:00
Devin AI
7925a6d507 Fix linting issues in test file
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-12 13:23:09 +00:00
Devin AI
1145f68d91 Fix issue #2347: Process tools with result_as_answer=True in Flow mode
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-12 13:21:16 +00:00
2 changed files with 200 additions and 0 deletions

View File

@@ -2,7 +2,10 @@ import asyncio
import copy
import inspect
import logging
# Forward reference for type hints
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
@@ -17,6 +20,9 @@ from typing import (
)
from uuid import uuid4
if TYPE_CHECKING:
from crewai.agent import Agent
from pydantic import BaseModel, Field, ValidationError
from crewai.flow.flow_visualizer import plot_flow
@@ -787,6 +793,27 @@ class Flow(Generic[T], metaclass=FlowMeta):
await asyncio.gather(*tasks)
final_output = self._method_outputs[-1] if self._method_outputs else None
# Check for tool results with result_as_answer=True in method-associated agents
for method_name, method in self._methods.items():
if hasattr(method, "__agent"):
tool_result = self._check_tool_results(getattr(method, "__agent"))
if tool_result:
final_output = tool_result
break
# Also check for agents stored as instance variables
for attr_name in dir(self):
if attr_name.startswith('_'):
continue
try:
attr = getattr(self, attr_name)
tool_result = self._check_tool_results(attr)
if tool_result:
final_output = tool_result
break
except (AttributeError, TypeError):
continue
crewai_event_bus.emit(
self,
@@ -1070,6 +1097,46 @@ class Flow(Generic[T], metaclass=FlowMeta):
elif level == "warning":
logger.warning(message)
def _check_tool_results(self, obj) -> Optional[str]:
"""
Check if an object has tool results with result_as_answer=True.
Parameters
----------
obj : Any
The object to check for tool results
Returns
-------
Optional[str]
The tool result if found with result_as_answer=True, None otherwise
"""
if hasattr(obj, "tools_results") and obj.tools_results:
for tool_result in obj.tools_results:
if tool_result.get("result_as_answer", False):
return tool_result["result"]
return None
@classmethod
def with_agent(cls, agent: 'Agent') -> Callable:
"""
Decorator to associate an agent with a flow method.
This allows tracking which agents are used in the flow.
Parameters
----------
agent : Agent
The agent to associate with the method
Returns
-------
Callable
A decorator function that associates the agent with the method
"""
def decorator(func):
func.__agent = agent
return func
return decorator
def plot(self, filename: str = "crewai_flow") -> None:
crewai_event_bus.emit(
self,

View File

@@ -0,0 +1,133 @@
from typing import Type
from unittest.mock import MagicMock, patch
import pytest
from pydantic import BaseModel, Field
from crewai import Agent, Crew, Flow, Task
from crewai.flow import listen, start
from crewai.tools import BaseTool
class TestToolInput(BaseModel):
query: str = Field(..., description='Query to process')
class TestTool(BaseTool):
name: str = 'Test Tool'
description: str = 'A test tool to demonstrate the issue'
args_schema: Type[BaseModel] = TestToolInput
result_as_answer: bool = True
def _run(self, query: str) -> str:
return f'Result for query: {query}'
class TestFlowToolResult:
"""Test suite for Flow tool result handling."""
@pytest.fixture
def test_tool(self):
"""Create a test tool with result_as_answer=True."""
return TestTool()
@pytest.fixture
def test_agent(self, test_tool):
"""Create a test agent with the tool."""
return Agent(
role='Tester',
goal='Test tools',
backstory='Testing tools in Flow vs Crew',
tools=[test_tool]
)
@pytest.fixture
def test_task(self, test_agent, test_tool):
"""Create a task with the tool."""
return Task(
description='Test task using the tool',
expected_output='Test output',
agent=test_agent,
tools=[test_tool]
)
def test_flow_tool_result_as_answer(self, test_agent, test_task):
"""Test that tools with result_as_answer=True are properly processed in Flow mode."""
# Create a simple Flow with direct access to the agent
class SimpleFlow(Flow):
def __init__(self):
super().__init__()
self.test_agent = test_agent
@start()
def start_task(self):
return 'Task started'
@listen('start_task')
@Flow.with_agent(test_agent) # Associate the agent with this method
def execute_task(self):
# Simulate tool execution and setting tools_results
self.test_agent.tools_results = [{
"name": "Test Tool",
"input": {"query": "test"},
"result": "Result for query: test",
"result_as_answer": True
}]
return "Agent task execution result"
# Create a mock for Crew to return the same result
with patch('crewai.crew.Crew.kickoff') as mock_crew_kickoff:
mock_crew_kickoff.return_value = "Result for query: test"
# Test Flow
flow = SimpleFlow()
flow_result = flow.kickoff()
# Verify that Flow returns the tool result with our fix
assert flow_result == "Result for query: test", "Flow should return tool result with result_as_answer=True"
# Test Crew
crew = Crew(agents=[test_agent], tasks=[test_task])
crew_result = crew.kickoff()
# Verify that Crew returns the tool result
assert crew_result == "Result for query: test", "Crew should return tool result with result_as_answer=True"
def test_flow_multiple_tool_results(self, test_agent, test_task):
"""Test handling of multiple tool results with result_as_answer=True."""
# Create a simple Flow with direct access to the agent
class SimpleFlow(Flow):
def __init__(self):
super().__init__()
self.test_agent = test_agent
@start()
def start_task(self):
return 'Task started'
@listen('start_task')
@Flow.with_agent(test_agent)
def execute_task(self):
# Simulate multiple tool executions with result_as_answer=True
self.test_agent.tools_results = [
{
"name": "Test Tool 1",
"input": {"query": "test1"},
"result": "Result for query: test1",
"result_as_answer": True
},
{
"name": "Test Tool 2",
"input": {"query": "test2"},
"result": "Result for query: test2",
"result_as_answer": True
}
]
return "Agent task execution result"
# Test Flow with multiple tool results
flow = SimpleFlow()
flow_result = flow.kickoff()
# Verify that Flow returns the first tool result with result_as_answer=True
assert flow_result == "Result for query: test1", "Flow should return the first tool result with result_as_answer=True"