Compare commits

..

2 Commits

Author SHA1 Message Date
github-actions[bot]
287ffe2f6d chore: update tool specifications 2026-03-20 20:17:10 +00:00
Devin AI
9a9cb48d09 fix: prevent SQL injection in SnowflakeSearchTool and NL2SQLTool
- Add identifier validation regex to database and snowflake_schema fields
  in SnowflakeSearchToolInput to reject malicious values at schema level
- Add _validate_identifier() runtime check in SnowflakeSearchTool._run()
  and double-quote identifiers in USE DATABASE/SCHEMA SQL statements
- Add _validate_identifier() to NL2SQLTool to sanitize table_name in
  _fetch_all_available_columns() preventing second-order SQL injection
- Add comprehensive tests for both tools covering injection vectors

Closes #4993

Co-Authored-By: João <joao@crewai.com>
2026-03-20 20:15:14 +00:00
43 changed files with 331 additions and 4723 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -4,38 +4,6 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Mar 23, 2026">
## v1.11.1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.11.1)
## What's Changed
### Features
- Add flow_structure() serializer for Flow class introspection.
### Bug Fixes
- Fix security vulnerabilities by bumping pypdf, tinytag, and langchain-core.
- Preserve full LLM config across HITL resume for non-OpenAI providers.
- Prevent path traversal in FileWriterTool.
- Fix lock_store crash when redis package is not installed.
- Pass cache_function from BaseTool to CrewStructuredTool.
### Documentation
- Add publish custom tools guide with translations.
- Update changelog and version for v1.11.0.
- Add missing event listeners documentation.
### Refactoring
- Replace urllib with requests in pdf loader.
- Replace Any-typed callback and model fields with serializable types.
## Contributors
@alex-clawd, @danielfsbarreto, @dependabot[bot], @greysonlalonde, @lorenzejay, @lucasgomide, @mattatcha, @theCyberTech, @vinibrsl
</Update>
<Update label="Mar 18, 2026">
## v1.11.0

View File

@@ -4,38 +4,6 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 3월 23일">
## v1.11.1
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.11.1)
## 변경 사항
### 기능
- Flow 클래스 내성 검사를 위한 flow_structure() 직렬 변환기 추가.
### 버그 수정
- pypdf, tinytag 및 langchain-core의 버전을 업데이트하여 보안 취약점 수정.
- 비-OpenAI 제공자의 HITL 재개 시 전체 LLM 구성 유지.
- FileWriterTool에서 경로 탐색 방지.
- redis 패키지가 설치되지 않았을 때 lock_store 충돌 수정.
- BaseTool에서 CrewStructuredTool로 cache_function 전달.
### 문서화
- 번역이 포함된 사용자 정의 도구 게시 가이드 추가.
- v1.11.0에 대한 변경 로그 및 버전 업데이트.
- 누락된 이벤트 리스너 문서 추가.
### 리팩토링
- pdf 로더에서 urllib를 requests로 교체.
- Any 유형의 콜백 및 모델 필드를 직렬화 가능한 유형으로 교체.
## 기여자
@alex-clawd, @danielfsbarreto, @dependabot[bot], @greysonlalonde, @lorenzejay, @lucasgomide, @mattatcha, @theCyberTech, @vinibrsl
</Update>
<Update label="2026년 3월 18일">
## v1.11.0

View File

@@ -4,38 +4,6 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="23 mar 2026">
## v1.11.1
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.11.1)
## O que Mudou
### Funcionalidades
- Adicionar o serializer flow_structure() para introspecção da classe Flow.
### Correções de Bugs
- Corrigir vulnerabilidades de segurança atualizando pypdf, tinytag e langchain-core.
- Preservar a configuração completa do LLM durante a retomada do HITL para provedores que não são da OpenAI.
- Prevenir a travessia de caminho no FileWriterTool.
- Corrigir a falha do lock_store quando o pacote redis não está instalado.
- Passar cache_function de BaseTool para CrewStructuredTool.
### Documentação
- Adicionar guia de publicação de ferramentas personalizadas com traduções.
- Atualizar changelog e versão para v1.11.0.
- Adicionar documentação de ouvintes de eventos ausentes.
### Refatoração
- Substituir urllib por requests no carregador de pdf.
- Substituir campos de callback e modelo do tipo Any por tipos serializáveis.
## Contribuidores
@alex-clawd, @danielfsbarreto, @dependabot[bot], @greysonlalonde, @lorenzejay, @lucasgomide, @mattatcha, @theCyberTech, @vinibrsl
</Update>
<Update label="18 mar 2026">
## v1.11.0

View File

