diff --git a/lib/crewai/src/crewai/new_agent/coworker_tools.py b/lib/crewai/src/crewai/new_agent/coworker_tools.py index 15ada40de..253f61ed3 100644 --- a/lib/crewai/src/crewai/new_agent/coworker_tools.py +++ b/lib/crewai/src/crewai/new_agent/coworker_tools.py @@ -25,7 +25,7 @@ def _emit_delegation_event(event_cls: type, **kwargs: Any) -> None: try: from crewai.events.event_bus import crewai_event_bus - crewai_event_bus.emit(None, event_cls(**kwargs)) + crewai_event_bus.emit(None, event=event_cls(**kwargs)) except Exception: pass diff --git a/lib/crewai/src/crewai/new_agent/dreaming.py b/lib/crewai/src/crewai/new_agent/dreaming.py index ba5d5782e..68eb3cd27 100644 --- a/lib/crewai/src/crewai/new_agent/dreaming.py +++ b/lib/crewai/src/crewai/new_agent/dreaming.py @@ -715,7 +715,7 @@ class DreamingEngine: crewai_event_bus.emit( self.agent, - NewAgentWorkflowProposedEvent( + event=NewAgentWorkflowProposedEvent( new_agent_id=str(self.agent.id), workflow_description=description, ), @@ -746,7 +746,7 @@ class DreamingEngine: crewai_event_bus.emit( self.agent, - NewAgentWorkflowConfirmedEvent(new_agent_id=str(self.agent.id)), + event=NewAgentWorkflowConfirmedEvent(new_agent_id=str(self.agent.id)), ) except Exception: pass @@ -768,7 +768,7 @@ class DreamingEngine: crewai_event_bus.emit( self.agent, - NewAgentDreamingStartedEvent(new_agent_id=str(self.agent.id)), + event=NewAgentDreamingStartedEvent(new_agent_id=str(self.agent.id)), ) except Exception: pass @@ -780,7 +780,7 @@ class DreamingEngine: crewai_event_bus.emit( self.agent, - NewAgentWorkflowDetectedEvent( + event=NewAgentWorkflowDetectedEvent( new_agent_id=str(self.agent.id), tools=workflow.get("tools", []), count=workflow.get("count", 0), @@ -796,7 +796,7 @@ class DreamingEngine: crewai_event_bus.emit( self.agent, - NewAgentDreamingCompletedEvent( + event=NewAgentDreamingCompletedEvent( new_agent_id=str(self.agent.id), memories_processed=result.get("memories_processed", 0), canonical_created=result.get("canonical_created", 0), diff --git a/lib/crewai/src/crewai/new_agent/executor.py b/lib/crewai/src/crewai/new_agent/executor.py index 518676c90..b2a80490f 100644 --- a/lib/crewai/src/crewai/new_agent/executor.py +++ b/lib/crewai/src/crewai/new_agent/executor.py @@ -629,6 +629,15 @@ class ConversationalAgentExecutor(BaseModel): pass return "" + _INTERNAL_TAG_RE = re.compile( + r".*?", re.DOTALL + ) + + def _strip_internal_tags(self, text: str) -> str: + """Strip blocks that leak from the summarization prompt.""" + cleaned = self._INTERNAL_TAG_RE.sub("", text).strip() + return cleaned if cleaned else text + def _detect_artifacts(self, tool_name: str, result_str: str) -> list[Artifact]: """GAP-67: Detect artifacts from tool results. @@ -741,7 +750,7 @@ class ConversationalAgentExecutor(BaseModel): CheckpointEvent, ) - crewai_event_bus.emit(self, CheckpointEvent(data=checkpoint_data)) + crewai_event_bus.emit(self, event=CheckpointEvent(data=checkpoint_data)) except (ImportError, Exception): pass except Exception: @@ -828,7 +837,7 @@ class ConversationalAgentExecutor(BaseModel): try: from crewai.events.event_bus import crewai_event_bus - crewai_event_bus.emit(self, event) + crewai_event_bus.emit(self, event=event) except Exception: pass @@ -1711,6 +1720,8 @@ class ConversationalAgentExecutor(BaseModel): break + response_text = self._strip_internal_tags(response_text) + response_text = await self._run_guardrail(response_text) if self.agent.settings.narration_guard: @@ -2389,13 +2400,47 @@ class ConversationalAgentExecutor(BaseModel): invoke_task = asyncio.create_task(self.ainvoke(user_message)) _streamed_chars = 0 _last_status_time = time.monotonic() + _tag_buf = "" + _suppressing = False + + def _filter_chunk(raw: str) -> str: + """Filter ... blocks from streamed chunks.""" + nonlocal _tag_buf, _suppressing + out = [] + for ch in raw: + if _suppressing: + _tag_buf += ch + if _tag_buf.endswith(""): + _suppressing = False + _tag_buf = "" + continue + if _tag_buf: + _tag_buf += ch + if len(_tag_buf) <= len(""): + if ""[: len(_tag_buf)] == _tag_buf: + if _tag_buf == "": + _suppressing = True + continue + else: + out.append(_tag_buf) + _tag_buf = "" + else: + out.append(_tag_buf) + _tag_buf = "" + elif ch == "<": + _tag_buf = ch + else: + out.append(ch) + return "".join(out) try: while not invoke_task.done(): try: chunk = await asyncio.wait_for(chunk_queue.get(), timeout=0.05) - _streamed_chars += len(chunk) - yield chunk + filtered = _filter_chunk(chunk) + if filtered: + _streamed_chars += len(filtered) + yield filtered now = time.monotonic() if now - _last_status_time >= 0.5: @@ -2406,8 +2451,13 @@ class ConversationalAgentExecutor(BaseModel): while not chunk_queue.empty(): chunk = chunk_queue.get_nowait() - _streamed_chars += len(chunk) - yield chunk + filtered = _filter_chunk(chunk) + if filtered: + _streamed_chars += len(filtered) + yield filtered + + if _tag_buf and not _suppressing: + yield _tag_buf result = invoke_task.result() self._last_stream_result = result diff --git a/lib/crewai/src/crewai/new_agent/knowledge_discovery.py b/lib/crewai/src/crewai/new_agent/knowledge_discovery.py index 934ea9348..7d017110e 100644 --- a/lib/crewai/src/crewai/new_agent/knowledge_discovery.py +++ b/lib/crewai/src/crewai/new_agent/knowledge_discovery.py @@ -177,7 +177,7 @@ class KnowledgeDiscovery: crewai_event_bus.emit( self.agent, - NewAgentKnowledgeSuggestedEvent( + event=NewAgentKnowledgeSuggestedEvent( new_agent_id=str(self.agent.id), source_type=suggestion.get("source_tool", ""), ), @@ -192,7 +192,7 @@ class KnowledgeDiscovery: crewai_event_bus.emit( self.agent, - NewAgentKnowledgeConfirmedEvent(new_agent_id=str(self.agent.id)), + event=NewAgentKnowledgeConfirmedEvent(new_agent_id=str(self.agent.id)), ) except Exception: pass @@ -204,7 +204,7 @@ class KnowledgeDiscovery: crewai_event_bus.emit( self.agent, - NewAgentKnowledgeRejectedEvent(new_agent_id=str(self.agent.id)), + event=NewAgentKnowledgeRejectedEvent(new_agent_id=str(self.agent.id)), ) except Exception: pass diff --git a/lib/crewai/src/crewai/new_agent/new_agent.py b/lib/crewai/src/crewai/new_agent/new_agent.py index 1612c9add..6aa93bc48 100644 --- a/lib/crewai/src/crewai/new_agent/new_agent.py +++ b/lib/crewai/src/crewai/new_agent/new_agent.py @@ -533,7 +533,7 @@ class NewAgent(BaseModel): crewai_event_bus.emit( self, - NewAgentCreatedEvent( + event=NewAgentCreatedEvent( new_agent_id=self.id, new_agent_role=self.role, ), @@ -597,7 +597,7 @@ class NewAgent(BaseModel): crewai_event_bus.emit( self, - NewAgentConversationStartedEvent( + event=NewAgentConversationStartedEvent( conversation_id=conversation_id, new_agent_id=self.id, new_agent_role=self.role, @@ -741,7 +741,7 @@ class NewAgent(BaseModel): crewai_event_bus.emit( self, - NewAgentConversationResetEvent( + event=NewAgentConversationResetEvent( conversation_id=old_conversation_id, new_agent_id=self.id, ), @@ -760,7 +760,7 @@ class NewAgent(BaseModel): crewai_event_bus.emit( self, - NewAgentExplainRequestedEvent(new_agent_id=self.id), + event=NewAgentExplainRequestedEvent(new_agent_id=self.id), ) except Exception: pass diff --git a/lib/crewai/src/crewai/new_agent/planning.py b/lib/crewai/src/crewai/new_agent/planning.py index 02ec5b9ee..356bb0879 100644 --- a/lib/crewai/src/crewai/new_agent/planning.py +++ b/lib/crewai/src/crewai/new_agent/planning.py @@ -239,11 +239,11 @@ class PlanningEngine: crewai_event_bus.emit( self.agent, - NewAgentPlanningStartedEvent(new_agent_id=str(self.agent.id)), + event=NewAgentPlanningStartedEvent(new_agent_id=str(self.agent.id)), ) crewai_event_bus.emit( self.agent, - NewAgentPlanningCompletedEvent( + event=NewAgentPlanningCompletedEvent( new_agent_id=str(self.agent.id), plan_steps_count=len(plan), ), diff --git a/lib/crewai/src/crewai/new_agent/skill_builder.py b/lib/crewai/src/crewai/new_agent/skill_builder.py index e94a08cca..aec20bcab 100644 --- a/lib/crewai/src/crewai/new_agent/skill_builder.py +++ b/lib/crewai/src/crewai/new_agent/skill_builder.py @@ -458,7 +458,7 @@ class SkillBuilder: crewai_event_bus.emit( self.agent, - NewAgentSkillSuggestedEvent( + event=NewAgentSkillSuggestedEvent( new_agent_id=str(self.agent.id), skill_name=suggestion.get("name", ""), source_type=suggestion.get("source", ""), @@ -474,7 +474,7 @@ class SkillBuilder: crewai_event_bus.emit( self.agent, - NewAgentSkillConfirmedEvent( + event=NewAgentSkillConfirmedEvent( new_agent_id=str(self.agent.id), skill_name=skill_name, ), @@ -489,7 +489,7 @@ class SkillBuilder: crewai_event_bus.emit( self.agent, - NewAgentSkillRejectedEvent( + event=NewAgentSkillRejectedEvent( new_agent_id=str(self.agent.id), skill_name=skill_name, ), diff --git a/lib/crewai/src/crewai/new_agent/spawn_tools.py b/lib/crewai/src/crewai/new_agent/spawn_tools.py index 98809ae7d..c7e363f72 100644 --- a/lib/crewai/src/crewai/new_agent/spawn_tools.py +++ b/lib/crewai/src/crewai/new_agent/spawn_tools.py @@ -26,7 +26,7 @@ def _emit_spawn_event(event_cls: type, **kwargs: Any) -> None: try: from crewai.events.event_bus import crewai_event_bus - crewai_event_bus.emit(None, event_cls(**kwargs)) + crewai_event_bus.emit(None, event=event_cls(**kwargs)) except Exception: pass diff --git a/lib/crewai/tests/new_agent/test_gap_implementations.py b/lib/crewai/tests/new_agent/test_gap_implementations.py index 8dd8d637f..b7b4f9a82 100644 --- a/lib/crewai/tests/new_agent/test_gap_implementations.py +++ b/lib/crewai/tests/new_agent/test_gap_implementations.py @@ -276,33 +276,26 @@ class TestSpawnEvents: tool = SpawnSubtaskTool(agent=agent) emitted_events: list[Any] = [] + from crewai.events.event_bus import crewai_event_bus - original_emit = None - try: - from crewai.events.event_bus import crewai_event_bus - original_emit = crewai_event_bus.emit + original_emit = crewai_event_bus.emit - def capture_emit(source: Any, event: Any) -> None: + def capture_emit(source: Any, event: Any = None, **kw: Any) -> Any: + if event is not None: emitted_events.append(event) - if original_emit: - original_emit(source, event) + return original_emit(source, event=event) - crewai_event_bus.emit = capture_emit + with patch.object(crewai_event_bus, "emit", side_effect=capture_emit): result = tool._run(subtasks=["Task A"]) - # Check that spawn events were emitted - from crewai.new_agent.events import ( - NewAgentSpawnStartedEvent, - NewAgentSpawnCompletedEvent, - ) - spawn_started = [e for e in emitted_events if isinstance(e, NewAgentSpawnStartedEvent)] - spawn_completed = [e for e in emitted_events if isinstance(e, NewAgentSpawnCompletedEvent)] + from crewai.new_agent.events import ( + NewAgentSpawnStartedEvent, + NewAgentSpawnCompletedEvent, + ) + spawn_started = [e for e in emitted_events if isinstance(e, NewAgentSpawnStartedEvent)] - assert len(spawn_started) >= 1 - assert spawn_started[0].spawn_depth == 1 - finally: - if original_emit: - crewai_event_bus.emit = original_emit + assert len(spawn_started) >= 1 + assert spawn_started[0].spawn_depth == 1 def test_spawn_provenance_includes_spawn_id(self): """Verify the spawn ID is included in provenance entries."""