diff --git a/lib/crewai/src/crewai/utilities/agent_utils.py b/lib/crewai/src/crewai/utilities/agent_utils.py
index 48af1cf8b..22b498541 100644
--- a/lib/crewai/src/crewai/utilities/agent_utils.py
+++ b/lib/crewai/src/crewai/utilities/agent_utils.py
@@ -2,6 +2,7 @@ from __future__ import annotations
 
 import asyncio
 from collections.abc import Callable, Sequence
+import concurrent.futures
 import json
 import re
 from typing import TYPE_CHECKING, Any, Final, Literal, TypedDict
@@ -678,17 +679,22 @@ def _format_messages_for_summary(messages: list[LLMMessage]) -> str:
             tool_names = []
             for tc in tool_calls:
                 func = tc.get("function", {})
-                name = func.get("name", "unknown") if isinstance(func, dict) else "unknown"
+                name = (
+                    func.get("name", "unknown")
+                    if isinstance(func, dict)
+                    else "unknown"
+                )
                 tool_names.append(name)
             content = f"[Called tools: {', '.join(tool_names)}]"
         else:
             content = ""
     elif isinstance(content, list):
         # Multimodal content blocks — extract text parts
-        text_parts = []
-        for block in content:
-            if isinstance(block, dict) and block.get("type") == "text":
-                text_parts.append(block.get("text", ""))
+        text_parts = [
+            block.get("text", "")
+            for block in content
+            if isinstance(block, dict) and block.get("type") == "text"
+        ]
         content = " ".join(text_parts) if text_parts else "[multimodal content]"
 
     if role == "assistant":
@@ -771,6 +777,44 @@ def _extract_summary_tags(text: str) -> str:
     return text.strip()
 
 
+async def _asummarize_chunks(
+    chunks: list[list[LLMMessage]],
+    llm: LLM | BaseLLM,
+    callbacks: list[TokenCalcHandler],
+    i18n: I18N,
+) -> list[SummaryContent]:
+    """Summarize multiple message chunks concurrently using asyncio.
+
+    Args:
+        chunks: List of message chunks to summarize.
+        llm: LLM instance (must support ``acall``).
+        callbacks: List of callbacks for the LLM.
+        i18n: I18N instance for prompt templates.
+
+    Returns:
+        Ordered list of summary contents, one per chunk.
+    """
+
+    async def _summarize_one(chunk: list[LLMMessage]) -> SummaryContent:
+        conversation_text = _format_messages_for_summary(chunk)
+        summarization_messages = [
+            format_message_for_llm(
+                i18n.slice("summarizer_system_message"), role="system"
+            ),
+            format_message_for_llm(
+                i18n.slice("summarize_instruction").format(
+                    conversation=conversation_text
+                ),
+            ),
+        ]
+        summary = await llm.acall(summarization_messages, callbacks=callbacks)
+        extracted = _extract_summary_tags(str(summary))
+        return {"content": extracted}
+
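+    # asyncio.gather preserves argument order, so the returned summaries
+    # line up one-to-one with the input chunks regardless of completion order.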
+    results = await asyncio.gather(*[_summarize_one(chunk) for chunk in chunks])
+    return list(results)
+
+
 def summarize_messages(
     messages: list[LLMMessage],
     llm: LLM | BaseLLM,
@@ -813,42 +857,52 @@ def summarize_messages(
     chunks = _split_messages_into_chunks(non_system_messages, max_tokens)
 
     # 4. Summarize each chunk with role-labeled formatting
-    summarized_contents: list[SummaryContent] = []
     total_chunks = len(chunks)
-    for idx, chunk in enumerate(chunks, 1):
+    if total_chunks <= 1:
+        # Single chunk — no benefit from async overhead
+        summarized_contents: list[SummaryContent] = []
+        for idx, chunk in enumerate(chunks, 1):
+            if verbose:
+                Printer().print(
+                    content=f"Summarizing {idx}/{total_chunks}...",
+                    color="yellow",
+                )
+            conversation_text = _format_messages_for_summary(chunk)
+            summarization_messages = [
+                format_message_for_llm(
+                    i18n.slice("summarizer_system_message"), role="system"
+                ),
+                format_message_for_llm(
+                    i18n.slice("summarize_instruction").format(
+                        conversation=conversation_text
+                    ),
+                ),
+            ]
+            summary = llm.call(summarization_messages, callbacks=callbacks)
+            extracted = _extract_summary_tags(str(summary))
+            summarized_contents.append({"content": extracted})
+    else:
+        # Multiple chunks — summarize in parallel via asyncio
         if verbose:
             Printer().print(
-                content=f"Summarizing {idx}/{total_chunks}...",
+                content=f"Summarizing {total_chunks} chunks in parallel...",
                 color="yellow",
             )
-
-        conversation_text = _format_messages_for_summary(chunk)
-
-        summarization_messages = [
-            format_message_for_llm(
-                i18n.slice("summarizer_system_message"), role="system"
-            ),
-            format_message_for_llm(
-                i18n.slice("summarize_instruction").format(
-                    conversation=conversation_text
-                ),
-            ),
-        ]
-        summary = llm.call(
-            summarization_messages,
-            callbacks=callbacks,
+        coro = _asummarize_chunks(
+            chunks=chunks, llm=llm, callbacks=callbacks, i18n=i18n
         )
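+        # asyncio.run() cannot be called from a thread whose event loop is
+        # already running, so in that case hand the coroutine to a fresh
+        # loop on a single worker thread.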
+ """ + msgs: list[dict[str, Any]] = [] + for i in range(n): + msgs.append({"role": "user", "content": f"msg-{i} " + "x" * 400}) + return msgs + + def test_multiple_chunks_use_acall(self) -> None: + """When there are multiple chunks, summarize_messages should use + llm.acall (parallel) instead of llm.call (sequential).""" + messages = self._make_messages_for_n_chunks(3) + + mock_llm = MagicMock() + mock_llm.get_context_window_size.return_value = 100 # force multiple chunks + mock_llm.acall = AsyncMock( + side_effect=[ + "Summary chunk 1", + "Summary chunk 2", + "Summary chunk 3", + ] + ) + + summarize_messages( + messages=messages, + llm=mock_llm, + callbacks=[], + i18n=_make_mock_i18n(), + ) + + # acall should have been awaited once per chunk + assert mock_llm.acall.await_count == 3 + # sync call should NOT have been used for chunk summarization + mock_llm.call.assert_not_called() + + def test_single_chunk_uses_sync_call(self) -> None: + """When there is only one chunk, summarize_messages should use + the sync llm.call path (no async overhead).""" + messages: list[dict[str, Any]] = [ + {"role": "user", "content": "Short message"}, + {"role": "assistant", "content": "Short reply"}, + ] + + mock_llm = MagicMock() + mock_llm.get_context_window_size.return_value = 100_000 + mock_llm.call.return_value = "Short summary" + + summarize_messages( + messages=messages, + llm=mock_llm, + callbacks=[], + i18n=_make_mock_i18n(), + ) + + mock_llm.call.assert_called_once() + + def test_parallel_results_preserve_order(self) -> None: + """Summaries must appear in the same order as the original chunks, + regardless of which async call finishes first.""" + messages = self._make_messages_for_n_chunks(3) + + mock_llm = MagicMock() + mock_llm.get_context_window_size.return_value = 100 + + # Simulate varying latencies — chunk 2 finishes before chunk 0 + async def _delayed_acall(msgs: Any, **kwargs: Any) -> str: + user_content = msgs[1]["content"] + if "msg-0" in user_content: + await asyncio.sleep(0.05) + return "Summary-A" + elif "msg-1" in user_content: + return "Summary-B" # fastest + else: + await asyncio.sleep(0.02) + return "Summary-C" + + mock_llm.acall = _delayed_acall + + summarize_messages( + messages=messages, + llm=mock_llm, + callbacks=[], + i18n=_make_mock_i18n(), + ) + + # The final summary message should have A, B, C in order + summary_content = messages[-1]["content"] + pos_a = summary_content.index("Summary-A") + pos_b = summary_content.index("Summary-B") + pos_c = summary_content.index("Summary-C") + assert pos_a < pos_b < pos_c + + def test_asummarize_chunks_returns_ordered_results(self) -> None: + """Direct test of the async helper _asummarize_chunks.""" + chunk_a: list[dict[str, Any]] = [{"role": "user", "content": "Chunk A"}] + chunk_b: list[dict[str, Any]] = [{"role": "user", "content": "Chunk B"}] + + mock_llm = MagicMock() + mock_llm.acall = AsyncMock( + side_effect=[ + "Result A", + "Result B", + ] + ) + + results = asyncio.run( + _asummarize_chunks( + chunks=[chunk_a, chunk_b], + llm=mock_llm, + callbacks=[], + i18n=_make_mock_i18n(), + ) + ) + + assert len(results) == 2 + assert results[0]["content"] == "Result A" + assert results[1]["content"] == "Result B" + + @patch("crewai.utilities.agent_utils.is_inside_event_loop", return_value=True) + def test_works_inside_existing_event_loop(self, _mock_loop: Any) -> None: + """When called from inside a running event loop (e.g. 
+        async def _delayed_acall(msgs: Any, **kwargs: Any) -> str:
+            user_content = msgs[1]["content"]
+            if "msg-0" in user_content:
+                await asyncio.sleep(0.05)
+                return "Summary-A"
+            elif "msg-1" in user_content:
+                return "Summary-B"  # fastest
+            else:
+                await asyncio.sleep(0.02)
+                return "Summary-C"
+
+        mock_llm.acall = _delayed_acall
+
+        summarize_messages(
+            messages=messages,
+            llm=mock_llm,
+            callbacks=[],
+            i18n=_make_mock_i18n(),
+        )
+
+        # The final summary message should have A, B, C in order
+        summary_content = messages[-1]["content"]
+        pos_a = summary_content.index("Summary-A")
+        pos_b = summary_content.index("Summary-B")
+        pos_c = summary_content.index("Summary-C")
+        assert pos_a < pos_b < pos_c
+
+    def test_asummarize_chunks_returns_ordered_results(self) -> None:
+        """Direct test of the async helper _asummarize_chunks."""
+        chunk_a: list[dict[str, Any]] = [{"role": "user", "content": "Chunk A"}]
+        chunk_b: list[dict[str, Any]] = [{"role": "user", "content": "Chunk B"}]
+
+        mock_llm = MagicMock()
+        mock_llm.acall = AsyncMock(
+            side_effect=[
+                "Result A",
+                "Result B",
+            ]
+        )
+
+        results = asyncio.run(
+            _asummarize_chunks(
+                chunks=[chunk_a, chunk_b],
+                llm=mock_llm,
+                callbacks=[],
+                i18n=_make_mock_i18n(),
+            )
+        )
+
+        assert len(results) == 2
+        assert results[0]["content"] == "Result A"
+        assert results[1]["content"] == "Result B"
+
+    @patch("crewai.utilities.agent_utils.is_inside_event_loop", return_value=True)
+    def test_works_inside_existing_event_loop(self, _mock_loop: Any) -> None:
+        """When called from inside a running event loop (e.g. a Flow),
+        the ThreadPoolExecutor fallback should still work."""
+        messages = self._make_messages_for_n_chunks(2)
+
+        mock_llm = MagicMock()
+        mock_llm.get_context_window_size.return_value = 100
+        mock_llm.acall = AsyncMock(
+            side_effect=[
+                "Flow summary 1",
+                "Flow summary 2",
+            ]
+        )
+
+        summarize_messages(
+            messages=messages,
+            llm=mock_llm,
+            callbacks=[],
+            i18n=_make_mock_i18n(),
+        )
+
+        assert mock_llm.acall.await_count == 2
+        # Verify the merged summary made it into messages
+        assert "Flow summary 1" in messages[-1]["content"]
+        assert "Flow summary 2" in messages[-1]["content"]
+
+
+def _build_long_conversation() -> list[dict[str, Any]]:
+    """Build a multi-turn conversation that produces multiple chunks at max_tokens=200.
+
+    Each non-system message is ~100-140 estimated tokens (400-560 chars),
+    so a max_tokens of 200 yields roughly 3 chunks from 6 messages.
+    """
+    return [
+        {
+            "role": "system",
+            "content": "You are a helpful research assistant.",
+        },
+        {
+            "role": "user",
+            "content": (
+                "Tell me about the history of the Python programming language. "
+                "Who created it, when was it first released, and what were the "
+                "main design goals? Please provide a detailed overview covering "
+                "the major milestones from its inception through Python 3."
+            ),
+        },
+        {
+            "role": "assistant",
+            "content": (
+                "Python was created by Guido van Rossum and first released in 1991. "
+                "The main design goals were code readability and simplicity. Key milestones: "
+                "Python 1.0 (1994) introduced functional programming tools like lambda and map. "
+                "Python 2.0 (2000) added list comprehensions and garbage collection. "
+                "Python 3.0 (2008) was a major backward-incompatible release that fixed "
+                "fundamental design flaws. Python 2 reached end-of-life in January 2020."
+            ),
+        },
+        {
+            "role": "user",
+            "content": (
+                "What about the async/await features? When were they introduced "
+                "and how do they compare to similar features in JavaScript and C#? "
+                "Also explain the Global Interpreter Lock and its implications."
+            ),
+        },
+        {
+            "role": "assistant",
+            "content": (
+                "Async/await was introduced in Python 3.5 (PEP 492, 2015). "
+                "Unlike JavaScript which is single-threaded by design, Python's asyncio "
+                "is an opt-in framework. C# introduced async/await in 2012 (C# 5.0) and "
+                "was a major inspiration for Python's implementation. "
+                "The GIL (Global Interpreter Lock) is a mutex that protects access to "
+                "Python objects, preventing multiple threads from executing Python bytecodes "
+                "simultaneously. This means CPU-bound multithreaded programs don't benefit "
+                "from multiple cores. PEP 703 proposes making the GIL optional in CPython."
+            ),
+        },
+        {
+            "role": "user",
+            "content": (
+                "Explain the Python package ecosystem. How does pip work, what is PyPI, "
+                "and what are virtual environments? Compare pip with conda and uv."
+            ),
+        },
+        {
+            "role": "assistant",
+            "content": (
+                "PyPI (Python Package Index) is the official repository hosting 400k+ packages. "
+                "pip is the standard package installer that downloads from PyPI. "
+                "Virtual environments (venv) create isolated Python installations to avoid "
+                "dependency conflicts between projects. conda is a cross-language package manager "
+                "popular in data science that can manage non-Python dependencies. "
+                "uv is a new Rust-based tool that is 10-100x faster than pip and aims to replace "
+                "pip, pip-tools, and virtualenv with a single unified tool."
+            ),
+        },
+    ]
+
+
+class TestParallelSummarizationVCR:
+    """VCR-backed integration tests for parallel summarization.
+
+    These tests use a real LLM but patch get_context_window_size to force
+    multiple chunks, exercising the asyncio.gather + acall parallel path.
+
+    To record cassettes:
+        PYTEST_VCR_RECORD_MODE=all uv run pytest lib/crewai/tests/utilities/test_agent_utils.py::TestParallelSummarizationVCR -v
+    """
+
+    @pytest.mark.vcr()
+    def test_parallel_summarize_openai(self) -> None:
+        """Test that parallel summarization with gpt-4o-mini produces a valid summary."""
+        from crewai.llm import LLM
+        from crewai.utilities.i18n import I18N
+
+        llm = LLM(model="gpt-4o-mini", temperature=0)
+        i18n = I18N()
+        messages = _build_long_conversation()
+
+        original_system = messages[0]["content"]
+
+        # Patch get_context_window_size to return 200 — forces multiple chunks
+        with patch.object(type(llm), "get_context_window_size", return_value=200):
+            # Verify we actually get multiple chunks with this window size
+            non_system = [m for m in messages if m.get("role") != "system"]
+            chunks = _split_messages_into_chunks(non_system, max_tokens=200)
+            assert len(chunks) > 1, f"Expected multiple chunks, got {len(chunks)}"
+
+            summarize_messages(
+                messages=messages,
+                llm=llm,
+                callbacks=[],
+                i18n=i18n,
+            )
+
+        # System message preserved
+        assert messages[0]["role"] == "system"
+        assert messages[0]["content"] == original_system
+
+        # Summary produced as a user message
+        summary_msg = messages[-1]
+        assert summary_msg["role"] == "user"
+        assert len(summary_msg["content"]) > 0
+
+    @pytest.mark.vcr()
+    def test_parallel_summarize_preserves_files(self) -> None:
+        """Test that file references survive parallel summarization."""
+        from crewai.llm import LLM
+        from crewai.utilities.i18n import I18N
+
+        llm = LLM(model="gpt-4o-mini", temperature=0)
+        i18n = I18N()
+        messages = _build_long_conversation()
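+        # Attach a file reference to a user message; summarization is expected
+        # to carry it onto the final summary message.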
+        mock_file = MagicMock()
+        messages[1]["files"] = {"report.pdf": mock_file}
+
+        with patch.object(type(llm), "get_context_window_size", return_value=200):
+            summarize_messages(
+                messages=messages,
+                llm=llm,
+                callbacks=[],
+                i18n=i18n,
+            )
+
+        summary_msg = messages[-1]
+        assert summary_msg["role"] == "user"
+        assert "files" in summary_msg
+        assert "report.pdf" in summary_msg["files"]