mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 08:08:32 +00:00
Address PR feedback: Add validation, error handling, refactoring, and tests
Co-Authored-By: Joe Moura <joao@crewai.com>
This commit is contained in:
@@ -20,10 +20,20 @@ By default, human input is collected via the command line using the `input()` fu
|
||||
|
||||
```python
|
||||
def get_input_from_api(final_answer: str) -> str:
|
||||
"""Get human feedback from an API instead of CLI."""
|
||||
# Make an API call to get feedback
|
||||
response = requests.post("https://your-api.com/feedback", json={"answer": final_answer})
|
||||
return response.json()["feedback"]
|
||||
"""Get human feedback from an API instead of CLI with error handling."""
|
||||
try:
|
||||
# Make an API call to get feedback
|
||||
response = requests.post(
|
||||
"https://your-api.com/feedback",
|
||||
json={"answer": final_answer},
|
||||
timeout=10 # Set timeout to avoid long waits
|
||||
)
|
||||
response.raise_for_status() # Raise exception for HTTP errors
|
||||
return response.json().get("feedback", "")
|
||||
except (requests.RequestException, json.JSONDecodeError, KeyError) as e:
|
||||
print(f"Error getting feedback from API: {str(e)}")
|
||||
# Fallback to CLI input if API fails
|
||||
return input(f"API failed, please provide feedback manually:\n\n{final_answer}\n\nYour feedback: ")
|
||||
|
||||
task = Task(
|
||||
description="Analyze the latest market trends",
|
||||
@@ -34,6 +44,8 @@ task = Task(
|
||||
)
|
||||
```
|
||||
|
||||
Note: CrewAI will automatically fallback to the default input method if your custom function raises an exception, but implementing your own fallback gives you more control over the user experience.
|
||||
|
||||
The custom function should:
|
||||
- Accept a string parameter (the agent's final answer)
|
||||
- Return a string (the human feedback)
|
||||
|
||||
@@ -81,7 +81,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
self.respect_context_window = respect_context_window
|
||||
self.request_within_rpm_limit = request_within_rpm_limit
|
||||
self.ask_for_human_input = False
|
||||
self.ask_human_input_function = None
|
||||
self.ask_human_input_function: Optional[Callable[[str], str]] = None
|
||||
self.messages: List[Dict[str, str]] = []
|
||||
self.iterations = 0
|
||||
self.log_error_after = 3
|
||||
@@ -535,17 +535,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
Returns:
|
||||
AgentFinish: The final answer after processing feedback
|
||||
"""
|
||||
# Get output from either return_values dict or output attribute
|
||||
output = ""
|
||||
if hasattr(formatted_answer, "return_values") and formatted_answer.return_values:
|
||||
output = formatted_answer.return_values.get("output", "")
|
||||
elif hasattr(formatted_answer, "output"):
|
||||
output = formatted_answer.output
|
||||
output = self._extract_output_from_agent_finish(formatted_answer)
|
||||
|
||||
# Use custom function if provided, otherwise use default
|
||||
if self.ask_human_input_function and callable(self.ask_human_input_function):
|
||||
human_feedback = self.ask_human_input_function(output)
|
||||
else:
|
||||
try:
|
||||
if self.ask_human_input_function and callable(self.ask_human_input_function):
|
||||
human_feedback = self.ask_human_input_function(output)
|
||||
else:
|
||||
human_feedback = self._ask_human_input(output)
|
||||
except Exception as e:
|
||||
# Fallback to default method if custom method fails
|
||||
print(f"Error using custom input function: {str(e)}. Falling back to default.")
|
||||
human_feedback = self._ask_human_input(output)
|
||||
|
||||
if self._is_training_mode():
|
||||
@@ -585,21 +585,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
self.ask_for_human_input = False
|
||||
else:
|
||||
answer = self._process_feedback_iteration(feedback)
|
||||
# Get output from either return_values dict or output attribute
|
||||
output = ""
|
||||
if hasattr(answer, "return_values") and answer.return_values:
|
||||
output = answer.return_values.get("output", "")
|
||||
elif hasattr(answer, "output"):
|
||||
output = answer.output
|
||||
|
||||
output = self._extract_output_from_agent_finish(answer)
|
||||
|
||||
# Use custom function if provided, otherwise use default
|
||||
if self.ask_human_input_function and callable(self.ask_human_input_function):
|
||||
feedback = self.ask_human_input_function(output)
|
||||
else:
|
||||
try:
|
||||
if self.ask_human_input_function and callable(self.ask_human_input_function):
|
||||
feedback = self.ask_human_input_function(output)
|
||||
else:
|
||||
feedback = self._ask_human_input(output)
|
||||
except Exception as e:
|
||||
# Fallback to default method if custom method fails
|
||||
print(f"Error using custom input function: {str(e)}. Falling back to default.")
|
||||
feedback = self._ask_human_input(output)
|
||||
|
||||
return answer
|
||||
|
||||
def _extract_output_from_agent_finish(self, agent_finish: AgentFinish) -> str:
|
||||
"""Extract output from an AgentFinish object."""
|
||||
output = ""
|
||||
if hasattr(agent_finish, "return_values") and agent_finish.return_values:
|
||||
output = agent_finish.return_values.get("output", "")
|
||||
elif hasattr(agent_finish, "output"):
|
||||
output = agent_finish.output
|
||||
return output
|
||||
|
||||
def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
|
||||
"""Process a single feedback iteration."""
|
||||
self.messages.append(
|
||||
|
||||
@@ -196,6 +196,14 @@ class Task(BaseModel):
|
||||
"If return type is annotated, it must be Tuple[bool, Any]"
|
||||
)
|
||||
return v
|
||||
|
||||
@field_validator("ask_human_input")
|
||||
@classmethod
|
||||
def validate_ask_human_input(cls, v: Optional[Callable]) -> Optional[Callable]:
|
||||
"""Validate that the ask_human_input function is callable."""
|
||||
if v is not None and not callable(v):
|
||||
raise ValueError("ask_human_input must be a callable function")
|
||||
return v
|
||||
|
||||
_original_description: Optional[str] = PrivateAttr(default=None)
|
||||
_original_expected_output: Optional[str] = PrivateAttr(default=None)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from crewai.task import Task
|
||||
|
||||
@@ -21,3 +22,54 @@ def test_task_custom_human_input_parameter():
|
||||
# Verify the parameter was stored correctly
|
||||
assert task.ask_human_input == custom_input_func
|
||||
assert callable(task.ask_human_input)
|
||||
|
||||
|
||||
def test_task_invalid_human_input_parameter():
|
||||
"""Test that non-callable input raises validation error."""
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
human_input=True,
|
||||
ask_human_input="not_a_function"
|
||||
)
|
||||
|
||||
assert "Input should be callable" in str(exc_info.value)
|
||||
|
||||
|
||||
def test_custom_input_function_error_handling():
|
||||
"""Test handling of errors in custom input function."""
|
||||
def failing_input(_):
|
||||
raise Exception("API Error")
|
||||
|
||||
# Create a simplified test for error handling
|
||||
# We'll directly test the error handling in the _handle_human_feedback method
|
||||
|
||||
# Create a mock agent finish object with a simple output
|
||||
agent_finish = MagicMock()
|
||||
agent_finish.output = "Test output"
|
||||
|
||||
# Create a mock executor with our failing function
|
||||
executor = MagicMock()
|
||||
executor.ask_human_input_function = failing_input
|
||||
|
||||
# Set up the default input method mock
|
||||
executor._ask_human_input = MagicMock(return_value="Default input used")
|
||||
|
||||
# Add the extract method that returns the output directly
|
||||
executor._extract_output_from_agent_finish = MagicMock(return_value="Test output")
|
||||
|
||||
# Test the error handling by calling the method directly
|
||||
from crewai.agents.crew_agent_executor import CrewAgentExecutor
|
||||
|
||||
# Capture print output to verify error message
|
||||
with patch('builtins.print') as mock_print:
|
||||
# Call the method we're testing
|
||||
CrewAgentExecutor._handle_human_feedback(executor, agent_finish)
|
||||
|
||||
# Verify error was printed
|
||||
mock_print.assert_called_once()
|
||||
assert "Error using custom input function" in mock_print.call_args[0][0]
|
||||
|
||||
# Verify fallback to default method occurred
|
||||
executor._ask_human_input.assert_called_once_with("Test output")
|
||||
|
||||
Reference in New Issue
Block a user