Compare commits


3 Commits

Author SHA1 Message Date
Devin AI
4747a8263a fix: streaming handler returns tool_calls when available_functions is None
Mirrors _handle_non_streaming_response's explicit path: when tool_calls
exist but available_functions is None, return the tool_calls to the caller
instead of returning an empty full_response string.

Co-Authored-By: João <joao@crewai.com>
2026-03-04 12:54:33 +00:00
Devin AI
992321f679 fix: also skip response_model passthrough to litellm when tools are present
Addresses review feedback: when tools are present, response_model must not
be passed to litellm.completion/acompletion either, because litellm uses
instructor internally, which would override the tools parameter.

Added 2 more tests that explicitly verify response_model is NOT in the
kwargs sent to litellm when tools are present.

Co-Authored-By: João <joao@crewai.com>
2026-03-04 12:48:53 +00:00
Devin AI
10384152d3 fix: prevent tools from being discarded when response_model is set (#4697)
When both native function calling (tools) and output_pydantic (response_model)
are enabled, InternalInstructor was intercepting the LLM call and creating its
own completion without passing tools. This caused agent tools to be silently
discarded.

The fix checks whether tools are present in params before routing to
InternalInstructor. When tools exist, the normal litellm.completion path is
used so the LLM can see and call the agent's tools. InternalInstructor is
still used when no tools are present (backward compatible).

Applied to all three response handlers:
- _handle_non_streaming_response
- _ahandle_non_streaming_response
- _handle_streaming_response

Co-Authored-By: João <joao@crewai.com>
2026-03-04 12:36:59 +00:00
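Taken together, the three commits reduce to a single routing rule. A minimal sketch of that rule (the function and parameter names here are illustrative, not the actual crewai internals):

```python
def use_internal_instructor(response_model, is_litellm, params) -> bool:
    """Route to InternalInstructor only when structured output is requested
    AND no tools are present; otherwise instructor would silently discard
    the tools parameter inside litellm."""
    has_tools = bool(params.get("tools"))
    return bool(response_model) and is_litellm and not has_tools
```

When this returns False but a response_model is set, the normal completion path runs with tools, and structured-output conversion is deferred to the executor/converter, as the third commit describes.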
42 changed files with 1773 additions and 3720 deletions

View File

@@ -1,127 +0,0 @@
name: Nightly Canary Release
on:
schedule:
- cron: '0 6 * * *' # daily at 6am UTC
workflow_dispatch:
jobs:
check:
name: Check for new commits
runs-on: ubuntu-latest
permissions:
contents: read
outputs:
has_changes: ${{ steps.check.outputs.has_changes }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Check for commits in last 24h
id: check
run: |
RECENT=$(git log --since="24 hours ago" --oneline | head -1)
if [ -n "$RECENT" ]; then
echo "has_changes=true" >> "$GITHUB_OUTPUT"
else
echo "has_changes=false" >> "$GITHUB_OUTPUT"
fi
build:
name: Build nightly packages
needs: check
if: needs.check.outputs.has_changes == 'true' || github.event_name == 'workflow_dispatch'
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install uv
uses: astral-sh/setup-uv@v4
- name: Stamp nightly versions
run: |
DATE=$(date +%Y%m%d)
for init_file in \
lib/crewai/src/crewai/__init__.py \
lib/crewai-tools/src/crewai_tools/__init__.py \
lib/crewai-files/src/crewai_files/__init__.py; do
CURRENT=$(python -c "
import re
text = open('$init_file').read()
print(re.search(r'__version__\s*=\s*\"(.*?)\"\s*$', text, re.MULTILINE).group(1))
")
NIGHTLY="${CURRENT}.dev${DATE}"
sed -i "s/__version__ = .*/__version__ = \"${NIGHTLY}\"/" "$init_file"
echo "$init_file: $CURRENT -> $NIGHTLY"
done
# Update cross-package dependency pins to nightly versions
sed -i "s/\"crewai-tools==[^\"]*\"/\"crewai-tools==${NIGHTLY}\"/" lib/crewai/pyproject.toml
sed -i "s/\"crewai==[^\"]*\"/\"crewai==${NIGHTLY}\"/" lib/crewai-tools/pyproject.toml
echo "Updated cross-package dependency pins to ${NIGHTLY}"
- name: Build packages
run: |
uv build --all-packages
rm dist/.gitignore
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: dist
path: dist/
publish:
name: Publish nightly to PyPI
needs: build
runs-on: ubuntu-latest
environment:
name: pypi
url: https://pypi.org/p/crewai
permissions:
id-token: write
contents: read
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
version: "0.8.4"
python-version: "3.12"
enable-cache: false
- name: Download artifacts
uses: actions/download-artifact@v4
with:
name: dist
path: dist
- name: Publish to PyPI
env:
UV_PUBLISH_TOKEN: ${{ secrets.PYPI_API_TOKEN }}
run: |
failed=0
for package in dist/*; do
if [[ "$package" == *"crewai_devtools"* ]]; then
echo "Skipping private package: $package"
continue
fi
echo "Publishing $package"
if ! uv publish "$package"; then
echo "Failed to publish $package"
failed=1
fi
done
if [ $failed -eq 1 ]; then
echo "Some packages failed to publish"
exit 1
fi
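The stamping step in the deleted workflow above is a regex read plus a suffix append. A standalone sketch of the same logic in plain Python (assuming the same `__version__` convention; the helper name is illustrative):

```python
import re
from datetime import datetime, timezone

def nightly_version(init_text: str) -> str:
    """Extract __version__ from an __init__.py and append .devYYYYMMDD."""
    match = re.search(r'__version__\s*=\s*"(.*?)"\s*$', init_text, re.MULTILINE)
    if match is None:
        raise ValueError("no __version__ assignment found")
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d")
    return f"{match.group(1)}.dev{stamp}"

# e.g. '__version__ = "1.10.1"' -> '1.10.1.dev20260304'
```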

File diff suppressed because it is too large.

View File

@@ -4,38 +4,6 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Mar 04, 2026">
## v1.10.1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.10.1)
## What's Changed
### Features
- Upgrade Gemini GenAI
### Bug Fixes
- Adjust executor listener value to avoid recursion
- Group parallel function response parts in a single Content object in Gemini
- Surface thought output from thinking models in Gemini
- Load MCP and platform tools when agent tools are None
- Support Jupyter environments with running event loops in A2A
- Use anonymous ID for ephemeral traces
- Conditionally pass plus header
- Skip signal handler registration in non-main threads for telemetry
- Inject tool errors as observations and resolve name collisions
- Upgrade pypdf from 4.x to 6.7.4 to resolve Dependabot alerts
- Resolve critical and high Dependabot security alerts
### Documentation
- Sync Composio tool documentation across locales
## Contributors
@giulio-leone, @greysonlalonde, @haxzie, @joaomdmoura, @lorenzejay, @mattatcha, @mplachta, @nicoferdi96
</Update>
<Update label="Feb 27, 2026">
## v1.10.1a1

View File

@@ -4,38 +4,6 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Mar 04, 2026">
## v1.10.1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.10.1)
## What's Changed
### Features
- Upgrade Gemini GenAI
### Bug Fixes
- Adjust executor listener value to avoid recursion
- Group parallel function response parts in a single Content object in Gemini
- Surface thought output from thinking models in Gemini
- Load MCP and platform tools when agent tools are None
- Support Jupyter environments with running event loops in A2A
- Use anonymous ID for ephemeral traces
- Conditionally pass plus header
- Skip signal handler registration in non-main threads for telemetry
- Inject tool errors as observations and resolve name collisions
- Upgrade pypdf from 4.x to 6.7.4 to resolve Dependabot alerts
- Resolve critical and high Dependabot security alerts
### Documentation
- Sync Composio tool documentation across locales
## Contributors
@giulio-leone, @greysonlalonde, @haxzie, @joaomdmoura, @lorenzejay, @mattatcha, @mplachta, @nicoferdi96
</Update>
<Update label="Feb 27, 2026">
## v1.10.1a1

View File

@@ -4,38 +4,6 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Mar 04, 2026">
## v1.10.1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.10.1)
## What's Changed
### Features
- Upgrade Gemini GenAI
### Bug Fixes
- Adjust executor listener value to avoid recursion
- Group parallel function response parts in a single Content object in Gemini
- Surface thought output from thinking models in Gemini
- Load MCP and platform tools when agent tools are None
- Support Jupyter environments with running event loops in A2A
- Use anonymous ID for ephemeral traces
- Conditionally pass plus header
- Skip signal handler registration in non-main threads for telemetry
- Inject tool errors as observations and resolve name collisions
- Upgrade pypdf from 4.x to 6.7.4 to resolve Dependabot alerts
- Resolve critical and high Dependabot security alerts
### Documentation
- Sync Composio tool documentation across locales
## Contributors
@giulio-leone, @greysonlalonde, @haxzie, @joaomdmoura, @lorenzejay, @mattatcha, @mplachta, @nicoferdi96
</Update>
<Update label="Feb 27, 2026">
## v1.10.1a1

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.10.1"
__version__ = "1.10.1a1"

View File

@@ -11,7 +11,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.10.1",
"crewai==1.10.1a1",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",
@@ -108,7 +108,7 @@ stagehand = [
"stagehand>=0.4.1",
]
github = [
"gitpython>=3.1.41,<4",
"gitpython==3.1.38",
"PyGithub==1.59.1",
]
rag = [

View File

@@ -291,4 +291,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.10.1"
__version__ = "1.10.1a1"

View File

@@ -53,7 +53,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.10.1",
"crewai-tools==1.10.1a1",
]
embeddings = [
"tiktoken~=0.8.0"
@@ -105,9 +105,6 @@ a2a = [
file-processing = [
"crewai-files",
]
pickling = [
'cloudpickle~=3.1.2'
]
[project.scripts]

View File

@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.10.1"
__version__ = "1.10.1a1"
_telemetry_submitted = False

View File

@@ -1268,7 +1268,7 @@ class Agent(BaseAgent):
),
)
start_time = time.time()
matches = agent_memory.recall(formatted_messages, limit=20)
matches = agent_memory.recall(formatted_messages, limit=5)
memory_block = ""
if matches:
memory_block = "Relevant memories:\n" + "\n".join(

View File

@@ -30,9 +30,12 @@ class CrewAgentExecutorMixin:
memory = getattr(self.agent, "memory", None) or (
getattr(self.crew, "_memory", None) if self.crew else None
)
if memory is None or not self.task or memory.read_only:
if memory is None or not self.task or getattr(memory, "_read_only", False):
return
if f"Action: {sanitize_tool_name('Delegate work to coworker')}" in output.text:
if (
f"Action: {sanitize_tool_name('Delegate work to coworker')}"
in output.text
):
return
try:
raw = (
@@ -45,4 +48,6 @@ class CrewAgentExecutorMixin:
if extracted:
memory.remember_many(extracted, agent_role=self.agent.role)
except Exception as e:
self.agent._logger.log("error", f"Failed to save to memory: {e}")
self.agent._logger.log(
"error", f"Failed to save to memory: {e}"
)
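The same guard swap (`memory.read_only` -> `getattr(memory, "_read_only", False)`) recurs in the LiteAgent and memory-tools hunks further down. A tiny sketch of why `getattr` with a default is used (hedged; the helper name is illustrative): the plain-class `Memory` in this compare exposes `_read_only` as an instance attribute, and the default tolerates memory-like objects that never set it.

```python
def can_write(memory) -> bool:
    # Duck-typed: works for Memory, MemoryScope, MemorySlice, or any stand-in
    # that may or may not define _read_only.
    return memory is not None and not getattr(memory, "_read_only", False)
```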

View File

@@ -8,7 +8,6 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable
import contextvars
from concurrent.futures import ThreadPoolExecutor, as_completed
import inspect
import logging
@@ -756,7 +755,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {
pool.submit(
contextvars.copy_context().run,
self._execute_single_native_tool_call,
call_id=call_id,
func_name=func_name,
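This hunk drops `contextvars.copy_context().run` from `pool.submit`. For reference, a self-contained sketch of what that wrapper does: worker threads do not inherit the caller's ContextVar values, so without the copy the callable sees defaults.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

request_id: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="-"
)

def work() -> str:
    return request_id.get()

request_id.set("abc123")
with ThreadPoolExecutor(max_workers=1) as pool:
    # copy_context() snapshots the caller's context, so the worker sees the set value
    with_copy = pool.submit(contextvars.copy_context().run, work).result()
    # a bare submit runs in the worker's own (empty) context
    without = pool.submit(work).result()
print(with_copy, without)  # abc123 -
```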

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.1"
"crewai[tools]==1.10.1a1"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.1"
"crewai[tools]==1.10.1a1"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.10.1"
"crewai[tools]==1.10.1a1"
]
[tool.crewai]

View File

@@ -35,7 +35,6 @@ from typing_extensions import Self
if TYPE_CHECKING:
from crewai_files import FileInput
from opentelemetry.trace import Span
try:
from crewai_files import get_supported_content_types
@@ -66,10 +65,8 @@ from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.events.listeners.tracing.utils import (
has_user_declined_tracing,
set_tracing_enabled,
should_enable_tracing,
should_suppress_tracing_messages,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
@@ -86,10 +83,7 @@ from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.memory.memory_scope import MemoryScope, MemorySlice
from crewai.memory.unified_memory import Memory
from crewai.process import Process
from crewai.rag.embeddings.factory import build_embedder
from crewai.rag.embeddings.types import EmbedderConfig
from crewai.rag.types import SearchResult
from crewai.security.fingerprint import Fingerprint
@@ -100,8 +94,6 @@ from crewai.tasks.task_output import TaskOutput
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.agent_tools.read_file_tool import ReadFileTool
from crewai.tools.base_tool import BaseTool
from crewai.tools.memory_tools import create_memory_tools
from crewai.types.callable import SerializableCallable
from crewai.types.streaming import CrewStreamingOutput
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
@@ -173,12 +165,12 @@ class Crew(FlowTrackable, BaseModel):
"""
__hash__ = object.__hash__
_execution_span: Span | None = PrivateAttr(default=None)
_execution_span: Any = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr()
_logger: Logger = PrivateAttr()
_file_handler: FileHandler = PrivateAttr()
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default_factory=CacheHandler)
_memory: Memory | MemoryScope | MemorySlice | None = PrivateAttr(default=None)
_memory: Any = PrivateAttr(default=None) # Unified Memory | MemoryScope
_train: bool | None = PrivateAttr(default=False)
_train_iteration: int | None = PrivateAttr()
_inputs: dict[str, Any] | None = PrivateAttr(default=None)
@@ -196,7 +188,7 @@ class Crew(FlowTrackable, BaseModel):
agents: list[BaseAgent] = Field(default_factory=list)
process: Process = Field(default=Process.sequential)
verbose: bool = Field(default=False)
memory: bool | Memory | MemoryScope | MemorySlice = Field(
memory: bool | Any = Field(
default=False,
description=(
"Enable crew memory. Pass True for default Memory(), "
@@ -211,23 +203,23 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Metrics for the LLM usage during all tasks execution.",
)
manager_llm: str | InstanceOf[BaseLLM] | None = Field(
manager_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
description="Language model that will run the agent.", default=None
)
manager_agent: BaseAgent | None = Field(
description="Custom agent that will be used as manager.", default=None
)
function_calling_llm: str | InstanceOf[BaseLLM] | None = Field(
function_calling_llm: str | InstanceOf[LLM] | Any | None = Field(
description="Language model that will run the agent.", default=None
)
config: Json[dict[str, Any]] | dict[str, Any] | None = Field(default=None)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
share_crew: bool | None = Field(default=False)
step_callback: SerializableCallable | None = Field(
step_callback: Any | None = Field(
default=None,
description="Callback to be executed after each step for all agents execution.",
)
task_callback: SerializableCallable | None = Field(
task_callback: Any | None = Field(
default=None,
description="Callback to be executed after each task for all agents execution.",
)
@@ -270,7 +262,7 @@ class Crew(FlowTrackable, BaseModel):
default=False,
description="Plan the crew execution and add the plan to the crew.",
)
planning_llm: str | InstanceOf[BaseLLM] | None = Field(
planning_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
default=None,
description=(
"Language model that will run the AgentPlanner if planning is True."
@@ -291,7 +283,7 @@ class Crew(FlowTrackable, BaseModel):
"knowledge object."
),
)
chat_llm: str | InstanceOf[BaseLLM] | None = Field(
chat_llm: str | InstanceOf[BaseLLM] | Any | None = Field(
default=None,
description="LLM used to handle chatting with the crew.",
)
@@ -364,8 +356,12 @@ class Crew(FlowTrackable, BaseModel):
def create_crew_memory(self) -> Crew:
"""Initialize unified memory, respecting crew embedder config."""
if self.memory is True:
from crewai.memory.unified_memory import Memory
embedder = None
if self.embedder is not None:
from crewai.rag.embeddings.factory import build_embedder
embedder = build_embedder(self.embedder)
self._memory = Memory(embedder=embedder)
elif self.memory:
@@ -1415,7 +1411,7 @@ class Crew(FlowTrackable, BaseModel):
return tools
def _add_memory_tools(
self, tools: list[BaseTool], memory: Memory | MemoryScope | MemorySlice
self, tools: list[BaseTool], memory: Any
) -> list[BaseTool]:
"""Add recall and remember tools when memory is available.
@@ -1426,6 +1422,8 @@ class Crew(FlowTrackable, BaseModel):
Returns:
Updated list with memory tools added.
"""
from crewai.tools.memory_tools import create_memory_tools
return self._merge_tools(tools, create_memory_tools(memory))
def _add_file_tools(
@@ -2008,6 +2006,11 @@ class Crew(FlowTrackable, BaseModel):
@staticmethod
def _show_tracing_disabled_message() -> None:
"""Show a message when tracing is disabled."""
from crewai.events.listeners.tracing.utils import (
has_user_declined_tracing,
should_suppress_tracing_messages,
)
if should_suppress_tracing_messages():
return
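Many hunks in this file widen annotations to `Any` and move imports into function bodies, trading static precision for lower import-time cost and circular-import safety. A hedged sketch of the usual alternative (not what this diff does): keep precise annotations for type checkers only, via `TYPE_CHECKING`.

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported only during static analysis; no runtime import cost or cycles.
    from crewai.memory.unified_memory import Memory

def attach_memory(memory: Memory | None) -> None:
    ...
```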

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import asyncio
import contextvars
from collections.abc import Callable, Coroutine
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
@@ -303,7 +302,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
super().__init__(
suppress_flow_events=True,
tracing=current_tracing if current_tracing else None,
max_method_calls=self.max_iter * 10,
)
self._flow_initialized = True
@@ -405,7 +403,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._setup_native_tools()
return "initialized"
@listen("max_iterations_exceeded")
@listen("force_final_answer")
def force_final_answer(self) -> Literal["agent_finished"]:
"""Force agent to provide final answer when max iterations exceeded."""
formatted_answer = handle_max_iterations_exceeded(
@@ -657,11 +655,11 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "tool_result_is_final"
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message_post: LLMMessage = {
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message_post)
self.state.messages.append(reasoning_message)
return "tool_completed"
@@ -729,7 +727,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
max_workers = min(8, len(runnable_tool_calls))
with ThreadPoolExecutor(max_workers=max_workers) as pool:
future_to_idx = {
pool.submit(contextvars.copy_context().run, self._execute_single_native_tool_call, tool_call): idx
pool.submit(self._execute_single_native_tool_call, tool_call): idx
for idx, tool_call in enumerate(runnable_tool_calls)
}
ordered_results: list[dict[str, Any] | None] = [None] * len(
@@ -888,10 +886,9 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
call_id, func_name, func_args = info
# Parse arguments
parsed_args, parse_error = parse_tool_call_args(func_args, func_name, call_id)
args_dict, parse_error = parse_tool_call_args(func_args, func_name, call_id)
if parse_error is not None:
return parse_error
args_dict: dict[str, Any] = parsed_args or {}
# Get agent_key for event tracking
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
@@ -1110,11 +1107,11 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
def check_max_iterations(
self,
) -> Literal[
"max_iterations_exceeded", "continue_reasoning", "continue_reasoning_native"
"force_final_answer", "continue_reasoning", "continue_reasoning_native"
]:
"""Check if max iterations reached before proceeding with reasoning."""
if has_reached_max_iterations(self.state.iterations, self.max_iter):
return "max_iterations_exceeded"
return "force_final_answer"
if self.state.use_native_tools:
return "continue_reasoning_native"
return "continue_reasoning"

View File

@@ -17,12 +17,9 @@ from collections.abc import (
ValuesView,
)
from concurrent.futures import Future, ThreadPoolExecutor
import contextvars
import copy
from datetime import datetime
import enum
import inspect
import json
import logging
import threading
from typing import (
@@ -52,7 +49,6 @@ from crewai.events.event_context import (
reset_last_event_id,
triggered_by_scope,
)
from crewai.events.event_listener import event_listener
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
@@ -65,27 +61,16 @@ from crewai.events.listeners.tracing.utils import (
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowInputReceivedEvent,
FlowInputRequestedEvent,
FlowPausedEvent,
FlowPlotEvent,
FlowStartedEvent,
HumanFeedbackReceivedEvent,
HumanFeedbackRequestedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionPausedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow.async_feedback.providers import ConsoleProvider
from crewai.flow.async_feedback.types import HumanFeedbackPending
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_config import flow_config
from crewai.flow.flow_context import (
current_flow_id,
current_flow_method_name,
current_flow_request_id,
)
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowConditions,
@@ -95,9 +80,6 @@ from crewai.flow.flow_wrappers import (
SimpleFlowCondition,
StartMethod,
)
from crewai.flow.human_feedback import HumanFeedbackResult
from crewai.flow.input_provider import InputResponse
from crewai.flow.persistence import SQLiteFlowPersistence
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.types import (
FlowExecutionData,
@@ -116,18 +98,14 @@ from crewai.flow.utils import (
is_flow_method_name,
is_simple_flow_condition,
)
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.utilities.i18n import get_i18n
if TYPE_CHECKING:
from crewai_files import FileInput
from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.flow.input_provider import InputProvider
from crewai.memory.memory_scope import MemoryScope, MemorySlice
from crewai.memory.unified_memory import Memory
from crewai.flow.human_feedback import HumanFeedbackResult
from crewai.llms.base_llm import BaseLLM
from crewai.flow.visualization import build_flow_structure, render_interactive
from crewai.types.streaming import CrewStreamingOutput, FlowStreamingOutput
@@ -714,7 +692,6 @@ class FlowMeta(type):
condition_type = getattr(
attr_value, "__condition_type__", OR_CONDITION
)
if (
hasattr(attr_value, "__trigger_condition__")
and attr_value.__trigger_condition__ is not None
@@ -775,8 +752,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
name: str | None = None
tracing: bool | None = None
stream: bool = False
memory: Memory | MemoryScope | MemorySlice | None = None
input_provider: InputProvider | None = None
memory: Any = (
None # Memory | MemoryScope | MemorySlice | None; auto-created if not set
)
input_provider: Any = None # InputProvider | None; per-flow override for self.ask()
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]:
class _FlowGeneric(cls): # type: ignore
@@ -790,7 +769,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
persistence: FlowPersistence | None = None,
tracing: bool | None = None,
suppress_flow_events: bool = False,
max_method_calls: int = 100,
**kwargs: Any,
) -> None:
"""Initialize a new Flow instance.
@@ -799,7 +777,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
persistence: Optional persistence backend for storing flow states
tracing: Whether to enable tracing. True=always enable, False=always disable, None=check environment/user settings
suppress_flow_events: Whether to suppress flow event emissions (internal use)
max_method_calls: Maximum times a single method can be called per execution before raising RecursionError
**kwargs: Additional state values to initialize or override
"""
# Initialize basic instance attributes
@@ -815,8 +792,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._completed_methods: set[FlowMethodName] = (
set()
) # Track completed methods for reload
self._method_call_counts: dict[FlowMethodName, int] = {}
self._max_method_calls = max_method_calls
self._persistence: FlowPersistence | None = persistence
self._is_execution_resuming: bool = False
self._event_futures: list[Future[None]] = []
@@ -905,13 +880,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
"""
if self.memory is None:
raise ValueError("No memory configured for this flow")
from crewai.memory.unified_memory import Memory
if isinstance(content, list) and isinstance(self.memory, Memory):
return self.memory.remember_many(content, **kwargs)
if isinstance(content, list):
return [self.memory.remember(c, **kwargs) for c in content]
return self.memory.remember_many(content, **kwargs)
return self.memory.remember(content, **kwargs)
def extract_memories(self, content: str) -> list[str]:
@@ -1140,6 +1110,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
```
"""
if persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
persistence = SQLiteFlowPersistence()
# Load pending feedback context and state
@@ -1252,6 +1224,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
Raises:
ValueError: If no pending feedback context exists
"""
from datetime import datetime
from crewai.flow.human_feedback import HumanFeedbackResult
if self._pending_feedback_context is None:
raise ValueError(
"No pending feedback context. Use from_pending() to restore a paused flow."
@@ -1334,9 +1310,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
except Exception as e:
# Check if flow was paused again for human feedback (loop case)
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed)
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
self._persistence = SQLiteFlowPersistence()
state_data = (
@@ -1739,6 +1719,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
@@ -1807,6 +1789,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
result_holder.append(e)
else:
@@ -1844,7 +1828,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_outputs.clear()
self._pending_and_listeners.clear()
self._clear_or_listeners()
self._method_call_counts.clear()
else:
# Only enter resumption mode if there are completed methods to
# replay. When _completed_methods is empty (e.g. a pure
@@ -1931,9 +1914,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
await asyncio.gather(*tasks)
except Exception as e:
# Check if flow was paused for human feedback
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
# Auto-save pending feedback (create default persistence if needed)
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
self._persistence = SQLiteFlowPersistence()
state_data = (
@@ -2169,6 +2156,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Set method name in context so ask() can read it without
# stack inspection. Must happen before copy_context() so the
# value propagates into the thread pool for sync methods.
from crewai.flow.flow_context import current_flow_method_name
method_name_token = current_flow_method_name.set(method_name)
try:
if asyncio.iscoroutinefunction(method):
@@ -2176,6 +2165,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
else:
# Run sync methods in thread pool for isolation
# This allows Agent.kickoff() to work synchronously inside Flow methods
import contextvars
ctx = contextvars.copy_context()
result = await asyncio.to_thread(ctx.run, method, *args, **kwargs)
finally:
@@ -2209,11 +2200,15 @@ class Flow(Generic[T], metaclass=FlowMeta):
return result, finished_event_id
except Exception as e:
# Check if this is a HumanFeedbackPending exception (paused, not failed)
from crewai.flow.async_feedback.types import HumanFeedbackPending
if isinstance(e, HumanFeedbackPending):
e.context.method_name = method_name
# Auto-save pending feedback (create default persistence if needed)
if self._persistence is None:
from crewai.flow.persistence import SQLiteFlowPersistence
self._persistence = SQLiteFlowPersistence()
# Emit paused event (not failed)
@@ -2574,16 +2569,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
- Skips execution if method was already completed (e.g., after reload)
- Catches and logs any exceptions during execution, preventing individual listener failures from breaking the entire flow
"""
count = self._method_call_counts.get(listener_name, 0) + 1
if count > self._max_method_calls:
raise RecursionError(
f"Method '{listener_name}' has been called {self._max_method_calls} times in "
f"this flow execution, which indicates an infinite loop. "
f"This commonly happens when a @listen label matches the "
f"method's own name."
)
self._method_call_counts[listener_name] = count
if listener_name in self._completed_methods:
if self._is_execution_resuming:
# During resumption, skip execution but continue listeners
@@ -2645,6 +2630,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
except Exception as e:
# Don't log HumanFeedbackPending as an error - it's expected control flow
from crewai.flow.async_feedback.types import HumanFeedbackPending
if not isinstance(e, HumanFeedbackPending):
logger.error(f"Error executing listener {listener_name}: {e}")
raise
@@ -2662,6 +2649,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns:
An object implementing the ``InputProvider`` protocol.
"""
from crewai.flow.async_feedback.providers import ConsoleProvider
from crewai.flow.flow_config import flow_config
if self.input_provider is not None:
return self.input_provider
if flow_config.input_provider is not None:
@@ -2747,6 +2737,19 @@ class Flow(Generic[T], metaclass=FlowMeta):
return topic
```
"""
from concurrent.futures import (
ThreadPoolExecutor,
TimeoutError as FuturesTimeoutError,
)
from datetime import datetime
from crewai.events.types.flow_events import (
FlowInputReceivedEvent,
FlowInputRequestedEvent,
)
from crewai.flow.flow_context import current_flow_method_name
from crewai.flow.input_provider import InputResponse
method_name = current_flow_method_name.get("unknown")
# Emit input requested event
@@ -2777,7 +2780,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
try:
raw = future.result(timeout=timeout)
except TimeoutError:
except FuturesTimeoutError:
future.cancel()
raw = None
finally:
@@ -2850,6 +2853,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns:
The human's feedback as a string. Empty string if no feedback provided.
"""
from crewai.events.event_listener import event_listener
from crewai.events.types.flow_events import (
HumanFeedbackReceivedEvent,
HumanFeedbackRequestedEvent,
)
# Emit feedback requested event
crewai_event_bus.emit(
self,
@@ -2923,10 +2932,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns:
One of the outcome strings that best matches the feedback intent.
"""
llm_instance: BaseLLM
from typing import Literal
from pydantic import BaseModel, Field
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
from crewai.utilities.i18n import get_i18n
llm_instance: BaseLLMClass
if isinstance(llm, str):
llm_instance = LLM(model=llm)
elif isinstance(llm, BaseLLM):
elif isinstance(llm, BaseLLMClass):
llm_instance = llm
else:
raise ValueError(f"Invalid llm type: {type(llm)}. Expected str or BaseLLM.")
@@ -2959,6 +2976,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
if isinstance(response, str):
import json
try:
parsed = json.loads(response)
return str(parsed.get("outcome", outcomes[0]))
@@ -3023,6 +3042,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
This method uses the centralized Rich console formatter for output
and the standard logging module for log level support.
"""
from crewai.events.event_listener import event_listener
event_listener.formatter.console.print(message, style=color)
if level == "info":
logger.info(message)
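Among the changes above, the `except TimeoutError:` -> `except FuturesTimeoutError:` swap matters on Python < 3.11, where `concurrent.futures.TimeoutError` was a distinct class rather than an alias of the builtin. A small self-contained illustration:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(time.sleep, 1.0)
    try:
        future.result(timeout=0.01)
    except FuturesTimeoutError:
        # On 3.11+ this equals the builtin TimeoutError; on older versions
        # catching the builtin would miss this exception entirely.
        future.cancel()
        print("timed out")
```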

View File

@@ -600,7 +600,7 @@ class LiteAgent(FlowTrackable, BaseModel):
def _save_to_memory(self, output_text: str) -> None:
"""Extract discrete memories from the run and remember each. No-op if _memory is None or read-only."""
if self._memory is None or self._memory.read_only:
if self._memory is None or getattr(self._memory, "_read_only", False):
return
input_str = self._get_last_user_content() or "User request"
try:

View File

@@ -967,8 +967,17 @@ class LLM(BaseLLM):
self._track_token_usage_internal(usage_info)
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# If there are tool calls but no available functions, return them
# so the caller (e.g., executor) can handle tool execution.
# This mirrors _handle_non_streaming_response's explicit path.
if tool_calls and not available_functions:
return tool_calls
if not tool_calls or not available_functions:
if response_model and self.is_litellm:
# Only use InternalInstructor for structured output when there
# are no tools — otherwise tools would be silently discarded.
has_tools = bool(params.get("tools"))
if response_model and self.is_litellm and not has_tools:
instructor_instance = InternalInstructor(
content=full_response,
model=response_model,
@@ -1150,7 +1159,10 @@ class LLM(BaseLLM):
str: The response text
"""
# --- 1) Handle response_model with InternalInstructor for LiteLLM
if response_model and self.is_litellm:
# Skip InternalInstructor when tools are present so the LLM can
# see and call the agent's tools before returning structured output.
has_tools = bool(params.get("tools"))
if response_model and self.is_litellm and not has_tools:
from crewai.utilities.internal_instructor import InternalInstructor
messages = params.get("messages", [])
@@ -1183,7 +1195,11 @@ class LLM(BaseLLM):
# and convert them to our own exception type for consistent handling
# across the codebase. This allows CrewAgentExecutor to handle context
# length issues appropriately.
if response_model:
# Only pass response_model to litellm when there are no tools.
# When tools are present, litellm's internal instructor would override
# the tools parameter, so we let the normal completion flow handle it
# and defer structured output conversion to the executor/converter.
if response_model and not has_tools:
params["response_model"] = response_model
response = litellm.completion(**params)
@@ -1290,7 +1306,10 @@ class LLM(BaseLLM):
Returns:
str: The response text
"""
if response_model and self.is_litellm:
# Skip InternalInstructor when tools are present so the LLM can
# see and call the agent's tools before returning structured output.
has_tools = bool(params.get("tools"))
if response_model and self.is_litellm and not has_tools:
from crewai.utilities.internal_instructor import InternalInstructor
messages = params.get("messages", [])
@@ -1318,7 +1337,11 @@ class LLM(BaseLLM):
return structured_response
try:
if response_model:
# Only pass response_model to litellm when there are no tools.
# When tools are present, litellm's internal instructor would override
# the tools parameter, so we let the normal completion flow handle it
# and defer structured output conversion to the executor/converter.
if response_model and not has_tools:
params["response_model"] = response_model
response = await litellm.acompletion(**params)
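The second commit's two added tests assert on the kwargs litellm receives. A hedged sketch of that shape (the fixture names and `llm.call` signature are assumptions, not the repo's actual test code):

```python
from unittest.mock import MagicMock, patch

def test_response_model_not_forwarded_when_tools_present(llm, tools, response_model):
    with patch("litellm.completion") as mock_completion:
        mock_completion.return_value = MagicMock()
        llm.call("hi", tools=tools, response_model=response_model)
        kwargs = mock_completion.call_args.kwargs
        # tools survive, response_model is withheld from litellm
        assert "tools" in kwargs
        assert "response_model" not in kwargs
```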

View File

@@ -3,9 +3,11 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, Literal
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator
if TYPE_CHECKING:
from crewai.memory.unified_memory import Memory
from crewai.memory.types import (
_RECALL_OVERSAMPLE_FACTOR,
@@ -13,38 +15,22 @@ from crewai.memory.types import (
MemoryRecord,
ScopeInfo,
)
from crewai.memory.unified_memory import Memory
class MemoryScope(BaseModel):
class MemoryScope:
"""View of Memory restricted to a root path. All operations are scoped under that path."""
model_config = ConfigDict(arbitrary_types_allowed=True)
def __init__(self, memory: Memory, root_path: str) -> None:
"""Initialize scope.
root_path: str = Field(default="/")
_memory: Memory = PrivateAttr()
_root: str = PrivateAttr()
@model_validator(mode="wrap")
@classmethod
def _accept_memory(cls, data: Any, handler: Any) -> MemoryScope:
"""Extract memory dependency and normalize root path before validation."""
if isinstance(data, MemoryScope):
return data
memory = data.pop("memory")
instance: MemoryScope = handler(data)
instance._memory = memory
root = instance.root_path.rstrip("/") or ""
if root and not root.startswith("/"):
root = "/" + root
instance._root = root
return instance
@property
def read_only(self) -> bool:
"""Whether the underlying memory is read-only."""
return self._memory.read_only
Args:
memory: The underlying Memory instance.
root_path: Root path for this scope (e.g. /agent/1).
"""
self._memory = memory
self._root = root_path.rstrip("/") or ""
if self._root and not self._root.startswith("/"):
self._root = "/" + self._root
def _scope_path(self, scope: str | None) -> str:
if not scope or scope == "/":
@@ -66,7 +52,7 @@ class MemoryScope(BaseModel):
importance: float | None = None,
source: str | None = None,
private: bool = False,
) -> MemoryRecord | None:
) -> MemoryRecord:
"""Remember content; scope is relative to this scope's root."""
path = self._scope_path(scope)
return self._memory.remember(
@@ -85,7 +71,7 @@ class MemoryScope(BaseModel):
scope: str | None = None,
categories: list[str] | None = None,
limit: int = 10,
depth: Literal["shallow", "deep"] = "deep",
depth: str = "deep",
source: str | None = None,
include_private: bool = False,
) -> list[MemoryMatch]:
@@ -152,34 +138,34 @@ class MemoryScope(BaseModel):
"""Return a narrower scope under this scope."""
child = path.strip("/")
if not child:
return MemoryScope(memory=self._memory, root_path=self._root or "/")
return MemoryScope(self._memory, self._root or "/")
base = self._root.rstrip("/") or ""
new_root = f"{base}/{child}" if base else f"/{child}"
return MemoryScope(memory=self._memory, root_path=new_root)
return MemoryScope(self._memory, new_root)
class MemorySlice(BaseModel):
class MemorySlice:
"""View over multiple scopes: recall searches all, remember is a no-op when read_only."""
model_config = ConfigDict(arbitrary_types_allowed=True)
def __init__(
self,
memory: Memory,
scopes: list[str],
categories: list[str] | None = None,
read_only: bool = True,
) -> None:
"""Initialize slice.
scopes: list[str] = Field(default_factory=list)
categories: list[str] | None = Field(default=None)
read_only: bool = Field(default=True)
_memory: Memory = PrivateAttr()
@model_validator(mode="wrap")
@classmethod
def _accept_memory(cls, data: Any, handler: Any) -> MemorySlice:
"""Extract memory dependency and normalize scopes before validation."""
if isinstance(data, MemorySlice):
return data
memory = data.pop("memory")
data["scopes"] = [s.rstrip("/") or "/" for s in data.get("scopes", [])]
instance: MemorySlice = handler(data)
instance._memory = memory
return instance
Args:
memory: The underlying Memory instance.
scopes: List of scope paths to include.
categories: Optional category filter for recall.
read_only: If True, remember() is a silent no-op.
"""
self._memory = memory
self._scopes = [s.rstrip("/") or "/" for s in scopes]
self._categories = categories
self._read_only = read_only
def remember(
self,
@@ -192,7 +178,7 @@ class MemorySlice(BaseModel):
private: bool = False,
) -> MemoryRecord | None:
"""Remember into an explicit scope. No-op when read_only=True."""
if self.read_only:
if self._read_only:
return None
return self._memory.remember(
content,
@@ -210,14 +196,14 @@ class MemorySlice(BaseModel):
scope: str | None = None,
categories: list[str] | None = None,
limit: int = 10,
depth: Literal["shallow", "deep"] = "deep",
depth: str = "deep",
source: str | None = None,
include_private: bool = False,
) -> list[MemoryMatch]:
"""Recall across all slice scopes; results merged and re-ranked."""
cats = categories or self.categories
cats = categories or self._categories
all_matches: list[MemoryMatch] = []
for sc in self.scopes:
for sc in self._scopes:
matches = self._memory.recall(
query,
scope=sc,
@@ -245,7 +231,7 @@ class MemorySlice(BaseModel):
def list_scopes(self, path: str = "/") -> list[str]:
"""List scopes across all slice roots."""
out: list[str] = []
for sc in self.scopes:
for sc in self._scopes:
full = f"{sc.rstrip('/')}{path}" if sc != "/" else path
out.extend(self._memory.list_scopes(full))
return sorted(set(out))
@@ -257,23 +243,15 @@ class MemorySlice(BaseModel):
oldest: datetime | None = None
newest: datetime | None = None
children: list[str] = []
for sc in self.scopes:
for sc in self._scopes:
full = f"{sc.rstrip('/')}{path}" if sc != "/" else path
inf = self._memory.info(full)
total_records += inf.record_count
all_categories.update(inf.categories)
if inf.oldest_record:
oldest = (
inf.oldest_record
if oldest is None
else min(oldest, inf.oldest_record)
)
oldest = inf.oldest_record if oldest is None else min(oldest, inf.oldest_record)
if inf.newest_record:
newest = (
inf.newest_record
if newest is None
else max(newest, inf.newest_record)
)
newest = inf.newest_record if newest is None else max(newest, inf.newest_record)
children.extend(inf.child_scopes)
return ScopeInfo(
path=path,
@@ -287,7 +265,7 @@ class MemorySlice(BaseModel):
def list_categories(self, path: str | None = None) -> dict[str, int]:
"""Categories and counts across slice scopes."""
counts: dict[str, int] = {}
for sc in self.scopes:
for sc in self._scopes:
full = (f"{sc.rstrip('/')}{path}" if sc != "/" else path) if path else sc
for k, v in self._memory.list_categories(full).items():
counts[k] = counts.get(k, 0) + v
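The removed pydantic variants above relied on a wrap validator to smuggle the non-serializable Memory dependency past validation. A standalone sketch of that pattern (names are illustrative):

```python
from typing import Any

from pydantic import BaseModel, ConfigDict, PrivateAttr, model_validator

class Scoped(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    root_path: str = "/"
    _backend: Any = PrivateAttr()

    @model_validator(mode="wrap")
    @classmethod
    def _accept_backend(cls, data: Any, handler: Any) -> "Scoped":
        if isinstance(data, Scoped):
            return data
        backend = data.pop("backend")       # extract before validation
        instance: Scoped = handler(data)    # validate the remaining fields
        instance._backend = backend         # attach as a private attribute
        return instance

scope = Scoped(backend=object(), root_path="/agent/1")
```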

View File

@@ -2,6 +2,7 @@
Implements adaptive-depth retrieval with:
- LLM query distillation into targeted sub-queries
- Keyword-driven category filtering
- Time-based filtering from temporal hints
- Parallel multi-query, multi-scope search
- Confidence-based routing with iterative deepening (budget loop)
@@ -36,6 +37,7 @@ class RecallState(BaseModel):
query: str = ""
scope: str | None = None
categories: list[str] | None = None
inferred_categories: list[str] = Field(default_factory=list)
time_cutoff: datetime | None = None
source: str | None = None
include_private: bool = False
@@ -80,8 +82,11 @@ class RecallFlow(Flow[RecallState]):
# ------------------------------------------------------------------
def _merged_categories(self) -> list[str] | None:
"""Return caller-supplied categories, or None if empty."""
return self.state.categories or None
"""Merge caller-supplied and LLM-inferred categories."""
merged = list(
set((self.state.categories or []) + self.state.inferred_categories)
)
return merged or None
def _do_search(self) -> list[dict[str, Any]]:
"""Run parallel search across (embeddings x scopes) with filters.
@@ -207,6 +212,10 @@ class RecallFlow(Flow[RecallState]):
)
self.state.query_analysis = analysis
# Wire keywords -> category filter
if analysis.keywords:
self.state.inferred_categories = analysis.keywords
# Parse time_filter into a datetime cutoff
if analysis.time_filter:
try:

View File

@@ -6,9 +6,7 @@ from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
import threading
import time
from typing import TYPE_CHECKING, Annotated, Any, Literal
from pydantic import BaseModel, ConfigDict, Field, PlainValidator, PrivateAttr
from typing import TYPE_CHECKING, Any, Literal
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
@@ -41,18 +39,13 @@ if TYPE_CHECKING:
)
def _passthrough(v: Any) -> Any:
"""PlainValidator that accepts any value, bypassing strict union discrimination."""
return v
def _default_embedder() -> OpenAIEmbeddingFunction:
"""Build default OpenAI embedder for memory."""
spec: OpenAIProviderSpec = {"provider": "openai", "config": {}}
return build_embedder(spec)
class Memory(BaseModel):
class Memory:
"""Unified memory: standalone, LLM-analyzed, with intelligent recall flow.
Works without agent/crew. Uses LLM to infer scope, categories, importance on save.
@@ -60,119 +53,116 @@ class Memory(BaseModel):
pluggable storage (LanceDB default).
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
def __init__(
self,
llm: BaseLLM | str = "gpt-4o-mini",
storage: StorageBackend | str = "lancedb",
embedder: Any = None,
# -- Scoring weights --
# These three weights control how recall results are ranked.
# The composite score is: semantic_weight * similarity + recency_weight * decay + importance_weight * importance.
# They should sum to ~1.0 for intuitive scoring.
recency_weight: float = 0.3,
semantic_weight: float = 0.5,
importance_weight: float = 0.2,
# How quickly old memories lose relevance. The recency score halves every
# N days (exponential decay). Lower = faster forgetting; higher = longer relevance.
recency_half_life_days: int = 30,
# -- Consolidation --
# When remembering new content, if an existing record has similarity >= this
# threshold, the LLM is asked to merge/update/delete. Set to 1.0 to disable.
consolidation_threshold: float = 0.85,
# Max existing records to compare against when checking for consolidation.
consolidation_limit: int = 5,
# -- Save defaults --
# Importance assigned to new memories when no explicit value is given and
# the LLM analysis path is skipped (all fields provided by the caller).
default_importance: float = 0.5,
# -- Recall depth control --
# These thresholds govern the RecallFlow router that decides between
# returning results immediately ("synthesize") vs. doing an extra
# LLM-driven exploration round ("explore_deeper").
# confidence >= confidence_threshold_high => always synthesize
# confidence < confidence_threshold_low => explore deeper (if budget > 0)
# complex query + confidence < complex_query_threshold => explore deeper
confidence_threshold_high: float = 0.8,
confidence_threshold_low: float = 0.5,
complex_query_threshold: float = 0.7,
# How many LLM-driven exploration rounds the RecallFlow is allowed to run.
# 0 = always shallow (vector search only); higher = more thorough but slower.
exploration_budget: int = 1,
# Queries shorter than this skip LLM analysis (saving ~1-3s).
# Longer queries (full task descriptions) benefit from LLM distillation.
query_analysis_threshold: int = 200,
# When True, all write operations (remember, remember_many) are silently
# skipped. Useful for sharing a read-only view of memory across agents
# without any of them persisting new memories.
read_only: bool = False,
) -> None:
"""Initialize Memory.
llm: Annotated[BaseLLM | str, PlainValidator(_passthrough)] = Field(
default="gpt-4o-mini",
description="LLM for analysis (model name or BaseLLM instance).",
)
storage: Annotated[StorageBackend | str, PlainValidator(_passthrough)] = Field(
default="lancedb",
description="Storage backend instance or path string.",
)
embedder: Any = Field(
default=None,
description="Embedding callable, provider config dict, or None for default OpenAI.",
)
recency_weight: float = Field(
default=0.3,
description="Weight for recency in the composite relevance score.",
)
semantic_weight: float = Field(
default=0.5,
description="Weight for semantic similarity in the composite relevance score.",
)
importance_weight: float = Field(
default=0.2,
description="Weight for importance in the composite relevance score.",
)
recency_half_life_days: int = Field(
default=30,
description="Recency score halves every N days (exponential decay).",
)
consolidation_threshold: float = Field(
default=0.85,
description="Similarity above which consolidation is triggered on save.",
)
consolidation_limit: int = Field(
default=5,
description="Max existing records to compare during consolidation.",
)
default_importance: float = Field(
default=0.5,
description="Default importance when not provided or inferred.",
)
confidence_threshold_high: float = Field(
default=0.8,
description="Recall confidence above which results are returned directly.",
)
confidence_threshold_low: float = Field(
default=0.5,
description="Recall confidence below which deeper exploration is triggered.",
)
complex_query_threshold: float = Field(
default=0.7,
description="For complex queries, explore deeper below this confidence.",
)
exploration_budget: int = Field(
default=1,
description="Number of LLM-driven exploration rounds during deep recall.",
)
query_analysis_threshold: int = Field(
default=200,
description="Queries shorter than this skip LLM analysis during deep recall.",
)
read_only: bool = Field(
default=False,
description="If True, remember() and remember_many() are silent no-ops.",
)
_config: MemoryConfig = PrivateAttr()
_llm_instance: BaseLLM | None = PrivateAttr(default=None)
_embedder_instance: Any = PrivateAttr(default=None)
_storage: StorageBackend = PrivateAttr()
_save_pool: ThreadPoolExecutor = PrivateAttr(
default_factory=lambda: ThreadPoolExecutor(
max_workers=1, thread_name_prefix="memory-save"
)
)
_pending_saves: list[Future[Any]] = PrivateAttr(default_factory=list)
_pending_lock: threading.Lock = PrivateAttr(default_factory=threading.Lock)
def model_post_init(self, __context: Any) -> None:
"""Initialize runtime state from field values."""
Args:
llm: LLM for analysis (model name or BaseLLM instance).
storage: Backend: "lancedb" or a StorageBackend instance.
embedder: Embedding callable, provider config dict, or None (default OpenAI).
recency_weight: Weight for recency in the composite relevance score.
semantic_weight: Weight for semantic similarity in the composite relevance score.
importance_weight: Weight for importance in the composite relevance score.
recency_half_life_days: Recency score halves every N days (exponential decay).
consolidation_threshold: Similarity above which consolidation is triggered on save.
consolidation_limit: Max existing records to compare during consolidation.
default_importance: Default importance when not provided or inferred.
confidence_threshold_high: Recall confidence above which results are returned directly.
confidence_threshold_low: Recall confidence below which deeper exploration is triggered.
complex_query_threshold: For complex queries, explore deeper below this confidence.
exploration_budget: Number of LLM-driven exploration rounds during deep recall.
query_analysis_threshold: Queries shorter than this skip LLM analysis during deep recall.
read_only: If True, remember() and remember_many() are silent no-ops.
"""
self._read_only = read_only
self._config = MemoryConfig(
recency_weight=self.recency_weight,
semantic_weight=self.semantic_weight,
importance_weight=self.importance_weight,
recency_half_life_days=self.recency_half_life_days,
consolidation_threshold=self.consolidation_threshold,
consolidation_limit=self.consolidation_limit,
default_importance=self.default_importance,
confidence_threshold_high=self.confidence_threshold_high,
confidence_threshold_low=self.confidence_threshold_low,
complex_query_threshold=self.complex_query_threshold,
exploration_budget=self.exploration_budget,
query_analysis_threshold=self.query_analysis_threshold,
recency_weight=recency_weight,
semantic_weight=semantic_weight,
importance_weight=importance_weight,
recency_half_life_days=recency_half_life_days,
consolidation_threshold=consolidation_threshold,
consolidation_limit=consolidation_limit,
default_importance=default_importance,
confidence_threshold_high=confidence_threshold_high,
confidence_threshold_low=confidence_threshold_low,
complex_query_threshold=complex_query_threshold,
exploration_budget=exploration_budget,
query_analysis_threshold=query_analysis_threshold,
)
self._llm_instance = None if isinstance(self.llm, str) else self.llm
self._embedder_instance = (
self.embedder
if (self.embedder is not None and not isinstance(self.embedder, dict))
# Store raw config for lazy initialization. LLM and embedder are only
# built on first access so that Memory() never fails at construction
# time (e.g. when auto-created by Flow without an API key set).
self._llm_config: BaseLLM | str = llm
self._llm_instance: BaseLLM | None = None if isinstance(llm, str) else llm
self._embedder_config: Any = embedder
self._embedder_instance: Any = (
embedder
if (embedder is not None and not isinstance(embedder, dict))
else None
)
if isinstance(self.storage, str):
if isinstance(storage, str):
from crewai.memory.storage.lancedb_storage import LanceDBStorage
self._storage = (
LanceDBStorage()
if self.storage == "lancedb"
else LanceDBStorage(path=self.storage)
)
self._storage = LanceDBStorage() if storage == "lancedb" else LanceDBStorage(path=storage)
else:
self._storage = self.storage
self._storage = storage
# Background save queue. max_workers=1 serializes saves to avoid
# concurrent storage mutations (two saves finding the same similar
# record and both trying to update/delete it). Within each save,
# the parallel LLM calls still run on their own thread pool.
self._save_pool = ThreadPoolExecutor(
max_workers=1, thread_name_prefix="memory-save"
)
self._pending_saves: list[Future[Any]] = []
self._pending_lock = threading.Lock()
_MEMORY_DOCS_URL = "https://docs.crewai.com/concepts/memory"
@@ -183,7 +173,11 @@ class Memory(BaseModel):
from crewai.llm import LLM
try:
model_name = self.llm if isinstance(self.llm, str) else str(self.llm)
model_name = (
self._llm_config
if isinstance(self._llm_config, str)
else str(self._llm_config)
)
self._llm_instance = LLM(model=model_name)
except Exception as e:
raise RuntimeError(
@@ -203,8 +197,8 @@ class Memory(BaseModel):
"""Lazy embedder initialization -- only created when first needed."""
if self._embedder_instance is None:
try:
if isinstance(self.embedder, dict):
self._embedder_instance = build_embedder(self.embedder)
if isinstance(self._embedder_config, dict):
self._embedder_instance = build_embedder(self._embedder_config)
else:
self._embedder_instance = _default_embedder()
except Exception as e:
@@ -362,7 +356,7 @@ class Memory(BaseModel):
Raises:
Exception: On save failure (events emitted).
"""
if self.read_only:
if self._read_only:
return None
_source_type = "unified_memory"
try:
@@ -450,7 +444,7 @@ class Memory(BaseModel):
Returns:
Empty list (records are not available until the background save completes).
"""
if not contents or self.read_only:
if not contents or self._read_only:
return []
self._submit_save(
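The constructor above stores raw config and defers building the LLM and embedder until first use, per the inline comment, so `Memory()` never fails at construction time. A condensed sketch of that lazy pattern (names are illustrative):

```python
class LazyLLMHolder:
    def __init__(self, model: str = "gpt-4o-mini") -> None:
        self._llm_config = model       # cheap to store; never raises here
        self._llm_instance = None      # built on first access

    def _ensure_llm(self):
        if self._llm_instance is None:
            from crewai.llm import LLM  # deferred import, as in the hunk above
            self._llm_instance = LLM(model=self._llm_config)  # may raise only here
        return self._llm_instance
```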

View File

@@ -83,7 +83,6 @@ if TYPE_CHECKING:
VoyageAIEmbeddingFunction,
)
from crewai.rag.embeddings.providers.voyageai.types import VoyageAIProviderSpec
from crewai.rag.embeddings.types import EmbedderConfig
T = TypeVar("T", bound=EmbeddingFunction[Any])
@@ -350,10 +349,6 @@ def build_embedder(spec: ONNXProviderSpec) -> ONNXMiniLM_L6_V2: ...
def build_embedder(spec: dict[str, Any]) -> EmbeddingFunction[Any]: ...
@overload
def build_embedder(spec: EmbedderConfig) -> EmbeddingFunction[Any]: ...
def build_embedder(spec): # type: ignore[no-untyped-def]
"""Build an embedding function from either a provider spec or a provider instance.

View File

@@ -44,7 +44,6 @@ from crewai.security import Fingerprint, SecurityConfig
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.types.callable import SerializableCallable
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
from crewai.utilities.converter import Converter, convert_to_model
@@ -124,9 +123,8 @@ class Task(BaseModel):
description="Configuration for the agent",
default=None,
)
callback: SerializableCallable | None = Field(
default=None,
description="Callback to be executed after the task is completed.",
callback: Any | None = Field(
description="Callback to be executed after the task is completed.", default=None
)
agent: BaseAgent | None = Field(
description="Agent responsible for execution the task.", default=None

View File

@@ -49,7 +49,7 @@ class RecallMemoryTool(BaseTool):
all_lines: list[str] = []
seen_ids: set[str] = set()
for query in queries:
matches = self.memory.recall(query, limit=20)
matches = self.memory.recall(query)
for m in matches:
if m.record.id not in seen_ids:
seen_ids.add(m.record.id)
@@ -121,7 +121,7 @@ def create_memory_tools(memory: Any) -> list[BaseTool]:
description=i18n.tools("recall_memory"),
),
]
if not memory.read_only:
if not getattr(memory, "_read_only", False):
tools.append(
RememberTool(
memory=memory,

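Note that getattr(memory, "_read_only", False) defaults to writable when the attribute is missing, so memory-like objects that never set it still get the remember tool. A two-line illustration:

mem = type("Mem", (), {"_read_only": True})()
assert getattr(mem, "_read_only", False) is True    # read-only: RememberTool omitted

bare = object()
assert getattr(bare, "_read_only", False) is False  # missing attr -> writable default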
View File

@@ -7,7 +7,7 @@
"slices": {
"observation": "\nObservation:",
"task": "\nCurrent Task: {input}\n\nBegin! This is VERY important to you, use the tools available and give your best Final Answer, your job depends on it!\n\nThought:",
"memory": "\n\n# Memories from past conversations:\n{memory}\n\nIMPORTANT: The memories above are an automatic selection and may be INCOMPLETE. If the task involves counting, listing, or summing items (e.g. 'how many', 'total', 'list all'), you MUST use the Search memory tool with several different queries before answering — do NOT rely solely on the memories shown above. Enumerate each distinct item you find before giving a final count.",
"memory": "\n\n# Useful context: \n{memory}",
"role_playing": "You are {role}. {backstory}\nYour personal goal is: {goal}",
"tools": "\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\n{tools}\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [{tool_names}], just the name, exactly as it's written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final answer to the original input question\n```",
"no_tools": "",
@@ -60,12 +60,12 @@
"description": "See image to understand its content, you can optionally ask a question about the image",
"default_action": "Please provide a detailed description of this image, including all visual elements, context, and any notable details you can observe."
},
"recall_memory": "Search through the team's shared memory for relevant information. Pass one or more queries to search for multiple things at once. Use this when you need to find facts, decisions, preferences, or past results that may have been stored previously. IMPORTANT: For questions that require counting, summing, or listing items across multiple conversations (e.g. 'how many X', 'total Y', 'list all Z'), you MUST search multiple times with different phrasings to ensure you find ALL relevant items before giving a final count or total. Do not rely on a single search — items may be described differently across conversations.",
"recall_memory": "Search through the team's shared memory for relevant information. Pass one or more queries to search for multiple things at once. Use this when you need to find facts, decisions, preferences, or past results that may have been stored previously.",
"save_to_memory": "Store one or more important facts, decisions, observations, or lessons in memory so they can be recalled later by you or other agents. Pass multiple items at once when you have several things worth remembering."
},
"memory": {
"query_system": "You analyze a query for searching memory.\nGiven the query and available scopes, output:\n1. keywords: Key entities or keywords that can be used to filter by category.\n2. suggested_scopes: Which available scopes are most relevant (empty for all).\n3. complexity: 'simple' or 'complex'.\n4. recall_queries: 1-3 short, targeted search phrases distilled from the query. Each should be a concise phrase optimized for semantic vector search. If the query is already short and focused, return it as-is in a single-item list. For long task descriptions, extract the distinct things worth searching for.\n5. time_filter: If the query references a time period (like 'last week', 'yesterday', 'in January'), return an ISO 8601 date string for the earliest relevant date (e.g. '2026-02-01'). Return null if no time constraint is implied.",
"extract_memories_system": "You extract discrete, reusable memory statements from raw content (e.g. a task description and its result, or a conversation between a user and an assistant).\n\nFor the given content, output a list of memory statements. Each memory must:\n- Be one clear sentence or short statement\n- Be understandable without the original context\n- Capture a decision, fact, outcome, preference, lesson, or observation worth remembering\n- NOT be a vague summary or a restatement of the task description\n- NOT duplicate the same idea in different words\n\nWhen the content is a conversation, pay special attention to facts stated by the user (first-person statements). These personal facts are HIGH PRIORITY and must always be extracted:\n- What the user did, bought, made, visited, attended, or completed\n- Names of people, pets, places, brands, and specific items the user mentions\n- Quantities, durations, dates, and measurements the user states\n- Subordinate clauses and casual asides often contain important personal details (e.g. \"by the way, it took me 4 hours\" or \"my Golden Retriever Max\")\n\nPreserve exact names and numbers — never generalize (e.g. keep \"lavender gin fizz\" not just \"cocktail\", keep \"12 largemouth bass\" not just \"fish caught\", keep \"Golden Retriever\" not just \"dog\").\n\nAdditional extraction rules:\n- Presupposed facts: When the user reveals a fact indirectly in a question (e.g. \"What collar suits a Golden Retriever like Max?\" presupposes Max is a Golden Retriever), extract that fact as a separate memory.\n- Date precision: Always preserve the full date including day-of-month when stated (e.g. \"February 14th\" not just \"February\", \"March 5\" not just \"March\").\n- Life events in passing: When the user mentions a life event (birth, wedding, graduation, move, adoption) while discussing something else, extract the life event as its own memory (e.g. \"my friend David had a baby boy named Jasper\" is a birth fact, even if mentioned while planning to send congratulations).\n\nIf there is nothing worth remembering (e.g. empty result, no decisions or facts), return an empty list.\nOutput a JSON object with a single key \"memories\" whose value is a list of strings.",
"extract_memories_system": "You extract discrete, reusable memory statements from raw content (e.g. a task description and its result).\n\nFor the given content, output a list of memory statements. Each memory must:\n- Be one clear sentence or short statement\n- Be understandable without the original context\n- Capture a decision, fact, outcome, preference, lesson, or observation worth remembering\n- NOT be a vague summary or a restatement of the task description\n- NOT duplicate the same idea in different words\n\nIf there is nothing worth remembering (e.g. empty result, no decisions or facts), return an empty list.\nOutput a JSON object with a single key \"memories\" whose value is a list of strings.",
"extract_memories_user": "Content:\n{content}\n\nExtract memory statements as described. Return structured output.",
"query_user": "Query: {query}\n\nAvailable scopes: {available_scopes}\n{scope_desc}\n\nReturn the analysis as structured output.",
"save_system": "You analyze content to be stored in a hierarchical memory system.\nGiven the content and the existing scopes and categories, output:\n1. suggested_scope: The best matching existing scope path, or a new path if none fit (use / for root).\n2. categories: A list of categories (reuse existing when relevant, add new ones if needed).\n3. importance: A number from 0.0 to 1.0 indicating how significant this memory is.\n4. extracted_metadata: A JSON object with any entities, dates, or topics you can extract.",

View File

@@ -1,96 +0,0 @@
"""Serializable callable type for Pydantic models.
All callables (e.g., named functions, lambdas, closures, methods) are serialized
via ``cloudpickle`` + base85. On deserialization the base85 payload is
decoded and unpickled back into a live callable.
Deserialization is **opt-in** to prevent arbitrary code execution from
untrusted payloads. Callers must use :data:`allow_pickle_deserialization` to enable it::
with allow_pickle_deserialization:
task = Task.model_validate_json(untrusted_json)
``cloudpickle`` is an optional dependency. Serialization and deserialization
will raise ``RuntimeError`` if it is not installed.
"""
from __future__ import annotations
import base64
from collections.abc import Callable
from contextvars import ContextVar, Token
from typing import Annotated, Any
from pydantic import BeforeValidator, PlainSerializer, WithJsonSchema
_ALLOW_PICKLE: ContextVar[bool] = ContextVar("_ALLOW_PICKLE", default=False)
_ALLOW_PICKLE_TOKEN: ContextVar[Token[bool] | None] = ContextVar(
"_ALLOW_PICKLE_TOKEN", default=None
)
def _import_cloudpickle() -> Any:
try:
import cloudpickle # type: ignore[import-untyped]
except ModuleNotFoundError:
raise RuntimeError(
"cloudpickle is required for callable serialization. "
"Install it with: uv add 'crewai[pickling]'"
) from None
return cloudpickle
class _AllowPickleDeserialization:
"""Reentrant context manager that opts in to cloudpickle deserialization.
Usage::
with allow_pickle_deserialization:
task = Task.model_validate_json(payload)
"""
def __enter__(self) -> None:
_ALLOW_PICKLE_TOKEN.set(_ALLOW_PICKLE.set(True))
def __exit__(self, *_: object) -> None:
token = _ALLOW_PICKLE_TOKEN.get()
if token is not None:
_ALLOW_PICKLE.reset(token)
allow_pickle_deserialization = _AllowPickleDeserialization()
def _deserialize_callable(v: str | Callable[..., Any]) -> Callable[..., Any]:
"""Deserialize a base64-encoded cloudpickle payload, or pass through if already callable."""
if isinstance(v, str):
if not _ALLOW_PICKLE.get():
raise RuntimeError(
"Refusing to unpickle a callable from untrusted data. "
"Wrap the deserialization call with "
"`with allow_pickle_deserialization: ...` "
"if you trust the source."
)
cloudpickle = _import_cloudpickle()
obj = cloudpickle.loads(base64.b85decode(v))
if not callable(obj):
raise ValueError(
f"Deserialized object is {type(obj).__name__}, not a callable"
)
return obj # type: ignore[no-any-return]
return v
def _serialize_callable(v: Callable[..., Any]) -> str:
"""Serialize any callable to a base64-encoded cloudpickle payload."""
cloudpickle = _import_cloudpickle()
return base64.b85encode(cloudpickle.dumps(v)).decode("ascii")
SerializableCallable = Annotated[
Callable[..., Any],
BeforeValidator(_deserialize_callable),
PlainSerializer(_serialize_callable, return_type=str, when_used="json"),
WithJsonSchema({"type": "string"}),
]

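As the removed module's docstring describes, the round-trip is serialize-on-dump, opt-in on load. A minimal usage sketch, assuming cloudpickle is installed:

from pydantic import BaseModel

class Job(BaseModel):
    callback: SerializableCallable

job = Job(callback=lambda x: x * 2)
payload = job.model_dump_json()      # callback -> base85 cloudpickle string

with allow_pickle_deserialization:   # explicit opt-in gate
    restored = Job.model_validate_json(payload)
assert restored.callback(21) == 42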
View File

@@ -123,7 +123,7 @@ class TestAgentExecutor:
executor.state.iterations = 10
result = executor.check_max_iterations()
assert result == "max_iterations_exceeded"
assert result == "force_final_answer"
def test_route_by_answer_type_action(self, mock_dependencies):
"""Test routing for AgentAction."""

View File

@@ -1136,7 +1136,7 @@ def test_lite_agent_memory_instance_recall_and_save_called():
successful_requests=1,
)
mock_memory = Mock()
mock_memory.read_only = False
mock_memory._read_only = False
mock_memory.recall.return_value = []
mock_memory.extract_memories.return_value = ["Fact one.", "Fact two."]

View File

@@ -1,113 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Based on the following human feedback,
determine which outcome best matches their intent.\n\nFeedback: I approve this\n\nPossible
outcomes: approved, rejected\n\nRespond with ONLY one of the exact outcome values
listed above, nothing else."}],"model":"gpt-4o-mini","response_format":{"type":"json_schema","json_schema":{"schema":{"description":"The
outcome that best matches the human''s feedback intent.","properties":{"outcome":{"description":"The
outcome that best matches the feedback. Must be one of: approved, rejected","enum":["approved","rejected"],"title":"Outcome","type":"string"}},"required":["outcome"],"title":"FeedbackOutcome","type":"object","additionalProperties":false},"name":"FeedbackOutcome","strict":true}},"stream":false}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '782'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-helper-method:
- beta.chat.completions.parse
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-DHDHCheu5DvlB6xjTrEDq0nfzLlrf\",\n \"object\":
\"chat.completion\",\n \"created\": 1772994982,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"{\\\"outcome\\\":\\\"approved\\\"}\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
130,\n \"completion_tokens\": 6,\n \"total_tokens\": 136,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_cf6f0a1ff1\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Sun, 08 Mar 2026 18:36:22 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '361'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
set-cookie:
- SET-COOKIE-XXX
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,113 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Based on the following human feedback,
determine which outcome best matches their intent.\n\nFeedback: Unclear feedback\n\nPossible
outcomes: approved, rejected\n\nRespond with ONLY one of the exact outcome values
listed above, nothing else."}],"model":"gpt-4o-mini","response_format":{"type":"json_schema","json_schema":{"schema":{"description":"The
outcome that best matches the human''s feedback intent.","properties":{"outcome":{"description":"The
outcome that best matches the feedback. Must be one of: approved, rejected","enum":["approved","rejected"],"title":"Outcome","type":"string"}},"required":["outcome"],"title":"FeedbackOutcome","type":"object","additionalProperties":false},"name":"FeedbackOutcome","strict":true}},"stream":false}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '784'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-helper-method:
- beta.chat.completions.parse
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-DHDHDlji53YRtj69Ulq5E9SjBqccI\",\n \"object\":
\"chat.completion\",\n \"created\": 1772994983,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"{\\\"outcome\\\":\\\"rejected\\\"}\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
130,\n \"completion_tokens\": 7,\n \"total_tokens\": 137,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_a1ddba3226\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Sun, 08 Mar 2026 18:36:24 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '317'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
set-cookie:
- SET-COOKIE-XXX
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,113 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"user","content":"Based on the following human feedback,
determine which outcome best matches their intent.\n\nFeedback: Looks good\n\nPossible
outcomes: approved, rejected\n\nRespond with ONLY one of the exact outcome values
listed above, nothing else."}],"model":"gpt-4o-mini","response_format":{"type":"json_schema","json_schema":{"schema":{"description":"The
outcome that best matches the human''s feedback intent.","properties":{"outcome":{"description":"The
outcome that best matches the feedback. Must be one of: approved, rejected","enum":["approved","rejected"],"title":"Outcome","type":"string"}},"required":["outcome"],"title":"FeedbackOutcome","type":"object","additionalProperties":false},"name":"FeedbackOutcome","strict":true}},"stream":false}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '778'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-helper-method:
- beta.chat.completions.parse
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.12
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-DHDHEVhZlU19TjrqDy0sKeWkKRINn\",\n \"object\":
\"chat.completion\",\n \"created\": 1772994984,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"{\\\"outcome\\\":\\\"approved\\\"}\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
129,\n \"completion_tokens\": 6,\n \"total_tokens\": 135,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_a1ddba3226\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Sun, 08 Mar 2026 18:36:24 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '253'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
set-cookie:
- SET-COOKIE-XXX
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -172,8 +172,8 @@ def test_memory_scope_slice(tmp_path: Path, mock_embedder: MagicMock) -> None:
sc = mem.scope("/agent/1")
assert sc._root in ("/agent/1", "/agent/1/")
sl = mem.slice(["/a", "/b"], read_only=True)
assert sl.read_only is True
assert "/a" in sl.scopes and "/b" in sl.scopes
assert sl._read_only is True
assert "/a" in sl._scopes and "/b" in sl._scopes
def test_memory_list_scopes_info_tree(tmp_path: Path, mock_embedder: MagicMock) -> None:
@@ -198,7 +198,7 @@ def test_memory_scope_remember_recall(tmp_path: Path, mock_embedder: MagicMock)
from crewai.memory.memory_scope import MemoryScope
mem = Memory(storage=str(tmp_path / "db5"), llm=MagicMock(), embedder=mock_embedder)
scope = MemoryScope(memory=mem, root_path="/crew/1")
scope = MemoryScope(mem, "/crew/1")
scope.remember("Scoped note", scope="/", categories=[], importance=0.5, metadata={})
results = scope.recall("note", limit=5, depth="shallow")
assert len(results) >= 1
@@ -213,7 +213,7 @@ def test_memory_slice_recall(tmp_path: Path, mock_embedder: MagicMock) -> None:
mem = Memory(storage=str(tmp_path / "db6"), llm=MagicMock(), embedder=mock_embedder)
mem.remember("In scope A", scope="/a", categories=[], importance=0.5, metadata={})
sl = MemorySlice(memory=mem, scopes=["/a"], read_only=True)
sl = MemorySlice(mem, ["/a"], read_only=True)
matches = sl.recall("scope", limit=5, depth="shallow")
assert isinstance(matches, list)
@@ -223,7 +223,7 @@ def test_memory_slice_remember_is_noop_when_read_only(tmp_path: Path, mock_embed
from crewai.memory.memory_scope import MemorySlice
mem = Memory(storage=str(tmp_path / "db7"), llm=MagicMock(), embedder=mock_embedder)
sl = MemorySlice(memory=mem, scopes=["/a"], read_only=True)
sl = MemorySlice(mem, ["/a"], read_only=True)
result = sl.remember("x", scope="/a")
assert result is None
assert mem.list_records() == []
@@ -319,7 +319,7 @@ def test_executor_save_to_memory_calls_extract_then_remember_per_item() -> None:
from crewai.agents.parser import AgentFinish
mock_memory = MagicMock()
mock_memory.read_only = False
mock_memory._read_only = False
mock_memory.extract_memories.return_value = ["Fact A.", "Fact B."]
mock_agent = MagicMock()
@@ -360,7 +360,7 @@ def test_executor_save_to_memory_skips_delegation_output() -> None:
from crewai.utilities.string_utils import sanitize_tool_name
mock_memory = MagicMock()
mock_memory.read_only = False
mock_memory._read_only = False
mock_agent = MagicMock()
mock_agent.memory = mock_memory
mock_agent._logger = MagicMock()
@@ -393,7 +393,7 @@ def test_memory_scope_extract_memories_delegates() -> None:
mock_memory = MagicMock()
mock_memory.extract_memories.return_value = ["Scoped fact."]
scope = MemoryScope(memory=mock_memory, root_path="/agent/1")
scope = MemoryScope(mock_memory, "/agent/1")
result = scope.extract_memories("Some content")
mock_memory.extract_memories.assert_called_once_with("Some content")
assert result == ["Scoped fact."]
@@ -405,7 +405,7 @@ def test_memory_slice_extract_memories_delegates() -> None:
mock_memory = MagicMock()
mock_memory.extract_memories.return_value = ["Sliced fact."]
sl = MemorySlice(memory=mock_memory, scopes=["/a", "/b"], read_only=True)
sl = MemorySlice(mock_memory, ["/a", "/b"], read_only=True)
result = sl.extract_memories("Some content")
mock_memory.extract_memories.assert_called_once_with("Some content")
assert result == ["Sliced fact."]
@@ -670,10 +670,10 @@ def test_agent_kickoff_memory_recall_and_save(tmp_path: Path, mock_embedder: Mag
verbose=False,
)
# Patch on the class to avoid Pydantic BaseModel __delattr__ restriction
with patch.object(Memory, "recall", wraps=mem.recall) as recall_mock, \
patch.object(Memory, "extract_memories", return_value=["PostgreSQL is used."]) as extract_mock, \
patch.object(Memory, "remember_many", wraps=mem.remember_many) as remember_many_mock:
# Mock recall to verify it's called, but return real results
with patch.object(mem, "recall", wraps=mem.recall) as recall_mock, \
patch.object(mem, "extract_memories", return_value=["PostgreSQL is used."]) as extract_mock, \
patch.object(mem, "remember_many", wraps=mem.remember_many) as remember_many_mock:
result = agent.kickoff("What database do we use?")
assert result is not None

View File

@@ -897,7 +897,7 @@ class TestCollapseToOutcomeJsonParsing:
"""Test that JSON string response from LLM is correctly parsed."""
flow = Flow()
with patch("crewai.flow.flow.LLM") as MockLLM:
with patch("crewai.llm.LLM") as MockLLM:
mock_llm = MagicMock()
# Simulate LLM returning JSON string (the bug we fixed)
mock_llm.call.return_value = '{"outcome": "approved"}'
@@ -915,7 +915,7 @@ class TestCollapseToOutcomeJsonParsing:
"""Test that plain string response is correctly matched."""
flow = Flow()
with patch("crewai.flow.flow.LLM") as MockLLM:
with patch("crewai.llm.LLM") as MockLLM:
mock_llm = MagicMock()
# Simulate LLM returning plain outcome string
mock_llm.call.return_value = "rejected"
@@ -933,7 +933,7 @@ class TestCollapseToOutcomeJsonParsing:
"""Test that invalid JSON falls back to string matching."""
flow = Flow()
with patch("crewai.flow.flow.LLM") as MockLLM:
with patch("crewai.llm.LLM") as MockLLM:
mock_llm = MagicMock()
# Invalid JSON that contains "approved"
mock_llm.call.return_value = "{invalid json but says approved"
@@ -951,7 +951,7 @@ class TestCollapseToOutcomeJsonParsing:
"""Test that LLM exception triggers fallback to simple prompting."""
flow = Flow()
with patch("crewai.flow.flow.LLM") as MockLLM:
with patch("crewai.llm.LLM") as MockLLM:
mock_llm = MagicMock()
# First call raises, second call succeeds (fallback)
mock_llm.call.side_effect = [

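The patch-target changes from "crewai.flow.flow.LLM" to "crewai.llm.LLM" only work if the code under test now resolves LLM through its source module at call time rather than via a module-level from-import. A generic reminder of the distinction:

from unittest.mock import patch

# `from crewai.llm import LLM` at module import time binds the name locally,
# so it must be patched where it is looked up: "crewai.flow.flow.LLM".
# A deferred import inside the function (or a `crewai.llm.LLM(...)` attribute
# lookup) resolves through the source module, so "crewai.llm.LLM" works instead.
with patch("crewai.llm.LLM") as MockLLM:
    MockLLM.return_value.call.return_value = "approved"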
View File

@@ -36,7 +36,7 @@ from crewai.flow import Flow, start
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.llm import LLM
from crewai.memory.unified_memory import Memory
from crewai.process import Process
from crewai.project import CrewBase, agent, before_kickoff, crew, task
from crewai.task import Task
@@ -2618,9 +2618,9 @@ def test_memory_remember_called_after_task():
)
with patch.object(
Memory, "extract_memories", wraps=crew._memory.extract_memories
crew._memory, "extract_memories", wraps=crew._memory.extract_memories
) as extract_mock, patch.object(
Memory, "remember", wraps=crew._memory.remember
crew._memory, "remember", wraps=crew._memory.remember
) as remember_mock:
crew.kickoff()
@@ -4773,13 +4773,13 @@ def test_memory_remember_receives_task_content():
# Mock extract_memories to return fake memories and capture the raw input.
# No wraps= needed -- the test only checks what args it receives, not the output.
patch.object(
Memory, "extract_memories", return_value=["Fake memory."]
crew._memory, "extract_memories", return_value=["Fake memory."]
) as extract_mock,
# Mock recall to avoid LLM calls for query analysis (not in cassette).
patch.object(Memory, "recall", return_value=[]),
patch.object(crew._memory, "recall", return_value=[]),
# Mock remember_many to prevent the background save from triggering
# LLM calls (field resolution) that aren't in the cassette.
patch.object(Memory, "remember_many", return_value=[]),
patch.object(crew._memory, "remember_many", return_value=[]),
):
crew.kickoff()

View File

@@ -1843,53 +1843,3 @@ def test_cyclic_flow_works_with_persist_and_id_input():
f"'{method}' should fire 3 times, "
f"got {len(events)}: {execution_order}"
)
@pytest.mark.timeout(5)
def test_self_listening_method_does_not_loop():
"""A method whose @listen label matches its own name must not loop forever.
Without the guard, 'process' re-triggers itself on every completion,
running indefinitely (timeout → FAIL). The fix caps method calls
and raises RecursionError (PASS).
"""
class SelfListenFlow(Flow):
@start()
def begin(self):
return "process"
@router(begin)
def route(self):
return "process"
@listen("process")
def process(self):
pass
flow = SelfListenFlow()
with pytest.raises(RecursionError, match="infinite loop"):
flow.kickoff()
def test_or_condition_self_listen_fires_once():
"""or_() with a self-referencing label only fires once due to or_() guard."""
call_count = 0
class OrSelfListenFlow(Flow):
@start()
def begin(self):
return "process"
@router(begin)
def route(self):
return "process"
@listen(or_("other_trigger", "process"))
def process(self):
nonlocal call_count
call_count += 1
flow = OrSelfListenFlow()
flow.kickoff()
assert call_count == 1

View File

@@ -349,38 +349,56 @@ class TestHumanFeedbackHistory:
class TestCollapseToOutcome:
"""Tests for the _collapse_to_outcome method."""
@pytest.mark.vcr()
def test_exact_match(self):
"""Test exact match returns the correct outcome."""
flow = Flow()
result = flow._collapse_to_outcome(
feedback="I approve this",
outcomes=["approved", "rejected"],
llm="gpt-4o-mini",
)
assert result in ("approved", "rejected")
@pytest.mark.vcr()
with patch("crewai.llm.LLM") as MockLLM:
mock_llm = MagicMock()
mock_llm.call.return_value = "approved"
MockLLM.return_value = mock_llm
result = flow._collapse_to_outcome(
feedback="I approve this",
outcomes=["approved", "rejected"],
llm="gpt-4o-mini",
)
assert result == "approved"
def test_partial_match(self):
"""Test partial match finds the outcome in the response."""
flow = Flow()
result = flow._collapse_to_outcome(
feedback="Looks good",
outcomes=["approved", "rejected"],
llm="gpt-4o-mini",
)
assert result in ("approved", "rejected")
@pytest.mark.vcr()
with patch("crewai.llm.LLM") as MockLLM:
mock_llm = MagicMock()
mock_llm.call.return_value = "The outcome is approved based on the feedback"
MockLLM.return_value = mock_llm
result = flow._collapse_to_outcome(
feedback="Looks good",
outcomes=["approved", "rejected"],
llm="gpt-4o-mini",
)
assert result == "approved"
def test_fallback_to_first(self):
"""Test that unmatched response falls back to first outcome."""
flow = Flow()
result = flow._collapse_to_outcome(
feedback="Unclear feedback",
outcomes=["approved", "rejected"],
llm="gpt-4o-mini",
)
assert result in ("approved", "rejected")
with patch("crewai.llm.LLM") as MockLLM:
mock_llm = MagicMock()
mock_llm.call.return_value = "something completely different"
MockLLM.return_value = mock_llm
result = flow._collapse_to_outcome(
feedback="Unclear feedback",
outcomes=["approved", "rejected"],
llm="gpt-4o-mini",
)
assert result == "approved" # First in list
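# The three tests above pin down a matching order; this is a hypothetical
# reconstruction from their behavior, not the real _collapse_to_outcome source.
def _match_outcome_sketch(response: str, outcomes: list[str]) -> str:
    if response in outcomes:
        return response        # exact match
    for outcome in outcomes:
        if outcome in response:
            return outcome     # substring match
    return outcomes[0]         # fallback: first outcome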
# -- HITL Learning tests --

View File

@@ -1022,3 +1022,256 @@ async def test_usage_info_streaming_with_acall():
assert llm._token_usage["total_tokens"] > 0
assert len(result) > 0
# --- Tests for issue #4697: tools discarded when response_model is set ---
class MyOutput(BaseModel):
name: str
value: str
def _make_tool_call_response():
"""Create a mock litellm response that contains a tool call."""
mock_message = MagicMock()
mock_message.content = ""
mock_tool_call = MagicMock()
mock_tool_call.function.name = "my_search_tool"
mock_tool_call.function.arguments = '{"query": "test"}'
mock_tool_call.id = "call_123"
mock_message.tool_calls = [mock_tool_call]
mock_choice = MagicMock()
mock_choice.message = mock_message
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_response.usage = MagicMock()
mock_response.usage.prompt_tokens = 10
mock_response.usage.completion_tokens = 5
mock_response.usage.total_tokens = 15
return mock_response
def _make_text_response(text: str = '{"name": "Alice", "value": "42"}'):
"""Create a mock litellm response that contains only text content."""
mock_message = MagicMock()
mock_message.content = text
mock_message.tool_calls = []
mock_choice = MagicMock()
mock_choice.message = mock_message
mock_response = MagicMock()
mock_response.choices = [mock_choice]
mock_response.usage = MagicMock()
mock_response.usage.prompt_tokens = 10
mock_response.usage.completion_tokens = 5
mock_response.usage.total_tokens = 15
return mock_response
def _get_tool_schema():
return {
"type": "function",
"function": {
"name": "my_search_tool",
"description": "Search for data",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "The search query"}
},
"required": ["query"],
},
},
}
def test_non_streaming_response_model_with_tools_skips_instructor():
"""When response_model AND tools are both present, InternalInstructor must NOT
be used so that LiteLLM sees the tools and can return tool_calls."""
llm = LLM(model="gpt-4o-mini", is_litellm=True)
tool_schema = _get_tool_schema()
tool_call_resp = _make_tool_call_response()
with patch("litellm.completion", return_value=tool_call_resp) as mock_completion:
result = llm.call(
messages=[{"role": "user", "content": "Find the name and value"}],
tools=[tool_schema],
available_functions=None, # executor handles tool execution
response_model=MyOutput,
)
# litellm.completion must have been called (not InternalInstructor)
mock_completion.assert_called_once()
# The result should be tool_calls (a list), not a structured JSON string
assert isinstance(result, list), (
"Expected tool_calls list but got a string/model — "
"InternalInstructor likely intercepted the call"
)
def test_non_streaming_response_model_without_tools_uses_instructor():
"""When response_model is set but NO tools, InternalInstructor should still
be used for structured output conversion."""
llm = LLM(model="gpt-4o-mini", is_litellm=True)
with patch(
"crewai.utilities.internal_instructor.InternalInstructor"
) as MockInstructor:
mock_instance = MagicMock()
mock_pydantic = MyOutput(name="Alice", value="42")
mock_instance.to_pydantic.return_value = mock_pydantic
MockInstructor.return_value = mock_instance
result = llm.call(
messages=[{"role": "user", "content": "Find the name and value"}],
tools=None,
response_model=MyOutput,
)
# InternalInstructor should have been used
MockInstructor.assert_called_once()
assert '"Alice"' in result
def test_non_streaming_response_model_with_tools_returns_tool_calls():
"""Verify that when LLM returns tool_calls with response_model + tools,
the tool_calls are returned to the caller (not swallowed)."""
llm = LLM(model="gpt-4o-mini", is_litellm=True)
tool_schema = _get_tool_schema()
tool_call_resp = _make_tool_call_response()
with patch("litellm.completion", return_value=tool_call_resp):
result = llm.call(
messages=[{"role": "user", "content": "Find the name and value"}],
tools=[tool_schema],
available_functions=None,
response_model=MyOutput,
)
# The result should be the raw tool_calls list
assert isinstance(result, list)
assert len(result) == 1
assert result[0].function.name == "my_search_tool"
def test_non_streaming_response_model_with_tools_text_response():
"""When LLM returns text (not tool_calls) with both response_model and tools,
the text response should be returned normally."""
llm = LLM(model="gpt-4o-mini", is_litellm=True)
tool_schema = _get_tool_schema()
text_resp = _make_text_response('{"name": "Alice", "value": "42"}')
with patch("litellm.completion", return_value=text_resp):
result = llm.call(
messages=[{"role": "user", "content": "Find the name and value"}],
tools=[tool_schema],
available_functions=None,
response_model=MyOutput,
)
# Text response should be returned as-is
assert isinstance(result, str)
assert "Alice" in result
@pytest.mark.asyncio
async def test_async_non_streaming_response_model_with_tools_skips_instructor():
"""Async variant: response_model + tools should skip InternalInstructor."""
llm = LLM(model="gpt-4o-mini", is_litellm=True)
tool_schema = _get_tool_schema()
tool_call_resp = _make_tool_call_response()
with patch("litellm.acompletion", return_value=tool_call_resp) as mock_acompletion:
result = await llm.acall(
messages=[{"role": "user", "content": "Find the name and value"}],
tools=[tool_schema],
available_functions=None,
response_model=MyOutput,
)
mock_acompletion.assert_called_once()
assert isinstance(result, list), (
"Expected tool_calls list but got a string/model — "
"InternalInstructor likely intercepted the call"
)
@pytest.mark.asyncio
async def test_async_non_streaming_response_model_without_tools_uses_instructor():
"""Async variant: response_model without tools should use InternalInstructor."""
llm = LLM(model="gpt-4o-mini", is_litellm=True)
with patch(
"crewai.utilities.internal_instructor.InternalInstructor"
) as MockInstructor:
mock_instance = MagicMock()
mock_pydantic = MyOutput(name="Alice", value="42")
mock_instance.to_pydantic.return_value = mock_pydantic
MockInstructor.return_value = mock_instance
result = await llm.acall(
messages=[{"role": "user", "content": "Find the name and value"}],
tools=None,
response_model=MyOutput,
)
MockInstructor.assert_called_once()
assert '"Alice"' in result
def test_non_streaming_response_model_not_passed_to_litellm_when_tools_present():
"""Verify that response_model is NOT forwarded to litellm.completion when
tools are present, because litellm's internal instructor would override
the tools parameter."""
llm = LLM(model="gpt-4o-mini", is_litellm=True)
tool_schema = _get_tool_schema()
tool_call_resp = _make_tool_call_response()
with patch("litellm.completion", return_value=tool_call_resp) as mock_completion:
llm.call(
messages=[{"role": "user", "content": "Find the name and value"}],
tools=[tool_schema],
available_functions=None,
response_model=MyOutput,
)
mock_completion.assert_called_once()
call_kwargs = mock_completion.call_args[1]
# response_model must NOT be in the kwargs sent to litellm.completion
assert "response_model" not in call_kwargs, (
"response_model was passed to litellm.completion even though tools are present; "
"litellm's internal instructor would override the tools"
)
# tools MUST be present
assert "tools" in call_kwargs
@pytest.mark.asyncio
async def test_async_response_model_not_passed_to_litellm_when_tools_present():
"""Async variant: response_model must not be forwarded to litellm.acompletion
when tools are present."""
llm = LLM(model="gpt-4o-mini", is_litellm=True)
tool_schema = _get_tool_schema()
tool_call_resp = _make_tool_call_response()
with patch("litellm.acompletion", return_value=tool_call_resp) as mock_acompletion:
await llm.acall(
messages=[{"role": "user", "content": "Find the name and value"}],
tools=[tool_schema],
available_functions=None,
response_model=MyOutput,
)
mock_acompletion.assert_called_once()
call_kwargs = mock_acompletion.call_args[1]
assert "response_model" not in call_kwargs, (
"response_model was passed to litellm.acompletion even though tools are present; "
"litellm's internal instructor would override the tools"
)
assert "tools" in call_kwargs

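Taken together, these tests imply a simple dispatch rule for structured output. A sketch of the guard they encode, not the literal handler code:

def route_structured_call(params: dict, response_model: object, tools: list | None) -> str:
    # Sketch only: names are illustrative.
    if response_model is not None and not tools:
        return "internal_instructor"  # no tools for instructor to discard
    # Tools present: strip response_model so litellm's instructor integration
    # cannot override the tools parameter, then take the normal completion path.
    params.pop("response_model", None)
    return "litellm_completion"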
View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.10.1"
__version__ = "1.10.1a1"

uv.lock generated

File diff suppressed because it is too large