Merge branch 'main' of github.com:crewAIInc/crewAI into better/event-emitter

This commit is contained in:
Lorenze Jay
2025-02-11 14:33:08 -08:00
47 changed files with 1692 additions and 351 deletions

View File

@@ -1183,7 +1183,7 @@ def test_agent_max_retry_limit():
[
mock.call(
{
"input": "Say the word: Hi\n\nThis is the expect criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.",
"input": "Say the word: Hi\n\nThis is the expected criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.",
"tool_names": "",
"tools": "",
"ask_for_human_input": True,
@@ -1191,7 +1191,7 @@ def test_agent_max_retry_limit():
),
mock.call(
{
"input": "Say the word: Hi\n\nThis is the expect criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.",
"input": "Say the word: Hi\n\nThis is the expected criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.",
"tool_names": "",
"tools": "",
"ask_for_human_input": True,

View File

@@ -55,72 +55,83 @@ def test_train_invalid_string_iterations(train_crew, runner):
)
@mock.patch("crewai.cli.reset_memories_command.ShortTermMemory")
@mock.patch("crewai.cli.reset_memories_command.EntityMemory")
@mock.patch("crewai.cli.reset_memories_command.LongTermMemory")
@mock.patch("crewai.cli.reset_memories_command.TaskOutputStorageHandler")
def test_reset_all_memories(
MockTaskOutputStorageHandler,
MockLongTermMemory,
MockEntityMemory,
MockShortTermMemory,
runner,
):
result = runner.invoke(reset_memories, ["--all"])
MockShortTermMemory().reset.assert_called_once()
MockEntityMemory().reset.assert_called_once()
MockLongTermMemory().reset.assert_called_once()
MockTaskOutputStorageHandler().reset.assert_called_once()
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_all_memories(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-a"])
mock_crew.reset_memories.assert_called_once_with(command_type="all")
assert result.output == "All memories have been reset.\n"
@mock.patch("crewai.cli.reset_memories_command.ShortTermMemory")
def test_reset_short_term_memories(MockShortTermMemory, runner):
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_short_term_memories(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-s"])
MockShortTermMemory().reset.assert_called_once()
mock_crew.reset_memories.assert_called_once_with(command_type="short")
assert result.output == "Short term memory has been reset.\n"
@mock.patch("crewai.cli.reset_memories_command.EntityMemory")
def test_reset_entity_memories(MockEntityMemory, runner):
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_entity_memories(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-e"])
MockEntityMemory().reset.assert_called_once()
mock_crew.reset_memories.assert_called_once_with(command_type="entity")
assert result.output == "Entity memory has been reset.\n"
@mock.patch("crewai.cli.reset_memories_command.LongTermMemory")
def test_reset_long_term_memories(MockLongTermMemory, runner):
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_long_term_memories(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-l"])
MockLongTermMemory().reset.assert_called_once()
mock_crew.reset_memories.assert_called_once_with(command_type="long")
assert result.output == "Long term memory has been reset.\n"
@mock.patch("crewai.cli.reset_memories_command.TaskOutputStorageHandler")
def test_reset_kickoff_outputs(MockTaskOutputStorageHandler, runner):
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_kickoff_outputs(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-k"])
MockTaskOutputStorageHandler().reset.assert_called_once()
mock_crew.reset_memories.assert_called_once_with(command_type="kickoff_outputs")
assert result.output == "Latest Kickoff outputs stored has been reset.\n"
@mock.patch("crewai.cli.reset_memories_command.ShortTermMemory")
@mock.patch("crewai.cli.reset_memories_command.LongTermMemory")
def test_reset_multiple_memory_flags(MockShortTermMemory, MockLongTermMemory, runner):
result = runner.invoke(
reset_memories,
[
"-s",
"-l",
],
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_multiple_memory_flags(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["-s", "-l"])
# Check that reset_memories was called twice with the correct arguments
assert mock_crew.reset_memories.call_count == 2
mock_crew.reset_memories.assert_has_calls(
[mock.call(command_type="long"), mock.call(command_type="short")]
)
MockShortTermMemory().reset.assert_called_once()
MockLongTermMemory().reset.assert_called_once()
assert (
result.output
== "Long term memory has been reset.\nShort term memory has been reset.\n"
)
@mock.patch("crewai.cli.reset_memories_command.get_crew")
def test_reset_knowledge(mock_get_crew, runner):
mock_crew = mock.Mock()
mock_get_crew.return_value = mock_crew
result = runner.invoke(reset_memories, ["--knowledge"])
mock_crew.reset_memories.assert_called_once_with(command_type="knowledge")
assert result.output == "Knowledge has been reset.\n"
def test_reset_no_memory_flags(runner):
result = runner.invoke(
reset_memories,

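These CLI tests were rewritten to patch a single get_crew() entry point and assert on crew.reset_memories(command_type=...) instead of patching each memory class. For reference, a minimal sketch of a Click command consistent with those assertions; get_crew() is stubbed here, and the long option spellings beyond the short flags are assumptions rather than the real CLI definition.

# Hedged sketch of a reset_memories command shaped by the tests above.
import click


def get_crew():
    # Stand-in for the loader patched as crewai.cli.reset_memories_command.get_crew in the tests.
    raise NotImplementedError


@click.command()
@click.option("-l", "--long", is_flag=True, help="Reset long term memory.")
@click.option("-s", "--short", is_flag=True, help="Reset short term memory.")
@click.option("-e", "--entities", is_flag=True, help="Reset entity memory.")
@click.option("-k", "--kickoff-outputs", is_flag=True, help="Reset latest kickoff task outputs.")
@click.option("--knowledge", is_flag=True, help="Reset knowledge storage.")
@click.option("-a", "--all", "reset_all", is_flag=True, help="Reset everything.")
def reset_memories(long, short, entities, kickoff_outputs, knowledge, reset_all):
    crew = get_crew()
    if reset_all:
        crew.reset_memories(command_type="all")
        click.echo("All memories have been reset.")
        return
    # The multi-flag test expects "long" to be handled before "short", so flags are checked in that order.
    if long:
        crew.reset_memories(command_type="long")
        click.echo("Long term memory has been reset.")
    if short:
        crew.reset_memories(command_type="short")
        click.echo("Short term memory has been reset.")
    if entities:
        crew.reset_memories(command_type="entity")
        click.echo("Entity memory has been reset.")
    if kickoff_outputs:
        crew.reset_memories(command_type="kickoff_outputs")
        click.echo("Latest Kickoff outputs stored has been reset.")
    if knowledge:
        crew.reset_memories(command_type="knowledge")
        click.echo("Knowledge has been reset.")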
View File

@@ -2,7 +2,7 @@ research_task:
description: >
Conduct a thorough research about {topic}
Make sure you find any interesting and relevant information given
the current year is 2024.
the current year is 2025.
expected_output: >
A list with 10 bullet points of the most relevant information about {topic}
agent: researcher

View File

@@ -49,6 +49,41 @@ writer = Agent(
)
def test_crew_with_only_conditional_tasks_raises_error():
"""Test that creating a crew with only conditional tasks raises an error."""
def condition_func(task_output: TaskOutput) -> bool:
return True
conditional1 = ConditionalTask(
description="Conditional task 1",
expected_output="Output 1",
agent=researcher,
condition=condition_func,
)
conditional2 = ConditionalTask(
description="Conditional task 2",
expected_output="Output 2",
agent=researcher,
condition=condition_func,
)
conditional3 = ConditionalTask(
description="Conditional task 3",
expected_output="Output 3",
agent=researcher,
condition=condition_func,
)
with pytest.raises(
pydantic_core._pydantic_core.ValidationError,
match="Crew must include at least one non-conditional task",
):
Crew(
agents=[researcher],
tasks=[conditional1, conditional2, conditional3],
)
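The ValidationError matched here implies a model-level rule that a crew cannot consist solely of conditional tasks. A minimal pydantic v2 sketch of such a rule, purely illustrative and not a copy of crewAI's Crew model:

# Illustrative validator for the "at least one non-conditional task" rule.
from typing import Any, List

from pydantic import BaseModel, model_validator


class ConditionalTask:
    # Stand-in for crewai.tasks.conditional_task.ConditionalTask.
    pass


class Crew(BaseModel):
    tasks: List[Any] = []

    @model_validator(mode="after")
    def require_non_conditional_task(self) -> "Crew":
        if self.tasks and all(isinstance(t, ConditionalTask) for t in self.tasks):
            raise ValueError("Crew must include at least one non-conditional task")
        return self

Raising ValueError inside a model_validator is what surfaces to the caller as the pydantic_core ValidationError the test matches on.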
def test_crew_config_conditional_requirement():
with pytest.raises(ValueError):
Crew(process=Process.sequential)
@@ -556,12 +591,12 @@ def test_crew_with_delegating_agents_should_not_override_task_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(isinstance(tool, TestTool) for tool in tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
assert any(
isinstance(tool, TestTool) for tool in tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -620,12 +655,12 @@ def test_crew_with_delegating_agents_should_not_override_agent_tools():
_, kwargs = mock_execute_sync.call_args
tools = kwargs["tools"]
assert any(isinstance(tool, TestTool) for tool in new_ceo.tools), (
"TestTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in tools), (
"Delegation tool should be present"
)
assert any(
isinstance(tool, TestTool) for tool in new_ceo.tools
), "TestTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in tools
), "Delegation tool should be present"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -749,17 +784,17 @@ def test_task_tools_override_agent_tools_with_allow_delegation():
used_tools = kwargs["tools"]
# Confirm AnotherTestTool is present but TestTool is not
assert any(isinstance(tool, AnotherTestTool) for tool in used_tools), (
"AnotherTestTool should be present"
)
assert not any(isinstance(tool, TestTool) for tool in used_tools), (
"TestTool should not be present among used tools"
)
assert any(
isinstance(tool, AnotherTestTool) for tool in used_tools
), "AnotherTestTool should be present"
assert not any(
isinstance(tool, TestTool) for tool in used_tools
), "TestTool should not be present among used tools"
# Confirm delegation tool(s) are present
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
# Finally, make sure the agent's original tools remain unchanged
assert len(researcher_with_delegation.tools) == 1
@@ -1560,9 +1595,9 @@ def test_code_execution_flag_adds_code_tool_upon_kickoff():
# Verify that exactly one tool was used and it was a CodeInterpreterTool
assert len(used_tools) == 1, "Should have exactly one tool"
assert isinstance(used_tools[0], CodeInterpreterTool), (
"Tool should be CodeInterpreterTool"
)
assert isinstance(
used_tools[0], CodeInterpreterTool
), "Tool should be CodeInterpreterTool"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -1917,6 +1952,78 @@ def test_task_callback_on_crew():
assert isinstance(args[0], TaskOutput)
def test_task_callback_both_on_task_and_crew():
from unittest.mock import MagicMock, patch
mock_callback_on_task = MagicMock()
mock_callback_on_crew = MagicMock()
researcher_agent = Agent(
role="Researcher",
goal="Make the best research and analysis on content about AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
allow_delegation=False,
)
list_ideas = Task(
description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
expected_output="Bullet point list of 5 important events.",
agent=researcher_agent,
async_execution=True,
callback=mock_callback_on_task,
)
crew = Crew(
agents=[researcher_agent],
process=Process.sequential,
tasks=[list_ideas],
task_callback=mock_callback_on_crew,
)
with patch.object(Agent, "execute_task") as execute:
execute.return_value = "ok"
crew.kickoff()
assert list_ideas.callback is not None
mock_callback_on_task.assert_called_once_with(list_ideas.output)
mock_callback_on_crew.assert_called_once_with(list_ideas.output)
def test_task_same_callback_both_on_task_and_crew():
from unittest.mock import MagicMock, patch
mock_callback = MagicMock()
researcher_agent = Agent(
role="Researcher",
goal="Make the best research and analysis on content about AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
allow_delegation=False,
)
list_ideas = Task(
description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
expected_output="Bullet point list of 5 important events.",
agent=researcher_agent,
async_execution=True,
callback=mock_callback,
)
crew = Crew(
agents=[researcher_agent],
process=Process.sequential,
tasks=[list_ideas],
task_callback=mock_callback,
)
with patch.object(Agent, "execute_task") as execute:
execute.return_value = "ok"
crew.kickoff()
assert list_ideas.callback is not None
mock_callback.assert_called_once_with(list_ideas.output)
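Read together, these two callback tests pin down a subtle requirement: when the same callable is registered both on the task and as the crew's task_callback, it must fire exactly once, while distinct callbacks each fire once. A small sketch of dispatch logic that satisfies both tests; the helper name and signature are hypothetical, not crewAI's internals.

# Hypothetical callback dispatch consistent with the two tests above.
def run_task_callbacks(task, crew_task_callback, output):
    callbacks = []
    if task.callback is not None:
        callbacks.append(task.callback)
    # Identity check avoids double-invoking a callable shared by task and crew.
    if crew_task_callback is not None and crew_task_callback is not task.callback:
        callbacks.append(crew_task_callback)
    for callback in callbacks:
        callback(output)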
@pytest.mark.vcr(filter_headers=["authorization"])
def test_tools_with_custom_caching():
from unittest.mock import patch
@@ -1989,6 +2096,210 @@ def test_tools_with_custom_caching():
assert result.raw == "3"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_task_uses_last_output():
"""Test that conditional tasks use the last task output for condition evaluation."""
task1 = Task(
description="First task",
expected_output="First output",
agent=researcher,
)
def condition_fails(task_output: TaskOutput) -> bool:
# This condition will never be met
return "never matches" in task_output.raw.lower()
def condition_succeeds(task_output: TaskOutput) -> bool:
# This condition will match first task's output
return "first success" in task_output.raw.lower()
conditional_task1 = ConditionalTask(
description="Second task - conditional that fails condition",
expected_output="Second output",
agent=researcher,
condition=condition_fails,
)
conditional_task2 = ConditionalTask(
description="Third task - conditional that succeeds using first task output",
expected_output="Third output",
agent=writer,
condition=condition_succeeds,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, conditional_task1, conditional_task2],
)
# Mock outputs for tasks
mock_first = TaskOutput(
description="First task output",
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_third]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify outputs collection:
# First executed task's output, then an automatically generated placeholder for the skipped task, then the output of the conditional task that executed
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "First success output"
) # First task succeeded
assert (
result.tasks_output[1].raw == ""
) # Second task skipped (condition failed)
assert (
result.tasks_output[2].raw == "Third task executed"
) # Third task used first task's output
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_tasks_result_collection():
"""Test that task outputs are properly collected based on execution status."""
task1 = Task(
description="Normal task that always executes",
expected_output="First output",
agent=researcher,
)
def condition_never_met(task_output: TaskOutput) -> bool:
return "never matches" in task_output.raw.lower()
def condition_always_met(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
task2 = ConditionalTask(
description="Conditional task that never executes",
expected_output="Second output",
agent=researcher,
condition=condition_never_met,
)
task3 = ConditionalTask(
description="Conditional task that always executes",
expected_output="Third output",
agent=writer,
condition=condition_always_met,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock outputs for different execution paths
mock_success = TaskOutput(
description="Success output",
raw="Success output", # Triggers third task's condition
agent=researcher.role,
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_success, mock_conditional]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify task output collection:
# There should be three outputs: normal task, skipped conditional task (empty output),
# and the conditional task that executed.
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():
"""Test that having multiple conditional tasks in sequence works correctly."""
task1 = Task(
description="Initial research task",
expected_output="Research output",
agent=researcher,
)
def condition1(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
def condition2(task_output: TaskOutput) -> bool:
return "proceed" in task_output.raw.lower()
task2 = ConditionalTask(
description="First conditional task",
expected_output="Conditional output 1",
agent=writer,
condition=condition1,
)
task3 = ConditionalTask(
description="Second conditional task",
expected_output="Conditional output 2",
agent=writer,
condition=condition2,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock different task outputs to test conditional logic
mock_success = TaskOutput(
description="Mock success",
raw="Success and proceed output",
agent=researcher.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
# Verify all tasks were executed (no IndexError)
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
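The common thread in these conditional-task tests: a skipped conditional task still contributes a placeholder output with raw == "", conditions are evaluated against the most recent output, and tasks_output stays aligned with the task list. A rough sketch of a sequential loop with that behaviour, inferred from the assertions rather than taken from Crew's implementation; the execute_sync call signature is an assumption.

# Hypothetical sequential loop matching the behaviour the tests assert.
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput


def run_sequential(tasks):
    outputs = []
    last_output = None
    for task in tasks:
        if isinstance(task, ConditionalTask) and last_output is not None:
            if not task.should_execute(last_output):
                # Skipped tasks still produce an empty placeholder so indices line up.
                outputs.append(
                    TaskOutput(
                        description=task.description,
                        raw="",
                        agent=getattr(task.agent, "role", ""),
                    )
                )
                continue
        last_output = task.execute_sync(agent=task.agent, context=None, tools=task.tools or [])
        outputs.append(last_output)
    return outputs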
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch
@@ -3107,9 +3418,9 @@ def test_fetch_inputs():
expected_placeholders = {"role_detail", "topic", "field"}
actual_placeholders = crew.fetch_inputs()
assert actual_placeholders == expected_placeholders, (
f"Expected {expected_placeholders}, but got {actual_placeholders}"
)
assert (
actual_placeholders == expected_placeholders
), f"Expected {expected_placeholders}, but got {actual_placeholders}"
def test_task_tools_preserve_code_execution_tools():
@@ -3182,20 +3493,20 @@ def test_task_tools_preserve_code_execution_tools():
used_tools = kwargs["tools"]
# Verify all expected tools are present
assert any(isinstance(tool, TestTool) for tool in used_tools), (
"Task's TestTool should be present"
)
assert any(isinstance(tool, CodeInterpreterTool) for tool in used_tools), (
"CodeInterpreterTool should be present"
)
assert any("delegate" in tool.name.lower() for tool in used_tools), (
"Delegation tool should be present"
)
assert any(
isinstance(tool, TestTool) for tool in used_tools
), "Task's TestTool should be present"
assert any(
isinstance(tool, CodeInterpreterTool) for tool in used_tools
), "CodeInterpreterTool should be present"
assert any(
"delegate" in tool.name.lower() for tool in used_tools
), "Delegation tool should be present"
# Verify the total number of tools (TestTool + CodeInterpreter + 2 delegation tools)
assert len(used_tools) == 4, (
"Should have TestTool, CodeInterpreter, and 2 delegation tools"
)
assert (
len(used_tools) == 4
), "Should have TestTool, CodeInterpreter, and 2 delegation tools"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -3239,9 +3550,9 @@ def test_multimodal_flag_adds_multimodal_tools():
used_tools = kwargs["tools"]
# Check that the multimodal tool was added
assert any(isinstance(tool, AddImageTool) for tool in used_tools), (
"AddImageTool should be present when agent is multimodal"
)
assert any(
isinstance(tool, AddImageTool) for tool in used_tools
), "AddImageTool should be present when agent is multimodal"
# Verify we have exactly one tool (just the AddImageTool)
assert len(used_tools) == 1, "Should only have the AddImageTool"
@@ -3467,9 +3778,9 @@ def test_crew_guardrail_feedback_in_context():
assert len(execution_contexts) > 1, "Task should have been executed multiple times"
# Verify that the second execution included the guardrail feedback
assert "Output must contain the keyword 'IMPORTANT'" in execution_contexts[1], (
"Guardrail feedback should be included in retry context"
)
assert (
"Output must contain the keyword 'IMPORTANT'" in execution_contexts[1]
), "Guardrail feedback should be included in retry context"
# Verify final output meets guardrail requirements
assert "IMPORTANT" in result.raw, "Final output should contain required keyword"

View File

@@ -3,6 +3,7 @@ from time import sleep
from unittest.mock import MagicMock, patch
import pytest
from pydantic import BaseModel
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.llm import LLM
@@ -205,6 +206,52 @@ def test_llm_passes_additional_params():
assert result == "Test response"
def test_get_custom_llm_provider_openrouter():
llm = LLM(model="openrouter/deepseek/deepseek-chat")
assert llm._get_custom_llm_provider() == "openrouter"
def test_get_custom_llm_provider_gemini():
llm = LLM(model="gemini/gemini-1.5-pro")
assert llm._get_custom_llm_provider() == "gemini"
def test_get_custom_llm_provider_openai():
llm = LLM(model="gpt-4")
assert llm._get_custom_llm_provider() == "openai"
def test_validate_call_params_supported():
class DummyResponse(BaseModel):
a: int
# Patch supports_response_schema to simulate a supported model.
with patch("crewai.llm.supports_response_schema", return_value=True):
llm = LLM(
model="openrouter/deepseek/deepseek-chat", response_format=DummyResponse
)
# Should not raise any error.
llm._validate_call_params()
def test_validate_call_params_not_supported():
class DummyResponse(BaseModel):
a: int
# Patch supports_response_schema to simulate an unsupported model.
with patch("crewai.llm.supports_response_schema", return_value=False):
llm = LLM(model="gemini/gemini-1.5-pro", response_format=DummyResponse)
with pytest.raises(ValueError) as excinfo:
llm._validate_call_params()
assert "does not support response_format" in str(excinfo.value)
def test_validate_call_params_no_response_format():
# When no response_format is provided, no validation error should occur.
llm = LLM(model="gemini/gemini-1.5-pro", response_format=None)
llm._validate_call_params()
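For orientation, a sketch of the behaviour these tests describe: the provider is read as the prefix before the first "/" in the model string, and _validate_call_params rejects a response_format when litellm's supports_response_schema reports the model cannot honour it. This is reconstructed from the assertions; the "openai" fallback for bare model names is inferred from the gpt-4 case, and the standalone function names are hypothetical.

# Reconstruction of the validated behaviour, not a copy of crewai.llm.
from litellm.utils import supports_response_schema


def get_custom_llm_provider(model: str) -> str:
    # "openrouter/deepseek/deepseek-chat" -> "openrouter", "gemini/gemini-1.5-pro" -> "gemini"
    if "/" in model:
        return model.split("/")[0]
    return "openai"  # bare names such as "gpt-4" map to OpenAI


def validate_call_params(model: str, response_format=None) -> None:
    if response_format is None:
        return
    provider = get_custom_llm_provider(model)
    if not supports_response_schema(model=model, custom_llm_provider=provider):
        raise ValueError(
            f"The model {model} does not support response_format for provider '{provider}'."
        )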
@pytest.mark.vcr(filter_headers=["authorization"])
def test_o3_mini_reasoning_effort_high():
llm = LLM(
@@ -239,6 +286,79 @@ def test_o3_mini_reasoning_effort_medium():
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.fixture
def anthropic_llm():
"""Fixture providing an Anthropic LLM instance."""
return LLM(model="anthropic/claude-3-sonnet")
@pytest.fixture
def system_message():
"""Fixture providing a system message."""
return {"role": "system", "content": "test"}
@pytest.fixture
def user_message():
"""Fixture providing a user message."""
return {"role": "user", "content": "test"}
def test_anthropic_message_formatting_edge_cases(anthropic_llm):
"""Test edge cases for Anthropic message formatting."""
# Test None messages
with pytest.raises(TypeError, match="Messages cannot be None"):
anthropic_llm._format_messages_for_provider(None)
# Test empty message list
formatted = anthropic_llm._format_messages_for_provider([])
assert len(formatted) == 1
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
# Test invalid message format
with pytest.raises(TypeError, match="Invalid message format"):
anthropic_llm._format_messages_for_provider([{"invalid": "message"}])
def test_anthropic_model_detection():
"""Test Anthropic model detection with various formats."""
models = [
("anthropic/claude-3", True),
("claude-instant", True),
("claude/v1", True),
("gpt-4", False),
("", False),
("anthropomorphic", False), # Should not match partial words
]
for model, expected in models:
llm = LLM(model=model)
assert llm.is_anthropic == expected, f"Failed for model: {model}"
def test_anthropic_message_formatting(anthropic_llm, system_message, user_message):
"""Test Anthropic message formatting with fixtures."""
# Test when first message is system
formatted = anthropic_llm._format_messages_for_provider([system_message])
assert len(formatted) == 2
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
assert formatted[1] == system_message
# Test when first message is already user
formatted = anthropic_llm._format_messages_for_provider([user_message])
assert len(formatted) == 1
assert formatted[0] == user_message
# Test with empty message list
formatted = anthropic_llm._format_messages_for_provider([])
assert len(formatted) == 1
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
# Test with non-Anthropic model (should not modify messages)
non_anthropic_llm = LLM(model="gpt-4")
formatted = non_anthropic_llm._format_messages_for_provider([system_message])
assert len(formatted) == 1
assert formatted[0] == system_message
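A compact sketch consistent with all of these Anthropic assertions: None is rejected, each message must be a dict with role and content, non-Anthropic models pass through untouched, and Anthropic conversations are forced to start with a minimal user turn. The detection heuristic and function names here are inferred from the test cases, not quoted from crewai.llm.

# Behaviour inferred from the Anthropic formatting tests above (illustrative only).
from typing import Dict, List, Optional

_ANTHROPIC_HINTS = ("anthropic/", "claude-", "claude/")


def is_anthropic_model(model: str) -> bool:
    # True for "anthropic/claude-3", "claude-instant", "claude/v1";
    # False for "gpt-4", "" and partial words like "anthropomorphic".
    return any(hint in model for hint in _ANTHROPIC_HINTS)


def format_messages_for_provider(
    messages: Optional[List[Dict[str, str]]], model: str
) -> List[Dict[str, str]]:
    if messages is None:
        raise TypeError("Messages cannot be None")
    for message in messages:
        if not isinstance(message, dict) or "role" not in message or "content" not in message:
            raise TypeError("Invalid message format")
    if not is_anthropic_model(model):
        return messages
    # Anthropic expects the conversation to open with a user message.
    if not messages or messages[0]["role"] != "user":
        return [{"role": "user", "content": "."}, *messages]
    return messages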
def test_deepseek_r1_with_open_router():
if not os.getenv("OPEN_ROUTER_API_KEY"):
pytest.skip("OPEN_ROUTER_API_KEY not set; skipping test.")

View File

@@ -723,14 +723,14 @@ def test_interpolate_inputs():
)
task.interpolate_inputs_and_add_conversation_history(
inputs={"topic": "AI", "date": "2024"}
inputs={"topic": "AI", "date": "2025"}
)
assert (
task.description
== "Give me a list of 5 interesting ideas about AI to explore for an article, what makes them unique and interesting."
)
assert task.expected_output == "Bullet point list of 5 interesting ideas about AI."
assert task.output_file == "/tmp/AI/output_2024.txt"
assert task.output_file == "/tmp/AI/output_2025.txt"
task.interpolate_inputs_and_add_conversation_history(
inputs={"topic": "ML", "date": "2025"}