mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-29 01:58:14 +00:00
Compare commits
4 Commits
devin/1745
...
63028e1b20
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
63028e1b20 | ||
|
|
81759e8c72 | ||
|
|
27472ba69e | ||
|
|
25aa774d8c |
@@ -1,47 +0,0 @@
|
||||
"""
|
||||
Example of using task decomposition in CrewAI.
|
||||
|
||||
This example demonstrates how to use the task decomposition feature
|
||||
to break down complex tasks into simpler sub-tasks.
|
||||
|
||||
Feature introduced in CrewAI v1.x.x
|
||||
"""
|
||||
|
||||
from crewai import Agent, Task, Crew
|
||||
|
||||
researcher = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher with skills in breaking down complex topics.",
|
||||
)
|
||||
|
||||
research_task = Task(
|
||||
description="Research the impact of AI on various industries",
|
||||
expected_output="A comprehensive report covering multiple industries",
|
||||
agent=researcher,
|
||||
)
|
||||
|
||||
sub_tasks = research_task.decompose(
|
||||
descriptions=[
|
||||
"Research AI impact on healthcare industry",
|
||||
"Research AI impact on finance industry",
|
||||
"Research AI impact on education industry",
|
||||
],
|
||||
expected_outputs=[
|
||||
"A report on AI in healthcare",
|
||||
"A report on AI in finance",
|
||||
"A report on AI in education",
|
||||
],
|
||||
names=["Healthcare", "Finance", "Education"],
|
||||
)
|
||||
|
||||
crew = Crew(
|
||||
agents=[researcher],
|
||||
tasks=[research_task],
|
||||
)
|
||||
|
||||
result = crew.kickoff()
|
||||
print("Final result:", result)
|
||||
|
||||
for i, sub_task in enumerate(research_task.sub_tasks):
|
||||
print(f"Sub-task {i+1} result: {sub_task.output.raw if hasattr(sub_task, 'output') and sub_task.output else 'No output'}")
|
||||
@@ -14,13 +14,13 @@ class Knowledge(BaseModel):
|
||||
Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
|
||||
Args:
|
||||
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
|
||||
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
|
||||
storage: Optional[KnowledgeStorage] = Field(default=None)
|
||||
embedder_config: Optional[Dict[str, Any]] = None
|
||||
"""
|
||||
|
||||
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
|
||||
storage: Optional[KnowledgeStorage] = Field(default=None)
|
||||
embedder_config: Optional[Dict[str, Any]] = None
|
||||
collection_name: Optional[str] = None
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
|
||||
default_factory=list, description="The path to the file"
|
||||
)
|
||||
content: Dict[Path, str] = Field(init=False, default_factory=dict)
|
||||
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
|
||||
storage: Optional[KnowledgeStorage] = Field(default=None)
|
||||
safe_file_paths: List[Path] = Field(default_factory=list)
|
||||
|
||||
@field_validator("file_path", "file_paths", mode="before")
|
||||
@@ -62,7 +62,10 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
|
||||
|
||||
def _save_documents(self):
|
||||
"""Save the documents to the storage."""
|
||||
self.storage.save(self.chunks)
|
||||
if self.storage:
|
||||
self.storage.save(self.chunks)
|
||||
else:
|
||||
raise ValueError("No storage found to save documents.")
|
||||
|
||||
def convert_to_path(self, path: Union[Path, str]) -> Path:
|
||||
"""Convert a path to a Path object."""
|
||||
|
||||
@@ -16,7 +16,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
|
||||
chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
|
||||
storage: Optional[KnowledgeStorage] = Field(default=None)
|
||||
metadata: Dict[str, Any] = Field(default_factory=dict) # Currently unused
|
||||
collection_name: Optional[str] = Field(default=None)
|
||||
|
||||
@@ -46,4 +46,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
|
||||
Save the documents to the storage.
|
||||
This method should be called after the chunks and embeddings are generated.
|
||||
"""
|
||||
self.storage.save(self.chunks)
|
||||
if self.storage:
|
||||
self.storage.save(self.chunks)
|
||||
else:
|
||||
raise ValueError("No storage found to save documents.")
|
||||
|
||||
@@ -19,7 +19,6 @@ from typing import (
|
||||
Tuple,
|
||||
Type,
|
||||
Union,
|
||||
ForwardRef,
|
||||
)
|
||||
|
||||
from opentelemetry.trace import Span
|
||||
@@ -138,16 +137,6 @@ class Task(BaseModel):
|
||||
default=0,
|
||||
description="Current number of retries"
|
||||
)
|
||||
parent_task: Optional['Task'] = Field(
|
||||
default=None,
|
||||
description="Parent task that this task was decomposed from.",
|
||||
exclude=True,
|
||||
)
|
||||
sub_tasks: List['Task'] = Field(
|
||||
default_factory=list,
|
||||
description="Sub-tasks that this task was decomposed into.",
|
||||
exclude=True,
|
||||
)
|
||||
|
||||
@field_validator("guardrail")
|
||||
@classmethod
|
||||
@@ -257,151 +246,13 @@ class Task(BaseModel):
|
||||
)
|
||||
return self
|
||||
|
||||
def decompose(
|
||||
self,
|
||||
descriptions: List[str],
|
||||
expected_outputs: Optional[List[str]] = None,
|
||||
names: Optional[List[str]] = None
|
||||
) -> List['Task']:
|
||||
"""
|
||||
Decompose a complex task into simpler sub-tasks.
|
||||
|
||||
Args:
|
||||
descriptions: List of descriptions for each sub-task.
|
||||
expected_outputs: Optional list of expected outputs for each sub-task.
|
||||
names: Optional list of names for each sub-task.
|
||||
|
||||
Returns:
|
||||
List of created sub-tasks.
|
||||
|
||||
Raises:
|
||||
ValueError: If descriptions is empty, or if expected_outputs or names
|
||||
have different lengths than descriptions.
|
||||
|
||||
Side Effects:
|
||||
Modifies self.sub_tasks by adding newly created sub-tasks.
|
||||
"""
|
||||
if not descriptions:
|
||||
raise ValueError("At least one sub-task description is required.")
|
||||
|
||||
if expected_outputs and len(expected_outputs) != len(descriptions):
|
||||
raise ValueError(
|
||||
f"If provided, expected_outputs must have the same length as descriptions. "
|
||||
f"Got {len(expected_outputs)} expected outputs and {len(descriptions)} descriptions."
|
||||
)
|
||||
|
||||
if names and len(names) != len(descriptions):
|
||||
raise ValueError(
|
||||
f"If provided, names must have the same length as descriptions. "
|
||||
f"Got {len(names)} names and {len(descriptions)} descriptions."
|
||||
)
|
||||
|
||||
for i, description in enumerate(descriptions):
|
||||
sub_task = Task(
|
||||
description=description,
|
||||
expected_output=expected_outputs[i] if expected_outputs else self.expected_output,
|
||||
name=names[i] if names else None,
|
||||
agent=self.agent, # Inherit the agent from the parent task
|
||||
tools=self.tools, # Inherit the tools from the parent task
|
||||
context=[self], # Set the parent task as context for the sub-task
|
||||
parent_task=self, # Reference back to the parent task
|
||||
)
|
||||
self.sub_tasks.append(sub_task)
|
||||
|
||||
return self.sub_tasks
|
||||
|
||||
def combine_sub_task_results(self) -> str:
|
||||
"""
|
||||
Combine the results from all sub-tasks into a single result for this task.
|
||||
|
||||
This method uses the task's agent to intelligently combine the results from
|
||||
all sub-tasks. It requires an agent capable of coherent text summarization
|
||||
and is designed for stateless prompt execution.
|
||||
|
||||
Returns:
|
||||
The combined result as a string.
|
||||
|
||||
Raises:
|
||||
ValueError: If the task has no sub-tasks or no agent assigned.
|
||||
|
||||
Side Effects:
|
||||
None. This method does not modify the task's state.
|
||||
"""
|
||||
if not self.sub_tasks:
|
||||
raise ValueError("Task has no sub-tasks to combine results from.")
|
||||
|
||||
if not self.agent:
|
||||
raise ValueError("Task has no agent to combine sub-task results.")
|
||||
|
||||
sub_task_results = "\n\n".join([
|
||||
f"Sub-task: {sub_task.description}\nResult: {sub_task.output.raw if sub_task.output else 'No result'}"
|
||||
for sub_task in self.sub_tasks
|
||||
])
|
||||
|
||||
combine_prompt = f"""
|
||||
You have completed the following sub-tasks for the main task: "{self.description}"
|
||||
|
||||
{sub_task_results}
|
||||
|
||||
Based on all these sub-tasks, please provide a consolidated final answer for the main task.
|
||||
Expected output format: {self.expected_output if self.expected_output else 'Not specified'}
|
||||
"""
|
||||
|
||||
result = self.agent.execute_task(
|
||||
task=self,
|
||||
context=combine_prompt,
|
||||
tools=self.tools or []
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
def execute_sync(
|
||||
self,
|
||||
agent: Optional[BaseAgent] = None,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
) -> TaskOutput:
|
||||
"""
|
||||
Execute the task synchronously.
|
||||
|
||||
If the task has sub-tasks and no output yet, this method will:
|
||||
1. Execute all sub-tasks first
|
||||
2. Combine their results using the agent
|
||||
3. Set the combined result as this task's output
|
||||
|
||||
Args:
|
||||
agent: Optional agent to execute the task with.
|
||||
context: Optional context to pass to the task.
|
||||
tools: Optional tools to pass to the task.
|
||||
|
||||
Returns:
|
||||
TaskOutput: The result of the task execution.
|
||||
|
||||
Side Effects:
|
||||
Sets self.output with the execution result.
|
||||
"""
|
||||
if self.sub_tasks and not self.output:
|
||||
for sub_task in self.sub_tasks:
|
||||
sub_task.execute_sync(
|
||||
agent=sub_task.agent or agent,
|
||||
context=context,
|
||||
tools=sub_task.tools or tools or [],
|
||||
)
|
||||
|
||||
# Combine the results from sub-tasks
|
||||
result = self.combine_sub_task_results()
|
||||
|
||||
self.output = TaskOutput(
|
||||
description=self.description,
|
||||
name=self.name,
|
||||
expected_output=self.expected_output,
|
||||
raw=result,
|
||||
agent=self.agent.role if self.agent else None,
|
||||
output_format=self.output_format,
|
||||
)
|
||||
|
||||
return self.output
|
||||
|
||||
"""Execute the task synchronously."""
|
||||
return self._execute_core(agent, context, tools)
|
||||
|
||||
@property
|
||||
@@ -427,55 +278,6 @@ class Task(BaseModel):
|
||||
).start()
|
||||
return future
|
||||
|
||||
def execute_sub_tasks_async(
|
||||
self,
|
||||
agent: Optional[BaseAgent] = None,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[BaseTool]] = None,
|
||||
) -> List[Future[TaskOutput]]:
|
||||
"""
|
||||
Execute all sub-tasks asynchronously.
|
||||
|
||||
This method starts the execution of all sub-tasks in parallel and returns
|
||||
futures that can be awaited. After all futures are complete, you should call
|
||||
combine_sub_task_results() to aggregate the results.
|
||||
|
||||
Example:
|
||||
```python
|
||||
futures = task.execute_sub_tasks_async()
|
||||
|
||||
for future in futures:
|
||||
future.result()
|
||||
|
||||
# Combine the results
|
||||
result = task.combine_sub_task_results()
|
||||
```
|
||||
|
||||
Args:
|
||||
agent: Optional agent to execute the sub-tasks with.
|
||||
context: Optional context to pass to the sub-tasks.
|
||||
tools: Optional tools to pass to the sub-tasks.
|
||||
|
||||
Returns:
|
||||
List of futures for the sub-task executions.
|
||||
|
||||
Raises:
|
||||
ValueError: If the task has no sub-tasks.
|
||||
"""
|
||||
if not self.sub_tasks:
|
||||
return []
|
||||
|
||||
futures = []
|
||||
for sub_task in self.sub_tasks:
|
||||
future = sub_task.execute_async(
|
||||
agent=sub_task.agent or agent,
|
||||
context=context,
|
||||
tools=sub_task.tools or tools or [],
|
||||
)
|
||||
futures.append(future)
|
||||
|
||||
return futures
|
||||
|
||||
def _execute_task_async(
|
||||
self,
|
||||
agent: Optional[BaseAgent],
|
||||
@@ -632,8 +434,6 @@ class Task(BaseModel):
|
||||
"agent",
|
||||
"context",
|
||||
"tools",
|
||||
"parent_task",
|
||||
"sub_tasks",
|
||||
}
|
||||
|
||||
copied_data = self.model_dump(exclude=exclude)
|
||||
@@ -657,7 +457,6 @@ class Task(BaseModel):
|
||||
agent=cloned_agent,
|
||||
tools=cloned_tools,
|
||||
)
|
||||
|
||||
|
||||
return copied_task
|
||||
|
||||
@@ -727,6 +526,3 @@ class Task(BaseModel):
|
||||
|
||||
def __repr__(self):
|
||||
return f"Task(description={self.description}, expected_output={self.expected_output})"
|
||||
|
||||
|
||||
Task.model_rebuild()
|
||||
|
||||
@@ -1,157 +0,0 @@
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from crewai import Agent, Task
|
||||
|
||||
|
||||
def test_task_decomposition_structure():
|
||||
"""Test that task decomposition creates the proper parent-child relationship."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI on various industries",
|
||||
expected_output="A comprehensive report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
sub_task_descriptions = [
|
||||
"Research AI impact on healthcare",
|
||||
"Research AI impact on finance",
|
||||
"Research AI impact on education",
|
||||
]
|
||||
|
||||
sub_tasks = parent_task.decompose(
|
||||
descriptions=sub_task_descriptions,
|
||||
expected_outputs=["Healthcare report", "Finance report", "Education report"],
|
||||
names=["Healthcare", "Finance", "Education"],
|
||||
)
|
||||
|
||||
assert len(sub_tasks) == 3
|
||||
assert len(parent_task.sub_tasks) == 3
|
||||
|
||||
for sub_task in sub_tasks:
|
||||
assert sub_task.parent_task == parent_task
|
||||
assert parent_task in sub_task.context
|
||||
|
||||
|
||||
def test_task_execution_with_sub_tasks():
|
||||
"""Test that executing a task with sub-tasks executes the sub-tasks first."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI on various industries",
|
||||
expected_output="A comprehensive report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
sub_task_descriptions = [
|
||||
"Research AI impact on healthcare",
|
||||
"Research AI impact on finance",
|
||||
"Research AI impact on education",
|
||||
]
|
||||
|
||||
parent_task.decompose(
|
||||
descriptions=sub_task_descriptions,
|
||||
expected_outputs=["Healthcare report", "Finance report", "Education report"],
|
||||
)
|
||||
|
||||
with patch.object(Agent, 'execute_task', return_value="Mock result") as mock_execute_task:
|
||||
result = parent_task.execute_sync()
|
||||
|
||||
assert mock_execute_task.call_count >= 3
|
||||
|
||||
for sub_task in parent_task.sub_tasks:
|
||||
assert sub_task.output is not None
|
||||
|
||||
assert result is not None
|
||||
assert result.raw is not None
|
||||
|
||||
|
||||
def test_combine_sub_task_results():
|
||||
"""Test that combining sub-task results works correctly."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI on various industries",
|
||||
expected_output="A comprehensive report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
sub_tasks = parent_task.decompose([
|
||||
"Research AI impact on healthcare",
|
||||
"Research AI impact on finance",
|
||||
])
|
||||
|
||||
for sub_task in sub_tasks:
|
||||
sub_task.output = Mock()
|
||||
sub_task.output.raw = f"Result for {sub_task.description}"
|
||||
|
||||
with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
|
||||
result = parent_task.combine_sub_task_results()
|
||||
|
||||
assert mock_execute_task.called
|
||||
assert result == "Combined result"
|
||||
|
||||
|
||||
def test_task_decomposition_validation():
|
||||
"""Test that task decomposition validates inputs correctly."""
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="At least one sub-task description is required"):
|
||||
parent_task.decompose([])
|
||||
|
||||
with pytest.raises(ValueError, match="expected_outputs must have the same length"):
|
||||
parent_task.decompose(
|
||||
["Task 1", "Task 2"],
|
||||
expected_outputs=["Output 1"]
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="names must have the same length"):
|
||||
parent_task.decompose(
|
||||
["Task 1", "Task 2"],
|
||||
names=["Name 1"]
|
||||
)
|
||||
|
||||
|
||||
def test_execute_sub_tasks_async():
|
||||
"""Test that executing sub-tasks asynchronously works correctly."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI on various industries",
|
||||
expected_output="A comprehensive report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
sub_tasks = parent_task.decompose([
|
||||
"Research AI impact on healthcare",
|
||||
"Research AI impact on finance",
|
||||
])
|
||||
|
||||
with patch.object(Task, 'execute_async') as mock_execute_async:
|
||||
mock_future = Mock()
|
||||
mock_execute_async.return_value = mock_future
|
||||
|
||||
futures = parent_task.execute_sub_tasks_async()
|
||||
|
||||
assert mock_execute_async.call_count == 2
|
||||
assert len(futures) == 2
|
||||
@@ -1,109 +0,0 @@
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from crewai import Agent, Task, TaskOutput
|
||||
|
||||
|
||||
def test_combine_sub_task_results_no_sub_tasks():
|
||||
"""Test that combining sub-task results raises an error when there are no sub-tasks."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="Task has no sub-tasks to combine results from"):
|
||||
parent_task.combine_sub_task_results()
|
||||
|
||||
|
||||
def test_combine_sub_task_results_no_agent():
|
||||
"""Test that combining sub-task results raises an error when there is no agent."""
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
)
|
||||
|
||||
sub_task = Task(
|
||||
description="Research AI impact on healthcare",
|
||||
expected_output="Healthcare report",
|
||||
parent_task=parent_task,
|
||||
)
|
||||
parent_task.sub_tasks.append(sub_task)
|
||||
|
||||
with pytest.raises(ValueError, match="Task has no agent to combine sub-task results"):
|
||||
parent_task.combine_sub_task_results()
|
||||
|
||||
|
||||
def test_execute_sync_sets_output_after_combining():
|
||||
"""Test that execute_sync sets the output after combining sub-task results."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
sub_tasks = parent_task.decompose([
|
||||
"Research AI impact on healthcare",
|
||||
"Research AI impact on finance",
|
||||
])
|
||||
|
||||
with patch.object(Agent, 'execute_task', return_value="Combined result") as mock_execute_task:
|
||||
result = parent_task.execute_sync()
|
||||
|
||||
assert parent_task.output is not None
|
||||
assert parent_task.output.raw == "Combined result"
|
||||
assert result.raw == "Combined result"
|
||||
|
||||
assert mock_execute_task.call_count >= 3
|
||||
|
||||
|
||||
def test_deep_cloning_prevents_shared_state():
|
||||
"""Test that deep cloning prevents shared mutable state between tasks."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research effectively",
|
||||
backstory="You're an expert researcher",
|
||||
)
|
||||
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
copied_task = parent_task.copy()
|
||||
|
||||
copied_task.description = "Modified description"
|
||||
|
||||
assert parent_task.description == "Research the impact of AI"
|
||||
assert copied_task.description == "Modified description"
|
||||
|
||||
parent_task.decompose(["Sub-task 1", "Sub-task 2"])
|
||||
|
||||
assert len(parent_task.sub_tasks) == 2
|
||||
assert len(copied_task.sub_tasks) == 0
|
||||
|
||||
|
||||
def test_execute_sub_tasks_async_empty_sub_tasks():
|
||||
"""Test that execute_sub_tasks_async returns an empty list when there are no sub-tasks."""
|
||||
parent_task = Task(
|
||||
description="Research the impact of AI",
|
||||
expected_output="A report",
|
||||
)
|
||||
|
||||
futures = parent_task.execute_sub_tasks_async()
|
||||
|
||||
assert isinstance(futures, list)
|
||||
assert len(futures) == 0
|
||||
Reference in New Issue
Block a user