|
|
|
|
@@ -1,167 +1,199 @@
|
|
|
|
|
from unittest.mock import Mock, patch
|
|
|
|
|
import os
|
|
|
|
|
import tempfile
|
|
|
|
|
from unittest.mock import patch
|
|
|
|
|
|
|
|
|
|
from crewai_tools.rag.base_loader import LoaderResult
|
|
|
|
|
from crewai_tools.rag.loaders.webpage_loader import WebPageLoader
|
|
|
|
|
from crewai_tools.rag.loaders.xml_loader import XMLLoader
|
|
|
|
|
from crewai_tools.rag.source_content import SourceContent
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestWebPageLoader:
|
|
|
|
|
def setup_mock_response(self, text, status_code=200, content_type="text/html"):
|
|
|
|
|
response = Mock()
|
|
|
|
|
response.text = text
|
|
|
|
|
response.apparent_encoding = "utf-8"
|
|
|
|
|
response.status_code = status_code
|
|
|
|
|
response.headers = {"content-type": content_type}
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
def setup_mock_soup(self, text, title=None, script_style_elements=None):
|
|
|
|
|
soup = Mock()
|
|
|
|
|
soup.get_text.return_value = text
|
|
|
|
|
soup.title = Mock(string=title) if title is not None else None
|
|
|
|
|
soup.return_value = script_style_elements or []
|
|
|
|
|
return soup
|
|
|
|
|
|
|
|
|
|
@patch("requests.get")
|
|
|
|
|
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
|
|
|
|
|
def test_load_basic_webpage(self, mock_bs, mock_get):
|
|
|
|
|
mock_get.return_value = self.setup_mock_response(
|
|
|
|
|
"<html><head><title>Test Page</title></head><body><p>Test content</p></body></html>"
|
|
|
|
|
)
|
|
|
|
|
mock_bs.return_value = self.setup_mock_soup("Test content", title="Test Page")
|
|
|
|
|
|
|
|
|
|
loader = WebPageLoader()
|
|
|
|
|
result = loader.load(SourceContent("https://example.com"))
|
|
|
|
|
class TestXMLLoader:
|
|
|
|
|
def test_parse_valid_xml_string(self):
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
source = SourceContent("<root><item>Hello</item><item>World</item></root>")
|
|
|
|
|
result = loader.load(source)
|
|
|
|
|
|
|
|
|
|
assert isinstance(result, LoaderResult)
|
|
|
|
|
assert result.content == "Test content"
|
|
|
|
|
assert result.metadata["title"] == "Test Page"
|
|
|
|
|
assert "Hello" in result.content
|
|
|
|
|
assert "World" in result.content
|
|
|
|
|
assert result.metadata["format"] == "xml"
|
|
|
|
|
assert result.metadata["root_tag"] == "root"
|
|
|
|
|
|
|
|
|
|
@patch("requests.get")
|
|
|
|
|
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
|
|
|
|
|
def test_load_webpage_with_scripts_and_styles(self, mock_bs, mock_get):
|
|
|
|
|
html = """
|
|
|
|
|
<html><head><title>Page with Scripts</title><style>body { color: red; }</style></head>
|
|
|
|
|
<body><script>console.log('test');</script><p>Visible content</p></body></html>
|
|
|
|
|
"""
|
|
|
|
|
mock_get.return_value = self.setup_mock_response(html)
|
|
|
|
|
scripts = [Mock(), Mock()]
|
|
|
|
|
styles = [Mock()]
|
|
|
|
|
for el in scripts + styles:
|
|
|
|
|
el.decompose = Mock()
|
|
|
|
|
mock_bs.return_value = self.setup_mock_soup(
|
|
|
|
|
"Page with Scripts Visible content",
|
|
|
|
|
title="Page with Scripts",
|
|
|
|
|
script_style_elements=scripts + styles,
|
|
|
|
|
def test_parse_xml_from_file(self):
|
|
|
|
|
xml_content = "<root><title>Test Title</title><body>Test Body</body></root>"
|
|
|
|
|
with tempfile.NamedTemporaryFile(
|
|
|
|
|
mode="w", suffix=".xml", delete=False, encoding="utf-8"
|
|
|
|
|
) as f:
|
|
|
|
|
f.write(xml_content)
|
|
|
|
|
temp_path = f.name
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
source = SourceContent(temp_path)
|
|
|
|
|
result = loader.load(source)
|
|
|
|
|
|
|
|
|
|
assert isinstance(result, LoaderResult)
|
|
|
|
|
assert "Test Title" in result.content
|
|
|
|
|
assert "Test Body" in result.content
|
|
|
|
|
assert result.metadata["format"] == "xml"
|
|
|
|
|
assert result.metadata["root_tag"] == "root"
|
|
|
|
|
finally:
|
|
|
|
|
os.unlink(temp_path)
|
|
|
|
|
|
|
|
|
|
def test_parse_invalid_xml_returns_raw_content(self):
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
invalid_xml = "<root><unclosed>"
|
|
|
|
|
source = SourceContent(invalid_xml)
|
|
|
|
|
result = loader.load(source)
|
|
|
|
|
|
|
|
|
|
assert isinstance(result, LoaderResult)
|
|
|
|
|
assert result.content == invalid_xml
|
|
|
|
|
assert "parse_error" in result.metadata
|
|
|
|
|
assert result.metadata["format"] == "xml"
|
|
|
|
|
|
|
|
|
|
def test_parse_nested_xml(self):
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
xml = (
|
|
|
|
|
"<root>"
|
|
|
|
|
"<parent><child>Nested text</child></parent>"
|
|
|
|
|
"<sibling>Other text</sibling>"
|
|
|
|
|
"</root>"
|
|
|
|
|
)
|
|
|
|
|
source = SourceContent(xml)
|
|
|
|
|
result = loader.load(source)
|
|
|
|
|
|
|
|
|
|
loader = WebPageLoader()
|
|
|
|
|
result = loader.load(SourceContent("https://example.com/with-scripts"))
|
|
|
|
|
assert "Nested text" in result.content
|
|
|
|
|
assert "Other text" in result.content
|
|
|
|
|
|
|
|
|
|
assert "Visible content" in result.content
|
|
|
|
|
for el in scripts + styles:
|
|
|
|
|
el.decompose.assert_called_once()
|
|
|
|
|
def test_parse_xml_with_attributes(self):
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
xml = '<root><item id="1">First</item><item id="2">Second</item></root>'
|
|
|
|
|
source = SourceContent(xml)
|
|
|
|
|
result = loader.load(source)
|
|
|
|
|
|
|
|
|
|
@patch("requests.get")
|
|
|
|
|
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
|
|
|
|
|
def test_text_cleaning_and_title_handling(self, mock_bs, mock_get):
|
|
|
|
|
mock_get.return_value = self.setup_mock_response(
|
|
|
|
|
"<html><body><p> Messy text </p></body></html>"
|
|
|
|
|
)
|
|
|
|
|
mock_bs.return_value = self.setup_mock_soup(
|
|
|
|
|
"Text with extra spaces\n\n More\t text \n\n", title=None
|
|
|
|
|
)
|
|
|
|
|
assert "First" in result.content
|
|
|
|
|
assert "Second" in result.content
|
|
|
|
|
assert result.metadata["root_tag"] == "root"
|
|
|
|
|
|
|
|
|
|
loader = WebPageLoader()
|
|
|
|
|
result = loader.load(SourceContent("https://example.com/messy-text"))
|
|
|
|
|
assert result.content is not None
|
|
|
|
|
assert result.metadata["title"] == ""
|
|
|
|
|
def test_parse_empty_xml_elements(self):
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
xml = "<root><empty></empty><filled>Content</filled></root>"
|
|
|
|
|
source = SourceContent(xml)
|
|
|
|
|
result = loader.load(source)
|
|
|
|
|
|
|
|
|
|
@patch("requests.get")
|
|
|
|
|
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
|
|
|
|
|
def test_empty_or_missing_title(self, mock_bs, mock_get):
|
|
|
|
|
for title in [None, ""]:
|
|
|
|
|
mock_get.return_value = self.setup_mock_response(
|
|
|
|
|
"<html><head><title></title></head><body>Content</body></html>"
|
|
|
|
|
)
|
|
|
|
|
mock_bs.return_value = self.setup_mock_soup("Content", title=title)
|
|
|
|
|
assert "Content" in result.content
|
|
|
|
|
assert result.metadata["format"] == "xml"
|
|
|
|
|
|
|
|
|
|
loader = WebPageLoader()
|
|
|
|
|
result = loader.load(SourceContent("https://example.com"))
|
|
|
|
|
assert result.metadata["title"] == ""
|
|
|
|
|
|
|
|
|
|
@patch("requests.get")
|
|
|
|
|
def test_custom_and_default_headers(self, mock_get):
|
|
|
|
|
mock_get.return_value = self.setup_mock_response(
|
|
|
|
|
"<html><body>Test</body></html>"
|
|
|
|
|
)
|
|
|
|
|
custom_headers = {
|
|
|
|
|
"User-Agent": "Bot",
|
|
|
|
|
"Authorization": "Bearer xyz",
|
|
|
|
|
"Accept": "text/html",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
with patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup") as mock_bs:
|
|
|
|
|
mock_bs.return_value = self.setup_mock_soup("Test")
|
|
|
|
|
WebPageLoader().load(
|
|
|
|
|
SourceContent("https://example.com"), headers=custom_headers
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert mock_get.call_args[1]["headers"] == custom_headers
|
|
|
|
|
|
|
|
|
|
@patch("requests.get")
|
|
|
|
|
def test_error_handling(self, mock_get):
|
|
|
|
|
for error in [Exception("Fail"), ValueError("Bad"), ImportError("Oops")]:
|
|
|
|
|
mock_get.side_effect = error
|
|
|
|
|
with pytest.raises(ValueError, match="Error loading webpage"):
|
|
|
|
|
WebPageLoader().load(SourceContent("https://example.com"))
|
|
|
|
|
|
|
|
|
|
@patch("requests.get")
|
|
|
|
|
def test_timeout_and_http_error(self, mock_get):
|
|
|
|
|
import requests
|
|
|
|
|
|
|
|
|
|
mock_get.side_effect = requests.Timeout("Timeout")
|
|
|
|
|
with pytest.raises(ValueError):
|
|
|
|
|
WebPageLoader().load(SourceContent("https://example.com"))
|
|
|
|
|
|
|
|
|
|
mock_response = Mock()
|
|
|
|
|
mock_response.raise_for_status.side_effect = requests.HTTPError("404")
|
|
|
|
|
mock_get.side_effect = None
|
|
|
|
|
mock_get.return_value = mock_response
|
|
|
|
|
with pytest.raises(ValueError):
|
|
|
|
|
WebPageLoader().load(SourceContent("https://example.com/404"))
|
|
|
|
|
|
|
|
|
|
@patch("requests.get")
|
|
|
|
|
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
|
|
|
|
|
def test_doc_id_consistency(self, mock_bs, mock_get):
|
|
|
|
|
mock_get.return_value = self.setup_mock_response(
|
|
|
|
|
"<html><body>Doc</body></html>"
|
|
|
|
|
)
|
|
|
|
|
mock_bs.return_value = self.setup_mock_soup("Doc")
|
|
|
|
|
|
|
|
|
|
loader = WebPageLoader()
|
|
|
|
|
result1 = loader.load(SourceContent("https://example.com"))
|
|
|
|
|
result2 = loader.load(SourceContent("https://example.com"))
|
|
|
|
|
def test_doc_id_consistency(self):
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
xml = "<root><item>Consistent</item></root>"
|
|
|
|
|
source = SourceContent(xml)
|
|
|
|
|
result1 = loader.load(source)
|
|
|
|
|
result2 = loader.load(source)
|
|
|
|
|
|
|
|
|
|
assert result1.doc_id == result2.doc_id
|
|
|
|
|
|
|
|
|
|
@patch("requests.get")
|
|
|
|
|
@patch("crewai_tools.rag.loaders.webpage_loader.BeautifulSoup")
|
|
|
|
|
def test_status_code_and_content_type(self, mock_bs, mock_get):
|
|
|
|
|
for status in [200, 201, 301]:
|
|
|
|
|
mock_get.return_value = self.setup_mock_response(
|
|
|
|
|
f"<html><body>Status {status}</body></html>", status_code=status
|
|
|
|
|
)
|
|
|
|
|
mock_bs.return_value = self.setup_mock_soup(f"Status {status}")
|
|
|
|
|
result = WebPageLoader().load(
|
|
|
|
|
SourceContent(f"https://example.com/{status}")
|
|
|
|
|
)
|
|
|
|
|
assert result.metadata["status_code"] == status
|
|
|
|
|
@patch("crewai_tools.rag.loaders.xml_loader.load_from_url")
|
|
|
|
|
def test_load_from_url(self, mock_load_url):
|
|
|
|
|
mock_load_url.return_value = "<root><data>URL content</data></root>"
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
source = SourceContent("https://example.com/data.xml")
|
|
|
|
|
result = loader.load(source)
|
|
|
|
|
|
|
|
|
|
for ctype in ["text/html", "text/plain", "application/xhtml+xml"]:
|
|
|
|
|
mock_get.return_value = self.setup_mock_response(
|
|
|
|
|
"<html><body>Content</body></html>", content_type=ctype
|
|
|
|
|
)
|
|
|
|
|
mock_bs.return_value = self.setup_mock_soup("Content")
|
|
|
|
|
result = WebPageLoader().load(SourceContent("https://example.com"))
|
|
|
|
|
assert result.metadata["content_type"] == ctype
|
|
|
|
|
assert isinstance(result, LoaderResult)
|
|
|
|
|
assert "URL content" in result.content
|
|
|
|
|
mock_load_url.assert_called_once()
|
|
|
|
|
|
|
|
|
|
def test_xxe_entity_expansion_blocked(self):
|
|
|
|
|
"""Test that XML External Entity (XXE) attacks are blocked by defusedxml."""
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
xxe_payload = (
|
|
|
|
|
'<?xml version="1.0"?>'
|
|
|
|
|
"<!DOCTYPE foo ["
|
|
|
|
|
' <!ENTITY xxe SYSTEM "file:///etc/passwd">'
|
|
|
|
|
"]>"
|
|
|
|
|
"<root>&xxe;</root>"
|
|
|
|
|
)
|
|
|
|
|
source = SourceContent(xxe_payload)
|
|
|
|
|
result = loader.load(source)
|
|
|
|
|
|
|
|
|
|
# defusedxml should block the entity expansion and raise EntitiesForbidden,
|
|
|
|
|
# which is a subclass of ParseError, so _parse_xml catches it and returns
|
|
|
|
|
# raw content with a parse_error in metadata.
|
|
|
|
|
assert "parse_error" in result.metadata
|
|
|
|
|
assert result.metadata["format"] == "xml"
|
|
|
|
|
# The raw payload should NOT have resolved the entity
|
|
|
|
|
assert "/etc/passwd" not in result.content or result.content == xxe_payload
|
|
|
|
|
|
|
|
|
|
def test_xxe_billion_laughs_blocked(self):
|
|
|
|
|
"""Test that XML bomb (Billion Laughs) attacks are blocked by defusedxml."""
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
billion_laughs = (
|
|
|
|
|
'<?xml version="1.0"?>'
|
|
|
|
|
"<!DOCTYPE lolz ["
|
|
|
|
|
' <!ENTITY lol "lol">'
|
|
|
|
|
' <!ENTITY lol2 "&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;&lol;">'
|
|
|
|
|
' <!ENTITY lol3 "&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;&lol2;">'
|
|
|
|
|
"]>"
|
|
|
|
|
"<root>&lol3;</root>"
|
|
|
|
|
)
|
|
|
|
|
source = SourceContent(billion_laughs)
|
|
|
|
|
result = loader.load(source)
|
|
|
|
|
|
|
|
|
|
# defusedxml blocks entity expansion, resulting in a parse error
|
|
|
|
|
assert "parse_error" in result.metadata
|
|
|
|
|
assert result.metadata["format"] == "xml"
|
|
|
|
|
|
|
|
|
|
def test_xxe_parameter_entity_blocked(self):
|
|
|
|
|
"""Test that parameter entity attacks are blocked by defusedxml."""
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
xxe_param = (
|
|
|
|
|
'<?xml version="1.0"?>'
|
|
|
|
|
"<!DOCTYPE foo ["
|
|
|
|
|
' <!ENTITY % xxe SYSTEM "file:///etc/passwd">'
|
|
|
|
|
" %xxe;"
|
|
|
|
|
"]>"
|
|
|
|
|
"<root>test</root>"
|
|
|
|
|
)
|
|
|
|
|
source = SourceContent(xxe_param)
|
|
|
|
|
result = loader.load(source)
|
|
|
|
|
|
|
|
|
|
assert "parse_error" in result.metadata
|
|
|
|
|
|
|
|
|
|
def test_xxe_file_from_file_blocked(self):
|
|
|
|
|
"""Test that XXE attacks via file loading are also blocked."""
|
|
|
|
|
xxe_content = (
|
|
|
|
|
'<?xml version="1.0"?>'
|
|
|
|
|
"<!DOCTYPE foo ["
|
|
|
|
|
' <!ENTITY xxe SYSTEM "file:///etc/passwd">'
|
|
|
|
|
"]>"
|
|
|
|
|
"<root>&xxe;</root>"
|
|
|
|
|
)
|
|
|
|
|
with tempfile.NamedTemporaryFile(
|
|
|
|
|
mode="w", suffix=".xml", delete=False, encoding="utf-8"
|
|
|
|
|
) as f:
|
|
|
|
|
f.write(xxe_content)
|
|
|
|
|
temp_path = f.name
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
loader = XMLLoader()
|
|
|
|
|
source = SourceContent(temp_path)
|
|
|
|
|
result = loader.load(source)
|
|
|
|
|
|
|
|
|
|
assert "parse_error" in result.metadata
|
|
|
|
|
assert result.metadata["format"] == "xml"
|
|
|
|
|
finally:
|
|
|
|
|
os.unlink(temp_path)
|
|
|
|
|
|
|
|
|
|
def test_defusedxml_is_used_not_stdlib(self):
|
|
|
|
|
"""Verify that the XMLLoader imports from defusedxml, not xml.etree.ElementTree."""
|
|
|
|
|
import crewai_tools.rag.loaders.xml_loader as xml_loader_module
|
|
|
|
|
|
|
|
|
|
assert "defusedxml" in xml_loader_module.fromstring.__module__
|
|
|
|
|
|
|
|
|
|
def test_arxiv_tool_uses_defusedxml(self):
|
|
|
|
|
"""Verify that ArxivPaperTool imports from defusedxml, not xml.etree.ElementTree."""
|
|
|
|
|
import crewai_tools.tools.arxiv_paper_tool.arxiv_paper_tool as arxiv_module
|
|
|
|
|
|
|
|
|
|
assert "defusedxml" in arxiv_module.fromstring.__module__
|
|
|
|
|
|