Compare commits


1 Commit

Author: Devin AI
Commit SHA1: ef64d5d0c8
Date: 2026-02-08 08:56:15 +00:00

fix: reset messages and iterations in CrewAgentExecutor between task executions
When the same CrewAgentExecutor instance is reused across multiple
sequential tasks, messages and iterations were accumulating instead of
being reset. This caused duplicate system prompts, context window
explosion, and eventually crashes with empty LLM responses.

Reset self.messages and self.iterations at the start of invoke() and
ainvoke() to ensure each task execution starts with a clean slate.

Fixes #4319, #4389, #4415

Co-Authored-By: João <joao@crewai.com>
25 changed files with 2224 additions and 3170 deletions
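
The failure mode described in the commit message is easy to reproduce with a toy executor that appends to a shared message list without clearing it between calls. The class below is a minimal, hypothetical sketch (not the real CrewAgentExecutor API) of that accumulation; the executor hunk later in this diff fixes it by resetting `self.messages` and `self.iterations` at the top of `invoke()` and `ainvoke()`.

```python
# Minimal sketch of the accumulation bug; hypothetical class, not the real API.
class LeakyExecutor:
    def __init__(self, system_prompt: str) -> None:
        self.system_prompt = system_prompt
        self.messages: list[dict[str, str]] = []
        self.iterations = 0

    def invoke(self, task: str) -> list[dict[str, str]]:
        # Bug: nothing resets self.messages / self.iterations here, so every
        # reused invocation piles a fresh system + user prompt on top of the
        # previous task's transcript.
        self.messages.append({"role": "system", "content": self.system_prompt})
        self.messages.append({"role": "user", "content": task})
        self.iterations += 1
        return self.messages


executor = LeakyExecutor("You are a helpful agent.")
executor.invoke("task 1")
messages = executor.invoke("task 2")
# Two sequential tasks -> two system prompts and both task prompts in one
# context window: the duplication and growth described in the commit message.
assert sum(m["role"] == "system" for m in messages) == 2
```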


@@ -65,8 +65,6 @@ body:
- '3.10'
- '3.11'
- '3.12'
- '3.13'
- '3.14'
validations:
required: true
- type: input


@@ -14,18 +14,13 @@ paths-ignore:
- "lib/crewai/src/crewai/experimental/a2a/**"
paths:
# Include GitHub Actions workflows/composite actions for CodeQL actions analysis
- ".github/workflows/**"
- ".github/actions/**"
# Include all Python source code from workspace packages
- "lib/crewai/src/**"
- "lib/crewai-tools/src/**"
- "lib/crewai-files/src/**"
- "lib/devtools/src/**"
# Include tests (but exclude cassettes via paths-ignore)
- "lib/crewai/tests/**"
- "lib/crewai-tools/tests/**"
- "lib/crewai-files/tests/**"
- "lib/devtools/tests/**"
# Configure specific queries or packs if needed


@@ -19,7 +19,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
python-version: ["3.10", "3.11", "3.12", "3.13"]
steps:
- name: Checkout repository
@@ -32,10 +32,6 @@ jobs:
python-version: ${{ matrix.python-version }}
enable-cache: false
- name: Install system build dependencies
if: matrix.python-version == '3.14'
run: sudo apt-get update && sudo apt-get install -y libxml2-dev libxslt-dev libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libavfilter-dev libswscale-dev libswresample-dev libheif-dev
- name: Install dependencies and populate cache
run: |
echo "Building global UV cache for Python ${{ matrix.python-version }}..."


@@ -69,7 +69,7 @@ jobs:
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v4
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
build-mode: ${{ matrix.build-mode }}
@@ -98,6 +98,6 @@ jobs:
exit 1
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v4
uses: github/codeql-action/analyze@v3
with:
category: "/language:${{matrix.language}}"


@@ -26,15 +26,15 @@ jobs:
~/.cache/uv
~/.local/share/uv
.venv
key: uv-main-py3.12-${{ hashFiles('uv.lock') }}
key: uv-main-py3.11-${{ hashFiles('uv.lock') }}
restore-keys: |
uv-main-py3.12-
uv-main-py3.11-
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
version: "0.8.4"
python-version: "3.12"
python-version: "3.11"
enable-cache: false
- name: Install dependencies
@@ -66,4 +66,4 @@ jobs:
~/.cache/uv
~/.local/share/uv
.venv
key: uv-main-py3.12-${{ hashFiles('uv.lock') }}
key: uv-main-py3.11-${{ hashFiles('uv.lock') }}


@@ -13,8 +13,8 @@ jobs:
strategy:
fail-fast: true
matrix:
python-version: ['3.10', '3.11', '3.12', '3.13', '3.14']
group: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
python-version: ['3.10', '3.11', '3.12', '3.13']
group: [1, 2, 3, 4, 5, 6, 7, 8]
steps:
- name: Checkout code
uses: actions/checkout@v4
@@ -40,10 +40,6 @@ jobs:
python-version: ${{ matrix.python-version }}
enable-cache: false
- name: Install system build dependencies
if: matrix.python-version == '3.14'
run: sudo apt-get update && sudo apt-get install -y libxml2-dev libxslt-dev libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libavfilter-dev libswscale-dev libswresample-dev libheif-dev
- name: Install the project
run: uv sync --all-groups --all-extras
@@ -53,7 +49,7 @@ jobs:
path: .test_durations_py*
key: test-durations-py${{ matrix.python-version }}
- name: Run tests (group ${{ matrix.group }} of 16)
- name: Run tests (group ${{ matrix.group }} of 8)
run: |
PYTHON_VERSION_SAFE=$(echo "${{ matrix.python-version }}" | tr '.' '_')
DURATION_FILE="../../.test_durations_py${PYTHON_VERSION_SAFE}"
@@ -77,17 +73,17 @@ jobs:
cd lib/crewai && uv run pytest \
-vv \
--splits 16 \
--splits 8 \
--group ${{ matrix.group }} \
$DURATIONS_ARG \
--durations=10 \
--maxfail=3
- name: Run tool tests (group ${{ matrix.group }} of 16)
- name: Run tool tests (group ${{ matrix.group }} of 8)
run: |
cd lib/crewai-tools && uv run pytest \
-vv \
--splits 16 \
--splits 8 \
--group ${{ matrix.group }} \
--durations=10 \
--maxfail=3
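
For readers unfamiliar with the flags in the hunk above, `--splits` and `--group` are provided by the pytest-split plugin, which shards the suite using previously recorded test durations; the two sides of this hunk differ only in the shard count (16 vs 8). A rough programmatic equivalent of a single CI shard is sketched below; the flag names are assumed from pytest-split's documented options and the durations filename is illustrative.

```python
# Sketch of one CI shard, assuming pytest and pytest-split are installed.
import pytest

exit_code = pytest.main([
    "-vv",
    "--splits", "8",    # total number of shards in the matrix
    "--group", "1",     # 1-based index of the shard this job runs
    "--durations-path", ".test_durations_py3_12",  # illustrative durations file
    "--durations=10",   # core pytest flag: report the 10 slowest tests
    "--maxfail=3",
])
raise SystemExit(exit_code)
```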


@@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
python-version: ["3.10", "3.11", "3.12", "3.13"]
steps:
- name: Checkout code
@@ -39,10 +39,6 @@ jobs:
python-version: ${{ matrix.python-version }}
enable-cache: false
- name: Install system build dependencies
if: matrix.python-version == '3.14'
run: sudo apt-get update && sudo apt-get install -y libxml2-dev libxslt-dev libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libavfilter-dev libswscale-dev libswresample-dev libheif-dev
- name: Install dependencies
run: uv sync --all-groups --all-extras


@@ -16,11 +16,11 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.10', '3.11', '3.12', '3.13', '3.14']
python-version: ['3.10', '3.11', '3.12', '3.13']
env:
OPENAI_API_KEY: fake-api-key
PYTHONUNBUFFERED: 1
steps:
- name: Checkout repository
uses: actions/checkout@v4
@@ -44,10 +44,6 @@ jobs:
python-version: ${{ matrix.python-version }}
enable-cache: false
- name: Install system build dependencies
if: matrix.python-version == '3.14'
run: sudo apt-get update && sudo apt-get install -y libxml2-dev libxslt-dev libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libavfilter-dev libswscale-dev libswresample-dev libheif-dev
- name: Install the project
run: uv sync --all-groups --all-extras
@@ -72,4 +68,4 @@ jobs:
~/.cache/uv
~/.local/share/uv
.venv
key: uv-main-py${{ matrix.python-version }}-${{ hashFiles('uv.lock') }}
key: uv-main-py${{ matrix.python-version }}-${{ hashFiles('uv.lock') }}


@@ -6,7 +6,7 @@ readme = "README.md"
authors = [
{ name = "Greyson LaLonde", email = "greyson@crewai.com" }
]
requires-python = ">=3.10,<3.15"
requires-python = ">=3.10, <3.14"
dependencies = [
"Pillow~=10.4.0",
"pypdf~=4.0.0",


@@ -6,7 +6,7 @@ readme = "README.md"
authors = [
{ name = "João Moura", email = "joaomdmoura@gmail.com" },
]
requires-python = ">=3.10,<3.15"
requires-python = ">=3.10, <3.14"
dependencies = [
"lancedb~=0.5.4",
"pytube~=15.0.0",
@@ -118,7 +118,7 @@ rag = [
"lxml>=5.3.0,<5.4.0", # Pin to avoid etree import issues in 5.4.0
]
xml = [
"unstructured[local-inference, all-docs]>=0.17.2,<0.18.31"
"unstructured[local-inference, all-docs]>=0.17.2"
]
oxylabs = [
"oxylabs==2.0.0"


@@ -33,11 +33,8 @@ def test_brave_tool_search(mock_get, brave_tool):
mock_get.return_value.json.return_value = mock_response
result = brave_tool.run(query="test")
data = json.loads(result)
assert isinstance(data, list)
assert len(data) >= 1
assert data[0]["title"] == "Test Title"
assert data[0]["url"] == "http://test.com"
assert "Test Title" in result
assert "http://test.com" in result
@patch("requests.get")


@@ -6,7 +6,7 @@ readme = "README.md"
authors = [
{ name = "Joao Moura", email = "joao@crewai.com" }
]
requires-python = ">=3.10,<3.15"
requires-python = ">=3.10, <3.14"
dependencies = [
# Core Dependencies
"pydantic~=2.11.9",
@@ -14,7 +14,7 @@ dependencies = [
"instructor>=1.3.3",
# Text Processing
"pdfplumber~=0.11.4",
"regex~=2026.1.15",
"regex~=2024.9.11",
# Telemetry and Monitoring
"opentelemetry-api~=1.34.0",
"opentelemetry-sdk~=1.34.0",
@@ -36,7 +36,7 @@ dependencies = [
"json5~=0.10.0",
"portalocker~=2.7.0",
"pydantic-settings~=2.10.1",
"mcp~=1.26.0",
"mcp~=1.23.1",
"uv~=0.9.13",
"aiosqlite~=0.21.0",
]


@@ -210,6 +210,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
Dictionary with agent output.
"""
self.messages = []
self.iterations = 0
self._setup_messages(inputs)
self._inject_multimodal_files(inputs)
@@ -987,6 +989,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
Dictionary with agent output.
"""
self.messages = []
self.iterations = 0
self._setup_messages(inputs)
await self._ainject_multimodal_files(inputs)


@@ -3,7 +3,7 @@ name = "{{folder_name}}"
version = "0.1.0"
description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.15"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.9.3"
]


@@ -3,7 +3,7 @@ name = "{{folder_name}}"
version = "0.1.0"
description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.15"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.9.3"
]


@@ -3,9 +3,9 @@ name = "{{folder_name}}"
version = "0.1.0"
description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.15"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.9.3"
"crewai[tools]>=0.203.1"
]
[tool.crewai]


@@ -187,7 +187,6 @@ class Crew(FlowTrackable, BaseModel):
_task_output_handler: TaskOutputStorageHandler = PrivateAttr(
default_factory=TaskOutputStorageHandler
)
_kickoff_event_id: str | None = PrivateAttr(default=None)
name: str | None = Field(default="crew")
cache: bool = Field(default=True)
@@ -760,11 +759,7 @@ class Crew(FlowTrackable, BaseModel):
except Exception as e:
crewai_event_bus.emit(
self,
CrewKickoffFailedEvent(
error=str(e),
crew_name=self.name,
started_event_id=self._kickoff_event_id,
),
CrewKickoffFailedEvent(error=str(e), crew_name=self.name),
)
raise
finally:
@@ -954,11 +949,7 @@ class Crew(FlowTrackable, BaseModel):
except Exception as e:
crewai_event_bus.emit(
self,
CrewKickoffFailedEvent(
error=str(e),
crew_name=self.name,
started_event_id=self._kickoff_event_id,
),
CrewKickoffFailedEvent(error=str(e), crew_name=self.name),
)
raise
finally:
@@ -1533,7 +1524,6 @@ class Crew(FlowTrackable, BaseModel):
crew_name=self.name,
output=final_task_output,
total_tokens=self.token_usage.total_tokens,
started_event_id=self._kickoff_event_id,
),
)


@@ -265,9 +265,10 @@ def prepare_kickoff(
normalized = {}
normalized = before_callback(normalized)
started_event = CrewKickoffStartedEvent(crew_name=crew.name, inputs=normalized)
crew._kickoff_event_id = started_event.event_id
future = crewai_event_bus.emit(crew, started_event)
future = crewai_event_bus.emit(
crew,
CrewKickoffStartedEvent(crew_name=crew.name, inputs=normalized),
)
if future is not None:
try:
future.result()
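
The `_kickoff_event_id` lines appearing on one side of the two hunks above implement a simple correlation pattern: the ID of the emitted kickoff-started event is stashed on the crew so that the matching failed/completed event can reference it, letting listeners pair the two. A framework-free sketch of the idea is below; the class and field names are illustrative, not the actual CrewAI event API.

```python
# Illustrative event-correlation pattern; not the actual CrewAI event classes.
import uuid
from dataclasses import dataclass, field


def emit(event: object) -> None:
    print(event)  # stand-in for an event bus


@dataclass
class KickoffStarted:
    crew_name: str
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))


@dataclass
class KickoffFailed:
    crew_name: str
    error: str
    started_event_id: str | None = None  # links back to the KickoffStarted event


class DemoCrew:
    def __init__(self, name: str) -> None:
        self.name = name
        self._kickoff_event_id: str | None = None

    def kickoff(self) -> None:
        started = KickoffStarted(crew_name=self.name)
        self._kickoff_event_id = started.event_id  # remember the ID for correlation
        emit(started)
        try:
            raise RuntimeError("boom")  # stand-in for task execution failing
        except Exception as e:  # the real code re-raises after emitting
            emit(KickoffFailed(crew_name=self.name, error=str(e),
                               started_event_id=self._kickoff_event_id))


DemoCrew("demo").kickoff()
```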


@@ -1696,99 +1696,6 @@ class OpenAICompletion(BaseLLM):
return content
def _finalize_streaming_response(
self,
full_response: str,
tool_calls: dict[int, dict[str, Any]],
usage_data: dict[str, int],
params: dict[str, Any],
available_functions: dict[str, Any] | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
) -> str | list[dict[str, Any]]:
"""Finalize a streaming response with usage tracking, tool call handling, and events.
Args:
full_response: The accumulated text response from the stream.
tool_calls: Accumulated tool calls from the stream, keyed by index.
usage_data: Token usage data from the stream.
params: The completion parameters containing messages.
available_functions: Available functions for tool calling.
from_task: Task that initiated the call.
from_agent: Agent that initiated the call.
Returns:
Tool calls list when tools were invoked without available_functions,
tool execution result when available_functions is provided,
or the text response string.
"""
self._track_token_usage_internal(usage_data)
if tool_calls and not available_functions:
tool_calls_list = [
{
"id": call_data["id"],
"type": "function",
"function": {
"name": call_data["name"],
"arguments": call_data["arguments"],
},
"index": call_data["index"],
}
for call_data in tool_calls.values()
]
self._emit_call_completed_event(
response=tool_calls_list,
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return tool_calls_list
if tool_calls and available_functions:
for call_data in tool_calls.values():
function_name = call_data["name"]
arguments = call_data["arguments"]
if not function_name or not arguments:
continue
if function_name not in available_functions:
logging.warning(
f"Function '{function_name}' not found in available functions"
)
continue
try:
function_args = json.loads(arguments)
except json.JSONDecodeError as e:
logging.error(f"Failed to parse streamed tool arguments: {e}")
continue
result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
if result is not None:
return result
full_response = self._apply_stop_words(full_response)
self._emit_call_completed_event(
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return full_response
def _handle_streaming_completion(
self,
params: dict[str, Any],
@@ -1796,7 +1703,7 @@ class OpenAICompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str | list[dict[str, Any]] | BaseModel:
) -> str | BaseModel:
"""Handle streaming chat completion."""
full_response = ""
tool_calls: dict[int, dict[str, Any]] = {}
@@ -1913,20 +1820,54 @@ class OpenAICompletion(BaseLLM):
response_id=response_id_stream,
)
result = self._finalize_streaming_response(
full_response=full_response,
tool_calls=tool_calls,
usage_data=usage_data,
params=params,
available_functions=available_functions,
self._track_token_usage_internal(usage_data)
if tool_calls and available_functions:
for call_data in tool_calls.values():
function_name = call_data["name"]
arguments = call_data["arguments"]
# Skip if function name is empty or arguments are empty
if not function_name or not arguments:
continue
# Check if function exists in available functions
if function_name not in available_functions:
logging.warning(
f"Function '{function_name}' not found in available functions"
)
continue
try:
function_args = json.loads(arguments)
except json.JSONDecodeError as e:
logging.error(f"Failed to parse streamed tool arguments: {e}")
continue
result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
if result is not None:
return result
full_response = self._apply_stop_words(full_response)
self._emit_call_completed_event(
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return self._invoke_after_llm_call_hooks(
params["messages"], full_response, from_agent
)
if isinstance(result, str):
return self._invoke_after_llm_call_hooks(
params["messages"], result, from_agent
)
return result
async def _ahandle_completion(
self,
@@ -2075,7 +2016,7 @@ class OpenAICompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str | list[dict[str, Any]] | BaseModel:
) -> str | BaseModel:
"""Handle async streaming chat completion."""
full_response = ""
tool_calls: dict[int, dict[str, Any]] = {}
@@ -2201,16 +2142,51 @@ class OpenAICompletion(BaseLLM):
response_id=response_id_stream,
)
return self._finalize_streaming_response(
full_response=full_response,
tool_calls=tool_calls,
usage_data=usage_data,
params=params,
available_functions=available_functions,
self._track_token_usage_internal(usage_data)
if tool_calls and available_functions:
for call_data in tool_calls.values():
function_name = call_data["name"]
arguments = call_data["arguments"]
if not function_name or not arguments:
continue
if function_name not in available_functions:
logging.warning(
f"Function '{function_name}' not found in available functions"
)
continue
try:
function_args = json.loads(arguments)
except json.JSONDecodeError as e:
logging.error(f"Failed to parse streamed tool arguments: {e}")
continue
result = self._handle_tool_execution(
function_name=function_name,
function_args=function_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
if result is not None:
return result
full_response = self._apply_stop_words(full_response)
self._emit_call_completed_event(
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return full_response
def supports_function_calling(self) -> bool:
"""Check if the model supports function calling."""
return not self.is_o1_model
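
Both streaming paths in this hunk accumulate tool-call fragments into `tool_calls: dict[int, dict[str, Any]]` before anything is parsed, because a single call's JSON arguments arrive split across many chunks and only the index ties the pieces together. A stripped-down sketch of that accumulation step, with the streamed deltas simplified to plain dicts (the real code reads OpenAI SDK delta objects):

```python
# Simplified accumulation of streamed tool-call deltas, keyed by index.
import json
from typing import Any

streamed_deltas = [  # stand-ins for chunk.choices[0].delta.tool_calls entries
    {"index": 0, "id": "call_abc123", "name": "calculator", "arguments": '{"expr'},
    {"index": 0, "id": None, "name": None, "arguments": 'ession": "1+1"}'},
]

tool_calls: dict[int, dict[str, Any]] = {}
for delta in streamed_deltas:
    entry = tool_calls.setdefault(
        delta["index"], {"id": None, "name": None, "arguments": ""}
    )
    entry["id"] = entry["id"] or delta["id"]
    entry["name"] = entry["name"] or delta["name"]
    entry["arguments"] += delta["arguments"] or ""

# Only once the stream has finished are the concatenated arguments valid JSON.
assert json.loads(tool_calls[0]["arguments"]) == {"expression": "1+1"}
```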


@@ -0,0 +1,270 @@
"""Tests for CrewAgentExecutor state reset between task executions.
Verifies that messages and iterations are properly reset when invoke() or
ainvoke() is called multiple times on the same executor instance, preventing
context pollution across sequential tasks (issues #4319, #4389, #4415).
"""
import asyncio
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.agents.parser import AgentFinish
@pytest.fixture
def mock_llm() -> MagicMock:
llm = MagicMock()
llm.supports_stop_words.return_value = True
llm.stop = []
return llm
@pytest.fixture
def mock_agent() -> MagicMock:
agent = MagicMock()
agent.role = "Test Agent"
agent.key = "test_agent_key"
agent.verbose = False
agent.id = "test_agent_id"
return agent
@pytest.fixture
def mock_task() -> MagicMock:
task = MagicMock()
task.description = "Test task"
task.human_input = False
task.response_model = None
task.id = "test_task_id"
return task
@pytest.fixture
def mock_crew() -> MagicMock:
crew = MagicMock()
crew.verbose = False
crew._train = False
crew.id = "test_crew_id"
return crew
@pytest.fixture
def mock_tools_handler() -> MagicMock:
return MagicMock()
@pytest.fixture
def executor(
mock_llm: MagicMock,
mock_agent: MagicMock,
mock_task: MagicMock,
mock_crew: MagicMock,
mock_tools_handler: MagicMock,
) -> CrewAgentExecutor:
return CrewAgentExecutor(
llm=mock_llm,
task=mock_task,
crew=mock_crew,
agent=mock_agent,
prompt={"system": "You are a helpful agent.", "user": "Task: {input} Tools: {tool_names} {tools}"},
max_iter=5,
tools=[],
tools_names="",
stop_words=["Observation:"],
tools_description="",
tools_handler=mock_tools_handler,
)
class TestCrewAgentExecutorStateReset:
"""Tests that CrewAgentExecutor resets messages and iterations on each invoke."""
def test_invoke_resets_messages(self, executor: CrewAgentExecutor) -> None:
"""Messages from a previous invoke must not leak into the next one."""
executor.messages = [
{"role": "system", "content": "old system prompt"},
{"role": "user", "content": "old task prompt"},
{"role": "assistant", "content": "old response"},
]
executor.iterations = 7
with patch.object(
executor,
"_invoke_loop",
return_value=AgentFinish(thought="done", output="result", text="Final Answer: result"),
):
with patch.object(executor, "_create_short_term_memory"):
with patch.object(executor, "_create_long_term_memory"):
with patch.object(executor, "_create_external_memory"):
executor.invoke({"input": "new task", "tool_names": "", "tools": ""})
system_msgs = [m for m in executor.messages if m.get("role") == "system"]
assert len(system_msgs) == 1, (
f"Expected exactly 1 system message after invoke, got {len(system_msgs)}"
)
def test_invoke_resets_iterations(self, executor: CrewAgentExecutor) -> None:
"""Iterations must be reset to 0 at the start of each invoke."""
executor.iterations = 42
with patch.object(
executor,
"_invoke_loop",
return_value=AgentFinish(thought="done", output="result", text="done"),
):
with patch.object(executor, "_create_short_term_memory"):
with patch.object(executor, "_create_long_term_memory"):
with patch.object(executor, "_create_external_memory"):
executor.invoke({"input": "task", "tool_names": "", "tools": ""})
def test_sequential_invokes_no_message_accumulation(
self, executor: CrewAgentExecutor
) -> None:
"""Calling invoke multiple times must not accumulate messages."""
message_counts: list[int] = []
original_invoke_loop = executor._invoke_loop
def capture_messages_then_finish() -> AgentFinish:
message_counts.append(len(executor.messages))
return AgentFinish(thought="done", output="result", text="done")
with patch.object(executor, "_invoke_loop", side_effect=capture_messages_then_finish):
with patch.object(executor, "_create_short_term_memory"):
with patch.object(executor, "_create_long_term_memory"):
with patch.object(executor, "_create_external_memory"):
for i in range(3):
executor.invoke(
{"input": f"task {i}", "tool_names": "", "tools": ""}
)
assert len(message_counts) == 3
assert message_counts[0] == message_counts[1] == message_counts[2], (
f"Message counts should be equal across invocations, got {message_counts}"
)
def test_sequential_invokes_no_duplicate_system_messages(
self, executor: CrewAgentExecutor
) -> None:
"""Each invoke must contain exactly one system message, not N."""
system_msg_counts: list[int] = []
def capture_and_finish() -> AgentFinish:
count = sum(1 for m in executor.messages if m.get("role") == "system")
system_msg_counts.append(count)
return AgentFinish(thought="done", output="ok", text="ok")
with patch.object(executor, "_invoke_loop", side_effect=capture_and_finish):
with patch.object(executor, "_create_short_term_memory"):
with patch.object(executor, "_create_long_term_memory"):
with patch.object(executor, "_create_external_memory"):
for _ in range(5):
executor.invoke({"input": "task", "tool_names": "", "tools": ""})
assert all(c == 1 for c in system_msg_counts), (
f"Expected 1 system message per invoke, got {system_msg_counts}"
)
class TestCrewAgentExecutorAsyncStateReset:
"""Tests that ainvoke also resets messages and iterations."""
@pytest.mark.asyncio
async def test_ainvoke_resets_messages(self, executor: CrewAgentExecutor) -> None:
"""Messages from a previous ainvoke must not leak into the next one."""
executor.messages = [
{"role": "system", "content": "old system prompt"},
{"role": "user", "content": "old task prompt"},
{"role": "assistant", "content": "old response"},
]
executor.iterations = 7
with patch.object(
executor,
"_ainvoke_loop",
new_callable=AsyncMock,
return_value=AgentFinish(thought="done", output="result", text="done"),
):
with patch.object(executor, "_create_short_term_memory"):
with patch.object(executor, "_create_long_term_memory"):
with patch.object(executor, "_create_external_memory"):
await executor.ainvoke(
{"input": "new task", "tool_names": "", "tools": ""}
)
system_msgs = [m for m in executor.messages if m.get("role") == "system"]
assert len(system_msgs) == 1, (
f"Expected exactly 1 system message after ainvoke, got {len(system_msgs)}"
)
@pytest.mark.asyncio
async def test_ainvoke_resets_iterations(self, executor: CrewAgentExecutor) -> None:
"""Iterations must be reset to 0 at the start of each ainvoke."""
executor.iterations = 42
with patch.object(
executor,
"_ainvoke_loop",
new_callable=AsyncMock,
return_value=AgentFinish(thought="done", output="result", text="done"),
):
with patch.object(executor, "_create_short_term_memory"):
with patch.object(executor, "_create_long_term_memory"):
with patch.object(executor, "_create_external_memory"):
await executor.ainvoke(
{"input": "task", "tool_names": "", "tools": ""}
)
@pytest.mark.asyncio
async def test_sequential_ainvokes_no_message_accumulation(
self, executor: CrewAgentExecutor
) -> None:
"""Calling ainvoke multiple times must not accumulate messages."""
message_counts: list[int] = []
async def capture_and_finish() -> AgentFinish:
message_counts.append(len(executor.messages))
return AgentFinish(thought="done", output="result", text="done")
with patch.object(executor, "_ainvoke_loop", side_effect=capture_and_finish):
with patch.object(executor, "_create_short_term_memory"):
with patch.object(executor, "_create_long_term_memory"):
with patch.object(executor, "_create_external_memory"):
for i in range(3):
await executor.ainvoke(
{"input": f"task {i}", "tool_names": "", "tools": ""}
)
assert len(message_counts) == 3
assert message_counts[0] == message_counts[1] == message_counts[2], (
f"Message counts should be equal across invocations, got {message_counts}"
)
@pytest.mark.asyncio
async def test_sequential_ainvokes_no_duplicate_system_messages(
self, executor: CrewAgentExecutor
) -> None:
"""Each ainvoke must contain exactly one system message, not N."""
system_msg_counts: list[int] = []
async def capture_and_finish() -> AgentFinish:
count = sum(1 for m in executor.messages if m.get("role") == "system")
system_msg_counts.append(count)
return AgentFinish(thought="done", output="ok", text="ok")
with patch.object(executor, "_ainvoke_loop", side_effect=capture_and_finish):
with patch.object(executor, "_create_short_term_memory"):
with patch.object(executor, "_create_long_term_memory"):
with patch.object(executor, "_create_external_memory"):
for _ in range(5):
await executor.ainvoke(
{"input": "task", "tool_names": "", "tools": ""}
)
assert all(c == 1 for c in system_msg_counts), (
f"Expected 1 system message per ainvoke, got {system_msg_counts}"
)


@@ -230,7 +230,7 @@ class TestDeployCommand(unittest.TestCase):
[project]
name = "test_project"
version = "0.1.0"
requires-python = ">=3.10,<3.15"
requires-python = ">=3.10,<3.14"
dependencies = ["crewai"]
""",
)
@@ -249,7 +249,7 @@ class TestDeployCommand(unittest.TestCase):
[project]
name = "test_project"
version = "0.1.0"
requires-python = ">=3.10,<3.15"
requires-python = ">=3.10,<3.14"
dependencies = ["crewai"]
""",
)


@@ -1,7 +1,6 @@
import os
import sys
import types
from typing import Any
from unittest.mock import patch, MagicMock
import openai
import pytest
@@ -1579,167 +1578,3 @@ def test_openai_structured_output_preserves_json_with_stop_word_patterns():
assert "Action:" in result.action_taken
assert "Observation:" in result.observation_result
assert "Final Answer:" in result.final_answer
def test_openai_streaming_returns_tool_calls_without_available_functions():
"""Test that streaming returns tool calls list when available_functions is None.
This mirrors the non-streaming path where tool_calls are returned for
the executor to handle. Reproduces the bug where streaming with tool
calls would return empty text instead of tool_calls when
available_functions was not provided (as the crew executor does).
"""
llm = LLM(model="openai/gpt-4o-mini", stream=True)
mock_chunk_1 = MagicMock()
mock_chunk_1.choices = [MagicMock()]
mock_chunk_1.choices[0].delta = MagicMock()
mock_chunk_1.choices[0].delta.content = None
mock_chunk_1.choices[0].delta.tool_calls = [MagicMock()]
mock_chunk_1.choices[0].delta.tool_calls[0].index = 0
mock_chunk_1.choices[0].delta.tool_calls[0].id = "call_abc123"
mock_chunk_1.choices[0].delta.tool_calls[0].function = MagicMock()
mock_chunk_1.choices[0].delta.tool_calls[0].function.name = "calculator"
mock_chunk_1.choices[0].delta.tool_calls[0].function.arguments = '{"expr'
mock_chunk_1.choices[0].finish_reason = None
mock_chunk_1.usage = None
mock_chunk_1.id = "chatcmpl-1"
mock_chunk_2 = MagicMock()
mock_chunk_2.choices = [MagicMock()]
mock_chunk_2.choices[0].delta = MagicMock()
mock_chunk_2.choices[0].delta.content = None
mock_chunk_2.choices[0].delta.tool_calls = [MagicMock()]
mock_chunk_2.choices[0].delta.tool_calls[0].index = 0
mock_chunk_2.choices[0].delta.tool_calls[0].id = None
mock_chunk_2.choices[0].delta.tool_calls[0].function = MagicMock()
mock_chunk_2.choices[0].delta.tool_calls[0].function.name = None
mock_chunk_2.choices[0].delta.tool_calls[0].function.arguments = 'ession": "1+1"}'
mock_chunk_2.choices[0].finish_reason = None
mock_chunk_2.usage = None
mock_chunk_2.id = "chatcmpl-1"
mock_chunk_3 = MagicMock()
mock_chunk_3.choices = [MagicMock()]
mock_chunk_3.choices[0].delta = MagicMock()
mock_chunk_3.choices[0].delta.content = None
mock_chunk_3.choices[0].delta.tool_calls = None
mock_chunk_3.choices[0].finish_reason = "tool_calls"
mock_chunk_3.usage = MagicMock()
mock_chunk_3.usage.prompt_tokens = 10
mock_chunk_3.usage.completion_tokens = 5
mock_chunk_3.id = "chatcmpl-1"
with patch.object(
llm.client.chat.completions, "create", return_value=iter([mock_chunk_1, mock_chunk_2, mock_chunk_3])
):
result = llm.call(
messages=[{"role": "user", "content": "Calculate 1+1"}],
tools=[{
"type": "function",
"function": {
"name": "calculator",
"description": "Calculate expression",
"parameters": {"type": "object", "properties": {"expression": {"type": "string"}}},
},
}],
available_functions=None,
)
assert isinstance(result, list), f"Expected list of tool calls, got {type(result)}: {result}"
assert len(result) == 1
assert result[0]["function"]["name"] == "calculator"
assert result[0]["function"]["arguments"] == '{"expression": "1+1"}'
assert result[0]["id"] == "call_abc123"
assert result[0]["type"] == "function"
@pytest.mark.asyncio
async def test_openai_async_streaming_returns_tool_calls_without_available_functions():
"""Test that async streaming returns tool calls list when available_functions is None.
Same as the sync test but for the async path (_ahandle_streaming_completion).
"""
llm = LLM(model="openai/gpt-4o-mini", stream=True)
mock_chunk_1 = MagicMock()
mock_chunk_1.choices = [MagicMock()]
mock_chunk_1.choices[0].delta = MagicMock()
mock_chunk_1.choices[0].delta.content = None
mock_chunk_1.choices[0].delta.tool_calls = [MagicMock()]
mock_chunk_1.choices[0].delta.tool_calls[0].index = 0
mock_chunk_1.choices[0].delta.tool_calls[0].id = "call_abc123"
mock_chunk_1.choices[0].delta.tool_calls[0].function = MagicMock()
mock_chunk_1.choices[0].delta.tool_calls[0].function.name = "calculator"
mock_chunk_1.choices[0].delta.tool_calls[0].function.arguments = '{"expr'
mock_chunk_1.choices[0].finish_reason = None
mock_chunk_1.usage = None
mock_chunk_1.id = "chatcmpl-1"
mock_chunk_2 = MagicMock()
mock_chunk_2.choices = [MagicMock()]
mock_chunk_2.choices[0].delta = MagicMock()
mock_chunk_2.choices[0].delta.content = None
mock_chunk_2.choices[0].delta.tool_calls = [MagicMock()]
mock_chunk_2.choices[0].delta.tool_calls[0].index = 0
mock_chunk_2.choices[0].delta.tool_calls[0].id = None
mock_chunk_2.choices[0].delta.tool_calls[0].function = MagicMock()
mock_chunk_2.choices[0].delta.tool_calls[0].function.name = None
mock_chunk_2.choices[0].delta.tool_calls[0].function.arguments = 'ession": "1+1"}'
mock_chunk_2.choices[0].finish_reason = None
mock_chunk_2.usage = None
mock_chunk_2.id = "chatcmpl-1"
mock_chunk_3 = MagicMock()
mock_chunk_3.choices = [MagicMock()]
mock_chunk_3.choices[0].delta = MagicMock()
mock_chunk_3.choices[0].delta.content = None
mock_chunk_3.choices[0].delta.tool_calls = None
mock_chunk_3.choices[0].finish_reason = "tool_calls"
mock_chunk_3.usage = MagicMock()
mock_chunk_3.usage.prompt_tokens = 10
mock_chunk_3.usage.completion_tokens = 5
mock_chunk_3.id = "chatcmpl-1"
class MockAsyncStream:
"""Async iterator that mimics OpenAI's async streaming response."""
def __init__(self, chunks: list[Any]) -> None:
self._chunks = chunks
self._index = 0
def __aiter__(self) -> "MockAsyncStream":
return self
async def __anext__(self) -> Any:
if self._index >= len(self._chunks):
raise StopAsyncIteration
chunk = self._chunks[self._index]
self._index += 1
return chunk
async def mock_create(**kwargs: Any) -> MockAsyncStream:
return MockAsyncStream([mock_chunk_1, mock_chunk_2, mock_chunk_3])
with patch.object(
llm.async_client.chat.completions, "create", side_effect=mock_create
):
result = await llm.acall(
messages=[{"role": "user", "content": "Calculate 1+1"}],
tools=[{
"type": "function",
"function": {
"name": "calculator",
"description": "Calculate expression",
"parameters": {"type": "object", "properties": {"expression": {"type": "string"}}},
},
}],
available_functions=None,
)
assert isinstance(result, list), f"Expected list of tool calls, got {type(result)}: {result}"
assert len(result) == 1
assert result[0]["function"]["name"] == "calculator"
assert result[0]["function"]["arguments"] == '{"expression": "1+1"}'
assert result[0]["id"] == "call_abc123"
assert result[0]["type"] == "function"
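
For context on what the streaming tool-call tests in this hunk assert: with `available_functions=None`, the streaming path is expected to return the accumulated tool calls to the caller in OpenAI's list shape rather than executing them itself. A hedged sketch of how a caller might dispatch such a list is below; the dispatch loop and the toy `calculator` are illustrative, not the actual crew executor code.

```python
# Illustrative dispatch of a tool_calls list in the shape asserted above.
import json
from typing import Any, Callable


def calculator(expression: str) -> str:
    # Toy implementation for the sketch; a real tool would evaluate safely.
    return "2" if expression == "1+1" else "unsupported"


available_functions: dict[str, Callable[..., Any]] = {"calculator": calculator}

tool_calls = [
    {
        "id": "call_abc123",
        "type": "function",
        "function": {"name": "calculator", "arguments": '{"expression": "1+1"}'},
        "index": 0,
    }
]

for call in tool_calls:
    name = call["function"]["name"]
    args = json.loads(call["function"]["arguments"])
    print(f"{name}({args}) -> {available_functions[name](**args)}")
```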


@@ -6,7 +6,7 @@ readme = "README.md"
authors = [
{ name = "Greyson R. LaLonde", email = "greyson@crewai.com" },
]
requires-python = ">=3.10,<3.15"
requires-python = ">=3.10, <3.14"
classifiers = ["Private :: Do Not Upload"]
private = true
dependencies = [


@@ -1,7 +1,7 @@
name = "crewai-workspace"
description = "Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks."
readme = "README.md"
requires-python = ">=3.10,<3.15"
requires-python = ">=3.10,<3.14"
authors = [
{ name = "Joao Moura", email = "joao@crewai.com" }
]
@@ -143,11 +143,6 @@ python_classes = "Test*"
python_functions = "test_*"
[tool.uv]
constraint-dependencies = [
"onnxruntime<1.24; python_version < '3.11'",
]
[tool.uv.workspace]
members = [
"lib/crewai",

uv.lock (generated): file diff suppressed because it is too large.