Compare commits

...

3 Commits

Author SHA1 Message Date
Devin AI
a74df9425a ci: trigger test re-run
Co-Authored-By: João <joao@crewai.com>
2026-03-14 05:32:52 +00:00
Devin AI
ee55628db8 fix: resolve N817 lint error by using explicit imports from defusedxml
Replace 'import defusedxml.ElementTree as ET' with explicit imports
(fromstring, ParseError, Element) to satisfy ruff N817 rule that flags
CamelCase imported as acronym.

Co-Authored-By: João <joao@crewai.com>
2026-03-14 05:28:07 +00:00
Devin AI
506155b4f4 fix: replace xml.etree.ElementTree with defusedxml to prevent XXE attacks
Addresses #4865 - The native Python xml library is vulnerable to XML
External Entity (XXE) attacks that can leak confidential data and XML
bombs that can cause denial of service.

Changes:
- Replace xml.etree.ElementTree with defusedxml.ElementTree in xml_loader.py
- Replace xml.etree.ElementTree with defusedxml.ElementTree in arxiv_paper_tool.py
- Add defusedxml~=0.7.1 as a dependency in crewai-tools pyproject.toml
- Update arxiv_paper_tool_test.py to use defusedxml
- Replace WebPageLoader tests in test_xml_loader.py with proper XMLLoader tests
- Add XXE attack tests (entity expansion, billion laughs, parameter entities)
- Remove noqa: S314 comments since defusedxml is safe

Co-Authored-By: João <joao@crewai.com>
2026-03-14 05:24:39 +00:00
5 changed files with 191 additions and 156 deletions

View File

@@ -17,6 +17,7 @@ dependencies = [
"python-docx~=1.2.0",
"youtube-transcript-api~=1.2.2",
"pymupdf~=1.26.6",
"defusedxml~=0.7.1",
]

View File

@@ -1,5 +1,6 @@
from typing import Any
from xml.etree.ElementTree import ParseError, fromstring, parse
from defusedxml.ElementTree import ParseError, fromstring, parse
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.loaders.utils import load_from_url
@@ -40,9 +41,9 @@ class XMLLoader(BaseLoader):
def _parse_xml(self, content: str, source_ref: str) -> LoaderResult:
try:
if content.strip().startswith("<"):
root = fromstring(content) # noqa: S314
root = fromstring(content)
else:
root = parse(source_ref).getroot() # noqa: S314
root = parse(source_ref).getroot()
text_parts = []
for text_content in root.itertext():

View File

@@ -6,9 +6,10 @@ from typing import ClassVar
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import Element
from crewai.tools import BaseTool, EnvVar
from defusedxml.ElementTree import fromstring
from pydantic import BaseModel, ConfigDict, Field
@@ -90,7 +91,7 @@ class ArxivPaperTool(BaseTool):
logger.error(f"Error fetching data from Arxiv: {e}")
raise
root = ET.fromstring(data) # noqa: S314
root = fromstring(data)
papers = []
for entry in root.findall(self.ATOM_NAMESPACE + "entry"):
@@ -121,11 +122,11 @@ class ArxivPaperTool(BaseTool):
return papers
@staticmethod
def _get_element_text(entry: ET.Element, element_name: str) -> str | None:
def _get_element_text(entry: Element, element_name: str) -> str | None:
elem = entry.find(f"{ArxivPaperTool.ATOM_NAMESPACE}{element_name}")
return elem.text.strip() if elem is not None and elem.text else None
def _extract_pdf_url(self, entry: ET.Element) -> str | None:
def _extract_pdf_url(self, entry: Element) -> str | None:
for link in entry.findall(self.ATOM_NAMESPACE + "link"):
if link.attrib.get("title", "").lower() == "pdf":
return link.attrib.get("href")

View File

