Compare commits

..

7 Commits

Author SHA1 Message Date
Lorenze Jay
38735cba99 Merge branch 'main' into bugfix/flow-persist-nested-models 2025-03-21 17:03:57 -07:00
Brandon Hancock
cde67882b4 resuse existing code and address PRs 2025-03-21 15:10:08 -04:00
Lorenze Jay
d3df545f1e Merge branch 'main' into bugfix/flow-persist-nested-models 2025-03-21 11:59:11 -07:00
Brandon Hancock (bhancock_ai)
b5067a2689 Merge branch 'main' into bugfix/flow-persist-nested-models 2025-03-10 12:05:13 -04:00
Brandon Hancock (bhancock_ai)
362b20f052 Merge branch 'main' into bugfix/flow-persist-nested-models 2025-03-07 12:55:05 -05:00
Brandon Hancock
d5408ec461 Drop file 2025-03-06 16:40:36 -05:00
Brandon Hancock
6677c9c192 nested models in flow persist 2025-03-05 16:14:50 -05:00
12 changed files with 326 additions and 1066 deletions

View File

@@ -25,7 +25,6 @@ from crewai.tools.base_tool import BaseTool, Tool
from crewai.utilities import I18N, Logger, RPMController
from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter
from crewai.utilities.string_utils import interpolate_only
T = TypeVar("T", bound="BaseAgent")
@@ -334,15 +333,9 @@ class BaseAgent(ABC, BaseModel):
self._original_backstory = self.backstory
if inputs:
self.role = interpolate_only(
input_string=self._original_role, inputs=inputs
)
self.goal = interpolate_only(
input_string=self._original_goal, inputs=inputs
)
self.backstory = interpolate_only(
input_string=self._original_backstory, inputs=inputs
)
self.role = self._original_role.format(**inputs)
self.goal = self._original_goal.format(**inputs)
self.backstory = self._original_backstory.format(**inputs)
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
"""Set the cache handler for the agent.

View File

@@ -8,45 +8,45 @@ from pydantic import BaseModel
class FlowPersistence(abc.ABC):
"""Abstract base class for flow state persistence.
This class defines the interface that all persistence implementations must follow.
It supports both structured (Pydantic BaseModel) and unstructured (dict) states.
"""
@abc.abstractmethod
def init_db(self) -> None:
"""Initialize the persistence backend.
This method should handle any necessary setup, such as:
- Creating tables
- Establishing connections
- Setting up indexes
"""
pass
@abc.abstractmethod
def save_state(
self,
flow_uuid: str,
method_name: str,
state_data: Union[Dict[str, Any], BaseModel]
state_data: Union[Dict[str, Any], BaseModel],
) -> None:
"""Persist the flow state after method completion.
Args:
flow_uuid: Unique identifier for the flow instance
method_name: Name of the method that just completed
state_data: Current state data (either dict or Pydantic model)
"""
pass
@abc.abstractmethod
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
"""Load the most recent state for a given flow UUID.
Args:
flow_uuid: Unique identifier for the flow instance
Returns:
The most recent state as a dictionary, or None if no state exists
"""

View File

@@ -11,6 +11,7 @@ from typing import Any, Dict, Optional, Union
from pydantic import BaseModel
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.state_utils import to_serializable
class SQLiteFlowPersistence(FlowPersistence):
@@ -78,34 +79,53 @@ class SQLiteFlowPersistence(FlowPersistence):
flow_uuid: Unique identifier for the flow instance
method_name: Name of the method that just completed
state_data: Current state data (either dict or Pydantic model)
"""
# Convert state_data to dict, handling both Pydantic and dict cases
if isinstance(state_data, BaseModel):
state_dict = dict(state_data) # Use dict() for better type compatibility
elif isinstance(state_data, dict):
state_dict = state_data
else:
raise ValueError(
f"state_data must be either a Pydantic BaseModel or dict, got {type(state_data)}"
)
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO flow_states (
flow_uuid,
method_name,
timestamp,
state_json
) VALUES (?, ?, ?, ?)
""",
(
flow_uuid,
method_name,
datetime.now(timezone.utc).isoformat(),
json.dumps(state_dict),
),
)
Raises:
ValueError: If state_data is neither a dict nor a BaseModel
RuntimeError: If database operations fail
TypeError: If JSON serialization fails
"""
try:
# Convert state_data to a JSON-serializable dict using the helper method
state_dict = to_serializable(state_data)
# Try to serialize to JSON to catch any serialization issues early
try:
state_json = json.dumps(state_dict)
except (TypeError, ValueError, OverflowError) as json_err:
raise TypeError(
f"Failed to serialize state to JSON: {json_err}"
) from json_err
# Perform database operation with error handling
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
INSERT INTO flow_states (
flow_uuid,
method_name,
timestamp,
state_json
) VALUES (?, ?, ?, ?)
""",
(
flow_uuid,
method_name,
datetime.now(timezone.utc).isoformat(),
state_json,
),
)
except sqlite3.Error as db_err:
raise RuntimeError(f"Database operation failed: {db_err}") from db_err
except Exception as e:
# Log the error but don't crash the application
import logging
logging.error(f"Failed to save flow state: {e}")
# Re-raise to allow caller to handle or ignore
raise
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
"""Load the most recent state for a given flow UUID.

View File

