fix: resolve CI test flakiness from event bus state pollution

Root cause: test_gap_implementations.py assigned directly to
crewai_event_bus.emit (instance attribute), which shadowed the class
method even after restoration. Later tests using patch.object on the
class couldn't intercept calls.

Also converts all 19 positional crewai_event_bus.emit() calls across 8
new_agent files to use the event= keyword argument, matching the
pattern in llm.py. Adds <summary> tag stripping for both ainvoke() and
astream() to prevent summarization prompt leakage in agent responses.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Joao Moura
2026-05-14 16:10:00 -04:00
parent b956986bf0
commit 3eb1da2d9c
9 changed files with 88 additions and 45 deletions

View File

@@ -25,7 +25,7 @@ def _emit_delegation_event(event_cls: type, **kwargs: Any) -> None:
try:
from crewai.events.event_bus import crewai_event_bus
crewai_event_bus.emit(None, event_cls(**kwargs))
crewai_event_bus.emit(None, event=event_cls(**kwargs))
except Exception:
pass

View File

@@ -715,7 +715,7 @@ class DreamingEngine:
crewai_event_bus.emit(
self.agent,
NewAgentWorkflowProposedEvent(
event=NewAgentWorkflowProposedEvent(
new_agent_id=str(self.agent.id),
workflow_description=description,
),
@@ -746,7 +746,7 @@ class DreamingEngine:
crewai_event_bus.emit(
self.agent,
NewAgentWorkflowConfirmedEvent(new_agent_id=str(self.agent.id)),
event=NewAgentWorkflowConfirmedEvent(new_agent_id=str(self.agent.id)),
)
except Exception:
pass
@@ -768,7 +768,7 @@ class DreamingEngine:
crewai_event_bus.emit(
self.agent,
NewAgentDreamingStartedEvent(new_agent_id=str(self.agent.id)),
event=NewAgentDreamingStartedEvent(new_agent_id=str(self.agent.id)),
)
except Exception:
pass
@@ -780,7 +780,7 @@ class DreamingEngine:
crewai_event_bus.emit(
self.agent,
NewAgentWorkflowDetectedEvent(
event=NewAgentWorkflowDetectedEvent(
new_agent_id=str(self.agent.id),
tools=workflow.get("tools", []),
count=workflow.get("count", 0),
@@ -796,7 +796,7 @@ class DreamingEngine:
crewai_event_bus.emit(
self.agent,
NewAgentDreamingCompletedEvent(
event=NewAgentDreamingCompletedEvent(
new_agent_id=str(self.agent.id),
memories_processed=result.get("memories_processed", 0),
canonical_created=result.get("canonical_created", 0),

View File

@@ -629,6 +629,15 @@ class ConversationalAgentExecutor(BaseModel):
pass
return ""
_INTERNAL_TAG_RE = re.compile(
r"<summary>.*?</summary>", re.DOTALL
)
def _strip_internal_tags(self, text: str) -> str:
"""Strip <summary> blocks that leak from the summarization prompt."""
cleaned = self._INTERNAL_TAG_RE.sub("", text).strip()
return cleaned if cleaned else text
def _detect_artifacts(self, tool_name: str, result_str: str) -> list[Artifact]:
"""GAP-67: Detect artifacts from tool results.
@@ -741,7 +750,7 @@ class ConversationalAgentExecutor(BaseModel):
CheckpointEvent,
)
crewai_event_bus.emit(self, CheckpointEvent(data=checkpoint_data))
crewai_event_bus.emit(self, event=CheckpointEvent(data=checkpoint_data))
except (ImportError, Exception):
pass
except Exception:
@@ -828,7 +837,7 @@ class ConversationalAgentExecutor(BaseModel):
try:
from crewai.events.event_bus import crewai_event_bus
crewai_event_bus.emit(self, event)
crewai_event_bus.emit(self, event=event)
except Exception:
pass
@@ -1711,6 +1720,8 @@ class ConversationalAgentExecutor(BaseModel):
break
response_text = self._strip_internal_tags(response_text)
response_text = await self._run_guardrail(response_text)
if self.agent.settings.narration_guard:
@@ -2389,13 +2400,47 @@ class ConversationalAgentExecutor(BaseModel):
invoke_task = asyncio.create_task(self.ainvoke(user_message))
_streamed_chars = 0
_last_status_time = time.monotonic()
_tag_buf = ""
_suppressing = False
def _filter_chunk(raw: str) -> str:
"""Filter <summary>...</summary> blocks from streamed chunks."""
nonlocal _tag_buf, _suppressing
out = []
for ch in raw:
if _suppressing:
_tag_buf += ch
if _tag_buf.endswith("</summary>"):
_suppressing = False
_tag_buf = ""
continue
if _tag_buf:
_tag_buf += ch
if len(_tag_buf) <= len("<summary>"):
if "<summary>"[: len(_tag_buf)] == _tag_buf:
if _tag_buf == "<summary>":
_suppressing = True
continue
else:
out.append(_tag_buf)
_tag_buf = ""
else:
out.append(_tag_buf)
_tag_buf = ""
elif ch == "<":
_tag_buf = ch
else:
out.append(ch)
return "".join(out)
try:
while not invoke_task.done():
try:
chunk = await asyncio.wait_for(chunk_queue.get(), timeout=0.05)
_streamed_chars += len(chunk)
yield chunk
filtered = _filter_chunk(chunk)
if filtered:
_streamed_chars += len(filtered)
yield filtered
now = time.monotonic()
if now - _last_status_time >= 0.5:
@@ -2406,8 +2451,13 @@ class ConversationalAgentExecutor(BaseModel):
while not chunk_queue.empty():
chunk = chunk_queue.get_nowait()
_streamed_chars += len(chunk)
yield chunk
filtered = _filter_chunk(chunk)
if filtered:
_streamed_chars += len(filtered)
yield filtered
if _tag_buf and not _suppressing:
yield _tag_buf
result = invoke_task.result()
self._last_stream_result = result

View File

@@ -177,7 +177,7 @@ class KnowledgeDiscovery:
crewai_event_bus.emit(
self.agent,
NewAgentKnowledgeSuggestedEvent(
event=NewAgentKnowledgeSuggestedEvent(
new_agent_id=str(self.agent.id),
source_type=suggestion.get("source_tool", ""),
),
@@ -192,7 +192,7 @@ class KnowledgeDiscovery:
crewai_event_bus.emit(
self.agent,
NewAgentKnowledgeConfirmedEvent(new_agent_id=str(self.agent.id)),
event=NewAgentKnowledgeConfirmedEvent(new_agent_id=str(self.agent.id)),
)
except Exception:
pass
@@ -204,7 +204,7 @@ class KnowledgeDiscovery:
crewai_event_bus.emit(
self.agent,
NewAgentKnowledgeRejectedEvent(new_agent_id=str(self.agent.id)),
event=NewAgentKnowledgeRejectedEvent(new_agent_id=str(self.agent.id)),
)
except Exception:
pass

View File

@@ -533,7 +533,7 @@ class NewAgent(BaseModel):
crewai_event_bus.emit(
self,
NewAgentCreatedEvent(
event=NewAgentCreatedEvent(
new_agent_id=self.id,
new_agent_role=self.role,
),
@@ -597,7 +597,7 @@ class NewAgent(BaseModel):
crewai_event_bus.emit(
self,
NewAgentConversationStartedEvent(
event=NewAgentConversationStartedEvent(
conversation_id=conversation_id,
new_agent_id=self.id,
new_agent_role=self.role,
@@ -741,7 +741,7 @@ class NewAgent(BaseModel):
crewai_event_bus.emit(
self,
NewAgentConversationResetEvent(
event=NewAgentConversationResetEvent(
conversation_id=old_conversation_id,
new_agent_id=self.id,
),
@@ -760,7 +760,7 @@ class NewAgent(BaseModel):
crewai_event_bus.emit(
self,
NewAgentExplainRequestedEvent(new_agent_id=self.id),
event=NewAgentExplainRequestedEvent(new_agent_id=self.id),
)
except Exception:
pass

View File

@@ -239,11 +239,11 @@ class PlanningEngine:
crewai_event_bus.emit(
self.agent,
NewAgentPlanningStartedEvent(new_agent_id=str(self.agent.id)),
event=NewAgentPlanningStartedEvent(new_agent_id=str(self.agent.id)),
)
crewai_event_bus.emit(
self.agent,
NewAgentPlanningCompletedEvent(
event=NewAgentPlanningCompletedEvent(
new_agent_id=str(self.agent.id),
plan_steps_count=len(plan),
),

View File

@@ -458,7 +458,7 @@ class SkillBuilder:
crewai_event_bus.emit(
self.agent,
NewAgentSkillSuggestedEvent(
event=NewAgentSkillSuggestedEvent(
new_agent_id=str(self.agent.id),
skill_name=suggestion.get("name", ""),
source_type=suggestion.get("source", ""),
@@ -474,7 +474,7 @@ class SkillBuilder:
crewai_event_bus.emit(
self.agent,
NewAgentSkillConfirmedEvent(
event=NewAgentSkillConfirmedEvent(
new_agent_id=str(self.agent.id),
skill_name=skill_name,
),
@@ -489,7 +489,7 @@ class SkillBuilder:
crewai_event_bus.emit(
self.agent,
NewAgentSkillRejectedEvent(
event=NewAgentSkillRejectedEvent(
new_agent_id=str(self.agent.id),
skill_name=skill_name,
),

View File

@@ -26,7 +26,7 @@ def _emit_spawn_event(event_cls: type, **kwargs: Any) -> None:
try:
from crewai.events.event_bus import crewai_event_bus
crewai_event_bus.emit(None, event_cls(**kwargs))
crewai_event_bus.emit(None, event=event_cls(**kwargs))
except Exception:
pass

View File

@@ -276,33 +276,26 @@ class TestSpawnEvents:
tool = SpawnSubtaskTool(agent=agent)
emitted_events: list[Any] = []
from crewai.events.event_bus import crewai_event_bus
original_emit = None
try:
from crewai.events.event_bus import crewai_event_bus
original_emit = crewai_event_bus.emit
original_emit = crewai_event_bus.emit
def capture_emit(source: Any, event: Any) -> None:
def capture_emit(source: Any, event: Any = None, **kw: Any) -> Any:
if event is not None:
emitted_events.append(event)
if original_emit:
original_emit(source, event)
return original_emit(source, event=event)
crewai_event_bus.emit = capture_emit
with patch.object(crewai_event_bus, "emit", side_effect=capture_emit):
result = tool._run(subtasks=["Task A"])
# Check that spawn events were emitted
from crewai.new_agent.events import (
NewAgentSpawnStartedEvent,
NewAgentSpawnCompletedEvent,
)
spawn_started = [e for e in emitted_events if isinstance(e, NewAgentSpawnStartedEvent)]
spawn_completed = [e for e in emitted_events if isinstance(e, NewAgentSpawnCompletedEvent)]
from crewai.new_agent.events import (
NewAgentSpawnStartedEvent,
NewAgentSpawnCompletedEvent,
)
spawn_started = [e for e in emitted_events if isinstance(e, NewAgentSpawnStartedEvent)]
assert len(spawn_started) >= 1
assert spawn_started[0].spawn_depth == 1
finally:
if original_emit:
crewai_event_bus.emit = original_emit
assert len(spawn_started) >= 1
assert spawn_started[0].spawn_depth == 1
def test_spawn_provenance_includes_spawn_id(self):
"""Verify the spawn ID is included in provenance entries."""