Compare commits

...

2 Commits

Author SHA1 Message Date
Devin AI
9497715b42 fix: resolve lint issues in ChunkBasedTask implementation
- Remove unused Dict import from typing
- Fix f-string without placeholders
- Remove unused imports and variables in tests

Co-Authored-By: João <joao@crewai.com>
2025-07-13 00:34:24 +00:00
Devin AI
f836b2dc90 feat: implement sequential chunk-based file analysis with agent memory aggregation
- Add ChunkBasedTask class extending Task for large file processing
- Implement file chunking with configurable size and overlap
- Add sequential chunk processing with memory integration
- Include result aggregation and summarization capabilities
- Add comprehensive tests and example usage
- Resolves #3144

Co-Authored-By: João <joao@crewai.com>
2025-07-13 00:29:59 +00:00
5 changed files with 519 additions and 0 deletions
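For a quick sense of what the default parameters in this change mean in practice, here is a minimal sketch (illustrative numbers only, not part of the diff) of the overlap stepping that the _chunk_text method below implements:

# Hypothetical walk-through of ChunkBasedTask's chunk boundaries.
chunk_size, chunk_overlap, doc_len = 4000, 200, 10_000
step = chunk_size - chunk_overlap          # 3800; consecutive chunks share 200 chars
starts = list(range(0, doc_len, step))     # [0, 3800, 7600]
print([(s, min(s + chunk_size, doc_len)) for s in starts])
# [(0, 4000), (3800, 7800), (7600, 10000)] -> three overlapping chunks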

View File (new example script)

@@ -0,0 +1,57 @@
"""
Example: Sequential Chunk-Based File Analysis with CrewAI
This example demonstrates how to use ChunkBasedTask to analyze large files
by processing them in chunks with agent memory aggregation.
"""
from crewai import Agent, Crew
from crewai.tasks.chunk_based_task import ChunkBasedTask
def main():
document_analyzer = Agent(
role="Document Analyzer",
goal="Analyze documents thoroughly and extract key insights",
backstory="""You are an expert document analyst with years of experience
in processing and understanding complex documents. You excel at identifying
patterns, themes, and important information across large texts."""
)
analysis_task = ChunkBasedTask(
description="""Analyze the provided document and identify:
1. Main themes and topics
2. Key arguments or points made
3. Important facts or data mentioned
4. Overall structure and organization""",
expected_output="""A comprehensive analysis report containing:
- Summary of main themes
- List of key points
- Notable facts and data
- Assessment of document structure""",
file_path="path/to/your/large_document.txt",
chunk_size=4000,
chunk_overlap=200,
aggregation_prompt="""Synthesize the analysis from all document chunks into
a cohesive report that captures the document's essence while highlighting
the most important insights discovered."""
)
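
    # memory=True enables the crew's short-term memory, which ChunkBasedTask
    # uses to carry insights from earlier chunks into later ones.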
    crew = Crew(
        agents=[document_analyzer],
        tasks=[analysis_task],
        memory=True,
        verbose=True
    )

    result = crew.kickoff()
    print("Analysis Complete!")
    print("Final Result:", result)

    chunk_results = analysis_task.get_chunk_results()
    print(f"Processed {len(chunk_results)} chunks")


if __name__ == "__main__":
    main()

View File: crewai/__init__.py

@@ -11,6 +11,7 @@ from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.chunk_based_task import ChunkBasedTask
from crewai.tasks.llm_guardrail import LLMGuardrail
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
@@ -57,6 +58,7 @@ _track_install_async()
__version__ = "0.141.0"
__all__ = [
"Agent",
"ChunkBasedTask",
"Crew",
"CrewOutput",
"Process",

View File: crewai/tasks/chunk_based_task.py (new)

@@ -0,0 +1,159 @@
import datetime
from pathlib import Path
from typing import Any, List, Optional, Union

from pydantic import Field, field_validator

from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.agents.agent_builder.base_agent import BaseAgent


class ChunkBasedTask(Task):
    """Task that processes large files by breaking them into chunks and analyzing sequentially."""

    file_path: Union[str, Path] = Field(
        description="Path to the file to be processed in chunks"
    )
    chunk_size: int = Field(
        default=4000,
        description="Size of each chunk in characters"
    )
    chunk_overlap: int = Field(
        default=200,
        description="Number of characters to overlap between chunks"
    )
    aggregation_prompt: Optional[str] = Field(
        default=None,
        description="Custom prompt for aggregating chunk results"
    )
    chunk_results: List[TaskOutput] = Field(
        default_factory=list,
        description="Results from processing each chunk"
    )
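
    # Pydantic runs this at construction time, so a bad path fails fast,
    # before any agent work begins.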
    @field_validator("file_path")
    def validate_file_path(cls, v):
        path = Path(v)
        if not path.exists():
            raise ValueError(f"File not found: {path}")
        if not path.is_file():
            raise ValueError(f"Path is not a file: {path}")
        return path

    def _read_file(self) -> str:
        """Read the content of the file."""
        with open(self.file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def _chunk_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks."""
        if self.chunk_overlap >= self.chunk_size:
            # Guard against a zero or negative step, which would otherwise
            # surface as a cryptic range() error or an empty chunk list.
            raise ValueError("chunk_overlap must be smaller than chunk_size")
        if not text:
            return []

        chunks = []
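        # Step by (chunk_size - chunk_overlap) so consecutive chunks share
        # chunk_overlap characters of context (e.g. size=10, overlap=2 gives
        # chunk starts at 0, 8, 16, ...).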
        for i in range(0, len(text), self.chunk_size - self.chunk_overlap):
            chunk = text[i:i + self.chunk_size]
            if chunk.strip():
                chunks.append(chunk)
        return chunks
    def _create_chunk_task(self, chunk: str, chunk_index: int, total_chunks: int) -> str:
        """Create a task description for processing a single chunk."""
        chunk_context = f"Processing chunk {chunk_index + 1} of {total_chunks}"
        if self.chunk_results:
            chunk_context += "\n\nPrevious chunk insights from memory will be available."
        return f"{chunk_context}\n\n{self.description}\n\nChunk content:\n{chunk}"

    def _execute_core(
        self,
        agent: Optional[BaseAgent],
        context: Optional[str],
        tools: Optional[List[Any]],
    ) -> TaskOutput:
        """Execute chunk-based processing."""
        agent = agent or self.agent
        if not agent:
            raise Exception(f"The chunk-based task '{self.description}' has no agent assigned")

        self.start_time = datetime.datetime.now()
        file_content = self._read_file()
        chunks = self._chunk_text(file_content)
        if not chunks:
            raise ValueError("No valid chunks found in the file")

        self.chunk_results = []
        for i, chunk in enumerate(chunks):
            chunk_description = self._create_chunk_task(chunk, i, len(chunks))
            chunk_task = Task(
                description=chunk_description,
                expected_output=self.expected_output,
                agent=agent,
                tools=tools or self.tools or []
            )
            chunk_result = chunk_task._execute_core(agent, context, tools)
            self.chunk_results.append(chunk_result)
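            # Persist this chunk's analysis to the crew's short-term memory
            # (when one is configured) so subsequent chunks can build on it.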
            if hasattr(agent, 'crew') and agent.crew and hasattr(agent.crew, '_short_term_memory'):
                try:
                    agent.crew._short_term_memory.save(
                        value=f"Chunk {i+1} analysis: {chunk_result.raw}",
                        metadata={
                            "chunk_index": i,
                            "total_chunks": len(chunks),
                            "task_description": self.description
                        },
                        agent=agent.role
                    )
                except Exception as e:
                    print(f"Failed to save chunk result to memory: {e}")

        return self._aggregate_results(agent, context, tools)

    def _aggregate_results(
        self,
        agent: BaseAgent,
        context: Optional[str],
        tools: Optional[List[Any]]
    ) -> TaskOutput:
        """Aggregate results from all chunks."""
        if not self.chunk_results:
            raise ValueError("No chunk results to aggregate")

        chunk_summaries = []
        for i, result in enumerate(self.chunk_results):
            chunk_summaries.append(f"Chunk {i+1} result: {result.raw}")
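        # chr(10) is "\n"; it is spelled this way because f-string expressions
        # could not contain backslashes before Python 3.12.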
        aggregation_description = self.aggregation_prompt or f"""
        Analyze and synthesize the following chunk analysis results into a comprehensive summary.
        Original task: {self.description}
        Expected output: {self.expected_output}
        Chunk results:
        {chr(10).join(chunk_summaries)}
        Provide a comprehensive analysis that synthesizes insights from all chunks.
        """

        aggregation_task = Task(
            description=aggregation_description,
            expected_output=self.expected_output,
            agent=agent,
            tools=tools or self.tools or []
        )
        final_result = aggregation_task._execute_core(agent, context, tools)

        self.output = final_result
        self.end_time = datetime.datetime.now()
        return final_result

    def get_chunk_results(self) -> List[TaskOutput]:
        """Get results from individual chunk processing."""
        return self.chunk_results

View File (unit tests for ChunkBasedTask)

@@ -0,0 +1,210 @@
import pytest
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch

from crewai.tasks.chunk_based_task import ChunkBasedTask
from crewai.tasks.task_output import TaskOutput


class TestChunkBasedTask:
    def test_chunk_based_task_creation(self):
        """Test creating a ChunkBasedTask with valid parameters."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("Test content for chunking")
            temp_path = f.name
        try:
            task = ChunkBasedTask(
                description="Analyze the content",
                expected_output="Analysis summary",
                file_path=temp_path,
                chunk_size=100,
                chunk_overlap=20
            )
            assert task.file_path == Path(temp_path)
            assert task.chunk_size == 100
            assert task.chunk_overlap == 20
        finally:
            Path(temp_path).unlink()

    def test_file_path_validation(self):
        """Test file path validation."""
        with pytest.raises(ValueError, match="File not found"):
            ChunkBasedTask(
                description="Test",
                expected_output="Test output",
                file_path="/nonexistent/file.txt"
            )

    def test_chunk_text_method(self):
        """Test text chunking functionality."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("dummy content")
            temp_path = f.name
        try:
            task = ChunkBasedTask(
                description="Test",
                expected_output="Test output",
                file_path=temp_path,
                chunk_size=10,
                chunk_overlap=2
            )
            text = "This is a test text that should be chunked properly"
            chunks = task._chunk_text(text)
            assert len(chunks) > 1
            assert all(len(chunk) <= 10 for chunk in chunks)
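            # With chunk_size=10 and chunk_overlap=2, each chunk begins with
            # the last two characters of the previous one.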
            assert chunks[1].startswith(chunks[0][-2:])
        finally:
            Path(temp_path).unlink()

    def test_empty_file_handling(self):
        """Test handling of empty files."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("")
            temp_path = f.name
        try:
            task = ChunkBasedTask(
                description="Test",
                expected_output="Test output",
                file_path=temp_path
            )
            chunks = task._chunk_text("")
            assert chunks == []
        finally:
            Path(temp_path).unlink()

    @patch('crewai.task.Task._execute_core')
    def test_chunk_processing_execution(self, mock_execute):
        """Test the sequential chunk processing execution."""
        test_content = "A" * 100 + "B" * 100 + "C" * 100
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write(test_content)
            temp_path = f.name
        try:
            mock_result = TaskOutput(
                description="Test",
                expected_output="Test output",
                raw="Chunk analysis result",
                agent="test_agent"
            )
            mock_execute.return_value = mock_result

            task = ChunkBasedTask(
                description="Analyze content",
                expected_output="Analysis summary",
                file_path=temp_path,
                chunk_size=80,
                chunk_overlap=10
            )
            mock_agent = Mock()
            mock_agent.role = "test_agent"
            mock_agent.crew = None

            task._execute_core(mock_agent, None, None)
            assert len(task.chunk_results) > 1
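            # The patched Task._execute_core is hit once per chunk plus once
            # more for the aggregation task, hence strictly greater than.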
            assert mock_execute.call_count > len(task.chunk_results)
        finally:
            Path(temp_path).unlink()

    def test_memory_integration(self):
        """Test integration with agent memory system."""
        test_content = "Test content for memory integration"
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write(test_content)
            temp_path = f.name
        try:
            task = ChunkBasedTask(
                description="Test memory integration",
                expected_output="Memory test output",
                file_path=temp_path
            )
            mock_agent = Mock()
            mock_agent.role = "test_agent"
            mock_crew = Mock()
            mock_memory = Mock()
            mock_crew._short_term_memory = mock_memory
            mock_agent.crew = mock_crew
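            # Patch aggregation and the base Task execution so no LLM is
            # invoked; only the short-term-memory side effect is exercised.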
            with patch.object(task, '_aggregate_results') as mock_aggregate:
                mock_aggregate.return_value = TaskOutput(
                    description="Test",
                    expected_output="Test output",
                    raw="Final result",
                    agent="test_agent"
                )
                with patch('crewai.task.Task._execute_core') as mock_execute:
                    mock_execute.return_value = TaskOutput(
                        description="Test",
                        expected_output="Test output",
                        raw="Chunk result",
                        agent="test_agent"
                    )
                    task._execute_core(mock_agent, None, None)
                    mock_memory.save.assert_called()
        finally:
            Path(temp_path).unlink()

    def test_get_chunk_results(self):
        """Test accessing individual chunk results."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("test content")
            temp_path = f.name
        try:
            task = ChunkBasedTask(
                description="Test",
                expected_output="Test output",
                file_path=temp_path
            )
            mock_result = TaskOutput(
                description="Test",
                expected_output="Test output",
                raw="Test result",
                agent="test_agent"
            )
            task.chunk_results = [mock_result]
            results = task.get_chunk_results()
            assert len(results) == 1
            assert results[0] == mock_result
        finally:
            Path(temp_path).unlink()

    def test_custom_aggregation_prompt(self):
        """Test using custom aggregation prompt."""
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write("test content")
            temp_path = f.name
        try:
            custom_prompt = "Custom aggregation instructions"
            task = ChunkBasedTask(
                description="Test",
                expected_output="Test output",
                file_path=temp_path,
                aggregation_prompt=custom_prompt
            )
            assert task.aggregation_prompt == custom_prompt
        finally:
            Path(temp_path).unlink()

View File (integration tests for ChunkBasedTask)

@@ -0,0 +1,91 @@
import tempfile
from pathlib import Path

from crewai.tasks.chunk_based_task import ChunkBasedTask
from crewai.agent import Agent
from crewai.crew import Crew


class TestChunkBasedTaskIntegration:
    def test_end_to_end_chunk_processing(self):
        """Test complete chunk-based processing workflow."""
        test_content = """
        This is the first section of a large document that needs to be analyzed.
        It contains important information about the topic at hand.
        This is the second section that builds upon the first section.
        It provides additional context and details that are relevant.
        This is the third section that concludes the document.
        It summarizes the key points and provides final insights.
        """
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write(test_content)
            temp_path = f.name
        try:
            agent = Agent(
                role="Document Analyzer",
                goal="Analyze document content thoroughly",
                backstory="Expert at analyzing and summarizing documents"
            )
            task = ChunkBasedTask(
                description="Analyze this document and identify key themes",
                expected_output="A comprehensive analysis of the document's main themes",
                file_path=temp_path,
                chunk_size=200,
                chunk_overlap=50,
                agent=agent
            )
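            # The crew is assembled but kickoff() is intentionally not called,
            # so the test validates wiring without invoking an LLM.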
            Crew(
                agents=[agent],
                tasks=[task],
                memory=True
            )
            assert task.file_path == Path(temp_path)
            assert task.chunk_size == 200
            assert task.chunk_overlap == 50
        finally:
            Path(temp_path).unlink()

    def test_chunk_based_task_with_crew_structure(self):
        """Test that ChunkBasedTask integrates properly with Crew structure."""
        test_content = "Sample content for testing crew integration with chunk-based tasks."
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
            f.write(test_content)
            temp_path = f.name
        try:
            analyzer_agent = Agent(
                role="Content Analyzer",
                goal="Analyze content effectively",
                backstory="Experienced content analyst"
            )
            chunk_task = ChunkBasedTask(
                description="Analyze the content for key insights",
                expected_output="Summary of key insights found",
                file_path=temp_path,
                chunk_size=50,
                chunk_overlap=10,
                agent=analyzer_agent
            )
            crew = Crew(
                agents=[analyzer_agent],
                tasks=[chunk_task]
            )
            assert len(crew.tasks) == 1
            assert isinstance(crew.tasks[0], ChunkBasedTask)
            assert crew.tasks[0].file_path == Path(temp_path)
        finally:
            Path(temp_path).unlink()