feat: enhance agent summarization and streaming behavior

- Introduced pausable auto-scroll in the AgentTUI: streaming output follows the bottom of the chat until the user scrolls up, and resumes once they scroll back near the bottom.
- Updated the proactive summarization logic in ConversationalAgentExecutor to ensure the current user message is preserved, preventing loss of context during summarization.
- Added handling for tool-usage-started events: the executor emits a reset sentinel into the stream so the TUI clears the text accumulated before a tool call.
- Refined prompt instructions in translations to clarify agent behavior and expectations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Joao Moura
2026-05-15 00:13:10 -04:00
parent db604b6f32
commit d563a8c8e2
4 changed files with 178 additions and 10 deletions

View File

@@ -607,6 +607,7 @@ class AgentTUI(App[None]):
self._last_active_agent: str | None = None
self._last_agent_error: str = ""
self._scheduler: Any = None
self._stream_auto_scroll: bool = True
def compose(self) -> ComposeResult:
yield Header()
@@ -1490,6 +1491,7 @@ class AgentTUI(App[None]):
accumulated = ""
stream_start = time.monotonic()
stream_chars = 0
self._stream_auto_scroll = True
def _stream_markup(
text: str, final: bool = False, metadata: str = ""
@@ -1508,6 +1510,8 @@ class AgentTUI(App[None]):
return mk
# Timeout-protected streaming: prevents UI freeze if LLM stalls
_tool_reset = "\x00TOOL_RESET\x00"
_tools_used: list[str] = []
stream = agent.stream(message_text)
first_chunk = True
while True:
@@ -1523,6 +1527,11 @@ class AgentTUI(App[None]):
except Exception:
break
if chunk == _tool_reset:
accumulated = ""
stream_chars = 0
continue
accumulated += chunk
stream_chars += len(chunk)
@@ -1537,14 +1546,14 @@ class AgentTUI(App[None]):
_safe_render(accumulated), classes="agent-bubble"
)
scroll.mount(bubble, before=thinking)
if self._is_near_bottom(scroll):
if self._stream_auto_scroll:
scroll.scroll_end(animate=False)
elif bubble is not None:
try:
bubble.update(_stream_markup(accumulated))
except Exception:
bubble.update(_safe_render(accumulated))
if self._is_near_bottom(scroll):
if self._stream_auto_scroll:
scroll.scroll_end(animate=False)
# Remove cursor, add final metadata
@@ -1559,17 +1568,29 @@ class AgentTUI(App[None]):
meta_parts.append(f"~{response.output_tokens or 0:,} tokens")
if getattr(response, "response_time_ms", 0):
meta_parts.append(f"{response.response_time_ms / 1000:.1f}s")
tools = getattr(response, "tools_used", None)
if tools:
_tools_used = tools
tool_summary = ", ".join(tools)
meta_parts.append(f"tools: {tool_summary}")
metadata = " · ".join(meta_parts)
# Use the clean final response from ainvoke when available —
# accumulated may contain intermediate plan/thinking text
# from LLM iterations before tool calls.
final_content = accumulated
if response and response.content:
final_content = response.content
if bubble is not None:
try:
bubble.update(
_stream_markup(accumulated, final=True, metadata=metadata)
_stream_markup(final_content, final=True, metadata=metadata)
)
except Exception:
bubble.update(_safe_render(accumulated))
bubble.update(_safe_render(final_content))
content = accumulated or (response.content if response else "")
content = final_content
self._append_msg(room, target, content, metadata)
except Exception as e:
@@ -1621,6 +1642,21 @@ class AgentTUI(App[None]):
return True
return scroll.scroll_y >= scroll.max_scroll_y - 80
def on_mouse_scroll_up(self, event: events.MouseScrollUp) -> None:
    """Pause streaming auto-scroll when the user scrolls up.

    The flag is only cleared while ``self._processing`` is truthy; when no
    response is being processed the handler does nothing.
    """
    if not self._processing:
        return
    self._stream_auto_scroll = False
def on_mouse_scroll_down(self, event: events.MouseScrollDown) -> None:
    """Resume streaming auto-scroll once the user is back near the bottom.

    Does nothing unless a response is being processed and auto-scroll is
    currently paused. Any failure while looking up or measuring the chat
    scroll container is ignored, leaving auto-scroll paused.
    """
    # Nothing to resume when idle or when auto-scroll is already active.
    if not self._processing or self._stream_auto_scroll:
        return
    try:
        chat_scroll = self.query_one("#chat-scroll", VerticalScroll)
        if self._is_near_bottom(chat_scroll):
            self._stream_auto_scroll = True
    except Exception:
        # Best-effort: a failed lookup must not break mouse handling.
        pass
