Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
70379689cf Improve URL validation with better type hints and documentation
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-03 21:22:51 +00:00
Devin AI
e891563135 Fix #2746: Add URL protocol validation for Huggingface embedder
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-03 21:19:14 +00:00
7 changed files with 119 additions and 520 deletions

View File

@@ -1,47 +0,0 @@
"""
Example of using task decomposition in CrewAI.
This example demonstrates how to use the task decomposition feature
to break down complex tasks into simpler sub-tasks.
Feature introduced in CrewAI v1.x.x
"""
from crewai import Agent, Task, Crew
researcher = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher with skills in breaking down complex topics.",
)
research_task = Task(
description="Research the impact of AI on various industries",
expected_output="A comprehensive report covering multiple industries",
agent=researcher,
)
sub_tasks = research_task.decompose(
descriptions=[
"Research AI impact on healthcare industry",
"Research AI impact on finance industry",
"Research AI impact on education industry",
],
expected_outputs=[
"A report on AI in healthcare",
"A report on AI in finance",
"A report on AI in education",
],
names=["Healthcare", "Finance", "Education"],
)
crew = Crew(
agents=[researcher],
tasks=[research_task],
)
result = crew.kickoff()
print("Final result:", result)
for i, sub_task in enumerate(research_task.sub_tasks):
print(f"Sub-task {i+1} result: {sub_task.output.raw if hasattr(sub_task, 'output') and sub_task.output else 'No output'}")

View File