@@ -9,11 +9,11 @@ authors = [
requires-python = ">=3.10, <3.14"
dependencies = [
"Pillow~=12.1.1",
"pypdf~=6.9.1",
"pypdf~=6.7.5",
"python-magic>=0.4.27",
"aiocache~=0.12.3",
"aiofiles~=24.1.0",
"tinytag~=2.2.1",
"tinytag~=1.10.0",
"av~=13.0.0",
]

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.11.1"
__version__ = "1.11.0"

View File

@@ -11,7 +11,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.11.1",
"crewai==1.11.0",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -309,4 +309,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.11.1"
__version__ = "1.11.0"

View File

@@ -1,12 +1,10 @@
"""PDF loader for extracting text from PDF files."""
import os
import tempfile
from pathlib import Path
from typing import Any
from typing import Any, cast
from urllib.parse import urlparse
import requests
import urllib.request
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
@@ -25,34 +23,22 @@ class PDFLoader(BaseLoader):
return False
@staticmethod
def _download_from_url(url: str, kwargs: dict) -> str:
"""Download PDF from a URL to a temporary file and return its path.
def _download_pdf(url: str) -> bytes:
"""Download PDF content from a URL.
Args:
url: The URL to download from.
kwargs: Optional dict that may contain custom headers.
Returns:
Path to the temporary file containing the PDF.
The PDF content as bytes.
Raises:
ValueError: If the download fails.
"""
headers = kwargs.get(
"headers",
{
"Accept": "application/pdf",
"User-Agent": "Mozilla/5.0 (compatible; crewai-tools PDFLoader)",
},
)
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
temp_file.write(response.content)
return temp_file.name
with urllib.request.urlopen(url, timeout=30) as response: # noqa: S310
return cast(bytes, response.read())
except Exception as e:
raise ValueError(f"Failed to download PDF from {url}: {e!s}") from e
@@ -94,8 +80,8 @@ class PDFLoader(BaseLoader):
try:
if is_url:
local_path = self._download_from_url(file_path, kwargs)
doc = pymupdf.open(local_path)
pdf_bytes = self._download_pdf(file_path)
doc = pymupdf.open(stream=pdf_bytes, filetype="pdf")
else:
if not os.path.isfile(file_path):
raise FileNotFoundError(f"PDF file not found: {file_path}")

View File

@@ -1,3 +1,4 @@
import re
from typing import Any
from crewai.tools import BaseTool
@@ -52,7 +53,18 @@ class NL2SQLTool(BaseTool):
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';"
)
@staticmethod
def _validate_identifier(value: str, name: str) -> str:
"""Validate a SQL identifier to prevent SQL injection."""
if not re.match(r"^[A-Za-z_][A-Za-z0-9_$]*$", value):
raise ValueError(
f"Invalid {name}: {value!r}. "
f"Only alphanumeric characters, underscores, and dollar signs are allowed."
)
return value
def _fetch_all_available_columns(self, table_name: str):
table_name = self._validate_identifier(table_name, "table_name")
return self.execute_sql(
f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table_name}';" # noqa: S608
)

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
import re
import threading
from typing import TYPE_CHECKING, Any
@@ -71,8 +72,16 @@ class SnowflakeSearchToolInput(BaseModel):
model_config = ConfigDict(protected_namespaces=())
query: str = Field(..., description="SQL query or semantic search query to execute")
database: str | None = Field(None, description="Override default database")
snowflake_schema: str | None = Field(None, description="Override default schema")
database: str | None = Field(
None,
description="Override default database",
pattern=r"^[A-Za-z_][A-Za-z0-9_$]*$",
)
snowflake_schema: str | None = Field(
None,
description="Override default schema",
pattern=r"^[A-Za-z_][A-Za-z0-9_$]*$",
)
timeout: int | None = Field(300, description="Query timeout in seconds")
@@ -247,6 +256,16 @@ class SnowflakeSearchTool(BaseTool):
continue
raise RuntimeError("Query failed after all retries")
@staticmethod
def _validate_identifier(value: str, name: str) -> str:
"""Validate and sanitize a Snowflake identifier to prevent SQL injection."""
if not re.match(r"^[A-Za-z_][A-Za-z0-9_$]*$", value):
raise ValueError(
f"Invalid {name}: {value!r}. "
f"Only alphanumeric characters, underscores, and dollar signs are allowed."
)
return value
async def _run(
self,
query: str,
@@ -259,9 +278,11 @@ class SnowflakeSearchTool(BaseTool):
try:
# Override database/schema if provided
if database:
await self._execute_query(f"USE DATABASE {database}")
database = self._validate_identifier(database, "database")
await self._execute_query(f'USE DATABASE "{database}"')
if snowflake_schema:
await self._execute_query(f"USE SCHEMA {snowflake_schema}")
snowflake_schema = self._validate_identifier(snowflake_schema, "schema")
await self._execute_query(f'USE SCHEMA "{snowflake_schema}"')
return await self._execute_query(query, timeout)
except Exception as e:

View File

@@ -0,0 +1,72 @@
from unittest.mock import MagicMock, patch
import pytest
from crewai_tools.tools.nl2sql.nl2sql_tool import NL2SQLTool
class TestNL2SQLToolValidateIdentifier:
"""Tests for SQL injection prevention via identifier validation."""
def test_valid_identifiers(self):
assert NL2SQLTool._validate_identifier("users", "table_name") == "users"
assert NL2SQLTool._validate_identifier("MY_TABLE", "table_name") == "MY_TABLE"
assert NL2SQLTool._validate_identifier("table$1", "table_name") == "table$1"
assert NL2SQLTool._validate_identifier("_private", "table_name") == "_private"
def test_rejects_sql_injection_with_semicolon(self):
with pytest.raises(ValueError, match="Invalid table_name"):
NL2SQLTool._validate_identifier("users; DROP TABLE users;--", "table_name")
def test_rejects_sql_injection_with_quotes(self):
with pytest.raises(ValueError, match="Invalid table_name"):
NL2SQLTool._validate_identifier("users'--", "table_name")
def test_rejects_sql_injection_with_spaces(self):
with pytest.raises(ValueError, match="Invalid table_name"):
NL2SQLTool._validate_identifier("users DROP TABLE", "table_name")
def test_rejects_leading_number(self):
with pytest.raises(ValueError, match="Invalid table_name"):
NL2SQLTool._validate_identifier("1table", "table_name")
def test_rejects_empty_string(self):
with pytest.raises(ValueError, match="Invalid table_name"):
NL2SQLTool._validate_identifier("", "table_name")
def test_rejects_parentheses(self):
with pytest.raises(ValueError, match="Invalid table_name"):
NL2SQLTool._validate_identifier("users()", "table_name")
def test_rejects_dash_comment(self):
with pytest.raises(ValueError, match="Invalid table_name"):
NL2SQLTool._validate_identifier("users--comment", "table_name")
@patch("crewai_tools.tools.nl2sql.nl2sql_tool.SQLALCHEMY_AVAILABLE", True)
class TestNL2SQLToolFetchColumns:
"""Tests that _fetch_all_available_columns validates table names."""
def _make_tool(self):
"""Create an NL2SQLTool instance bypassing model_post_init DB calls."""
with patch.object(NL2SQLTool, "model_post_init"):
tool = NL2SQLTool(
db_uri="sqlite:///:memory:",
name="NL2SQLTool",
description="test",
)
return tool
def test_rejects_malicious_table_name(self):
tool = self._make_tool()
with pytest.raises(ValueError, match="Invalid table_name"):
tool._fetch_all_available_columns("users'; DROP TABLE users;--")
def test_accepts_valid_table_name(self):
tool = self._make_tool()
with patch.object(NL2SQLTool, "execute_sql", return_value=[]) as mock_exec:
result = tool._fetch_all_available_columns("valid_table")
mock_exec.assert_called_once()
call_sql = mock_exec.call_args[0][0]
assert "valid_table" in call_sql
assert result == []

View File

@@ -2,6 +2,9 @@ import asyncio
from unittest.mock import MagicMock, patch
from crewai_tools import SnowflakeConfig, SnowflakeSearchTool
from crewai_tools.tools.snowflake_search_tool.snowflake_search_tool import (
SnowflakeSearchToolInput,
)
import pytest
@@ -100,3 +103,136 @@ def test_config_validation():
# Test missing authentication
with pytest.raises(ValueError):
SnowflakeConfig(account="test_account", user="test_user")
# SQL Injection Prevention Tests
class TestSnowflakeSearchToolInputValidation:
"""Tests for SQL injection prevention via input schema validation."""
def test_valid_database_identifier(self):
inp = SnowflakeSearchToolInput(query="SELECT 1", database="my_database")
assert inp.database == "my_database"
def test_valid_schema_identifier(self):
inp = SnowflakeSearchToolInput(query="SELECT 1", snowflake_schema="public")
assert inp.snowflake_schema == "public"
def test_valid_identifier_with_dollar_sign(self):
inp = SnowflakeSearchToolInput(query="SELECT 1", database="my$db")
assert inp.database == "my$db"
def test_database_with_sql_injection_semicolon(self):
with pytest.raises(ValueError):
SnowflakeSearchToolInput(
query="SELECT 1", database="test_db; DROP TABLE users; --"
)
def test_schema_with_sql_injection_semicolon(self):
with pytest.raises(ValueError):
SnowflakeSearchToolInput(
query="SELECT 1", snowflake_schema="public; DROP TABLE users; --"
)
def test_database_with_sql_injection_spaces(self):
with pytest.raises(ValueError):
SnowflakeSearchToolInput(
query="SELECT 1", database="test_db DROP TABLE"
)
def test_schema_with_sql_injection_quotes(self):
with pytest.raises(ValueError):
SnowflakeSearchToolInput(
query="SELECT 1", snowflake_schema="public\"--"
)
def test_database_with_sql_injection_dash_comment(self):
with pytest.raises(ValueError):
SnowflakeSearchToolInput(
query="SELECT 1", database="test--comment"
)
def test_database_starting_with_number(self):
with pytest.raises(ValueError):
SnowflakeSearchToolInput(query="SELECT 1", database="1invalid")
def test_none_database_is_allowed(self):
inp = SnowflakeSearchToolInput(query="SELECT 1", database=None)
assert inp.database is None
def test_none_schema_is_allowed(self):
inp = SnowflakeSearchToolInput(query="SELECT 1", snowflake_schema=None)
assert inp.snowflake_schema is None
class TestSnowflakeSearchToolValidateIdentifier:
"""Tests for the _validate_identifier runtime check."""
def test_valid_identifiers(self):
assert SnowflakeSearchTool._validate_identifier("my_db", "database") == "my_db"
assert SnowflakeSearchTool._validate_identifier("PROD_DB", "database") == "PROD_DB"
assert SnowflakeSearchTool._validate_identifier("schema$1", "schema") == "schema$1"
assert SnowflakeSearchTool._validate_identifier("_private", "schema") == "_private"
def test_rejects_semicolons(self):
with pytest.raises(ValueError, match="Invalid database"):
SnowflakeSearchTool._validate_identifier("db; DROP TABLE users;--", "database")
def test_rejects_spaces(self):
with pytest.raises(ValueError, match="Invalid schema"):
SnowflakeSearchTool._validate_identifier("public schema", "schema")
def test_rejects_quotes(self):
with pytest.raises(ValueError, match="Invalid database"):
SnowflakeSearchTool._validate_identifier('db"--', "database")
def test_rejects_leading_number(self):
with pytest.raises(ValueError, match="Invalid database"):
SnowflakeSearchTool._validate_identifier("1db", "database")
def test_rejects_empty_string(self):
with pytest.raises(ValueError, match="Invalid database"):
SnowflakeSearchTool._validate_identifier("", "database")
@pytest.mark.asyncio
async def test_run_uses_quoted_identifiers(snowflake_tool, mock_snowflake_connection):
"""Verify that _run wraps database/schema in double quotes in the SQL."""
with patch.object(snowflake_tool, "_create_connection") as mock_create_conn:
mock_create_conn.return_value = mock_snowflake_connection
await snowflake_tool._run(
query="SELECT 1",
database="my_db",
snowflake_schema="my_schema",
)
calls = mock_snowflake_connection.cursor().execute.call_args_list
sql_statements = [call[0][0] for call in calls]
assert 'USE DATABASE "my_db"' in sql_statements
assert 'USE SCHEMA "my_schema"' in sql_statements
@pytest.mark.asyncio
async def test_run_rejects_malicious_database(snowflake_tool, mock_snowflake_connection):
"""Verify that _run raises ValueError for SQL injection attempts in database."""
with patch.object(snowflake_tool, "_create_connection") as mock_create_conn:
mock_create_conn.return_value = mock_snowflake_connection
with pytest.raises(ValueError, match="Invalid database"):
await snowflake_tool._run(
query="SELECT 1",
database="test_db; DROP TABLE users; --",
)
@pytest.mark.asyncio
async def test_run_rejects_malicious_schema(snowflake_tool, mock_snowflake_connection):
"""Verify that _run raises ValueError for SQL injection attempts in schema."""
with patch.object(snowflake_tool, "_create_connection") as mock_create_conn:
mock_create_conn.return_value = mock_snowflake_connection
with pytest.raises(ValueError, match="Invalid schema"):
await snowflake_tool._run(
query="SELECT 1",
snowflake_schema="public; DROP TABLE users; --",
)

View File

@@ -21672,6 +21672,7 @@
"database": {
"anyOf": [
{
"pattern": "^[A-Za-z_][A-Za-z0-9_$]*$",
"type": "string"
},
{
@@ -21690,6 +21691,7 @@
"snowflake_schema": {
"anyOf": [
{
"pattern": "^[A-Za-z_][A-Za-z0-9_$]*$",
"type": "string"
},
{

View File

@@ -53,7 +53,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.11.1",
"crewai-tools==1.11.0",
]
embeddings = [
"tiktoken~=0.8.0"

View File

@@ -42,7 +42,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.11.1"
__version__ = "1.11.0"
_telemetry_submitted = False

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from crewai.agents.parser import AgentFinish
from crewai.memory.utils import sanitize_scope_name
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import sanitize_tool_name
@@ -27,12 +26,7 @@ class CrewAgentExecutorMixin:
_printer: Printer = Printer()
def _save_to_memory(self, output: AgentFinish) -> None:
"""Save task result to unified memory (memory or crew._memory).
Extends the memory's root_scope with agent-specific path segment
(e.g., '/crew/research-crew/agent/researcher') so that agent memories
are scoped hierarchically under their crew.
"""
"""Save task result to unified memory (memory or crew._memory)."""
memory = getattr(self.agent, "memory", None) or (
getattr(self.crew, "_memory", None) if self.crew else None
)
@@ -49,20 +43,6 @@ class CrewAgentExecutorMixin:
)
extracted = memory.extract_memories(raw)
if extracted:
# Build agent-specific root_scope that extends the crew's root
agent_role = self.agent.role or "unknown"
sanitized_role = sanitize_scope_name(agent_role)
# Get the memory's existing root_scope and extend with agent info
base_root = getattr(memory, "root_scope", None) or ""
# Construct agent root: base_root + /agent/<role>
agent_root = f"{base_root.rstrip('/')}/agent/{sanitized_role}"
# Ensure leading slash
if not agent_root.startswith("/"):
agent_root = "/" + agent_root
memory.remember_many(
extracted, agent_role=self.agent.role, root_scope=agent_root
)
memory.remember_many(extracted, agent_role=self.agent.role)
except Exception as e:
self.agent._logger.log("error", f"Failed to save to memory: {e}")

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.11.1"
"crewai[tools]==1.11.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.11.1"
"crewai[tools]==1.11.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.11.1"
"crewai[tools]==1.11.0"
]
[tool.crewai]

View File

@@ -357,18 +357,7 @@ class Crew(FlowTrackable, BaseModel):
@model_validator(mode="after")
def create_crew_memory(self) -> Crew:
"""Initialize unified memory, respecting crew embedder config.
When memory is enabled, sets a hierarchical root_scope based on the
crew name (e.g. '/crew/research-crew') so that all memories saved by
this crew and its agents are organized under a consistent namespace.
"""
from crewai.memory.utils import sanitize_scope_name
# Compute sanitized crew name for root_scope
crew_name = sanitize_scope_name(self.name or "crew")
crew_root_scope = f"/crew/{crew_name}"
"""Initialize unified memory, respecting crew embedder config."""
if self.memory is True:
from crewai.memory.unified_memory import Memory
@@ -377,13 +366,10 @@ class Crew(FlowTrackable, BaseModel):
from crewai.rag.embeddings.factory import build_embedder
embedder = build_embedder(self.embedder) # type: ignore[arg-type]
self._memory = Memory(embedder=embedder, root_scope=crew_root_scope)
self._memory = Memory(embedder=embedder)
elif self.memory:
# User passed a Memory / MemoryScope / MemorySlice instance
self._memory = self.memory
# Set root_scope only if not already set (don't override user config)
if hasattr(self._memory, "root_scope") and self._memory.root_scope is None:
self._memory.root_scope = crew_root_scope
else:
self._memory = None

View File

@@ -6,7 +6,6 @@ from crewai.flow.async_feedback import (
)
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_config import flow_config
from crewai.flow.flow_serializer import flow_structure
from crewai.flow.human_feedback import HumanFeedbackResult, human_feedback
from crewai.flow.input_provider import InputProvider, InputResponse
from crewai.flow.persistence import persist
@@ -30,7 +29,6 @@ __all__ = [
"and_",
"build_flow_structure",
"flow_config",
"flow_structure",
"human_feedback",
"listen",
"or_",

View File

@@ -60,7 +60,7 @@ class PendingFeedbackContext:
emit: list[str] | None = None
default_outcome: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
llm: dict[str, Any] | str | None = None
llm: str | None = None
requested_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> dict[str, Any]:

View File

@@ -905,10 +905,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Internal flows (RecallFlow, EncodingFlow) set _skip_auto_memory
# to avoid creating a wasteful standalone Memory instance.
if self.memory is None and not getattr(self, "_skip_auto_memory", False):
from crewai.memory.utils import sanitize_scope_name
flow_name = sanitize_scope_name(self.name or self.__class__.__name__)
self.memory = Memory(root_scope=f"/flow/{flow_name}")
self.memory = Memory()
# Register all flow-related methods
for method_name in dir(self):
@@ -1318,25 +1315,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
context = self._pending_feedback_context
emit = context.emit
default_outcome = context.default_outcome
# Try to get the live LLM from the re-imported decorator first.
# This preserves the fully-configured object (credentials, safety_settings, etc.)
# for same-process resume. For cross-process resume, fall back to the
# serialized context.llm which is now a dict with full config (or a legacy string).
from crewai.flow.human_feedback import _deserialize_llm_from_context
llm = None
method = self._methods.get(FlowMethodName(context.method_name))
if method is not None:
live_llm = getattr(method, "_hf_llm", None)
if live_llm is not None:
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
if isinstance(live_llm, BaseLLMClass):
llm = live_llm
if llm is None:
llm = _deserialize_llm_from_context(context.llm)
llm = context.llm
# Determine outcome
collapsed_outcome: str | None = None

View File

@@ -1,619 +0,0 @@
"""Flow structure serializer for introspecting Flow classes.
This module provides the flow_structure() function that analyzes a Flow class
and returns a JSON-serializable dictionary describing its graph structure.
This is used by Studio UI to render a visual flow graph.
Example:
>>> from crewai.flow import Flow, start, listen
>>> from crewai.flow.flow_serializer import flow_structure
>>>
>>> class MyFlow(Flow):
... @start()
... def begin(self):
... return "started"
...
... @listen(begin)
... def process(self):
... return "done"
>>>
>>> structure = flow_structure(MyFlow)
>>> print(structure["name"])
'MyFlow'
"""
from __future__ import annotations
import inspect
import logging
import re
import textwrap
from typing import Any, TypedDict, get_args, get_origin
from pydantic import BaseModel
from pydantic_core import PydanticUndefined
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowMethod,
ListenMethod,
RouterMethod,
StartMethod,
)
logger = logging.getLogger(__name__)
class MethodInfo(TypedDict, total=False):
"""Information about a single flow method.
Attributes:
name: The method name.
type: Method type - start, listen, router, or start_router.
trigger_methods: List of method names that trigger this method.
condition_type: 'AND' or 'OR' for composite conditions, null otherwise.
router_paths: For routers, the possible route names returned.
has_human_feedback: Whether the method has @human_feedback decorator.
has_crew: Whether the method body references a Crew.
"""
name: str
type: str
trigger_methods: list[str]
condition_type: str | None
router_paths: list[str]
has_human_feedback: bool
has_crew: bool
class EdgeInfo(TypedDict, total=False):
"""Information about an edge between flow methods.
Attributes:
from_method: Source method name.
to_method: Target method name.
edge_type: Type of edge - 'listen' or 'route'.
condition: Route name for router edges, null for listen edges.
"""
from_method: str
to_method: str
edge_type: str
condition: str | None
class StateFieldInfo(TypedDict, total=False):
"""Information about a state field.
Attributes:
name: Field name.
type: Field type as string.
default: Default value if any.
"""
name: str
type: str
default: Any
class StateSchemaInfo(TypedDict, total=False):
"""Information about the flow's state schema.
Attributes:
fields: List of field information.
"""
fields: list[StateFieldInfo]
class FlowStructureInfo(TypedDict, total=False):
"""Complete flow structure information.
Attributes:
name: Flow class name.
description: Flow docstring if available.
methods: List of method information.
edges: List of edge information.
state_schema: State schema if typed, null otherwise.
inputs: Detected flow inputs if available.
"""
name: str
description: str | None
methods: list[MethodInfo]
edges: list[EdgeInfo]
state_schema: StateSchemaInfo | None
inputs: list[str]
def _get_method_type(
method_name: str,
method: Any,
start_methods: list[str],
routers: set[str],
) -> str:
"""Determine the type of a flow method.
Args:
method_name: Name of the method.
method: The method object.
start_methods: List of start method names.
routers: Set of router method names.
Returns:
One of: 'start', 'listen', 'router', or 'start_router'.
"""
is_start = method_name in start_methods or getattr(
method, "__is_start_method__", False
)
is_router = method_name in routers or getattr(method, "__is_router__", False)
if is_start and is_router:
return "start_router"
if is_start:
return "start"
if is_router:
return "router"
return "listen"
def _has_human_feedback(method: Any) -> bool:
"""Check if a method has the @human_feedback decorator.
Args:
method: The method object to check.
Returns:
True if the method has __human_feedback_config__ attribute.
"""
return hasattr(method, "__human_feedback_config__")
def _detect_crew_reference(method: Any) -> bool:
"""Detect if a method body references a Crew.
Checks for patterns like:
- .crew() method calls
- Crew( instantiation
- References to Crew class in type hints
Note:
This is a **best-effort heuristic for UI hints**, not a guarantee.
Uses inspect.getsource + regex which can false-positive on comments
or string literals, and may fail on dynamically generated methods
or lambdas. Do not rely on this for correctness-critical logic.
Args:
method: The method object to inspect.
Returns:
True if crew reference detected, False otherwise.
"""
try:
# Get the underlying function from wrapper
func = method
if hasattr(method, "_meth"):
func = method._meth
elif hasattr(method, "__wrapped__"):
func = method.__wrapped__
source = inspect.getsource(func)
source = textwrap.dedent(source)
# Patterns that indicate Crew usage
crew_patterns = [
r"\.crew\(\)", # .crew() method call
r"Crew\s*\(", # Crew( instantiation
r":\s*Crew\b", # Type hint with Crew
r"->.*Crew", # Return type hint with Crew
]
for pattern in crew_patterns:
if re.search(pattern, source):
return True
return False
except (OSError, TypeError):
# Can't get source code - assume no crew reference
return False
def _extract_trigger_methods(method: Any) -> tuple[list[str], str | None]:
"""Extract trigger methods and condition type from a method.
Args:
method: The method object to inspect.
Returns:
Tuple of (trigger_methods list, condition_type or None).
"""
trigger_methods: list[str] = []
condition_type: str | None = None
# First try __trigger_methods__ (populated for simple conditions)
if hasattr(method, "__trigger_methods__") and method.__trigger_methods__:
trigger_methods = [str(m) for m in method.__trigger_methods__]
# For complex conditions (or_/and_ combinators), extract from __trigger_condition__
if (
not trigger_methods
and hasattr(method, "__trigger_condition__")
and method.__trigger_condition__
):
trigger_condition = method.__trigger_condition__
trigger_methods = _extract_all_methods_from_condition(trigger_condition)
if hasattr(method, "__condition_type__") and method.__condition_type__:
condition_type = str(method.__condition_type__)
return trigger_methods, condition_type
def _extract_router_paths(
method: Any, router_paths_registry: dict[str, list[str]]
) -> list[str]:
"""Extract router paths for a router method.
Args:
method: The method object.
router_paths_registry: The class-level _router_paths dict.
Returns:
List of possible route names.
"""
method_name = getattr(method, "__name__", "")
# First check if there are __router_paths__ on the method itself
if hasattr(method, "__router_paths__") and method.__router_paths__:
return [str(p) for p in method.__router_paths__]
# Then check the class-level registry
if method_name in router_paths_registry:
return [str(p) for p in router_paths_registry[method_name]]
return []
def _extract_all_methods_from_condition(
condition: str | FlowCondition | dict[str, Any] | list[Any],
) -> list[str]:
"""Extract all method names from a condition tree recursively.
Args:
condition: Can be a string, FlowCondition tuple, dict, or list.
Returns:
List of all method names found in the condition.
"""
if isinstance(condition, str):
return [condition]
if isinstance(condition, tuple) and len(condition) == 2:
# FlowCondition: (condition_type, methods_list)
_, methods = condition
if isinstance(methods, list):
result: list[str] = []
for m in methods:
result.extend(_extract_all_methods_from_condition(m))
return result
return []
if isinstance(condition, dict):
conditions_list = condition.get("conditions", [])
methods: list[str] = []
for sub_cond in conditions_list:
methods.extend(_extract_all_methods_from_condition(sub_cond))
return methods
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods_from_condition(item))
return methods
return []
def _generate_edges(
listeners: dict[str, tuple[str, list[str]] | FlowCondition],
routers: set[str],
router_paths: dict[str, list[str]],
all_methods: set[str],
) -> list[EdgeInfo]:
"""Generate edges from listeners and routers.
Args:
listeners: Map of listener_name -> (condition_type, trigger_methods) or FlowCondition.
routers: Set of router method names.
router_paths: Map of router_name -> possible return values.
all_methods: Set of all method names in the flow.
Returns:
List of EdgeInfo dictionaries.
"""
edges: list[EdgeInfo] = []
# Generate edges from listeners (listen edges)
for listener_name, condition_data in listeners.items():
trigger_methods: list[str] = []
if isinstance(condition_data, tuple) and len(condition_data) == 2:
_condition_type, methods = condition_data
trigger_methods = [str(m) for m in methods]
elif isinstance(condition_data, dict):
trigger_methods = _extract_all_methods_from_condition(condition_data)
# Create edges from each trigger to the listener
edges.extend(
EdgeInfo(
from_method=trigger,
to_method=listener_name,
edge_type="listen",
condition=None,
)
for trigger in trigger_methods
if trigger in all_methods
)
# Generate edges from routers (route edges)
for router_name, paths in router_paths.items():
for path in paths:
# Find listeners that listen to this path
for listener_name, condition_data in listeners.items():
path_triggers: list[str] = []
if isinstance(condition_data, tuple) and len(condition_data) == 2:
_, methods = condition_data
path_triggers = [str(m) for m in methods]
elif isinstance(condition_data, dict):
path_triggers = _extract_all_methods_from_condition(condition_data)
if str(path) in path_triggers:
edges.append(
EdgeInfo(
from_method=router_name,
to_method=listener_name,
edge_type="route",
condition=str(path),
)
)
return edges
def _extract_state_schema(flow_class: type) -> StateSchemaInfo | None:
"""Extract state schema from a Flow class.
Checks for:
- Generic type parameter (Flow[MyState])
- initial_state class attribute
Args:
flow_class: The Flow class to inspect.
Returns:
StateSchemaInfo if a Pydantic model state is detected, None otherwise.
"""
state_type: type | None = None
# Check for _initial_state_t set by __class_getitem__
if hasattr(flow_class, "_initial_state_t"):
state_type = flow_class._initial_state_t
# Check initial_state class attribute
if state_type is None and hasattr(flow_class, "initial_state"):
initial_state = flow_class.initial_state
if isinstance(initial_state, type) and issubclass(initial_state, BaseModel):
state_type = initial_state
elif isinstance(initial_state, BaseModel):
state_type = type(initial_state)
# Check __orig_bases__ for generic parameters
if state_type is None and hasattr(flow_class, "__orig_bases__"):
for base in flow_class.__orig_bases__:
origin = get_origin(base)
if origin is not None:
args = get_args(base)
if args:
candidate = args[0]
if isinstance(candidate, type) and issubclass(candidate, BaseModel):
state_type = candidate
break
if state_type is None or not issubclass(state_type, BaseModel):
return None
# Extract fields from the Pydantic model
fields: list[StateFieldInfo] = []
try:
model_fields = state_type.model_fields
for field_name, field_info in model_fields.items():
field_type_str = "Any"
if field_info.annotation is not None:
field_type_str = str(field_info.annotation)
# Clean up the type string
field_type_str = field_type_str.replace("typing.", "")
field_type_str = field_type_str.replace("<class '", "").replace(
"'>", ""
)
default_value = None
if (
field_info.default is not PydanticUndefined
and field_info.default is not None
and not callable(field_info.default)
):
try:
# Try to serialize the default value
default_value = field_info.default
except Exception:
default_value = str(field_info.default)
fields.append(
StateFieldInfo(
name=field_name,
type=field_type_str,
default=default_value,
)
)
except Exception:
logger.debug(
"Failed to extract state schema fields for %s", flow_class.__name__
)
return StateSchemaInfo(fields=fields) if fields else None
def _detect_flow_inputs(flow_class: type) -> list[str]:
"""Detect flow input parameters.
Inspects the __init__ signature for custom parameters beyond standard Flow params.
Args:
flow_class: The Flow class to inspect.
Returns:
List of detected input names.
"""
inputs: list[str] = []
# Check for inputs in __init__ signature beyond standard Flow params
try:
init_sig = inspect.signature(flow_class.__init__)
standard_params = {
"self",
"persistence",
"tracing",
"suppress_flow_events",
"max_method_calls",
"kwargs",
}
inputs.extend(
param_name
for param_name in init_sig.parameters
if param_name not in standard_params and not param_name.startswith("_")
)
except Exception:
logger.debug(
"Failed to detect inputs from __init__ for %s", flow_class.__name__
)
return inputs
def flow_structure(flow_class: type) -> FlowStructureInfo:
"""Introspect a Flow class and return its structure as a JSON-serializable dict.
This function analyzes a Flow CLASS (not instance) and returns complete
information about its graph structure including methods, edges, and state.
Args:
flow_class: A Flow class (not an instance) to introspect.
Returns:
FlowStructureInfo dictionary containing:
- name: Flow class name
- description: Docstring if available
- methods: List of method info dicts
- edges: List of edge info dicts
- state_schema: State schema if typed, None otherwise
- inputs: Detected input names
Raises:
TypeError: If flow_class is not a class.
Example:
>>> structure = flow_structure(MyFlow)
>>> print(structure["name"])
'MyFlow'
>>> for method in structure["methods"]:
... print(method["name"], method["type"])
"""
if not isinstance(flow_class, type):
raise TypeError(
f"flow_structure requires a Flow class, not an instance. "
f"Got {type(flow_class).__name__}"
)
# Get class-level metadata set by FlowMeta
start_methods: list[str] = getattr(flow_class, "_start_methods", [])
listeners: dict[str, Any] = getattr(flow_class, "_listeners", {})
routers: set[str] = getattr(flow_class, "_routers", set())
router_paths_registry: dict[str, list[str]] = getattr(
flow_class, "_router_paths", {}
)
# Collect all flow methods
methods: list[MethodInfo] = []
all_method_names: set[str] = set()
for attr_name in dir(flow_class):
if attr_name.startswith("_"):
continue
try:
attr = getattr(flow_class, attr_name)
except AttributeError:
continue
# Check if it's a flow method
is_flow_method = (
isinstance(attr, (FlowMethod, StartMethod, ListenMethod, RouterMethod))
or hasattr(attr, "__is_flow_method__")
or hasattr(attr, "__is_start_method__")
or hasattr(attr, "__trigger_methods__")
or hasattr(attr, "__is_router__")
)
if not is_flow_method:
continue
all_method_names.add(attr_name)
# Get method type
method_type = _get_method_type(attr_name, attr, start_methods, routers)
# Get trigger methods and condition type
trigger_methods, condition_type = _extract_trigger_methods(attr)
# Get router paths if applicable
router_paths_list: list[str] = []
if method_type in ("router", "start_router"):
router_paths_list = _extract_router_paths(attr, router_paths_registry)
# Check for human feedback
has_hf = _has_human_feedback(attr)
# Check for crew reference
has_crew = _detect_crew_reference(attr)
method_info = MethodInfo(
name=attr_name,
type=method_type,
trigger_methods=trigger_methods,
condition_type=condition_type,
router_paths=router_paths_list,
has_human_feedback=has_hf,
has_crew=has_crew,
)
methods.append(method_info)
# Generate edges
edges = _generate_edges(listeners, routers, router_paths_registry, all_method_names)
# Extract state schema
state_schema = _extract_state_schema(flow_class)
# Detect inputs
inputs = _detect_flow_inputs(flow_class)
# Get flow description from docstring
description: str | None = None
if flow_class.__doc__:
description = flow_class.__doc__.strip()
return FlowStructureInfo(
name=flow_class.__name__,
description=description,
methods=methods,
edges=edges,
state_schema=state_schema,
inputs=inputs,
)

View File

@@ -75,7 +75,6 @@ class FlowMethod(Generic[P, R]):
"__is_router__",
"__router_paths__",
"__human_feedback_config__",
"_hf_llm", # Live LLM object for HITL resume
]:
if hasattr(meth, attr):
setattr(self, attr, getattr(meth, attr))

View File

@@ -76,48 +76,22 @@ if TYPE_CHECKING:
F = TypeVar("F", bound=Callable[..., Any])
def _serialize_llm_for_context(llm: Any) -> dict[str, Any] | str | None:
"""Serialize a BaseLLM object to a dict preserving full config.
def _serialize_llm_for_context(llm: Any) -> str | None:
"""Serialize a BaseLLM object to a model string with provider prefix.
Delegates to ``llm.to_config_dict()`` when available (BaseLLM and
subclasses). Falls back to extracting the model string with provider
prefix for unknown LLM types.
When persisting the LLM for HITL resume, we need to store enough info
to reconstruct a working LLM on the resume worker. Just storing the bare
model name (e.g. "gemini-3-flash-preview") causes provider inference to
fail — it defaults to OpenAI. Including the provider prefix (e.g.
"gemini/gemini-3-flash-preview") allows LLM() to correctly route.
"""
if hasattr(llm, "to_config_dict"):
return llm.to_config_dict()
# Fallback for non-BaseLLM objects: just extract model + provider prefix
model = getattr(llm, "model", None)
if not model:
return None
provider = getattr(llm, "provider", None)
return f"{provider}/{model}" if provider and "/" not in model else model
def _deserialize_llm_from_context(
llm_data: dict[str, Any] | str | None,
) -> BaseLLM | None:
"""Reconstruct an LLM instance from serialized context data.
Handles both the new dict format (with full config) and the legacy
string format (model name only) for backward compatibility.
Returns a BaseLLM instance, or None if llm_data is None.
"""
if llm_data is None:
return None
from crewai.llm import LLM
if isinstance(llm_data, str):
return LLM(model=llm_data)
if isinstance(llm_data, dict):
model = llm_data.pop("model", None)
if not model:
return None
return LLM(model=model, **llm_data)
return None
if provider and "/" not in model:
return f"{provider}/{model}"
return model
@dataclass
@@ -598,14 +572,6 @@ def human_feedback(
wrapper.__is_router__ = True
wrapper.__router_paths__ = list(emit)
# Stash the live LLM object for HITL resume to retrieve.
# When a flow pauses for human feedback and later resumes (possibly in a
# different process), the serialized context only contains a model string.
# By storing the original LLM on the wrapper, resume_async can retrieve
# the fully-configured LLM (with credentials, project, safety_settings, etc.)
# instead of creating a bare LLM from just the model string.
wrapper._hf_llm = llm
return wrapper # type: ignore[no-any-return]
return decorator

View File

@@ -152,28 +152,6 @@ class BaseLLM(ABC):
"cached_prompt_tokens": 0,
}
def to_config_dict(self) -> dict[str, Any]:
"""Serialize this LLM to a dict that can reconstruct it via ``LLM(**config)``.
Returns the core fields that BaseLLM owns. Provider subclasses should
override this (calling ``super().to_config_dict()``) to add their own
fields (e.g. ``project``, ``location``, ``safety_settings``).
"""
model = self.model
provider = self.provider
model_str = f"{provider}/{model}" if provider and "/" not in model else model
config: dict[str, Any] = {"model": model_str}
if self.temperature is not None:
config["temperature"] = self.temperature
if self.base_url is not None:
config["base_url"] = self.base_url
if self.stop:
config["stop"] = self.stop
return config
@property
def provider(self) -> str:
"""Get the provider of the LLM."""

View File

@@ -256,19 +256,6 @@ class AnthropicCompletion(BaseLLM):
else:
self.stop_sequences = []
def to_config_dict(self) -> dict[str, Any]:
"""Extend base config with Anthropic-specific fields."""
config = super().to_config_dict()
if self.max_tokens != 4096: # non-default
config["max_tokens"] = self.max_tokens
if self.max_retries != 2: # non-default
config["max_retries"] = self.max_retries
if self.top_p is not None:
config["top_p"] = self.top_p
if self.timeout is not None:
config["timeout"] = self.timeout
return config
def _get_client_params(self) -> dict[str, Any]:
"""Get client parameters."""

View File

@@ -180,27 +180,6 @@ class AzureCompletion(BaseLLM):
and "/openai/deployments/" in self.endpoint
)
def to_config_dict(self) -> dict[str, Any]:
"""Extend base config with Azure-specific fields."""
config = super().to_config_dict()
if self.endpoint:
config["endpoint"] = self.endpoint
if self.api_version and self.api_version != "2024-06-01":
config["api_version"] = self.api_version
if self.timeout is not None:
config["timeout"] = self.timeout
if self.max_retries != 2:
config["max_retries"] = self.max_retries
if self.top_p is not None:
config["top_p"] = self.top_p
if self.frequency_penalty is not None:
config["frequency_penalty"] = self.frequency_penalty
if self.presence_penalty is not None:
config["presence_penalty"] = self.presence_penalty
if self.max_tokens is not None:
config["max_tokens"] = self.max_tokens
return config
@staticmethod
def _validate_and_fix_endpoint(endpoint: str, model: str) -> str:
"""Validate and fix Azure endpoint URL format.

View File

@@ -346,23 +346,6 @@ class BedrockCompletion(BaseLLM):
# Handle inference profiles for newer models
self.model_id = model
def to_config_dict(self) -> dict[str, Any]:
"""Extend base config with Bedrock-specific fields."""
config = super().to_config_dict()
# NOTE: AWS credentials (access_key, secret_key, session_token) are
# intentionally excluded — they must come from env on resume.
if self.region_name and self.region_name != "us-east-1":
config["region_name"] = self.region_name
if self.max_tokens is not None:
config["max_tokens"] = self.max_tokens
if self.top_p is not None:
config["top_p"] = self.top_p
if self.top_k is not None:
config["top_k"] = self.top_k
if self.guardrail_config:
config["guardrail_config"] = self.guardrail_config
return config
@property
def stop(self) -> list[str]:
"""Get stop sequences sent to the API."""
@@ -1897,9 +1880,7 @@ class BedrockCompletion(BaseLLM):
# Anthropic (Claude) models reject assistant-last messages when
# tools are in the request. Append a user message so the
# Converse API accepts the payload.
elif (
"anthropic" in self.model.lower() or "claude" in self.model.lower()
):
elif "anthropic" in self.model.lower() or "claude" in self.model.lower():
converse_messages.append(
{
"role": "user",

View File

@@ -176,28 +176,6 @@ class GeminiCompletion(BaseLLM):
else:
self.stop_sequences = []
def to_config_dict(self) -> dict[str, Any]:
"""Extend base config with Gemini/Vertex-specific fields."""
config = super().to_config_dict()
if self.project:
config["project"] = self.project
if self.location and self.location != "us-central1":
config["location"] = self.location
if self.top_p is not None:
config["top_p"] = self.top_p
if self.top_k is not None:
config["top_k"] = self.top_k
if self.max_output_tokens is not None:
config["max_output_tokens"] = self.max_output_tokens
if self.safety_settings:
config["safety_settings"] = [
{"category": str(s.category), "threshold": str(s.threshold)}
if hasattr(s, "category") and hasattr(s, "threshold")
else s
for s in self.safety_settings
]
return config
def _initialize_client(self, use_vertexai: bool = False) -> genai.Client:
"""Initialize the Google Gen AI client with proper parameter handling.

View File

@@ -329,35 +329,6 @@ class OpenAICompletion(BaseLLM):
"""
self._last_reasoning_items = None
def to_config_dict(self) -> dict[str, Any]:
"""Extend base config with OpenAI-specific fields."""
config = super().to_config_dict()
# Client-level params (from OpenAI SDK)
if self.organization:
config["organization"] = self.organization
if self.project:
config["project"] = self.project
if self.timeout is not None:
config["timeout"] = self.timeout
if self.max_retries != 2:
config["max_retries"] = self.max_retries
# Completion params
if self.top_p is not None:
config["top_p"] = self.top_p
if self.frequency_penalty is not None:
config["frequency_penalty"] = self.frequency_penalty
if self.presence_penalty is not None:
config["presence_penalty"] = self.presence_penalty
if self.max_tokens is not None:
config["max_tokens"] = self.max_tokens
if self.max_completion_tokens is not None:
config["max_completion_tokens"] = self.max_completion_tokens
if self.seed is not None:
config["seed"] = self.seed
if self.reasoning_effort is not None:
config["reasoning_effort"] = self.reasoning_effort
return config
def _get_client_params(self) -> dict[str, Any]:
"""Get OpenAI client parameters."""

View File

@@ -28,7 +28,6 @@ from crewai.memory.analyze import (
analyze_for_save,
)
from crewai.memory.types import MemoryConfig, MemoryRecord, embed_texts
from crewai.memory.utils import join_scope_paths
logger = logging.getLogger(__name__)
@@ -49,8 +48,6 @@ class ItemState(BaseModel):
importance: float | None = None
source: str | None = None
private: bool = False
# Structural root scope prefix for hierarchical scoping
root_scope: str | None = None
# Resolved values
resolved_scope: str = "/"
resolved_categories: list[str] = Field(default_factory=list)
@@ -106,24 +103,12 @@ class EncodingFlow(Flow[EncodingState]):
llm: Any,
embedder: Any,
config: MemoryConfig | None = None,
root_scope: str | None = None,
) -> None:
"""Initialize the encoding flow.
Args:
storage: Storage backend for persisting memories.
llm: LLM instance for analysis.
embedder: Embedder for generating vectors.
config: Optional memory configuration.
root_scope: Structural root scope prefix. LLM-inferred or explicit
scopes are nested under this root.
"""
super().__init__(suppress_flow_events=True)
self._storage = storage
self._llm = llm
self._embedder = embedder
self._config = config or MemoryConfig()
self._root_scope = root_scope
# ------------------------------------------------------------------
# Step 1: Batch embed (ONE embedder call)
@@ -336,13 +321,7 @@ class EncodingFlow(Flow[EncodingState]):
for i, future in save_futures.items():
analysis = future.result()
item = items[i]
# Determine inner scope from explicit scope or LLM-inferred
inner_scope = item.scope or analysis.suggested_scope or "/"
# Join root_scope with inner scope if root_scope is set
if item.root_scope:
item.resolved_scope = join_scope_paths(item.root_scope, inner_scope)
else:
item.resolved_scope = inner_scope
item.resolved_scope = item.scope or analysis.suggested_scope or "/"
item.resolved_categories = (
item.categories
if item.categories is not None
@@ -374,18 +353,8 @@ class EncodingFlow(Flow[EncodingState]):
pool.shutdown(wait=False)
def _apply_defaults(self, item: ItemState) -> None:
"""Apply caller values with config defaults (fast path).
If root_scope is set, prepends it to the inner scope to create the
final resolved_scope.
"""
inner_scope = item.scope or "/"
# Join root_scope with inner scope if root_scope is set
if item.root_scope:
item.resolved_scope = join_scope_paths(item.root_scope, inner_scope)
else:
item.resolved_scope = inner_scope if inner_scope != "/" else "/"
"""Apply caller values with config defaults (fast path)."""
item.resolved_scope = item.scope or "/"
item.resolved_categories = item.categories or []
item.resolved_metadata = item.metadata or {}
item.resolved_importance = (

View File

@@ -126,14 +126,6 @@ class Memory(BaseModel):
default=False,
description="If True, remember() and remember_many() are silent no-ops.",
)
root_scope: str | None = Field(
default=None,
description=(
"Structural root scope prefix. When set, LLM-inferred or explicit scopes "
"are nested under this root. For example, a crew with root_scope='/crew/research' "
"will store memories at '/crew/research/<inferred_scope>'."
),
)
_config: MemoryConfig = PrivateAttr()
_llm_instance: BaseLLM | None = PrivateAttr(default=None)
@@ -305,26 +297,11 @@ class Memory(BaseModel):
importance: float | None = None,
source: str | None = None,
private: bool = False,
root_scope: str | None = None,
) -> list[MemoryRecord]:
"""Run the batch EncodingFlow for one or more items. No event emission.
This is the core encoding logic shared by ``remember()`` and
``remember_many()``. Events are managed by the calling method.
Args:
contents: List of text content to encode and store.
scope: Optional explicit scope (inner scope, nested under root_scope).
categories: Optional categories for all items.
metadata: Optional metadata for all items.
importance: Optional importance score for all items.
source: Optional source identifier for all items.
private: Whether items are private.
root_scope: Structural root scope prefix. LLM-inferred or explicit
scopes are nested under this root.
Returns:
List of created MemoryRecord instances.
"""
from crewai.memory.encoding_flow import EncodingFlow
@@ -333,7 +310,6 @@ class Memory(BaseModel):
llm=self._llm,
embedder=self._embedder,
config=self._config,
root_scope=root_scope,
)
items_input = [
{
@@ -344,7 +320,6 @@ class Memory(BaseModel):
"importance": importance,
"source": source,
"private": private,
"root_scope": root_scope,
}
for c in contents
]
@@ -365,7 +340,6 @@ class Memory(BaseModel):
source: str | None = None,
private: bool = False,
agent_role: str | None = None,
root_scope: str | None = None,
) -> MemoryRecord | None:
"""Store a single item in memory (synchronous).
@@ -375,15 +349,13 @@ class Memory(BaseModel):
Args:
content: Text to remember.
scope: Optional scope path (inner scope); inferred if None.
scope: Optional scope path; inferred if None.
categories: Optional categories; inferred if None.
metadata: Optional metadata; merged with LLM-extracted if inferred.
importance: Optional importance 0-1; inferred if None.
source: Optional provenance identifier (e.g. user ID, session ID).
private: If True, only visible to recall from the same source.
agent_role: Optional agent role for event metadata.
root_scope: Optional root scope override. If provided, this overrides
the instance-level root_scope for this call only.
Returns:
The created MemoryRecord, or None if this memory is read-only.
@@ -393,10 +365,6 @@ class Memory(BaseModel):
"""
if self.read_only:
return None
# Determine effective root_scope: per-call override takes precedence
effective_root = root_scope if root_scope is not None else self.root_scope
_source_type = "unified_memory"
try:
crewai_event_bus.emit(
@@ -420,7 +388,6 @@ class Memory(BaseModel):
importance,
source,
private,
effective_root,
)
records = future.result()
record = records[0] if records else None
@@ -459,7 +426,6 @@ class Memory(BaseModel):
source: str | None = None,
private: bool = False,
agent_role: str | None = None,
root_scope: str | None = None,
) -> list[MemoryRecord]:
"""Store multiple items in memory (non-blocking).
@@ -474,15 +440,13 @@ class Memory(BaseModel):
Args:
contents: List of text items to remember.
scope: Optional scope (inner scope) applied to all items.
scope: Optional scope applied to all items.
categories: Optional categories applied to all items.
metadata: Optional metadata applied to all items.
importance: Optional importance applied to all items.
source: Optional provenance identifier applied to all items.
private: Privacy flag applied to all items.
agent_role: Optional agent role for event metadata.
root_scope: Optional root scope override. If provided, this overrides
the instance-level root_scope for this call only.
Returns:
Empty list (records are not available until the background save completes).
@@ -490,9 +454,6 @@ class Memory(BaseModel):
if not contents or self.read_only:
return []
# Determine effective root_scope: per-call override takes precedence
effective_root = root_scope if root_scope is not None else self.root_scope
self._submit_save(
self._background_encode_batch,
contents,
@@ -503,7 +464,6 @@ class Memory(BaseModel):
source,
private,
agent_role,
effective_root,
)
return []
@@ -517,7 +477,6 @@ class Memory(BaseModel):
source: str | None,
private: bool,
agent_role: str | None,
root_scope: str | None = None,
) -> list[MemoryRecord]:
"""Run the encoding pipeline in a background thread with event emission.
@@ -527,20 +486,6 @@ class Memory(BaseModel):
All ``emit`` calls are wrapped in try/except to handle the case where
the event bus shuts down before the background save finishes (e.g.
during process exit).
Args:
contents: List of text content to encode.
scope: Optional inner scope for all items.
categories: Optional categories for all items.
metadata: Optional metadata for all items.
importance: Optional importance for all items.
source: Optional source identifier for all items.
private: Whether items are private.
agent_role: Optional agent role for event metadata.
root_scope: Optional root scope prefix for hierarchical scoping.
Returns:
List of created MemoryRecord instances.
"""
try:
crewai_event_bus.emit(
@@ -557,14 +502,7 @@ class Memory(BaseModel):
try:
start = time.perf_counter()
records = self._encode_batch(
contents,
scope,
categories,
metadata,
importance,
source,
private,
root_scope,
contents, scope, categories, metadata, importance, source, private
)
elapsed_ms = (time.perf_counter() - start) * 1000
except RuntimeError:

View File

@@ -1,110 +0,0 @@
"""Utility functions for the unified memory system."""
from __future__ import annotations
import re
def sanitize_scope_name(name: str) -> str:
"""Sanitize a name for use in hierarchical scope paths.
Converts to lowercase, replaces non-alphanumeric chars (except underscore
and hyphen) with hyphens, collapses multiple hyphens, strips leading/trailing
hyphens.
Args:
name: The raw name to sanitize (e.g. crew name, agent role, flow class name).
Returns:
A sanitized string safe for use in scope paths. Returns 'unknown' if the
result would be empty.
Examples:
>>> sanitize_scope_name("Research Crew")
'research-crew'
>>> sanitize_scope_name("Agent #1 (Main)")
'agent-1-main'
>>> sanitize_scope_name("café_worker")
'caf-worker'
"""
if not name:
return "unknown"
name = name.lower().strip()
# Replace any character that's not alphanumeric, underscore, or hyphen with hyphen
name = re.sub(r"[^a-z0-9_-]", "-", name)
# Collapse multiple hyphens into one
name = re.sub(r"-+", "-", name)
# Strip leading/trailing hyphens
name = name.strip("-")
return name or "unknown"
def normalize_scope_path(path: str) -> str:
"""Normalize a scope path by removing double slashes and ensuring proper format.
Args:
path: The raw scope path (e.g. '/crew/MyCrewName//agent//role').
Returns:
A normalized path with leading slash, no trailing slash, no double slashes.
Returns '/' for empty or root-only paths.
Examples:
>>> normalize_scope_path("/crew/test//agent//")
'/crew/test/agent'
>>> normalize_scope_path("")
'/'
>>> normalize_scope_path("crew/test")
'/crew/test'
"""
if not path or path == "/":
return "/"
# Collapse multiple slashes
path = re.sub(r"/+", "/", path)
# Ensure leading slash
if not path.startswith("/"):
path = "/" + path
# Remove trailing slash (unless it's just '/')
if len(path) > 1:
path = path.rstrip("/")
return path
def join_scope_paths(root: str | None, inner: str | None) -> str:
"""Join a root scope with an inner scope, handling edge cases properly.
Args:
root: The root scope prefix (e.g. '/crew/research-crew').
inner: The inner scope (e.g. '/market-trends' or 'market-trends').
Returns:
The combined, normalized scope path.
Examples:
>>> join_scope_paths("/crew/test", "/market-trends")
'/crew/test/market-trends'
>>> join_scope_paths("/crew/test", "market-trends")
'/crew/test/market-trends'
>>> join_scope_paths("/crew/test", "/")
'/crew/test'
>>> join_scope_paths("/crew/test", None)
'/crew/test'
>>> join_scope_paths(None, "/market-trends")
'/market-trends'
>>> join_scope_paths(None, None)
'/'
"""
# Normalize both parts
root = root.rstrip("/") if root else ""
inner = inner.strip("/") if inner else ""
if root and inner:
result = f"{root}/{inner}"
elif root:
result = root
elif inner:
result = f"/{inner}"
else:
result = "/"
return normalize_scope_path(result)

View File

@@ -1,823 +0,0 @@
"""Tests for hierarchical root_scope functionality in unified memory.
Root scope is a structural prefix that is set automatically by crews and flows.
The LLM's encoding flow still infers a semantic inner scope, but the final
resolved scope = root_scope + '/' + llm_inferred_scope.
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from crewai.memory.types import MemoryRecord
from crewai.memory.utils import (
join_scope_paths,
normalize_scope_path,
sanitize_scope_name,
)
# --- Utility function tests ---
class TestSanitizeScopeName:
"""Tests for sanitize_scope_name utility."""
def test_simple_name(self) -> None:
assert sanitize_scope_name("research") == "research"
def test_name_with_spaces(self) -> None:
assert sanitize_scope_name("Research Crew") == "research-crew"
def test_name_with_special_chars(self) -> None:
assert sanitize_scope_name("Agent #1 (Main)") == "agent-1-main"
def test_name_with_unicode(self) -> None:
# Unicode characters get replaced with hyphens
result = sanitize_scope_name("café_worker")
# é becomes -, and the underscore is preserved, so café_worker -> caf-_worker
assert result == "caf-_worker"
def test_name_with_underscores(self) -> None:
# Underscores are preserved
assert sanitize_scope_name("test_agent") == "test_agent"
def test_name_with_hyphens(self) -> None:
assert sanitize_scope_name("my-crew") == "my-crew"
def test_multiple_spaces_collapsed(self) -> None:
assert sanitize_scope_name("foo bar") == "foo-bar"
def test_leading_trailing_spaces(self) -> None:
assert sanitize_scope_name(" crew ") == "crew"
def test_empty_string_returns_unknown(self) -> None:
assert sanitize_scope_name("") == "unknown"
def test_only_special_chars_returns_unknown(self) -> None:
assert sanitize_scope_name("@#$%") == "unknown"
def test_none_input_returns_unknown(self) -> None:
assert sanitize_scope_name(None) == "unknown" # type: ignore[arg-type]
class TestNormalizeScopePath:
"""Tests for normalize_scope_path utility."""
def test_simple_path(self) -> None:
assert normalize_scope_path("/crew/test") == "/crew/test"
def test_double_slashes_collapsed(self) -> None:
assert normalize_scope_path("/crew//test//agent") == "/crew/test/agent"
def test_trailing_slash_removed(self) -> None:
assert normalize_scope_path("/crew/test/") == "/crew/test"
def test_missing_leading_slash_added(self) -> None:
assert normalize_scope_path("crew/test") == "/crew/test"
def test_empty_string_returns_root(self) -> None:
assert normalize_scope_path("") == "/"
def test_root_only_returns_root(self) -> None:
assert normalize_scope_path("/") == "/"
def test_multiple_trailing_slashes(self) -> None:
assert normalize_scope_path("/crew///") == "/crew"
class TestJoinScopePaths:
"""Tests for join_scope_paths utility."""
def test_basic_join(self) -> None:
assert join_scope_paths("/crew/test", "/market-trends") == "/crew/test/market-trends"
def test_inner_without_leading_slash(self) -> None:
assert join_scope_paths("/crew/test", "market-trends") == "/crew/test/market-trends"
def test_root_with_trailing_slash(self) -> None:
assert join_scope_paths("/crew/test/", "/inner") == "/crew/test/inner"
def test_root_only_inner_slash(self) -> None:
assert join_scope_paths("/crew/test", "/") == "/crew/test"
def test_root_only_inner_none(self) -> None:
assert join_scope_paths("/crew/test", None) == "/crew/test"
def test_no_root_with_inner(self) -> None:
assert join_scope_paths(None, "/market-trends") == "/market-trends"
def test_both_none(self) -> None:
assert join_scope_paths(None, None) == "/"
def test_empty_strings(self) -> None:
assert join_scope_paths("", "") == "/"
def test_root_empty_inner_value(self) -> None:
assert join_scope_paths("", "inner") == "/inner"
# --- Memory root_scope tests ---
@pytest.fixture
def mock_embedder() -> MagicMock:
"""Embedder mock that returns one embedding per input text (batch-aware)."""
m = MagicMock()
m.side_effect = lambda texts: [[0.1] * 1536 for _ in texts]
return m
class TestMemoryRootScope:
"""Tests for Memory class root_scope field."""
def test_memory_with_root_scope_prepends_to_explicit_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When root_scope is set and explicit scope is provided, they combine."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/research-crew",
)
record = mem.remember(
"Test content",
scope="/market-trends",
categories=["test"],
importance=0.7,
)
assert record is not None
assert record.scope == "/crew/research-crew/market-trends"
def test_memory_without_root_scope_uses_explicit_scope_directly(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When root_scope is None, explicit scope is used as-is (backward compat)."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
record = mem.remember(
"Test content",
scope="/explicit",
categories=["test"],
importance=0.7,
)
assert record is not None
assert record.scope == "/explicit"
def test_memory_root_scope_with_llm_inferred_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When root_scope is set and scope is inferred by LLM, they combine."""
from crewai.memory.analyze import ExtractedMetadata, MemoryAnalysis
from crewai.memory.unified_memory import Memory
llm = MagicMock()
llm.supports_function_calling.return_value = True
llm.call.return_value = MemoryAnalysis(
suggested_scope="/quarterly-results",
categories=["finance"],
importance=0.8,
extracted_metadata=ExtractedMetadata(),
)
mem = Memory(
storage=str(tmp_path / "db"),
llm=llm,
embedder=mock_embedder,
root_scope="/flow/mypipeline",
)
# Don't provide scope - let LLM infer it
record = mem.remember("Q1 revenue was $1M")
assert record is not None
assert record.scope == "/flow/mypipeline/quarterly-results"
def test_memory_root_scope_per_call_override(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Per-call root_scope overrides instance-level root_scope."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/base",
)
record = mem.remember(
"Test content",
scope="/inner",
categories=["test"],
importance=0.7,
root_scope="/override/path", # Override instance-level
)
assert record is not None
assert record.scope == "/override/path/inner"
def test_remember_many_with_root_scope(
self, tmp_path: Path,
) -> None:
"""remember_many respects root_scope for all items."""
from crewai.memory.unified_memory import Memory
# Use distinct embeddings to avoid intra-batch dedup
call_count = 0
def distinct_embedder(texts: list[str]) -> list[list[float]]:
nonlocal call_count
result = []
for i, _ in enumerate(texts):
emb = [0.0] * 1536
emb[(call_count + i) % 1536] = 1.0
result.append(emb)
call_count += len(texts)
return result
mock_embedder = MagicMock(side_effect=distinct_embedder)
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/batch-crew",
)
mem.remember_many(
["Fact A", "Fact B"],
scope="/decisions",
categories=["test"],
importance=0.7,
)
mem.drain_writes()
records = mem.list_records()
assert len(records) == 2
for record in records:
assert record.scope == "/crew/batch-crew/decisions"
def test_remember_many_per_call_root_scope_override(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""remember_many accepts per-call root_scope override."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/default",
)
mem.remember_many(
["Fact A"],
scope="/inner",
categories=["test"],
importance=0.7,
root_scope="/agent/researcher", # Per-call override
)
mem.drain_writes()
records = mem.list_records()
assert len(records) == 1
assert records[0].scope == "/agent/researcher/inner"
class TestRootScopePathNormalization:
"""Tests for proper path normalization with root_scope."""
def test_no_double_slashes_in_result(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Final scope should not have double slashes."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/test/", # Trailing slash
)
record = mem.remember(
"Test",
scope="/inner/", # Both have slashes
categories=["test"],
importance=0.5,
)
assert record is not None
assert "//" not in record.scope
assert record.scope == "/crew/test/inner"
def test_leading_slash_always_present(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Final scope should always have leading slash."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="crew/test", # No leading slash
)
record = mem.remember(
"Test",
scope="inner", # No leading slash
categories=["test"],
importance=0.5,
)
assert record is not None
assert record.scope.startswith("/")
def test_root_scope_with_root_inner_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""When inner scope is '/', result is just the root_scope."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/test",
)
record = mem.remember(
"Test",
scope="/", # Root scope
categories=["test"],
importance=0.5,
)
assert record is not None
assert record.scope == "/crew/test"
class TestCrewAutoScoping:
"""Tests for automatic root_scope assignment in Crew."""
def test_crew_memory_true_sets_root_scope(self) -> None:
"""Creating Crew with memory=True auto-sets root_scope."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
agent = Agent(
role="Researcher",
goal="Research",
backstory="Expert researcher",
llm="gpt-4o-mini",
)
task = Task(
description="Do research",
expected_output="Report",
agent=agent,
)
crew = Crew(
name="Research Crew",
agents=[agent],
tasks=[task],
memory=True,
)
assert crew._memory is not None
assert hasattr(crew._memory, "root_scope")
assert crew._memory.root_scope == "/crew/research-crew"
def test_crew_memory_instance_gets_root_scope_if_not_set(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""User-provided Memory instance gets root_scope if not already set."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.unified_memory import Memory
from crewai.task import Task
# Memory without root_scope
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
assert mem.root_scope is None
agent = Agent(
role="Tester",
goal="Test",
backstory="Tester",
llm="gpt-4o-mini",
)
task = Task(
description="Test",
expected_output="Results",
agent=agent,
)
crew = Crew(
name="Test Crew",
agents=[agent],
tasks=[task],
memory=mem,
)
assert crew._memory is mem
assert crew._memory.root_scope == "/crew/test-crew"
def test_crew_respects_existing_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""User-provided Memory with existing root_scope is not overwritten."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.unified_memory import Memory
from crewai.task import Task
# Memory with explicit root_scope
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/custom/path",
)
agent = Agent(
role="Tester",
goal="Test",
backstory="Tester",
llm="gpt-4o-mini",
)
task = Task(
description="Test",
expected_output="Results",
agent=agent,
)
crew = Crew(
name="Test Crew",
agents=[agent],
tasks=[task],
memory=mem,
)
assert crew._memory.root_scope == "/custom/path" # Not overwritten
def test_crew_sanitizes_name_for_root_scope(self) -> None:
"""Crew name with special chars is sanitized for root_scope."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
agent = Agent(
role="Agent",
goal="Goal",
backstory="Story",
llm="gpt-4o-mini",
)
task = Task(
description="Task",
expected_output="Output",
agent=agent,
)
crew = Crew(
name="My Awesome Crew #1!",
agents=[agent],
tasks=[task],
memory=True,
)
assert crew._memory.root_scope == "/crew/my-awesome-crew-1"
class TestAgentScopeExtension:
"""Tests for agent scope extension in BaseAgentExecutorMixin."""
def test_agent_save_extends_crew_root_scope(self) -> None:
"""Agent._save_to_memory extends crew's root_scope with agent info."""
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
)
from crewai.agents.parser import AgentFinish
from crewai.utilities.printer import Printer
mock_memory = MagicMock()
mock_memory.read_only = False
mock_memory.root_scope = "/crew/research-crew"
mock_memory.extract_memories.return_value = ["Fact A"]
mock_agent = MagicMock()
mock_agent.memory = mock_memory
mock_agent._logger = MagicMock()
mock_agent.role = "Researcher"
mock_task = MagicMock()
mock_task.description = "Research task"
mock_task.expected_output = "Report"
class MinimalExecutor(CrewAgentExecutorMixin):
crew = None
agent = mock_agent
task = mock_task
iterations = 0
max_iter = 1
messages = []
_i18n = MagicMock()
_printer = Printer()
executor = MinimalExecutor()
executor._save_to_memory(AgentFinish(thought="", output="Result", text="Result"))
mock_memory.remember_many.assert_called_once()
call_kwargs = mock_memory.remember_many.call_args.kwargs
assert call_kwargs["root_scope"] == "/crew/research-crew/agent/researcher"
def test_agent_save_sanitizes_role(self) -> None:
"""Agent role with special chars is sanitized for scope path."""
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
)
from crewai.agents.parser import AgentFinish
from crewai.utilities.printer import Printer
mock_memory = MagicMock()
mock_memory.read_only = False
mock_memory.root_scope = "/crew/test"
mock_memory.extract_memories.return_value = ["Fact"]
mock_agent = MagicMock()
mock_agent.memory = mock_memory
mock_agent._logger = MagicMock()
mock_agent.role = "Senior Research Analyst #1"
mock_task = MagicMock()
mock_task.description = "Task"
mock_task.expected_output = "Output"
class MinimalExecutor(CrewAgentExecutorMixin):
crew = None
agent = mock_agent
task = mock_task
iterations = 0
max_iter = 1
messages = []
_i18n = MagicMock()
_printer = Printer()
executor = MinimalExecutor()
executor._save_to_memory(AgentFinish(thought="", output="R", text="R"))
call_kwargs = mock_memory.remember_many.call_args.kwargs
assert call_kwargs["root_scope"] == "/crew/test/agent/senior-research-analyst-1"
class TestFlowAutoScoping:
"""Tests for automatic root_scope assignment in Flow."""
def test_flow_auto_memory_sets_root_scope(self) -> None:
"""Flow auto-creates memory with root_scope set to /flow/<class_name>."""
from crewai.flow.flow import Flow
from crewai.memory.unified_memory import Memory
class MyPipelineFlow(Flow):
pass
flow = MyPipelineFlow()
assert flow.memory is not None
assert isinstance(flow.memory, Memory)
assert flow.memory.root_scope == "/flow/mypipelineflow"
def test_flow_with_name_uses_name_for_root_scope(self) -> None:
"""Flow with custom name uses that name for root_scope."""
from crewai.flow.flow import Flow
from crewai.memory.unified_memory import Memory
class MyFlow(Flow):
name = "Custom Pipeline"
flow = MyFlow()
assert flow.memory is not None
assert isinstance(flow.memory, Memory)
assert flow.memory.root_scope == "/flow/custom-pipeline"
def test_flow_user_provided_memory_not_overwritten(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""User-provided memory on Flow is not modified."""
from crewai.flow.flow import Flow
from crewai.memory.unified_memory import Memory
user_memory = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/custom/scope",
)
class MyFlow(Flow):
memory = user_memory
flow = MyFlow()
assert flow.memory is user_memory
assert flow.memory.root_scope == "/custom/scope"
class TestBackwardCompatibility:
"""Tests ensuring backward compatibility with existing behavior."""
def test_memory_without_root_scope_works_normally(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Memory without root_scope behaves exactly as before."""
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
assert mem.root_scope is None
record = mem.remember(
"Test content",
scope="/explicit",
categories=["test"],
importance=0.7,
)
assert record.scope == "/explicit"
def test_crew_without_name_uses_default(self) -> None:
"""Crew without name uses 'crew' as default for root_scope."""
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
agent = Agent(
role="Agent",
goal="Goal",
backstory="Story",
llm="gpt-4o-mini",
)
task = Task(
description="Task",
expected_output="Output",
agent=agent,
)
# No name provided - uses default "crew"
crew = Crew(
agents=[agent],
tasks=[task],
memory=True,
)
assert crew._memory.root_scope == "/crew/crew"
def test_old_memories_at_root_still_accessible(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Old memories stored at '/' are still accessible."""
from crewai.memory.unified_memory import Memory
# Create memory and store at root (old behavior)
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
)
record = mem.remember(
"Old memory at root",
scope="/",
categories=["old"],
importance=0.5,
)
assert record.scope == "/"
# Recall from root should find it
matches = mem.recall("Old memory", scope="/", depth="shallow")
assert len(matches) >= 1
class TestEncodingFlowRootScope:
"""Tests for root_scope handling in EncodingFlow."""
def test_encoding_flow_fast_path_with_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Group A (fast path) items properly prepend root_scope."""
from crewai.memory.encoding_flow import ItemState
# Test _apply_defaults directly on an ItemState without going through Flow
# since Flow.state is a property without a setter
item = ItemState(
content="Test",
scope="/inner", # Explicit
categories=["cat"], # Explicit
importance=0.5, # Explicit
root_scope="/crew/test",
)
# Manually test the join_scope_paths logic that _apply_defaults uses
from crewai.memory.utils import join_scope_paths
inner_scope = item.scope or "/"
if item.root_scope:
resolved = join_scope_paths(item.root_scope, inner_scope)
else:
resolved = inner_scope
assert resolved == "/crew/test/inner"
def test_encoding_flow_llm_path_with_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""Group C (LLM path) items properly prepend root_scope to inferred scope."""
from crewai.memory.analyze import ExtractedMetadata, MemoryAnalysis
from crewai.memory.unified_memory import Memory
llm = MagicMock()
llm.supports_function_calling.return_value = True
llm.call.return_value = MemoryAnalysis(
suggested_scope="/llm-inferred",
categories=["auto"],
importance=0.7,
extracted_metadata=ExtractedMetadata(),
)
mem = Memory(
storage=str(tmp_path / "db"),
llm=llm,
embedder=mock_embedder,
root_scope="/flow/pipeline",
)
# No explicit scope/categories/importance -> goes through LLM
record = mem.remember("Content for LLM analysis")
assert record is not None
assert record.scope == "/flow/pipeline/llm-inferred"
class TestMemoryScopeWithRootScope:
"""Tests for MemoryScope interaction with root_scope."""
def test_memory_scope_remembers_within_root_scope(
self, tmp_path: Path, mock_embedder: MagicMock
) -> None:
"""MemoryScope with underlying Memory that has root_scope works correctly."""
from crewai.memory.memory_scope import MemoryScope
from crewai.memory.unified_memory import Memory
mem = Memory(
storage=str(tmp_path / "db"),
llm=MagicMock(),
embedder=mock_embedder,
root_scope="/crew/test",
)
# Create a MemoryScope
scope = MemoryScope(memory=mem, root_path="/agent/1")
# Remember through the scope
record = scope.remember(
"Scoped content",
scope="/task", # Inner scope within MemoryScope
categories=["test"],
importance=0.5,
)
# The MemoryScope prepends its root_path, then Memory prepends root_scope
# MemoryScope.remember prepends /agent/1 to /task -> /agent/1/task
# Then Memory's root_scope /crew/test gets prepended by encoding flow
# Final: /crew/test/agent/1/task
assert record is not None
# Note: MemoryScope builds the scope before calling memory.remember
# So the scope it passes is /agent/1/task, which then gets root_scope prepended
assert record.scope.startswith("/crew/test/agent/1")

View File

@@ -988,9 +988,11 @@ class TestLLMObjectPreservedInContext:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
# Create a real LLM object (not a string)
from crewai.llm import LLM
mock_llm_obj = LLM(model="gemini-2.0-flash", provider="gemini")
# Create a mock BaseLLM object (not a string)
# Simulates LLM(model="gemini-2.0-flash", provider="gemini")
mock_llm_obj = MagicMock()
mock_llm_obj.model = "gemini-2.0-flash"
mock_llm_obj.provider = "gemini"
class PausingProvider:
def __init__(self, persistence: SQLiteFlowPersistence):
@@ -1039,37 +1041,32 @@ class TestLLMObjectPreservedInContext:
result = flow1.kickoff()
assert isinstance(result, HumanFeedbackPending)
# Verify the context stored the model config dict, not None
# Verify the context stored the model STRING, not None
assert provider.captured_context is not None
assert isinstance(provider.captured_context.llm, dict)
assert provider.captured_context.llm["model"] == "gemini/gemini-2.0-flash"
assert provider.captured_context.llm == "gemini/gemini-2.0-flash"
# Verify it survives persistence roundtrip
flow_id = result.context.flow_id
loaded = persistence.load_pending_feedback(flow_id)
assert loaded is not None
_, loaded_context = loaded
assert isinstance(loaded_context.llm, dict)
assert loaded_context.llm["model"] == "gemini/gemini-2.0-flash"
assert loaded_context.llm == "gemini/gemini-2.0-flash"
# Phase 2: Resume with positive feedback - should use LLM to classify
flow2 = TestFlow.from_pending(flow_id, persistence)
assert flow2._pending_feedback_context is not None
assert isinstance(flow2._pending_feedback_context.llm, dict)
assert flow2._pending_feedback_context.llm["model"] == "gemini/gemini-2.0-flash"
assert flow2._pending_feedback_context.llm == "gemini/gemini-2.0-flash"
# Mock _collapse_to_outcome to verify it gets called (not skipped)
with patch.object(flow2, "_collapse_to_outcome", return_value="approved") as mock_collapse:
flow2.resume("this looks good, proceed!")
# The key assertion: _collapse_to_outcome was called (not skipped due to llm=None)
mock_collapse.assert_called_once()
call_kwargs = mock_collapse.call_args
assert call_kwargs.kwargs["feedback"] == "this looks good, proceed!"
assert call_kwargs.kwargs["outcomes"] == ["needs_changes", "approved"]
# LLM should be a live object (from _hf_llm) or reconstructed, not None
assert call_kwargs.kwargs["llm"] is not None
assert getattr(call_kwargs.kwargs["llm"], "model", None) == "gemini-2.0-flash"
mock_collapse.assert_called_once_with(
feedback="this looks good, proceed!",
outcomes=["needs_changes", "approved"],
llm="gemini/gemini-2.0-flash",
)
assert flow2.last_human_feedback.outcome == "approved"
assert flow2.result_path == "approved"
@@ -1099,25 +1096,23 @@ class TestLLMObjectPreservedInContext:
def test_provider_prefix_added_to_bare_model(self) -> None:
"""Test that provider prefix is added when model has no slash."""
from crewai.flow.human_feedback import _serialize_llm_for_context
from crewai.llm import LLM
llm = LLM(model="gemini-2.0-flash", provider="gemini")
result = _serialize_llm_for_context(llm)
assert isinstance(result, dict)
assert result["model"] == "gemini/gemini-2.0-flash"
mock_obj = MagicMock()
mock_obj.model = "gemini-3-flash-preview"
mock_obj.provider = "gemini"
assert _serialize_llm_for_context(mock_obj) == "gemini/gemini-3-flash-preview"
def test_provider_prefix_not_doubled_when_already_present(self) -> None:
"""Test that provider prefix is not added when model already has a slash."""
from crewai.flow.human_feedback import _serialize_llm_for_context
from crewai.llm import LLM
llm = LLM(model="gemini/gemini-2.0-flash")
result = _serialize_llm_for_context(llm)
assert isinstance(result, dict)
assert result["model"] == "gemini/gemini-2.0-flash"
mock_obj = MagicMock()
mock_obj.model = "gemini/gemini-2.0-flash"
mock_obj.provider = "gemini"
assert _serialize_llm_for_context(mock_obj) == "gemini/gemini-2.0-flash"
def test_no_provider_attr_falls_back_to_bare_model(self) -> None:
"""Test that objects without to_config_dict fall back to model string."""
"""Test that bare model is used when no provider attribute exists."""
from crewai.flow.human_feedback import _serialize_llm_for_context
mock_obj = MagicMock(spec=[])
@@ -1221,279 +1216,3 @@ class TestAsyncHumanFeedbackEdgeCases:
assert flow.last_human_feedback.outcome == "approved"
assert flow.last_human_feedback.feedback == ""
# =============================================================================
# Tests for _hf_llm attribute and live LLM resolution on resume
# =============================================================================
class TestLiveLLMPreservationOnResume:
"""Tests for preserving the full LLM config across HITL resume."""
def test_hf_llm_attribute_set_on_wrapper_with_basellm(self) -> None:
"""Test that _hf_llm is set on the wrapper when llm is a BaseLLM instance."""
from crewai.llms.base_llm import BaseLLM
# Create a mock BaseLLM object
mock_llm = MagicMock(spec=BaseLLM)
mock_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=mock_llm,
)
def review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("review")
assert method is not None
assert hasattr(method, "_hf_llm")
assert method._hf_llm is mock_llm
def test_hf_llm_attribute_set_on_wrapper_with_string(self) -> None:
"""Test that _hf_llm is set on the wrapper even when llm is a string."""
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("review")
assert method is not None
assert hasattr(method, "_hf_llm")
assert method._hf_llm == "gpt-4o-mini"
@patch("crewai.flow.flow.crewai_event_bus.emit")
def test_resume_async_uses_live_basellm_over_serialized_string(
self, mock_emit: MagicMock
) -> None:
"""Test that resume_async uses the live BaseLLM from decorator instead of serialized string.
This is the main bug fix: when a flow resumes, it should use the fully-configured
LLM from the re-imported decorator (with credentials, project, etc.) instead of
creating a new LLM from just the model string.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
from crewai.llms.base_llm import BaseLLM
# Create a mock BaseLLM with full config (simulating Gemini with service account)
live_llm = MagicMock(spec=BaseLLM)
live_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
result_path: str = ""
@start()
@human_feedback(
message="Approve?",
emit=["approved", "rejected"],
llm=live_llm, # Full LLM object with credentials
)
def review(self):
return "content"
@listen("approved")
def handle_approved(self):
self.result_path = "approved"
return "Approved!"
# Save pending feedback with just a model STRING (simulating serialization)
context = PendingFeedbackContext(
flow_id="live-llm-test",
flow_class="TestFlow",
method_name="review",
method_output="content",
message="Approve?",
emit=["approved", "rejected"],
llm="gemini/gemini-3-flash", # Serialized string, NOT the live object
)
persistence.save_pending_feedback(
flow_uuid="live-llm-test",
context=context,
state_data={"id": "live-llm-test"},
)
# Restore flow - this re-imports the class with the live LLM
flow = TestFlow.from_pending("live-llm-test", persistence)
# Mock _collapse_to_outcome to capture what LLM it receives
captured_llm = []
def capture_llm(feedback, outcomes, llm):
captured_llm.append(llm)
return "approved"
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
flow.resume("looks good!")
# The key assertion: _collapse_to_outcome received the LIVE BaseLLM object,
# NOT the serialized string. The live_llm was captured at class definition
# time and stored on the method wrapper as _hf_llm.
assert len(captured_llm) == 1
# Verify it's the same object that was passed to the decorator
# (which is stored on the method's _hf_llm attribute)
method = flow._methods.get("review")
assert method is not None
assert captured_llm[0] is method._hf_llm
# And verify it's a BaseLLM instance, not a string
assert isinstance(captured_llm[0], BaseLLM)
@patch("crewai.flow.flow.crewai_event_bus.emit")
def test_resume_async_falls_back_to_serialized_string_when_no_hf_llm(
self, mock_emit: MagicMock
) -> None:
"""Test that resume_async falls back to context.llm when _hf_llm is not available.
This ensures backward compatibility with flows that were paused before this fix.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class TestFlow(Flow):
@start()
@human_feedback(
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "content"
# Save pending feedback
context = PendingFeedbackContext(
flow_id="fallback-test",
flow_class="TestFlow",
method_name="review",
method_output="content",
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
persistence.save_pending_feedback(
flow_uuid="fallback-test",
context=context,
state_data={"id": "fallback-test"},
)
flow = TestFlow.from_pending("fallback-test", persistence)
# Remove _hf_llm to simulate old decorator without this attribute
method = flow._methods.get("review")
if hasattr(method, "_hf_llm"):
delattr(method, "_hf_llm")
# Mock _collapse_to_outcome to capture what LLM it receives
captured_llm = []
def capture_llm(feedback, outcomes, llm):
captured_llm.append(llm)
return "approved"
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
flow.resume("looks good!")
# Should fall back to deserialized LLM from context string
assert len(captured_llm) == 1
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
assert isinstance(captured_llm[0], BaseLLMClass)
assert captured_llm[0].model == "gpt-4o-mini"
@patch("crewai.flow.flow.crewai_event_bus.emit")
def test_resume_async_uses_string_from_context_when_hf_llm_is_string(
self, mock_emit: MagicMock
) -> None:
"""Test that when _hf_llm is a string (not BaseLLM), we still use context.llm.
String LLM values offer no benefit over the serialized context.llm,
so we don't prefer them.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class TestFlow(Flow):
@start()
@human_feedback(
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini", # String LLM
)
def review(self):
return "content"
# Save pending feedback
context = PendingFeedbackContext(
flow_id="string-llm-test",
flow_class="TestFlow",
method_name="review",
method_output="content",
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
persistence.save_pending_feedback(
flow_uuid="string-llm-test",
context=context,
state_data={"id": "string-llm-test"},
)
flow = TestFlow.from_pending("string-llm-test", persistence)
# Verify _hf_llm is a string
method = flow._methods.get("review")
assert method._hf_llm == "gpt-4o-mini"
# Mock _collapse_to_outcome to capture what LLM it receives
captured_llm = []
def capture_llm(feedback, outcomes, llm):
captured_llm.append(llm)
return "approved"
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
flow.resume("looks good!")
# _hf_llm is a string, so resume deserializes context.llm into an LLM instance
assert len(captured_llm) == 1
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
assert isinstance(captured_llm[0], BaseLLMClass)
assert captured_llm[0].model == "gpt-4o-mini"
def test_hf_llm_set_for_async_wrapper(self) -> None:
"""Test that _hf_llm is set on async wrapper functions."""
import asyncio
from crewai.llms.base_llm import BaseLLM
mock_llm = MagicMock(spec=BaseLLM)
mock_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=mock_llm,
)
async def async_review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("async_review")
assert method is not None
assert hasattr(method, "_hf_llm")
assert method._hf_llm is mock_llm

View File

@@ -1,795 +0,0 @@
"""Tests for flow_serializer.py - Flow structure serialization for Studio UI."""
from typing import Literal
import pytest
from pydantic import BaseModel, Field
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_serializer import flow_structure
from crewai.flow.human_feedback import human_feedback
class TestSimpleLinearFlow:
"""Test simple linear flow (start → listen → listen)."""
def test_linear_flow_structure(self):
"""Test a simple sequential flow structure."""
class LinearFlow(Flow):
"""A simple linear flow for testing."""
@start()
def begin(self):
return "started"
@listen(begin)
def process(self):
return "processed"
@listen(process)
def finalize(self):
return "done"
structure = flow_structure(LinearFlow)
assert structure["name"] == "LinearFlow"
assert structure["description"] == "A simple linear flow for testing."
assert len(structure["methods"]) == 3
# Check method types
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["begin"]["type"] == "start"
assert method_map["process"]["type"] == "listen"
assert method_map["finalize"]["type"] == "listen"
# Check edges
assert len(structure["edges"]) == 2
edge_pairs = [(e["from_method"], e["to_method"]) for e in structure["edges"]]
assert ("begin", "process") in edge_pairs
assert ("process", "finalize") in edge_pairs
# All edges should be listen type
for edge in structure["edges"]:
assert edge["edge_type"] == "listen"
assert edge["condition"] is None
class TestRouterFlow:
"""Test flow with router branching."""
def test_router_flow_structure(self):
"""Test a flow with router that branches to different paths."""
class BranchingFlow(Flow):
@start()
def init(self):
return "initialized"
@router(init)
def decide(self) -> Literal["path_a", "path_b"]:
return "path_a"
@listen("path_a")
def handle_a(self):
return "handled_a"
@listen("path_b")
def handle_b(self):
return "handled_b"
structure = flow_structure(BranchingFlow)
assert structure["name"] == "BranchingFlow"
assert len(structure["methods"]) == 4
method_map = {m["name"]: m for m in structure["methods"]}
# Check method types
assert method_map["init"]["type"] == "start"
assert method_map["decide"]["type"] == "router"
assert method_map["handle_a"]["type"] == "listen"
assert method_map["handle_b"]["type"] == "listen"
# Check router paths
assert "path_a" in method_map["decide"]["router_paths"]
assert "path_b" in method_map["decide"]["router_paths"]
# Check edges
# Should have: init -> decide (listen), decide -> handle_a (route), decide -> handle_b (route)
listen_edges = [e for e in structure["edges"] if e["edge_type"] == "listen"]
route_edges = [e for e in structure["edges"] if e["edge_type"] == "route"]
assert len(listen_edges) == 1
assert listen_edges[0]["from_method"] == "init"
assert listen_edges[0]["to_method"] == "decide"
assert len(route_edges) == 2
route_targets = {e["to_method"] for e in route_edges}
assert "handle_a" in route_targets
assert "handle_b" in route_targets
# Check route conditions
route_conditions = {e["to_method"]: e["condition"] for e in route_edges}
assert route_conditions["handle_a"] == "path_a"
assert route_conditions["handle_b"] == "path_b"
class TestAndOrConditions:
"""Test flow with AND/OR conditions."""
def test_and_condition_flow(self):
"""Test a flow where a method waits for multiple methods (AND)."""
class AndConditionFlow(Flow):
@start()
def step_a(self):
return "a"
@start()
def step_b(self):
return "b"
@listen(and_(step_a, step_b))
def converge(self):
return "converged"
structure = flow_structure(AndConditionFlow)
assert len(structure["methods"]) == 3
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["step_a"]["type"] == "start"
assert method_map["step_b"]["type"] == "start"
assert method_map["converge"]["type"] == "listen"
# Check condition type
assert method_map["converge"]["condition_type"] == "AND"
# Check trigger methods
triggers = method_map["converge"]["trigger_methods"]
assert "step_a" in triggers
assert "step_b" in triggers
# Check edges - should have 2 edges to converge
converge_edges = [e for e in structure["edges"] if e["to_method"] == "converge"]
assert len(converge_edges) == 2
def test_or_condition_flow(self):
"""Test a flow where a method is triggered by any of multiple methods (OR)."""
class OrConditionFlow(Flow):
@start()
def path_1(self):
return "1"
@start()
def path_2(self):
return "2"
@listen(or_(path_1, path_2))
def handle_any(self):
return "handled"
structure = flow_structure(OrConditionFlow)
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["handle_any"]["condition_type"] == "OR"
triggers = method_map["handle_any"]["trigger_methods"]
assert "path_1" in triggers
assert "path_2" in triggers
class TestHumanFeedbackMethods:
"""Test flow with @human_feedback decorated methods."""
def test_human_feedback_detection(self):
"""Test that human feedback methods are correctly identified."""
class HumanFeedbackFlow(Flow):
@start()
@human_feedback(
message="Please review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review_step(self):
return "content to review"
@listen("approved")
def handle_approved(self):
return "approved"
@listen("rejected")
def handle_rejected(self):
return "rejected"
structure = flow_structure(HumanFeedbackFlow)
method_map = {m["name"]: m for m in structure["methods"]}
# review_step should have human feedback
assert method_map["review_step"]["has_human_feedback"] is True
# It's a start+router (due to emit)
assert method_map["review_step"]["type"] == "start_router"
assert "approved" in method_map["review_step"]["router_paths"]
assert "rejected" in method_map["review_step"]["router_paths"]
# Other methods should not have human feedback
assert method_map["handle_approved"]["has_human_feedback"] is False
assert method_map["handle_rejected"]["has_human_feedback"] is False
class TestCrewReferences:
"""Test detection of Crew references in method bodies."""
def test_crew_detection_with_crew_call(self):
"""Test that .crew() calls are detected."""
class FlowWithCrew(Flow):
@start()
def run_crew(self):
# Simulating crew usage pattern
# result = MyCrew().crew().kickoff()
return "result"
@listen(run_crew)
def no_crew(self):
return "done"
structure = flow_structure(FlowWithCrew)
method_map = {m["name"]: m for m in structure["methods"]}
# Note: Since the actual .crew() call is in a comment/string,
# the detection might not trigger. In real code it would.
# We're testing the mechanism exists.
assert "has_crew" in method_map["run_crew"]
assert "has_crew" in method_map["no_crew"]
def test_no_crew_when_absent(self):
"""Test that methods without Crew refs return has_crew=False."""
class SimpleNonCrewFlow(Flow):
@start()
def calculate(self):
return 1 + 1
@listen(calculate)
def display(self):
return "result"
structure = flow_structure(SimpleNonCrewFlow)
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["calculate"]["has_crew"] is False
assert method_map["display"]["has_crew"] is False
class TestTypedStateSchema:
"""Test flow with typed Pydantic state."""
def test_pydantic_state_schema_extraction(self):
"""Test extracting state schema from a Flow with Pydantic state."""
class MyState(BaseModel):
counter: int = 0
message: str = ""
items: list[str] = Field(default_factory=list)
class TypedStateFlow(Flow[MyState]):
initial_state = MyState
@start()
def increment(self):
self.state.counter += 1
return self.state.counter
@listen(increment)
def display(self):
return f"Count: {self.state.counter}"
structure = flow_structure(TypedStateFlow)
assert structure["state_schema"] is not None
fields = structure["state_schema"]["fields"]
field_names = {f["name"] for f in fields}
assert "counter" in field_names
assert "message" in field_names
assert "items" in field_names
# Check types
field_map = {f["name"]: f for f in fields}
assert "int" in field_map["counter"]["type"]
assert "str" in field_map["message"]["type"]
# Check defaults
assert field_map["counter"]["default"] == 0
assert field_map["message"]["default"] == ""
def test_dict_state_returns_none(self):
"""Test that flows using dict state return None for state_schema."""
class DictStateFlow(Flow):
@start()
def begin(self):
self.state["count"] = 1
return "started"
structure = flow_structure(DictStateFlow)
assert structure["state_schema"] is None
class TestEdgeCases:
"""Test edge cases and special scenarios."""
def test_start_router_combo(self):
"""Test a method that is both @start and a router (via human_feedback emit)."""
class StartRouterFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["continue", "stop"],
llm="gpt-4o-mini",
)
def entry_point(self):
return "data"
@listen("continue")
def proceed(self):
return "proceeding"
@listen("stop")
def halt(self):
return "halted"
structure = flow_structure(StartRouterFlow)
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["entry_point"]["type"] == "start_router"
assert method_map["entry_point"]["has_human_feedback"] is True
assert "continue" in method_map["entry_point"]["router_paths"]
assert "stop" in method_map["entry_point"]["router_paths"]
def test_multiple_start_methods(self):
"""Test a flow with multiple start methods."""
class MultiStartFlow(Flow):
@start()
def start_a(self):
return "a"
@start()
def start_b(self):
return "b"
@listen(and_(start_a, start_b))
def combine(self):
return "combined"
structure = flow_structure(MultiStartFlow)
start_methods = [m for m in structure["methods"] if m["type"] == "start"]
assert len(start_methods) == 2
start_names = {m["name"] for m in start_methods}
assert "start_a" in start_names
assert "start_b" in start_names
def test_orphan_methods(self):
"""Test that orphan methods (not connected to flow) are still captured."""
class FlowWithOrphan(Flow):
@start()
def begin(self):
return "started"
@listen(begin)
def connected(self):
return "connected"
@listen("never_triggered")
def orphan(self):
return "orphan"
structure = flow_structure(FlowWithOrphan)
method_names = {m["name"] for m in structure["methods"]}
assert "orphan" in method_names
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["orphan"]["trigger_methods"] == ["never_triggered"]
def test_empty_flow(self):
"""Test building structure for a flow with no methods."""
class EmptyFlow(Flow):
pass
structure = flow_structure(EmptyFlow)
assert structure["name"] == "EmptyFlow"
assert structure["methods"] == []
assert structure["edges"] == []
assert structure["state_schema"] is None
def test_flow_with_docstring(self):
"""Test that flow docstring is captured."""
class DocumentedFlow(Flow):
"""This is a well-documented flow.
It has multiple lines of documentation.
"""
@start()
def begin(self):
return "started"
structure = flow_structure(DocumentedFlow)
assert structure["description"] is not None
assert "well-documented flow" in structure["description"]
def test_flow_without_docstring(self):
"""Test that missing docstring returns None."""
class UndocumentedFlow(Flow):
@start()
def begin(self):
return "started"
structure = flow_structure(UndocumentedFlow)
assert structure["description"] is None
def test_nested_conditions(self):
"""Test flow with nested AND/OR conditions."""
class NestedConditionFlow(Flow):
@start()
def a(self):
return "a"
@start()
def b(self):
return "b"
@start()
def c(self):
return "c"
@listen(or_(and_(a, b), c))
def complex_trigger(self):
return "triggered"
structure = flow_structure(NestedConditionFlow)
method_map = {m["name"]: m for m in structure["methods"]}
# Should have triggers for a, b, and c
triggers = method_map["complex_trigger"]["trigger_methods"]
assert len(triggers) == 3
assert "a" in triggers
assert "b" in triggers
assert "c" in triggers
class TestErrorHandling:
"""Test error handling and validation."""
def test_instance_raises_type_error(self):
"""Test that passing an instance raises TypeError."""
class TestFlow(Flow):
@start()
def begin(self):
return "started"
flow_instance = TestFlow()
with pytest.raises(TypeError) as exc_info:
flow_structure(flow_instance)
assert "requires a Flow class, not an instance" in str(exc_info.value)
def test_non_class_raises_type_error(self):
"""Test that passing non-class raises TypeError."""
with pytest.raises(TypeError):
flow_structure("not a class")
with pytest.raises(TypeError):
flow_structure(123)
class TestEdgeGeneration:
"""Test edge generation in various scenarios."""
def test_all_edges_generated_correctly(self):
"""Verify all edges are correctly generated for a complex flow."""
class ComplexFlow(Flow):
@start()
def entry(self):
return "started"
@listen(entry)
def step_1(self):
return "step_1"
@router(step_1)
def branch(self) -> Literal["left", "right"]:
return "left"
@listen("left")
def left_path(self):
return "left_done"
@listen("right")
def right_path(self):
return "right_done"
@listen(or_(left_path, right_path))
def converge(self):
return "done"
structure = flow_structure(ComplexFlow)
# Build edge map for easier checking
edges = structure["edges"]
# Check listen edges
listen_edges = [(e["from_method"], e["to_method"]) for e in edges if e["edge_type"] == "listen"]
assert ("entry", "step_1") in listen_edges
assert ("step_1", "branch") in listen_edges
assert ("left_path", "converge") in listen_edges
assert ("right_path", "converge") in listen_edges
# Check route edges
route_edges = [(e["from_method"], e["to_method"], e["condition"]) for e in edges if e["edge_type"] == "route"]
assert ("branch", "left_path", "left") in route_edges
assert ("branch", "right_path", "right") in route_edges
def test_router_edge_conditions(self):
"""Test that router edge conditions are properly set."""
class RouterConditionFlow(Flow):
@start()
def begin(self):
return "start"
@router(begin)
def route(self) -> Literal["option_1", "option_2", "option_3"]:
return "option_1"
@listen("option_1")
def handle_1(self):
return "1"
@listen("option_2")
def handle_2(self):
return "2"
@listen("option_3")
def handle_3(self):
return "3"
structure = flow_structure(RouterConditionFlow)
route_edges = [e for e in structure["edges"] if e["edge_type"] == "route"]
# Should have 3 route edges
assert len(route_edges) == 3
conditions = {e["to_method"]: e["condition"] for e in route_edges}
assert conditions["handle_1"] == "option_1"
assert conditions["handle_2"] == "option_2"
assert conditions["handle_3"] == "option_3"
class TestMethodTypeClassification:
"""Test method type classification."""
def test_all_method_types(self):
"""Test classification of all method types."""
class AllTypesFlow(Flow):
@start()
def start_only(self):
return "start"
@listen(start_only)
def listen_only(self):
return "listen"
@router(listen_only)
def router_only(self) -> Literal["path"]:
return "path"
@listen("path")
def after_router(self):
return "after"
@start()
@human_feedback(
message="Review",
emit=["yes", "no"],
llm="gpt-4o-mini",
)
def start_and_router(self):
return "data"
structure = flow_structure(AllTypesFlow)
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["start_only"]["type"] == "start"
assert method_map["listen_only"]["type"] == "listen"
assert method_map["router_only"]["type"] == "router"
assert method_map["after_router"]["type"] == "listen"
assert method_map["start_and_router"]["type"] == "start_router"
class TestInputDetection:
"""Test flow input detection."""
def test_inputs_list_exists(self):
"""Test that inputs list is always present."""
class SimpleFlow(Flow):
@start()
def begin(self):
return "started"
structure = flow_structure(SimpleFlow)
assert "inputs" in structure
assert isinstance(structure["inputs"], list)
class TestJsonSerializable:
"""Test that output is JSON serializable."""
def test_structure_is_json_serializable(self):
"""Test that the entire structure can be JSON serialized."""
import json
class MyState(BaseModel):
value: int = 0
class SerializableFlow(Flow[MyState]):
"""Test flow for JSON serialization."""
initial_state = MyState
@start()
@human_feedback(
message="Review",
emit=["ok", "not_ok"],
llm="gpt-4o-mini",
)
def begin(self):
return "data"
@listen("ok")
def proceed(self):
return "done"
structure = flow_structure(SerializableFlow)
# Should not raise
json_str = json.dumps(structure)
assert json_str is not None
# Should round-trip
parsed = json.loads(json_str)
assert parsed["name"] == "SerializableFlow"
assert len(parsed["methods"]) > 0
class TestFlowInheritance:
"""Test flow inheritance scenarios."""
def test_child_flow_inherits_parent_methods(self):
"""Test that FlowB inheriting from FlowA includes methods from both.
Note: FlowMeta propagates methods but does NOT fully propagate the
_listeners registry from parent classes. This means edges defined
in the parent class (e.g., parent_start -> parent_process) may not
appear in the child's structure. This is a known FlowMeta limitation.
"""
class FlowA(Flow):
"""Parent flow with start method."""
@start()
def parent_start(self):
return "parent started"
@listen(parent_start)
def parent_process(self):
return "parent processed"
class FlowB(FlowA):
"""Child flow with additional methods."""
@listen(FlowA.parent_process)
def child_continue(self):
return "child continued"
@listen(child_continue)
def child_finalize(self):
return "child finalized"
structure = flow_structure(FlowB)
assert structure["name"] == "FlowB"
# Check all methods are present (from both parent and child)
method_names = {m["name"] for m in structure["methods"]}
assert "parent_start" in method_names
assert "parent_process" in method_names
assert "child_continue" in method_names
assert "child_finalize" in method_names
# Check method types
method_map = {m["name"]: m for m in structure["methods"]}
assert method_map["parent_start"]["type"] == "start"
assert method_map["parent_process"]["type"] == "listen"
assert method_map["child_continue"]["type"] == "listen"
assert method_map["child_finalize"]["type"] == "listen"
# Check edges defined in child class exist
edge_pairs = [(e["from_method"], e["to_method"]) for e in structure["edges"]]
assert ("parent_process", "child_continue") in edge_pairs
assert ("child_continue", "child_finalize") in edge_pairs
# KNOWN LIMITATION: Edges defined in parent class (parent_start -> parent_process)
# are NOT propagated to child's _listeners registry by FlowMeta.
# The edge (parent_start, parent_process) will NOT be in edge_pairs.
# This is a FlowMeta limitation, not a serializer bug.
def test_child_flow_can_override_parent_method(self):
"""Test that child can override parent methods."""
class BaseFlow(Flow):
@start()
def begin(self):
return "base begin"
@listen(begin)
def process(self):
return "base process"
class ExtendedFlow(BaseFlow):
@listen(BaseFlow.begin)
def process(self):
# Override parent's process method
return "extended process"
@listen(process)
def finalize(self):
return "extended finalize"
structure = flow_structure(ExtendedFlow)
method_names = {m["name"] for m in structure["methods"]}
assert "begin" in method_names
assert "process" in method_names
assert "finalize" in method_names
# Should have 3 methods total (not 4, since process is overridden)
assert len(structure["methods"]) == 3

View File

@@ -772,204 +772,3 @@ class TestEdgeCases:
assert result.output == "content"
assert result.feedback == "feedback"
assert result.outcome is None # No routing, no outcome
class TestLLMConfigPreservation:
"""Tests that LLM config is preserved through @human_feedback serialization.
PR #4970 introduced _hf_llm stashing so the live LLM object survives
decorator wrapping for same-process resume. The serialization path
(_serialize_llm_for_context / _deserialize_llm_from_context) preserves
config for cross-process resume.
"""
def test_hf_llm_stashed_on_wrapper_with_llm_instance(self):
"""Test that passing an LLM instance stashes it on the wrapper as _hf_llm."""
from crewai.llm import LLM
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
class ConfigFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=llm_instance,
)
def review(self):
return "content"
method = ConfigFlow.review
assert hasattr(method, "_hf_llm"), "_hf_llm not found on wrapper"
assert method._hf_llm is llm_instance, "_hf_llm is not the same object"
def test_hf_llm_preserved_on_listen_method(self):
"""Test that _hf_llm is preserved when @human_feedback is on a @listen method."""
from crewai.llm import LLM
llm_instance = LLM(model="gpt-4o-mini", temperature=0.7)
class ListenConfigFlow(Flow):
@start()
def generate(self):
return "draft"
@listen("generate")
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=llm_instance,
)
def review(self):
return "content"
method = ListenConfigFlow.review
assert hasattr(method, "_hf_llm")
assert method._hf_llm is llm_instance
def test_hf_llm_accessible_on_instance(self):
"""Test that _hf_llm survives Flow instantiation (bound method access)."""
from crewai.llm import LLM
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
class InstanceFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=llm_instance,
)
def review(self):
return "content"
flow = InstanceFlow()
instance_method = flow.review
assert hasattr(instance_method, "_hf_llm")
assert instance_method._hf_llm is llm_instance
def test_serialize_llm_preserves_config_fields(self):
"""Test that _serialize_llm_for_context captures temperature, base_url, etc."""
from crewai.flow.human_feedback import _serialize_llm_for_context
from crewai.llm import LLM
llm = LLM(
model="gpt-4o-mini",
temperature=0.42,
base_url="https://custom.example.com/v1",
)
serialized = _serialize_llm_for_context(llm)
assert isinstance(serialized, dict), f"Expected dict, got {type(serialized)}"
assert serialized["model"] == "openai/gpt-4o-mini"
assert serialized["temperature"] == 0.42
assert serialized["base_url"] == "https://custom.example.com/v1"
def test_serialize_llm_excludes_api_key(self):
"""Test that api_key is NOT included in serialized output (security)."""
from crewai.flow.human_feedback import _serialize_llm_for_context
from crewai.llm import LLM
llm = LLM(model="gpt-4o-mini")
serialized = _serialize_llm_for_context(llm)
assert isinstance(serialized, dict)
assert "api_key" not in serialized
def test_deserialize_round_trip_preserves_config(self):
"""Test that serialize → deserialize round-trip preserves all config."""
from crewai.flow.human_feedback import (
_deserialize_llm_from_context,
_serialize_llm_for_context,
)
from crewai.llm import LLM
original = LLM(
model="gpt-4o-mini",
temperature=0.42,
base_url="https://custom.example.com/v1",
)
serialized = _serialize_llm_for_context(original)
reconstructed = _deserialize_llm_from_context(serialized)
assert reconstructed is not None
assert reconstructed.model == original.model
assert reconstructed.temperature == original.temperature
assert reconstructed.base_url == original.base_url
def test_deserialize_handles_legacy_string_format(self):
"""Test backward compat: plain string still reconstructs an LLM."""
from crewai.flow.human_feedback import _deserialize_llm_from_context
reconstructed = _deserialize_llm_from_context("openai/gpt-4o-mini")
assert reconstructed is not None
assert reconstructed.model == "gpt-4o-mini"
def test_deserialize_returns_none_for_none(self):
"""Test that None input returns None."""
from crewai.flow.human_feedback import _deserialize_llm_from_context
assert _deserialize_llm_from_context(None) is None
def test_serialize_llm_preserves_provider_specific_fields(self):
"""Test that provider-specific fields like project/location are serialized."""
from crewai.flow.human_feedback import _serialize_llm_for_context
from crewai.llm import LLM
# Create a Gemini-style LLM with project and non-default location
llm = LLM(
model="gemini-2.0-flash",
provider="gemini",
project="my-project",
location="europe-west1",
temperature=0.3,
)
serialized = _serialize_llm_for_context(llm)
assert isinstance(serialized, dict)
assert serialized.get("project") == "my-project"
assert serialized.get("location") == "europe-west1"
assert serialized.get("temperature") == 0.3
def test_config_preserved_through_full_flow_execution(self):
"""Test that the LLM with custom config is used during outcome collapsing."""
from crewai.llm import LLM
llm_instance = LLM(model="gpt-4o-mini", temperature=0.42)
collapse_calls = []
class FullFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=llm_instance,
)
def review(self):
return "content"
@listen("approved")
def on_approved(self):
return "done"
flow = FullFlow()
original_collapse = flow._collapse_to_outcome
def spy_collapse(feedback, outcomes, llm):
collapse_calls.append(llm)
return "approved"
with (
patch.object(flow, "_request_human_feedback", return_value="looks good"),
patch.object(flow, "_collapse_to_outcome", side_effect=spy_collapse),
):
flow.kickoff()
assert len(collapse_calls) == 1
# The LLM passed to _collapse_to_outcome should be the original instance
assert collapse_calls[0] is llm_instance

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.11.1"
__version__ = "1.11.0"