def _mount_bubble(self, sender: str, content: str, metadata: str = "") -> None:
scroll = self.query_one("#chat-scroll", VerticalScroll)
near_bottom = self._is_near_bottom(scroll)

View File

@@ -788,6 +788,10 @@ class ConversationalAgentExecutor(BaseModel):
Reuses the existing summarize_messages() from agent_utils which handles
chunking, parallel summarization, and file attachment preservation.
Protects the current turn (from the last user message onward) so
that summarize_messages — which replaces all non-system messages —
never eats the user's actual request or in-progress tool calls.
"""
if not self.agent.settings.respect_context_window:
return
@@ -803,6 +807,28 @@ class ConversationalAgentExecutor(BaseModel):
if est_tokens < int(ctx_size * 0.60):
return
# Find the last user message — everything from there onward is the
# current turn and must survive summarization.
last_user_idx = -1
for i in range(len(llm_messages) - 1, -1, -1):
if llm_messages[i].get("role") == "user":
last_user_idx = i
break
if last_user_idx < 0:
return
# Need history messages before the current user message to summarize
history_non_system = [
m for m in llm_messages[:last_user_idx] if m.get("role") != "system"
]
if len(history_non_system) < 2:
return
# Detach the current turn (user msg + any tool call/result messages)
current_turn = llm_messages[last_user_idx:]
del llm_messages[last_user_idx:]
try:
summarize_messages(
messages=llm_messages,
@@ -814,6 +840,9 @@ class ConversationalAgentExecutor(BaseModel):
except Exception as e:
logger.debug(f"Proactive summarization failed: {e}")
# Re-append the protected current turn
llm_messages.extend(current_turn)
def _emit_event_context_summarized(self) -> None:
try:
from crewai.new_agent.events import NewAgentContextSummarizedEvent
@@ -1489,8 +1518,12 @@ class ConversationalAgentExecutor(BaseModel):
_plan_tokens_before_out = self._turn_output_tokens
plan = await planning.maybe_plan(user_message.content)
if plan:
plan_text = "Follow this execution plan:\n" + "\n".join(
f"{i + 1}. {step}" for i, step in enumerate(plan)
steps = "\n".join(f"{i + 1}. {step}" for i, step in enumerate(plan))
plan_text = (
f"Internal execution plan (do NOT include in your response):\n"
f"{steps}\n\n"
f"Execute these steps using your tools. "
f"Report results, not the plan itself."
)
self.prompt_stack.add("plan", plan_text, source="planning_engine")
# GAP-49: Record sub-action tokens for planning
@@ -1653,6 +1686,17 @@ class ConversationalAgentExecutor(BaseModel):
pass
if is_context_length_exceeded(e):
# Protect the current turn before summarization
_ctx_last_user = -1
for _ci in range(len(llm_messages) - 1, -1, -1):
if llm_messages[_ci].get("role") == "user":
_ctx_last_user = _ci
break
_ctx_tail: list[LLMMessage] = []
if _ctx_last_user >= 0:
_ctx_tail = llm_messages[_ctx_last_user:]
del llm_messages[_ctx_last_user:]
handle_context_length(
respect_context_window=self.agent.settings.respect_context_window,
printer=_NullPrinter(),
@@ -1661,6 +1705,8 @@ class ConversationalAgentExecutor(BaseModel):
callbacks=callbacks,
verbose=self.verbose,
)
if _ctx_tail:
llm_messages.extend(_ctx_tail)
try:
from crewai.new_agent.events import (
NewAgentContextSummarizedEvent,
@@ -2387,6 +2433,9 @@ class ConversationalAgentExecutor(BaseModel):
"""
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_events import LLMStreamChunkEvent
from crewai.new_agent.events import NewAgentToolUsageStartedEvent
_TOOL_RESET = "\x00TOOL_RESET\x00"
chunk_queue: asyncio.Queue[str] = asyncio.Queue()
@@ -2394,7 +2443,11 @@ class ConversationalAgentExecutor(BaseModel):
if event.chunk and not event.tool_call:
chunk_queue.put_nowait(event.chunk)
def _on_tool_started(source: Any, event: NewAgentToolUsageStartedEvent) -> None:
chunk_queue.put_nowait(_TOOL_RESET)
crewai_event_bus.on(LLMStreamChunkEvent)(_on_stream_chunk)
crewai_event_bus.on(NewAgentToolUsageStartedEvent)(_on_tool_started)
llm = self.agent._llm_instance
_prev_stream = getattr(llm, "stream", False) if llm else False
@@ -2440,6 +2493,12 @@ class ConversationalAgentExecutor(BaseModel):
while not invoke_task.done():
try:
chunk = await asyncio.wait_for(chunk_queue.get(), timeout=0.05)
if chunk == _TOOL_RESET:
yield _TOOL_RESET
_streamed_chars = 0
_tag_buf = ""
_suppressing = False
continue
filtered = _filter_chunk(chunk)
if filtered:
_streamed_chars += len(filtered)
@@ -2454,6 +2513,12 @@ class ConversationalAgentExecutor(BaseModel):
while not chunk_queue.empty():
chunk = chunk_queue.get_nowait()
if chunk == _TOOL_RESET:
yield _TOOL_RESET
_streamed_chars = 0
_tag_buf = ""
_suppressing = False
continue
filtered = _filter_chunk(chunk)
if filtered:
_streamed_chars += len(filtered)
@@ -2469,6 +2534,7 @@ class ConversationalAgentExecutor(BaseModel):
finally:
crewai_event_bus.off(LLMStreamChunkEvent, _on_stream_chunk)
crewai_event_bus.off(NewAgentToolUsageStartedEvent, _on_tool_started)
if llm:
llm.stream = _prev_stream
if not invoke_task.done():

