diff --git a/lib/crewai-tools/pyproject.toml b/lib/crewai-tools/pyproject.toml index 7cc34a122..6329c472b 100644 --- a/lib/crewai-tools/pyproject.toml +++ b/lib/crewai-tools/pyproject.toml @@ -17,6 +17,7 @@ dependencies = [ "python-docx~=1.2.0", "youtube-transcript-api~=1.2.2", "pymupdf~=1.26.6", + "defusedxml~=0.7.1", ] diff --git a/lib/crewai-tools/src/crewai_tools/rag/loaders/xml_loader.py b/lib/crewai-tools/src/crewai_tools/rag/loaders/xml_loader.py index f7f9a6d00..baf8c5141 100644 --- a/lib/crewai-tools/src/crewai_tools/rag/loaders/xml_loader.py +++ b/lib/crewai-tools/src/crewai_tools/rag/loaders/xml_loader.py @@ -1,5 +1,6 @@ from typing import Any -from xml.etree.ElementTree import ParseError, fromstring, parse + +from defusedxml.ElementTree import ParseError, fromstring, parse from crewai_tools.rag.base_loader import BaseLoader, LoaderResult from crewai_tools.rag.loaders.utils import load_from_url @@ -40,9 +41,9 @@ class XMLLoader(BaseLoader): def _parse_xml(self, content: str, source_ref: str) -> LoaderResult: try: if content.strip().startswith("<"): - root = fromstring(content) # noqa: S314 + root = fromstring(content) else: - root = parse(source_ref).getroot() # noqa: S314 + root = parse(source_ref).getroot() text_parts = [] for text_content in root.itertext(): diff --git a/lib/crewai-tools/src/crewai_tools/tools/arxiv_paper_tool/arxiv_paper_tool.py b/lib/crewai-tools/src/crewai_tools/tools/arxiv_paper_tool/arxiv_paper_tool.py index 3776f56d6..56f0e4828 100644 --- a/lib/crewai-tools/src/crewai_tools/tools/arxiv_paper_tool/arxiv_paper_tool.py +++ b/lib/crewai-tools/src/crewai_tools/tools/arxiv_paper_tool/arxiv_paper_tool.py @@ -6,9 +6,9 @@ from typing import ClassVar import urllib.error import urllib.parse import urllib.request -import xml.etree.ElementTree as ET from crewai.tools import BaseTool, EnvVar +import defusedxml.ElementTree as ET from pydantic import BaseModel, ConfigDict, Field @@ -90,7 +90,7 @@ class ArxivPaperTool(BaseTool): logger.error(f"Error fetching data from Arxiv: {e}") raise - root = ET.fromstring(data) # noqa: S314 + root = ET.fromstring(data) papers = [] for entry in root.findall(self.ATOM_NAMESPACE + "entry"): diff --git a/lib/crewai-tools/tests/rag/test_xml_loader.py b/lib/crewai-tools/tests/rag/test_xml_loader.py index c9debe6a1..1576e84b4 100644 --- a/lib/crewai-tools/tests/rag/test_xml_loader.py +++ b/lib/crewai-tools/tests/rag/test_xml_loader.py @@ -1,167 +1,200 @@ -from unittest.mock import Mock, patch +import os +import tempfile +from unittest.mock import patch from crewai_tools.rag.base_loader import LoaderResult -from crewai_tools.rag.loaders.webpage_loader import WebPageLoader +from crewai_tools.rag.loaders.xml_loader import XMLLoader from crewai_tools.rag.source_content import SourceContent import pytest -class TestWebPageLoader: - def setup_mock_response(self, text, status_code=200, content_type="text/html"): - response = Mock() - response.text = text - response.apparent_encoding = "utf-8" - response.status_code = status_code - response.headers = {"content-type": content_type} - return response - - def setup_mock_soup(self, text, title=None, script_style_elements=None): - soup = Mock() - soup.get_text.return_value = text - soup.title = Mock(string=title) if title is not None else None - soup.return_value = script_style_elements or [] - return soup - - @patch("requests.get") - @patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup") - def test_load_basic_webpage(self, mock_bs, mock_get): - mock_get.return_value = self.setup_mock_response( - "Test Page

Test content

" - ) - mock_bs.return_value = self.setup_mock_soup("Test content", title="Test Page") - - loader = WebPageLoader() - result = loader.load(SourceContent("https://example.com")) +class TestXMLLoader: + def test_parse_valid_xml_string(self): + loader = XMLLoader() + source = SourceContent("HelloWorld") + result = loader.load(source) assert isinstance(result, LoaderResult) - assert result.content == "Test content" - assert result.metadata["title"] == "Test Page" + assert "Hello" in result.content + assert "World" in result.content + assert result.metadata["format"] == "xml" + assert result.metadata["root_tag"] == "root" - @patch("requests.get") - @patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup") - def test_load_webpage_with_scripts_and_styles(self, mock_bs, mock_get): - html = """ - Page with Scripts -

