diff --git a/README.md b/README.md
index 917add533..06044f0c0 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,18 @@
-
+
# **CrewAI**
-🤖 **CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
+**CrewAI**: Production-grade framework for orchestrating sophisticated AI agent systems. From simple automations to complex real-world applications, CrewAI provides precise control and deep customization. By fostering collaborative intelligence through flexible, production-ready architecture, CrewAI empowers agents to work together seamlessly, tackling complex business challenges with predictable, consistent results.
+
+**CrewAI Enterprise**
+Want to plan, build (+ no code), deploy, monitor, and iterate on your agents? Check out [CrewAI Enterprise](https://www.crewai.com/enterprise). Designed for complex, real-world applications, our enterprise solution offers:
+
+- **Seamless Integrations**
+- **Scalable & Secure Deployment**
+- **Actionable Insights**
+- **24/7 Support**
@@ -392,7 +400,7 @@ class AdvancedAnalysisFlow(Flow[MarketState]):
goal="Gather and validate supporting market data",
backstory="You excel at finding and correlating multiple data sources"
)
-
+
analysis_task = Task(
description="Analyze {sector} sector data for the past {timeframe}",
expected_output="Detailed market analysis with confidence score",
@@ -403,7 +411,7 @@ class AdvancedAnalysisFlow(Flow[MarketState]):
expected_output="Corroborating evidence and potential contradictions",
agent=researcher
)
-
+
# Demonstrate crew autonomy
analysis_crew = Crew(
agents=[analyst, researcher],
diff --git a/docs/concepts/knowledge.mdx b/docs/concepts/knowledge.mdx
index 78443ecab..b5827551a 100644
--- a/docs/concepts/knowledge.mdx
+++ b/docs/concepts/knowledge.mdx
@@ -91,7 +91,7 @@ result = crew.kickoff(inputs={"question": "What city does John live in and how o
```
-Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including TXT, PDF, DOCX, HTML, and more.
+Here's another example with the `CrewDoclingSource`. It is quite versatile and can handle multiple file formats, including MD, PDF, DOCX, HTML, and more.
You need to install `docling` for the following example to work: `uv add docling`
@@ -152,10 +152,10 @@ Here are examples of how to use different types of knowledge sources:
### Text File Knowledge Source
```python
-from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
+from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
# Create a text file knowledge source
-text_source = CrewDoclingSource(
+text_source = TextFileKnowledgeSource(
file_paths=["document.txt", "another.txt"]
)
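+
+# The source can then be attached to an agent or crew via the `knowledge_sources`
+# parameter (sketch): Crew(agents=[...], tasks=[...], knowledge_sources=[text_source])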
diff --git a/docs/concepts/memory.mdx b/docs/concepts/memory.mdx
index 33df47b82..8db1fe33a 100644
--- a/docs/concepts/memory.mdx
+++ b/docs/concepts/memory.mdx
@@ -58,41 +58,107 @@ my_crew = Crew(
### Example: Use Custom Memory Instances e.g FAISS as the VectorDB
```python Code
-from crewai import Crew, Agent, Task, Process
+from crewai import Crew, Process
+from crewai.memory import LongTermMemory, ShortTermMemory, EntityMemory
+from crewai.memory.storage.rag_storage import RAGStorage
+from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
+from typing import List, Optional
# Assemble your crew with memory capabilities
-my_crew = Crew(
- agents=[...],
- tasks=[...],
- process="Process.sequential",
- memory=True,
- long_term_memory=EnhanceLongTermMemory(
+my_crew: Crew = Crew(
+ agents = [...],
+ tasks = [...],
+ process = Process.sequential,
+ memory = True,
+ # Long-term memory for persistent storage across sessions
+ long_term_memory = LongTermMemory(
storage=LTMSQLiteStorage(
- db_path="/my_data_dir/my_crew1/long_term_memory_storage.db"
+ db_path="/my_crew1/long_term_memory_storage.db"
)
),
- short_term_memory=EnhanceShortTermMemory(
- storage=CustomRAGStorage(
- crew_name="my_crew",
- storage_type="short_term",
- data_dir="//my_data_dir",
- model=embedder["model"],
- dimension=embedder["dimension"],
+ # Short-term memory for current context using RAG
+ short_term_memory = ShortTermMemory(
+ storage = RAGStorage(
+ embedder_config={
+ "provider": "openai",
+ "config": {
+ "model": 'text-embedding-3-small'
+ }
+ },
+ type="short_term",
+ path="/my_crew1/"
),
),
- entity_memory=EnhanceEntityMemory(
- storage=CustomRAGStorage(
- crew_name="my_crew",
- storage_type="entities",
- data_dir="//my_data_dir",
- model=embedder["model"],
- dimension=embedder["dimension"],
- ),
+ # Entity memory for tracking key information about entities
+ entity_memory = EntityMemory(
+ storage=RAGStorage(
+ embedder_config={
+ "provider": "openai",
+ "config": {
+ "model": 'text-embedding-3-small'
+ }
+ },
+            type="entities",
+ path="/my_crew1/"
+ )
),
verbose=True,
)
```
+## Security Considerations
+
+When configuring memory storage:
+- Use environment variables for storage paths (e.g., `CREWAI_STORAGE_DIR`)
+- Never hardcode sensitive information like database credentials
+- Consider access permissions for storage directories
+- Use relative paths when possible to maintain portability
+
+Example using environment variables:
+```python
+import os
+from crewai import Crew
+from crewai.memory import LongTermMemory
+from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
+
+# Configure storage path using environment variable
+storage_path = os.getenv("CREWAI_STORAGE_DIR", "./storage")
+crew = Crew(
+    agents=[...],
+    tasks=[...],
+    memory=True,
+ long_term_memory=LongTermMemory(
+ storage=LTMSQLiteStorage(
+            db_path=f"{storage_path}/memory.db"
+ )
+ )
+)
+```
+
+## Configuration Examples
+
+### Basic Memory Configuration
+```python
+from crewai import Crew
+from crewai.memory import LongTermMemory
+
+# Simple memory configuration
+crew = Crew(agents=[...], tasks=[...], memory=True)  # Uses default storage locations
+```
+
+### Custom Storage Configuration
+```python
+from crewai import Crew
+from crewai.memory import LongTermMemory
+from crewai.memory.storage.ltm_sqlite_storage import LTMSQLiteStorage
+
+# Configure custom storage paths
+crew = Crew(
+    agents=[...],
+    tasks=[...],
+    memory=True,
+ long_term_memory=LongTermMemory(
+ storage=LTMSQLiteStorage(db_path="./memory.db")
+ )
+)
+```
+
## Integrating Mem0 for Enhanced User Memory
[Mem0](https://mem0.ai/) is a self-improving memory layer for LLM applications, enabling personalized AI experiences.
diff --git a/docs/concepts/tasks.mdx b/docs/concepts/tasks.mdx
index 4aa47c416..120f5d547 100644
--- a/docs/concepts/tasks.mdx
+++ b/docs/concepts/tasks.mdx
@@ -268,7 +268,7 @@ analysis_task = Task(
Task guardrails provide a way to validate and transform task outputs before they
are passed to the next task. This feature helps ensure data quality and provides
-efeedback to agents when their output doesn't meet specific criteria.
+feedback to agents when their output doesn't meet specific criteria.
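+
+For orientation, here is a minimal sketch of a guardrail (the `writer` agent and the
+word-count rule are placeholders): a guardrail is a callable that receives the task's
+output and returns a `(success, data)` tuple.
+
+```python
+from typing import Any, Tuple
+
+from crewai import Task
+from crewai.tasks.task_output import TaskOutput
+
+def validate_word_count(result: TaskOutput) -> Tuple[bool, Any]:
+    """Accept the output only if it stays under 200 words."""
+    if len(result.raw.split()) <= 200:
+        return (True, result.raw)
+    return (False, "Output exceeds 200 words; please shorten it.")
+
+summary_task = Task(
+    description="Summarize the article in under 200 words",
+    expected_output="A concise summary",
+    agent=writer,  # placeholder agent
+    guardrail=validate_word_count,
+)
+```
+
+The second element of the tuple is what gets passed along: the validated result on
+success, or the feedback message returned to the agent on failure.
+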
### Using Task Guardrails
diff --git a/docs/tools/filewritetool.mdx b/docs/tools/filewritetool.mdx
index f5dffb2ad..5e00801b7 100644
--- a/docs/tools/filewritetool.mdx
+++ b/docs/tools/filewritetool.mdx
@@ -8,9 +8,9 @@ icon: file-pen
## Description
-The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files.
+The `FileWriterTool` is a component of the crewai_tools package, designed to simplify the process of writing content to files with cross-platform compatibility (Windows, Linux, macOS).
It is particularly useful in scenarios such as generating reports, saving logs, creating configuration files, and more.
-This tool supports creating new directories if they don't exist, making it easier to organize your output.
+This tool handles path differences across operating systems, supports UTF-8 encoding, and automatically creates directories if they don't exist, making it easier to organize your output reliably across different platforms.
## Installation
@@ -43,6 +43,8 @@ print(result)
## Conclusion
-By integrating the `FileWriterTool` into your crews, the agents can execute the process of writing content to files and creating directories.
-This tool is essential for tasks that require saving output data, creating structured file systems, and more. By adhering to the setup and usage guidelines provided,
-incorporating this tool into projects is straightforward and efficient.
\ No newline at end of file
+By integrating the `FileWriterTool` into your crews, the agents can reliably write content to files across different operating systems.
+This tool is essential for tasks that require saving output data, creating structured file systems, and handling cross-platform file operations.
+It's particularly recommended for Windows users who may encounter file writing issues with standard Python file operations.
+
+By adhering to the setup and usage guidelines provided, incorporating this tool into projects is straightforward and ensures consistent file writing behavior across all platforms.
diff --git a/src/crewai/crew.py b/src/crewai/crew.py
index 9d1b7e8d3..48331de08 100644
--- a/src/crewai/crew.py
+++ b/src/crewai/crew.py
@@ -382,6 +382,22 @@ class Crew(BaseModel):
return self
+ @model_validator(mode="after")
+ def validate_must_have_non_conditional_task(self) -> "Crew":
+ """Ensure that a crew has at least one non-conditional task."""
+ if not self.tasks:
+ return self
+ non_conditional_count = sum(
+ 1 for task in self.tasks if not isinstance(task, ConditionalTask)
+ )
+ if non_conditional_count == 0:
+ raise PydanticCustomError(
+ "only_conditional_tasks",
+ "Crew must include at least one non-conditional task",
+ {},
+ )
+ return self
+
@model_validator(mode="after")
def validate_first_task(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
@@ -441,6 +457,8 @@ class Crew(BaseModel):
)
return self
+
+
@property
def key(self) -> str:
source = [agent.key for agent in self.agents] + [
@@ -742,6 +760,7 @@ class Crew(BaseModel):
task, task_outputs, futures, task_index, was_replayed
)
if skipped_task_output:
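+                    # Record the skipped task's placeholder output so the final
+                    # crew output includes an entry for every task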
+ task_outputs.append(skipped_task_output)
continue
if task.async_execution:
@@ -765,7 +784,7 @@ class Crew(BaseModel):
context=context,
tools=tools_for_task,
)
- task_outputs = [task_output]
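+                # Accumulate every task's output instead of keeping only the last one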
+ task_outputs.append(task_output)
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
@@ -786,7 +805,7 @@ class Crew(BaseModel):
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
- previous_output = task_outputs[task_index - 1] if task_outputs else None
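+        # Evaluate the condition against the most recent output rather than
+        # indexing by task position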
+ previous_output = task_outputs[-1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",
@@ -971,11 +990,15 @@ class Crew(BaseModel):
)
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
- if len(task_outputs) != 1:
- raise ValueError(
- "Something went wrong. Kickoff should return only one task output."
- )
- final_task_output = task_outputs[0]
+ if not task_outputs:
+ raise ValueError("No task outputs available to create crew output.")
+
+ # Filter out empty outputs and get the last valid one as the main output
+ valid_outputs = [t for t in task_outputs if t.raw]
+ if not valid_outputs:
+ raise ValueError("No valid task outputs available to create crew output.")
+ final_task_output = valid_outputs[-1]
+
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
@@ -984,7 +1007,7 @@ class Crew(BaseModel):
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
- tasks_output=[task.output for task in self.tasks if task.output],
+ tasks_output=task_outputs,
token_usage=token_usage,
)
diff --git a/src/crewai/llm.py b/src/crewai/llm.py
index d6be4b588..ada5c9bf3 100644
--- a/src/crewai/llm.py
+++ b/src/crewai/llm.py
@@ -164,6 +164,7 @@ class LLM:
self.context_window_size = 0
self.reasoning_effort = reasoning_effort
self.additional_params = kwargs
+ self.is_anthropic = self._is_anthropic_model(model)
litellm.drop_params = True
@@ -178,42 +179,62 @@ class LLM:
self.set_callbacks(callbacks)
self.set_env_callbacks()
+ def _is_anthropic_model(self, model: str) -> bool:
+ """Determine if the model is from Anthropic provider.
+
+ Args:
+ model: The model identifier string.
+
+ Returns:
+ bool: True if the model is from Anthropic, False otherwise.
+ """
+ ANTHROPIC_PREFIXES = ('anthropic/', 'claude-', 'claude/')
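+        # Substring (not prefix) matching so provider-routed ids such as
+        # "openrouter/anthropic/claude-3" are detected as well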
+ return any(prefix in model.lower() for prefix in ANTHROPIC_PREFIXES)
+
def call(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
- ) -> str:
- """
- High-level llm call method that:
- 1) Accepts either a string or a list of messages
- 2) Converts string input to the required message format
- 3) Calls litellm.completion
- 4) Handles function/tool calls if any
- 5) Returns the final text response or tool result
-
- Parameters:
- - messages (Union[str, List[Dict[str, str]]]): The input messages for the LLM.
- - If a string is provided, it will be converted into a message list with a single entry.
- - If a list of dictionaries is provided, each dictionary should have 'role' and 'content' keys.
- - tools (Optional[List[dict]]): A list of tool schemas for function calling.
- - callbacks (Optional[List[Any]]): A list of callback functions to be executed.
- - available_functions (Optional[Dict[str, Any]]): A dictionary mapping function names to actual Python functions.
-
+ ) -> Union[str, Any]:
+ """High-level LLM call method.
+
+ Args:
+ messages: Input messages for the LLM.
+ Can be a string or list of message dictionaries.
+ If string, it will be converted to a single user message.
+ If list, each dict must have 'role' and 'content' keys.
+ tools: Optional list of tool schemas for function calling.
+ Each tool should define its name, description, and parameters.
+ callbacks: Optional list of callback functions to be executed
+ during and after the LLM call.
+ available_functions: Optional dict mapping function names to callables
+ that can be invoked by the LLM.
+
Returns:
- - str: The final text response from the LLM or the result of a tool function call.
-
+ Union[str, Any]: Either a text response from the LLM (str) or
+ the result of a tool function call (Any).
+
+ Raises:
+ TypeError: If messages format is invalid
+ ValueError: If response format is not supported
+ LLMContextLengthExceededException: If input exceeds model's context limit
+
Examples:
- ---------
- # Example 1: Using a string input
- response = llm.call("Return the name of a random city in the world.")
- print(response)
-
- # Example 2: Using a list of messages
- messages = [{"role": "user", "content": "What is the capital of France?"}]
- response = llm.call(messages)
- print(response)
+ # Example 1: Simple string input
+ >>> response = llm.call("Return the name of a random city.")
+ >>> print(response)
+ "Paris"
+
+ # Example 2: Message list with system and user messages
+ >>> messages = [
+ ... {"role": "system", "content": "You are a geography expert"},
+ ... {"role": "user", "content": "What is France's capital?"}
+ ... ]
+ >>> response = llm.call(messages)
+ >>> print(response)
+ "The capital of France is Paris."
"""
# Validate parameters before proceeding with the call.
self._validate_call_params()
@@ -233,10 +254,13 @@ class LLM:
self.set_callbacks(callbacks)
try:
- # --- 1) Prepare the parameters for the completion call
+ # --- 1) Format messages according to provider requirements
+ formatted_messages = self._format_messages_for_provider(messages)
+
+ # --- 2) Prepare the parameters for the completion call
params = {
"model": self.model,
- "messages": messages,
+ "messages": formatted_messages,
"timeout": self.timeout,
"temperature": self.temperature,
"top_p": self.top_p,
@@ -324,6 +348,38 @@ class LLM:
logging.error(f"LiteLLM call failed: {str(e)}")
raise
+ def _format_messages_for_provider(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
+ """Format messages according to provider requirements.
+
+ Args:
+ messages: List of message dictionaries with 'role' and 'content' keys.
+ Can be empty or None.
+
+ Returns:
+ List of formatted messages according to provider requirements.
+ For Anthropic models, ensures first message has 'user' role.
+
+ Raises:
+ TypeError: If messages is None or contains invalid message format.
+ """
+ if messages is None:
+ raise TypeError("Messages cannot be None")
+
+ # Validate message format first
+ for msg in messages:
+ if not isinstance(msg, dict) or "role" not in msg or "content" not in msg:
+ raise TypeError("Invalid message format. Each message must be a dict with 'role' and 'content' keys")
+
+ if not self.is_anthropic:
+ return messages
+
+ # Anthropic requires messages to start with 'user' role
+ if not messages or messages[0]["role"] == "system":
+ # If first message is system or empty, add a placeholder user message
+ return [{"role": "user", "content": "."}, *messages]
+
+ return messages
+
def _get_custom_llm_provider(self) -> str:
"""
Derives the custom_llm_provider from the model string.
diff --git a/src/crewai/task.py b/src/crewai/task.py
index a3ee8aa14..4088c3fb0 100644
--- a/src/crewai/task.py
+++ b/src/crewai/task.py
@@ -674,19 +674,32 @@ class Task(BaseModel):
return OutputFormat.PYDANTIC
return OutputFormat.RAW
- def _save_file(self, result: Any) -> None:
+ def _save_file(self, result: Union[Dict, str, Any]) -> None:
"""Save task output to a file.
+ Note:
+ For cross-platform file writing, especially on Windows, consider using FileWriterTool
+ from the crewai_tools package:
+ pip install 'crewai[tools]'
+ from crewai_tools import FileWriterTool
+
Args:
result: The result to save to the file. Can be a dict or any stringifiable object.
Raises:
ValueError: If output_file is not set
- RuntimeError: If there is an error writing to the file
+            RuntimeError: If there is an error writing to the file. For cross-platform
+                compatibility, especially on Windows, consider using FileWriterTool from
+                the crewai_tools package.
"""
if self.output_file is None:
raise ValueError("output_file is not set.")
+ FILEWRITER_RECOMMENDATION = (
+ "For cross-platform file writing, especially on Windows, "
+            "use FileWriterTool from the crewai_tools package."
+ )
+
try:
resolved_path = Path(self.output_file).expanduser().resolve()
directory = resolved_path.parent
@@ -702,7 +715,12 @@ class Task(BaseModel):
else:
file.write(str(result))
except (OSError, IOError) as e:
- raise RuntimeError(f"Failed to save output file: {e}")
+ raise RuntimeError(
+ "\n".join([
+ f"Failed to save output file: {e}",
+ FILEWRITER_RECOMMENDATION
+ ])
+ )
return None
def __repr__(self):
diff --git a/tests/crew_test.py b/tests/crew_test.py
index 4812ab93f..e69c71315 100644
--- a/tests/crew_test.py
+++ b/tests/crew_test.py
@@ -49,6 +49,39 @@ writer = Agent(
)
+def test_crew_with_only_conditional_tasks_raises_error():
+ """Test that creating a crew with only conditional tasks raises an error."""
+ def condition_func(task_output: TaskOutput) -> bool:
+ return True
+
+ conditional1 = ConditionalTask(
+ description="Conditional task 1",
+ expected_output="Output 1",
+ agent=researcher,
+ condition=condition_func,
+ )
+ conditional2 = ConditionalTask(
+ description="Conditional task 2",
+ expected_output="Output 2",
+ agent=researcher,
+ condition=condition_func,
+ )
+ conditional3 = ConditionalTask(
+ description="Conditional task 3",
+ expected_output="Output 3",
+ agent=researcher,
+ condition=condition_func,
+ )
+
+ with pytest.raises(
+ pydantic_core._pydantic_core.ValidationError,
+ match="Crew must include at least one non-conditional task",
+ ):
+ Crew(
+ agents=[researcher],
+ tasks=[conditional1, conditional2, conditional3],
+ )
+
def test_crew_config_conditional_requirement():
with pytest.raises(ValueError):
Crew(process=Process.sequential)
@@ -2060,6 +2093,195 @@ def test_tools_with_custom_caching():
assert result.raw == "3"
+@pytest.mark.vcr(filter_headers=["authorization"])
+def test_conditional_task_uses_last_output():
+ """Test that conditional tasks use the last task output for condition evaluation."""
+ task1 = Task(
+ description="First task",
+ expected_output="First output",
+ agent=researcher,
+ )
+ def condition_fails(task_output: TaskOutput) -> bool:
+ # This condition will never be met
+ return "never matches" in task_output.raw.lower()
+
+ def condition_succeeds(task_output: TaskOutput) -> bool:
+ # This condition will match first task's output
+ return "first success" in task_output.raw.lower()
+
+ conditional_task1 = ConditionalTask(
+ description="Second task - conditional that fails condition",
+ expected_output="Second output",
+ agent=researcher,
+ condition=condition_fails,
+ )
+
+ conditional_task2 = ConditionalTask(
+ description="Third task - conditional that succeeds using first task output",
+ expected_output="Third output",
+ agent=writer,
+ condition=condition_succeeds,
+ )
+
+ crew = Crew(
+ agents=[researcher, writer],
+ tasks=[task1, conditional_task1, conditional_task2],
+ )
+
+ # Mock outputs for tasks
+ mock_first = TaskOutput(
+ description="First task output",
+ raw="First success output", # Will be used by third task's condition
+ agent=researcher.role,
+ )
+ mock_skipped = TaskOutput(
+ description="Second task output",
+ raw="", # Empty output since condition fails
+ agent=researcher.role,
+ )
+ mock_third = TaskOutput(
+ description="Third task output",
+ raw="Third task executed", # Output when condition succeeds using first task output
+ agent=writer.role,
+ )
+
+ # Set up mocks for task execution and conditional logic
+ with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
+ # First conditional fails, second succeeds
+ mock_should_execute.side_effect = [False, True]
+
+ with patch.object(Task, "execute_sync") as mock_execute:
+ mock_execute.side_effect = [mock_first, mock_third]
+ result = crew.kickoff()
+
+ # Verify execution behavior
+ assert mock_execute.call_count == 2 # Only first and third tasks execute
+ assert mock_should_execute.call_count == 2 # Both conditionals checked
+
+ # Verify outputs collection
+ assert len(result.tasks_output) == 3
+ assert result.tasks_output[0].raw == "First success output" # First task succeeded
+ assert result.tasks_output[1].raw == "" # Second task skipped (condition failed)
+ assert result.tasks_output[2].raw == "Third task executed" # Third task used first task's output
+
+@pytest.mark.vcr(filter_headers=["authorization"])
+def test_conditional_tasks_result_collection():
+ """Test that task outputs are properly collected based on execution status."""
+ task1 = Task(
+ description="Normal task that always executes",
+ expected_output="First output",
+ agent=researcher,
+ )
+
+ def condition_never_met(task_output: TaskOutput) -> bool:
+ return "never matches" in task_output.raw.lower()
+
+ def condition_always_met(task_output: TaskOutput) -> bool:
+ return "success" in task_output.raw.lower()
+
+ task2 = ConditionalTask(
+ description="Conditional task that never executes",
+ expected_output="Second output",
+ agent=researcher,
+ condition=condition_never_met,
+ )
+
+ task3 = ConditionalTask(
+ description="Conditional task that always executes",
+ expected_output="Third output",
+ agent=writer,
+ condition=condition_always_met,
+ )
+
+ crew = Crew(
+ agents=[researcher, writer],
+ tasks=[task1, task2, task3],
+ )
+
+ # Mock outputs for different execution paths
+ mock_success = TaskOutput(
+ description="Success output",
+ raw="Success output", # Triggers third task's condition
+ agent=researcher.role,
+ )
+ mock_skipped = TaskOutput(
+ description="Skipped output",
+ raw="", # Empty output for skipped task
+ agent=researcher.role,
+ )
+ mock_conditional = TaskOutput(
+ description="Conditional output",
+ raw="Conditional task executed",
+ agent=writer.role,
+ )
+
+ # Set up mocks for task execution and conditional logic
+ with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
+ # First conditional fails, second succeeds
+ mock_should_execute.side_effect = [False, True]
+
+ with patch.object(Task, "execute_sync") as mock_execute:
+ mock_execute.side_effect = [mock_success, mock_conditional]
+ result = crew.kickoff()
+
+ # Verify execution behavior
+ assert mock_execute.call_count == 2 # Only first and third tasks execute
+ assert mock_should_execute.call_count == 2 # Both conditionals checked
+
+ # Verify task output collection
+ assert len(result.tasks_output) == 3
+ assert result.tasks_output[0].raw == "Success output" # Normal task executed
+ assert result.tasks_output[1].raw == "" # Second task skipped
+ assert result.tasks_output[2].raw == "Conditional task executed" # Third task executed
+
+@pytest.mark.vcr(filter_headers=["authorization"])
+def test_multiple_conditional_tasks():
+ """Test that having multiple conditional tasks in sequence works correctly."""
+ task1 = Task(
+ description="Initial research task",
+ expected_output="Research output",
+ agent=researcher,
+ )
+
+ def condition1(task_output: TaskOutput) -> bool:
+ return "success" in task_output.raw.lower()
+
+ def condition2(task_output: TaskOutput) -> bool:
+ return "proceed" in task_output.raw.lower()
+
+ task2 = ConditionalTask(
+ description="First conditional task",
+ expected_output="Conditional output 1",
+ agent=writer,
+ condition=condition1,
+ )
+
+ task3 = ConditionalTask(
+ description="Second conditional task",
+ expected_output="Conditional output 2",
+ agent=writer,
+ condition=condition2,
+ )
+
+ crew = Crew(
+ agents=[researcher, writer],
+ tasks=[task1, task2, task3],
+ )
+
+ # Mock different task outputs to test conditional logic
+ mock_success = TaskOutput(
+ description="Mock success",
+ raw="Success and proceed output",
+ agent=researcher.role,
+ )
+
+ # Set up mocks for task execution
+ with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
+ result = crew.kickoff()
+ # Verify all tasks were executed (no IndexError)
+ assert mock_execute.call_count == 3
+ assert len(result.tasks_output) == 3
+
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch
diff --git a/tests/llm_test.py b/tests/llm_test.py
index d64639dca..2e5faf774 100644
--- a/tests/llm_test.py
+++ b/tests/llm_test.py
@@ -286,6 +286,79 @@ def test_o3_mini_reasoning_effort_medium():
-@pytest.mark.vcr(filter_headers=["authorization"])
+@pytest.fixture
+def anthropic_llm():
+ """Fixture providing an Anthropic LLM instance."""
+ return LLM(model="anthropic/claude-3-sonnet")
+
+@pytest.fixture
+def system_message():
+ """Fixture providing a system message."""
+ return {"role": "system", "content": "test"}
+
+@pytest.fixture
+def user_message():
+ """Fixture providing a user message."""
+ return {"role": "user", "content": "test"}
+
+def test_anthropic_message_formatting_edge_cases(anthropic_llm):
+ """Test edge cases for Anthropic message formatting."""
+ # Test None messages
+ with pytest.raises(TypeError, match="Messages cannot be None"):
+ anthropic_llm._format_messages_for_provider(None)
+
+ # Test empty message list
+ formatted = anthropic_llm._format_messages_for_provider([])
+ assert len(formatted) == 1
+ assert formatted[0]["role"] == "user"
+ assert formatted[0]["content"] == "."
+
+ # Test invalid message format
+ with pytest.raises(TypeError, match="Invalid message format"):
+ anthropic_llm._format_messages_for_provider([{"invalid": "message"}])
+
+def test_anthropic_model_detection():
+ """Test Anthropic model detection with various formats."""
+ models = [
+ ("anthropic/claude-3", True),
+ ("claude-instant", True),
+ ("claude/v1", True),
+ ("gpt-4", False),
+ ("", False),
+ ("anthropomorphic", False), # Should not match partial words
+ ]
+
+ for model, expected in models:
+ llm = LLM(model=model)
+ assert llm.is_anthropic == expected, f"Failed for model: {model}"
+
+def test_anthropic_message_formatting(anthropic_llm, system_message, user_message):
+ """Test Anthropic message formatting with fixtures."""
+ # Test when first message is system
+ formatted = anthropic_llm._format_messages_for_provider([system_message])
+ assert len(formatted) == 2
+ assert formatted[0]["role"] == "user"
+ assert formatted[0]["content"] == "."
+ assert formatted[1] == system_message
+
+ # Test when first message is already user
+ formatted = anthropic_llm._format_messages_for_provider([user_message])
+ assert len(formatted) == 1
+ assert formatted[0] == user_message
+
+ # Test with empty message list
+ formatted = anthropic_llm._format_messages_for_provider([])
+ assert len(formatted) == 1
+ assert formatted[0]["role"] == "user"
+ assert formatted[0]["content"] == "."
+
+ # Test with non-Anthropic model (should not modify messages)
+ non_anthropic_llm = LLM(model="gpt-4")
+ formatted = non_anthropic_llm._format_messages_for_provider([system_message])
+ assert len(formatted) == 1
+ assert formatted[0] == system_message
+
+
+@pytest.mark.vcr(filter_headers=["authorization"])
def test_deepseek_r1_with_open_router():
if not os.getenv("OPEN_ROUTER_API_KEY"):
pytest.skip("OPEN_ROUTER_API_KEY not set; skipping test.")