@@ -1,36 +1,16 @@
import json
from datetime import date, datetime
from enum import Enum
from typing import Any, Dict, List, Union
from pydantic import BaseModel
from crewai.flow import Flow
SerializablePrimitive = Union[str, int, float, bool, None]
Serializable = Union[
SerializablePrimitive, List["Serializable"], Dict[str, "Serializable"]
]
def export_state(flow: Flow) -> dict[str, Serializable]:
"""Exports the Flow's internal state as JSON-compatible data structures.
Performs a one-way transformation of a Flow's state into basic Python types
that can be safely serialized to JSON. To prevent infinite recursion with
circular references, the conversion is limited to a depth of 5 levels.
Args:
flow: The Flow object whose state needs to be exported
Returns:
dict[str, Any]: The transformed state using JSON-compatible Python
types.
"""
result = to_serializable(flow._state)
assert isinstance(result, dict)
return result
def to_serializable(
obj: Any, max_depth: int = 5, _current_depth: int = 0
) -> Serializable:
@@ -52,6 +32,8 @@ def to_serializable(
if isinstance(obj, (str, int, float, bool, type(None))):
return obj
elif isinstance(obj, Enum):
return obj.value
elif isinstance(obj, (date, datetime)):
return obj.isoformat()
elif isinstance(obj, (list, tuple, set)):

View File

@@ -2,7 +2,6 @@ import datetime
import inspect
import json
import logging
import re
import threading
import uuid
from concurrent.futures import Future
@@ -50,7 +49,6 @@ from crewai.utilities.events import (
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import interpolate_only
class Task(BaseModel):
@@ -509,9 +507,7 @@ class Task(BaseModel):
return
try:
self.description = interpolate_only(
input_string=self._original_description, inputs=inputs
)
self.description = self._original_description.format(**inputs)
except KeyError as e:
raise ValueError(
f"Missing required template variable '{e.args[0]}' in description"
@@ -520,7 +516,7 @@ class Task(BaseModel):
raise ValueError(f"Error interpolating description: {str(e)}") from e
try:
self.expected_output = interpolate_only(
self.expected_output = self.interpolate_only(
input_string=self._original_expected_output, inputs=inputs
)
except (KeyError, ValueError) as e:
@@ -528,7 +524,7 @@ class Task(BaseModel):
if self.output_file is not None:
try:
self.output_file = interpolate_only(
self.output_file = self.interpolate_only(
input_string=self._original_output_file, inputs=inputs
)
except (KeyError, ValueError) as e:
@@ -559,6 +555,72 @@ class Task(BaseModel):
f"\n\n{conversation_instruction}\n\n{conversation_history}"
)
def interpolate_only(
self,
input_string: Optional[str],
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]],
) -> str:
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
Args:
input_string: The string containing template variables to interpolate.
Can be None or empty, in which case an empty string is returned.
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, floats, and dicts/lists
containing only these types and other nested dicts/lists.
Returns:
The interpolated string with all template variables replaced with their values.
Empty string if input_string is None or empty.
Raises:
ValueError: If a value contains unsupported types
"""
# Validation function for recursive type checking
def validate_type(value: Any) -> None:
if value is None:
return
if isinstance(value, (str, int, float, bool)):
return
if isinstance(value, (dict, list)):
for item in value.values() if isinstance(value, dict) else value:
validate_type(item)
return
raise ValueError(
f"Unsupported type {type(value).__name__} in inputs. "
"Only str, int, float, bool, dict, and list are allowed."
)
# Validate all input values
for key, value in inputs.items():
try:
validate_type(value)
except ValueError as e:
raise ValueError(f"Invalid value for key '{key}': {str(e)}") from e
if input_string is None or not input_string:
return ""
if "{" not in input_string and "}" not in input_string:
return input_string
if not inputs:
raise ValueError(
"Inputs dictionary cannot be empty when interpolating variables"
)
try:
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
for key in inputs.keys():
escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}")
return escaped_string.format(**inputs)
except KeyError as e:
raise KeyError(
f"Template variable '{e.args[0]}' not found in inputs dictionary"
) from e
except ValueError as e:
raise ValueError(f"Error during string interpolation: {str(e)}") from e
def increment_tools_errors(self) -> None:
"""Increment the tools errors counter."""
self.tools_errors += 1

View File

@@ -2,13 +2,12 @@ from __future__ import annotations
import asyncio
import json
import logging
import os
import platform
import warnings
from contextlib import contextmanager
from importlib.metadata import version
from typing import TYPE_CHECKING, Any, Optional, Sequence
from typing import TYPE_CHECKING, Any, Optional
@contextmanager
@@ -23,7 +22,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter, # noqa: E402
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider # noqa: E402
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
@@ -32,62 +31,6 @@ if TYPE_CHECKING:
from crewai.task import Task
# A custom BatchSpanProcessor that catches and suppresses all exceptions
logger = logging.getLogger(__name__)
class SafeBatchSpanProcessor(BatchSpanProcessor):
"""A wrapper around BatchSpanProcessor that suppresses all exceptions.
This processor ensures that telemetry operations do not disrupt user code
by catching and suppressing connection and timeout errors that might occur
during span export operations.
It logs suppressed errors at the debug level for diagnostic purposes without
propagating them to calling code.
"""
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
"""Override force_flush to catch and suppress all exceptions.
Args:
timeout_millis: The maximum amount of time to wait for spans to be exported.
Returns:
bool: True if the flush was successful, False otherwise.
"""
try:
return super().force_flush(timeout_millis)
except ConnectionError as e:
logger.debug(f"Suppressed telemetry force_flush connection error: {str(e)}")
return False
except TimeoutError as e:
logger.debug(f"Suppressed telemetry force_flush timeout: {str(e)}")
return False
except Exception as e:
logger.debug(f"Unexpected telemetry force_flush error: {str(e)}")
return False
def export(self, spans: Sequence[ReadableSpan]) -> None:
"""Override export to catch and suppress all exceptions.
Args:
spans: The spans to export.
"""
try:
if hasattr(super(), 'export'):
super().export(spans)
else:
# Call the exporter directly if super().export doesn't exist
self._span_exporter.export(spans)
except ConnectionError as e:
logger.debug(f"Suppressed telemetry export connection error: {str(e)}")
except TimeoutError as e:
logger.debug(f"Suppressed telemetry export timeout: {str(e)}")
except Exception as e:
logger.debug(f"Unexpected telemetry export error: {str(e)}")
class Telemetry:
"""A class to handle anonymous telemetry for the crewai package.
@@ -116,7 +59,7 @@ class Telemetry:
with suppress_warnings():
self.provider = TracerProvider(resource=self.resource)
processor = SafeBatchSpanProcessor(
processor = BatchSpanProcessor(
OTLPSpanExporter(
endpoint=f"{telemetry_endpoint}/v1/traces",
timeout=30,

View File

@@ -1,12 +1,10 @@
import re
from typing import TYPE_CHECKING, List
from typing import List
if TYPE_CHECKING:
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
def aggregate_raw_outputs_from_task_outputs(task_outputs: List["TaskOutput"]) -> str:
def aggregate_raw_outputs_from_task_outputs(task_outputs: List[TaskOutput]) -> str:
"""Generate string context from the task outputs."""
dividers = "\n\n----------\n\n"
@@ -15,7 +13,7 @@ def aggregate_raw_outputs_from_task_outputs(task_outputs: List["TaskOutput"]) ->
return context
def aggregate_raw_outputs_from_tasks(tasks: List["Task"]) -> str:
def aggregate_raw_outputs_from_tasks(tasks: List[Task]) -> str:
"""Generate string context from the tasks."""
task_outputs = [task.output for task in tasks if task.output is not None]

View File

@@ -1,82 +0,0 @@
import re
from typing import Any, Dict, List, Optional, Union
def interpolate_only(
input_string: Optional[str],
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]],
) -> str:
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
Only interpolates placeholders that follow the pattern {variable_name} where
variable_name starts with a letter/underscore and contains only letters, numbers, and underscores.
Args:
input_string: The string containing template variables to interpolate.
Can be None or empty, in which case an empty string is returned.
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, floats, and dicts/lists
containing only these types and other nested dicts/lists.
Returns:
The interpolated string with all template variables replaced with their values.
Empty string if input_string is None or empty.
Raises:
ValueError: If a value contains unsupported types or a template variable is missing
"""
# Validation function for recursive type checking
def validate_type(value: Any) -> None:
if value is None:
return
if isinstance(value, (str, int, float, bool)):
return
if isinstance(value, (dict, list)):
for item in value.values() if isinstance(value, dict) else value:
validate_type(item)
return
raise ValueError(
f"Unsupported type {type(value).__name__} in inputs. "
"Only str, int, float, bool, dict, and list are allowed."
)
# Validate all input values
for key, value in inputs.items():
try:
validate_type(value)
except ValueError as e:
raise ValueError(f"Invalid value for key '{key}': {str(e)}") from e
if input_string is None or not input_string:
return ""
if "{" not in input_string and "}" not in input_string:
return input_string
if not inputs:
raise ValueError(
"Inputs dictionary cannot be empty when interpolating variables"
)
# The regex pattern to find valid variable placeholders
# Matches {variable_name} where variable_name starts with a letter/underscore
# and contains only letters, numbers, and underscores
pattern = r"\{([A-Za-z_][A-Za-z0-9_]*)\}"
# Find all matching variables in the input string
variables = re.findall(pattern, input_string)
result = input_string
# Check if all variables exist in inputs
missing_vars = [var for var in variables if var not in inputs]
if missing_vars:
raise KeyError(
f"Template variable '{missing_vars[0]}' not found in inputs dictionary"
)
# Replace each variable with its value
for var in variables:
if var in inputs:
placeholder = "{" + var + "}"
value = str(inputs[var])
result = result.replace(placeholder, value)
return result

View File

@@ -15,7 +15,6 @@ from crewai import Agent, Crew, Process, Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.converter import Converter
from crewai.utilities.string_utils import interpolate_only
def test_task_tool_reflect_agent_tools():
@@ -823,7 +822,7 @@ def test_interpolate_only():
# Test JSON structure preservation
json_string = '{"info": "Look at {placeholder}", "nested": {"val": "{nestedVal}"}}'
result = interpolate_only(
result = task.interpolate_only(
input_string=json_string,
inputs={"placeholder": "the data", "nestedVal": "something else"},
)
@@ -834,18 +833,20 @@ def test_interpolate_only():
# Test normal string interpolation
normal_string = "Hello {name}, welcome to {place}!"
result = interpolate_only(
result = task.interpolate_only(
input_string=normal_string, inputs={"name": "John", "place": "CrewAI"}
)
assert result == "Hello John, welcome to CrewAI!"
# Test empty string
result = interpolate_only(input_string="", inputs={"unused": "value"})
result = task.interpolate_only(input_string="", inputs={"unused": "value"})
assert result == ""
# Test string with no placeholders
no_placeholders = "Hello, this is a test"
result = interpolate_only(input_string=no_placeholders, inputs={"unused": "value"})
result = task.interpolate_only(
input_string=no_placeholders, inputs={"unused": "value"}
)
assert result == no_placeholders
@@ -857,7 +858,7 @@ def test_interpolate_only_with_dict_inside_expected_output():
)
json_string = '{"questions": {"main_question": "What is the user\'s name?", "secondary_question": "What is the user\'s age?"}}'
result = interpolate_only(
result = task.interpolate_only(
input_string=json_string,
inputs={
"questions": {
@@ -871,16 +872,18 @@ def test_interpolate_only_with_dict_inside_expected_output():
assert result == json_string
normal_string = "Hello {name}, welcome to {place}!"
result = interpolate_only(
result = task.interpolate_only(
input_string=normal_string, inputs={"name": "John", "place": "CrewAI"}
)
assert result == "Hello John, welcome to CrewAI!"
result = interpolate_only(input_string="", inputs={"unused": "value"})
result = task.interpolate_only(input_string="", inputs={"unused": "value"})
assert result == ""
no_placeholders = "Hello, this is a test"
result = interpolate_only(input_string=no_placeholders, inputs={"unused": "value"})
result = task.interpolate_only(
input_string=no_placeholders, inputs={"unused": "value"}
)
assert result == no_placeholders
@@ -1082,12 +1085,12 @@ def test_interpolate_with_list_of_strings():
# Test simple list of strings
input_str = "Available items: {items}"
inputs = {"items": ["apple", "banana", "cherry"]}
result = interpolate_only(input_str, inputs)
result = task.interpolate_only(input_str, inputs)
assert result == f"Available items: {inputs['items']}"
# Test empty list
empty_list_input = {"items": []}
result = interpolate_only(input_str, empty_list_input)
result = task.interpolate_only(input_str, empty_list_input)
assert result == "Available items: []"
@@ -1103,7 +1106,7 @@ def test_interpolate_with_list_of_dicts():
{"name": "Bob", "age": 25, "skills": ["Java", "Cloud"]},
]
}
result = interpolate_only("{people}", input_data)
result = task.interpolate_only("{people}", input_data)
parsed_result = eval(result)
assert isinstance(parsed_result, list)
@@ -1135,7 +1138,7 @@ def test_interpolate_with_nested_structures():
],
}
}
result = interpolate_only("{company}", input_data)
result = task.interpolate_only("{company}", input_data)
parsed = eval(result)
assert parsed["name"] == "TechCorp"
@@ -1158,7 +1161,7 @@ def test_interpolate_with_special_characters():
"empty": "",
}
}
result = interpolate_only("{special_data}", input_data)
result = task.interpolate_only("{special_data}", input_data)
parsed = eval(result)
assert parsed["quotes"] == """This has "double" and 'single' quotes"""
@@ -1185,7 +1188,7 @@ def test_interpolate_mixed_types():
},
}
}
result = interpolate_only("{data}", input_data)
result = task.interpolate_only("{data}", input_data)
parsed = eval(result)
assert parsed["name"] == "Test Dataset"
@@ -1213,7 +1216,7 @@ def test_interpolate_complex_combination():
},
]
}
result = interpolate_only("{report}", input_data)
result = task.interpolate_only("{report}", input_data)
parsed = eval(result)
assert len(parsed) == 2
@@ -1230,7 +1233,7 @@ def test_interpolate_invalid_type_validation():
# Test with invalid top-level type
with pytest.raises(ValueError) as excinfo:
interpolate_only("{data}", {"data": set()}) # type: ignore we are purposely testing this failure
task.interpolate_only("{data}", {"data": set()}) # type: ignore we are purposely testing this failure
assert "Unsupported type set" in str(excinfo.value)
@@ -1243,7 +1246,7 @@ def test_interpolate_invalid_type_validation():
}
}
with pytest.raises(ValueError) as excinfo:
interpolate_only("{data}", {"data": invalid_nested})
task.interpolate_only("{data}", {"data": invalid_nested})
assert "Unsupported type set" in str(excinfo.value)
@@ -1262,22 +1265,24 @@ def test_interpolate_custom_object_validation():
# Test with custom object at top level
with pytest.raises(ValueError) as excinfo:
interpolate_only("{obj}", {"obj": CustomObject(5)}) # type: ignore we are purposely testing this failure
task.interpolate_only("{obj}", {"obj": CustomObject(5)}) # type: ignore we are purposely testing this failure
assert "Unsupported type CustomObject" in str(excinfo.value)
# Test with nested custom object in dictionary
with pytest.raises(ValueError) as excinfo:
interpolate_only("{data}", {"data": {"valid": 1, "invalid": CustomObject(5)}})
task.interpolate_only(
"{data}", {"data": {"valid": 1, "invalid": CustomObject(5)}}
)
assert "Unsupported type CustomObject" in str(excinfo.value)
# Test with nested custom object in list
with pytest.raises(ValueError) as excinfo:
interpolate_only("{data}", {"data": [1, "valid", CustomObject(5)]})
task.interpolate_only("{data}", {"data": [1, "valid", CustomObject(5)]})
assert "Unsupported type CustomObject" in str(excinfo.value)
# Test with deeply nested custom object
with pytest.raises(ValueError) as excinfo:
interpolate_only(
task.interpolate_only(
"{data}", {"data": {"level1": {"level2": [{"level3": CustomObject(5)}]}}}
)
assert "Unsupported type CustomObject" in str(excinfo.value)
@@ -1301,7 +1306,7 @@ def test_interpolate_valid_complex_types():
}
# Should not raise any errors
result = interpolate_only("{data}", {"data": valid_data})
result = task.interpolate_only("{data}", {"data": valid_data})
parsed = eval(result)
assert parsed["name"] == "Valid Dataset"
assert parsed["stats"]["nested"]["deeper"]["b"] == 2.5
@@ -1314,16 +1319,16 @@ def test_interpolate_edge_cases():
)
# Test empty dict and list
assert interpolate_only("{}", {"data": {}}) == "{}"
assert interpolate_only("[]", {"data": []}) == "[]"
assert task.interpolate_only("{}", {"data": {}}) == "{}"
assert task.interpolate_only("[]", {"data": []}) == "[]"
# Test numeric types
assert interpolate_only("{num}", {"num": 42}) == "42"
assert interpolate_only("{num}", {"num": 3.14}) == "3.14"
assert task.interpolate_only("{num}", {"num": 42}) == "42"
assert task.interpolate_only("{num}", {"num": 3.14}) == "3.14"
# Test boolean values (valid JSON types)
assert interpolate_only("{flag}", {"flag": True}) == "True"
assert interpolate_only("{flag}", {"flag": False}) == "False"
assert task.interpolate_only("{flag}", {"flag": True}) == "True"
assert task.interpolate_only("{flag}", {"flag": False}) == "False"
def test_interpolate_valid_types():
@@ -1341,7 +1346,7 @@ def test_interpolate_valid_types():
"nested": {"flag": True, "empty": None},
}
result = interpolate_only("{data}", {"data": valid_data})
result = task.interpolate_only("{data}", {"data": valid_data})
parsed = eval(result)
assert parsed["active"] is True

View File

@@ -1,86 +0,0 @@
import os
from unittest.mock import Mock, patch
import pytest
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from crewai.telemetry.telemetry import SafeBatchSpanProcessor, Telemetry
class TestTelemetry:
"""Test suite for Telemetry functionality focusing on error handling and span processing."""
def test_safe_batch_span_processor(self):
"""Test that SafeBatchSpanProcessor properly suppresses exceptions."""
# Create a mock exporter that will be used by the processor
mock_exporter = Mock()
# Create a SafeBatchSpanProcessor with the mock exporter
processor = SafeBatchSpanProcessor(mock_exporter)
# Test force_flush with an exception
with patch.object(BatchSpanProcessor, 'force_flush', side_effect=ConnectionError("Test error")):
# This should not raise an exception
processor.force_flush()
# Test that the processor's export method suppresses exceptions
with patch.object(mock_exporter, 'export', side_effect=ConnectionError("Test error")):
# This should not raise an exception
processor.export([])
def test_telemetry_with_connection_error(self):
"""Test that telemetry connection errors are properly handled in real usage."""
# Make sure telemetry is enabled for the test
os.environ["OTEL_SDK_DISABLED"] = "false"
# Create a telemetry instance
telemetry = Telemetry()
# Verify telemetry is initialized
assert telemetry.ready is True
# Test a real telemetry operation
# This should not raise an exception even if there are connection issues
telemetry.flow_creation_span("test_flow")
# Reset environment variables
os.environ["OTEL_SDK_DISABLED"] = "true"
def test_safe_batch_span_processor_with_timeout(self):
"""Test that SafeBatchSpanProcessor properly handles timeout errors."""
# Create a mock exporter that will be used by the processor
mock_exporter = Mock()
# Create a SafeBatchSpanProcessor with the mock exporter
processor = SafeBatchSpanProcessor(mock_exporter)
# Test force_flush with a timeout error
with patch.object(BatchSpanProcessor, 'force_flush', side_effect=TimeoutError("Test timeout")):
# This should not raise an exception
processor.force_flush()
# Test that the processor's export method suppresses timeout exceptions
with patch.object(mock_exporter, 'export', side_effect=TimeoutError("Test timeout")):
# This should not raise an exception
processor.export([])
def test_safe_batch_span_processor_with_valid_data(self):
"""Test SafeBatchSpanProcessor normal operation with valid data."""
# Create a mock exporter that will be used by the processor
mock_exporter = Mock()
# Create a SafeBatchSpanProcessor with the mock exporter
processor = SafeBatchSpanProcessor(mock_exporter)
# Test force_flush with no exception
with patch.object(BatchSpanProcessor, 'force_flush', return_value=None):
# This should complete normally
processor.force_flush()
# Mock some valid spans
mock_spans = [Mock() for _ in range(3)]
# Test that the processor's export method works with valid data
with patch.object(mock_exporter, 'export', return_value=None):
# This should complete normally
processor.export(mock_spans)

View File

@@ -1,35 +1,17 @@
import json
import os
from typing import Dict, List, Optional
from datetime import date, datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union, cast
from unittest.mock import MagicMock, Mock, patch
import pytest
from pydantic import BaseModel
from crewai.llm import LLM
from crewai.utilities.converter import (
Converter,
ConverterError,
convert_to_model,
convert_with_instructions,
create_converter,
generate_model_description,
get_conversion_instructions,
handle_partial_json,
validate_model,
)
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
from crewai.flow.state_utils import _to_serializable_key, to_serializable, to_string
# Sample Pydantic models for testing
class EmailResponse(BaseModel):
previous_message_content: str
class EmailResponses(BaseModel):
responses: list[EmailResponse]
class SimpleModel(BaseModel):
name: str
age: int
@@ -52,560 +34,190 @@ class Person(BaseModel):
address: Address
class CustomConverter(Converter):
pass
class Color(Enum):
RED = "red"
GREEN = "green"
BLUE = "blue"
# Fixtures
@pytest.fixture
def mock_agent():
agent = Mock()
agent.function_calling_llm = None
agent.llm = Mock()
return agent
class EnumModel(BaseModel):
name: str
color: Color
# Tests for convert_to_model
def test_convert_to_model_with_valid_json():
result = '{"name": "John", "age": 30}'
output = convert_to_model(result, SimpleModel, None, None)
assert isinstance(output, SimpleModel)
assert output.name == "John"
assert output.age == 30
class OptionalModel(BaseModel):
name: str
age: Optional[int]
def test_convert_to_model_with_invalid_json():
result = '{"name": "John", "age": "thirty"}'
with patch("crewai.utilities.converter.handle_partial_json") as mock_handle:
mock_handle.return_value = "Fallback result"
output = convert_to_model(result, SimpleModel, None, None)
assert output == "Fallback result"
class ListModel(BaseModel):
items: List[int]
def test_convert_to_model_with_no_model():
result = "Plain text"
output = convert_to_model(result, None, None, None)
assert output == "Plain text"
class UnionModel(BaseModel):
field: Union[int, str, None]
def test_convert_to_model_with_special_characters():
json_string_test = """
{
"responses": [
{
"previous_message_content": "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on"
}
]
# Tests for to_serializable function
def test_to_serializable_primitives():
"""Test serialization of primitive types."""
assert to_serializable("test string") == "test string"
assert to_serializable(42) == 42
assert to_serializable(3.14) == 3.14
assert to_serializable(True) == True
assert to_serializable(None) is None
def test_to_serializable_dates():
"""Test serialization of date and datetime objects."""
test_date = date(2023, 1, 15)
test_datetime = datetime(2023, 1, 15, 10, 30, 45)
assert to_serializable(test_date) == "2023-01-15"
assert to_serializable(test_datetime) == "2023-01-15T10:30:45"
def test_to_serializable_collections():
"""Test serialization of lists, tuples, and sets."""
test_list = [1, "two", 3.0]
test_tuple = (4, "five", 6.0)
test_set = {7, "eight", 9.0}
assert to_serializable(test_list) == [1, "two", 3.0]
assert to_serializable(test_tuple) == [4, "five", 6.0]
# For sets, we can't rely on order, so we'll verify differently
serialized_set = to_serializable(test_set)
assert isinstance(serialized_set, list)
assert len(serialized_set) == 3
assert 7 in serialized_set
assert "eight" in serialized_set
assert 9.0 in serialized_set
def test_to_serializable_dict():
"""Test serialization of dictionaries."""
test_dict = {"a": 1, "b": "two", "c": [3, 4, 5]}
assert to_serializable(test_dict) == {"a": 1, "b": "two", "c": [3, 4, 5]}
def test_to_serializable_pydantic_models():
"""Test serialization of Pydantic models."""
simple = SimpleModel(name="John", age=30)
assert to_serializable(simple) == {"name": "John", "age": 30}
def test_to_serializable_nested_models():
"""Test serialization of nested Pydantic models."""
simple = SimpleModel(name="John", age=30)
nested = NestedModel(id=1, data=simple)
assert to_serializable(nested) == {"id": 1, "data": {"name": "John", "age": 30}}
def test_to_serializable_complex_model():
"""Test serialization of a complex model with nested structures."""
person = Person(
name="Jane",
age=28,
address=Address(street="123 Main St", city="Anytown", zip_code="12345"),
)
assert to_serializable(person) == {
"name": "Jane",
"age": 28,
"address": {"street": "123 Main St", "city": "Anytown", "zip_code": "12345"},
}
"""
output = convert_to_model(json_string_test, EmailResponses, None, None)
assert isinstance(output, EmailResponses)
assert len(output.responses) == 1
assert (
output.responses[0].previous_message_content
== "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on"
)
def test_convert_to_model_with_escaped_special_characters():
json_string_test = json.dumps(
{
"responses": [
{
"previous_message_content": "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on"
}
]
}
)
output = convert_to_model(json_string_test, EmailResponses, None, None)
assert isinstance(output, EmailResponses)
assert len(output.responses) == 1
assert (
output.responses[0].previous_message_content
== "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on"
)
def test_to_serializable_enum():
"""Test serialization of Enum values."""
model = EnumModel(name="ColorTest", color=Color.RED)
assert to_serializable(model) == {"name": "ColorTest", "color": "red"}
def test_convert_to_model_with_multiple_special_characters():
json_string_test = """
{
"responses": [
{
"previous_message_content": "Line 1\r\nLine 2\tTabbed\nLine 3\r\n\rEscaped newline"
}
]
}
"""
output = convert_to_model(json_string_test, EmailResponses, None, None)
assert isinstance(output, EmailResponses)
assert len(output.responses) == 1
assert (
output.responses[0].previous_message_content
== "Line 1\r\nLine 2\tTabbed\nLine 3\r\n\rEscaped newline"
)
def test_to_serializable_optional_fields():
"""Test serialization of models with optional fields."""
model_with_age = OptionalModel(name="WithAge", age=25)
model_without_age = OptionalModel(name="WithoutAge", age=None)
# Tests for validate_model
def test_validate_model_pydantic_output():
result = '{"name": "Alice", "age": 25}'
output = validate_model(result, SimpleModel, False)
assert isinstance(output, SimpleModel)
assert output.name == "Alice"
assert output.age == 25
assert to_serializable(model_with_age) == {"name": "WithAge", "age": 25}
assert to_serializable(model_without_age) == {"name": "WithoutAge", "age": None}
def test_validate_model_json_output():
result = '{"name": "Bob", "age": 40}'
output = validate_model(result, SimpleModel, True)
assert isinstance(output, dict)
assert output == {"name": "Bob", "age": 40}
def test_to_serializable_list_field():
"""Test serialization of models with list fields."""
model = ListModel(items=[1, 2, 3, 4, 5])
assert to_serializable(model) == {"items": [1, 2, 3, 4, 5]}
# Tests for handle_partial_json
def test_handle_partial_json_with_valid_partial():
result = 'Some text {"name": "Charlie", "age": 35} more text'
output = handle_partial_json(result, SimpleModel, False, None)
assert isinstance(output, SimpleModel)
assert output.name == "Charlie"
assert output.age == 35
def test_to_serializable_union_field():
"""Test serialization of models with union fields."""
model_int = UnionModel(field=42)
model_str = UnionModel(field="test")
model_none = UnionModel(field=None)
def test_handle_partial_json_with_invalid_partial(mock_agent):
result = "No valid JSON here"
with patch("crewai.utilities.converter.convert_with_instructions") as mock_convert:
mock_convert.return_value = "Converted result"
output = handle_partial_json(result, SimpleModel, False, mock_agent)
assert output == "Converted result"
assert to_serializable(model_int) == {"field": 42}
assert to_serializable(model_str) == {"field": "test"}
assert to_serializable(model_none) == {"field": None}
# Tests for convert_with_instructions
@patch("crewai.utilities.converter.create_converter")
@patch("crewai.utilities.converter.get_conversion_instructions")
def test_convert_with_instructions_success(
mock_get_instructions, mock_create_converter, mock_agent
):
mock_get_instructions.return_value = "Instructions"
mock_converter = Mock()
mock_converter.to_pydantic.return_value = SimpleModel(name="David", age=50)
mock_create_converter.return_value = mock_converter
def test_to_serializable_max_depth():
"""Test max depth parameter to prevent infinite recursion."""
# Create recursive structure
a: Dict[str, Any] = {"name": "a"}
b: Dict[str, Any] = {"name": "b", "ref": a}
a["ref"] = b # Create circular reference
result = "Some text to convert"
output = convert_with_instructions(result, SimpleModel, False, mock_agent)
result = to_serializable(a, max_depth=3)
assert isinstance(output, SimpleModel)
assert output.name == "David"
assert output.age == 50
assert isinstance(result, dict)
assert "name" in result
assert "ref" in result
assert isinstance(result["ref"], dict)
assert "ref" in result["ref"]
assert isinstance(result["ref"]["ref"], dict)
# At depth 3, it should convert to string
assert isinstance(result["ref"]["ref"]["ref"], str)
@patch("crewai.utilities.converter.create_converter")
@patch("crewai.utilities.converter.get_conversion_instructions")
def test_convert_with_instructions_failure(
mock_get_instructions, mock_create_converter, mock_agent
):
mock_get_instructions.return_value = "Instructions"
mock_converter = Mock()
mock_converter.to_pydantic.return_value = ConverterError("Conversion failed")
mock_create_converter.return_value = mock_converter
def test_to_serializable_non_serializable():
"""Test serialization of objects that aren't directly JSON serializable."""
result = "Some text to convert"
with patch("crewai.utilities.converter.Printer") as mock_printer:
output = convert_with_instructions(result, SimpleModel, False, mock_agent)
assert output == result
mock_printer.return_value.print.assert_called_once()
class CustomObject:
def __repr__(self):
return "CustomObject()"
obj = CustomObject()
# Tests for get_conversion_instructions
def test_get_conversion_instructions_gpt():
llm = LLM(model="gpt-4o-mini")
with patch.object(LLM, "supports_function_calling") as supports_function_calling:
supports_function_calling.return_value = True
instructions = get_conversion_instructions(SimpleModel, llm)
model_schema = PydanticSchemaParser(model=SimpleModel).get_schema()
expected_instructions = (
"Please convert the following text into valid JSON.\n\n"
"Output ONLY the valid JSON and nothing else.\n\n"
"The JSON must follow this schema exactly:\n```json\n"
f"{model_schema}\n```"
)
assert instructions == expected_instructions
# Should convert to string representation
assert to_serializable(obj) == "CustomObject()"
def test_get_conversion_instructions_non_gpt():
llm = LLM(model="ollama/llama3.1", base_url="http://localhost:11434")
with patch.object(LLM, "supports_function_calling", return_value=False):
instructions = get_conversion_instructions(SimpleModel, llm)
assert '"name": str' in instructions
assert '"age": int' in instructions
def test_to_string_conversion():
"""Test the to_string function."""
test_dict = {"name": "Test", "values": [1, 2, 3]}
# Should convert to a JSON string
assert to_string(test_dict) == '{"name": "Test", "values": [1, 2, 3]}'
# Tests for is_gpt
def test_supports_function_calling_true():
llm = LLM(model="gpt-4o")
assert llm.supports_function_calling() is True
# None should return None
assert to_string(None) is None
def test_supports_function_calling_false():
llm = LLM(model="non-existent-model")
assert llm.supports_function_calling() is False
def test_to_serializable_key():
"""Test serialization of dictionary keys."""
# String and int keys are converted to strings
assert _to_serializable_key("test") == "test"
assert _to_serializable_key(42) == "42"
def test_create_converter_with_mock_agent():
mock_agent = MagicMock()
mock_agent.get_output_converter.return_value = MagicMock(spec=Converter)
converter = create_converter(
agent=mock_agent,
llm=Mock(),
text="Sample",
model=SimpleModel,
instructions="Convert",
)
assert isinstance(converter, Converter)
mock_agent.get_output_converter.assert_called_once()
def test_create_converter_with_custom_converter():
converter = create_converter(
converter_cls=CustomConverter,
llm=LLM(model="gpt-4o-mini"),
text="Sample",
model=SimpleModel,
instructions="Convert",
)
assert isinstance(converter, CustomConverter)
def test_create_converter_fails_without_agent_or_converter_cls():
with pytest.raises(
ValueError, match="Either agent or converter_cls must be provided"
):
create_converter(
llm=Mock(), text="Sample", model=SimpleModel, instructions="Convert"
)
def test_generate_model_description_simple_model():
description = generate_model_description(SimpleModel)
expected_description = '{\n "name": str,\n "age": int\n}'
assert description == expected_description
def test_generate_model_description_nested_model():
description = generate_model_description(NestedModel)
expected_description = (
'{\n "id": int,\n "data": {\n "name": str,\n "age": int\n}\n}'
)
assert description == expected_description
def test_generate_model_description_optional_field():
class ModelWithOptionalField(BaseModel):
name: Optional[str]
age: int
description = generate_model_description(ModelWithOptionalField)
expected_description = '{\n "name": Optional[str],\n "age": int\n}'
assert description == expected_description
def test_generate_model_description_list_field():
class ModelWithListField(BaseModel):
items: List[int]
description = generate_model_description(ModelWithListField)
expected_description = '{\n "items": List[int]\n}'
assert description == expected_description
def test_generate_model_description_dict_field():
class ModelWithDictField(BaseModel):
attributes: Dict[str, int]
description = generate_model_description(ModelWithDictField)
expected_description = '{\n "attributes": Dict[str, int]\n}'
assert description == expected_description
@pytest.mark.vcr(filter_headers=["authorization"])
def test_convert_with_instructions():
llm = LLM(model="gpt-4o-mini")
sample_text = "Name: Alice, Age: 30"
instructions = get_conversion_instructions(SimpleModel, llm)
converter = Converter(
llm=llm,
text=sample_text,
model=SimpleModel,
instructions=instructions,
)
# Act
output = converter.to_pydantic()
# Assert
assert isinstance(output, SimpleModel)
assert output.name == "Alice"
assert output.age == 30
# Skip tests that call external APIs when running in CI/CD
skip_external_api = pytest.mark.skipif(
os.getenv("CI") is not None, reason="Skipping tests that call external API in CI/CD"
)
@skip_external_api
@pytest.mark.vcr(filter_headers=["authorization"], record_mode="once")
def test_converter_with_llama3_2_model():
llm = LLM(model="ollama/llama3.2:3b", base_url="http://localhost:11434")
sample_text = "Name: Alice Llama, Age: 30"
instructions = get_conversion_instructions(SimpleModel, llm)
converter = Converter(
llm=llm,
text=sample_text,
model=SimpleModel,
instructions=instructions,
)
output = converter.to_pydantic()
assert isinstance(output, SimpleModel)
assert output.name == "Alice Llama"
assert output.age == 30
@skip_external_api
@pytest.mark.vcr(filter_headers=["authorization"], record_mode="once")
def test_converter_with_llama3_1_model():
llm = LLM(model="ollama/llama3.1", base_url="http://localhost:11434")
sample_text = "Name: Alice Llama, Age: 30"
instructions = get_conversion_instructions(SimpleModel, llm)
converter = Converter(
llm=llm,
text=sample_text,
model=SimpleModel,
instructions=instructions,
)
output = converter.to_pydantic()
assert isinstance(output, SimpleModel)
assert output.name == "Alice Llama"
assert output.age == 30
# Skip tests that call external APIs when running in CI/CD
skip_external_api = pytest.mark.skipif(
os.getenv("CI") is not None, reason="Skipping tests that call external API in CI/CD"
)
@skip_external_api
@pytest.mark.vcr(filter_headers=["authorization"])
def test_converter_with_nested_model():
llm = LLM(model="gpt-4o-mini")
sample_text = "Name: John Doe\nAge: 30\nAddress: 123 Main St, Anytown, 12345"
instructions = get_conversion_instructions(Person, llm)
converter = Converter(
llm=llm,
text=sample_text,
model=Person,
instructions=instructions,
)
output = converter.to_pydantic()
assert isinstance(output, Person)
assert output.name == "John Doe"
assert output.age == 30
assert isinstance(output.address, Address)
assert output.address.street == "123 Main St"
assert output.address.city == "Anytown"
assert output.address.zip_code == "12345"
# Tests for error handling
def test_converter_error_handling():
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
llm.call.return_value = "Invalid JSON"
sample_text = "Name: Alice, Age: 30"
instructions = get_conversion_instructions(SimpleModel, llm)
converter = Converter(
llm=llm,
text=sample_text,
model=SimpleModel,
instructions=instructions,
)
with pytest.raises(ConverterError) as exc_info:
output = converter.to_pydantic()
assert "Failed to convert text into a Pydantic model" in str(exc_info.value)
# Tests for retry logic
def test_converter_retry_logic():
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
llm.call.side_effect = [
"Invalid JSON",
"Still invalid",
'{"name": "Retry Alice", "age": 30}',
]
sample_text = "Name: Retry Alice, Age: 30"
instructions = get_conversion_instructions(SimpleModel, llm)
converter = Converter(
llm=llm,
text=sample_text,
model=SimpleModel,
instructions=instructions,
max_attempts=3,
)
output = converter.to_pydantic()
assert isinstance(output, SimpleModel)
assert output.name == "Retry Alice"
assert output.age == 30
assert llm.call.call_count == 3
# Tests for optional fields
def test_converter_with_optional_fields():
class OptionalModel(BaseModel):
name: str
age: Optional[int]
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
# Simulate the LLM's response with 'age' explicitly set to null
llm.call.return_value = '{"name": "Bob", "age": null}'
sample_text = "Name: Bob, age: None"
instructions = get_conversion_instructions(OptionalModel, llm)
converter = Converter(
llm=llm,
text=sample_text,
model=OptionalModel,
instructions=instructions,
)
output = converter.to_pydantic()
assert isinstance(output, OptionalModel)
assert output.name == "Bob"
assert output.age is None
# Tests for list fields
def test_converter_with_list_field():
class ListModel(BaseModel):
items: List[int]
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
llm.call.return_value = '{"items": [1, 2, 3]}'
sample_text = "Items: 1, 2, 3"
instructions = get_conversion_instructions(ListModel, llm)
converter = Converter(
llm=llm,
text=sample_text,
model=ListModel,
instructions=instructions,
)
output = converter.to_pydantic()
assert isinstance(output, ListModel)
assert output.items == [1, 2, 3]
# Tests for enums
from enum import Enum
def test_converter_with_enum():
class Color(Enum):
RED = "red"
GREEN = "green"
BLUE = "blue"
class EnumModel(BaseModel):
name: str
color: Color
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
llm.call.return_value = '{"name": "Alice", "color": "red"}'
sample_text = "Name: Alice, Color: Red"
instructions = get_conversion_instructions(EnumModel, llm)
converter = Converter(
llm=llm,
text=sample_text,
model=EnumModel,
instructions=instructions,
)
output = converter.to_pydantic()
assert isinstance(output, EnumModel)
assert output.name == "Alice"
assert output.color == Color.RED
# Tests for ambiguous input
def test_converter_with_ambiguous_input():
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
llm.call.return_value = '{"name": "Charlie", "age": "Not an age"}'
sample_text = "Charlie is thirty years old"
instructions = get_conversion_instructions(SimpleModel, llm)
converter = Converter(
llm=llm,
text=sample_text,
model=SimpleModel,
instructions=instructions,
)
with pytest.raises(ConverterError) as exc_info:
output = converter.to_pydantic()
assert "failed to convert text into a pydantic model" in str(exc_info.value).lower()
# Tests for function calling support
def test_converter_with_function_calling():
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = True
instructor = Mock()
instructor.to_pydantic.return_value = SimpleModel(name="Eve", age=35)
converter = Converter(
llm=llm,
text="Name: Eve, Age: 35",
model=SimpleModel,
instructions="Convert this text.",
)
converter._create_instructor = Mock(return_value=instructor)
output = converter.to_pydantic()
assert isinstance(output, SimpleModel)
assert output.name == "Eve"
assert output.age == 35
instructor.to_pydantic.assert_called_once()
def test_generate_model_description_union_field():
class UnionModel(BaseModel):
field: int | str | None
description = generate_model_description(UnionModel)
expected_description = '{\n "field": int | str | None\n}'
assert description == expected_description
# Complex objects are converted to a unique string
obj = object()
key_str = _to_serializable_key(obj)
assert isinstance(key_str, str)
assert "key_" in key_str
assert "object" in key_str

View File

@@ -1,187 +0,0 @@
from typing import Any, Dict, List, Union
import pytest
from crewai.utilities.string_utils import interpolate_only
class TestInterpolateOnly:
"""Tests for the interpolate_only function in string_utils.py."""
def test_basic_variable_interpolation(self):
"""Test basic variable interpolation works correctly."""
template = "Hello, {name}! Welcome to {company}."
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"name": "Alice",
"company": "CrewAI",
}
result = interpolate_only(template, inputs)
assert result == "Hello, Alice! Welcome to CrewAI."
def test_multiple_occurrences_of_same_variable(self):
"""Test that multiple occurrences of the same variable are replaced."""
template = "{name} is using {name}'s account."
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"name": "Bob"
}
result = interpolate_only(template, inputs)
assert result == "Bob is using Bob's account."
def test_json_structure_preservation(self):
"""Test that JSON structures are preserved and not interpolated incorrectly."""
template = """
Instructions for {agent}:
Please return the following object:
{"name": "person's name", "age": 25, "skills": ["coding", "testing"]}
"""
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"agent": "DevAgent"
}
result = interpolate_only(template, inputs)
assert "Instructions for DevAgent:" in result
assert (
'{"name": "person\'s name", "age": 25, "skills": ["coding", "testing"]}'
in result
)
def test_complex_nested_json(self):
"""Test with complex JSON structures containing curly braces."""
template = """
{agent} needs to process:
{
"config": {
"nested": {
"value": 42
},
"arrays": [1, 2, {"inner": "value"}]
}
}
"""
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"agent": "DataProcessor"
}
result = interpolate_only(template, inputs)
assert "DataProcessor needs to process:" in result
assert '"nested": {' in result
assert '"value": 42' in result
assert '[1, 2, {"inner": "value"}]' in result
def test_missing_variable(self):
"""Test that an error is raised when a required variable is missing."""
template = "Hello, {name}!"
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"not_name": "Alice"
}
with pytest.raises(KeyError) as excinfo:
interpolate_only(template, inputs)
assert "template variable" in str(excinfo.value).lower()
assert "name" in str(excinfo.value)
def test_invalid_input_types(self):
"""Test that an error is raised with invalid input types."""
template = "Hello, {name}!"
# Using Any for this test since we're intentionally testing an invalid type
inputs: Dict[str, Any] = {"name": object()} # Object is not a valid input type
with pytest.raises(ValueError) as excinfo:
interpolate_only(template, inputs)
assert "unsupported type" in str(excinfo.value).lower()
def test_empty_input_string(self):
"""Test handling of empty or None input string."""
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"name": "Alice"
}
assert interpolate_only("", inputs) == ""
assert interpolate_only(None, inputs) == ""
def test_no_variables_in_template(self):
"""Test a template with no variables to replace."""
template = "This is a static string with no variables."
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"name": "Alice"
}
result = interpolate_only(template, inputs)
assert result == template
def test_variable_name_starting_with_underscore(self):
"""Test variables starting with underscore are replaced correctly."""
template = "Variable: {_special_var}"
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"_special_var": "Special Value"
}
result = interpolate_only(template, inputs)
assert result == "Variable: Special Value"
def test_preserves_non_matching_braces(self):
"""Test that non-matching braces patterns are preserved."""
template = (
"This {123} and {!var} should not be replaced but {valid_var} should."
)
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"valid_var": "works"
}
result = interpolate_only(template, inputs)
assert (
result == "This {123} and {!var} should not be replaced but works should."
)
def test_complex_mixed_scenario(self):
"""Test a complex scenario with both valid variables and JSON structures."""
template = """
{agent_name} is working on task {task_id}.
Instructions:
1. Process the data
2. Return results as:
{
"taskId": "{task_id}",
"results": {
"processed_by": "agent_name",
"status": "complete",
"values": [1, 2, 3]
}
}
"""
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"agent_name": "AnalyticsAgent",
"task_id": "T-12345",
}
result = interpolate_only(template, inputs)
assert "AnalyticsAgent is working on task T-12345" in result
assert '"taskId": "T-12345"' in result
assert '"processed_by": "agent_name"' in result # This shouldn't be replaced
assert '"values": [1, 2, 3]' in result
def test_empty_inputs_dictionary(self):
"""Test that an error is raised with empty inputs dictionary."""
template = "Hello, {name}!"
inputs: Dict[str, Any] = {}
with pytest.raises(ValueError) as excinfo:
interpolate_only(template, inputs)
assert "inputs dictionary cannot be empty" in str(excinfo.value).lower()