mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-30 19:28:29 +00:00
Compare commits
3 Commits
bugfix-pyt
...
devin/1742
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af29bd495f | ||
|
|
d82a01d4f7 | ||
|
|
0ff73d22d7 |
@@ -14,6 +14,43 @@ This feature is especially useful in complex decision-making processes or when a
|
||||
To integrate human input into agent execution, set the `human_input` flag in the task definition. When enabled, the agent prompts the user for input before delivering its final answer.
|
||||
This input can provide extra context, clarify ambiguities, or validate the agent's output.
|
||||
|
||||
## Customizing human input sources
|
||||
|
||||
By default, human input is collected via the command line using the `input()` function. However, you can override this behavior by providing a custom function to handle human input from different sources:
|
||||
|
||||
```python
|
||||
def get_input_from_api(final_answer: str) -> str:
|
||||
"""Get human feedback from an API instead of CLI with error handling."""
|
||||
try:
|
||||
# Make an API call to get feedback
|
||||
response = requests.post(
|
||||
"https://your-api.com/feedback",
|
||||
json={"answer": final_answer},
|
||||
timeout=10 # Set timeout to avoid long waits
|
||||
)
|
||||
response.raise_for_status() # Raise exception for HTTP errors
|
||||
return response.json().get("feedback", "")
|
||||
except (requests.RequestException, json.JSONDecodeError, KeyError) as e:
|
||||
print(f"Error getting feedback from API: {str(e)}")
|
||||
# Fallback to CLI input if API fails
|
||||
return input(f"API failed, please provide feedback manually:\n\n{final_answer}\n\nYour feedback: ")
|
||||
|
||||
task = Task(
|
||||
description="Analyze the latest market trends",
|
||||
expected_output="A detailed analysis of market trends",
|
||||
agent=analyst,
|
||||
human_input=True,
|
||||
ask_human_input=get_input_from_api # Use the custom function
|
||||
)
|
||||
```
|
||||
|
||||
Note: CrewAI will automatically fallback to the default input method if your custom function raises an exception, but implementing your own fallback gives you more control over the user experience.
|
||||
|
||||
The custom function should:
|
||||
- Accept a string parameter (the agent's final answer)
|
||||
- Return a string (the human feedback)
|
||||
- Return an empty string if the answer is acceptable and no further iterations are needed
|
||||
|
||||
### Example:
|
||||
|
||||
```shell
|
||||
@@ -95,4 +132,4 @@ result = crew.kickoff()
|
||||
|
||||
print("######################")
|
||||
print(result)
|
||||
```
|
||||
```
|
||||
|
||||
@@ -250,6 +250,7 @@ class Agent(BaseAgent):
|
||||
"tool_names": self.agent_executor.tools_names,
|
||||
"tools": self.agent_executor.tools_description,
|
||||
"ask_for_human_input": task.human_input,
|
||||
"ask_human_input_function": task.ask_human_input,
|
||||
}
|
||||
)["output"]
|
||||
except Exception as e:
|
||||
|
||||
@@ -81,6 +81,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
self.respect_context_window = respect_context_window
|
||||
self.request_within_rpm_limit = request_within_rpm_limit
|
||||
self.ask_for_human_input = False
|
||||
self.ask_human_input_function: Optional[Callable[[str], str]] = None
|
||||
self.messages: List[Dict[str, str]] = []
|
||||
self.iterations = 0
|
||||
self.log_error_after = 3
|
||||
@@ -103,6 +104,14 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
self._show_start_logs()
|
||||
|
||||
self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))
|
||||
|
||||
# Type checking for ask_human_input_function to ensure it's callable or None
|
||||
ask_human_input = inputs.get("ask_human_input_function", None)
|
||||
if ask_human_input is not None and not callable(ask_human_input):
|
||||
print(f"Warning: ask_human_input_function is not callable, ignoring: {ask_human_input}")
|
||||
self.ask_human_input_function = None
|
||||
else:
|
||||
self.ask_human_input_function = ask_human_input
|
||||
|
||||
try:
|
||||
formatted_answer = self._invoke_loop()
|
||||
@@ -533,7 +542,18 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
Returns:
|
||||
AgentFinish: The final answer after processing feedback
|
||||
"""
|
||||
human_feedback = self._ask_human_input(formatted_answer.output)
|
||||
output = self._extract_output_from_agent_finish(formatted_answer)
|
||||
|
||||
# Use custom function if provided, otherwise use default
|
||||
try:
|
||||
if self.ask_human_input_function and callable(self.ask_human_input_function):
|
||||
human_feedback = self.ask_human_input_function(output)
|
||||
else:
|
||||
human_feedback = self._ask_human_input(output)
|
||||
except Exception as e:
|
||||
# Fallback to default method if custom method fails
|
||||
print(f"Error using custom input function: {str(e)}. Falling back to default.")
|
||||
human_feedback = self._ask_human_input(output)
|
||||
|
||||
if self._is_training_mode():
|
||||
return self._handle_training_feedback(formatted_answer, human_feedback)
|
||||
@@ -572,10 +592,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
self.ask_for_human_input = False
|
||||
else:
|
||||
answer = self._process_feedback_iteration(feedback)
|
||||
feedback = self._ask_human_input(answer.output)
|
||||
output = self._extract_output_from_agent_finish(answer)
|
||||
|
||||
# Use custom function if provided, otherwise use default
|
||||
try:
|
||||
if self.ask_human_input_function and callable(self.ask_human_input_function):
|
||||
feedback = self.ask_human_input_function(output)
|
||||
else:
|
||||
feedback = self._ask_human_input(output)
|
||||
except Exception as e:
|
||||
# Fallback to default method if custom method fails
|
||||
print(f"Error using custom input function: {str(e)}. Falling back to default.")
|
||||
feedback = self._ask_human_input(output)
|
||||
|
||||
return answer
|
||||
|
||||
def _extract_output_from_agent_finish(self, agent_finish: AgentFinish) -> str:
|
||||
"""Extract output from an AgentFinish object."""
|
||||
output = ""
|
||||
if hasattr(agent_finish, "return_values") and agent_finish.return_values:
|
||||
output = agent_finish.return_values.get("output", "")
|
||||
elif hasattr(agent_finish, "output"):
|
||||
output = agent_finish.output
|
||||
return output
|
||||
|
||||
def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
|
||||
"""Process a single feedback iteration."""
|
||||
self.messages.append(
|
||||
|
||||
@@ -131,6 +131,10 @@ class Task(BaseModel):
|
||||
description="Whether the task should have a human review the final answer of the agent",
|
||||
default=False,
|
||||
)
|
||||
ask_human_input: Optional[Callable[[str], str]] = Field(
|
||||
description="Function to override the default human input method. Should accept a string (final_answer) and return a string (human feedback)",
|
||||
default=None,
|
||||
)
|
||||
converter_cls: Optional[Type[Converter]] = Field(
|
||||
description="A converter class used to export structured output",
|
||||
default=None,
|
||||
@@ -192,6 +196,14 @@ class Task(BaseModel):
|
||||
"If return type is annotated, it must be Tuple[bool, Any]"
|
||||
)
|
||||
return v
|
||||
|
||||
@field_validator("ask_human_input")
|
||||
@classmethod
|
||||
def validate_ask_human_input(cls, v: Optional[Callable]) -> Optional[Callable]:
|
||||
"""Validate that the ask_human_input function is callable."""
|
||||
if v is not None and not callable(v):
|
||||
raise ValueError("ask_human_input must be a callable function")
|
||||
return v
|
||||
|
||||
_original_description: Optional[str] = PrivateAttr(default=None)
|
||||
_original_expected_output: Optional[str] = PrivateAttr(default=None)
|
||||
|
||||
@@ -1205,6 +1205,7 @@ def test_agent_max_retry_limit():
|
||||
"tool_names": "",
|
||||
"tools": "",
|
||||
"ask_for_human_input": True,
|
||||
"ask_human_input_function": None,
|
||||
}
|
||||
),
|
||||
mock.call(
|
||||
@@ -1213,6 +1214,7 @@ def test_agent_max_retry_limit():
|
||||
"tool_names": "",
|
||||
"tools": "",
|
||||
"ask_for_human_input": True,
|
||||
"ask_human_input_function": None,
|
||||
}
|
||||
),
|
||||
]
|
||||
|
||||
75
tests/test_custom_human_input.py
Normal file
75
tests/test_custom_human_input.py
Normal file
@@ -0,0 +1,75 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
def test_task_custom_human_input_parameter():
|
||||
"""Test that the Task class accepts the ask_human_input parameter."""
|
||||
# Custom human input function
|
||||
def custom_input_func(final_answer):
|
||||
return "Custom feedback"
|
||||
|
||||
# Create a task with the custom function
|
||||
task = Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
human_input=True,
|
||||
ask_human_input=custom_input_func
|
||||
)
|
||||
|
||||
# Verify the parameter was stored correctly
|
||||
assert task.ask_human_input == custom_input_func
|
||||
assert callable(task.ask_human_input)
|
||||
|
||||
|
||||
def test_task_invalid_human_input_parameter():
|
||||
"""Test that non-callable input raises validation error."""
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
Task(
|
||||
description="Test task",
|
||||
expected_output="Test output",
|
||||
human_input=True,
|
||||
ask_human_input="not_a_function"
|
||||
)
|
||||
|
||||
assert "Input should be callable" in str(exc_info.value)
|
||||
|
||||
|
||||
def test_custom_input_function_error_handling():
|
||||
"""Test handling of errors in custom input function."""
|
||||
def failing_input(_):
|
||||
raise Exception("API Error")
|
||||
|
||||
# Create a simplified test for error handling
|
||||
# We'll directly test the error handling in the _handle_human_feedback method
|
||||
|
||||
# Create a mock agent finish object with a simple output
|
||||
agent_finish = MagicMock()
|
||||
agent_finish.output = "Test output"
|
||||
|
||||
# Create a mock executor with our failing function
|
||||
executor = MagicMock()
|
||||
executor.ask_human_input_function = failing_input
|
||||
|
||||
# Set up the default input method mock
|
||||
executor._ask_human_input = MagicMock(return_value="Default input used")
|
||||
|
||||
# Add the extract method that returns the output directly
|
||||
executor._extract_output_from_agent_finish = MagicMock(return_value="Test output")
|
||||
|
||||
# Test the error handling by calling the method directly
|
||||
from crewai.agents.crew_agent_executor import CrewAgentExecutor
|
||||
|
||||
# Capture print output to verify error message
|
||||
with patch('builtins.print') as mock_print:
|
||||
# Call the method we're testing
|
||||
CrewAgentExecutor._handle_human_feedback(executor, agent_finish)
|
||||
|
||||
# Verify error was printed
|
||||
mock_print.assert_called_once()
|
||||
assert "Error using custom input function" in mock_print.call_args[0][0]
|
||||
|
||||
# Verify fallback to default method occurred
|
||||
executor._ask_human_input.assert_called_once_with("Test output")
|
||||
Reference in New Issue
Block a user