@@ -19,7 +19,6 @@ from typing import (
Tuple,
Type,
Union,
ForwardRef,
)
from opentelemetry.trace import Span
@@ -138,16 +137,6 @@ class Task(BaseModel):
default=0,
description="Current number of retries"
)
parent_task: Optional['Task'] = Field(
default=None,
description="Parent task that this task was decomposed from.",
exclude=True,
)
sub_tasks: List['Task'] = Field(
default_factory=list,
description="Sub-tasks that this task was decomposed into.",
exclude=True,
)
@field_validator("guardrail")
@classmethod
@@ -257,151 +246,13 @@ class Task(BaseModel):
)
return self
def decompose(
self,
descriptions: List[str],
expected_outputs: Optional[List[str]] = None,
names: Optional[List[str]] = None
) -> List['Task']:
"""
Decompose a complex task into simpler sub-tasks.
Args:
descriptions: List of descriptions for each sub-task.
expected_outputs: Optional list of expected outputs for each sub-task.
names: Optional list of names for each sub-task.
Returns:
List of created sub-tasks.
Raises:
ValueError: If descriptions is empty, or if expected_outputs or names
have different lengths than descriptions.
Side Effects:
Modifies self.sub_tasks by adding newly created sub-tasks.
"""
if not descriptions:
raise ValueError("At least one sub-task description is required.")
if expected_outputs and len(expected_outputs) != len(descriptions):
raise ValueError(
f"If provided, expected_outputs must have the same length as descriptions. "
f"Got {len(expected_outputs)} expected outputs and {len(descriptions)} descriptions."
)
if names and len(names) != len(descriptions):
raise ValueError(
f"If provided, names must have the same length as descriptions. "
f"Got {len(names)} names and {len(descriptions)} descriptions."
)
for i, description in enumerate(descriptions):
sub_task = Task(
description=description,
expected_output=expected_outputs[i] if expected_outputs else self.expected_output,
name=names[i] if names else None,
agent=self.agent, # Inherit the agent from the parent task
tools=self.tools, # Inherit the tools from the parent task
context=[self], # Set the parent task as context for the sub-task
parent_task=self, # Reference back to the parent task
)
self.sub_tasks.append(sub_task)
return self.sub_tasks
def combine_sub_task_results(self) -> str:
"""
Combine the results from all sub-tasks into a single result for this task.
This method uses the task's agent to intelligently combine the results from
all sub-tasks. It requires an agent capable of coherent text summarization
and is designed for stateless prompt execution.
Returns:
The combined result as a string.
Raises:
ValueError: If the task has no sub-tasks or no agent assigned.
Side Effects:
None. This method does not modify the task's state.
"""
if not self.sub_tasks:
raise ValueError("Task has no sub-tasks to combine results from.")
if not self.agent:
raise ValueError("Task has no agent to combine sub-task results.")
sub_task_results = "\n\n".join([
f"Sub-task: {sub_task.description}\nResult: {sub_task.output.raw if sub_task.output else 'No result'}"
for sub_task in self.sub_tasks
])
combine_prompt = f"""
You have completed the following sub-tasks for the main task: "{self.description}"
{sub_task_results}
Based on all these sub-tasks, please provide a consolidated final answer for the main task.
Expected output format: {self.expected_output if self.expected_output else 'Not specified'}
"""
result = self.agent.execute_task(
task=self,
context=combine_prompt,
tools=self.tools or []
)
return result
def execute_sync(
self,
agent: Optional[BaseAgent] = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> TaskOutput:
"""
Execute the task synchronously.
If the task has sub-tasks and no output yet, this method will:
1. Execute all sub-tasks first
2. Combine their results using the agent
3. Set the combined result as this task's output
Args:
agent: Optional agent to execute the task with.
context: Optional context to pass to the task.
tools: Optional tools to pass to the task.
Returns:
TaskOutput: The result of the task execution.
Side Effects:
Sets self.output with the execution result.
"""
if self.sub_tasks and not self.output:
for sub_task in self.sub_tasks:
sub_task.execute_sync(
agent=sub_task.agent or agent,
context=context,
tools=sub_task.tools or tools or [],
)
# Combine the results from sub-tasks
result = self.combine_sub_task_results()
self.output = TaskOutput(
description=self.description,
name=self.name,
expected_output=self.expected_output,
raw=result,
agent=self.agent.role if self.agent else None,
output_format=self.output_format,
)
return self.output
"""Execute the task synchronously."""
return self._execute_core(agent, context, tools)
@property
@@ -427,55 +278,6 @@ class Task(BaseModel):
).start()
return future
def execute_sub_tasks_async(
self,
agent: Optional[BaseAgent] = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> List[Future[TaskOutput]]:
"""
Execute all sub-tasks asynchronously.
This method starts the execution of all sub-tasks in parallel and returns
futures that can be awaited. After all futures are complete, you should call
combine_sub_task_results() to aggregate the results.
Example:
```python
futures = task.execute_sub_tasks_async()
for future in futures:
future.result()
# Combine the results
result = task.combine_sub_task_results()
```
Args:
agent: Optional agent to execute the sub-tasks with.
context: Optional context to pass to the sub-tasks.
tools: Optional tools to pass to the sub-tasks.
Returns:
List of futures for the sub-task executions.
Raises:
ValueError: If the task has no sub-tasks.
"""
if not self.sub_tasks:
return []
futures = []
for sub_task in self.sub_tasks:
future = sub_task.execute_async(
agent=sub_task.agent or agent,
context=context,
tools=sub_task.tools or tools or [],
)
futures.append(future)
return futures
def _execute_task_async(
self,
agent: Optional[BaseAgent],
@@ -632,8 +434,6 @@ class Task(BaseModel):
"agent",
"context",
"tools",
"parent_task",
"sub_tasks",
}
copied_data = self.model_dump(exclude=exclude)
@@ -657,7 +457,6 @@ class Task(BaseModel):
agent=cloned_agent,
tools=cloned_tools,
)
return copied_task
@@ -727,6 +526,3 @@ class Task(BaseModel):
def __repr__(self):
return f"Task(description={self.description}, expected_output={self.expected_output})"
Task.model_rebuild()

View File

@@ -135,13 +135,42 @@ class EmbeddingConfigurator:
)
@staticmethod
def _configure_huggingface(config, model_name):
def _normalize_api_url(api_url: str) -> str:
"""
Normalize API URL by ensuring it has a protocol.
Args:
api_url: The API URL to normalize
Returns:
Normalized URL with protocol (defaults to http:// if missing)
"""
if not (api_url.startswith("http://") or api_url.startswith("https://")):
return f"http://{api_url}"
return api_url
@staticmethod
def _configure_huggingface(config: dict, model_name: str):
"""
Configure Huggingface embedding function with the provided config.
Args:
config: Configuration dictionary for the Huggingface embedder
model_name: Name of the model to use
Returns:
Configured HuggingFaceEmbeddingServer instance
"""
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingServer,
)
api_url = config.get("api_url")
if api_url:
api_url = EmbeddingConfigurator._normalize_api_url(api_url)
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
url=api_url,
)
@staticmethod

