Files
crewAI/lib/crewai/tests/llms/google/test_google.py
Lorenze Jay 32d7b4a8d4 Lorenze/feat/plan execute pattern (#4817)
* feat: introduce PlanningConfig for enhanced agent planning capabilities (#4344)

* feat: introduce PlanningConfig for enhanced agent planning capabilities

This update adds a new PlanningConfig class to manage agent planning configurations, allowing for customizable planning behavior before task execution. The existing reasoning parameter is deprecated in favor of this new configuration, ensuring backward compatibility while enhancing the planning process. Additionally, the Agent class has been updated to utilize this new configuration, and relevant utility functions have been adjusted accordingly. Tests have been added to validate the new planning functionality and ensure proper integration with existing agent workflows.

* dropping redundancy

* fix test

* revert handle_reasoning here

* refactor: update reasoning handling in Agent class

This commit modifies the Agent class to conditionally call the handle_reasoning function based on the executor class being used. The legacy CrewAgentExecutor will continue to utilize handle_reasoning, while the new AgentExecutor will manage planning internally. Additionally, the PlanningConfig class has been referenced in the documentation to clarify its role in enabling or disabling planning. Tests have been updated to reflect these changes and ensure proper functionality.

* improve planning prompts

* matching

* refactor: remove default enabled flag from PlanningConfig in Agent class

* more cassettes

* fix test

* refactor: update planning prompt and remove deprecated methods in reasoning handler

* improve planning prompt

* Lorenze/feat planning pt 2 todo list gen (#4449)

* feat: introduce PlanningConfig for enhanced agent planning capabilities

This update adds a new PlanningConfig class to manage agent planning configurations, allowing for customizable planning behavior before task execution. The existing reasoning parameter is deprecated in favor of this new configuration, ensuring backward compatibility while enhancing the planning process. Additionally, the Agent class has been updated to utilize this new configuration, and relevant utility functions have been adjusted accordingly. Tests have been added to validate the new planning functionality and ensure proper integration with existing agent workflows.

* dropping redundancy

* fix test

* revert handle_reasoning here

* refactor: update reasoning handling in Agent class

This commit modifies the Agent class to conditionally call the handle_reasoning function based on the executor class being used. The legacy CrewAgentExecutor will continue to utilize handle_reasoning, while the new AgentExecutor will manage planning internally. Additionally, the PlanningConfig class has been referenced in the documentation to clarify its role in enabling or disabling planning. Tests have been updated to reflect these changes and ensure proper functionality.

* improve planning prompts

* matching

* refactor: remove default enabled flag from PlanningConfig in Agent class

* more cassettes

* fix test

* feat: enhance agent planning with structured todo management

This commit introduces a new planning system within the AgentExecutor class, allowing for the creation of structured todo items from planning steps. The TodoList and TodoItem models have been added to facilitate tracking of plan execution. The reasoning plan now includes a list of steps, improving the clarity and organization of agent tasks. Additionally, tests have been added to validate the new planning functionality and ensure proper integration with existing workflows.

* refactor: update planning prompt and remove deprecated methods in reasoning handler

* improve planning prompt

* improve handler

* linted

* linted

* Lorenze/feat/planning pt 3 todo list execution (#4450)

* feat: introduce PlanningConfig for enhanced agent planning capabilities

This update adds a new PlanningConfig class to manage agent planning configurations, allowing for customizable planning behavior before task execution. The existing reasoning parameter is deprecated in favor of this new configuration, ensuring backward compatibility while enhancing the planning process. Additionally, the Agent class has been updated to utilize this new configuration, and relevant utility functions have been adjusted accordingly. Tests have been added to validate the new planning functionality and ensure proper integration with existing agent workflows.

* dropping redundancy

* fix test

* revert handle_reasoning here

* refactor: update reasoning handling in Agent class

This commit modifies the Agent class to conditionally call the handle_reasoning function based on the executor class being used. The legacy CrewAgentExecutor will continue to utilize handle_reasoning, while the new AgentExecutor will manage planning internally. Additionally, the PlanningConfig class has been referenced in the documentation to clarify its role in enabling or disabling planning. Tests have been updated to reflect these changes and ensure proper functionality.

* improve planning prompts

* matching

* refactor: remove default enabled flag from PlanningConfig in Agent class

* more cassettes

* fix test

* feat: enhance agent planning with structured todo management

This commit introduces a new planning system within the AgentExecutor class, allowing for the creation of structured todo items from planning steps. The TodoList and TodoItem models have been added to facilitate tracking of plan execution. The reasoning plan now includes a list of steps, improving the clarity and organization of agent tasks. Additionally, tests have been added to validate the new planning functionality and ensure proper integration with existing workflows.

* refactor: update planning prompt and remove deprecated methods in reasoning handler

* improve planning prompt

* improve handler

* execute todos and be able to track them

* feat: introduce PlannerObserver and StepExecutor for enhanced plan execution

This commit adds the PlannerObserver and StepExecutor classes to the CrewAI framework, implementing the observation phase of the Plan-and-Execute architecture. The PlannerObserver analyzes step execution results, determines plan validity, and suggests refinements, while the StepExecutor executes individual todo items in isolation. These additions improve the overall planning and execution process, allowing for more dynamic and responsive agent behavior. Additionally, new observation events have been defined to facilitate monitoring and logging of the planning process.

* refactor: enhance final answer synthesis in AgentExecutor

This commit improves the synthesis of final answers in the AgentExecutor class by implementing a more coherent approach to combining results from multiple todo items. The method now utilizes a single LLM call to generate a polished response, falling back to concatenation if the synthesis fails. Additionally, the test cases have been updated to reflect the changes in planning and execution, ensuring that the results are properly validated and that the plan-and-execute architecture is functioning as intended.

* refactor: enhance final answer synthesis in AgentExecutor

This commit improves the synthesis of final answers in the AgentExecutor class by implementing a more coherent approach to combining results from multiple todo items. The method now utilizes a single LLM call to generate a polished response, falling back to concatenation if the synthesis fails. Additionally, the test cases have been updated to reflect the changes in planning and execution, ensuring that the results are properly validated and that the plan-and-execute architecture is functioning as intended.

* refactor: implement structured output handling in final answer synthesis

This commit enhances the final answer synthesis process in the AgentExecutor class by introducing support for structured outputs when a response model is specified. The synthesis method now utilizes the response model to produce outputs that conform to the expected schema, while still falling back to concatenation in case of synthesis failures. This change ensures that intermediate steps yield free-text results, but the final output can be structured, improving the overall coherence and usability of the synthesized answers.

* regen tests

* linted

* fix

* Enhance PlanningConfig and AgentExecutor with Reasoning Effort Levels

This update introduces a new  attribute in the  class, allowing users to customize the observation and replanning behavior during task execution. The  class has been modified to utilize this new attribute, routing step observations based on the specified reasoning effort level: low, medium, or high.

Additionally, tests have been added to validate the functionality of the reasoning effort levels, ensuring that the agent behaves as expected under different configurations. This enhancement improves the adaptability and efficiency of the planning process in agent execution.

* regen cassettes for test and fix test

* cassette regen

* fixing tests

* dry

* Refactor PlannerObserver and StepExecutor to Utilize I18N for Prompts

This update enhances the PlannerObserver and StepExecutor classes by integrating the I18N utility for managing prompts and messages. The system and user prompts are now retrieved from the I18N module, allowing for better localization and maintainability. Additionally, the code has been cleaned up to remove hardcoded strings, improving readability and consistency across the planning and execution processes.

* Refactor PlannerObserver and StepExecutor to Utilize I18N for Prompts

This update enhances the PlannerObserver and StepExecutor classes by integrating the I18N utility for managing prompts and messages. The system and user prompts are now retrieved from the I18N module, allowing for better localization and maintainability. Additionally, the code has been cleaned up to remove hardcoded strings, improving readability and consistency across the planning and execution processes.

* consolidate agent logic

* fix datetime

* improving step executor

* refactor: streamline observation and refinement process in PlannerObserver

- Updated the PlannerObserver to apply structured refinements directly from observations without requiring a second LLM call.
- Renamed  method to  for clarity.
- Enhanced documentation to reflect changes in how refinements are handled.
- Removed unnecessary LLM message building and parsing logic, simplifying the refinement process.
- Updated event emissions to include summaries of refinements instead of raw data.

* enhance step executor with tool usage events and validation

- Added event emissions for tool usage, including started and finished events, to track tool execution.
- Implemented validation to ensure expected tools are called during step execution, raising errors when not.
- Refactored the  method to handle tool execution with event logging.
- Introduced a new method  for parsing tool input into a structured format.
- Updated tests to cover new functionality and ensure correct behavior of tool usage events.

* refactor: enhance final answer synthesis logic in AgentExecutor

- Updated the finalization process to conditionally skip synthesis when the last todo result is sufficient as a complete answer.
- Introduced a new method to determine if the last todo result can be used directly, improving efficiency.
- Added tests to verify the new behavior, ensuring synthesis is skipped when appropriate and maintained when a response model is set.

* fix: update observation handling in PlannerObserver for LLM errors

- Modified the error handling in the PlannerObserver to default to a conservative replan when an LLM call fails.
- Updated the return values to indicate that the step was not completed successfully and that a full replan is needed.
- Added a new test to verify the behavior of the observer when an LLM error occurs, ensuring the correct replan logic is triggered.

* refactor: enhance planning and execution flow in agents

- Updated the PlannerObserver to accept a kickoff input for standalone task execution, improving flexibility in task handling.
- Refined the step execution process in StepExecutor to support multi-turn action loops, allowing for iterative tool execution and observation.
- Introduced a method to extract relevant task sections from descriptions, ensuring clarity in task requirements.
- Enhanced the AgentExecutor to manage step failures more effectively, triggering replans only when necessary and preserving completed task history.
- Updated translations to reflect changes in planning principles and execution prompts, emphasizing concrete and executable steps.

* refactor: update setup_native_tools to include tool_name_mapping

- Modified the setup_native_tools function to return an additional mapping of tool names.
- Updated StepExecutor and AgentExecutor classes to accommodate the new return value from setup_native_tools.

* fix tests

* linted

* linted

* feat: enhance image block handling in Anthropic provider and update AgentExecutor logic

- Added a method to convert OpenAI-style image_url blocks to Anthropic's required format.
- Updated AgentExecutor to handle cases where no todos are ready, introducing a needs_replan return state.
- Improved fallback answer generation in AgentExecutor to prevent RuntimeErrors when no final output is produced.

* lint

* lint

* 1. Added failed to TodoStatus (planning_types.py)

  - TodoStatus now includes failed as a valid state: Literal[pending, running, completed, failed]
  - Added mark_failed(step_number, result) method to TodoList
  - Added get_failed_todos() method to TodoList
  - Updated is_complete to treat both completed and failed as terminal states
  - Updated replace_pending_todos docstring to mention failed items are preserved

  2. Mark running todos as failed before replan (agent_executor.py)

  All three effort-level handlers now call mark_failed() on the current todo before routing to replan_now:

  - Low effort (handle_step_observed_low): hard-failure branch
  - Medium effort (handle_step_observed_medium): needs_full_replan branch
  - High effort (decide_next_action): both needs_full_replan and step_completed_successfully=False branches

  3. Updated _should_replan to use get_failed_todos()

  Previously filtered on todo.status == failed which was dead code. Now uses the proper accessor method that will actually find failed items.

  What this fixes: Before these changes, a step that triggered a replan would stay in running status permanently, causing is_complete to never
  return True and next_pending to skip it — leading to stuck execution states. Now failed steps are properly tracked, replanning context correctly
  reports them, and LiteAgentOutput.failed_todos will actually return results.

* fix test

* imp on failed states

* adjusted the var name from AgentReActState to AgentExecutorState

* addressed p0 bugs

* more improvements

* linted

* regen cassette

* addressing crictical comments

* ensure configurable timeouts, max_replans and max step iterations

* adjusted tools

* dropping debug statements

* addressed comment

* fix  linter

* lints and test fixes

* fix: default observation parse fallback to failure and clean up plan-execute types

When _parse_observation_response fails all parse attempts, default to
step_completed_successfully=False instead of True to avoid silently
masking failures. Extract duplicate _extract_task_section into a shared
utility in agent_utils. Type PlanningConfig.llm as str | BaseLLM | None
instead of str | Any | None. Make StepResult a frozen dataclass for
immutability consistency with StepExecutionContext.

* fix: remove Any from function_calling_llm union type in step_executor

* fix: make BaseTool usage count thread-safe for parallel step execution

Add _usage_lock and _claim_usage() to BaseTool for atomic
check-and-increment of current_usage_count. This prevents race
conditions when parallel plan steps invoke the same tool concurrently
via execute_todos_parallel. Remove the racy pre-check from
execute_single_native_tool_call since the limit is now enforced
atomically inside tool.run().

---------

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
Co-authored-by: Greyson LaLonde <greyson@crewai.com>
2026-03-15 18:33:17 -07:00

1193 lines
40 KiB
Python

import os
import sys
import types
from unittest.mock import patch, MagicMock
import pytest
from crewai.llm import LLM
from crewai.crew import Crew
from crewai.agent import Agent
from crewai.task import Task
@pytest.fixture(autouse=True)
def mock_google_api_key():
"""Mock GOOGLE_API_KEY for tests only if real keys are not set."""
if "GOOGLE_API_KEY" not in os.environ and "GEMINI_API_KEY" not in os.environ:
with patch.dict(os.environ, {"GOOGLE_API_KEY": "test-key"}):
yield
else:
yield
def test_gemini_completion_is_used_when_google_provider():
"""
Test that GeminiCompletion from completion.py is used when LLM uses provider 'google'
"""
llm = LLM(model="google/gemini-2.0-flash-001")
assert llm.__class__.__name__ == "GeminiCompletion"
assert llm.provider == "gemini"
assert llm.model == "gemini-2.0-flash-001"
def test_gemini_completion_is_used_when_gemini_provider():
"""
Test that GeminiCompletion is used when provider is 'gemini'
"""
llm = LLM(model="gemini/gemini-2.0-flash-001")
from crewai.llms.providers.gemini.completion import GeminiCompletion
assert isinstance(llm, GeminiCompletion)
assert llm.provider == "gemini"
assert llm.model == "gemini-2.0-flash-001"
def test_gemini_completion_module_is_imported():
"""
Test that the completion module is properly imported when using Google provider
"""
module_name = "crewai.llms.providers.gemini.completion"
# Remove module from cache if it exists
if module_name in sys.modules:
del sys.modules[module_name]
# Create LLM instance - this should trigger the import
LLM(model="google/gemini-2.0-flash-001")
# Verify the module was imported
assert module_name in sys.modules
completion_mod = sys.modules[module_name]
assert isinstance(completion_mod, types.ModuleType)
# Verify the class exists in the module
assert hasattr(completion_mod, 'GeminiCompletion')
def test_native_gemini_raises_error_when_initialization_fails():
"""
Test that LLM raises ImportError when native Gemini completion fails.
With the new behavior, when a native provider is in SUPPORTED_NATIVE_PROVIDERS
but fails to instantiate, we raise an ImportError instead of silently falling back.
This provides clearer error messages to users about missing dependencies.
"""
# Mock the _get_native_provider to return a failing class
with patch('crewai.llm.LLM._get_native_provider') as mock_get_provider:
class FailingCompletion:
def __init__(self, *args, **kwargs):
raise Exception("Native Google Gen AI SDK failed")
mock_get_provider.return_value = FailingCompletion
# This should raise ImportError with clear message
with pytest.raises(ImportError) as excinfo:
LLM(model="google/gemini-2.0-flash-001")
# Verify the error message is helpful
assert "Error importing native provider" in str(excinfo.value)
assert "Native Google Gen AI SDK failed" in str(excinfo.value)
def test_gemini_completion_initialization_parameters():
"""
Test that GeminiCompletion is initialized with correct parameters
"""
llm = LLM(
model="google/gemini-2.0-flash-001",
temperature=0.7,
max_output_tokens=2000,
top_p=0.9,
top_k=40,
api_key="test-key"
)
from crewai.llms.providers.gemini.completion import GeminiCompletion
assert isinstance(llm, GeminiCompletion)
assert llm.model == "gemini-2.0-flash-001"
assert llm.temperature == 0.7
assert llm.max_output_tokens == 2000
assert llm.top_p == 0.9
assert llm.top_k == 40
def test_gemini_specific_parameters():
"""
Test Gemini-specific parameters like stop_sequences, streaming, and safety settings
"""
safety_settings = {
"HARM_CATEGORY_HARASSMENT": "BLOCK_MEDIUM_AND_ABOVE",
"HARM_CATEGORY_HATE_SPEECH": "BLOCK_MEDIUM_AND_ABOVE"
}
llm = LLM(
model="google/gemini-2.0-flash-001",
stop_sequences=["Human:", "Assistant:"],
stream=True,
safety_settings=safety_settings,
project="test-project",
location="us-central1"
)
from crewai.llms.providers.gemini.completion import GeminiCompletion
assert isinstance(llm, GeminiCompletion)
assert llm.stop_sequences == ["Human:", "Assistant:"]
assert llm.stream == True
assert llm.safety_settings == safety_settings
assert llm.project == "test-project"
assert llm.location == "us-central1"
def test_gemini_completion_call():
"""
Test that GeminiCompletion call method works
"""
llm = LLM(model="google/gemini-2.0-flash-001")
# Mock the call method on the instance
with patch.object(llm, 'call', return_value="Hello! I'm Gemini, ready to help.") as mock_call:
result = llm.call("Hello, how are you?")
assert result == "Hello! I'm Gemini, ready to help."
mock_call.assert_called_once_with("Hello, how are you?")
def test_gemini_completion_called_during_crew_execution():
"""
Test that GeminiCompletion.call is actually invoked when running a crew
"""
# Create the LLM instance first
gemini_llm = LLM(model="google/gemini-2.0-flash-001")
# Mock the call method on the specific instance
with patch.object(gemini_llm, 'call', return_value="Tokyo has 14 million people.") as mock_call:
# Create agent with explicit LLM configuration
agent = Agent(
role="Research Assistant",
goal="Find population info",
backstory="You research populations.",
llm=gemini_llm,
)
task = Task(
description="Find Tokyo population",
expected_output="Population number",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
# Verify mock was called
assert mock_call.called
assert "14 million" in str(result)
def test_gemini_completion_call_arguments():
"""
Test that GeminiCompletion.call is invoked with correct arguments
"""
# Create LLM instance first
gemini_llm = LLM(model="google/gemini-2.0-flash-001")
# Mock the instance method
with patch.object(gemini_llm, 'call') as mock_call:
mock_call.return_value = "Task completed successfully."
agent = Agent(
role="Test Agent",
goal="Complete a simple task",
backstory="You are a test agent.",
llm=gemini_llm # Use same instance
)
task = Task(
description="Say hello world",
expected_output="Hello world",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
crew.kickoff()
# Verify call was made
assert mock_call.called
# Check the arguments passed to the call method
call_args = mock_call.call_args
assert call_args is not None
# The first argument should be the messages
messages = call_args[0][0] # First positional argument
assert isinstance(messages, (str, list))
# Verify that the task description appears in the messages
if isinstance(messages, str):
assert "hello world" in messages.lower()
elif isinstance(messages, list):
message_content = str(messages).lower()
assert "hello world" in message_content
def test_multiple_gemini_calls_in_crew():
"""
Test that GeminiCompletion.call is invoked multiple times for multiple tasks
"""
# Create LLM instance first
gemini_llm = LLM(model="google/gemini-2.0-flash-001")
# Mock the instance method
with patch.object(gemini_llm, 'call') as mock_call:
mock_call.return_value = "Task completed."
agent = Agent(
role="Multi-task Agent",
goal="Complete multiple tasks",
backstory="You can handle multiple tasks.",
llm=gemini_llm # Use same instance
)
task1 = Task(
description="First task",
expected_output="First result",
agent=agent,
)
task2 = Task(
description="Second task",
expected_output="Second result",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task1, task2]
)
crew.kickoff()
# Verify multiple calls were made
assert mock_call.call_count >= 2 # At least one call per task
# Verify each call had proper arguments
for call in mock_call.call_args_list:
assert len(call[0]) > 0 # Has positional arguments
messages = call[0][0]
assert messages is not None
def test_gemini_completion_with_tools():
"""
Test that GeminiCompletion.call is invoked with tools when agent has tools
"""
from crewai.tools import tool
@tool
def sample_tool(query: str) -> str:
"""A sample tool for testing"""
return f"Tool result for: {query}"
# Create LLM instance first
gemini_llm = LLM(model="google/gemini-2.0-flash-001")
# Mock the instance method
with patch.object(gemini_llm, 'call') as mock_call:
mock_call.return_value = "Task completed with tools."
agent = Agent(
role="Tool User",
goal="Use tools to complete tasks",
backstory="You can use tools.",
llm=gemini_llm, # Use same instance
tools=[sample_tool]
)
task = Task(
description="Use the sample tool",
expected_output="Tool usage result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
crew.kickoff()
assert mock_call.called
call_args = mock_call.call_args
call_kwargs = call_args[1] if len(call_args) > 1 else {}
if 'tools' in call_kwargs:
assert call_kwargs['tools'] is not None
assert len(call_kwargs['tools']) > 0
def test_gemini_raises_error_when_model_not_supported():
"""Test that GeminiCompletion raises ValueError when model not supported"""
# Mock the Google client to raise an error
with patch('crewai.llms.providers.gemini.completion.genai') as mock_genai:
mock_client = MagicMock()
mock_genai.Client.return_value = mock_client
from google.genai.errors import ClientError # type: ignore
mock_response = MagicMock()
mock_response.body_segments = [{
'error': {
'code': 404,
'message': 'models/model-doesnt-exist is not found for API version v1beta, or is not supported for generateContent.',
'status': 'NOT_FOUND'
}
}]
mock_response.status_code = 404
mock_client.models.generate_content.side_effect = ClientError(404, mock_response)
llm = LLM(model="google/model-doesnt-exist")
with pytest.raises(Exception): # Should raise some error for unsupported model
llm.call("Hello")
def test_gemini_vertex_ai_setup():
"""
Test that Vertex AI configuration is properly handled
"""
with patch.dict(os.environ, {
"GOOGLE_CLOUD_PROJECT": "test-project",
"GOOGLE_CLOUD_LOCATION": "us-west1"
}):
llm = LLM(
model="google/gemini-2.0-flash-001",
project="test-project",
location="us-west1"
)
from crewai.llms.providers.gemini.completion import GeminiCompletion
assert isinstance(llm, GeminiCompletion)
assert llm.project == "test-project"
assert llm.location == "us-west1"
def test_gemini_api_key_configuration():
"""
Test that API key configuration works for both GOOGLE_API_KEY and GEMINI_API_KEY
"""
# Test with GOOGLE_API_KEY
with patch.dict(os.environ, {"GOOGLE_API_KEY": "test-google-key"}):
llm = LLM(model="google/gemini-2.0-flash-001")
from crewai.llms.providers.gemini.completion import GeminiCompletion
assert isinstance(llm, GeminiCompletion)
assert llm.api_key == "test-google-key"
# Test with GEMINI_API_KEY
with patch.dict(os.environ, {"GEMINI_API_KEY": "test-gemini-key"}, clear=True):
llm = LLM(model="google/gemini-2.0-flash-001")
assert isinstance(llm, GeminiCompletion)
assert llm.api_key == "test-gemini-key"
def test_gemini_model_capabilities():
"""
Test that model capabilities are correctly identified
"""
# Test Gemini 2.0 model
llm_2_0 = LLM(model="google/gemini-2.0-flash-001")
from crewai.llms.providers.gemini.completion import GeminiCompletion
assert isinstance(llm_2_0, GeminiCompletion)
assert llm_2_0.supports_tools == True
# Test Gemini 1.5 model
llm_1_5 = LLM(model="google/gemini-1.5-pro")
assert isinstance(llm_1_5, GeminiCompletion)
assert llm_1_5.supports_tools == True
def test_gemini_generation_config():
"""
Test that generation config is properly prepared
"""
llm = LLM(
model="google/gemini-2.0-flash-001",
temperature=0.7,
top_p=0.9,
top_k=40,
max_output_tokens=1000
)
from crewai.llms.providers.gemini.completion import GeminiCompletion
assert isinstance(llm, GeminiCompletion)
# Test config preparation
config = llm._prepare_generation_config()
# Verify config has the expected parameters
assert hasattr(config, 'temperature') or 'temperature' in str(config)
assert hasattr(config, 'top_p') or 'top_p' in str(config)
assert hasattr(config, 'top_k') or 'top_k' in str(config)
assert hasattr(config, 'max_output_tokens') or 'max_output_tokens' in str(config)
def test_gemini_model_detection():
"""
Test that various Gemini model formats are properly detected
"""
# Test Gemini model naming patterns that actually work with provider detection
gemini_test_cases = [
"google/gemini-2.0-flash-001",
"gemini/gemini-2.0-flash-001",
"google/gemini-1.5-pro",
"gemini/gemini-1.5-flash"
]
for model_name in gemini_test_cases:
llm = LLM(model=model_name)
from crewai.llms.providers.gemini.completion import GeminiCompletion
assert isinstance(llm, GeminiCompletion), f"Failed for model: {model_name}"
def test_gemini_supports_stop_words():
"""
Test that Gemini models support stop sequences
"""
llm = LLM(model="google/gemini-2.0-flash-001")
assert llm.supports_stop_words() == True
def test_gemini_context_window_size():
"""
Test that Gemini models return correct context window sizes
"""
# Test Gemini 2.0 Flash
llm_2_0 = LLM(model="google/gemini-2.0-flash-001")
context_size_2_0 = llm_2_0.get_context_window_size()
assert context_size_2_0 > 500000 # Should be substantial (1M tokens)
# Test Gemini 1.5 Pro
llm_1_5 = LLM(model="google/gemini-1.5-pro")
context_size_1_5 = llm_1_5.get_context_window_size()
assert context_size_1_5 > 1000000 # Should be very large (2M tokens)
def test_gemini_message_formatting():
"""
Test that messages are properly formatted for Gemini API
"""
llm = LLM(model="google/gemini-2.0-flash-001")
# Test message formatting
test_messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
{"role": "user", "content": "How are you?"}
]
formatted_contents, system_instruction = llm._format_messages_for_gemini(test_messages)
# System message should be extracted
assert system_instruction == "You are a helpful assistant."
# Remaining messages should be Content objects
assert len(formatted_contents) >= 3 # Should have user, model, user messages
# First content should be user role
assert formatted_contents[0].role == "user"
# Second should be model (converted from assistant)
assert formatted_contents[1].role == "model"
def test_gemini_streaming_parameter():
"""
Test that streaming parameter is properly handled
"""
# Test non-streaming
llm_no_stream = LLM(model="google/gemini-2.0-flash-001", stream=False)
assert llm_no_stream.stream == False
# Test streaming
llm_stream = LLM(model="google/gemini-2.0-flash-001", stream=True)
assert llm_stream.stream == True
def test_gemini_tool_conversion():
"""
Test that tools are properly converted to Gemini format
"""
llm = LLM(model="google/gemini-2.0-flash-001")
# Mock tool in CrewAI format
crewai_tools = [{
"type": "function",
"function": {
"name": "test_tool",
"description": "A test tool",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"}
},
"required": ["query"]
}
}
}]
# Test tool conversion
gemini_tools = llm._convert_tools_for_interference(crewai_tools)
assert len(gemini_tools) == 1
# Gemini tools are Tool objects with function_declarations
assert hasattr(gemini_tools[0], 'function_declarations')
assert len(gemini_tools[0].function_declarations) == 1
func_decl = gemini_tools[0].function_declarations[0]
assert func_decl.name == "test_tool"
assert func_decl.description == "A test tool"
def test_gemini_environment_variable_api_key():
"""
Test that Google API key is properly loaded from environment
"""
with patch.dict(os.environ, {"GOOGLE_API_KEY": "test-google-key"}):
llm = LLM(model="google/gemini-2.0-flash-001")
assert llm.client is not None
assert hasattr(llm.client, 'models')
assert llm.api_key == "test-google-key"
@pytest.mark.vcr()
def test_gemini_token_usage_tracking():
"""
Test that token usage is properly tracked for Gemini responses
"""
llm = LLM(model="google/gemini-2.0-flash-001")
result = llm.call("Hello")
assert result.strip() == "Hi there! How can I help you today?"
usage = llm.get_token_usage_summary()
assert usage.successful_requests == 1
assert usage.prompt_tokens > 0
assert usage.completion_tokens > 0
assert usage.total_tokens > 0
@pytest.mark.vcr()
def test_gemini_tool_returning_float():
"""
Test that Gemini properly handles tools that return non-dict values like floats.
This is an end-to-end test that verifies the agent can use a tool that returns
a float (which gets wrapped in {"result": value} for Gemini's FunctionResponse).
"""
from pydantic import BaseModel, Field
from typing import Type
from crewai.tools import BaseTool
class SumNumbersToolInput(BaseModel):
a: float = Field(..., description="The first number to add")
b: float = Field(..., description="The second number to add")
class SumNumbersTool(BaseTool):
name: str = "sum_numbers"
description: str = "Add two numbers together and return the result"
args_schema: Type[BaseModel] = SumNumbersToolInput
def _run(self, a: float, b: float) -> float:
return a + b
sum_tool = SumNumbersTool()
agent = Agent(
role="Calculator",
goal="Calculate numbers accurately",
backstory="You are a calculator that adds numbers.",
llm=LLM(model="google/gemini-2.0-flash-001"),
tools=[sum_tool],
verbose=True,
)
task = Task(
description="What is 10000 + 20000? Use the sum_numbers tool to calculate this.",
expected_output="The sum of the two numbers",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=True)
result = crew.kickoff()
# The result should contain 30000 (the sum)
assert "30000" in result.raw
def test_gemini_stop_sequences_sync():
"""Test that stop and stop_sequences attributes stay synchronized."""
llm = LLM(model="google/gemini-2.0-flash-001")
# Test setting stop as a list
llm.stop = ["\nObservation:", "\nThought:"]
assert llm.stop_sequences == ["\nObservation:", "\nThought:"]
assert llm.stop == ["\nObservation:", "\nThought:"]
# Test setting stop as a string
llm.stop = "\nFinal Answer:"
assert llm.stop_sequences == ["\nFinal Answer:"]
assert llm.stop == ["\nFinal Answer:"]
# Test setting stop as None
llm.stop = None
assert llm.stop_sequences == []
assert llm.stop == []
def test_gemini_stop_sequences_sent_to_api():
"""Test that stop_sequences are properly sent to the Gemini API."""
llm = LLM(model="google/gemini-2.0-flash-001")
# Set stop sequences via the stop attribute (simulating CrewAgentExecutor)
llm.stop = ["\nObservation:", "\nThought:"]
# Patch the API call to capture parameters without making real call
with patch.object(llm.client.models, 'generate_content') as mock_generate:
mock_response = MagicMock()
mock_response.text = "Hello"
mock_response.candidates = []
mock_response.usage_metadata = MagicMock(
prompt_token_count=10,
candidates_token_count=5,
total_token_count=15
)
mock_generate.return_value = mock_response
llm.call("Say hello in one word")
# Verify stop_sequences were passed to the API in the config
call_kwargs = mock_generate.call_args[1]
assert "config" in call_kwargs
# The config object should have stop_sequences set
config = call_kwargs["config"]
# Check if the config has stop_sequences attribute
assert hasattr(config, 'stop_sequences') or 'stop_sequences' in config.__dict__
if hasattr(config, 'stop_sequences'):
assert config.stop_sequences == ["\nObservation:", "\nThought:"]
@pytest.mark.vcr()
@pytest.mark.skip(reason="VCR cannot replay SSE streaming responses")
def test_google_streaming_returns_usage_metrics():
"""
Test that Google Gemini streaming calls return proper token usage metrics.
"""
agent = Agent(
role="Research Assistant",
goal="Find information about the capital of Japan",
backstory="You are a helpful research assistant.",
llm=LLM(model="gemini/gemini-2.0-flash-exp", stream=True),
verbose=True,
)
task = Task(
description="What is the capital of Japan?",
expected_output="The capital of Japan",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result.token_usage is not None
assert result.token_usage.total_tokens > 0
assert result.token_usage.prompt_tokens > 0
assert result.token_usage.completion_tokens > 0
assert result.token_usage.successful_requests >= 1
@pytest.mark.vcr()
def test_google_express_mode_works() -> None:
"""
Test Google Vertex AI Express mode with API key authentication.
This tests Vertex AI Express mode (aiplatform.googleapis.com) with API key
authentication.
"""
with patch.dict(os.environ, {"GOOGLE_GENAI_USE_VERTEXAI": "true"}):
agent = Agent(
role="Research Assistant",
goal="Find information about the capital of Japan",
backstory="You are a helpful research assistant.",
llm=LLM(
model="gemini/gemini-2.0-flash-exp",
),
verbose=True,
)
task = Task(
description="What is the capital of Japan?",
expected_output="The capital of Japan",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result.token_usage is not None
assert result.token_usage.total_tokens > 0
assert result.token_usage.prompt_tokens > 0
assert result.token_usage.completion_tokens > 0
assert result.token_usage.successful_requests >= 1
def test_gemini_2_0_model_detection():
"""Test that Gemini 2.0 models are properly detected."""
# Test Gemini 2.0 models
llm_2_0 = LLM(model="google/gemini-2.0-flash-001")
from crewai.llms.providers.gemini.completion import GeminiCompletion
assert isinstance(llm_2_0, GeminiCompletion)
assert llm_2_0.is_gemini_2_0 is True
llm_2_5 = LLM(model="google/gemini-2.5-flash")
assert isinstance(llm_2_5, GeminiCompletion)
assert llm_2_5.is_gemini_2_0 is True
# Test non-2.0 models
llm_1_5 = LLM(model="google/gemini-1.5-pro")
assert isinstance(llm_1_5, GeminiCompletion)
assert llm_1_5.is_gemini_2_0 is False
def test_add_property_ordering_to_schema():
"""Test that _add_property_ordering correctly adds propertyOrdering to schemas."""
from crewai.llms.providers.gemini.completion import GeminiCompletion
# Test simple object schema
simple_schema = {
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "integer"},
"email": {"type": "string"}
}
}
result = GeminiCompletion._add_property_ordering(simple_schema)
assert "propertyOrdering" in result
assert result["propertyOrdering"] == ["name", "age", "email"]
# Test nested object schema
nested_schema = {
"type": "object",
"properties": {
"user": {
"type": "object",
"properties": {
"name": {"type": "string"},
"contact": {
"type": "object",
"properties": {
"email": {"type": "string"},
"phone": {"type": "string"}
}
}
}
},
"id": {"type": "integer"}
}
}
result = GeminiCompletion._add_property_ordering(nested_schema)
assert "propertyOrdering" in result
assert result["propertyOrdering"] == ["user", "id"]
assert "propertyOrdering" in result["properties"]["user"]
assert result["properties"]["user"]["propertyOrdering"] == ["name", "contact"]
assert "propertyOrdering" in result["properties"]["user"]["properties"]["contact"]
assert result["properties"]["user"]["properties"]["contact"]["propertyOrdering"] == ["email", "phone"]
def test_gemini_2_0_response_model_with_property_ordering():
"""Test that Gemini 2.0 models include propertyOrdering in response schemas."""
from pydantic import BaseModel, Field
class TestResponse(BaseModel):
"""Test response model."""
name: str = Field(..., description="The name")
age: int = Field(..., description="The age")
email: str = Field(..., description="The email")
llm = LLM(model="google/gemini-2.0-flash-001")
# Prepare generation config with response model
config = llm._prepare_generation_config(response_model=TestResponse)
# Verify that the config has response_json_schema
assert hasattr(config, 'response_json_schema') or 'response_json_schema' in config.__dict__
# Get the schema
if hasattr(config, 'response_json_schema'):
schema = config.response_json_schema
else:
schema = config.__dict__.get('response_json_schema', {})
# Verify propertyOrdering is present for Gemini 2.0
assert "propertyOrdering" in schema
assert "name" in schema["propertyOrdering"]
assert "age" in schema["propertyOrdering"]
assert "email" in schema["propertyOrdering"]
def test_gemini_1_5_response_model_uses_response_schema():
"""Test that Gemini 1.5 models use response_schema parameter (not response_json_schema)."""
from pydantic import BaseModel, Field
class TestResponse(BaseModel):
"""Test response model."""
name: str = Field(..., description="The name")
age: int = Field(..., description="The age")
llm = LLM(model="google/gemini-1.5-pro")
# Prepare generation config with response model
config = llm._prepare_generation_config(response_model=TestResponse)
# Verify that the config uses response_schema (not response_json_schema)
assert hasattr(config, 'response_schema') or 'response_schema' in config.__dict__
assert not (hasattr(config, 'response_json_schema') and config.response_json_schema is not None)
# Get the schema
if hasattr(config, 'response_schema'):
schema = config.response_schema
else:
schema = config.__dict__.get('response_schema')
# For Gemini 1.5, response_schema should be the Pydantic model itself
# The SDK handles conversion internally
assert schema is TestResponse or isinstance(schema, type)
# =============================================================================
# Agent Kickoff Structured Output Tests
# =============================================================================
@pytest.mark.vcr()
def test_gemini_agent_kickoff_structured_output_without_tools():
"""
Test that agent kickoff returns structured output without tools.
This tests native structured output handling for Gemini models.
"""
from pydantic import BaseModel, Field
class AnalysisResult(BaseModel):
"""Structured output for analysis results."""
topic: str = Field(description="The topic analyzed")
key_points: list[str] = Field(description="Key insights from the analysis")
summary: str = Field(description="Brief summary of findings")
agent = Agent(
role="Analyst",
goal="Provide structured analysis on topics",
backstory="You are an expert analyst who provides clear, structured insights.",
llm=LLM(model="google/gemini-2.5-flash"),
tools=[],
verbose=True,
)
result = agent.kickoff(
messages="Analyze the benefits of remote work briefly. Keep it concise.",
response_format=AnalysisResult,
)
assert result.pydantic is not None, "Expected pydantic output but got None"
assert isinstance(result.pydantic, AnalysisResult), f"Expected AnalysisResult but got {type(result.pydantic)}"
assert result.pydantic.topic, "Topic should not be empty"
assert len(result.pydantic.key_points) > 0, "Should have at least one key point"
assert result.pydantic.summary, "Summary should not be empty"
@pytest.mark.vcr()
def test_gemini_agent_kickoff_structured_output_with_tools():
"""
Test that agent kickoff returns structured output after using tools.
This tests post-tool-call structured output handling for Gemini models.
"""
from pydantic import BaseModel, Field
from crewai.tools import tool
class CalculationResult(BaseModel):
"""Structured output for calculation results."""
operation: str = Field(description="The mathematical operation performed")
result: int = Field(description="The result of the calculation")
explanation: str = Field(description="Brief explanation of the calculation")
@tool
def add_numbers(a: int, b: int) -> int:
"""Add two numbers together and return the sum."""
return a + b
agent = Agent(
role="Calculator",
goal="Perform calculations using available tools",
backstory="You are a calculator assistant that uses tools to compute results.",
llm=LLM(model="google/gemini-2.5-flash"),
tools=[add_numbers],
verbose=True,
)
result = agent.kickoff(
messages="Calculate 15 + 27 using your add_numbers tool. Report the result.",
response_format=CalculationResult,
)
assert result.pydantic is not None, "Expected pydantic output but got None"
assert isinstance(result.pydantic, CalculationResult), f"Expected CalculationResult but got {type(result.pydantic)}"
assert result.pydantic.result == 42, f"Expected result 42 but got {result.pydantic.result}"
assert result.pydantic.operation, "Operation should not be empty"
assert result.pydantic.explanation, "Explanation should not be empty"
@pytest.mark.vcr()
def test_gemini_crew_structured_output_with_tools():
"""
Test that a crew with Gemini can use both tools and output_pydantic on a task.
"""
from pydantic import BaseModel, Field
from crewai.tools import tool
class CalculationResult(BaseModel):
operation: str = Field(description="The mathematical operation performed")
result: int = Field(description="The result of the calculation")
explanation: str = Field(description="Brief explanation of the calculation")
@tool
def add_numbers(a: int, b: int) -> int:
"""Add two numbers together and return the sum."""
return a + b
agent = Agent(
role="Calculator",
goal="Perform calculations using available tools",
backstory="You are a calculator assistant that uses tools to compute results.",
llm=LLM(model="google/gemini-2.0-flash-001"),
tools=[add_numbers],
)
task = Task(
description="Calculate 15 + 27 using your add_numbers tool. Report the result.",
expected_output="A structured calculation result",
output_pydantic=CalculationResult,
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result.pydantic is not None, "Expected pydantic output but got None"
assert isinstance(result.pydantic, CalculationResult)
assert result.pydantic.result == 42, f"Expected 42 but got {result.pydantic.result}"
def test_gemini_stop_words_not_applied_to_structured_output():
"""
Test that stop words are NOT applied when response_model is provided.
This ensures JSON responses containing stop word patterns (like "Observation:")
are not truncated, which would cause JSON validation to fail.
"""
from pydantic import BaseModel, Field
from crewai.llms.providers.gemini.completion import GeminiCompletion
class ResearchResult(BaseModel):
"""Research result that may contain stop word patterns in string fields."""
finding: str = Field(description="The research finding")
observation: str = Field(description="Observation about the finding")
# Create Gemini completion instance with stop words configured
# Gemini uses stop_sequences instead of stop
llm = GeminiCompletion(
model="gemini-2.0-flash-001",
stop_sequences=["Observation:", "Final Answer:"], # Common stop words
)
# JSON response that contains a stop word pattern in a string field
# Without the fix, this would be truncated at "Observation:" breaking the JSON
json_response = '{"finding": "The data shows growth", "observation": "Observation: This confirms the hypothesis"}'
# Test the _validate_structured_output method which is used for structured output handling
result = llm._validate_structured_output(json_response, ResearchResult)
# Should successfully parse the full JSON without truncation
assert isinstance(result, ResearchResult)
assert result.finding == "The data shows growth"
# The observation field should contain the full text including "Observation:"
assert "Observation:" in result.observation
def test_gemini_stop_words_still_applied_to_regular_responses():
"""
Test that stop words ARE still applied for regular (non-structured) responses.
This ensures the fix didn't break normal stop word behavior.
"""
from crewai.llms.providers.gemini.completion import GeminiCompletion
# Create Gemini completion instance with stop words configured
# Gemini uses stop_sequences instead of stop
llm = GeminiCompletion(
model="gemini-2.0-flash-001",
stop_sequences=["Observation:", "Final Answer:"],
)
# Response that contains a stop word - should be truncated
response_with_stop_word = "I need to search for more information.\n\nAction: search\nObservation: Found results"
# Test the _apply_stop_words method directly
result = llm._apply_stop_words(response_with_stop_word)
# Response should be truncated at the stop word
assert "Observation:" not in result
assert "Found results" not in result
assert "I need to search for more information" in result
def test_gemini_structured_output_preserves_json_with_stop_word_patterns():
"""
Test that structured output validation preserves JSON content
even when string fields contain stop word patterns.
"""
from pydantic import BaseModel, Field
from crewai.llms.providers.gemini.completion import GeminiCompletion
class AgentObservation(BaseModel):
"""Model with fields that might contain stop word-like text."""
action_taken: str = Field(description="What action was taken")
observation_result: str = Field(description="The observation result")
final_answer: str = Field(description="The final answer")
# Gemini uses stop_sequences instead of stop
llm = GeminiCompletion(
model="gemini-2.0-flash-001",
stop_sequences=["Observation:", "Final Answer:", "Action:"],
)
# JSON that contains all the stop word patterns as part of the content
json_with_stop_patterns = '''{
"action_taken": "Action: Searched the database",
"observation_result": "Observation: Found 5 relevant results",
"final_answer": "Final Answer: The data shows positive growth"
}'''
# Test the _validate_structured_output method - this should NOT truncate
# since it's structured output
result = llm._validate_structured_output(json_with_stop_patterns, AgentObservation)
assert isinstance(result, AgentObservation)
assert "Action:" in result.action_taken
assert "Observation:" in result.observation_result
assert "Final Answer:" in result.final_answer
@pytest.mark.vcr()
def test_gemini_cached_prompt_tokens():
"""
Test that Gemini correctly extracts and tracks cached_prompt_tokens
from cached_content_token_count in the usage metadata.
Sends two calls with the same large prompt to trigger caching.
"""
padding = "This is padding text to ensure the prompt is large enough for caching. " * 80
system_msg = f"You are a helpful assistant. {padding}"
llm = LLM(model="google/gemini-2.5-flash")
# First call
llm.call([
{"role": "system", "content": system_msg},
{"role": "user", "content": "Say hello in one word."},
])
# Second call: same system prompt
llm.call([
{"role": "system", "content": system_msg},
{"role": "user", "content": "Say goodbye in one word."},
])
usage = llm.get_token_usage_summary()
assert usage.total_tokens > 0
assert usage.prompt_tokens > 0
assert usage.completion_tokens > 0
assert usage.successful_requests == 2
# cached_prompt_tokens should be populated (may be 0 if Gemini
# doesn't cache for this particular request, but the field should exist)
assert usage.cached_prompt_tokens >= 0
@pytest.mark.vcr()
def test_gemini_cached_prompt_tokens_with_tools():
"""
Test that Gemini correctly tracks cached_prompt_tokens when tools are used.
The large system prompt should be cached across tool-calling requests.
"""
padding = "This is padding text to ensure the prompt is large enough for caching. " * 80
system_msg = f"You are a helpful assistant that uses tools. {padding}"
def get_weather(location: str) -> str:
return f"The weather in {location} is sunny and 72°F"
tools = [
{
"name": "get_weather",
"description": "Get the current weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city name"
}
},
"required": ["location"],
},
}
]
llm = LLM(model="google/gemini-2.5-flash")
# First call with tool
llm.call(
[
{"role": "system", "content": system_msg},
{"role": "user", "content": "What is the weather in Tokyo?"},
],
tools=tools,
available_functions={"get_weather": get_weather},
)
# Second call with same system prompt + tools
llm.call(
[
{"role": "system", "content": system_msg},
{"role": "user", "content": "What is the weather in Paris?"},
],
tools=tools,
available_functions={"get_weather": get_weather},
)
usage = llm.get_token_usage_summary()
assert usage.total_tokens > 0
assert usage.prompt_tokens > 0
assert usage.successful_requests == 2
# cached_prompt_tokens should be populated (may be 0 if Gemini
# doesn't cache for this particular request, but the field should exist)
assert usage.cached_prompt_tokens >= 0