Compare commits

...

3 Commits

Author SHA1 Message Date
Devin AI
af29bd495f Fix type error in crew_agent_executor.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-20 09:02:27 +00:00
Devin AI
d82a01d4f7 Address PR feedback: Add validation, error handling, refactoring, and tests
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-20 08:57:19 +00:00
Devin AI
0ff73d22d7 Add feature to override _ask_human_input function in Task (#2419)
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-03-20 08:44:15 +00:00
6 changed files with 170 additions and 3 deletions

View File

@@ -14,6 +14,43 @@ This feature is especially useful in complex decision-making processes or when a
To integrate human input into agent execution, set the `human_input` flag in the task definition. When enabled, the agent prompts the user for input before delivering its final answer.
This input can provide extra context, clarify ambiguities, or validate the agent's output.
## Customizing human input sources
By default, human input is collected via the command line using the `input()` function. However, you can override this behavior by providing a custom function to handle human input from different sources:
```python
import requests


def get_input_from_api(final_answer: str) -> str:
    """Get human feedback from an API instead of the CLI, with error handling.

    Args:
        final_answer: The agent's final answer to be reviewed.

    Returns:
        The feedback string from the API, or manually-entered CLI feedback
        if the API call fails. An empty string means the answer is accepted.
    """
    try:
        # Make an API call to get feedback
        response = requests.post(
            "https://your-api.com/feedback",
            json={"answer": final_answer},
            timeout=10,  # Set timeout to avoid long waits
        )
        response.raise_for_status()  # Raise exception for HTTP errors
        return response.json().get("feedback", "")
    except (requests.RequestException, ValueError) as e:
        # requests raises a ValueError subclass (JSONDecodeError) when the
        # response body is not valid JSON; RequestException covers network
        # and HTTP-status failures raised above.
        print(f"Error getting feedback from API: {str(e)}")
        # Fallback to CLI input if API fails
        return input(f"API failed, please provide feedback manually:\n\n{final_answer}\n\nYour feedback: ")


task = Task(
    description="Analyze the latest market trends",
    expected_output="A detailed analysis of market trends",
    agent=analyst,
    human_input=True,
    ask_human_input=get_input_from_api,  # Use the custom function
)
```
Note: CrewAI will automatically fall back to the default input method if your custom function raises an exception, but implementing your own fallback gives you more control over the user experience.
The custom function should:
- Accept a string parameter (the agent's final answer)
- Return a string (the human feedback)
- Return an empty string if the answer is acceptable and no further iterations are needed
### Example:
```shell
@@ -95,4 +132,4 @@ result = crew.kickoff()
print("######################")
print(result)
```
```

View File

@@ -250,6 +250,7 @@ class Agent(BaseAgent):
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
"ask_for_human_input": task.human_input,
"ask_human_input_function": task.ask_human_input,
}
)["output"]
except Exception as e:

View File

@@ -81,6 +81,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.respect_context_window = respect_context_window
self.request_within_rpm_limit = request_within_rpm_limit
self.ask_for_human_input = False
self.ask_human_input_function: Optional[Callable[[str], str]] = None
self.messages: List[Dict[str, str]] = []
self.iterations = 0
self.log_error_after = 3
@@ -103,6 +104,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_start_logs()
self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))
# Type checking for ask_human_input_function to ensure it's callable or None
ask_human_input = inputs.get("ask_human_input_function", None)
if ask_human_input is not None and not callable(ask_human_input):
print(f"Warning: ask_human_input_function is not callable, ignoring: {ask_human_input}")
self.ask_human_input_function = None
else:
self.ask_human_input_function = ask_human_input
try:
formatted_answer = self._invoke_loop()
@@ -533,7 +542,18 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
AgentFinish: The final answer after processing feedback
"""
human_feedback = self._ask_human_input(formatted_answer.output)
output = self._extract_output_from_agent_finish(formatted_answer)
# Use custom function if provided, otherwise use default
try:
if self.ask_human_input_function and callable(self.ask_human_input_function):
human_feedback = self.ask_human_input_function(output)
else:
human_feedback = self._ask_human_input(output)
except Exception as e:
# Fallback to default method if custom method fails
print(f"Error using custom input function: {str(e)}. Falling back to default.")
human_feedback = self._ask_human_input(output)
if self._is_training_mode():
return self._handle_training_feedback(formatted_answer, human_feedback)
@@ -572,10 +592,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.ask_for_human_input = False
else:
answer = self._process_feedback_iteration(feedback)
feedback = self._ask_human_input(answer.output)
output = self._extract_output_from_agent_finish(answer)
# Use custom function if provided, otherwise use default
try:
if self.ask_human_input_function and callable(self.ask_human_input_function):
feedback = self.ask_human_input_function(output)
else:
feedback = self._ask_human_input(output)
except Exception as e:
# Fallback to default method if custom method fails
print(f"Error using custom input function: {str(e)}. Falling back to default.")
feedback = self._ask_human_input(output)
return answer
def _extract_output_from_agent_finish(self, agent_finish: AgentFinish) -> str:
    """Pull the final answer text out of an AgentFinish-like object.

    Prefers the ``return_values["output"]`` entry when it exists and is
    non-empty; otherwise falls back to an ``output`` attribute. Returns
    an empty string when neither source is available.
    """
    return_values = getattr(agent_finish, "return_values", None)
    if return_values:
        return return_values.get("output", "")
    if hasattr(agent_finish, "output"):
        return agent_finish.output
    return ""
def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
"""Process a single feedback iteration."""
self.messages.append(

View File

@@ -131,6 +131,10 @@ class Task(BaseModel):
description="Whether the task should have a human review the final answer of the agent",
default=False,
)
ask_human_input: Optional[Callable[[str], str]] = Field(
description="Function to override the default human input method. Should accept a string (final_answer) and return a string (human feedback)",
default=None,
)
converter_cls: Optional[Type[Converter]] = Field(
description="A converter class used to export structured output",
default=None,
@@ -192,6 +196,14 @@ class Task(BaseModel):
"If return type is annotated, it must be Tuple[bool, Any]"
)
return v
@field_validator("ask_human_input")
@classmethod
def validate_ask_human_input(cls, v: Optional[Callable]) -> Optional[Callable]:
    """Reject non-callable values for ``ask_human_input``.

    ``None`` (feature disabled) and any callable pass through unchanged;
    anything else raises ``ValueError``.
    """
    # NOTE(review): pydantic's own Callable type-check appears to run before
    # this validator (the test suite matches pydantic's "Input should be
    # callable" message, not ours), so this branch may be unreachable in
    # practice — confirm before relying on this error message.
    if v is None or callable(v):
        return v
    raise ValueError("ask_human_input must be a callable function")
_original_description: Optional[str] = PrivateAttr(default=None)
_original_expected_output: Optional[str] = PrivateAttr(default=None)

View File

@@ -1205,6 +1205,7 @@ def test_agent_max_retry_limit():
"tool_names": "",
"tools": "",
"ask_for_human_input": True,
"ask_human_input_function": None,
}
),
mock.call(
@@ -1213,6 +1214,7 @@ def test_agent_max_retry_limit():
"tool_names": "",
"tools": "",
"ask_for_human_input": True,
"ask_human_input_function": None,
}
),
]

View File

@@ -0,0 +1,75 @@
from unittest.mock import MagicMock, patch
import pytest
from crewai.task import Task
def test_task_custom_human_input_parameter():
    """Task should store a user-supplied ask_human_input callable unchanged."""

    def custom_input_func(final_answer):
        return "Custom feedback"

    task = Task(
        description="Test task",
        expected_output="Test output",
        human_input=True,
        ask_human_input=custom_input_func,
    )

    # The exact callable must survive validation so the executor can invoke it.
    assert task.ask_human_input is custom_input_func
    assert callable(task.ask_human_input)
def test_task_invalid_human_input_parameter():
    """A non-callable ask_human_input value must fail model validation."""
    with pytest.raises(ValueError) as excinfo:
        Task(
            description="Test task",
            expected_output="Test output",
            human_input=True,
            ask_human_input="not_a_function",
        )
    # The message comes from pydantic's Callable type-check on the field.
    assert "Input should be callable" in str(excinfo.value)
def test_custom_input_function_error_handling():
    """A failing custom input function should fall back to the default prompt."""
    from crewai.agents.crew_agent_executor import CrewAgentExecutor

    def always_failing_input(_):
        raise Exception("API Error")

    # Minimal stand-in for the AgentFinish result the executor receives.
    agent_finish = MagicMock()
    agent_finish.output = "Test output"

    # Mock executor wired with the failing custom function and a stubbed
    # default input path so the fallback is observable.
    executor = MagicMock()
    executor.ask_human_input_function = always_failing_input
    executor._ask_human_input = MagicMock(return_value="Default input used")
    executor._extract_output_from_agent_finish = MagicMock(return_value="Test output")

    # Capture print output to verify the error is reported before falling back.
    with patch("builtins.print") as mock_print:
        CrewAgentExecutor._handle_human_feedback(executor, agent_finish)
        mock_print.assert_called_once()
        assert "Error using custom input function" in mock_print.call_args[0][0]

    # The default input method must have been used as the fallback.
    executor._ask_human_input.assert_called_once_with("Test output")