View File

@@ -584,3 +584,84 @@ def test_docling_source_with_local_file():
docling_source = CrewDoclingSource(file_paths=[pdf_path])
assert docling_source.file_paths == [pdf_path]
assert docling_source.content is not None
def test_huggingface_url_validation():
"""Test that Huggingface embedder properly handles URLs without protocol."""
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
config_missing_protocol = {
"api_url": "localhost:8080/embed"
}
embedding_function = EmbeddingConfigurator()._configure_huggingface(
config_missing_protocol, "test-model"
)
# Verify that the URL now has a protocol
assert embedding_function._api_url.startswith("http://")
config_with_protocol = {
"api_url": "https://localhost:8080/embed"
}
embedding_function = EmbeddingConfigurator()._configure_huggingface(
config_with_protocol, "test-model"
)
# Verify that the URL remains unchanged
assert embedding_function._api_url == "https://localhost:8080/embed"
config_with_other_protocol = {
"api_url": "http://localhost:8080/embed"
}
embedding_function = EmbeddingConfigurator()._configure_huggingface(
config_with_other_protocol, "test-model"
)
# Verify that the URL remains unchanged
assert embedding_function._api_url == "http://localhost:8080/embed"
config_no_url = {}
embedding_function = EmbeddingConfigurator()._configure_huggingface(
config_no_url, "test-model"
)
# Verify that no exception is raised when URL is None
assert embedding_function._api_url == 'None'
def test_huggingface_missing_protocol_with_json_source():
"""Test that JSONKnowledgeSource works with Huggingface embedder without URL protocol."""
import os
import json
import tempfile
from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
# Create a temporary JSON file
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp:
json.dump({"test": "data", "nested": {"value": 123}}, temp)
json_path = temp.name
# Test that the URL validation works in the embedder configurator
config = {
"api_url": "localhost:8080/embed" # Missing protocol
}
embedding_function = EmbeddingConfigurator()._configure_huggingface(
config, "test-model"
)
# Verify that the URL now has a protocol
assert embedding_function._api_url.startswith("http://")
os.unlink(json_path)
def test_huggingface_missing_protocol_with_string_source():
"""Test that StringKnowledgeSource works with Huggingface embedder without URL protocol."""
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
# Test that the URL validation works in the embedder configurator
config = {
"api_url": "localhost:8080/embed" # Missing protocol
}
embedding_function = EmbeddingConfigurator()._configure_huggingface(
config, "test-model"
)
# Verify that the URL now has a protocol
assert embedding_function._api_url.startswith("http://")

View File

@@ -0,0 +1,6 @@
{
"test": "data",
"nested": {
"value": 123
}
}

View File