View File

@@ -147,12 +147,12 @@ python_functions = "test_*"
# composio-core pins rich<14 but textual requires rich>=14.
# onnxruntime 1.24+ dropped Python 3.10 wheels; cap it so qdrant[fastembed] resolves on 3.10.
# fastembed 0.7.x and docling 2.63 cap pillow<12; the removed APIs don't affect them.
# langchain-core <1.2.11 has SSRF via image_url token counting (CVE-2026-26013).
# langchain-core 0.3.76 has a template-injection vuln (GHSA); force >=0.3.80.
override-dependencies = [
"rich>=13.7.1",
"onnxruntime<1.24; python_version < '3.11'",
"pillow>=12.1.1",
"langchain-core>=1.2.11,<2",
"langchain-core>=0.3.80,<1",
"urllib3>=2.6.3",
]

24
uv.lock generated
View File

@@ -20,7 +20,7 @@ members = [
"crewai-tools",
]
overrides = [
{ name = "langchain-core", specifier = ">=1.2.11,<2" },
{ name = "langchain-core", specifier = ">=0.3.80,<1" },
{ name = "onnxruntime", marker = "python_full_version < '3.11'", specifier = "<1.24" },
{ name = "pillow", specifier = ">=12.1.1" },
{ name = "rich", specifier = ">=13.7.1" },
@@ -1275,9 +1275,9 @@ requires-dist = [
{ name = "aiofiles", specifier = "~=24.1.0" },
{ name = "av", specifier = "~=13.0.0" },
{ name = "pillow", specifier = "~=12.1.1" },
{ name = "pypdf", specifier = "~=6.9.1" },
{ name = "pypdf", specifier = "~=6.7.5" },
{ name = "python-magic", specifier = ">=0.4.27" },
{ name = "tinytag", specifier = "~=2.2.1" },
{ name = "tinytag", specifier = "~=1.10.0" },
]
[[package]]
@@ -3295,7 +3295,7 @@ wheels = [
[[package]]
name = "langchain-core"
version = "1.2.20"
version = "0.3.83"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "jsonpatch" },
@@ -3307,9 +3307,9 @@ dependencies = [
{ name = "typing-extensions" },
{ name = "uuid-utils" },
]
sdist = { url = "https://files.pythonhosted.org/packages/db/41/6552a419fe549a79601e5a698d1d5ee2ca7fe93bb87fd624a16a8c1bdee3/langchain_core-1.2.20.tar.gz", hash = "sha256:c7ac8b976039b5832abb989fef058b88c270594ba331efc79e835df046e7dc44", size = 838330, upload-time = "2026-03-18T17:34:45.522Z" }
sdist = { url = "https://files.pythonhosted.org/packages/21/a4/24f2d787bfcf56e5990924cacefe6f6e7971a3629f97c8162fc7a2a3d851/langchain_core-0.3.83.tar.gz", hash = "sha256:a0a4c7b6ea1c446d3b432116f405dc2afa1fe7891c44140d3d5acca221909415", size = 597965, upload-time = "2026-01-13T01:19:23.854Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d9/06/08c88ddd4d6766de4e6c43111ae8f3025df383d2a4379cb938fc571b49d4/langchain_core-1.2.20-py3-none-any.whl", hash = "sha256:b65ff678f3c3dc1f1b4d03a3af5ee3b8d51f9be5181d74eb53c6c11cd9dd5e68", size = 504215, upload-time = "2026-03-18T17:34:44.087Z" },
{ url = "https://files.pythonhosted.org/packages/5a/db/d71b80d3bd6193812485acea4001cdf86cf95a44bbf942f7a240120ff762/langchain_core-0.3.83-py3-none-any.whl", hash = "sha256:8c92506f8b53fc1958b1c07447f58c5783eb8833dd3cb6dc75607c80891ab1ae", size = 458890, upload-time = "2026-01-13T01:19:21.748Z" },
]
[[package]]
@@ -6174,14 +6174,14 @@ wheels = [
[[package]]
name = "pypdf"
version = "6.9.1"
version = "6.7.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "typing-extensions", marker = "python_full_version < '3.11'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f9/fb/dc2e8cb006e80b0020ed20d8649106fe4274e82d8e756ad3e24ade19c0df/pypdf-6.9.1.tar.gz", hash = "sha256:ae052407d33d34de0c86c5c729be6d51010bf36e03035a8f23ab449bca52377d", size = 5311551, upload-time = "2026-03-17T10:46:07.876Z" }
sdist = { url = "https://files.pythonhosted.org/packages/f6/52/37cc0aa9e9d1bf7729a737a0d83f8b3f851c8eb137373d9f71eafb0a3405/pypdf-6.7.5.tar.gz", hash = "sha256:40bb2e2e872078655f12b9b89e2f900888bb505e88a82150b64f9f34fa25651d", size = 5304278, upload-time = "2026-03-02T09:05:21.464Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f9/f4/75543fa802b86e72f87e9395440fe1a89a6d149887e3e55745715c3352ac/pypdf-6.9.1-py3-none-any.whl", hash = "sha256:f35a6a022348fae47e092a908339a8f3dc993510c026bb39a96718fc7185e89f", size = 333661, upload-time = "2026-03-17T10:46:06.286Z" },
{ url = "https://files.pythonhosted.org/packages/05/89/336673efd0a88956562658aba4f0bbef7cb92a6fbcbcaf94926dbc82b408/pypdf-6.7.5-py3-none-any.whl", hash = "sha256:07ba7f1d6e6d9aa2a17f5452e320a84718d4ce863367f7ede2fd72280349ab13", size = 331421, upload-time = "2026-03-02T09:05:19.722Z" },
]
[[package]]
@@ -7626,11 +7626,11 @@ wheels = [
[[package]]
name = "tinytag"
version = "2.2.1"
version = "1.10.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/96/59/8a8cb2331e2602b53e4dc06960f57d1387a2b18e7efd24e5f9cb60ea4925/tinytag-2.2.1.tar.gz", hash = "sha256:e6d06610ebe7cd66fd07be2d3b9495914ab32654a5e47657bb8cd44c2484523c", size = 38214, upload-time = "2026-03-15T18:48:01.11Z" }
sdist = { url = "https://files.pythonhosted.org/packages/59/b5/ff5e5f9ca9677be7272260f67c87f7e8e885babc7ce94604e837dcfd8d76/tinytag-1.10.1.tar.gz", hash = "sha256:122a63b836f85094aacca43fc807aaee3290be3de17d134f5f4a08b509ae268f", size = 40906, upload-time = "2023-10-26T19:30:38.791Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ce/34/d50e338631baaf65ec5396e70085e5de0b52b24b28db1ffbc1c6e82190dc/tinytag-2.2.1-py3-none-any.whl", hash = "sha256:ed8b1e6d25367937e3321e054f4974f9abfde1a3e0a538824c87da377130c2b6", size = 32927, upload-time = "2026-03-15T18:47:59.613Z" },
{ url = "https://files.pythonhosted.org/packages/2f/04/ef783cbc4aa3a5ed75969e300b3e3929daf3d1b52fe80e950c63e0d66d95/tinytag-1.10.1-py3-none-any.whl", hash = "sha256:e437654d04c966fbbbdbf807af61eb9759f1d80e4173a7d26202506b37cfdaf0", size = 37900, upload-time = "2023-10-26T19:30:36.724Z" },
]
[[package]]