Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
70379689cf Improve URL validation with better type hints and documentation
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-03 21:22:51 +00:00
Devin AI
e891563135 Fix #2746: Add URL protocol validation for Huggingface embedder
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-03 21:19:14 +00:00
6 changed files with 119 additions and 247 deletions

View File

@@ -9,7 +9,6 @@ from crewai.agents import CacheHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.cli.constants import ENV_VARS, LITELLM_PARAMS
from crewai.utilities import Logger
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
@@ -63,12 +62,8 @@ class Agent(BaseAgent):
tools: Tools at agents disposal
step_callback: Callback to be executed after each step of the agent execution.
knowledge_sources: Knowledge sources for the agent.
allow_feedback: Whether the agent can receive and process feedback during execution.
allow_conflict: Whether the agent can handle conflicts with other agents during execution.
allow_iteration: Whether the agent can iterate on its solutions based on feedback and validation.
"""
_logger = PrivateAttr(default_factory=lambda: Logger(verbose=False))
_times_executed: int = PrivateAttr(default=0)
max_execution_time: Optional[int] = Field(
default=None,
@@ -128,18 +123,6 @@ class Agent(BaseAgent):
default="safe",
description="Mode for code execution: 'safe' (using Docker) or 'unsafe' (direct execution).",
)
allow_feedback: bool = Field(
default=False,
description="Enable agent to receive and process feedback during execution.",
)
allow_conflict: bool = Field(
default=False,
description="Enable agent to handle conflicts with other agents during execution.",
)
allow_iteration: bool = Field(
default=False,
description="Enable agent to iterate on its solutions based on feedback and validation.",
)
embedder_config: Optional[Dict[str, Any]] = Field(
default=None,
description="Embedder configuration for the agent.",
@@ -156,19 +139,6 @@ class Agent(BaseAgent):
def post_init_setup(self):
self._set_knowledge()
self.agent_ops_agent_name = self.role
if self.allow_feedback:
self._logger.log("info", "Feedback mode enabled for agent.", color="bold_green")
if self.allow_conflict:
self._logger.log("info", "Conflict handling enabled for agent.", color="bold_green")
if self.allow_iteration:
self._logger.log("info", "Iteration mode enabled for agent.", color="bold_green")
# Validate boolean parameters
for param in ['allow_feedback', 'allow_conflict', 'allow_iteration']:
if not isinstance(getattr(self, param), bool):
raise ValueError(f"Parameter '{param}' must be a boolean value.")
unaccepted_attributes = [
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
@@ -430,9 +400,6 @@ class Agent(BaseAgent):
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
respect_context_window=self.respect_context_window,
allow_feedback=self.allow_feedback,
allow_conflict=self.allow_conflict,
allow_iteration=self.allow_iteration,
request_within_rpm_limit=(
self._rpm_controller.check_or_wait if self._rpm_controller else None
),

View File

@@ -31,34 +31,6 @@ class ToolResult:
class CrewAgentExecutor(CrewAgentExecutorMixin):
"""CrewAgentExecutor class for managing agent execution.
This class is responsible for executing agent tasks, handling tools,
managing agent interactions, and processing the results.
Parameters:
llm: The language model to use for generating responses.
task: The task to be executed.
crew: The crew that the agent belongs to.
agent: The agent to execute the task.
prompt: The prompt to use for generating responses.
max_iter: Maximum number of iterations for the agent execution.
tools: The tools available to the agent.
tools_names: The names of the tools available to the agent.
stop_words: Words that signal the end of agent execution.
tools_description: Description of the tools available to the agent.
tools_handler: Handler for tool operations.
step_callback: Callback function for each step of execution.
original_tools: Original list of tools before processing.
function_calling_llm: LLM specifically for function calling.
respect_context_window: Whether to respect the context window size.
request_within_rpm_limit: Function to check if request is within RPM limit.
callbacks: List of callback functions.
allow_feedback: Controls feedback processing during execution.
allow_conflict: Enables conflict handling between agents.
allow_iteration: Allows solution iteration based on feedback.
"""
_logger: Logger = Logger()
def __init__(
@@ -80,9 +52,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
respect_context_window: bool = False,
request_within_rpm_limit: Any = None,
callbacks: List[Any] = [],
allow_feedback: bool = False,
allow_conflict: bool = False,
allow_iteration: bool = False,
):
self._i18n: I18N = I18N()
self.llm = llm
@@ -104,9 +73,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.function_calling_llm = function_calling_llm
self.respect_context_window = respect_context_window
self.request_within_rpm_limit = request_within_rpm_limit
self.allow_feedback = allow_feedback
self.allow_conflict = allow_conflict
self.allow_iteration = allow_iteration
self.ask_for_human_input = False
self.messages: List[Dict[str, str]] = []
self.iterations = 0
@@ -521,56 +487,3 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.ask_for_human_input = False
return formatted_answer
def process_feedback(self, feedback: str) -> bool:
    """Record a piece of feedback in the message history when feedback is enabled.

    Parameters:
        feedback (str): The feedback text to record.

    Returns:
        bool: True when ``allow_feedback`` is set and the feedback was appended
        to ``self.messages``; False when feedback handling is disabled.
    """
    if self.allow_feedback:
        self._logger.log("info", f"Processing feedback: {feedback}", color="green")
        # The feedback is stored as a regular message, prefixed with "Feedback: ".
        feedback_message = self._format_msg(f"Feedback: {feedback}")
        self.messages.append(feedback_message)
        return True

    self._logger.log("warning", "Feedback processing skipped (allow_feedback=False).", color="yellow")
    return False
def handle_conflict(self, other_agent: 'CrewAgentExecutor') -> bool:
    """Acknowledge a conflict with another executor when conflict handling is enabled.

    Parameters:
        other_agent (CrewAgentExecutor): The executor on the other side of the
            conflict; only ``other_agent.agent.role`` is read, for logging.

    Returns:
        bool: True when ``allow_conflict`` is set (the conflict is logged),
        False when conflict handling is disabled.
    """
    if self.allow_conflict:
        other_role = other_agent.agent.role
        self._logger.log("info", f"Handling conflict with agent: {other_role}", color="green")
        return True

    self._logger.log("warning", "Conflict handling skipped (allow_conflict=False).", color="yellow")
    return False
def process_iteration(self, result: Any) -> bool:
    """Acknowledge an iteration request when iteration mode is enabled.

    Parameters:
        result (Any): The result to iterate on. It is accepted but not
            inspected by this method.

    Returns:
        bool: True when ``allow_iteration`` is set (the iteration is logged),
        False when iteration mode is disabled.
    """
    if self.allow_iteration:
        self._logger.log("info", "Processing iteration on result.", color="green")
        return True

    self._logger.log("warning", "Iteration processing skipped (allow_iteration=False).", color="yellow")
    return False

View File

@@ -135,13 +135,42 @@ class EmbeddingConfigurator:
)
@staticmethod
def _configure_huggingface(config, model_name):
def _normalize_api_url(api_url: str) -> str:
"""
Normalize API URL by ensuring it has a protocol.
Args:
api_url: The API URL to normalize
Returns:
Normalized URL with protocol (defaults to http:// if missing)
"""
if not (api_url.startswith("http://") or api_url.startswith("https://")):
return f"http://{api_url}"
return api_url
@staticmethod
def _configure_huggingface(config: dict, model_name: str):
    """Configure the Huggingface embedding function from the provided config.

    Args:
        config: Configuration dictionary for the Huggingface embedder. Only
            the ``"api_url"`` key is read.
        model_name: Name of the model to use. Accepted for signature parity;
            it is not used by this function.

    Returns:
        A ``HuggingFaceEmbeddingServer`` instance built with the normalized
        API URL. A missing or empty ``api_url`` is passed through unchanged.
    """
    from chromadb.utils.embedding_functions.huggingface_embedding_function import (
        HuggingFaceEmbeddingServer,
    )

    api_url = config.get("api_url")
    if api_url:
        api_url = EmbeddingConfigurator._normalize_api_url(api_url)

    # Pass only the normalized value: supplying ``url=`` twice is a syntax
    # error, and the raw config value would bypass protocol normalization.
    return HuggingFaceEmbeddingServer(
        url=api_url,
    )
@staticmethod

View File

@@ -1625,127 +1625,3 @@ def test_agent_with_knowledge_sources():
# Assert that the agent provides the correct information
assert "red" in result.raw.lower()
def test_agent_with_feedback_conflict_iteration_params():
    """Check that Agent stores allow_feedback, allow_conflict and allow_iteration, defaulting each to False."""
    flag_names = ("allow_feedback", "allow_conflict", "allow_iteration")
    identity = dict(role="test role", goal="test goal", backstory="test backstory")

    # Every flag explicitly switched on.
    enabled_agent = Agent(**identity, **{flag: True for flag in flag_names})
    for flag in flag_names:
        assert getattr(enabled_agent, flag) is True

    # No flags passed: every one must fall back to False.
    default_agent = Agent(**identity)
    for flag in flag_names:
        assert getattr(default_agent, flag) is False
def test_agent_feedback_processing():
    """Check the feedback call path on an agent created with allow_feedback=True.

    The executor is a MagicMock returned by a patched
    ``Agent.create_agent_executor``, so the assertions verify the mocked
    call and attributes rather than a real executor.
    """
    from unittest.mock import MagicMock, patch

    # Stand-in executor that reports feedback as enabled and processed.
    fake_executor = MagicMock(allow_feedback=True)
    fake_executor.process_feedback.return_value = True

    # Replace the factory method on the class so no real executor is built.
    with patch('crewai.agent.Agent.create_agent_executor', return_value=fake_executor):
        agent = Agent(
            role="test role",
            goal="test goal",
            backstory="test backstory",
            allow_feedback=True,
            llm=MagicMock(),  # Mock LLM to avoid API calls
        )

        executor = agent.create_agent_executor()
        assert executor.allow_feedback is True

        outcome = executor.process_feedback("Test feedback")
        assert outcome is True
        executor.process_feedback.assert_called_once_with("Test feedback")
def test_agent_conflict_handling():
    """Check the conflict-handling call path on agents created with allow_conflict=True.

    The executors are MagicMock objects obtained through a patched
    ``Agent.create_agent_executor``, so the assertions verify the mocked
    call and attributes rather than a real executor.
    """
    from unittest.mock import MagicMock, patch

    primary_executor = MagicMock(allow_conflict=True)
    primary_executor.handle_conflict.return_value = True

    # NOTE(review): this second mock is configured but never handed out; the
    # patch below returns primary_executor for every call.
    secondary_executor = MagicMock(allow_conflict=True)

    with patch('crewai.agent.Agent.create_agent_executor', return_value=primary_executor):
        # Two agents, both with conflict handling switched on.
        agent1 = Agent(
            role="role1",
            goal="goal1",
            backstory="backstory1",
            allow_conflict=True,
            llm=MagicMock(),  # Mock LLM to avoid API calls
        )
        agent2 = Agent(
            role="role2",
            goal="goal2",
            backstory="backstory2",
            allow_conflict=True,
            llm=MagicMock(),  # Mock LLM to avoid API calls
        )

        # Both calls go through the patched factory.
        executor1 = agent1.create_agent_executor()
        executor2 = agent2.create_agent_executor()
        assert executor1.allow_conflict is True
        assert executor2.allow_conflict is True

        outcome = executor1.handle_conflict(executor2)
        assert outcome is True
        executor1.handle_conflict.assert_called_once_with(executor2)
def test_agent_iteration_processing():
    """Check the iteration call path on an agent created with allow_iteration=True.

    The executor is a MagicMock returned by a patched
    ``Agent.create_agent_executor``, so the assertions verify the mocked
    call and attributes rather than a real executor.
    """
    from unittest.mock import MagicMock, patch

    # Stand-in executor that reports iteration as enabled and processed.
    fake_executor = MagicMock(allow_iteration=True)
    fake_executor.process_iteration.return_value = True

    # Replace the factory method on the class so no real executor is built.
    with patch('crewai.agent.Agent.create_agent_executor', return_value=fake_executor):
        agent = Agent(
            role="test role",
            goal="test goal",
            backstory="test backstory",
            allow_iteration=True,
            llm=MagicMock(),  # Mock LLM to avoid API calls
        )

        executor = agent.create_agent_executor()
        assert executor.allow_iteration is True

        outcome = executor.process_iteration("Test result")
        assert outcome is True
        executor.process_iteration.assert_called_once_with("Test result")

View File

@@ -584,3 +584,84 @@ def test_docling_source_with_local_file():
docling_source = CrewDoclingSource(file_paths=[pdf_path])
assert docling_source.file_paths == [pdf_path]
assert docling_source.content is not None
def test_huggingface_url_validation():
    """Check how the Huggingface embedder configuration treats API URLs with and without a protocol."""
    from crewai.utilities.embedding_configurator import EmbeddingConfigurator

    def configured_url(config):
        # Build the embedding function and expose the URL it ended up with.
        embedding_function = EmbeddingConfigurator()._configure_huggingface(
            config, "test-model"
        )
        return embedding_function._api_url

    # A URL without a protocol gains the default http:// prefix.
    assert configured_url({"api_url": "localhost:8080/embed"}).startswith("http://")

    # URLs that already carry a protocol are left untouched.
    assert configured_url({"api_url": "https://localhost:8080/embed"}) == "https://localhost:8080/embed"
    assert configured_url({"api_url": "http://localhost:8080/embed"}) == "http://localhost:8080/embed"

    # No URL configured: no exception is raised.
    # NOTE(review): the expected value is the string 'None'; presumably the
    # embedding server stringifies a missing URL. Confirm against chromadb.
    assert configured_url({}) == 'None'
def test_huggingface_missing_protocol_with_json_source():
    """Check that the Huggingface embedder config accepts a URL without protocol alongside a JSON fixture.

    A temporary JSON file is created for the duration of the test and is
    always removed, even when the assertion fails.
    """
    import json
    import os
    import tempfile

    # NOTE(review): imported but not exercised below; presumably kept so the
    # test fails if the JSON knowledge source module cannot be imported.
    from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource
    from crewai.utilities.embedding_configurator import EmbeddingConfigurator

    # Create a temporary JSON file; delete=False keeps it on disk after close.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp:
        json.dump({"test": "data", "nested": {"value": 123}}, temp)
        json_path = temp.name

    try:
        # Test that the URL validation works in the embedder configurator
        config = {
            "api_url": "localhost:8080/embed"  # Missing protocol
        }
        embedding_function = EmbeddingConfigurator()._configure_huggingface(
            config, "test-model"
        )

        # Verify that the URL now has a protocol
        assert embedding_function._api_url.startswith("http://")
    finally:
        # Clean up on every path so a failing assertion does not leak the file.
        os.unlink(json_path)
def test_huggingface_missing_protocol_with_string_source():
    """Check that the Huggingface embedder config accepts a URL without protocol (string-source scenario)."""
    # NOTE(review): StringKnowledgeSource is imported but not exercised below.
    from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
    from crewai.utilities.embedding_configurator import EmbeddingConfigurator

    # The configured URL deliberately lacks a protocol.
    url_without_protocol = {"api_url": "localhost:8080/embed"}
    configurator = EmbeddingConfigurator()
    embedding_function = configurator._configure_huggingface(
        url_without_protocol, "test-model"
    )

    # The configurator must have added the default protocol.
    resulting_url = embedding_function._api_url
    assert resulting_url.startswith("http://")

View File

@@ -0,0 +1,6 @@
{
"test": "data",
"nested": {
"value": 123
}
}