Merge branch 'main' into main

This commit is contained in:
Vardaan Grover
2025-02-10 13:46:30 +05:30
committed by GitHub
10 changed files with 545 additions and 77 deletions

View File

@@ -1,10 +1,18 @@
<div align="center">
![Logo of CrewAI, two people rowing on a boat](./docs/crewai_logo.png)
![Logo of CrewAI](./docs/crewai_logo.png)
# **CrewAI**
🤖 **CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
**CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
**CrewAI Enterprise**
Want to plan, build (+ no code), deploy, monitor and interare your agents: [CrewAI Enterprise](https://www.crewai.com/enterprise). Designed for complex, real-world applications, our enterprise solution offers:
- **Seamless Integrations**
- **Scalable & Secure Deployment**
- **Actionable Insights**
- **24/7 Support**
<h3>
@@ -392,7 +400,7 @@ class AdvancedAnalysisFlow(Flow[MarketState]):
goal="Gather and validate supporting market data",
backstory="You excel at finding and correlating multiple data sources"
)
analysis_task = Task(
description="Analyze {sector} sector data for the past {timeframe}",
expected_output="Detailed market analysis with confidence score",
@@ -403,7 +411,7 @@ class AdvancedAnalysisFlow(Flow[MarketState]):
expected_output="Corroborating evidence and potential contradictions",
agent=researcher
)
# Demonstrate crew autonomy
analysis_crew = Crew(
agents=[analyst, researcher],

View File

@@ -91,7 +91,7 @@ result = crew.kickoff(inputs={"question": "What city does John live in and how o
```
Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including TXT, PDF, DOCX, HTML, and more.
Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including MD, PDF, DOCX, HTML, and more.
<Note>
You need to install `docling` for the following example to work: `uv add docling`
@@ -152,10 +152,10 @@ Here are examples of how to use different types of knowledge sources:
### Text File Knowledge Source
```python
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
# Create a text file knowledge source
text_source = CrewDoclingSource(
text_source = TextFileKnowledgeSource(
file_paths=["document.txt", "another.txt"]
)

View File

@@ -58,41 +58,107 @@ my_crew = Crew(
### Example: Use Custom Memory Instances e.g FAISS as the VectorDB
```python Code
from crewai import Crew, Agent, Task, Process
from crewai import Crew, Process
from crewai.memory import LongTermMemory, ShortTermMemory, EntityMemory
from crewai.memory.storage import LTMSQLiteStorage, RAGStorage
from typing import List, Optional
# Assemble your crew with memory capabilities
my_crew = Crew(
agents=[...],
tasks=[...],
process="Process.sequential",
memory=True,
long_term_memory=EnhanceLongTermMemory(
my_crew: Crew = Crew(
agents = [...],
tasks = [...],
process = Process.sequential,
memory = True,
# Long-term memory for persistent storage across sessions
long_term_memory = LongTermMemory(
storage=LTMSQLiteStorage(
db_path="/my_data_dir/my_crew1/long_term_memory_storage.db"
db_path="/my_crew1/long_term_memory_storage.db"
)
),
short_term_memory=EnhanceShortTermMemory(
storage=CustomRAGStorage(
crew_name="my_crew",
storage_type="short_term",
data_dir="//my_data_dir",
model=embedder["model"],
dimension=embedder["dimension"],
# Short-term memory for current context using RAG
short_term_memory = ShortTermMemory(
storage = RAGStorage(
embedder_config={
"provider": "openai",
"config": {
"model": 'text-embedding-3-small'
}
},
type="short_term",
path="/my_crew1/"
)
),
),
entity_memory=EnhanceEntityMemory(
storage=CustomRAGStorage(
crew_name="my_crew",
storage_type="entities",
data_dir="//my_data_dir",
model=embedder["model"],
dimension=embedder["dimension"],
),
# Entity memory for tracking key information about entities
entity_memory = EntityMemory(
storage=RAGStorage(
embedder_config={
"provider": "openai",
"config": {
"model": 'text-embedding-3-small'
}
},
type="short_term",
path="/my_crew1/"
)
),
verbose=True,
)
```
## Security Considerations
When configuring memory storage:
- Use environment variables for storage paths (e.g., `CREWAI_STORAGE_DIR`)
- Never hardcode sensitive information like database credentials
- Consider access permissions for storage directories
- Use relative paths when possible to maintain portability
Example using environment variables:
```python
import os
from crewai import Crew
from crewai.memory import LongTermMemory
from crewai.memory.storage import LTMSQLiteStorage
# Configure storage path using environment variable
storage_path = os.getenv("CREWAI_STORAGE_DIR", "./storage")
crew = Crew(
memory=True,
long_term_memory=LongTermMemory(
storage=LTMSQLiteStorage(
db_path="{storage_path}/memory.db".format(storage_path=storage_path)
)
)
)
```
## Configuration Examples
### Basic Memory Configuration
```python
from crewai import Crew
from crewai.memory import LongTermMemory
# Simple memory configuration
crew = Crew(memory=True) # Uses default storage locations
```
### Custom Storage Configuration
```python
from crewai import Crew
from crewai.memory import LongTermMemory
from crewai.memory.storage import LTMSQLiteStorage
# Configure custom storage paths
crew = Crew(
memory=True,
long_term_memory=LongTermMemory(
storage=LTMSQLiteStorage(db_path="./memory.db")
)
)
```
## Integrating Mem0 for Enhanced User Memory
[Mem0](https://mem0.ai/) is a self-improving memory layer for LLM applications, enabling personalized AI experiences.

View File

@@ -268,7 +268,7 @@ analysis_task = Task(
Task guardrails provide a way to validate and transform task outputs before they
are passed to the next task. This feature helps ensure data quality and provides
efeedback to agents when their output doesn't meet specific criteria.
feedback to agents when their output doesn't meet specific criteria.
### Using Task Guardrails

View File

@@ -8,9 +8,9 @@ icon: file-pen
## Description
The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files.
The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files with cross-platform compatibility (Windows, Linux, macOS).
It is particularly useful in scenarios such as generating reports, saving logs, creating configuration files, and more.
This tool supports creating new directories if they don't exist, making it easier to organize your output.
This tool handles path differences across operating systems, supports UTF-8 encoding, and automatically creates directories if they don't exist, making it easier to organize your output reliably across different platforms.
## Installation
@@ -43,6 +43,8 @@ print(result)
## Conclusion
By integrating the `FileWriterTool` into your crews, the agents can execute the process of writing content to files and creating directories.
This tool is essential for tasks that require saving output data, creating structured file systems, and more. By adhering to the setup and usage guidelines provided,
incorporating this tool into projects is straightforward and efficient.
By integrating the `FileWriterTool` into your crews, the agents can reliably write content to files across different operating systems.
This tool is essential for tasks that require saving output data, creating structured file systems, and handling cross-platform file operations.
It's particularly recommended for Windows users who may encounter file writing issues with standard Python file operations.
By adhering to the setup and usage guidelines provided, incorporating this tool into projects is straightforward and ensures consistent file writing behavior across all platforms.

View File

@@ -382,6 +382,22 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def validate_must_have_non_conditional_task(self) -> "Crew":
"""Ensure that a crew has at least one non-conditional task."""
if not self.tasks:
return self
non_conditional_count = sum(
1 for task in self.tasks if not isinstance(task, ConditionalTask)
)
if non_conditional_count == 0:
raise PydanticCustomError(
"only_conditional_tasks",
"Crew must include at least one non-conditional task",
{},
)
return self
@model_validator(mode="after")
def validate_first_task(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
@@ -441,6 +457,8 @@ class Crew(BaseModel):
)
return self
@property
def key(self) -> str:
source = [agent.key for agent in self.agents] + [
@@ -742,6 +760,7 @@ class Crew(BaseModel):
task, task_outputs, futures, task_index, was_replayed
)
if skipped_task_output:
task_outputs.append(skipped_task_output)
continue
if task.async_execution:
@@ -765,7 +784,7 @@ class Crew(BaseModel):
context=context,
tools=tools_for_task,
)
task_outputs = [task_output]
task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
@@ -786,7 +805,7 @@ class Crew(BaseModel):
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
previous_output = task_outputs[task_index - 1] if task_outputs else None
previous_output = task_outputs[-1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",
@@ -971,11 +990,15 @@ class Crew(BaseModel):
)
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
if len(task_outputs) != 1:
raise ValueError(
"Something went wrong. Kickoff should return only one task output."
)
final_task_output = task_outputs[0]
if not task_outputs:
raise ValueError("No task outputs available to create crew output.")
# Filter out empty outputs and get the last valid one as the main output
valid_outputs = [t for t in task_outputs if t.raw]
if not valid_outputs:
raise ValueError("No valid task outputs available to create crew output.")
final_task_output = valid_outputs[-1]
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
@@ -984,7 +1007,7 @@ class Crew(BaseModel):
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
tasks_output=[task.output for task in self.tasks if task.output],
tasks_output=task_outputs,
token_usage=token_usage,
)

View File

@@ -164,6 +164,7 @@ class LLM:
self.context_window_size = 0
self.reasoning_effort = reasoning_effort
self.additional_params = kwargs
self.is_anthropic = self._is_anthropic_model(model)
litellm.drop_params = True
@@ -178,42 +179,62 @@ class LLM:
self.set_callbacks(callbacks)
self.set_env_callbacks()
def _is_anthropic_model(self, model: str) -> bool:
"""Determine if the model is from Anthropic provider.
Args:
model: The model identifier string.
Returns:
bool: True if the model is from Anthropic, False otherwise.
"""
ANTHROPIC_PREFIXES = ('anthropic/', 'claude-', 'claude/')
return any(prefix in model.lower() for prefix in ANTHROPIC_PREFIXES)
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> str:
"""
High-level llm call method that:
1) Accepts either a string or a list of messages
2) Converts string input to the required message format
3) Calls litellm.completion
4) Handles function/tool calls if any
5) Returns the final text response or tool result
Parameters:
- messages (Union[str, List[Dict[str, str]]]): The input messages for the LLM.
- If a string is provided, it will be converted into a message list with a single entry.
- If a list of dictionaries is provided, each dictionary should have 'role' and 'content' keys.
- tools (Optional[List[dict]]): A list of tool schemas for function calling.
- callbacks (Optional[List[Any]]): A list of callback functions to be executed.
- available_functions (Optional[Dict[str, Any]]): A dictionary mapping function names to actual Python functions.
) -> Union[str, Any]:
"""High-level LLM call method.
Args:
messages: Input messages for the LLM.
Can be a string or list of message dictionaries.
If string, it will be converted to a single user message.
If list, each dict must have 'role' and 'content' keys.
tools: Optional list of tool schemas for function calling.
Each tool should define its name, description, and parameters.
callbacks: Optional list of callback functions to be executed
during and after the LLM call.
available_functions: Optional dict mapping function names to callables
that can be invoked by the LLM.
Returns:
- str: The final text response from the LLM or the result of a tool function call.
Union[str, Any]: Either a text response from the LLM (str) or
the result of a tool function call (Any).
Raises:
TypeError: If messages format is invalid
ValueError: If response format is not supported
LLMContextLengthExceededException: If input exceeds model's context limit
Examples:
---------
# Example 1: Using a string input
response = llm.call("Return the name of a random city in the world.")
print(response)
# Example 2: Using a list of messages
messages = [{"role": "user", "content": "What is the capital of France?"}]
response = llm.call(messages)
print(response)
# Example 1: Simple string input
>>> response = llm.call("Return the name of a random city.")
>>> print(response)
"Paris"
# Example 2: Message list with system and user messages
>>> messages = [
... {"role": "system", "content": "You are a geography expert"},
... {"role": "user", "content": "What is France's capital?"}
... ]
>>> response = llm.call(messages)
>>> print(response)
"The capital of France is Paris."
"""
# Validate parameters before proceeding with the call.
self._validate_call_params()
@@ -233,10 +254,13 @@ class LLM:
self.set_callbacks(callbacks)
try:
# --- 1) Prepare the parameters for the completion call
# --- 1) Format messages according to provider requirements
formatted_messages = self._format_messages_for_provider(messages)
# --- 2) Prepare the parameters for the completion call
params = {
"model": self.model,
"messages": messages,
"messages": formatted_messages,
"timeout": self.timeout,
"temperature": self.temperature,
"top_p": self.top_p,
@@ -324,6 +348,38 @@ class LLM:
logging.error(f"LiteLLM call failed: {str(e)}")
raise
def _format_messages_for_provider(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Format messages according to provider requirements.
Args:
messages: List of message dictionaries with 'role' and 'content' keys.
Can be empty or None.
Returns:
List of formatted messages according to provider requirements.
For Anthropic models, ensures first message has 'user' role.
Raises:
TypeError: If messages is None or contains invalid message format.
"""
if messages is None:
raise TypeError("Messages cannot be None")
# Validate message format first
for msg in messages:
if not isinstance(msg, dict) or "role" not in msg or "content" not in msg:
raise TypeError("Invalid message format. Each message must be a dict with 'role' and 'content' keys")
if not self.is_anthropic:
return messages
# Anthropic requires messages to start with 'user' role
if not messages or messages[0]["role"] == "system":
# If first message is system or empty, add a placeholder user message
return [{"role": "user", "content": "."}, *messages]
return messages
def _get_custom_llm_provider(self) -> str:
"""
Derives the custom_llm_provider from the model string.

View File

@@ -674,19 +674,32 @@ class Task(BaseModel):
return OutputFormat.PYDANTIC
return OutputFormat.RAW
def _save_file(self, result: Any) -> None:
def _save_file(self, result: Union[Dict, str, Any]) -> None:
"""Save task output to a file.
Note:
For cross-platform file writing, especially on Windows, consider using FileWriterTool
from the crewai_tools package:
pip install 'crewai[tools]'
from crewai_tools import FileWriterTool
Args:
result: The result to save to the file. Can be a dict or any stringifiable object.
Raises:
ValueError: If output_file is not set
RuntimeError: If there is an error writing to the file
RuntimeError: If there is an error writing to the file. For cross-platform
compatibility, especially on Windows, use FileWriterTool from crewai_tools
package.
"""
if self.output_file is None:
raise ValueError("output_file is not set.")
FILEWRITER_RECOMMENDATION = (
"For cross-platform file writing, especially on Windows, "
"use FileWriterTool from crewai_tools package."
)
try:
resolved_path = Path(self.output_file).expanduser().resolve()
directory = resolved_path.parent
@@ -702,7 +715,12 @@ class Task(BaseModel):
else:
file.write(str(result))
except (OSError, IOError) as e:
raise RuntimeError(f"Failed to save output file: {e}")
raise RuntimeError(
"\n".join([
f"Failed to save output file: {e}",
FILEWRITER_RECOMMENDATION
])
)
return None
def __repr__(self):

View File

@@ -49,6 +49,39 @@ writer = Agent(
)
def test_crew_with_only_conditional_tasks_raises_error():
"""Test that creating a crew with only conditional tasks raises an error."""
def condition_func(task_output: TaskOutput) -> bool:
return True
conditional1 = ConditionalTask(
description="Conditional task 1",
expected_output="Output 1",
agent=researcher,
condition=condition_func,
)
conditional2 = ConditionalTask(
description="Conditional task 2",
expected_output="Output 2",
agent=researcher,
condition=condition_func,
)
conditional3 = ConditionalTask(
description="Conditional task 3",
expected_output="Output 3",
agent=researcher,
condition=condition_func,
)
with pytest.raises(
pydantic_core._pydantic_core.ValidationError,
match="Crew must include at least one non-conditional task",
):
Crew(
agents=[researcher],
tasks=[conditional1, conditional2, conditional3],
)
def test_crew_config_conditional_requirement():
with pytest.raises(ValueError):
Crew(process=Process.sequential)
@@ -2060,6 +2093,195 @@ def test_tools_with_custom_caching():
assert result.raw == "3"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_task_uses_last_output():
"""Test that conditional tasks use the last task output for condition evaluation."""
task1 = Task(
description="First task",
expected_output="First output",
agent=researcher,
)
def condition_fails(task_output: TaskOutput) -> bool:
# This condition will never be met
return "never matches" in task_output.raw.lower()
def condition_succeeds(task_output: TaskOutput) -> bool:
# This condition will match first task's output
return "first success" in task_output.raw.lower()
conditional_task1 = ConditionalTask(
description="Second task - conditional that fails condition",
expected_output="Second output",
agent=researcher,
condition=condition_fails,
)
conditional_task2 = ConditionalTask(
description="Third task - conditional that succeeds using first task output",
expected_output="Third output",
agent=writer,
condition=condition_succeeds,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, conditional_task1, conditional_task2],
)
# Mock outputs for tasks
mock_first = TaskOutput(
description="First task output",
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Second task output",
raw="", # Empty output since condition fails
agent=researcher.role,
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_third]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify outputs collection
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "First success output" # First task succeeded
assert result.tasks_output[1].raw == "" # Second task skipped (condition failed)
assert result.tasks_output[2].raw == "Third task executed" # Third task used first task's output
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_tasks_result_collection():
"""Test that task outputs are properly collected based on execution status."""
task1 = Task(
description="Normal task that always executes",
expected_output="First output",
agent=researcher,
)
def condition_never_met(task_output: TaskOutput) -> bool:
return "never matches" in task_output.raw.lower()
def condition_always_met(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
task2 = ConditionalTask(
description="Conditional task that never executes",
expected_output="Second output",
agent=researcher,
condition=condition_never_met,
)
task3 = ConditionalTask(
description="Conditional task that always executes",
expected_output="Third output",
agent=writer,
condition=condition_always_met,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock outputs for different execution paths
mock_success = TaskOutput(
description="Success output",
raw="Success output", # Triggers third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Skipped output",
raw="", # Empty output for skipped task
agent=researcher.role,
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_success, mock_conditional]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify task output collection
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "Success output" # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert result.tasks_output[2].raw == "Conditional task executed" # Third task executed
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():
"""Test that having multiple conditional tasks in sequence works correctly."""
task1 = Task(
description="Initial research task",
expected_output="Research output",
agent=researcher,
)
def condition1(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
def condition2(task_output: TaskOutput) -> bool:
return "proceed" in task_output.raw.lower()
task2 = ConditionalTask(
description="First conditional task",
expected_output="Conditional output 1",
agent=writer,
condition=condition1,
)
task3 = ConditionalTask(
description="Second conditional task",
expected_output="Conditional output 2",
agent=writer,
condition=condition2,
)
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2, task3],
)
# Mock different task outputs to test conditional logic
mock_success = TaskOutput(
description="Mock success",
raw="Success and proceed output",
agent=researcher.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
# Verify all tasks were executed (no IndexError)
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch

View File

@@ -286,6 +286,79 @@ def test_o3_mini_reasoning_effort_medium():
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.fixture
def anthropic_llm():
"""Fixture providing an Anthropic LLM instance."""
return LLM(model="anthropic/claude-3-sonnet")
@pytest.fixture
def system_message():
"""Fixture providing a system message."""
return {"role": "system", "content": "test"}
@pytest.fixture
def user_message():
"""Fixture providing a user message."""
return {"role": "user", "content": "test"}
def test_anthropic_message_formatting_edge_cases(anthropic_llm):
"""Test edge cases for Anthropic message formatting."""
# Test None messages
with pytest.raises(TypeError, match="Messages cannot be None"):
anthropic_llm._format_messages_for_provider(None)
# Test empty message list
formatted = anthropic_llm._format_messages_for_provider([])
assert len(formatted) == 1
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
# Test invalid message format
with pytest.raises(TypeError, match="Invalid message format"):
anthropic_llm._format_messages_for_provider([{"invalid": "message"}])
def test_anthropic_model_detection():
"""Test Anthropic model detection with various formats."""
models = [
("anthropic/claude-3", True),
("claude-instant", True),
("claude/v1", True),
("gpt-4", False),
("", False),
("anthropomorphic", False), # Should not match partial words
]
for model, expected in models:
llm = LLM(model=model)
assert llm.is_anthropic == expected, f"Failed for model: {model}"
def test_anthropic_message_formatting(anthropic_llm, system_message, user_message):
"""Test Anthropic message formatting with fixtures."""
# Test when first message is system
formatted = anthropic_llm._format_messages_for_provider([system_message])
assert len(formatted) == 2
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
assert formatted[1] == system_message
# Test when first message is already user
formatted = anthropic_llm._format_messages_for_provider([user_message])
assert len(formatted) == 1
assert formatted[0] == user_message
# Test with empty message list
formatted = anthropic_llm._format_messages_for_provider([])
assert len(formatted) == 1
assert formatted[0]["role"] == "user"
assert formatted[0]["content"] == "."
# Test with non-Anthropic model (should not modify messages)
non_anthropic_llm = LLM(model="gpt-4")
formatted = non_anthropic_llm._format_messages_for_provider([system_message])
assert len(formatted) == 1
assert formatted[0] == system_message
def test_deepseek_r1_with_open_router():
if not os.getenv("OPEN_ROUTER_API_KEY"):
pytest.skip("OPEN_ROUTER_API_KEY not set; skipping test.")