Visible content

- """ - mock_get.return_value = self.setup_mock_response(html) - scripts = [Mock(), Mock()] - styles = [Mock()] - for el in scripts + styles: - el.decompose = Mock() - mock_bs.return_value = self.setup_mock_soup( - "Page with Scripts Visible content", - title="Page with Scripts", - script_style_elements=scripts + styles, + def test_parse_xml_from_file(self): + xml_content = "Test TitleTest Body" + with tempfile.NamedTemporaryFile( + mode="w", suffix=".xml", delete=False, encoding="utf-8" + ) as f: + f.write(xml_content) + temp_path = f.name + + try: + loader = XMLLoader() + source = SourceContent(temp_path) + result = loader.load(source) + + assert isinstance(result, LoaderResult) + assert "Test Title" in result.content + assert "Test Body" in result.content + assert result.metadata["format"] == "xml" + assert result.metadata["root_tag"] == "root" + finally: + os.unlink(temp_path) + + def test_parse_invalid_xml_returns_raw_content(self): + loader = XMLLoader() + invalid_xml = "" + source = SourceContent(invalid_xml) + result = loader.load(source) + + assert isinstance(result, LoaderResult) + assert result.content == invalid_xml + assert "parse_error" in result.metadata + assert result.metadata["format"] == "xml" + + def test_parse_nested_xml(self): + loader = XMLLoader() + xml = ( + "" + "Nested text" + "Other text" + "" ) + source = SourceContent(xml) + result = loader.load(source) - loader = WebPageLoader() - result = loader.load(SourceContent("https://example.com/with-scripts")) + assert "Nested text" in result.content + assert "Other text" in result.content - assert "Visible content" in result.content - for el in scripts + styles: - el.decompose.assert_called_once() + def test_parse_xml_with_attributes(self): + loader = XMLLoader() + xml = 'FirstSecond' + source = SourceContent(xml) + result = loader.load(source) - @patch("requests.get") - @patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup") - def test_text_cleaning_and_title_handling(self, mock_bs, mock_get): - mock_get.return_value = self.setup_mock_response( - "

Messy text

" - ) - mock_bs.return_value = self.setup_mock_soup( - "Text with extra spaces\n\n More\t text \n\n", title=None - ) + assert "First" in result.content + assert "Second" in result.content + assert result.metadata["root_tag"] == "root" - loader = WebPageLoader() - result = loader.load(SourceContent("https://example.com/messy-text")) - assert result.content is not None - assert result.metadata["title"] == "" + def test_parse_empty_xml_elements(self): + loader = XMLLoader() + xml = "Content" + source = SourceContent(xml) + result = loader.load(source) - @patch("requests.get") - @patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup") - def test_empty_or_missing_title(self, mock_bs, mock_get): - for title in [None, ""]: - mock_get.return_value = self.setup_mock_response( - "Content" - ) - mock_bs.return_value = self.setup_mock_soup("Content", title=title) + assert "Content" in result.content + assert result.metadata["format"] == "xml" - loader = WebPageLoader() - result = loader.load(SourceContent("https://example.com")) - assert result.metadata["title"] == "" - - @patch("requests.get") - def test_custom_and_default_headers(self, mock_get): - mock_get.return_value = self.setup_mock_response( - "Test" - ) - custom_headers = { - "User-Agent": "Bot", - "Authorization": "Bearer xyz", - "Accept": "text/html", - } - - with patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup") as mock_bs: - mock_bs.return_value = self.setup_mock_soup("Test") - WebPageLoader().load( - SourceContent("https://example.com"), headers=custom_headers - ) - - assert mock_get.call_args[1]["headers"] == custom_headers - - @patch("requests.get") - def test_error_handling(self, mock_get): - for error in [Exception("Fail"), ValueError("Bad"), ImportError("Oops")]: - mock_get.side_effect = error - with pytest.raises(ValueError, match="Error loading webpage"): - WebPageLoader().load(SourceContent("https://example.com")) - - @patch("requests.get") - def test_timeout_and_http_error(self, mock_get): - import requests - - mock_get.side_effect = requests.Timeout("Timeout") - with pytest.raises(ValueError): - WebPageLoader().load(SourceContent("https://example.com")) - - mock_response = Mock() - mock_response.raise_for_status.side_effect = requests.HTTPError("404") - mock_get.side_effect = None - mock_get.return_value = mock_response - with pytest.raises(ValueError): - WebPageLoader().load(SourceContent("https://example.com/404")) - - @patch("requests.get") - @patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup") - def test_doc_id_consistency(self, mock_bs, mock_get): - mock_get.return_value = self.setup_mock_response( - "Doc" - ) - mock_bs.return_value = self.setup_mock_soup("Doc") - - loader = WebPageLoader() - result1 = loader.load(SourceContent("https://example.com")) - result2 = loader.load(SourceContent("https://example.com")) + def test_doc_id_consistency(self): + loader = XMLLoader() + xml = "Consistent" + source = SourceContent(xml) + result1 = loader.load(source) + result2 = loader.load(source) assert result1.doc_id == result2.doc_id - @patch("requests.get") - @patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup") - def test_status_code_and_content_type(self, mock_bs, mock_get): - for status in [200, 201, 301]: - mock_get.return_value = self.setup_mock_response( - f"Status {status}", status_code=status - ) - mock_bs.return_value = self.setup_mock_soup(f"Status {status}") - result = WebPageLoader().load( - SourceContent(f"https://example.com/{status}") - ) - assert result.metadata["status_code"] == status + @patch("crewai_tools.rag.loaders.xml_loader.load_from_url") + def test_load_from_url(self, mock_load_url): + mock_load_url.return_value = "URL content" + loader = XMLLoader() + source = SourceContent("https://example.com/data.xml") + result = loader.load(source) - for ctype in ["text/html", "text/plain", "application/xhtml+xml"]: - mock_get.return_value = self.setup_mock_response( - "Content", content_type=ctype - ) - mock_bs.return_value = self.setup_mock_soup("Content") - result = WebPageLoader().load(SourceContent("https://example.com")) - assert result.metadata["content_type"] == ctype + assert isinstance(result, LoaderResult) + assert "URL content" in result.content + mock_load_url.assert_called_once() + + def test_xxe_entity_expansion_blocked(self): + """Test that XML External Entity (XXE) attacks are blocked by defusedxml.""" + loader = XMLLoader() + xxe_payload = ( + '' + "' + "]>" + "&xxe;" + ) + source = SourceContent(xxe_payload) + result = loader.load(source) + + # defusedxml should block the entity expansion and raise EntitiesForbidden, + # which is a subclass of ParseError, so _parse_xml catches it and returns + # raw content with a parse_error in metadata. + assert "parse_error" in result.metadata + assert result.metadata["format"] == "xml" + # The raw payload should NOT have resolved the entity + assert "/etc/passwd" not in result.content or result.content == xxe_payload + + def test_xxe_billion_laughs_blocked(self): + """Test that XML bomb (Billion Laughs) attacks are blocked by defusedxml.""" + loader = XMLLoader() + billion_laughs = ( + '' + "' + ' ' + ' ' + "]>" + "&lol3;" + ) + source = SourceContent(billion_laughs) + result = loader.load(source) + + # defusedxml blocks entity expansion, resulting in a parse error + assert "parse_error" in result.metadata + assert result.metadata["format"] == "xml" + + def test_xxe_parameter_entity_blocked(self): + """Test that parameter entity attacks are blocked by defusedxml.""" + loader = XMLLoader() + xxe_param = ( + '' + "' + " %xxe;" + "]>" + "test" + ) + source = SourceContent(xxe_param) + result = loader.load(source) + + assert "parse_error" in result.metadata + + def test_xxe_file_from_file_blocked(self): + """Test that XXE attacks via file loading are also blocked.""" + xxe_content = ( + '' + "' + "]>" + "&xxe;" + ) + with tempfile.NamedTemporaryFile( + mode="w", suffix=".xml", delete=False, encoding="utf-8" + ) as f: + f.write(xxe_content) + temp_path = f.name + + try: + loader = XMLLoader() + source = SourceContent(temp_path) + result = loader.load(source) + + assert "parse_error" in result.metadata + assert result.metadata["format"] == "xml" + finally: + os.unlink(temp_path) + + def test_defusedxml_is_used_not_stdlib(self): + """Verify that the XMLLoader imports from defusedxml, not xml.etree.ElementTree.""" + import crewai_tools.rag.loaders.xml_loader as xml_loader_module + + assert "defusedxml" in xml_loader_module.fromstring.__module__ + + def test_arxiv_tool_uses_defusedxml(self): + """Verify that ArxivPaperTool imports from defusedxml, not xml.etree.ElementTree.""" + import crewai_tools.tools.arxiv_paper_tool.arxiv_paper_tool as arxiv_module + + ET = arxiv_module.ET + assert "defusedxml" in ET.__name__ diff --git a/lib/crewai-tools/tests/tools/arxiv_paper_tool_test.py b/lib/crewai-tools/tests/tools/arxiv_paper_tool_test.py index 9a2f0a36d..f9134b421 100644 --- a/lib/crewai-tools/tests/tools/arxiv_paper_tool_test.py +++ b/lib/crewai-tools/tests/tools/arxiv_paper_tool_test.py @@ -1,7 +1,7 @@ from pathlib import Path from unittest.mock import MagicMock, patch import urllib.error -import xml.etree.ElementTree as ET +import defusedxml.ElementTree as ET from crewai_tools import ArxivPaperTool import pytest