@@ -1,157 +0,0 @@
import pytest
from unittest.mock import Mock, patch
from crewai import Agent, Task
def test_task_decomposition_structure():
"""Test that task decomposition creates the proper parent-child relationship."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI on various industries",
expected_output="A comprehensive report",
agent=agent,
)
sub_task_descriptions = [
"Research AI impact on healthcare",
"Research AI impact on finance",
"Research AI impact on education",
]
sub_tasks = parent_task.decompose(
descriptions=sub_task_descriptions,
expected_outputs=["Healthcare report", "Finance report", "Education report"],
names=["Healthcare", "Finance", "Education"],
)
assert len(sub_tasks) == 3
assert len(parent_task.sub_tasks) == 3
for sub_task in sub_tasks:
assert sub_task.parent_task == parent_task
assert parent_task in sub_task.context
def test_task_execution_with_sub_tasks():
"""Test that executing a task with sub-tasks executes the sub-tasks first."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI on various industries",
expected_output="A comprehensive report",
agent=agent,
)
sub_task_descriptions = [
"Research AI impact on healthcare",
"Research AI impact on finance",
"Research AI impact on education",
]
parent_task.decompose(
descriptions=sub_task_descriptions,
expected_outputs=["Healthcare report", "Finance report", "Education report"],
)
with patch.object(Agent, 'execute_task', return_value="Mock result") as mock_execute_task:
result = parent_task.execute_sync()
assert mock_execute_task.call_count >= 3
for sub_task in parent_task.sub_tasks:
assert sub_task.output is not None
assert result is not None
assert result.raw is not None
def test_combine_sub_task_results():
"""Test that combining sub-task results works correctly."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI on various industries",
expected_output="A comprehensive report",
agent=agent,
)
sub_tasks = parent_task.decompose([
"Research AI impact on healthcare",
"Research AI impact on finance",
])
for sub_task in sub_tasks:
sub_task.output = Mock()
sub_task.output.raw = f"Result for {sub_task.description}"
with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
result = parent_task.combine_sub_task_results()
assert mock_execute_task.called
assert result == "Combined result"
def test_task_decomposition_validation():
"""Test that task decomposition validates inputs correctly."""
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
)
with pytest.raises(ValueError, match="At least one sub-task description is required"):
parent_task.decompose([])
with pytest.raises(ValueError, match="expected_outputs must have the same length"):
parent_task.decompose(
["Task 1", "Task 2"],
expected_outputs=["Output 1"]
)
with pytest.raises(ValueError, match="names must have the same length"):
parent_task.decompose(
["Task 1", "Task 2"],
names=["Name 1"]
)
def test_execute_sub_tasks_async():
"""Test that executing sub-tasks asynchronously works correctly."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI on various industries",
expected_output="A comprehensive report",
agent=agent,
)
sub_tasks = parent_task.decompose([
"Research AI impact on healthcare",
"Research AI impact on finance",
])
with patch.object(Task, 'execute_async') as mock_execute_async:
mock_future = Mock()
mock_execute_async.return_value = mock_future
futures = parent_task.execute_sub_tasks_async()
assert mock_execute_async.call_count == 2
assert len(futures) == 2

View File

@@ -1,109 +0,0 @@
import pytest
from unittest.mock import Mock, patch
from crewai import Agent, Task, TaskOutput
def test_combine_sub_task_results_no_sub_tasks():
"""Test that combining sub-task results raises an error when there are no sub-tasks."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
agent=agent,
)
with pytest.raises(ValueError, match="Task has no sub-tasks to combine results from"):
parent_task.combine_sub_task_results()
def test_combine_sub_task_results_no_agent():
"""Test that combining sub-task results raises an error when there is no agent."""
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
)
sub_task = Task(
description="Research AI impact on healthcare",
expected_output="Healthcare report",
parent_task=parent_task,
)
parent_task.sub_tasks.append(sub_task)
with pytest.raises(ValueError, match="Task has no agent to combine sub-task results"):
parent_task.combine_sub_task_results()
def test_execute_sync_sets_output_after_combining():
"""Test that execute_sync sets the output after combining sub-task results."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
agent=agent,
)
sub_tasks = parent_task.decompose([
"Research AI impact on healthcare",
"Research AI impact on finance",
])
with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
result = parent_task.execute_sync()
assert parent_task.output is not None
assert parent_task.output.raw == "Combined result"
assert result.raw == "Combined result"
assert mock_execute_task.call_count >= 3
def test_deep_cloning_prevents_shared_state():
"""Test that deep cloning prevents shared mutable state between tasks."""
agent = Agent(
role="Researcher",
goal="Research effectively",
backstory="You're an expert researcher",
)
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
agent=agent,
)
copied_task = parent_task.copy()
copied_task.description = "Modified description"
assert parent_task.description == "Research the impact of AI"
assert copied_task.description == "Modified description"
parent_task.decompose(["Sub-task 1", "Sub-task 2"])
assert len(parent_task.sub_tasks) == 2
assert len(copied_task.sub_tasks) == 0
def test_execute_sub_tasks_async_empty_sub_tasks():
"""Test that execute_sub_tasks_async returns an empty list when there are no sub-tasks."""
parent_task = Task(
description="Research the impact of AI",
expected_output="A report",
)
futures = parent_task.execute_sub_tasks_async()
assert isinstance(futures, list)
assert len(futures) == 0