diff --git a/docs/how-to/human-input-on-execution.mdx b/docs/how-to/human-input-on-execution.mdx index 06b3ba588..310fc025f 100644 --- a/docs/how-to/human-input-on-execution.mdx +++ b/docs/how-to/human-input-on-execution.mdx @@ -20,10 +20,20 @@ By default, human input is collected via the command line using the `input()` fu ```python def get_input_from_api(final_answer: str) -> str: - """Get human feedback from an API instead of CLI.""" - # Make an API call to get feedback - response = requests.post("https://your-api.com/feedback", json={"answer": final_answer}) - return response.json()["feedback"] + """Get human feedback from an API instead of CLI with error handling.""" + try: + # Make an API call to get feedback + response = requests.post( + "https://your-api.com/feedback", + json={"answer": final_answer}, + timeout=10 # Set timeout to avoid long waits + ) + response.raise_for_status() # Raise exception for HTTP errors + return response.json().get("feedback", "") + except (requests.RequestException, json.JSONDecodeError, KeyError) as e: + print(f"Error getting feedback from API: {str(e)}") + # Fallback to CLI input if API fails + return input(f"API failed, please provide feedback manually:\n\n{final_answer}\n\nYour feedback: ") task = Task( description="Analyze the latest market trends", @@ -34,6 +44,8 @@ task = Task( ) ``` +Note: CrewAI will automatically fallback to the default input method if your custom function raises an exception, but implementing your own fallback gives you more control over the user experience. + The custom function should: - Accept a string parameter (the agent's final answer) - Return a string (the human feedback) diff --git a/src/crewai/agents/crew_agent_executor.py b/src/crewai/agents/crew_agent_executor.py index 1a0670340..ef0d48def 100644 --- a/src/crewai/agents/crew_agent_executor.py +++ b/src/crewai/agents/crew_agent_executor.py @@ -81,7 +81,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self.respect_context_window = respect_context_window self.request_within_rpm_limit = request_within_rpm_limit self.ask_for_human_input = False - self.ask_human_input_function = None + self.ask_human_input_function: Optional[Callable[[str], str]] = None self.messages: List[Dict[str, str]] = [] self.iterations = 0 self.log_error_after = 3 @@ -535,17 +535,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): Returns: AgentFinish: The final answer after processing feedback """ - # Get output from either return_values dict or output attribute - output = "" - if hasattr(formatted_answer, "return_values") and formatted_answer.return_values: - output = formatted_answer.return_values.get("output", "") - elif hasattr(formatted_answer, "output"): - output = formatted_answer.output + output = self._extract_output_from_agent_finish(formatted_answer) # Use custom function if provided, otherwise use default - if self.ask_human_input_function and callable(self.ask_human_input_function): - human_feedback = self.ask_human_input_function(output) - else: + try: + if self.ask_human_input_function and callable(self.ask_human_input_function): + human_feedback = self.ask_human_input_function(output) + else: + human_feedback = self._ask_human_input(output) + except Exception as e: + # Fallback to default method if custom method fails + print(f"Error using custom input function: {str(e)}. Falling back to default.") human_feedback = self._ask_human_input(output) if self._is_training_mode(): @@ -585,21 +585,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): self.ask_for_human_input = False else: answer = self._process_feedback_iteration(feedback) - # Get output from either return_values dict or output attribute - output = "" - if hasattr(answer, "return_values") and answer.return_values: - output = answer.return_values.get("output", "") - elif hasattr(answer, "output"): - output = answer.output - + output = self._extract_output_from_agent_finish(answer) + # Use custom function if provided, otherwise use default - if self.ask_human_input_function and callable(self.ask_human_input_function): - feedback = self.ask_human_input_function(output) - else: + try: + if self.ask_human_input_function and callable(self.ask_human_input_function): + feedback = self.ask_human_input_function(output) + else: + feedback = self._ask_human_input(output) + except Exception as e: + # Fallback to default method if custom method fails + print(f"Error using custom input function: {str(e)}. Falling back to default.") feedback = self._ask_human_input(output) return answer + def _extract_output_from_agent_finish(self, agent_finish: AgentFinish) -> str: + """Extract output from an AgentFinish object.""" + output = "" + if hasattr(agent_finish, "return_values") and agent_finish.return_values: + output = agent_finish.return_values.get("output", "") + elif hasattr(agent_finish, "output"): + output = agent_finish.output + return output + def _process_feedback_iteration(self, feedback: str) -> AgentFinish: """Process a single feedback iteration.""" self.messages.append( diff --git a/src/crewai/task.py b/src/crewai/task.py index 973abada1..31bf714f6 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -196,6 +196,14 @@ class Task(BaseModel): "If return type is annotated, it must be Tuple[bool, Any]" ) return v + + @field_validator("ask_human_input") + @classmethod + def validate_ask_human_input(cls, v: Optional[Callable]) -> Optional[Callable]: + """Validate that the ask_human_input function is callable.""" + if v is not None and not callable(v): + raise ValueError("ask_human_input must be a callable function") + return v _original_description: Optional[str] = PrivateAttr(default=None) _original_expected_output: Optional[str] = PrivateAttr(default=None) diff --git a/tests/test_custom_human_input.py b/tests/test_custom_human_input.py index 3cf66b09b..b1d7d688a 100644 --- a/tests/test_custom_human_input.py +++ b/tests/test_custom_human_input.py @@ -1,5 +1,6 @@ +from unittest.mock import MagicMock, patch + import pytest -from unittest.mock import patch, MagicMock from crewai.task import Task @@ -21,3 +22,54 @@ def test_task_custom_human_input_parameter(): # Verify the parameter was stored correctly assert task.ask_human_input == custom_input_func assert callable(task.ask_human_input) + + +def test_task_invalid_human_input_parameter(): + """Test that non-callable input raises validation error.""" + with pytest.raises(ValueError) as exc_info: + Task( + description="Test task", + expected_output="Test output", + human_input=True, + ask_human_input="not_a_function" + ) + + assert "Input should be callable" in str(exc_info.value) + + +def test_custom_input_function_error_handling(): + """Test handling of errors in custom input function.""" + def failing_input(_): + raise Exception("API Error") + + # Create a simplified test for error handling + # We'll directly test the error handling in the _handle_human_feedback method + + # Create a mock agent finish object with a simple output + agent_finish = MagicMock() + agent_finish.output = "Test output" + + # Create a mock executor with our failing function + executor = MagicMock() + executor.ask_human_input_function = failing_input + + # Set up the default input method mock + executor._ask_human_input = MagicMock(return_value="Default input used") + + # Add the extract method that returns the output directly + executor._extract_output_from_agent_finish = MagicMock(return_value="Test output") + + # Test the error handling by calling the method directly + from crewai.agents.crew_agent_executor import CrewAgentExecutor + + # Capture print output to verify error message + with patch('builtins.print') as mock_print: + # Call the method we're testing + CrewAgentExecutor._handle_human_feedback(executor, agent_finish) + + # Verify error was printed + mock_print.assert_called_once() + assert "Error using custom input function" in mock_print.call_args[0][0] + + # Verify fallback to default method occurred + executor._ask_human_input.assert_called_once_with("Test output")