mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-04 13:48:31 +00:00
Compare commits
4 Commits
devin/1765
...
devin/1764
| Author | SHA1 | Date | |
|---|---|---|---|
| | 8437e1cdea | | |
| | 5110817709 | | |
| | cff9cfb665 | | |
| | 29d40de651 | | |
@@ -324,14 +324,25 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
||||
Returns:
|
||||
Updated action or final answer.
|
||||
"""
|
||||
# Special case for add_image_tool
|
||||
# Special case for add_image_tool - the result is already a properly
|
||||
# formatted multimodal message dict with role and content
|
||||
add_image_tool = self._i18n.tools("add_image")
|
||||
if (
|
||||
isinstance(add_image_tool, dict)
|
||||
and formatted_answer.tool.casefold().strip()
|
||||
== add_image_tool.get("name", "").casefold().strip()
|
||||
):
|
||||
self.messages.append({"role": "assistant", "content": tool_result.result})
|
||||
# tool_result.result is a dict like {"role": "user", "content": [...]}
|
||||
# Append it directly without wrapping
|
||||
result = tool_result.result
|
||||
if isinstance(result, dict) and "role" in result and "content" in result:
|
||||
self.messages.append(result)
|
||||
elif isinstance(result, list):
|
||||
# If it's just the content list, wrap it with user role
|
||||
self.messages.append({"role": "user", "content": result})
|
||||
else:
|
||||
# Fallback: wrap as user message
|
||||
self.messages.append({"role": "user", "content": str(result)})
|
||||
return formatted_answer
|
||||
|
||||
return handle_agent_action_core(
|
||||
|
||||
@@ -448,6 +448,7 @@ class GeminiCompletion(BaseLLM):
|
||||
- System messages are separate system_instruction
|
||||
- Content is organized as Content objects with Parts
|
||||
- Roles are 'user' and 'model' (not 'assistant')
|
||||
- Multimodal content with image_url must be converted to Gemini's format
|
||||
|
||||
Args:
|
||||
messages: Input messages
|
||||
@@ -465,17 +466,18 @@ class GeminiCompletion(BaseLLM):
|
||||
role = message["role"]
|
||||
content = message["content"]
|
||||
|
||||
# Convert content to string if it's a list
|
||||
if isinstance(content, list):
|
||||
text_content = " ".join(
|
||||
str(item.get("text", "")) if isinstance(item, dict) else str(item)
|
||||
for item in content
|
||||
)
|
||||
else:
|
||||
text_content = str(content) if content else ""
|
||||
|
||||
if role == "system":
|
||||
# Extract system instruction - Gemini handles it separately
|
||||
if isinstance(content, list):
|
||||
text_content = " ".join(
|
||||
str(item.get("text", ""))
|
||||
if isinstance(item, dict)
|
||||
else str(item)
|
||||
for item in content
|
||||
)
|
||||
else:
|
||||
text_content = str(content) if content else ""
|
||||
|
||||
if system_instruction:
|
||||
system_instruction += f"\n\n{text_content}"
|
||||
else:
|
||||
@@ -484,14 +486,190 @@ class GeminiCompletion(BaseLLM):
|
||||
# Convert role for Gemini (assistant -> model)
|
||||
gemini_role = "model" if role == "assistant" else "user"
|
||||
|
||||
# Convert content to Gemini Parts
|
||||
parts = self._convert_content_to_parts(content)
|
||||
|
||||
# Create Content object
|
||||
gemini_content = types.Content(
|
||||
role=gemini_role, parts=[types.Part.from_text(text=text_content)]
|
||||
)
|
||||
gemini_content = types.Content(role=gemini_role, parts=parts)
|
||||
contents.append(gemini_content)
|
||||
|
||||
return contents, system_instruction
|
||||
|
||||
def _convert_content_to_parts(self, content: Any) -> list[types.Part]:
    """Convert message content to Gemini Parts.

    Accepts either plain content (anything that is not a list) or an
    OpenAI-style multimodal list whose items are dicts such as
    ``{"type": "text", "text": ...}`` or
    ``{"type": "image_url", "image_url": {"url": ...}}``.

    Args:
        content: Message content (string or list of content items).

    Returns:
        List of Gemini Part objects. The list is never empty: when nothing
        usable is found a single empty text part is returned.
    """
    if not isinstance(content, list):
        # Simple (non-multimodal) content: falsy values become an empty string.
        return [types.Part.from_text(text=str(content) if content else "")]

    parts: list[types.Part] = []
    for item in content:
        if not isinstance(item, dict):
            # Non-dict item (e.g. a bare string inside the list): send as text.
            parts.append(types.Part.from_text(text=str(item)))
            continue

        item_type = item.get("type", "")
        if not item_type and "text" in item:
            # An untyped dict that carries "text" is a text item; without this
            # it would be forwarded to the model as a Python dict repr.
            item_type = "text"

        if item_type == "text":
            text = item.get("text", "")
            if text:
                parts.append(types.Part.from_text(text=str(text)))
        elif item_type == "image_url":
            # OpenAI-compatible shape: the value is {"url": ...} or a bare URL.
            image_ref = item.get("image_url", {})
            raw_url = (
                image_ref.get("url", "")
                if isinstance(image_ref, dict)
                else image_ref
            )
            # Guard before str(): a None value must not turn into the
            # literal path "None".
            url = str(raw_url) if raw_url else ""
            if url:
                image_part = self._create_image_part_from_url(url)
                # The helper returns None when the image cannot be loaded;
                # in that case the image is omitted from the message.
                if image_part:
                    parts.append(image_part)
        else:
            # Unknown item type: fall back to its string form.
            parts.append(types.Part.from_text(text=str(item)))

    # Ensure at least one part exists.
    if not parts:
        parts.append(types.Part.from_text(text=""))

    return parts
|
||||
|
||||
def _create_image_part_from_url(self, url: str) -> types.Part | None:
    """Build a Gemini Part for an image reference.

    The reference is dispatched on its prefix:

    - ``data:`` -> decoded in place by ``_parse_data_url``
    - ``http://`` / ``https://`` -> downloaded by ``_fetch_image_from_url``
    - anything else -> treated as a local path by ``_read_local_image``

    Args:
        url: Image URL, data URL, or local file path.

    Returns:
        Gemini Part object or None if conversion fails.
    """
    # NOTE(review): the reference may originate from model/tool output, so the
    # local-path and HTTP branches can touch arbitrary files/hosts - confirm
    # callers only pass trusted values.
    try:
        if url.startswith("data:"):
            loader = self._parse_data_url
        elif url.startswith(("http://", "https://")):
            loader = self._fetch_image_from_url
        else:
            loader = self._read_local_image
        return loader(url)
    except Exception as e:
        logging.warning(f"Failed to create image part from URL '{url}': {e}")
        return None
|
||||
|
||||
def _parse_data_url(self, data_url: str) -> types.Part | None:
    """Parse a data URL and create a Gemini Part.

    Expected shape (RFC 2397): ``data:[<mediatype>][;base64],<data>``.

    Args:
        data_url: Data URL (e.g., data:image/png;base64,...).

    Returns:
        Gemini Part object, or None if the string is not a data URL, has no
        comma separating header from payload, or cannot be decoded.
    """
    import base64
    from urllib.parse import unquote_to_bytes

    try:
        if not data_url.startswith("data:"):
            return None

        # Split "<mediatype>[;param...]" from the payload at the first comma.
        header, comma, payload = data_url[5:].partition(",")
        if not comma:
            return None

        media_type, *params = header.split(";")
        # str.split never returns an empty list, so the fallback has to test
        # the media type itself; otherwise "data:;base64,..." yields "".
        mime_type = media_type.strip() or "application/octet-stream"
        # Only parameters after the media type can be the base64 marker.
        is_base64 = any(param.strip().lower() == "base64" for param in params)

        if is_base64:
            image_bytes = base64.b64decode(payload)
        else:
            # Non-base64 payloads are percent-encoded; plain payloads pass
            # through unchanged (encoded as UTF-8).
            image_bytes = unquote_to_bytes(payload)

        return types.Part.from_bytes(data=image_bytes, mime_type=mime_type)
    except Exception as e:
        # Best effort: a malformed image must not abort message formatting.
        logging.warning(f"Failed to parse data URL: {e}")
        return None
|
||||
|
||||
def _fetch_image_from_url(self, url: str) -> types.Part | None:
    """Download an image over HTTP(S) and wrap it in a Gemini Part.

    The MIME type comes from the response's Content-Type header (parameters
    after ``;`` are dropped). When the header is empty it is guessed from the
    URL, falling back to ``image/jpeg``.

    Args:
        url: HTTP(S) URL of the image.

    Returns:
        Gemini Part object or None if fetching fails.
    """
    import mimetypes
    import urllib.request

    try:
        # The caller (_create_image_part_from_url) only routes http/https
        # URLs here. The request is bounded by a 30 second timeout.
        with urllib.request.urlopen(url, timeout=30) as response:  # noqa: S310
            payload = response.read()
            header_value = response.headers.get("Content-Type", "")

        if header_value:
            # Keep only the media type, e.g. "image/png; charset=x" -> "image/png".
            mime_type = header_value.split(";")[0].strip()
        else:
            guessed_type, _ = mimetypes.guess_type(url)
            mime_type = guessed_type or "image/jpeg"

        return types.Part.from_bytes(data=payload, mime_type=mime_type)
    except Exception as e:
        logging.warning(f"Failed to fetch image from URL '{url}': {e}")
        return None
|
||||
|
||||
def _read_local_image(self, file_path: str) -> types.Part | None:
    """Load an image from disk and wrap it in a Gemini Part.

    The MIME type is guessed from the file extension, defaulting to
    ``image/jpeg`` when it cannot be determined.

    Args:
        file_path: Path to the local image file.

    Returns:
        Gemini Part object or None if the file is missing or unreadable.
    """
    import mimetypes
    import os

    try:
        if not os.path.exists(file_path):
            logging.warning(f"Image file not found: {file_path}")
            return None

        guessed_type = mimetypes.guess_type(file_path)[0]

        with open(file_path, "rb") as handle:
            raw_bytes = handle.read()

        return types.Part.from_bytes(
            data=raw_bytes,
            mime_type=guessed_type or "image/jpeg",
        )
    except Exception as e:
        logging.warning(f"Failed to read local image '{file_path}': {e}")
        return None
|
||||
|
||||
def _handle_completion(
|
||||
self,
|
||||
contents: list[types.Content],
|
||||
|
||||
@@ -313,7 +313,24 @@ class ToolUsage:
|
||||
tool_name=tool.name,
|
||||
attempts=self._run_attempts,
|
||||
)
|
||||
result = self._format_result(result=result)
|
||||
|
||||
# Always increment tool usage counter (this is normally done in _format_result)
|
||||
if self.task:
|
||||
self.task.used_tools += 1
|
||||
|
||||
# For add_image tool, preserve the raw dict result for multimodal handling
|
||||
# The result is a dict like {"role": "user", "content": [...]} that should
|
||||
# not be stringified. We skip format reminders for add_image to avoid
|
||||
# in-place mutation of cached results.
|
||||
add_image_tool = self._i18n.tools("add_image")
|
||||
is_add_image_tool = (
|
||||
isinstance(add_image_tool, dict)
|
||||
and tool.name.casefold().strip()
|
||||
== add_image_tool.get("name", "").casefold().strip()
|
||||
)
|
||||
if not is_add_image_tool:
|
||||
result = self._format_result(result=result, skip_counter=True)
|
||||
|
||||
data = {
|
||||
"result": result,
|
||||
"tool_name": tool.name,
|
||||
@@ -351,8 +368,8 @@ class ToolUsage:
|
||||
|
||||
return result
|
||||
|
||||
def _format_result(self, result: Any) -> str:
|
||||
if self.task:
|
||||
def _format_result(self, result: Any, skip_counter: bool = False) -> str:
|
||||
if self.task and not skip_counter:
|
||||
self.task.used_tools += 1
|
||||
if self._should_remember_format():
|
||||
result = self._remember_format(result=result)
|
||||
|
||||
488
lib/crewai/tests/test_multimodal_image_handling.py
Normal file
488
lib/crewai/tests/test_multimodal_image_handling.py
Normal file
@@ -0,0 +1,488 @@
|
||||
"""Tests for multimodal image handling fixes (Issue #4016).
|
||||
|
||||
This module tests the fixes for proper multimodal image handling in CrewAI:
|
||||
1. CrewAgentExecutor._handle_agent_action properly appends multimodal messages
|
||||
2. ToolUsage.use preserves raw dict result for add_image tool
|
||||
3. Gemini provider properly converts image_url to Gemini's format
|
||||
"""
|
||||
|
||||
import base64
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestCrewAgentExecutorMultimodal:
    """Tests for CrewAgentExecutor._handle_agent_action with multimodal content."""

    ADD_IMAGE_TOOL_NAME = "Add image to content"

    @staticmethod
    def _multimodal_content():
        """Return a fresh OpenAI-style content list with one text and one image item."""
        return [
            {"type": "text", "text": "Analyze this image"},
            {
                "type": "image_url",
                "image_url": {"url": "https://example.com/image.jpg"},
            },
        ]

    @classmethod
    def _run_add_image_action(cls, tool_output):
        """Feed ``tool_output`` through ``_handle_agent_action`` as an add_image result.

        Builds an executor around mocks whose i18n reports the add_image tool
        name, so the executor takes its add_image branch.

        Returns:
            Tuple of (executor, the AgentAction passed in, the value returned
            by ``_handle_agent_action``).
        """
        from crewai.agents.crew_agent_executor import CrewAgentExecutor
        from crewai.agents.parser import AgentAction
        from crewai.tools.tool_usage import ToolResult

        mock_agent = MagicMock()
        mock_agent.i18n = MagicMock()
        mock_agent.i18n.tools.return_value = {"name": cls.ADD_IMAGE_TOOL_NAME}

        executor = CrewAgentExecutor(
            llm=MagicMock(),
            task=MagicMock(),
            crew=MagicMock(),
            agent=mock_agent,
            prompt="test prompt",
            tools=[],
            original_tools=[],
            max_iter=1,
        )
        executor.messages = []
        executor._i18n = mock_agent.i18n

        formatted_answer = AgentAction(
            tool=cls.ADD_IMAGE_TOOL_NAME,
            tool_input='{"image_url": "https://example.com/image.jpg"}',
            text="Using add image tool",
            thought="I need to add an image",
            result="",
        )

        returned = executor._handle_agent_action(
            formatted_answer, ToolResult(result=tool_output)
        )
        return executor, formatted_answer, returned

    def test_handle_agent_action_with_add_image_tool_dict_result(self):
        """Test that add_image tool result dict is appended directly without wrapping."""
        multimodal_result = {"role": "user", "content": self._multimodal_content()}

        executor, formatted_answer, returned = self._run_add_image_action(
            multimodal_result
        )

        # The add_image branch hands the action back unchanged.
        assert returned is formatted_answer

        # Verify the message was appended correctly (not double-wrapped)
        assert len(executor.messages) == 1
        appended_message = executor.messages[0]

        # The message should be the dict directly, not wrapped
        assert appended_message["role"] == "user"
        assert isinstance(appended_message["content"], list)
        assert len(appended_message["content"]) == 2
        assert appended_message["content"][0]["type"] == "text"
        assert appended_message["content"][1]["type"] == "image_url"

    def test_handle_agent_action_with_add_image_tool_list_result(self):
        """Test that add_image tool result list is wrapped with user role."""
        content_list = self._multimodal_content()

        executor, formatted_answer, returned = self._run_add_image_action(
            content_list
        )

        assert returned is formatted_answer

        # Verify the message was wrapped with user role
        assert len(executor.messages) == 1
        appended_message = executor.messages[0]

        assert appended_message["role"] == "user"
        assert appended_message["content"] == content_list
|
||||
|
||||
|
||||
class TestToolUsageMultimodal:
    """Tests for ToolUsage.use with add_image tool."""

    def test_tool_usage_preserves_add_image_dict_result(self):
        """Test that add_image tool result is not stringified."""
        from crewai.tools.tool_usage import ToolUsage

        tool_name = "Add image to content"

        # Collaborators, configured through MagicMock keyword arguments.
        tools_handler = MagicMock(cache=None)
        task = MagicMock(used_tools=0)
        agent = MagicMock(
            key="test_key",
            role="test_role",
            verbose=False,
            fingerprint=None,
            tools_results=[],
        )
        action = MagicMock(
            tool=tool_name,
            tool_input='{"image_url": "https://example.com/image.jpg"}',
        )

        # i18n reports the add_image tool under the same name as the tool in use.
        i18n = MagicMock()
        i18n.tools.return_value = {"name": tool_name}

        # What the tool returns: a dict with role and content.
        expected_payload = {
            "role": "user",
            "content": [
                {"type": "text", "text": "Analyze this image"},
                {
                    "type": "image_url",
                    "image_url": {"url": "https://example.com/image.jpg"},
                },
            ],
        }

        # ``name`` is reserved by the Mock constructor, so assign it afterwards.
        add_image_tool = MagicMock()
        add_image_tool.name = tool_name
        add_image_tool.args_schema = MagicMock()
        add_image_tool.args_schema.model_json_schema.return_value = {
            "properties": {"image_url": {}, "action": {}}
        }
        add_image_tool.invoke.return_value = expected_payload

        tool_usage = ToolUsage(
            tools_handler=tools_handler,
            tools=[add_image_tool],
            task=task,
            function_calling_llm=MagicMock(),
            agent=agent,
            action=action,
        )
        tool_usage._i18n = i18n
        tool_usage._telemetry = MagicMock()

        calling = MagicMock(
            tool_name=tool_name,
            arguments={"image_url": "https://example.com/image.jpg"},
        )

        # Call _use directly
        outcome = tool_usage._use(
            tool_string="test",
            tool=add_image_tool,
            calling=calling,
        )

        # The result should be the dict, not a string
        assert isinstance(outcome, dict)
        assert outcome["role"] == "user"
        assert isinstance(outcome["content"], list)
|
||||
|
||||
|
||||
class TestGeminiMultimodalFormatting:
    """Tests for Gemini provider's multimodal content handling."""

    PART_FROM_TEXT = "crewai.llms.providers.gemini.completion.types.Part.from_text"
    PART_FROM_BYTES = "crewai.llms.providers.gemini.completion.types.Part.from_bytes"

    @pytest.fixture
    def gemini_completion_cls(self):
        """Yield ``GeminiCompletion`` imported with the google.genai modules stubbed.

        The ``sys.modules`` patch stays active for the whole test so that the
        tests' own ``patch(...)`` calls run against the same module state.
        """
        with patch.dict(
            "sys.modules",
            {
                "google": MagicMock(),
                "google.genai": MagicMock(),
                "google.genai.types": MagicMock(),
                "google.genai.errors": MagicMock(),
            },
        ):
            from crewai.llms.providers.gemini.completion import GeminiCompletion

            yield GeminiCompletion

    @staticmethod
    def _completion_with_real(completion_cls, method_name):
        """Return a spec'd mock instance whose ``method_name`` is the real method.

        Everything else on the instance stays mocked, so each test exercises
        one method in isolation without running ``__init__``.
        """
        completion = MagicMock(spec=completion_cls)
        real_method = getattr(completion_cls, method_name)
        setattr(
            completion,
            method_name,
            real_method.__get__(completion, completion_cls),
        )
        return completion

    def test_convert_content_to_parts_simple_text(self, gemini_completion_cls):
        """Test converting simple text content to Gemini Parts."""
        completion = self._completion_with_real(
            gemini_completion_cls, "_convert_content_to_parts"
        )

        mock_part = MagicMock()
        with patch(self.PART_FROM_TEXT, return_value=mock_part):
            result = completion._convert_content_to_parts("Hello, world!")

        assert len(result) == 1
        assert result[0] is mock_part

    def test_convert_content_to_parts_multimodal_with_image_url(
        self, gemini_completion_cls
    ):
        """Test converting multimodal content with image_url to Gemini Parts."""
        completion = self._completion_with_real(
            gemini_completion_cls, "_convert_content_to_parts"
        )
        completion._create_image_part_from_url = MagicMock(return_value=MagicMock())

        mock_text_part = MagicMock()
        with patch(self.PART_FROM_TEXT, return_value=mock_text_part):
            multimodal_content = [
                {"type": "text", "text": "Analyze this image"},
                {
                    "type": "image_url",
                    "image_url": {"url": "https://example.com/image.jpg"},
                },
            ]

            result = completion._convert_content_to_parts(multimodal_content)

        # Should have called _create_image_part_from_url
        completion._create_image_part_from_url.assert_called_once_with(
            "https://example.com/image.jpg"
        )
        # One text part followed by the image part, in input order.
        assert len(result) == 2
        assert result[0] is mock_text_part

    def test_parse_data_url(self, gemini_completion_cls):
        """Test parsing data URLs."""
        completion = self._completion_with_real(
            gemini_completion_cls, "_parse_data_url"
        )

        # Create a test data URL
        test_data = b"test image data"
        base64_data = base64.b64encode(test_data).decode("utf-8")
        data_url = f"data:image/png;base64,{base64_data}"

        mock_part = MagicMock()
        with patch(self.PART_FROM_BYTES, return_value=mock_part) as mock_from_bytes:
            result = completion._parse_data_url(data_url)

        # Verify from_bytes was called with correct arguments
        mock_from_bytes.assert_called_once()
        call_args = mock_from_bytes.call_args
        assert call_args.kwargs["mime_type"] == "image/png"
        assert call_args.kwargs["data"] == test_data
        assert result is mock_part

    def test_fetch_image_from_url(self, gemini_completion_cls):
        """Test fetching images from HTTP URLs."""
        completion = self._completion_with_real(
            gemini_completion_cls, "_fetch_image_from_url"
        )

        # Mock urllib.request.urlopen
        mock_response = MagicMock()
        mock_response.read.return_value = b"fake image data"
        mock_response.headers.get.return_value = "image/jpeg"
        mock_response.__enter__ = MagicMock(return_value=mock_response)
        mock_response.__exit__ = MagicMock(return_value=False)

        mock_part = MagicMock()
        with (
            patch("urllib.request.urlopen", return_value=mock_response),
            patch(self.PART_FROM_BYTES, return_value=mock_part) as mock_from_bytes,
        ):
            result = completion._fetch_image_from_url("https://example.com/image.jpg")

        # Verify from_bytes was called with correct arguments
        mock_from_bytes.assert_called_once()
        call_args = mock_from_bytes.call_args
        assert call_args.kwargs["mime_type"] == "image/jpeg"
        assert call_args.kwargs["data"] == b"fake image data"
        assert result is mock_part

    def test_read_local_image(self, gemini_completion_cls, tmp_path):
        """Test reading local image files."""
        completion = self._completion_with_real(
            gemini_completion_cls, "_read_local_image"
        )

        # tmp_path is removed by pytest, so no manual cleanup is needed.
        image_file = tmp_path / "image.png"
        image_file.write_bytes(b"fake png data")

        mock_part = MagicMock()
        with patch(self.PART_FROM_BYTES, return_value=mock_part) as mock_from_bytes:
            result = completion._read_local_image(str(image_file))

        # Verify from_bytes was called with correct arguments
        mock_from_bytes.assert_called_once()
        call_args = mock_from_bytes.call_args
        assert call_args.kwargs["mime_type"] == "image/png"
        assert call_args.kwargs["data"] == b"fake png data"
        assert result is mock_part

    def test_read_local_image_file_not_found(self, gemini_completion_cls):
        """Test reading non-existent local image file."""
        completion = self._completion_with_real(
            gemini_completion_cls, "_read_local_image"
        )

        result = completion._read_local_image("/nonexistent/path/image.png")

        # Should return None for non-existent file
        assert result is None
|
||||
|
||||
|
||||
class TestAddImageToolOutput:
    """Tests for AddImageTool output format."""

    def test_add_image_tool_returns_correct_format(self):
        """Test that AddImageTool returns the correct multimodal format."""
        from crewai.tools.agent_tools.add_image_tool import AddImageTool

        image_url = "https://example.com/image.jpg"
        action = "Analyze this image"

        message = AddImageTool()._run(image_url=image_url, action=action)

        # Envelope: a user-role message whose content is a two-item list.
        assert isinstance(message, dict)
        assert message["role"] == "user"
        content = message["content"]
        assert isinstance(content, list)
        assert len(content) == 2

        text_item, image_item = content

        # Text item carries the requested action.
        assert text_item["type"] == "text"
        assert text_item["text"] == action

        # Image item carries the URL in the OpenAI-style image_url shape.
        assert image_item["type"] == "image_url"
        assert image_item["image_url"]["url"] == image_url
|
||||
Reference in New Issue
Block a user