feat: implement sequential chunk-based file analysis with agent memory aggregation

- Add ChunkBasedTask class extending Task for large file processing
- Implement file chunking with configurable size and overlap
- Add sequential chunk processing with memory integration
- Include result aggregation and summarization capabilities
- Add comprehensive tests and example usage
- Resolves #3144

Co-Authored-By: Jo\u00E3o <joao@crewai.com>
This commit is contained in:
Devin AI
2025-07-13 00:29:59 +00:00
parent e7a5747c6b
commit f836b2dc90
5 changed files with 521 additions and 0 deletions

View File

@@ -0,0 +1,211 @@
import pytest
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch
from crewai.tasks.chunk_based_task import ChunkBasedTask
from crewai.agent import Agent
from crewai.tasks.task_output import TaskOutput
class TestChunkBasedTask:
def test_chunk_based_task_creation(self):
"""Test creating a ChunkBasedTask with valid parameters."""
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write("Test content for chunking")
temp_path = f.name
try:
task = ChunkBasedTask(
description="Analyze the content",
expected_output="Analysis summary",
file_path=temp_path,
chunk_size=100,
chunk_overlap=20
)
assert task.file_path == Path(temp_path)
assert task.chunk_size == 100
assert task.chunk_overlap == 20
finally:
Path(temp_path).unlink()
def test_file_path_validation(self):
"""Test file path validation."""
with pytest.raises(ValueError, match="File not found"):
ChunkBasedTask(
description="Test",
expected_output="Test output",
file_path="/nonexistent/file.txt"
)
def test_chunk_text_method(self):
"""Test text chunking functionality."""
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write("dummy content")
temp_path = f.name
try:
task = ChunkBasedTask(
description="Test",
expected_output="Test output",
file_path=temp_path,
chunk_size=10,
chunk_overlap=2
)
text = "This is a test text that should be chunked properly"
chunks = task._chunk_text(text)
assert len(chunks) > 1
assert all(len(chunk) <= 10 for chunk in chunks)
assert chunks[1].startswith(chunks[0][-2:])
finally:
Path(temp_path).unlink()
def test_empty_file_handling(self):
"""Test handling of empty files."""
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write("")
temp_path = f.name
try:
task = ChunkBasedTask(
description="Test",
expected_output="Test output",
file_path=temp_path
)
chunks = task._chunk_text("")
assert chunks == []
finally:
Path(temp_path).unlink()
@patch('crewai.task.Task._execute_core')
def test_chunk_processing_execution(self, mock_execute):
"""Test the sequential chunk processing execution."""
test_content = "A" * 100 + "B" * 100 + "C" * 100
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write(test_content)
temp_path = f.name
try:
mock_result = TaskOutput(
description="Test",
expected_output="Test output",
raw="Chunk analysis result",
agent="test_agent"
)
mock_execute.return_value = mock_result
task = ChunkBasedTask(
description="Analyze content",
expected_output="Analysis summary",
file_path=temp_path,
chunk_size=80,
chunk_overlap=10
)
mock_agent = Mock()
mock_agent.role = "test_agent"
mock_agent.crew = None
result = task._execute_core(mock_agent, None, None)
assert len(task.chunk_results) > 1
assert mock_execute.call_count > len(task.chunk_results)
finally:
Path(temp_path).unlink()
def test_memory_integration(self):
"""Test integration with agent memory system."""
test_content = "Test content for memory integration"
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write(test_content)
temp_path = f.name
try:
task = ChunkBasedTask(
description="Test memory integration",
expected_output="Memory test output",
file_path=temp_path
)
mock_agent = Mock()
mock_agent.role = "test_agent"
mock_crew = Mock()
mock_memory = Mock()
mock_crew._short_term_memory = mock_memory
mock_agent.crew = mock_crew
with patch.object(task, '_aggregate_results') as mock_aggregate:
mock_aggregate.return_value = TaskOutput(
description="Test",
expected_output="Test output",
raw="Final result",
agent="test_agent"
)
with patch('crewai.task.Task._execute_core') as mock_execute:
mock_execute.return_value = TaskOutput(
description="Test",
expected_output="Test output",
raw="Chunk result",
agent="test_agent"
)
task._execute_core(mock_agent, None, None)
mock_memory.save.assert_called()
finally:
Path(temp_path).unlink()
def test_get_chunk_results(self):
"""Test accessing individual chunk results."""
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write("test content")
temp_path = f.name
try:
task = ChunkBasedTask(
description="Test",
expected_output="Test output",
file_path=temp_path
)
mock_result = TaskOutput(
description="Test",
expected_output="Test output",
raw="Test result",
agent="test_agent"
)
task.chunk_results = [mock_result]
results = task.get_chunk_results()
assert len(results) == 1
assert results[0] == mock_result
finally:
Path(temp_path).unlink()
def test_custom_aggregation_prompt(self):
"""Test using custom aggregation prompt."""
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write("test content")
temp_path = f.name
try:
custom_prompt = "Custom aggregation instructions"
task = ChunkBasedTask(
description="Test",
expected_output="Test output",
file_path=temp_path,
aggregation_prompt=custom_prompt
)
assert task.aggregation_prompt == custom_prompt
finally:
Path(temp_path).unlink()

View File

@@ -0,0 +1,92 @@
import pytest
import tempfile
from pathlib import Path
from crewai.tasks.chunk_based_task import ChunkBasedTask
from crewai.agent import Agent
from crewai.crew import Crew
class TestChunkBasedTaskIntegration:
def test_end_to_end_chunk_processing(self):
"""Test complete chunk-based processing workflow."""
test_content = """
This is the first section of a large document that needs to be analyzed.
It contains important information about the topic at hand.
This is the second section that builds upon the first section.
It provides additional context and details that are relevant.
This is the third section that concludes the document.
It summarizes the key points and provides final insights.
"""
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write(test_content)
temp_path = f.name
try:
agent = Agent(
role="Document Analyzer",
goal="Analyze document content thoroughly",
backstory="Expert at analyzing and summarizing documents"
)
task = ChunkBasedTask(
description="Analyze this document and identify key themes",
expected_output="A comprehensive analysis of the document's main themes",
file_path=temp_path,
chunk_size=200,
chunk_overlap=50,
agent=agent
)
crew = Crew(
agents=[agent],
tasks=[task],
memory=True
)
assert task.file_path == Path(temp_path)
assert task.chunk_size == 200
assert task.chunk_overlap == 50
finally:
Path(temp_path).unlink()
def test_chunk_based_task_with_crew_structure(self):
"""Test that ChunkBasedTask integrates properly with Crew structure."""
test_content = "Sample content for testing crew integration with chunk-based tasks."
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
f.write(test_content)
temp_path = f.name
try:
analyzer_agent = Agent(
role="Content Analyzer",
goal="Analyze content effectively",
backstory="Experienced content analyst"
)
chunk_task = ChunkBasedTask(
description="Analyze the content for key insights",
expected_output="Summary of key insights found",
file_path=temp_path,
chunk_size=50,
chunk_overlap=10,
agent=analyzer_agent
)
crew = Crew(
agents=[analyzer_agent],
tasks=[chunk_task]
)
assert len(crew.tasks) == 1
assert isinstance(crew.tasks[0], ChunkBasedTask)
assert crew.tasks[0].file_path == Path(temp_path)
finally:
Path(temp_path).unlink()