This commit is contained in:
lorenzejay
2026-02-09 12:06:49 -08:00
parent 2e9da6ca95
commit 0524d0454d
2 changed files with 385 additions and 30 deletions

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
import concurrent.futures
import json
import re
from typing import TYPE_CHECKING, Any, Final, Literal, TypedDict
@@ -678,17 +679,22 @@ def _format_messages_for_summary(messages: list[LLMMessage]) -> str:
tool_names = []
for tc in tool_calls:
func = tc.get("function", {})
name = func.get("name", "unknown") if isinstance(func, dict) else "unknown"
name = (
func.get("name", "unknown")
if isinstance(func, dict)
else "unknown"
)
tool_names.append(name)
content = f"[Called tools: {', '.join(tool_names)}]"
else:
content = ""
elif isinstance(content, list):
# Multimodal content blocks — extract text parts
text_parts = []
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text_parts.append(block.get("text", ""))
text_parts = [
block.get("text", "")
for block in content
if isinstance(block, dict) and block.get("type") == "text"
]
content = " ".join(text_parts) if text_parts else "[multimodal content]"
if role == "assistant":
@@ -771,6 +777,44 @@ def _extract_summary_tags(text: str) -> str:
return text.strip()
async def _asummarize_chunks(
chunks: list[list[LLMMessage]],
llm: LLM | BaseLLM,
callbacks: list[TokenCalcHandler],
i18n: I18N,
) -> list[SummaryContent]:
"""Summarize multiple message chunks concurrently using asyncio.
Args:
chunks: List of message chunks to summarize.
llm: LLM instance (must support ``acall``).
callbacks: List of callbacks for the LLM.
i18n: I18N instance for prompt templates.
Returns:
Ordered list of summary contents, one per chunk.
"""
async def _summarize_one(chunk: list[LLMMessage]) -> SummaryContent:
conversation_text = _format_messages_for_summary(chunk)
summarization_messages = [
format_message_for_llm(
i18n.slice("summarizer_system_message"), role="system"
),
format_message_for_llm(
i18n.slice("summarize_instruction").format(
conversation=conversation_text
),
),
]
summary = await llm.acall(summarization_messages, callbacks=callbacks)
extracted = _extract_summary_tags(str(summary))
return {"content": extracted}
results = await asyncio.gather(*[_summarize_one(chunk) for chunk in chunks])
return list(results)
def summarize_messages(
messages: list[LLMMessage],
llm: LLM | BaseLLM,
@@ -813,42 +857,52 @@ def summarize_messages(
chunks = _split_messages_into_chunks(non_system_messages, max_tokens)
# 4. Summarize each chunk with role-labeled formatting
summarized_contents: list[SummaryContent] = []
total_chunks = len(chunks)
for idx, chunk in enumerate(chunks, 1):
if total_chunks <= 1:
# Single chunk — no benefit from async overhead
summarized_contents: list[SummaryContent] = []
for idx, chunk in enumerate(chunks, 1):
if verbose:
Printer().print(
content=f"Summarizing {idx}/{total_chunks}...",
color="yellow",
)
conversation_text = _format_messages_for_summary(chunk)
summarization_messages = [
format_message_for_llm(
i18n.slice("summarizer_system_message"), role="system"
),
format_message_for_llm(
i18n.slice("summarize_instruction").format(
conversation=conversation_text
),
),
]
summary = llm.call(summarization_messages, callbacks=callbacks)
extracted = _extract_summary_tags(str(summary))
summarized_contents.append({"content": extracted})
else:
# Multiple chunks — summarize in parallel via asyncio
if verbose:
Printer().print(
content=f"Summarizing {idx}/{total_chunks}...",
content=f"Summarizing {total_chunks} chunks in parallel...",
color="yellow",
)
conversation_text = _format_messages_for_summary(chunk)
summarization_messages = [
format_message_for_llm(
i18n.slice("summarizer_system_message"), role="system"
),
format_message_for_llm(
i18n.slice("summarize_instruction").format(
conversation=conversation_text
),
),
]
summary = llm.call(
summarization_messages,
callbacks=callbacks,
coro = _asummarize_chunks(
chunks=chunks, llm=llm, callbacks=callbacks, i18n=i18n
)
# Extract content from <summary> tags with graceful fallback
extracted = _extract_summary_tags(str(summary))
summarized_contents.append({"content": extracted})
if is_inside_event_loop():
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
summarized_contents = pool.submit(asyncio.run, coro).result()
else:
summarized_contents = asyncio.run(coro)
merged_summary = "\n\n".join(content["content"] for content in summarized_contents)
# 6. Reconstruct messages: [system messages...] + [summary user message]
messages.clear()
for sys_msg in system_messages:
messages.append(sys_msg)
messages.extend(system_messages)
summary_message = format_message_for_llm(
i18n.slice("summary").format(merged_summary=merged_summary)

View File

@@ -2,13 +2,16 @@
from __future__ import annotations
import asyncio
from typing import Any
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from pydantic import BaseModel, Field
from crewai.tools.base_tool import BaseTool
from crewai.utilities.agent_utils import (
_asummarize_chunks,
_estimate_token_count,
_extract_summary_tags,
_format_messages_for_summary,
@@ -621,3 +624,301 @@ class TestEstimateTokenCount:
estimated = _estimate_token_count(text)
assert estimated > 0
assert estimated == len(text) // 4
class TestParallelSummarization:
"""Tests for parallel chunk summarization via asyncio."""
def _make_messages_for_n_chunks(self, n: int) -> list[dict[str, Any]]:
"""Build a message list that will produce exactly *n* chunks.
Each message has 400 chars (~100 tokens). With max_tokens=100 returned
by the mock LLM, each message lands in its own chunk.
"""
msgs: list[dict[str, Any]] = []
for i in range(n):
msgs.append({"role": "user", "content": f"msg-{i} " + "x" * 400})
return msgs
def test_multiple_chunks_use_acall(self) -> None:
"""When there are multiple chunks, summarize_messages should use
llm.acall (parallel) instead of llm.call (sequential)."""
messages = self._make_messages_for_n_chunks(3)
mock_llm = MagicMock()
mock_llm.get_context_window_size.return_value = 100 # force multiple chunks
mock_llm.acall = AsyncMock(
side_effect=[
"<summary>Summary chunk 1</summary>",
"<summary>Summary chunk 2</summary>",
"<summary>Summary chunk 3</summary>",
]
)
summarize_messages(
messages=messages,
llm=mock_llm,
callbacks=[],
i18n=_make_mock_i18n(),
)
# acall should have been awaited once per chunk
assert mock_llm.acall.await_count == 3
# sync call should NOT have been used for chunk summarization
mock_llm.call.assert_not_called()
def test_single_chunk_uses_sync_call(self) -> None:
"""When there is only one chunk, summarize_messages should use
the sync llm.call path (no async overhead)."""
messages: list[dict[str, Any]] = [
{"role": "user", "content": "Short message"},
{"role": "assistant", "content": "Short reply"},
]
mock_llm = MagicMock()
mock_llm.get_context_window_size.return_value = 100_000
mock_llm.call.return_value = "<summary>Short summary</summary>"
summarize_messages(
messages=messages,
llm=mock_llm,
callbacks=[],
i18n=_make_mock_i18n(),
)
mock_llm.call.assert_called_once()
def test_parallel_results_preserve_order(self) -> None:
"""Summaries must appear in the same order as the original chunks,
regardless of which async call finishes first."""
messages = self._make_messages_for_n_chunks(3)
mock_llm = MagicMock()
mock_llm.get_context_window_size.return_value = 100
# Simulate varying latencies — chunk 2 finishes before chunk 0
async def _delayed_acall(msgs: Any, **kwargs: Any) -> str:
user_content = msgs[1]["content"]
if "msg-0" in user_content:
await asyncio.sleep(0.05)
return "<summary>Summary-A</summary>"
elif "msg-1" in user_content:
return "<summary>Summary-B</summary>" # fastest
else:
await asyncio.sleep(0.02)
return "<summary>Summary-C</summary>"
mock_llm.acall = _delayed_acall
summarize_messages(
messages=messages,
llm=mock_llm,
callbacks=[],
i18n=_make_mock_i18n(),
)
# The final summary message should have A, B, C in order
summary_content = messages[-1]["content"]
pos_a = summary_content.index("Summary-A")
pos_b = summary_content.index("Summary-B")
pos_c = summary_content.index("Summary-C")
assert pos_a < pos_b < pos_c
def test_asummarize_chunks_returns_ordered_results(self) -> None:
"""Direct test of the async helper _asummarize_chunks."""
chunk_a: list[dict[str, Any]] = [{"role": "user", "content": "Chunk A"}]
chunk_b: list[dict[str, Any]] = [{"role": "user", "content": "Chunk B"}]
mock_llm = MagicMock()
mock_llm.acall = AsyncMock(
side_effect=[
"<summary>Result A</summary>",
"<summary>Result B</summary>",
]
)
results = asyncio.run(
_asummarize_chunks(
chunks=[chunk_a, chunk_b],
llm=mock_llm,
callbacks=[],
i18n=_make_mock_i18n(),
)
)
assert len(results) == 2
assert results[0]["content"] == "Result A"
assert results[1]["content"] == "Result B"
@patch("crewai.utilities.agent_utils.is_inside_event_loop", return_value=True)
def test_works_inside_existing_event_loop(self, _mock_loop: Any) -> None:
"""When called from inside a running event loop (e.g. a Flow),
the ThreadPoolExecutor fallback should still work."""
messages = self._make_messages_for_n_chunks(2)
mock_llm = MagicMock()
mock_llm.get_context_window_size.return_value = 100
mock_llm.acall = AsyncMock(
side_effect=[
"<summary>Flow summary 1</summary>",
"<summary>Flow summary 2</summary>",
]
)
summarize_messages(
messages=messages,
llm=mock_llm,
callbacks=[],
i18n=_make_mock_i18n(),
)
assert mock_llm.acall.await_count == 2
# Verify the merged summary made it into messages
assert "Flow summary 1" in messages[-1]["content"]
assert "Flow summary 2" in messages[-1]["content"]
def _build_long_conversation() -> list[dict[str, Any]]:
"""Build a multi-turn conversation that produces multiple chunks at max_tokens=200.
Each non-system message is ~100-140 estimated tokens (400-560 chars),
so a max_tokens of 200 yields roughly 3 chunks from 6 messages.
"""
return [
{
"role": "system",
"content": "You are a helpful research assistant.",
},
{
"role": "user",
"content": (
"Tell me about the history of the Python programming language. "
"Who created it, when was it first released, and what were the "
"main design goals? Please provide a detailed overview covering "
"the major milestones from its inception through Python 3."
),
},
{
"role": "assistant",
"content": (
"Python was created by Guido van Rossum and first released in 1991. "
"The main design goals were code readability and simplicity. Key milestones: "
"Python 1.0 (1994) introduced functional programming tools like lambda and map. "
"Python 2.0 (2000) added list comprehensions and garbage collection. "
"Python 3.0 (2008) was a major backward-incompatible release that fixed "
"fundamental design flaws. Python 2 reached end-of-life in January 2020."
),
},
{
"role": "user",
"content": (
"What about the async/await features? When were they introduced "
"and how do they compare to similar features in JavaScript and C#? "
"Also explain the Global Interpreter Lock and its implications."
),
},
{
"role": "assistant",
"content": (
"Async/await was introduced in Python 3.5 (PEP 492, 2015). "
"Unlike JavaScript which is single-threaded by design, Python's asyncio "
"is an opt-in framework. C# introduced async/await in 2012 (C# 5.0) and "
"was a major inspiration for Python's implementation. "
"The GIL (Global Interpreter Lock) is a mutex that protects access to "
"Python objects, preventing multiple threads from executing Python bytecodes "
"simultaneously. This means CPU-bound multithreaded programs don't benefit "
"from multiple cores. PEP 703 proposes making the GIL optional in CPython."
),
},
{
"role": "user",
"content": (
"Explain the Python package ecosystem. How does pip work, what is PyPI, "
"and what are virtual environments? Compare pip with conda and uv."
),
},
{
"role": "assistant",
"content": (
"PyPI (Python Package Index) is the official repository hosting 400k+ packages. "
"pip is the standard package installer that downloads from PyPI. "
"Virtual environments (venv) create isolated Python installations to avoid "
"dependency conflicts between projects. conda is a cross-language package manager "
"popular in data science that can manage non-Python dependencies. "
"uv is a new Rust-based tool that is 10-100x faster than pip and aims to replace "
"pip, pip-tools, and virtualenv with a single unified tool."
),
},
]
class TestParallelSummarizationVCR:
"""VCR-backed integration tests for parallel summarization.
These tests use a real LLM but patch get_context_window_size to force
multiple chunks, exercising the asyncio.gather + acall parallel path.
To record cassettes:
PYTEST_VCR_RECORD_MODE=all uv run pytest lib/crewai/tests/utilities/test_agent_utils.py::TestParallelSummarizationVCR -v
"""
@pytest.mark.vcr()
def test_parallel_summarize_openai(self) -> None:
"""Test that parallel summarization with gpt-4o-mini produces a valid summary."""
from crewai.llm import LLM
from crewai.utilities.i18n import I18N
llm = LLM(model="gpt-4o-mini", temperature=0)
i18n = I18N()
messages = _build_long_conversation()
original_system = messages[0]["content"]
# Patch get_context_window_size to return 200 — forces multiple chunks
with patch.object(type(llm), "get_context_window_size", return_value=200):
# Verify we actually get multiple chunks with this window size
non_system = [m for m in messages if m.get("role") != "system"]
chunks = _split_messages_into_chunks(non_system, max_tokens=200)
assert len(chunks) > 1, f"Expected multiple chunks, got {len(chunks)}"
summarize_messages(
messages=messages,
llm=llm,
callbacks=[],
i18n=i18n,
)
# System message preserved
assert messages[0]["role"] == "system"
assert messages[0]["content"] == original_system
# Summary produced as a user message
summary_msg = messages[-1]
assert summary_msg["role"] == "user"
assert len(summary_msg["content"]) > 0
@pytest.mark.vcr()
def test_parallel_summarize_preserves_files(self) -> None:
"""Test that file references survive parallel summarization."""
from crewai.llm import LLM
from crewai.utilities.i18n import I18N
llm = LLM(model="gpt-4o-mini", temperature=0)
i18n = I18N()
messages = _build_long_conversation()
mock_file = MagicMock()
messages[1]["files"] = {"report.pdf": mock_file}
with patch.object(type(llm), "get_context_window_size", return_value=200):
summarize_messages(
messages=messages,
llm=llm,
callbacks=[],
i18n=i18n,
)
summary_msg = messages[-1]
assert summary_msg["role"] == "user"
assert "files" in summary_msg
assert "report.pdf" in summary_msg["files"]