View File

@@ -23,8 +23,8 @@
"human_feedback": "You got human feedback on your work, re-evaluate it and give a new Final Answer when ready.\n {human_feedback}",
"getting_input": "This is the agent's final answer: {final_answer}\n\n",
"summarizer_system_message": "You are a precise assistant that creates structured summaries of agent conversations. You preserve critical context needed for seamless task continuation.",
"summarize_instruction": "Analyze the following conversation and create a structured summary that preserves all information needed to continue the task seamlessly.\n\n<conversation>\n{conversation}\n</conversation>\n\nCreate a summary with these sections:\n1. **Task Overview**: What is the agent trying to accomplish?\n2. **Current State**: What has been completed so far? What step is the agent on?\n3. **Important Discoveries**: Key facts, data, tool results, or findings that must not be lost.\n4. **Next Steps**: What should the agent do next based on the conversation?\n5. **Context to Preserve**: Any specific values, names, URLs, code snippets, or details referenced in the conversation.\n\nWrap your entire summary in <summary> tags.\n\n<summary>\n[Your structured summary here]\n</summary>",
"summary": "<summary>\n{merged_summary}\n</summary>\n\nContinue the task from where the conversation left off. The above is a structured summary of prior context.",
"summarize_instruction": "Analyze the following conversation and create a concise summary that preserves critical context.\n\n<conversation>\n{conversation}\n</conversation>\n\nCreate a summary with these sections:\n1. **Task**: What is being accomplished?\n2. **Completed**: What has been done so far?\n3. **Key Facts**: Important data, results, values, names, URLs, or code snippets that must not be lost.\n\nWrap your entire summary in <summary> tags.\n\n<summary>\n[Your summary here]\n</summary>",
"summary": "Prior conversation context:\n{merged_summary}\n\nThe above is background context from earlier in this conversation. Do not repeat or reference it directly — use it to inform your response to the user's message below.",
"manager_request": "Your best answer to your coworker asking you this, accounting for the context shared.",
"formatted_task_instructions": "Format your final answer according to the following OpenAPI schema: {output_format}\n\nIMPORTANT: Preserve the original content exactly as-is. Do NOT rewrite, paraphrase, or modify the meaning of the content. Only structure it to match the schema format.\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python.",
"conversation_history_instruction": "You are a member of a crew collaborating to achieve a common goal. Your task is a specific action that contributes to this larger objective. For additional context, please review the conversation history between you and the user that led to the initiation of this crew. Use any relevant information or feedback from the conversation to inform your task execution and ensure your response aligns with both the immediate task and the crew's overall goals.",
@@ -99,7 +99,7 @@
"step_executor_task_context": "## Task Context\nThe following is the full task you are helping complete. Keep this in mind — especially any required output files, exact filenames, and expected formats.\n\n{task_context}\n\n---\n"
},
"new_agent": {
"soul": "You are {role}.\nYour goal: {goal}\nBackground: {backstory}\n\nWhen you use tools, act on the results directly. Never repeat raw tool output (file contents, command output, etc.) in your response summarize findings or state what you did instead.",
"soul": "You are {role}.\nYour goal: {goal}\nBackground: {backstory}\n\nCRITICAL INSTRUCTIONS:\n- Use your tools to accomplish tasks. Do NOT describe what you would do — actually do it by calling tools.\n- Never output a numbered plan, step list, or task breakdown as your response. If you have a plan, execute it silently using tools.\n- Never repeat or echo raw tool output, file contents, conversation history, or system context in your response.\n- When summarized context appears in the conversation, use it as background — do not echo or reference it.\n- Respond with the results of your work, not a description of what you intend to do.",
"tools_header": "You have access to the following tools:\n{tool_descriptions}",
"coworkers_header": "You have coworkers with specialized expertise. When a request involves work outside your core specialty, delegate to the appropriate coworker using their tool. Delegation is preferred over attempting work you're not specialized in.\n\nAvailable coworkers:\n{coworker_descriptions}",
"skills_self_build": "You can learn new skills from instructions the user gives you. When the user asks you to remember a process, encode a workflow, or create a skill, a skill suggestion will be generated automatically — do NOT use file-writing tools to create skill files yourself.",

View File

@@ -370,6 +370,72 @@ class TestNewAgentMessage:
assert "Analyst" in assembled
assert "Analyze data" in assembled
@patch("crewai.new_agent.executor.aget_llm_response")
@pytest.mark.asyncio
async def test_summarization_preserves_current_user_message(self, mock_llm_response):
    """The current user message must survive proactive summarization.

    Builds a system prompt, two old user/assistant exchanges and a final
    "current" user message, then runs ``_proactive_summarize_messages``
    against a mocked LLM reporting a 100-token context window. The current
    request must remain the final message of the list and appear exactly
    once — otherwise the agent never sees what the user actually asked.
    """
    mock_llm_response.return_value = "Done."
    current_request = "What is the weather?"

    agent = NewAgent(
        role="R",
        goal="g",
        settings=AgentSettings(respect_context_window=True),
    )
    executor = agent._executor

    # Build messages that simulate: system + history + current user
    from crewai.utilities.agent_utils import format_message_for_llm

    llm_messages = [
        format_message_for_llm("System prompt", role="system"),
        format_message_for_llm("Old user msg", role="user"),
        format_message_for_llm("Old assistant msg", role="assistant"),
        format_message_for_llm("Another old msg", role="user"),
        format_message_for_llm("Another reply", role="assistant"),
        format_message_for_llm(current_request, role="user"),  # current
    ]

    # Mock the LLM with a tiny context window, intended to make
    # summarization trigger.
    mock_llm = MagicMock()
    mock_llm.get_context_window_size.return_value = 100
    mock_llm.call.return_value = "<summary>Prior context summary</summary>"
    agent._llm_instance = mock_llm

    executor._proactive_summarize_messages(llm_messages, [])

    # The last user-role message must still be the current request.
    user_messages = [m for m in llm_messages if m.get("role") == "user"]
    last_user = user_messages[-1] if user_messages else None
    assert last_user is not None
    assert current_request in str(last_user.get("content", ""))

    # The current turn must also stay at the very end of the list, with
    # nothing appended after it.
    final_message = llm_messages[-1]
    assert final_message.get("role") == "user"
    assert current_request in str(final_message.get("content", ""))

    # ...and it must not have been duplicated.
    occurrences = sum(
        current_request in str(m.get("content", "")) for m in llm_messages
    )
    assert occurrences == 1
@patch("crewai.new_agent.executor.aget_llm_response")
@pytest.mark.asyncio
async def test_plan_injection_has_anti_echo(self, mock_llm_response):
    """A plan layer on the prompt stack carries the anti-echo instructions.

    Adds a plan layer shaped like the one the planning engine injects and
    checks that the assembled prompt tells the LLM to keep the plan out of
    its response.
    """
    mock_llm_response.return_value = "Result of the work."

    agent = NewAgent(role="Coder", goal="Write code", backstory="Expert.")
    executor = agent._executor
    executor.prompt_stack = executor._build_prompt_stack("Write a function")

    # Three numbered placeholder steps: "1. Step 1", "2. Step 2", "3. Step 3".
    numbered_steps = [f"{n}. Step {n}" for n in range(1, 4)]
    steps = "\n".join(numbered_steps)

    plan_text = "".join(
        [
            "Internal execution plan (do NOT include in your response):\n",
            f"{steps}\n\n",
            "Execute these steps using your tools. ",
            "Report results, not the plan itself.",
        ]
    )
    executor.prompt_stack.add("plan", plan_text, source="planning_engine")

    assembled = executor.prompt_stack.assemble()
    for required_phrase in (
        "do NOT include in your response",
        "Report results, not the plan itself",
    ):
        assert required_phrase in assembled
# ── Delegation tests ─────────────────────────────────────────