Merge branch 'main' into codex/nl2sql-security-docs

This commit is contained in:
Rip&Tear
2026-02-12 13:44:54 +08:00
committed by GitHub
3 changed files with 435 additions and 16 deletions

View File

@@ -6,12 +6,12 @@ from functools import lru_cache
import importlib.metadata
import json
from pathlib import Path
from typing import Any, cast
from typing import Any
from urllib import request
from urllib.error import URLError
import appdirs
from packaging.version import InvalidVersion, parse
from packaging.version import InvalidVersion, Version, parse
@lru_cache(maxsize=1)
@@ -42,21 +42,88 @@ def _is_cache_valid(cache_data: Mapping[str, Any]) -> bool:
return False
def _find_latest_non_yanked_version(
releases: Mapping[str, list[dict[str, Any]]],
) -> str | None:
"""Find the latest non-yanked version from PyPI releases data.
Args:
releases: PyPI releases dict mapping version strings to file info lists.
Returns:
The latest non-yanked version string, or None if all versions are yanked.
"""
best_version: Version | None = None
best_version_str: str | None = None
for version_str, files in releases.items():
try:
v = parse(version_str)
except InvalidVersion:
continue
if v.is_prerelease or v.is_devrelease:
continue
if not files:
continue
all_yanked = all(f.get("yanked", False) for f in files)
if all_yanked:
continue
if best_version is None or v > best_version:
best_version = v
best_version_str = version_str
return best_version_str
def _is_version_yanked(
version_str: str,
releases: Mapping[str, list[dict[str, Any]]],
) -> tuple[bool, str]:
"""Check if a specific version is yanked.
Args:
version_str: The version string to check.
releases: PyPI releases dict mapping version strings to file info lists.
Returns:
Tuple of (is_yanked, yanked_reason).
"""
files = releases.get(version_str, [])
if not files:
return False, ""
all_yanked = all(f.get("yanked", False) for f in files)
if not all_yanked:
return False, ""
for f in files:
reason = f.get("yanked_reason", "")
if reason:
return True, str(reason)
return True, ""
def get_latest_version_from_pypi(timeout: int = 2) -> str | None:
"""Get the latest version of CrewAI from PyPI.
"""Get the latest non-yanked version of CrewAI from PyPI.
Args:
timeout: Request timeout in seconds.
Returns:
Latest version string or None if unable to fetch.
Latest non-yanked version string or None if unable to fetch.
"""
cache_file = _get_cache_file()
if cache_file.exists():
try:
cache_data = json.loads(cache_file.read_text())
if _is_cache_valid(cache_data):
return cast(str | None, cache_data.get("version"))
if _is_cache_valid(cache_data) and "current_version" in cache_data:
version: str | None = cache_data.get("version")
return version
except (json.JSONDecodeError, OSError):
pass
@@ -65,11 +132,18 @@ def get_latest_version_from_pypi(timeout: int = 2) -> str | None:
"https://pypi.org/pypi/crewai/json", timeout=timeout
) as response:
data = json.loads(response.read())
latest_version = cast(str, data["info"]["version"])
releases: dict[str, list[dict[str, Any]]] = data["releases"]
latest_version = _find_latest_non_yanked_version(releases)
current_version = get_crewai_version()
is_yanked, yanked_reason = _is_version_yanked(current_version, releases)
cache_data = {
"version": latest_version,
"timestamp": datetime.now().isoformat(),
"current_version": current_version,
"current_version_yanked": is_yanked,
"current_version_yanked_reason": yanked_reason,
}
cache_file.write_text(json.dumps(cache_data))
@@ -78,6 +152,40 @@ def get_latest_version_from_pypi(timeout: int = 2) -> str | None:
return None
def is_current_version_yanked() -> tuple[bool, str]:
"""Check if the currently installed version has been yanked on PyPI.
Reads from cache if available, otherwise triggers a fetch.
Returns:
Tuple of (is_yanked, yanked_reason).
"""
cache_file = _get_cache_file()
if cache_file.exists():
try:
cache_data = json.loads(cache_file.read_text())
if _is_cache_valid(cache_data) and "current_version" in cache_data:
current = get_crewai_version()
if cache_data.get("current_version") == current:
return (
bool(cache_data.get("current_version_yanked", False)),
str(cache_data.get("current_version_yanked_reason", "")),
)
except (json.JSONDecodeError, OSError):
pass
get_latest_version_from_pypi()
try:
cache_data = json.loads(cache_file.read_text())
return (
bool(cache_data.get("current_version_yanked", False)),
str(cache_data.get("current_version_yanked_reason", "")),
)
except (json.JSONDecodeError, OSError):
return False, ""
def check_version() -> tuple[str, str | None]:
"""Check current and latest versions.

