mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-22 19:02:37 +00:00
Compare commits
1 Commits
devin/1772
...
devin/1772
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c2b151bd30 |
@@ -138,12 +138,24 @@ def fetch_agent_card(
|
||||
ttl_hash = int(time.time() // cache_ttl)
|
||||
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
|
||||
|
||||
coro = afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
# Already inside an event loop (e.g. Jupyter notebook) - run the
|
||||
# coroutine in a separate thread with its own event loop.
|
||||
import concurrent.futures
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
future = executor.submit(asyncio.run, coro)
|
||||
return future.result()
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
return loop.run_until_complete(
|
||||
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||
)
|
||||
return loop.run_until_complete(coro)
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
||||
@@ -158,13 +158,10 @@ def execute_a2a_delegation(
|
||||
) -> TaskStateResult:
|
||||
"""Execute a task delegation to a remote A2A agent synchronously.
|
||||
|
||||
WARNING: This function blocks the entire thread by creating and running a new
|
||||
event loop. Prefer using 'await aexecute_a2a_delegation()' in async contexts
|
||||
for better performance and resource efficiency.
|
||||
|
||||
This is a synchronous wrapper around aexecute_a2a_delegation that creates a
|
||||
new event loop to run the async implementation. It is provided for compatibility
|
||||
with synchronous code paths only.
|
||||
This is a synchronous wrapper around aexecute_a2a_delegation. When no event
|
||||
loop is running it creates a new one; when called from within an existing
|
||||
event loop (e.g. Jupyter notebooks) it transparently runs the coroutine in
|
||||
a separate thread to avoid "cannot run nested event loop" errors.
|
||||
|
||||
Args:
|
||||
endpoint: A2A agent endpoint URL (AgentCard URL).
|
||||
@@ -194,51 +191,51 @@ def execute_a2a_delegation(
|
||||
|
||||
Returns:
|
||||
TaskStateResult with status, result/error, history, and agent_card.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If called from an async context with a running event loop.
|
||||
"""
|
||||
coro = aexecute_a2a_delegation(
|
||||
endpoint=endpoint,
|
||||
auth=auth,
|
||||
timeout=timeout,
|
||||
task_description=task_description,
|
||||
context=context,
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
reference_task_ids=reference_task_ids,
|
||||
metadata=metadata,
|
||||
extensions=extensions,
|
||||
conversation_history=conversation_history,
|
||||
agent_id=agent_id,
|
||||
agent_role=agent_role,
|
||||
agent_branch=agent_branch,
|
||||
response_model=response_model,
|
||||
turn_number=turn_number,
|
||||
updates=updates,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
skill_id=skill_id,
|
||||
client_extensions=client_extensions,
|
||||
transport=transport,
|
||||
accepted_output_modes=accepted_output_modes,
|
||||
input_files=input_files,
|
||||
)
|
||||
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
raise RuntimeError(
|
||||
"execute_a2a_delegation() cannot be called from an async context. "
|
||||
"Use 'await aexecute_a2a_delegation()' instead."
|
||||
)
|
||||
except RuntimeError as e:
|
||||
if "no running event loop" not in str(e).lower():
|
||||
raise
|
||||
# Already inside an event loop (e.g. Jupyter notebook) - run the
|
||||
# coroutine in a separate thread with its own event loop so we don't
|
||||
# block or conflict with the running one.
|
||||
import concurrent.futures
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
future = executor.submit(asyncio.run, coro)
|
||||
return future.result()
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
return loop.run_until_complete(
|
||||
aexecute_a2a_delegation(
|
||||
endpoint=endpoint,
|
||||
auth=auth,
|
||||
timeout=timeout,
|
||||
task_description=task_description,
|
||||
context=context,
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
reference_task_ids=reference_task_ids,
|
||||
metadata=metadata,
|
||||
extensions=extensions,
|
||||
conversation_history=conversation_history,
|
||||
agent_id=agent_id,
|
||||
agent_role=agent_role,
|
||||
agent_branch=agent_branch,
|
||||
response_model=response_model,
|
||||
turn_number=turn_number,
|
||||
updates=updates,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
skill_id=skill_id,
|
||||
client_extensions=client_extensions,
|
||||
transport=transport,
|
||||
accepted_output_modes=accepted_output_modes,
|
||||
input_files=input_files,
|
||||
)
|
||||
)
|
||||
return loop.run_until_complete(coro)
|
||||
finally:
|
||||
try:
|
||||
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||
|
||||
@@ -214,13 +214,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
Returns:
|
||||
Dictionary with agent output.
|
||||
"""
|
||||
# Reset execution state for a fresh execution.
|
||||
# When the same executor instance is reused across sequential tasks,
|
||||
# stale messages and iteration counts must be cleared to prevent
|
||||
# context pollution and premature max-iteration exits.
|
||||
self.messages = []
|
||||
self.iterations = 0
|
||||
|
||||
self._setup_messages(inputs)
|
||||
|
||||
self._inject_multimodal_files(inputs)
|
||||
@@ -1122,13 +1115,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
Returns:
|
||||
Dictionary with agent output.
|
||||
"""
|
||||
# Reset execution state for a fresh execution.
|
||||
# When the same executor instance is reused across sequential tasks,
|
||||
# stale messages and iteration counts must be cleared to prevent
|
||||
# context pollution and premature max-iteration exits.
|
||||
self.messages = []
|
||||
self.iterations = 0
|
||||
|
||||
self._setup_messages(inputs)
|
||||
|
||||
await self._ainject_multimodal_files(inputs)
|
||||
|
||||
110
lib/crewai/tests/a2a/utils/test_agent_card_event_loop.py
Normal file
110
lib/crewai/tests/a2a/utils/test_agent_card_event_loop.py
Normal file
@@ -0,0 +1,110 @@
|
||||
"""Tests for fetch_agent_card sync wrapper when a running event loop exists.
|
||||
|
||||
Covers the same class of issue as #4671 — ``fetch_agent_card()`` must work
|
||||
even when called from within an already-running event loop (e.g. Jupyter).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
from a2a.types import AgentCapabilities, AgentCard
|
||||
|
||||
from crewai.a2a.utils.agent_card import fetch_agent_card
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _mock_agent_card() -> AgentCard:
    """Build the minimal ``AgentCard`` that the mocked async fetch returns.

    Returns:
        A text-only, non-streaming card named ``"Test Agent"`` with no skills.
    """
    card_fields: dict[str, Any] = {
        "name": "Test Agent",
        "description": "A test agent",
        "url": "http://localhost:9999",
        "version": "1.0.0",
        "capabilities": AgentCapabilities(streaming=False),
        "default_input_modes": ["text"],
        "default_output_modes": ["text"],
        "skills": [],
    }
    return AgentCard(**card_fields)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFetchAgentCardEventLoop:
    """Verify fetch_agent_card works with and without a running loop.

    ``afetch_agent_card`` is patched in every test, so no network access
    happens; only the sync wrapper's event-loop handling is exercised.
    """

    @patch("crewai.a2a.utils.agent_card.afetch_agent_card")
    def test_works_without_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Normal case: no running event loop, should succeed directly."""
        mock_async_fn.return_value = _mock_agent_card()

        result = fetch_agent_card(
            endpoint="http://localhost:9999",
            use_cache=False,
        )

        assert result.name == "Test Agent"
        mock_async_fn.assert_called_once()

    @patch("crewai.a2a.utils.agent_card.afetch_agent_card")
    def test_works_inside_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Regression test: must not raise when a loop is already running."""
        mock_async_fn.return_value = _mock_agent_card()

        async def _call_sync_from_async() -> AgentCard:
            # The sync wrapper is invoked while asyncio.run() below has an
            # event loop running in this thread.
            return fetch_agent_card(
                endpoint="http://localhost:9999",
                use_cache=False,
            )

        # Any exception raised by the wrapper propagates out of asyncio.run()
        # and fails the test with its original traceback.
        result = asyncio.run(_call_sync_from_async())

        assert result.name == "Test Agent"

    @patch("crewai.a2a.utils.agent_card.afetch_agent_card")
    def test_propagates_errors_inside_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Errors should propagate even when called from a running loop."""
        mock_async_fn.side_effect = ConnectionError("cannot reach agent")

        async def _call_sync_from_async() -> None:
            fetch_agent_card(
                endpoint="http://localhost:9999",
                use_cache=False,
            )

        with pytest.raises(ConnectionError, match="cannot reach agent"):
            asyncio.run(_call_sync_from_async())
|
||||
129
lib/crewai/tests/a2a/utils/test_delegation_event_loop.py
Normal file
129
lib/crewai/tests/a2a/utils/test_delegation_event_loop.py
Normal file
@@ -0,0 +1,129 @@
|
||||
"""Tests for A2A delegation sync wrappers when a running event loop exists.
|
||||
|
||||
Covers the fix for https://github.com/crewAIInc/crewAI/issues/4671 where
|
||||
``execute_a2a_delegation()`` raised ``RuntimeError`` when called from an
|
||||
environment that already has a running event loop (e.g. Jupyter notebooks).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import threading
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.a2a.task_helpers import TaskStateResult
|
||||
from crewai.a2a.utils.delegation import execute_a2a_delegation
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _minimal_task_state_result(**overrides: Any) -> TaskStateResult:
    """Build a "completed" ``TaskStateResult`` dict for use as a mock return.

    Args:
        **overrides: Keys to set on top of the defaults; an override replaces
            the default value stored under the same key.

    Returns:
        A dict with ``status``, ``result``, ``error``, ``history`` and
        ``agent_card`` entries, after the overrides have been applied.
    """
    state: TaskStateResult = {
        "status": "completed",
        "result": "mocked result",
        "error": None,
        "history": [],
        "agent_card": None,
    }
    for key, value in overrides.items():
        state[key] = value  # type: ignore[literal-required]
    return state
|
||||
|
||||
|
||||
_DELEGATION_KWARGS: dict[str, Any] = dict(
|
||||
endpoint="http://localhost:9999/.well-known/agent-card.json",
|
||||
auth=None,
|
||||
timeout=30,
|
||||
task_description="test task",
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExecuteA2ADelegationEventLoop:
    """Verify execute_a2a_delegation works with and without a running loop.

    ``aexecute_a2a_delegation`` is patched in every test, so only the sync
    wrapper's event-loop handling is exercised.
    """

    @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
    def test_works_without_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Normal case: no running event loop, should succeed directly."""
        mock_async_fn.return_value = _minimal_task_state_result()

        result = execute_a2a_delegation(**_DELEGATION_KWARGS)

        assert result["status"] == "completed"
        assert result["result"] == "mocked result"
        mock_async_fn.assert_called_once()

    @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
    def test_works_inside_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Regression test for #4671: must not raise when a loop is running.

        Simulates the Jupyter notebook environment by calling the sync wrapper
        from within an already-running event loop.
        """
        mock_async_fn.return_value = _minimal_task_state_result()

        async def _call_sync_from_async() -> TaskStateResult:
            """Call the sync function from within a running event loop."""
            # This must NOT raise RuntimeError anymore
            return execute_a2a_delegation(**_DELEGATION_KWARGS)

        # Run inside an event loop, simulating a Jupyter notebook. Any
        # exception raised by the wrapper propagates out of asyncio.run()
        # and fails the test with its original traceback.
        result = asyncio.run(_call_sync_from_async())

        assert result["status"] == "completed"
        assert result["result"] == "mocked result"

    @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
    def test_propagates_errors_from_async_fn(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Errors from the underlying async function should propagate."""
        mock_async_fn.side_effect = ConnectionError("remote agent down")

        with pytest.raises(ConnectionError, match="remote agent down"):
            execute_a2a_delegation(**_DELEGATION_KWARGS)

    @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
    def test_propagates_errors_inside_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Errors should propagate even when called from a running loop."""
        mock_async_fn.side_effect = ConnectionError("remote agent down")

        async def _call_sync_from_async() -> None:
            execute_a2a_delegation(**_DELEGATION_KWARGS)

        with pytest.raises(ConnectionError, match="remote agent down"):
            asyncio.run(_call_sync_from_async())
|
||||
@@ -1,252 +0,0 @@
|
||||
"""Tests for CrewAgentExecutor state reset between task executions.
|
||||
|
||||
Verifies that messages and iterations are properly cleared when the same
|
||||
executor instance is reused across sequential tasks, preventing context
|
||||
pollution and premature max-iteration exits.
|
||||
|
||||
Related issues: #4319, #4389, #4661
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
from typing import Any
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.agents.crew_agent_executor import CrewAgentExecutor
|
||||
from crewai.agents.parser import AgentFinish
|
||||
|
||||
|
||||
@pytest.fixture
def _make_executor() -> Callable[..., CrewAgentExecutor]:
    """Factory fixture that builds a CrewAgentExecutor with mocked deps.

    Returns:
        A callable accepting keyword overrides. Each override replaces the
        constructor argument of the same name before ``CrewAgentExecutor``
        is instantiated.
    """

    def _factory(**overrides: Any) -> CrewAgentExecutor:
        """Create one executor from mocked collaborators plus *overrides*."""
        llm = Mock()
        llm.supports_stop_words.return_value = True
        llm.stop = []
        llm.supports_function_calling.return_value = False

        task = Mock()
        task.id = "task-1"
        task.description = "Test task"
        task.human_input = False
        task.response_model = None
        task.output_pydantic = None
        task.output_json = None

        crew = MagicMock()
        crew.id = "crew-1"
        crew._memory = None
        crew._train = False
        crew.verbose = False

        agent = MagicMock()
        agent.id = "agent-1"
        agent.role = "Tester"
        agent.verbose = False
        agent.key = "tester-key"
        agent.security_config = MagicMock()
        agent.security_config.fingerprint = "fp-123"

        tools_handler = Mock()
        tools_handler.cache = None

        # Constructor arguments used unless the caller overrides them.
        defaults: dict[str, Any] = {
            "llm": llm,
            "task": task,
            "crew": crew,
            "agent": agent,
            "prompt": {"system": "You are a helper.", "user": "Do {input}"},
            "max_iter": 10,
            "tools": [],
            "tools_names": "",
            "stop_words": ["Observation:"],
            "tools_description": "",
            "tools_handler": tools_handler,
        }
        defaults.update(overrides)
        return CrewAgentExecutor(**defaults)

    return _factory
|
||||
|
||||
|
||||
class TestCrewAgentExecutorStateReset:
    """Ensure invoke() and ainvoke() reset execution state.

    Every test reuses a single executor instance for several calls and checks
    ``executor.messages`` / ``executor.iterations`` afterwards. The LLM
    response and file-lookup helpers are patched, so each run finishes with
    a canned "Final Answer".
    """

    # ------------------------------------------------------------------
    # Synchronous invoke
    # ------------------------------------------------------------------

    @patch(
        "crewai.agents.crew_agent_executor.get_all_files",
        return_value=None,
    )
    @patch(
        "crewai.agents.crew_agent_executor.get_llm_response",
        return_value="Final Answer: done",
    )
    def test_invoke_resets_messages_between_calls(
        self,
        _mock_llm_response: Mock,
        _mock_files: Mock,
        _make_executor: Callable[..., CrewAgentExecutor],
    ) -> None:
        """Messages list should be fresh on every invoke() call."""
        executor = _make_executor()

        inputs: dict[str, str] = {"input": "task-one", "tool_names": "", "tools": ""}

        # First invocation
        executor.invoke(inputs)

        # Messages should have been populated during the first run
        msgs_after_first = list(executor.messages)
        assert len(msgs_after_first) > 0

        # Second invocation (simulating reuse for a different task)
        executor.invoke(inputs)

        # The messages list should NOT contain leftovers from the first run.
        # It should start fresh with only messages from the second invocation.
        # Specifically, there should be exactly one system and one user message
        # from _setup_messages, plus any assistant messages from the loop.
        system_msgs = [m for m in executor.messages if m.get("role") == "system"]
        assert len(system_msgs) == 1, (
            "Expected exactly 1 system message after second invoke, "
            f"got {len(system_msgs)}"
        )

    @patch(
        "crewai.agents.crew_agent_executor.get_all_files",
        return_value=None,
    )
    @patch(
        "crewai.agents.crew_agent_executor.get_llm_response",
        return_value="Final Answer: done",
    )
    def test_invoke_resets_iterations_between_calls(
        self,
        _mock_llm_response: Mock,
        _mock_files: Mock,
        _make_executor: Callable[..., CrewAgentExecutor],
    ) -> None:
        """Iterations counter should reset to 0 on every invoke() call."""
        executor = _make_executor()

        inputs: dict[str, str] = {"input": "task-one", "tool_names": "", "tools": ""}

        executor.invoke(inputs)
        assert executor.iterations > 0, "iterations should have incremented"

        # Second invocation
        executor.invoke(inputs)

        # iterations should have been reset and only reflect the second run
        # (for a single-pass answer it should be 1)
        assert executor.iterations == 1, (
            f"Expected iterations == 1 after reset, got {executor.iterations}"
        )

    # ------------------------------------------------------------------
    # Asynchronous ainvoke
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    @patch(
        "crewai.agents.crew_agent_executor.aget_all_files",
        return_value=None,
    )
    @patch(
        "crewai.agents.crew_agent_executor.aget_llm_response",
        return_value="Final Answer: done",
    )
    async def test_ainvoke_resets_messages_between_calls(
        self,
        _mock_llm_response: Mock,
        _mock_files: Mock,
        _make_executor: Callable[..., CrewAgentExecutor],
    ) -> None:
        """Messages list should be fresh on every ainvoke() call."""
        executor = _make_executor()

        inputs: dict[str, str] = {"input": "task-one", "tool_names": "", "tools": ""}

        await executor.ainvoke(inputs)

        msgs_after_first = list(executor.messages)
        assert len(msgs_after_first) > 0

        await executor.ainvoke(inputs)

        system_msgs = [m for m in executor.messages if m.get("role") == "system"]
        assert len(system_msgs) == 1, (
            "Expected exactly 1 system message after second ainvoke, "
            f"got {len(system_msgs)}"
        )

    @pytest.mark.asyncio
    @patch(
        "crewai.agents.crew_agent_executor.aget_all_files",
        return_value=None,
    )
    @patch(
        "crewai.agents.crew_agent_executor.aget_llm_response",
        return_value="Final Answer: done",
    )
    async def test_ainvoke_resets_iterations_between_calls(
        self,
        _mock_llm_response: Mock,
        _mock_files: Mock,
        _make_executor: Callable[..., CrewAgentExecutor],
    ) -> None:
        """Iterations counter should reset to 0 on every ainvoke() call."""
        executor = _make_executor()

        inputs: dict[str, str] = {"input": "task-one", "tool_names": "", "tools": ""}

        await executor.ainvoke(inputs)
        assert executor.iterations > 0

        await executor.ainvoke(inputs)

        assert executor.iterations == 1, (
            f"Expected iterations == 1 after reset, got {executor.iterations}"
        )

    # ------------------------------------------------------------------
    # Regression: multiple sequential tasks via the same executor
    # ------------------------------------------------------------------

    @patch(
        "crewai.agents.crew_agent_executor.get_all_files",
        return_value=None,
    )
    @patch(
        "crewai.agents.crew_agent_executor.get_llm_response",
        return_value="Final Answer: result",
    )
    def test_no_context_leak_across_three_sequential_invokes(
        self,
        _mock_llm_response: Mock,
        _mock_files: Mock,
        _make_executor: Callable[..., CrewAgentExecutor],
    ) -> None:
        """Simulates an agent reused across 3 sequential tasks.

        After each invoke the message count should be consistent and not
        grow unboundedly.
        """
        executor = _make_executor()
        inputs: dict[str, str] = {"input": "task", "tool_names": "", "tools": ""}

        message_counts: list[int] = []
        for _ in range(3):
            executor.invoke(inputs)
            message_counts.append(len(executor.messages))

        # All three runs should produce the same number of messages
        assert message_counts[0] == message_counts[1] == message_counts[2], (
            f"Message counts diverged across sequential invokes: {message_counts}"
        )
|
||||
Reference in New Issue
Block a user