Prevent credential forwarding across redirects

This commit is contained in:
Rip&Tear
2026-06-26 01:47:27 +08:00
parent cd3fa72ead
commit 7455e51a84
2 changed files with 115 additions and 13 deletions

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from typing import Any
from urllib.parse import urljoin
from urllib.parse import urljoin, urlparse
import requests
@@ -11,6 +11,41 @@ from crewai_tools.security.safe_path import validate_url
_REDIRECT_STATUS_CODES = {301, 302, 303, 307, 308}
_SENSITIVE_HEADER_NAMES = {
"authorization",
"cookie",
"proxy-authorization",
"x-api-key",
}
_SENSITIVE_HEADER_FRAGMENTS = ("api-key", "apikey", "secret", "token")
def _same_origin(previous_url: str, next_url: str) -> bool:
previous = urlparse(previous_url)
next_ = urlparse(next_url)
return (previous.scheme, previous.netloc) == (next_.scheme, next_.netloc)
def _is_sensitive_header(header: str) -> bool:
normalized = header.lower()
return (
normalized in _SENSITIVE_HEADER_NAMES
or normalized.startswith("authorization-")
or any(fragment in normalized for fragment in _SENSITIVE_HEADER_FRAGMENTS)
)
def _strip_cross_origin_credentials(request_kwargs: dict[str, Any]) -> dict[str, Any]:
sanitized = {**request_kwargs}
headers = sanitized.get("headers")
if headers:
sanitized["headers"] = {
key: value
for key, value in headers.items()
if not _is_sensitive_header(str(key))
}
sanitized.pop("cookies", None)
return sanitized
def safe_get(url: str, *, max_redirects: int = 10, **kwargs: Any) -> requests.Response:
@@ -44,6 +79,10 @@ def safe_get(url: str, *, max_redirects: int = 10, **kwargs: Any) -> requests.Re
except ValueError:
response.close()
raise
if not _same_origin(current_url, redirect_url):
request_kwargs = _strip_cross_origin_credentials(request_kwargs)
history.append(response)
current_url = redirect_url
redirects_followed += 1

View File

@@ -50,6 +50,13 @@ def test_safe_get_blocks_direct_internal_url() -> None:
safe_get("http://127.0.0.1/admin", timeout=15)
def _mock_get(monkeypatch: pytest.MonkeyPatch, get_response: Any) -> None:
monkeypatch.setattr(
"crewai_tools.security.safe_requests.requests.get",
get_response,
)
def test_safe_get_blocks_redirect_to_internal_url(
monkeypatch: pytest.MonkeyPatch, public_dns: None
) -> None:
@@ -60,10 +67,7 @@ def test_safe_get_blocks_redirect_to_internal_url(
assert kwargs["allow_redirects"] is False
return _response(url, 302, location="http://127.0.0.1/admin")
monkeypatch.setattr(
"crewai_tools.security.safe_requests.requests.get",
fake_get,
)
_mock_get(monkeypatch, fake_get)
with pytest.raises(ValueError, match="private/reserved IP"):
safe_get("http://public.example/start", timeout=15)
@@ -83,10 +87,7 @@ def test_safe_get_follows_safe_relative_redirect(
return _response(url, 302, location="/final")
return _response(url, 200)
monkeypatch.setattr(
"crewai_tools.security.safe_requests.requests.get",
fake_get,
)
_mock_get(monkeypatch, fake_get)
response = safe_get("http://public.example/start", timeout=15)
@@ -105,10 +106,72 @@ def test_safe_get_fails_closed_after_too_many_redirects(
def fake_get(url: str, **kwargs: Any) -> requests.Response:
return _response(url, 302, location="http://safe.example/again")
monkeypatch.setattr(
"crewai_tools.security.safe_requests.requests.get",
fake_get,
)
_mock_get(monkeypatch, fake_get)
with pytest.raises(ValueError, match="Too many redirects"):
safe_get("http://public.example/start", max_redirects=1, timeout=15)
def test_safe_get_strips_credentials_on_cross_origin_redirect(
monkeypatch: pytest.MonkeyPatch, public_dns: None
) -> None:
requests_made: list[tuple[str, dict[str, Any]]] = []
def fake_get(url: str, **kwargs: Any) -> requests.Response:
requests_made.append((url, kwargs))
if url == "http://public.example/start":
return _response(url, 302, location="http://safe.example/final")
return _response(url, 200)
_mock_get(monkeypatch, fake_get)
response = safe_get(
"http://public.example/start",
timeout=15,
headers={
"Authorization": "Bearer token",
"Authorization-Custom": "secret token",
"Cookie": "session=abc",
"X-API-Key": "api key",
"X-CrewAI-Token": "crewai token",
"User-Agent": "crewai-test",
},
cookies={"session": "abc"},
)
assert response.status_code == 200
assert requests_made[0][1]["headers"] == {
"Authorization": "Bearer token",
"Authorization-Custom": "secret token",
"Cookie": "session=abc",
"X-API-Key": "api key",
"X-CrewAI-Token": "crewai token",
"User-Agent": "crewai-test",
}
assert requests_made[0][1]["cookies"] == {"session": "abc"}
assert requests_made[1][1]["headers"] == {"User-Agent": "crewai-test"}
assert "cookies" not in requests_made[1][1]
def test_safe_get_preserves_credentials_on_same_origin_redirect(
monkeypatch: pytest.MonkeyPatch, public_dns: None
) -> None:
requests_made: list[tuple[str, dict[str, Any]]] = []
def fake_get(url: str, **kwargs: Any) -> requests.Response:
requests_made.append((url, kwargs))
if url == "http://public.example/start":
return _response(url, 302, location="/final")
return _response(url, 200)
_mock_get(monkeypatch, fake_get)
safe_get(
"http://public.example/start",
timeout=15,
headers={"Authorization": "Bearer token"},
cookies={"session": "abc"},
)
assert requests_made[1][1]["headers"] == {"Authorization": "Bearer token"}
assert requests_made[1][1]["cookies"] == {"session": "abc"}