View File

@@ -8,7 +8,7 @@ from rich.live import Live
from rich.panel import Panel
from rich.text import Text
from crewai.cli.version import is_newer_version_available
from crewai.cli.version import is_current_version_yanked, is_newer_version_available
_disable_version_check: ContextVar[bool] = ContextVar(
@@ -104,6 +104,22 @@ To update, run: uv sync --upgrade-package crewai"""
)
self.console.print(panel)
self.console.print()
is_yanked, yanked_reason = is_current_version_yanked()
if is_yanked:
yanked_message = f"Version {current} has been yanked from PyPI."
if yanked_reason:
yanked_message += f"\nReason: {yanked_reason}"
yanked_message += "\n\nTo update, run: uv sync --upgrade-package crewai"
yanked_panel = Panel(
yanked_message,
title="Yanked Version",
border_style="red",
padding=(1, 2),
)
self.console.print(yanked_panel)
self.console.print()
except Exception: # noqa: S110
# Silently ignore errors in version check - it's non-critical
pass

View File

@@ -1,15 +1,19 @@
"""Test for version management."""
import json
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from crewai import __version__
from crewai.cli.version import (
_find_latest_non_yanked_version,
_get_cache_file,
_is_cache_valid,
_is_version_yanked,
get_crewai_version,
get_latest_version_from_pypi,
is_current_version_yanked,
is_newer_version_available,
)
@@ -19,10 +23,8 @@ def test_dynamic_versioning_consistency() -> None:
cli_version = get_crewai_version()
package_version = __version__
# Both should return the same version string
assert cli_version == package_version
# Version should not be empty
assert package_version is not None
assert len(package_version.strip()) > 0
@@ -63,12 +65,18 @@ class TestVersionChecking:
def test_get_latest_version_from_pypi_success(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test successful PyPI version fetch."""
# Mock cache not existing to force fetch from PyPI
"""Test successful PyPI version fetch uses releases data."""
mock_exists.return_value = False
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0": [{"yanked": False}],
"2.1.0": [{"yanked": True, "yanked_reason": "bad release"}],
}
mock_response = MagicMock()
mock_response.read.return_value = b'{"info": {"version": "2.0.0"}}'
mock_response.read.return_value = json.dumps(
{"info": {"version": "2.1.0"}, "releases": releases}
).encode()
mock_urlopen.return_value.__enter__.return_value = mock_response
version = get_latest_version_from_pypi()
@@ -82,7 +90,6 @@ class TestVersionChecking:
"""Test PyPI version fetch failure."""
from urllib.error import URLError
# Mock cache not existing to force fetch from PyPI
mock_exists.return_value = False
mock_urlopen.side_effect = URLError("Network error")
@@ -133,18 +140,247 @@ class TestVersionChecking:
assert latest is None
class TestFindLatestNonYankedVersion:
"""Test _find_latest_non_yanked_version helper."""
def test_skips_yanked_versions(self) -> None:
"""Test that yanked versions are skipped."""
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0": [{"yanked": True}],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_returns_highest_non_yanked(self) -> None:
"""Test that the highest non-yanked version is returned."""
releases = {
"1.0.0": [{"yanked": False}],
"1.5.0": [{"yanked": False}],
"2.0.0": [{"yanked": True}],
}
assert _find_latest_non_yanked_version(releases) == "1.5.0"
def test_returns_none_when_all_yanked(self) -> None:
"""Test that None is returned when all versions are yanked."""
releases = {
"1.0.0": [{"yanked": True}],
"2.0.0": [{"yanked": True}],
}
assert _find_latest_non_yanked_version(releases) is None
def test_skips_prerelease_versions(self) -> None:
"""Test that pre-release versions are skipped."""
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0a1": [{"yanked": False}],
"2.0.0rc1": [{"yanked": False}],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_skips_versions_with_empty_files(self) -> None:
"""Test that versions with no files are skipped."""
releases: dict[str, list[dict[str, bool]]] = {
"1.0.0": [{"yanked": False}],
"2.0.0": [],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_handles_invalid_version_strings(self) -> None:
"""Test that invalid version strings are skipped."""
releases = {
"1.0.0": [{"yanked": False}],
"not-a-version": [{"yanked": False}],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_partially_yanked_files_not_considered_yanked(self) -> None:
"""Test that a version with some non-yanked files is not yanked."""
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0": [{"yanked": True}, {"yanked": False}],
}
assert _find_latest_non_yanked_version(releases) == "2.0.0"
class TestIsVersionYanked:
"""Test _is_version_yanked helper."""
def test_non_yanked_version(self) -> None:
"""Test a non-yanked version returns False."""
releases = {"1.0.0": [{"yanked": False}]}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is False
assert reason == ""
def test_yanked_version_with_reason(self) -> None:
"""Test a yanked version returns True with reason."""
releases = {
"1.0.0": [{"yanked": True, "yanked_reason": "critical bug"}],
}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is True
assert reason == "critical bug"
def test_yanked_version_without_reason(self) -> None:
"""Test a yanked version returns True with empty reason."""
releases = {"1.0.0": [{"yanked": True}]}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is True
assert reason == ""
def test_unknown_version(self) -> None:
"""Test an unknown version returns False."""
releases = {"1.0.0": [{"yanked": False}]}
is_yanked, reason = _is_version_yanked("9.9.9", releases)
assert is_yanked is False
assert reason == ""
def test_partially_yanked_files(self) -> None:
"""Test a version with mixed yanked/non-yanked files is not yanked."""
releases = {
"1.0.0": [{"yanked": True}, {"yanked": False}],
}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is False
assert reason == ""
def test_multiple_yanked_files_picks_first_reason(self) -> None:
"""Test that the first available reason is returned."""
releases = {
"1.0.0": [
{"yanked": True, "yanked_reason": ""},
{"yanked": True, "yanked_reason": "second reason"},
],
}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is True
assert reason == "second reason"
class TestIsCurrentVersionYanked:
"""Test is_current_version_yanked public function."""
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version._get_cache_file")
def test_reads_from_valid_cache(
self, mock_cache_file: MagicMock, mock_version: MagicMock, tmp_path: Path
) -> None:
"""Test reading yanked status from a valid cache."""
mock_version.return_value = "1.0.0"
cache_file = tmp_path / "version_cache.json"
cache_data = {
"version": "2.0.0",
"timestamp": datetime.now().isoformat(),
"current_version": "1.0.0",
"current_version_yanked": True,
"current_version_yanked_reason": "bad release",
}
cache_file.write_text(json.dumps(cache_data))
mock_cache_file.return_value = cache_file
is_yanked, reason = is_current_version_yanked()
assert is_yanked is True
assert reason == "bad release"
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version._get_cache_file")
def test_not_yanked_from_cache(
self, mock_cache_file: MagicMock, mock_version: MagicMock, tmp_path: Path
) -> None:
"""Test non-yanked status from a valid cache."""
mock_version.return_value = "2.0.0"
cache_file = tmp_path / "version_cache.json"
cache_data = {
"version": "2.0.0",
"timestamp": datetime.now().isoformat(),
"current_version": "2.0.0",
"current_version_yanked": False,
"current_version_yanked_reason": "",
}
cache_file.write_text(json.dumps(cache_data))
mock_cache_file.return_value = cache_file
is_yanked, reason = is_current_version_yanked()
assert is_yanked is False
assert reason == ""
@patch("crewai.cli.version.get_latest_version_from_pypi")
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version._get_cache_file")
def test_triggers_fetch_on_stale_cache(
self,
mock_cache_file: MagicMock,
mock_version: MagicMock,
mock_fetch: MagicMock,
tmp_path: Path,
) -> None:
"""Test that a stale cache triggers a re-fetch."""
mock_version.return_value = "1.0.0"
cache_file = tmp_path / "version_cache.json"
old_time = datetime.now() - timedelta(hours=25)
cache_data = {
"version": "2.0.0",
"timestamp": old_time.isoformat(),
"current_version": "1.0.0",
"current_version_yanked": True,
"current_version_yanked_reason": "old reason",
}
cache_file.write_text(json.dumps(cache_data))
mock_cache_file.return_value = cache_file
fresh_cache = {
"version": "2.0.0",
"timestamp": datetime.now().isoformat(),
"current_version": "1.0.0",
"current_version_yanked": False,
"current_version_yanked_reason": "",
}
def write_fresh_cache() -> str:
cache_file.write_text(json.dumps(fresh_cache))
return "2.0.0"
mock_fetch.side_effect = lambda: write_fresh_cache()
is_yanked, reason = is_current_version_yanked()
assert is_yanked is False
mock_fetch.assert_called_once()
@patch("crewai.cli.version.get_latest_version_from_pypi")
@patch("crewai.cli.version.get_crewai_version")
@patch("crewai.cli.version._get_cache_file")
def test_returns_false_on_fetch_failure(
self,
mock_cache_file: MagicMock,
mock_version: MagicMock,
mock_fetch: MagicMock,
tmp_path: Path,
) -> None:
"""Test that fetch failure returns not yanked."""
mock_version.return_value = "1.0.0"
cache_file = tmp_path / "version_cache.json"
mock_cache_file.return_value = cache_file
mock_fetch.return_value = None
is_yanked, reason = is_current_version_yanked()
assert is_yanked is False
assert reason == ""
class TestConsoleFormatterVersionCheck:
"""Test version check display in ConsoleFormatter."""
@patch("crewai.events.utils.console_formatter.is_current_version_yanked")
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": ""})
def test_version_message_shows_when_update_available_and_verbose(
self, mock_check: MagicMock
self, mock_check: MagicMock, mock_yanked: MagicMock
) -> None:
"""Test version message shows when update available and verbose enabled."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
mock_yanked.return_value = (False, "")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
@@ -165,14 +401,16 @@ class TestConsoleFormatterVersionCheck:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_current_version_yanked")
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
def test_version_message_hides_when_no_update_available(
self, mock_check: MagicMock
self, mock_check: MagicMock, mock_yanked: MagicMock
) -> None:
"""Test version message hidden when no update available."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (False, "2.0.0", "2.0.0")
mock_yanked.return_value = (False, "")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
@@ -208,3 +446,60 @@ class TestConsoleFormatterVersionCheck:
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
mock_print.assert_not_called()
@patch("crewai.events.utils.console_formatter.is_current_version_yanked")
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": ""})
def test_yanked_warning_shows_when_version_is_yanked(
self, mock_check: MagicMock, mock_yanked: MagicMock
) -> None:
"""Test yanked warning panel shows when current version is yanked."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (False, "1.0.0", "1.0.0")
mock_yanked.return_value = (True, "critical bug")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
assert mock_print.call_count == 2
panel = mock_print.call_args_list[0][0][0]
assert "Yanked Version" in panel.title
assert "critical bug" in str(panel.renderable)
@patch("crewai.events.utils.console_formatter.is_current_version_yanked")
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": ""})
def test_yanked_warning_shows_without_reason(
self, mock_check: MagicMock, mock_yanked: MagicMock
) -> None:
"""Test yanked warning panel shows even without a reason."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (False, "1.0.0", "1.0.0")
mock_yanked.return_value = (True, "")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
assert mock_print.call_count == 2
panel = mock_print.call_args_list[0][0][0]
assert "Yanked Version" in panel.title
assert "Reason:" not in str(panel.renderable)
@patch("crewai.events.utils.console_formatter.is_current_version_yanked")
@patch("crewai.events.utils.console_formatter.is_newer_version_available")
@patch.dict("os.environ", {"CI": ""})
def test_both_update_and_yanked_warning_show(
self, mock_check: MagicMock, mock_yanked: MagicMock
) -> None:
"""Test both update and yanked panels show when applicable."""
from crewai.events.utils.console_formatter import ConsoleFormatter
mock_check.return_value = (True, "1.0.0", "2.0.0")
mock_yanked.return_value = (True, "security issue")
formatter = ConsoleFormatter(verbose=True)
with patch.object(formatter.console, "print") as mock_print:
formatter._show_version_update_message_if_needed()
assert mock_print.call_count == 4