mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-28 17:48:13 +00:00
Compare commits
4 Commits
devin/1768
...
devin/1764
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8437e1cdea | ||
|
|
5110817709 | ||
|
|
cff9cfb665 | ||
|
|
29d40de651 |
@@ -324,14 +324,25 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
|
|||||||
Returns:
|
Returns:
|
||||||
Updated action or final answer.
|
Updated action or final answer.
|
||||||
"""
|
"""
|
||||||
# Special case for add_image_tool
|
# Special case for add_image_tool - the result is already a properly
|
||||||
|
# formatted multimodal message dict with role and content
|
||||||
add_image_tool = self._i18n.tools("add_image")
|
add_image_tool = self._i18n.tools("add_image")
|
||||||
if (
|
if (
|
||||||
isinstance(add_image_tool, dict)
|
isinstance(add_image_tool, dict)
|
||||||
and formatted_answer.tool.casefold().strip()
|
and formatted_answer.tool.casefold().strip()
|
||||||
== add_image_tool.get("name", "").casefold().strip()
|
== add_image_tool.get("name", "").casefold().strip()
|
||||||
):
|
):
|
||||||
self.messages.append({"role": "assistant", "content": tool_result.result})
|
# tool_result.result is a dict like {"role": "user", "content": [...]}
|
||||||
|
# Append it directly without wrapping
|
||||||
|
result = tool_result.result
|
||||||
|
if isinstance(result, dict) and "role" in result and "content" in result:
|
||||||
|
self.messages.append(result)
|
||||||
|
elif isinstance(result, list):
|
||||||
|
# If it's just the content list, wrap it with user role
|
||||||
|
self.messages.append({"role": "user", "content": result})
|
||||||
|
else:
|
||||||
|
# Fallback: wrap as user message
|
||||||
|
self.messages.append({"role": "user", "content": str(result)})
|
||||||
return formatted_answer
|
return formatted_answer
|
||||||
|
|
||||||
return handle_agent_action_core(
|
return handle_agent_action_core(
|
||||||
|
|||||||
@@ -448,6 +448,7 @@ class GeminiCompletion(BaseLLM):
|
|||||||
- System messages are separate system_instruction
|
- System messages are separate system_instruction
|
||||||
- Content is organized as Content objects with Parts
|
- Content is organized as Content objects with Parts
|
||||||
- Roles are 'user' and 'model' (not 'assistant')
|
- Roles are 'user' and 'model' (not 'assistant')
|
||||||
|
- Multimodal content with image_url must be converted to Gemini's format
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
messages: Input messages
|
messages: Input messages
|
||||||
@@ -465,17 +466,18 @@ class GeminiCompletion(BaseLLM):
|
|||||||
role = message["role"]
|
role = message["role"]
|
||||||
content = message["content"]
|
content = message["content"]
|
||||||
|
|
||||||
# Convert content to string if it's a list
|
|
||||||
if isinstance(content, list):
|
|
||||||
text_content = " ".join(
|
|
||||||
str(item.get("text", "")) if isinstance(item, dict) else str(item)
|
|
||||||
for item in content
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
text_content = str(content) if content else ""
|
|
||||||
|
|
||||||
if role == "system":
|
if role == "system":
|
||||||
# Extract system instruction - Gemini handles it separately
|
# Extract system instruction - Gemini handles it separately
|
||||||
|
if isinstance(content, list):
|
||||||
|
text_content = " ".join(
|
||||||
|
str(item.get("text", ""))
|
||||||
|
if isinstance(item, dict)
|
||||||
|
else str(item)
|
||||||
|
for item in content
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
text_content = str(content) if content else ""
|
||||||
|
|
||||||
if system_instruction:
|
if system_instruction:
|
||||||
system_instruction += f"\n\n{text_content}"
|
system_instruction += f"\n\n{text_content}"
|
||||||
else:
|
else:
|
||||||
@@ -484,14 +486,190 @@ class GeminiCompletion(BaseLLM):
|
|||||||
# Convert role for Gemini (assistant -> model)
|
# Convert role for Gemini (assistant -> model)
|
||||||
gemini_role = "model" if role == "assistant" else "user"
|
gemini_role = "model" if role == "assistant" else "user"
|
||||||
|
|
||||||
|
# Convert content to Gemini Parts
|
||||||
|
parts = self._convert_content_to_parts(content)
|
||||||
|
|
||||||
# Create Content object
|
# Create Content object
|
||||||
gemini_content = types.Content(
|
gemini_content = types.Content(role=gemini_role, parts=parts)
|
||||||
role=gemini_role, parts=[types.Part.from_text(text=text_content)]
|
|
||||||
)
|
|
||||||
contents.append(gemini_content)
|
contents.append(gemini_content)
|
||||||
|
|
||||||
return contents, system_instruction
|
return contents, system_instruction
|
||||||
|
|
||||||
|
def _convert_content_to_parts(self, content: Any) -> list[types.Part]:
|
||||||
|
"""Convert message content to Gemini Parts.
|
||||||
|
|
||||||
|
Handles both simple text content and multimodal content with image_url.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
content: Message content (string or list of content items)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of Gemini Part objects
|
||||||
|
"""
|
||||||
|
if not isinstance(content, list):
|
||||||
|
# Simple text content
|
||||||
|
text_content = str(content) if content else ""
|
||||||
|
return [types.Part.from_text(text=text_content)]
|
||||||
|
|
||||||
|
parts: list[types.Part] = []
|
||||||
|
for item in content:
|
||||||
|
if isinstance(item, dict):
|
||||||
|
item_type = item.get("type", "")
|
||||||
|
if item_type == "text":
|
||||||
|
text = item.get("text", "")
|
||||||
|
if text:
|
||||||
|
parts.append(types.Part.from_text(text=str(text)))
|
||||||
|
elif item_type == "image_url":
|
||||||
|
# Handle image_url format from OpenAI-compatible APIs
|
||||||
|
image_url_data = item.get("image_url", {})
|
||||||
|
url = image_url_data.get("url", "") if isinstance(
|
||||||
|
image_url_data, dict
|
||||||
|
) else str(image_url_data)
|
||||||
|
if url:
|
||||||
|
image_part = self._create_image_part_from_url(url)
|
||||||
|
if image_part:
|
||||||
|
parts.append(image_part)
|
||||||
|
else:
|
||||||
|
# Unknown type, convert to text
|
||||||
|
parts.append(types.Part.from_text(text=str(item)))
|
||||||
|
else:
|
||||||
|
# Non-dict item, convert to text
|
||||||
|
parts.append(types.Part.from_text(text=str(item)))
|
||||||
|
|
||||||
|
# Ensure at least one part exists
|
||||||
|
if not parts:
|
||||||
|
parts.append(types.Part.from_text(text=""))
|
||||||
|
|
||||||
|
return parts
|
||||||
|
|
||||||
|
def _create_image_part_from_url(self, url: str) -> types.Part | None:
|
||||||
|
"""Create a Gemini Part from an image URL.
|
||||||
|
|
||||||
|
Handles:
|
||||||
|
- HTTP(S) URLs: Fetches the image and converts to inline_data
|
||||||
|
- Data URLs: Parses and extracts base64 data
|
||||||
|
- Local file paths: Reads file and converts to inline_data
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url: Image URL, data URL, or local file path
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Gemini Part object or None if conversion fails
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
if url.startswith("data:"):
|
||||||
|
# Handle data URL (e.g., data:image/png;base64,...)
|
||||||
|
return self._parse_data_url(url)
|
||||||
|
if url.startswith(("http://", "https://")):
|
||||||
|
# Handle HTTP(S) URL - fetch and convert to inline_data
|
||||||
|
return self._fetch_image_from_url(url)
|
||||||
|
# Handle local file path
|
||||||
|
return self._read_local_image(url)
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning(f"Failed to create image part from URL '{url}': {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _parse_data_url(self, data_url: str) -> types.Part | None:
|
||||||
|
"""Parse a data URL and create a Gemini Part.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data_url: Data URL (e.g., data:image/png;base64,...)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Gemini Part object or None if parsing fails
|
||||||
|
"""
|
||||||
|
import base64
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Parse data URL: data:[<mediatype>][;base64],<data>
|
||||||
|
if not data_url.startswith("data:"):
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Split header and data
|
||||||
|
header_end = data_url.find(",")
|
||||||
|
if header_end == -1:
|
||||||
|
return None
|
||||||
|
|
||||||
|
header = data_url[5:header_end] # Skip "data:"
|
||||||
|
data = data_url[header_end + 1:]
|
||||||
|
|
||||||
|
# Parse mime type and encoding
|
||||||
|
parts = header.split(";")
|
||||||
|
mime_type = parts[0] if parts else "application/octet-stream"
|
||||||
|
is_base64 = "base64" in parts
|
||||||
|
|
||||||
|
if is_base64:
|
||||||
|
image_bytes = base64.b64decode(data)
|
||||||
|
else:
|
||||||
|
image_bytes = data.encode("utf-8")
|
||||||
|
|
||||||
|
return types.Part.from_bytes(data=image_bytes, mime_type=mime_type)
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning(f"Failed to parse data URL: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _fetch_image_from_url(self, url: str) -> types.Part | None:
|
||||||
|
"""Fetch an image from a URL and create a Gemini Part.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url: HTTP(S) URL of the image
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Gemini Part object or None if fetching fails
|
||||||
|
"""
|
||||||
|
import mimetypes
|
||||||
|
import urllib.request
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Fetch the image with a timeout
|
||||||
|
# URL scheme is validated in _create_image_part_from_url before calling this method
|
||||||
|
with urllib.request.urlopen(url, timeout=30) as response: # noqa: S310
|
||||||
|
image_bytes = response.read()
|
||||||
|
content_type = response.headers.get("Content-Type", "")
|
||||||
|
|
||||||
|
# Extract mime type from Content-Type header
|
||||||
|
if content_type:
|
||||||
|
mime_type = content_type.split(";")[0].strip()
|
||||||
|
else:
|
||||||
|
# Guess from URL extension
|
||||||
|
mime_type, _ = mimetypes.guess_type(url)
|
||||||
|
mime_type = mime_type or "image/jpeg"
|
||||||
|
|
||||||
|
return types.Part.from_bytes(data=image_bytes, mime_type=mime_type)
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning(f"Failed to fetch image from URL '{url}': {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _read_local_image(self, file_path: str) -> types.Part | None:
|
||||||
|
"""Read a local image file and create a Gemini Part.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path: Path to the local image file
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Gemini Part object or None if reading fails
|
||||||
|
"""
|
||||||
|
import mimetypes
|
||||||
|
import os
|
||||||
|
|
||||||
|
try:
|
||||||
|
if not os.path.exists(file_path):
|
||||||
|
logging.warning(f"Image file not found: {file_path}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Guess mime type from file extension
|
||||||
|
mime_type, _ = mimetypes.guess_type(file_path)
|
||||||
|
mime_type = mime_type or "image/jpeg"
|
||||||
|
|
||||||
|
with open(file_path, "rb") as f:
|
||||||
|
image_bytes = f.read()
|
||||||
|
|
||||||
|
return types.Part.from_bytes(data=image_bytes, mime_type=mime_type)
|
||||||
|
except Exception as e:
|
||||||
|
logging.warning(f"Failed to read local image '{file_path}': {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
def _handle_completion(
|
def _handle_completion(
|
||||||
self,
|
self,
|
||||||
contents: list[types.Content],
|
contents: list[types.Content],
|
||||||
|
|||||||
@@ -313,7 +313,24 @@ class ToolUsage:
|
|||||||
tool_name=tool.name,
|
tool_name=tool.name,
|
||||||
attempts=self._run_attempts,
|
attempts=self._run_attempts,
|
||||||
)
|
)
|
||||||
result = self._format_result(result=result)
|
|
||||||
|
# Always increment tool usage counter (this is normally done in _format_result)
|
||||||
|
if self.task:
|
||||||
|
self.task.used_tools += 1
|
||||||
|
|
||||||
|
# For add_image tool, preserve the raw dict result for multimodal handling
|
||||||
|
# The result is a dict like {"role": "user", "content": [...]} that should
|
||||||
|
# not be stringified. We skip format reminders for add_image to avoid
|
||||||
|
# in-place mutation of cached results.
|
||||||
|
add_image_tool = self._i18n.tools("add_image")
|
||||||
|
is_add_image_tool = (
|
||||||
|
isinstance(add_image_tool, dict)
|
||||||
|
and tool.name.casefold().strip()
|
||||||
|
== add_image_tool.get("name", "").casefold().strip()
|
||||||
|
)
|
||||||
|
if not is_add_image_tool:
|
||||||
|
result = self._format_result(result=result, skip_counter=True)
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
"result": result,
|
"result": result,
|
||||||
"tool_name": tool.name,
|
"tool_name": tool.name,
|
||||||
@@ -351,8 +368,8 @@ class ToolUsage:
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def _format_result(self, result: Any) -> str:
|
def _format_result(self, result: Any, skip_counter: bool = False) -> str:
|
||||||
if self.task:
|
if self.task and not skip_counter:
|
||||||
self.task.used_tools += 1
|
self.task.used_tools += 1
|
||||||
if self._should_remember_format():
|
if self._should_remember_format():
|
||||||
result = self._remember_format(result=result)
|
result = self._remember_format(result=result)
|
||||||
|
|||||||
488
lib/crewai/tests/test_multimodal_image_handling.py
Normal file
488
lib/crewai/tests/test_multimodal_image_handling.py
Normal file
@@ -0,0 +1,488 @@
|
|||||||
|
"""Tests for multimodal image handling fixes (Issue #4016).
|
||||||
|
|
||||||
|
This module tests the fixes for proper multimodal image handling in CrewAI:
|
||||||
|
1. CrewAgentExecutor._handle_agent_action properly appends multimodal messages
|
||||||
|
2. ToolUsage.use preserves raw dict result for add_image tool
|
||||||
|
3. Gemini provider properly converts image_url to Gemini's format
|
||||||
|
"""
|
||||||
|
|
||||||
|
import base64
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
class TestCrewAgentExecutorMultimodal:
|
||||||
|
"""Tests for CrewAgentExecutor._handle_agent_action with multimodal content."""
|
||||||
|
|
||||||
|
def test_handle_agent_action_with_add_image_tool_dict_result(self):
|
||||||
|
"""Test that add_image tool result dict is appended directly without wrapping."""
|
||||||
|
from crewai.agents.crew_agent_executor import CrewAgentExecutor
|
||||||
|
from crewai.agents.parser import AgentAction
|
||||||
|
from crewai.tools.tool_usage import ToolResult
|
||||||
|
|
||||||
|
# Create a mock executor
|
||||||
|
mock_llm = MagicMock()
|
||||||
|
mock_task = MagicMock()
|
||||||
|
mock_crew = MagicMock()
|
||||||
|
mock_agent = MagicMock()
|
||||||
|
mock_agent.i18n = MagicMock()
|
||||||
|
mock_agent.i18n.tools.return_value = {"name": "Add image to content"}
|
||||||
|
|
||||||
|
executor = CrewAgentExecutor(
|
||||||
|
llm=mock_llm,
|
||||||
|
task=mock_task,
|
||||||
|
crew=mock_crew,
|
||||||
|
agent=mock_agent,
|
||||||
|
prompt="test prompt",
|
||||||
|
tools=[],
|
||||||
|
original_tools=[],
|
||||||
|
max_iter=1,
|
||||||
|
)
|
||||||
|
executor.messages = []
|
||||||
|
executor._i18n = mock_agent.i18n
|
||||||
|
|
||||||
|
# Create a mock add_image tool result (dict with role and content)
|
||||||
|
multimodal_result = {
|
||||||
|
"role": "user",
|
||||||
|
"content": [
|
||||||
|
{"type": "text", "text": "Analyze this image"},
|
||||||
|
{
|
||||||
|
"type": "image_url",
|
||||||
|
"image_url": {"url": "https://example.com/image.jpg"},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
formatted_answer = AgentAction(
|
||||||
|
tool="Add image to content",
|
||||||
|
tool_input='{"image_url": "https://example.com/image.jpg"}',
|
||||||
|
text="Using add image tool",
|
||||||
|
thought="I need to add an image",
|
||||||
|
result="",
|
||||||
|
)
|
||||||
|
|
||||||
|
tool_result = ToolResult(result=multimodal_result)
|
||||||
|
|
||||||
|
# Call _handle_agent_action
|
||||||
|
result = executor._handle_agent_action(formatted_answer, tool_result)
|
||||||
|
|
||||||
|
# Verify the message was appended correctly (not double-wrapped)
|
||||||
|
assert len(executor.messages) == 1
|
||||||
|
appended_message = executor.messages[0]
|
||||||
|
|
||||||
|
# The message should be the dict directly, not wrapped
|
||||||
|
assert appended_message["role"] == "user"
|
||||||
|
assert isinstance(appended_message["content"], list)
|
||||||
|
assert len(appended_message["content"]) == 2
|
||||||
|
assert appended_message["content"][0]["type"] == "text"
|
||||||
|
assert appended_message["content"][1]["type"] == "image_url"
|
||||||
|
|
||||||
|
def test_handle_agent_action_with_add_image_tool_list_result(self):
|
||||||
|
"""Test that add_image tool result list is wrapped with user role."""
|
||||||
|
from crewai.agents.crew_agent_executor import CrewAgentExecutor
|
||||||
|
from crewai.agents.parser import AgentAction
|
||||||
|
from crewai.tools.tool_usage import ToolResult
|
||||||
|
|
||||||
|
# Create a mock executor
|
||||||
|
mock_llm = MagicMock()
|
||||||
|
mock_task = MagicMock()
|
||||||
|
mock_crew = MagicMock()
|
||||||
|
mock_agent = MagicMock()
|
||||||
|
mock_agent.i18n = MagicMock()
|
||||||
|
mock_agent.i18n.tools.return_value = {"name": "Add image to content"}
|
||||||
|
|
||||||
|
executor = CrewAgentExecutor(
|
||||||
|
llm=mock_llm,
|
||||||
|
task=mock_task,
|
||||||
|
crew=mock_crew,
|
||||||
|
agent=mock_agent,
|
||||||
|
prompt="test prompt",
|
||||||
|
tools=[],
|
||||||
|
original_tools=[],
|
||||||
|
max_iter=1,
|
||||||
|
)
|
||||||
|
executor.messages = []
|
||||||
|
executor._i18n = mock_agent.i18n
|
||||||
|
|
||||||
|
# Create a mock add_image tool result (list without role)
|
||||||
|
content_list = [
|
||||||
|
{"type": "text", "text": "Analyze this image"},
|
||||||
|
{
|
||||||
|
"type": "image_url",
|
||||||
|
"image_url": {"url": "https://example.com/image.jpg"},
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
formatted_answer = AgentAction(
|
||||||
|
tool="Add image to content",
|
||||||
|
tool_input='{"image_url": "https://example.com/image.jpg"}',
|
||||||
|
text="Using add image tool",
|
||||||
|
thought="I need to add an image",
|
||||||
|
result="",
|
||||||
|
)
|
||||||
|
|
||||||
|
tool_result = ToolResult(result=content_list)
|
||||||
|
|
||||||
|
# Call _handle_agent_action
|
||||||
|
result = executor._handle_agent_action(formatted_answer, tool_result)
|
||||||
|
|
||||||
|
# Verify the message was wrapped with user role
|
||||||
|
assert len(executor.messages) == 1
|
||||||
|
appended_message = executor.messages[0]
|
||||||
|
|
||||||
|
assert appended_message["role"] == "user"
|
||||||
|
assert appended_message["content"] == content_list
|
||||||
|
|
||||||
|
|
||||||
|
class TestToolUsageMultimodal:
|
||||||
|
"""Tests for ToolUsage.use with add_image tool."""
|
||||||
|
|
||||||
|
def test_tool_usage_preserves_add_image_dict_result(self):
|
||||||
|
"""Test that add_image tool result is not stringified."""
|
||||||
|
from crewai.tools.tool_usage import ToolUsage
|
||||||
|
|
||||||
|
# Create mock components
|
||||||
|
mock_tools_handler = MagicMock()
|
||||||
|
mock_tools_handler.cache = None
|
||||||
|
mock_task = MagicMock()
|
||||||
|
mock_task.used_tools = 0
|
||||||
|
mock_agent = MagicMock()
|
||||||
|
mock_agent.key = "test_key"
|
||||||
|
mock_agent.role = "test_role"
|
||||||
|
mock_agent.verbose = False
|
||||||
|
mock_agent.fingerprint = None
|
||||||
|
mock_agent.tools_results = []
|
||||||
|
mock_action = MagicMock()
|
||||||
|
mock_action.tool = "Add image to content"
|
||||||
|
mock_action.tool_input = '{"image_url": "https://example.com/image.jpg"}'
|
||||||
|
|
||||||
|
# Mock i18n
|
||||||
|
mock_i18n = MagicMock()
|
||||||
|
mock_i18n.tools.return_value = {"name": "Add image to content"}
|
||||||
|
|
||||||
|
# Create a mock add_image tool
|
||||||
|
mock_tool = MagicMock()
|
||||||
|
mock_tool.name = "Add image to content"
|
||||||
|
mock_tool.args_schema = MagicMock()
|
||||||
|
mock_tool.args_schema.model_json_schema.return_value = {
|
||||||
|
"properties": {"image_url": {}, "action": {}}
|
||||||
|
}
|
||||||
|
|
||||||
|
# The tool returns a dict with role and content
|
||||||
|
multimodal_result = {
|
||||||
|
"role": "user",
|
||||||
|
"content": [
|
||||||
|
{"type": "text", "text": "Analyze this image"},
|
||||||
|
{
|
||||||
|
"type": "image_url",
|
||||||
|
"image_url": {"url": "https://example.com/image.jpg"},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
mock_tool.invoke.return_value = multimodal_result
|
||||||
|
|
||||||
|
tool_usage = ToolUsage(
|
||||||
|
tools_handler=mock_tools_handler,
|
||||||
|
tools=[mock_tool],
|
||||||
|
task=mock_task,
|
||||||
|
function_calling_llm=MagicMock(),
|
||||||
|
agent=mock_agent,
|
||||||
|
action=mock_action,
|
||||||
|
)
|
||||||
|
tool_usage._i18n = mock_i18n
|
||||||
|
tool_usage._telemetry = MagicMock()
|
||||||
|
|
||||||
|
# Create a mock calling object
|
||||||
|
mock_calling = MagicMock()
|
||||||
|
mock_calling.tool_name = "Add image to content"
|
||||||
|
mock_calling.arguments = {"image_url": "https://example.com/image.jpg"}
|
||||||
|
|
||||||
|
# Call _use directly
|
||||||
|
result = tool_usage._use(
|
||||||
|
tool_string="test",
|
||||||
|
tool=mock_tool,
|
||||||
|
calling=mock_calling,
|
||||||
|
)
|
||||||
|
|
||||||
|
# The result should be the dict, not a string
|
||||||
|
assert isinstance(result, dict)
|
||||||
|
assert result["role"] == "user"
|
||||||
|
assert isinstance(result["content"], list)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGeminiMultimodalFormatting:
|
||||||
|
"""Tests for Gemini provider's multimodal content handling."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_gemini_types(self):
|
||||||
|
"""Create mock Gemini types for testing."""
|
||||||
|
mock_types = MagicMock()
|
||||||
|
mock_part = MagicMock()
|
||||||
|
mock_content = MagicMock()
|
||||||
|
|
||||||
|
# Mock Part.from_text
|
||||||
|
mock_types.Part.from_text.return_value = mock_part
|
||||||
|
|
||||||
|
# Mock Part.from_bytes
|
||||||
|
mock_types.Part.from_bytes.return_value = mock_part
|
||||||
|
|
||||||
|
# Mock Content
|
||||||
|
mock_types.Content.return_value = mock_content
|
||||||
|
|
||||||
|
return mock_types
|
||||||
|
|
||||||
|
def test_convert_content_to_parts_simple_text(self):
|
||||||
|
"""Test converting simple text content to Gemini Parts."""
|
||||||
|
with patch.dict(
|
||||||
|
"sys.modules",
|
||||||
|
{
|
||||||
|
"google": MagicMock(),
|
||||||
|
"google.genai": MagicMock(),
|
||||||
|
"google.genai.types": MagicMock(),
|
||||||
|
"google.genai.errors": MagicMock(),
|
||||||
|
},
|
||||||
|
):
|
||||||
|
from crewai.llms.providers.gemini.completion import GeminiCompletion
|
||||||
|
|
||||||
|
# Create a mock instance
|
||||||
|
completion = MagicMock(spec=GeminiCompletion)
|
||||||
|
completion._convert_content_to_parts = (
|
||||||
|
GeminiCompletion._convert_content_to_parts.__get__(
|
||||||
|
completion, GeminiCompletion
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Mock types.Part.from_text
|
||||||
|
mock_part = MagicMock()
|
||||||
|
with patch(
|
||||||
|
"crewai.llms.providers.gemini.completion.types.Part.from_text",
|
||||||
|
return_value=mock_part,
|
||||||
|
):
|
||||||
|
result = completion._convert_content_to_parts("Hello, world!")
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
|
||||||
|
def test_convert_content_to_parts_multimodal_with_image_url(self):
|
||||||
|
"""Test converting multimodal content with image_url to Gemini Parts."""
|
||||||
|
with patch.dict(
|
||||||
|
"sys.modules",
|
||||||
|
{
|
||||||
|
"google": MagicMock(),
|
||||||
|
"google.genai": MagicMock(),
|
||||||
|
"google.genai.types": MagicMock(),
|
||||||
|
"google.genai.errors": MagicMock(),
|
||||||
|
},
|
||||||
|
):
|
||||||
|
from crewai.llms.providers.gemini.completion import GeminiCompletion
|
||||||
|
|
||||||
|
# Create a mock instance
|
||||||
|
completion = MagicMock(spec=GeminiCompletion)
|
||||||
|
completion._convert_content_to_parts = (
|
||||||
|
GeminiCompletion._convert_content_to_parts.__get__(
|
||||||
|
completion, GeminiCompletion
|
||||||
|
)
|
||||||
|
)
|
||||||
|
completion._create_image_part_from_url = MagicMock(
|
||||||
|
return_value=MagicMock()
|
||||||
|
)
|
||||||
|
|
||||||
|
# Mock types.Part.from_text
|
||||||
|
mock_text_part = MagicMock()
|
||||||
|
with patch(
|
||||||
|
"crewai.llms.providers.gemini.completion.types.Part.from_text",
|
||||||
|
return_value=mock_text_part,
|
||||||
|
):
|
||||||
|
multimodal_content = [
|
||||||
|
{"type": "text", "text": "Analyze this image"},
|
||||||
|
{
|
||||||
|
"type": "image_url",
|
||||||
|
"image_url": {"url": "https://example.com/image.jpg"},
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
result = completion._convert_content_to_parts(multimodal_content)
|
||||||
|
|
||||||
|
# Should have called _create_image_part_from_url
|
||||||
|
completion._create_image_part_from_url.assert_called_once_with(
|
||||||
|
"https://example.com/image.jpg"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_parse_data_url(self):
|
||||||
|
"""Test parsing data URLs."""
|
||||||
|
with patch.dict(
|
||||||
|
"sys.modules",
|
||||||
|
{
|
||||||
|
"google": MagicMock(),
|
||||||
|
"google.genai": MagicMock(),
|
||||||
|
"google.genai.types": MagicMock(),
|
||||||
|
"google.genai.errors": MagicMock(),
|
||||||
|
},
|
||||||
|
):
|
||||||
|
from crewai.llms.providers.gemini.completion import GeminiCompletion
|
||||||
|
|
||||||
|
# Create a mock instance
|
||||||
|
completion = MagicMock(spec=GeminiCompletion)
|
||||||
|
completion._parse_data_url = GeminiCompletion._parse_data_url.__get__(
|
||||||
|
completion, GeminiCompletion
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a test data URL
|
||||||
|
test_data = b"test image data"
|
||||||
|
base64_data = base64.b64encode(test_data).decode("utf-8")
|
||||||
|
data_url = f"data:image/png;base64,{base64_data}"
|
||||||
|
|
||||||
|
# Mock types.Part.from_bytes
|
||||||
|
mock_part = MagicMock()
|
||||||
|
with patch(
|
||||||
|
"crewai.llms.providers.gemini.completion.types.Part.from_bytes",
|
||||||
|
return_value=mock_part,
|
||||||
|
) as mock_from_bytes:
|
||||||
|
result = completion._parse_data_url(data_url)
|
||||||
|
|
||||||
|
# Verify from_bytes was called with correct arguments
|
||||||
|
mock_from_bytes.assert_called_once()
|
||||||
|
call_args = mock_from_bytes.call_args
|
||||||
|
assert call_args.kwargs["mime_type"] == "image/png"
|
||||||
|
assert call_args.kwargs["data"] == test_data
|
||||||
|
|
||||||
|
def test_fetch_image_from_url(self):
|
||||||
|
"""Test fetching images from HTTP URLs."""
|
||||||
|
with patch.dict(
|
||||||
|
"sys.modules",
|
||||||
|
{
|
||||||
|
"google": MagicMock(),
|
||||||
|
"google.genai": MagicMock(),
|
||||||
|
"google.genai.types": MagicMock(),
|
||||||
|
"google.genai.errors": MagicMock(),
|
||||||
|
},
|
||||||
|
):
|
||||||
|
from crewai.llms.providers.gemini.completion import GeminiCompletion
|
||||||
|
|
||||||
|
# Create a mock instance
|
||||||
|
completion = MagicMock(spec=GeminiCompletion)
|
||||||
|
completion._fetch_image_from_url = (
|
||||||
|
GeminiCompletion._fetch_image_from_url.__get__(
|
||||||
|
completion, GeminiCompletion
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Mock urllib.request.urlopen
|
||||||
|
mock_response = MagicMock()
|
||||||
|
mock_response.read.return_value = b"fake image data"
|
||||||
|
mock_response.headers.get.return_value = "image/jpeg"
|
||||||
|
mock_response.__enter__ = MagicMock(return_value=mock_response)
|
||||||
|
mock_response.__exit__ = MagicMock(return_value=False)
|
||||||
|
|
||||||
|
mock_part = MagicMock()
|
||||||
|
with (
|
||||||
|
patch("urllib.request.urlopen", return_value=mock_response),
|
||||||
|
patch(
|
||||||
|
"crewai.llms.providers.gemini.completion.types.Part.from_bytes",
|
||||||
|
return_value=mock_part,
|
||||||
|
) as mock_from_bytes,
|
||||||
|
):
|
||||||
|
result = completion._fetch_image_from_url("https://example.com/image.jpg")
|
||||||
|
|
||||||
|
# Verify from_bytes was called with correct arguments
|
||||||
|
mock_from_bytes.assert_called_once()
|
||||||
|
call_args = mock_from_bytes.call_args
|
||||||
|
assert call_args.kwargs["mime_type"] == "image/jpeg"
|
||||||
|
assert call_args.kwargs["data"] == b"fake image data"
|
||||||
|
|
||||||
|
def test_read_local_image(self):
|
||||||
|
"""Test reading local image files."""
|
||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
|
||||||
|
with patch.dict(
|
||||||
|
"sys.modules",
|
||||||
|
{
|
||||||
|
"google": MagicMock(),
|
||||||
|
"google.genai": MagicMock(),
|
||||||
|
"google.genai.types": MagicMock(),
|
||||||
|
"google.genai.errors": MagicMock(),
|
||||||
|
},
|
||||||
|
):
|
||||||
|
from crewai.llms.providers.gemini.completion import GeminiCompletion
|
||||||
|
|
||||||
|
# Create a mock instance
|
||||||
|
completion = MagicMock(spec=GeminiCompletion)
|
||||||
|
completion._read_local_image = GeminiCompletion._read_local_image.__get__(
|
||||||
|
completion, GeminiCompletion
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a temporary test file
|
||||||
|
with tempfile.NamedTemporaryFile(
|
||||||
|
suffix=".png", delete=False
|
||||||
|
) as temp_file:
|
||||||
|
temp_file.write(b"fake png data")
|
||||||
|
temp_path = temp_file.name
|
||||||
|
|
||||||
|
try:
|
||||||
|
mock_part = MagicMock()
|
||||||
|
with patch(
|
||||||
|
"crewai.llms.providers.gemini.completion.types.Part.from_bytes",
|
||||||
|
return_value=mock_part,
|
||||||
|
) as mock_from_bytes:
|
||||||
|
result = completion._read_local_image(temp_path)
|
||||||
|
|
||||||
|
# Verify from_bytes was called with correct arguments
|
||||||
|
mock_from_bytes.assert_called_once()
|
||||||
|
call_args = mock_from_bytes.call_args
|
||||||
|
assert call_args.kwargs["mime_type"] == "image/png"
|
||||||
|
assert call_args.kwargs["data"] == b"fake png data"
|
||||||
|
finally:
|
||||||
|
os.unlink(temp_path)
|
||||||
|
|
||||||
|
def test_read_local_image_file_not_found(self):
|
||||||
|
"""Test reading non-existent local image file."""
|
||||||
|
with patch.dict(
|
||||||
|
"sys.modules",
|
||||||
|
{
|
||||||
|
"google": MagicMock(),
|
||||||
|
"google.genai": MagicMock(),
|
||||||
|
"google.genai.types": MagicMock(),
|
||||||
|
"google.genai.errors": MagicMock(),
|
||||||
|
},
|
||||||
|
):
|
||||||
|
from crewai.llms.providers.gemini.completion import GeminiCompletion
|
||||||
|
|
||||||
|
# Create a mock instance
|
||||||
|
completion = MagicMock(spec=GeminiCompletion)
|
||||||
|
completion._read_local_image = GeminiCompletion._read_local_image.__get__(
|
||||||
|
completion, GeminiCompletion
|
||||||
|
)
|
||||||
|
|
||||||
|
result = completion._read_local_image("/nonexistent/path/image.png")
|
||||||
|
|
||||||
|
# Should return None for non-existent file
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestAddImageToolOutput:
|
||||||
|
"""Tests for AddImageTool output format."""
|
||||||
|
|
||||||
|
def test_add_image_tool_returns_correct_format(self):
|
||||||
|
"""Test that AddImageTool returns the correct multimodal format."""
|
||||||
|
from crewai.tools.agent_tools.add_image_tool import AddImageTool
|
||||||
|
|
||||||
|
tool = AddImageTool()
|
||||||
|
result = tool._run(
|
||||||
|
image_url="https://example.com/image.jpg",
|
||||||
|
action="Analyze this image",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Verify the result format
|
||||||
|
assert isinstance(result, dict)
|
||||||
|
assert result["role"] == "user"
|
||||||
|
assert isinstance(result["content"], list)
|
||||||
|
assert len(result["content"]) == 2
|
||||||
|
|
||||||
|
# Check text part
|
||||||
|
assert result["content"][0]["type"] == "text"
|
||||||
|
assert result["content"][0]["text"] == "Analyze this image"
|
||||||
|
|
||||||
|
# Check image_url part
|
||||||
|
assert result["content"][1]["type"] == "image_url"
|
||||||
|
assert result["content"][1]["image_url"]["url"] == "https://example.com/image.jpg"
|
||||||
Reference in New Issue
Block a user