@@ -1,167 +1,199 @@
from unittest.mock import Mock, patch
import os
import tempfile
from unittest.mock import patch
from crewai_tools.rag.base_loader import LoaderResult
from crewai_tools.rag.loaders.webpage_loader import WebPageLoader
from crewai_tools.rag.loaders.xml_loader import XMLLoader
from crewai_tools.rag.source_content import SourceContent
import pytest
class TestWebPageLoader:
def setup_mock_response(self, text, status_code=200, content_type="text/html"):
response = Mock()
response.text = text
response.apparent_encoding = "utf-8"
response.status_code = status_code
response.headers = {"content-type": content_type}
return response
def setup_mock_soup(self, text, title=None, script_style_elements=None):
soup = Mock()
soup.get_text.return_value = text
soup.title = Mock(string=title) if title is not None else None
soup.return_value = script_style_elements or []
return soup
@patch("requests.get")
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
def test_load_basic_webpage(self, mock_bs, mock_get):
mock_get.return_value = self.setup_mock_response(
"<html><head><title>Test Page</title></head><body><p>Test content</p></body></html>"
)
mock_bs.return_value = self.setup_mock_soup("Test content", title="Test Page")
loader = WebPageLoader()
result = loader.load(SourceContent("https://example.com"))
class TestXMLLoader:
def test_parse_valid_xml_string(self):
loader = XMLLoader()
source = SourceContent("<root><item>Hello</item><item>World</item></root>")
result = loader.load(source)
assert isinstance(result, LoaderResult)
assert result.content == "Test content"
assert result.metadata["title"] == "Test Page"
assert "Hello" in result.content
assert "World" in result.content
assert result.metadata["format"] == "xml"
assert result.metadata["root_tag"] == "root"
@patch("requests.get")
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
def test_load_webpage_with_scripts_and_styles(self, mock_bs, mock_get):
html = """
<html><head><title>Page with Scripts</title><style>body { color: red; }</style></head>
<body><script>console.log('test');</script><p>Visible content</p></body></html>
"""
mock_get.return_value = self.setup_mock_response(html)
scripts = [Mock(), Mock()]
styles = [Mock()]
for el in scripts + styles:
el.decompose = Mock()
mock_bs.return_value = self.setup_mock_soup(
"Page with Scripts Visible content",
title="Page with Scripts",
script_style_elements=scripts + styles,
def test_parse_xml_from_file(self):
xml_content = "<root><title>Test Title</title><body>Test Body</body></root>"
with tempfile.NamedTemporaryFile(
mode="w", suffix=".xml", delete=False, encoding="utf-8"
) as f:
f.write(xml_content)
temp_path = f.name
try:
loader = XMLLoader()
source = SourceContent(temp_path)
result = loader.load(source)
assert isinstance(result, LoaderResult)
assert "Test Title" in result.content
assert "Test Body" in result.content
assert result.metadata["format"] == "xml"
assert result.metadata["root_tag"] == "root"
finally:
os.unlink(temp_path)
def test_parse_invalid_xml_returns_raw_content(self):
loader = XMLLoader()
invalid_xml = "<root><unclosed>"
source = SourceContent(invalid_xml)
result = loader.load(source)
assert isinstance(result, LoaderResult)
assert result.content == invalid_xml
assert "parse_error" in result.metadata
assert result.metadata["format"] == "xml"
def test_parse_nested_xml(self):
loader = XMLLoader()
xml = (
"<root>"
"<parent><child>Nested text</child></parent>"
"<sibling>Other text</sibling>"
"</root>"
)
source = SourceContent(xml)
result = loader.load(source)
loader = WebPageLoader()
result = loader.load(SourceContent("https://example.com/with-scripts"))
assert "Nested text" in result.content
assert "Other text" in result.content
assert "Visible content" in result.content
for el in scripts + styles:
el.decompose.assert_called_once()
def test_parse_xml_with_attributes(self):
loader = XMLLoader()
xml = '<root><item id="1">First</item><item id="2">Second</item></root>'
source = SourceContent(xml)
result = loader.load(source)
@patch("requests.get")
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
def test_text_cleaning_and_title_handling(self, mock_bs, mock_get):
mock_get.return_value = self.setup_mock_response(
"<html><body><p> Messy text </p></body></html>"
)
mock_bs.return_value = self.setup_mock_soup(
"Text with extra spaces\n\n More\t text \n\n", title=None
)
assert "First" in result.content
assert "Second" in result.content
assert result.metadata["root_tag"] == "root"
loader = WebPageLoader()
result = loader.load(SourceContent("https://example.com/messy-text"))
assert result.content is not None
assert result.metadata["title"] == ""
def test_parse_empty_xml_elements(self):
loader = XMLLoader()
xml = "<root><empty></empty><filled>Content</filled></root>"
source = SourceContent(xml)
result = loader.load(source)
@patch("requests.get")
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
def test_empty_or_missing_title(self, mock_bs, mock_get):
for title in [None, ""]:
mock_get.return_value = self.setup_mock_response(
"<html><head><title></title></head><body>Content</body></html>"
)
mock_bs.return_value = self.setup_mock_soup("Content", title=title)
assert "Content" in result.content
assert result.metadata["format"] == "xml"
loader = WebPageLoader()
result = loader.load(SourceContent("https://example.com"))
assert result.metadata["title"] == ""
@patch("requests.get")
def test_custom_and_default_headers(self, mock_get):
mock_get.return_value = self.setup_mock_response(
"<html><body>Test</body></html>"
)
custom_headers = {
"User-Agent": "Bot",
"Authorization": "Bearer xyz",
"Accept": "text/html",
}
with patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup") as mock_bs:
mock_bs.return_value = self.setup_mock_soup("Test")
WebPageLoader().load(
SourceContent("https://example.com"), headers=custom_headers
)
assert mock_get.call_args[1]["headers"] == custom_headers
@patch("requests.get")
def test_error_handling(self, mock_get):
for error in [Exception("Fail"), ValueError("Bad"), ImportError("Oops")]:
mock_get.side_effect = error
with pytest.raises(ValueError, match="Error loading webpage"):
WebPageLoader().load(SourceContent("https://example.com"))
@patch("requests.get")
def test_timeout_and_http_error(self, mock_get):
import requests
mock_get.side_effect = requests.Timeout("Timeout")
with pytest.raises(ValueError):
WebPageLoader().load(SourceContent("https://example.com"))
mock_response = Mock()
mock_response.raise_for_status.side_effect = requests.HTTPError("404")
mock_get.side_effect = None
mock_get.return_value = mock_response
with pytest.raises(ValueError):
WebPageLoader().load(SourceContent("https://example.com/404"))
@patch("requests.get")
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
def test_doc_id_consistency(self, mock_bs, mock_get):
mock_get.return_value = self.setup_mock_response(
"<html><body>Doc</body></html>"
)
mock_bs.return_value = self.setup_mock_soup("Doc")
loader = WebPageLoader()
result1 = loader.load(SourceContent("https://example.com"))
result2 = loader.load(SourceContent("https://example.com"))
def test_doc_id_consistency(self):
loader = XMLLoader()
xml = "<root><item>Consistent</item></root>"
source = SourceContent(xml)
result1 = loader.load(source)
result2 = loader.load(source)
assert result1.doc_id == result2.doc_id
@patch("requests.get")
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
def test_status_code_and_content_type(self, mock_bs, mock_get):
for status in [200, 201, 301]:
mock_get.return_value = self.setup_mock_response(
f"<html><body>Status {status}</body></html>", status_code=status
)
mock_bs.return_value = self.setup_mock_soup(f"Status {status}")
result = WebPageLoader().load(
SourceContent(f"https://example.com/{status}")
)
assert result.metadata["status_code"] == status
@patch("crewai_tools.rag.loaders.xml_loader.load_from_url")
def test_load_from_url(self, mock_load_url):
mock_load_url.return_value = "<root><data>URL content</data></root>"
loader = XMLLoader()
source = SourceContent("https://example.com/data.xml")
result = loader.load(source)
for ctype in ["text/html", "text/plain", "application/xhtml+xml"]:
mock_get.return_value = self.setup_mock_response(
"<html><body>Content</body></html>", content_type=ctype
)
mock_bs.return_value = self.setup_mock_soup("Content")
result = WebPageLoader().load(SourceContent("https://example.com"))
assert result.metadata["content_type"] == ctype
assert isinstance(result, LoaderResult)
assert "URL content" in result.content
mock_load_url.assert_called_once()
def test_xxe_entity_expansion_blocked(self):
"""Test that XML External Entity (XXE) attacks are blocked by defusedxml."""
loader = XMLLoader()
xxe_payload = (
'<?xml version="1.0"?>'
"<!DOCTYPE foo ["
' <!ENTITY xxe SYSTEM "file:///etc/passwd">'
"]>"
"<root>&xxe;</root>"
)
source = SourceContent(xxe_payload)
result = loader.load(source)
# defusedxml should block the entity expansion by raising EntitiesForbidden.
# NOTE(review): EntitiesForbidden derives from defusedxml's DefusedXmlException
# (a ValueError subclass), NOT from xml.etree's ParseError — so an
# `except ParseError` clause in _parse_xml will not catch it. Confirm that
# _parse_xml catches the defusedxml exception types explicitly before relying
# on it returning raw content with a parse_error entry in metadata.
assert "parse_error" in result.metadata
assert result.metadata["format"] == "xml"
# The raw payload should NOT have resolved the entity
assert "/etc/passwd" not in result.content or result.content == xxe_payload
def test_xxe_billion_laughs_blocked(self):
"""Test that XML bomb (Billion Laughs) attacks are blocked by defusedxml."""
loader = XMLLoader()
billion_laughs = (
'<?xml version="1.0"?>'
"<!DOCTYPE lolz ["
' <!ENTITY lol "lol">'
' <!ENTITY lol2 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">'
' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">'
"]>"
"<root>&lol3;</root>"
)
source = SourceContent(billion_laughs)
result = loader.load(source)
# defusedxml blocks entity expansion, resulting in a parse error
assert "parse_error" in result.metadata
assert result.metadata["format"] == "xml"
def test_xxe_parameter_entity_blocked(self):
"""Test that parameter entity attacks are blocked by defusedxml."""
loader = XMLLoader()
xxe_param = (
'<?xml version="1.0"?>'
"<!DOCTYPE foo ["
' <!ENTITY % xxe SYSTEM "file:///etc/passwd">'
" %xxe;"
"]>"
"<root>test</root>"
)
source = SourceContent(xxe_param)
result = loader.load(source)
assert "parse_error" in result.metadata
def test_xxe_file_from_file_blocked(self):
"""Test that XXE attacks via file loading are also blocked."""
xxe_content = (
'<?xml version="1.0"?>'
"<!DOCTYPE foo ["
' <!ENTITY xxe SYSTEM "file:///etc/passwd">'
"]>"
"<root>&xxe;</root>"
)
with tempfile.NamedTemporaryFile(
mode="w", suffix=".xml", delete=False, encoding="utf-8"
) as f:
f.write(xxe_content)
temp_path = f.name
try:
loader = XMLLoader()
source = SourceContent(temp_path)
result = loader.load(source)
assert "parse_error" in result.metadata
assert result.metadata["format"] == "xml"
finally:
os.unlink(temp_path)
def test_defusedxml_is_used_not_stdlib(self):
"""Verify that the XMLLoader imports from defusedxml, not xml.etree.ElementTree."""
import crewai_tools.rag.loaders.xml_loader as xml_loader_module
assert "defusedxml" in xml_loader_module.fromstring.__module__
def test_arxiv_tool_uses_defusedxml(self):
"""Verify that ArxivPaperTool imports from defusedxml, not xml.etree.ElementTree."""
import crewai_tools.tools.arxiv_paper_tool.arxiv_paper_tool as arxiv_module
assert "defusedxml" in arxiv_module.fromstring.__module__

View File

@@ -1,7 +1,7 @@
from pathlib import Path
from unittest.mock import MagicMock, patch
import urllib.error
import xml.etree.ElementTree as ET
from defusedxml.ElementTree import ParseError
from crewai_tools import ArxivPaperTool
import pytest
@@ -108,7 +108,7 @@ def test_invalid_xml_response(mock_urlopen, tool):
mock_response.status = 200
mock_urlopen.return_value.__enter__.return_value = mock_response
with pytest.raises(ET.ParseError):
with pytest.raises(ParseError):
tool.fetch_arxiv_data("quantum", 1)