Mirror of https://github.com/crewAIInc/crewAI.git, synced 2025-12-30 19:28:29 +00:00

Compare commits: 2 commits on devin/1752

| Author | SHA1 | Date |
|---|---|---|
|  | 9497715b42 |  |
|  | f836b2dc90 |  |
examples/chunk_based_analysis_example.py (new file, 57 lines)
@@ -0,0 +1,57 @@
"""
Example: Sequential Chunk-Based File Analysis with CrewAI

This example demonstrates how to use ChunkBasedTask to analyze large files
by processing them in chunks with agent memory aggregation.
"""

from crewai import Agent, Crew
from crewai.tasks.chunk_based_task import ChunkBasedTask


def main():
    document_analyzer = Agent(
        role="Document Analyzer",
        goal="Analyze documents thoroughly and extract key insights",
        backstory="""You are an expert document analyst with years of experience
        in processing and understanding complex documents. You excel at identifying
        patterns, themes, and important information across large texts.""",
    )

    analysis_task = ChunkBasedTask(
        description="""Analyze the provided document and identify:
        1. Main themes and topics
        2. Key arguments or points made
        3. Important facts or data mentioned
        4. Overall structure and organization""",
        expected_output="""A comprehensive analysis report containing:
        - Summary of main themes
        - List of key points
        - Notable facts and data
        - Assessment of document structure""",
        file_path="path/to/your/large_document.txt",
        chunk_size=4000,
        chunk_overlap=200,
        aggregation_prompt="""Synthesize the analysis from all document chunks into
        a cohesive report that captures the document's essence while highlighting
        the most important insights discovered.""",
        agent=document_analyzer,  # sequential crews require each task to have an agent
    )

    crew = Crew(
        agents=[document_analyzer],
        tasks=[analysis_task],
        memory=True,
        verbose=True,
    )

    result = crew.kickoff()

    print("Analysis Complete!")
    print("Final Result:", result)

    # Per-chunk outputs remain available after the run.
    chunk_results = analysis_task.get_chunk_results()
    print(f"Processed {len(chunk_results)} chunks")


if __name__ == "__main__":
    main()
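For intuition about the `chunk_size` and `chunk_overlap` settings above, here is a minimal standalone sketch of the stride arithmetic the task uses; the helper name is illustrative and not part of the diff:

    def chunk_boundaries(text_len: int, chunk_size: int = 4000, chunk_overlap: int = 200):
        # Each chunk starts chunk_size - chunk_overlap characters after the
        # previous one, matching the range() stride in ChunkBasedTask._chunk_text.
        stride = chunk_size - chunk_overlap
        return [(start, min(start + chunk_size, text_len))
                for start in range(0, text_len, stride)]

    # A 10,000-character document with the example's settings yields three chunks:
    # [(0, 4000), (3800, 7800), (7600, 10000)]
    print(chunk_boundaries(10_000))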
src/crewai/__init__.py (modified)
@@ -11,6 +11,7 @@ from crewai.llm import LLM
 from crewai.llms.base_llm import BaseLLM
 from crewai.process import Process
 from crewai.task import Task
+from crewai.tasks.chunk_based_task import ChunkBasedTask
 from crewai.tasks.llm_guardrail import LLMGuardrail
 from crewai.tasks.task_output import TaskOutput
 from crewai.telemetry.telemetry import Telemetry
@@ -57,6 +58,7 @@ _track_install_async()
 __version__ = "0.141.0"
 __all__ = [
     "Agent",
+    "ChunkBasedTask",
     "Crew",
     "CrewOutput",
     "Process",
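With the import added to `__init__.py`, the class becomes reachable from the package root as well as from its module path (the `__all__` entry additionally exposes it to `from crewai import *`):

    from crewai import ChunkBasedTask  # now resolves via the package root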
src/crewai/tasks/chunk_based_task.py (new file, 159 lines)
@@ -0,0 +1,159 @@
import datetime
from pathlib import Path
from typing import Any, List, Optional, Union

from pydantic import Field, field_validator

from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.agents.agent_builder.base_agent import BaseAgent


class ChunkBasedTask(Task):
    """Task that processes large files by breaking them into chunks and analyzing them sequentially."""

    file_path: Union[str, Path] = Field(
        description="Path to the file to be processed in chunks"
    )
    chunk_size: int = Field(
        default=4000,
        description="Size of each chunk in characters"
    )
    chunk_overlap: int = Field(
        default=200,
        description="Number of characters to overlap between chunks"
    )
    aggregation_prompt: Optional[str] = Field(
        default=None,
        description="Custom prompt for aggregating chunk results"
    )
    chunk_results: List[TaskOutput] = Field(
        default_factory=list,
        description="Results from processing each chunk"
    )

    @field_validator("file_path")
    @classmethod
    def validate_file_path(cls, v):
        path = Path(v)
        if not path.exists():
            raise ValueError(f"File not found: {path}")
        if not path.is_file():
            raise ValueError(f"Path is not a file: {path}")
        return path

    def _read_file(self) -> str:
        """Read the content of the file."""
        with open(self.file_path, "r", encoding="utf-8") as f:
            return f.read()

    def _chunk_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks."""
        if not text:
            return []

        chunks = []
        # Step by chunk_size - chunk_overlap so consecutive chunks share an overlap.
        for i in range(0, len(text), self.chunk_size - self.chunk_overlap):
            chunk = text[i:i + self.chunk_size]
            if chunk.strip():
                chunks.append(chunk)
        return chunks

    def _create_chunk_task(self, chunk: str, chunk_index: int, total_chunks: int) -> str:
        """Create a task description for processing a single chunk."""
        chunk_context = f"Processing chunk {chunk_index + 1} of {total_chunks}"
        if self.chunk_results:
            chunk_context += "\n\nPrevious chunk insights from memory will be available."

        return f"{chunk_context}\n\n{self.description}\n\nChunk content:\n{chunk}"

    def _execute_core(
        self,
        agent: Optional[BaseAgent],
        context: Optional[str],
        tools: Optional[List[Any]],
    ) -> TaskOutput:
        """Execute chunk-based processing."""
        agent = agent or self.agent
        if not agent:
            raise Exception(f"The chunk-based task '{self.description}' has no agent assigned")

        self.start_time = datetime.datetime.now()

        file_content = self._read_file()
        chunks = self._chunk_text(file_content)

        if not chunks:
            raise ValueError("No valid chunks found in the file")

        self.chunk_results = []

        # Process chunks sequentially, wrapping each one in a plain Task.
        for i, chunk in enumerate(chunks):
            chunk_description = self._create_chunk_task(chunk, i, len(chunks))

            chunk_task = Task(
                description=chunk_description,
                expected_output=self.expected_output,
                agent=agent,
                tools=tools or self.tools or []
            )

            chunk_result = chunk_task._execute_core(agent, context, tools)
            self.chunk_results.append(chunk_result)

            # Best-effort: persist each chunk's analysis to the crew's short-term
            # memory so later chunks can draw on earlier insights.
            if hasattr(agent, "crew") and agent.crew and hasattr(agent.crew, "_short_term_memory"):
                try:
                    agent.crew._short_term_memory.save(
                        value=f"Chunk {i + 1} analysis: {chunk_result.raw}",
                        metadata={
                            "chunk_index": i,
                            "total_chunks": len(chunks),
                            "task_description": self.description
                        },
                        agent=agent.role
                    )
                except Exception as e:
                    print(f"Failed to save chunk result to memory: {e}")

        return self._aggregate_results(agent, context, tools)

    def _aggregate_results(
        self,
        agent: BaseAgent,
        context: Optional[str],
        tools: Optional[List[Any]]
    ) -> TaskOutput:
        """Aggregate results from all chunks."""
        if not self.chunk_results:
            raise ValueError("No chunk results to aggregate")

        chunk_summaries = []
        for i, result in enumerate(self.chunk_results):
            chunk_summaries.append(f"Chunk {i + 1} result: {result.raw}")

        aggregation_description = self.aggregation_prompt or f"""
Analyze and synthesize the following chunk analysis results into a comprehensive summary.

Original task: {self.description}
Expected output: {self.expected_output}

Chunk results:
{chr(10).join(chunk_summaries)}

Provide a comprehensive analysis that synthesizes insights from all chunks.
"""

        aggregation_task = Task(
            description=aggregation_description,
            expected_output=self.expected_output,
            agent=agent,
            tools=tools or self.tools or []
        )

        final_result = aggregation_task._execute_core(agent, context, tools)

        self.output = final_result
        self.end_time = datetime.datetime.now()
        return final_result

    def get_chunk_results(self) -> List[TaskOutput]:
        """Get results from individual chunk processing."""
        return self.chunk_results
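One edge case the field validator above does not cover: `_chunk_text` steps by `chunk_size - chunk_overlap`, so an overlap equal to the chunk size gives a zero step (Python's `range` raises `ValueError: range() arg 3 must not be zero`), and a larger overlap gives a negative step that silently produces no chunks. A hedged sketch of a guard one could add, shown on a minimal stand-in pydantic model rather than on `ChunkBasedTask` itself:

    from pydantic import BaseModel, model_validator

    # Hypothetical guard; not part of this commit.
    class ChunkParams(BaseModel):
        chunk_size: int = 4000
        chunk_overlap: int = 200

        @model_validator(mode="after")
        def _overlap_smaller_than_size(self):
            if self.chunk_overlap >= self.chunk_size:
                raise ValueError(
                    f"chunk_overlap ({self.chunk_overlap}) must be smaller than "
                    f"chunk_size ({self.chunk_size})"
                )
            return self

    ChunkParams(chunk_size=100, chunk_overlap=100)  # raises ValueError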
tests/test_chunk_based_task.py (new file, 210 lines)
@@ -0,0 +1,210 @@
import pytest
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch

from crewai.tasks.chunk_based_task import ChunkBasedTask
from crewai.tasks.task_output import TaskOutput


class TestChunkBasedTask:

    def test_chunk_based_task_creation(self):
        """Test creating a ChunkBasedTask with valid parameters."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("Test content for chunking")
            temp_path = f.name

        try:
            task = ChunkBasedTask(
                description="Analyze the content",
                expected_output="Analysis summary",
                file_path=temp_path,
                chunk_size=100,
                chunk_overlap=20
            )
            assert task.file_path == Path(temp_path)
            assert task.chunk_size == 100
            assert task.chunk_overlap == 20
        finally:
            Path(temp_path).unlink()

    def test_file_path_validation(self):
        """Test file path validation."""
        with pytest.raises(ValueError, match="File not found"):
            ChunkBasedTask(
                description="Test",
                expected_output="Test output",
                file_path="/nonexistent/file.txt"
            )

    def test_chunk_text_method(self):
        """Test text chunking functionality."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("dummy content")
            temp_path = f.name

        try:
            task = ChunkBasedTask(
                description="Test",
                expected_output="Test output",
                file_path=temp_path,
                chunk_size=10,
                chunk_overlap=2
            )

            text = "This is a test text that should be chunked properly"
            chunks = task._chunk_text(text)

            assert len(chunks) > 1
            assert all(len(chunk) <= 10 for chunk in chunks)
            assert chunks[1].startswith(chunks[0][-2:])
        finally:
            Path(temp_path).unlink()

    def test_empty_file_handling(self):
        """Test handling of empty files."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("")
            temp_path = f.name

        try:
            task = ChunkBasedTask(
                description="Test",
                expected_output="Test output",
                file_path=temp_path
            )

            chunks = task._chunk_text("")
            assert chunks == []
        finally:
            Path(temp_path).unlink()

    @patch('crewai.task.Task._execute_core')
    def test_chunk_processing_execution(self, mock_execute):
        """Test the sequential chunk processing execution."""
        test_content = "A" * 100 + "B" * 100 + "C" * 100
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write(test_content)
            temp_path = f.name

        try:
            mock_result = TaskOutput(
                description="Test",
                expected_output="Test output",
                raw="Chunk analysis result",
                agent="test_agent"
            )
            mock_execute.return_value = mock_result

            task = ChunkBasedTask(
                description="Analyze content",
                expected_output="Analysis summary",
                file_path=temp_path,
                chunk_size=80,
                chunk_overlap=10
            )

            mock_agent = Mock()
            mock_agent.role = "test_agent"
            mock_agent.crew = None

            task._execute_core(mock_agent, None, None)

            assert len(task.chunk_results) > 1
            assert mock_execute.call_count > len(task.chunk_results)

        finally:
            Path(temp_path).unlink()

    def test_memory_integration(self):
        """Test integration with agent memory system."""
        test_content = "Test content for memory integration"
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write(test_content)
            temp_path = f.name

        try:
            task = ChunkBasedTask(
                description="Test memory integration",
                expected_output="Memory test output",
                file_path=temp_path
            )

            mock_agent = Mock()
            mock_agent.role = "test_agent"
            mock_crew = Mock()
            mock_memory = Mock()
            mock_crew._short_term_memory = mock_memory
            mock_agent.crew = mock_crew

            with patch.object(task, '_aggregate_results') as mock_aggregate:
                mock_aggregate.return_value = TaskOutput(
                    description="Test",
                    expected_output="Test output",
                    raw="Final result",
                    agent="test_agent"
                )

                with patch('crewai.task.Task._execute_core') as mock_execute:
                    mock_execute.return_value = TaskOutput(
                        description="Test",
                        expected_output="Test output",
                        raw="Chunk result",
                        agent="test_agent"
                    )

                    task._execute_core(mock_agent, None, None)

                    mock_memory.save.assert_called()

        finally:
            Path(temp_path).unlink()

    def test_get_chunk_results(self):
        """Test accessing individual chunk results."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("test content")
            temp_path = f.name

        try:
            task = ChunkBasedTask(
                description="Test",
                expected_output="Test output",
                file_path=temp_path
            )

            mock_result = TaskOutput(
                description="Test",
                expected_output="Test output",
                raw="Test result",
                agent="test_agent"
            )
            task.chunk_results = [mock_result]

            results = task.get_chunk_results()
            assert len(results) == 1
            assert results[0] == mock_result

        finally:
            Path(temp_path).unlink()

    def test_custom_aggregation_prompt(self):
        """Test using custom aggregation prompt."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("test content")
            temp_path = f.name

        try:
            custom_prompt = "Custom aggregation instructions"
            task = ChunkBasedTask(
                description="Test",
                expected_output="Test output",
                file_path=temp_path,
                aggregation_prompt=custom_prompt
            )

            assert task.aggregation_prompt == custom_prompt

        finally:
            Path(temp_path).unlink()
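A subtlety worth noting in the two tests that patch `crewai.task.Task._execute_core`: because `ChunkBasedTask` overrides that method, the patch only intercepts the inner per-chunk `Task` objects while the override under test still runs for real. A minimal illustration of that attribute-lookup behavior, using hypothetical classes rather than anything from the codebase:

    from unittest.mock import patch

    class Base:
        def run(self):
            return "real base work"

    class Child(Base):
        def run(self):
            # The override executes for real; only explicit Base calls hit the patch.
            return f"child wrapping: {Base.run(self)}"

    with patch.object(Base, "run", return_value="mocked"):
        print(Base().run())   # "mocked"
        print(Child().run())  # "child wrapping: mocked"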
tests/test_chunk_based_task_integration.py (new file, 91 lines)
@@ -0,0 +1,91 @@
import tempfile
from pathlib import Path

from crewai.tasks.chunk_based_task import ChunkBasedTask
from crewai.agent import Agent
from crewai.crew import Crew


class TestChunkBasedTaskIntegration:

    def test_end_to_end_chunk_processing(self):
        """Test complete chunk-based processing workflow."""
        test_content = """
        This is the first section of a large document that needs to be analyzed.
        It contains important information about the topic at hand.

        This is the second section that builds upon the first section.
        It provides additional context and details that are relevant.

        This is the third section that concludes the document.
        It summarizes the key points and provides final insights.
        """

        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write(test_content)
            temp_path = f.name

        try:
            agent = Agent(
                role="Document Analyzer",
                goal="Analyze document content thoroughly",
                backstory="Expert at analyzing and summarizing documents"
            )

            task = ChunkBasedTask(
                description="Analyze this document and identify key themes",
                expected_output="A comprehensive analysis of the document's main themes",
                file_path=temp_path,
                chunk_size=200,
                chunk_overlap=50,
                agent=agent
            )

            Crew(
                agents=[agent],
                tasks=[task],
                memory=True
            )

            assert task.file_path == Path(temp_path)
            assert task.chunk_size == 200
            assert task.chunk_overlap == 50

        finally:
            Path(temp_path).unlink()

    def test_chunk_based_task_with_crew_structure(self):
        """Test that ChunkBasedTask integrates properly with Crew structure."""
        test_content = "Sample content for testing crew integration with chunk-based tasks."

        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write(test_content)
            temp_path = f.name

        try:
            analyzer_agent = Agent(
                role="Content Analyzer",
                goal="Analyze content effectively",
                backstory="Experienced content analyst"
            )

            chunk_task = ChunkBasedTask(
                description="Analyze the content for key insights",
                expected_output="Summary of key insights found",
                file_path=temp_path,
                chunk_size=50,
                chunk_overlap=10,
                agent=analyzer_agent
            )

            crew = Crew(
                agents=[analyzer_agent],
                tasks=[chunk_task]
            )

            assert len(crew.tasks) == 1
            assert isinstance(crew.tasks[0], ChunkBasedTask)
            assert crew.tasks[0].file_path == Path(temp_path)

        finally:
            Path(temp_path).unlink()
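To run the new suites locally, the standard pytest invocation should suffice, assuming the package is installed in the active environment:

    pytest tests/test_chunk_based_task.py tests/test_chunk_based_task_integration.py -q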