mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-11 05:22:41 +00:00
Compare commits
7 Commits
devin/1742
...
bugfix/flo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
38735cba99 | ||
|
|
cde67882b4 | ||
|
|
d3df545f1e | ||
|
|
b5067a2689 | ||
|
|
362b20f052 | ||
|
|
d5408ec461 | ||
|
|
6677c9c192 |
@@ -25,7 +25,6 @@ from crewai.tools.base_tool import BaseTool, Tool
|
||||
from crewai.utilities import I18N, Logger, RPMController
|
||||
from crewai.utilities.config import process_config
|
||||
from crewai.utilities.converter import Converter
|
||||
from crewai.utilities.string_utils import interpolate_only
|
||||
|
||||
T = TypeVar("T", bound="BaseAgent")
|
||||
|
||||
@@ -334,15 +333,9 @@ class BaseAgent(ABC, BaseModel):
|
||||
self._original_backstory = self.backstory
|
||||
|
||||
if inputs:
|
||||
self.role = interpolate_only(
|
||||
input_string=self._original_role, inputs=inputs
|
||||
)
|
||||
self.goal = interpolate_only(
|
||||
input_string=self._original_goal, inputs=inputs
|
||||
)
|
||||
self.backstory = interpolate_only(
|
||||
input_string=self._original_backstory, inputs=inputs
|
||||
)
|
||||
self.role = self._original_role.format(**inputs)
|
||||
self.goal = self._original_goal.format(**inputs)
|
||||
self.backstory = self._original_backstory.format(**inputs)
|
||||
|
||||
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
|
||||
"""Set the cache handler for the agent.
|
||||
|
||||
@@ -8,45 +8,45 @@ from pydantic import BaseModel
|
||||
|
||||
class FlowPersistence(abc.ABC):
|
||||
"""Abstract base class for flow state persistence.
|
||||
|
||||
|
||||
This class defines the interface that all persistence implementations must follow.
|
||||
It supports both structured (Pydantic BaseModel) and unstructured (dict) states.
|
||||
"""
|
||||
|
||||
|
||||
@abc.abstractmethod
|
||||
def init_db(self) -> None:
|
||||
"""Initialize the persistence backend.
|
||||
|
||||
|
||||
This method should handle any necessary setup, such as:
|
||||
- Creating tables
|
||||
- Establishing connections
|
||||
- Setting up indexes
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@abc.abstractmethod
|
||||
def save_state(
|
||||
self,
|
||||
flow_uuid: str,
|
||||
method_name: str,
|
||||
state_data: Union[Dict[str, Any], BaseModel]
|
||||
state_data: Union[Dict[str, Any], BaseModel],
|
||||
) -> None:
|
||||
"""Persist the flow state after method completion.
|
||||
|
||||
|
||||
Args:
|
||||
flow_uuid: Unique identifier for the flow instance
|
||||
method_name: Name of the method that just completed
|
||||
state_data: Current state data (either dict or Pydantic model)
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@abc.abstractmethod
|
||||
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
|
||||
"""Load the most recent state for a given flow UUID.
|
||||
|
||||
|
||||
Args:
|
||||
flow_uuid: Unique identifier for the flow instance
|
||||
|
||||
|
||||
Returns:
|
||||
The most recent state as a dictionary, or None if no state exists
|
||||
"""
|
||||
|
||||
@@ -11,6 +11,7 @@ from typing import Any, Dict, Optional, Union
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.flow.persistence.base import FlowPersistence
|
||||
from crewai.flow.state_utils import to_serializable
|
||||
|
||||
|
||||
class SQLiteFlowPersistence(FlowPersistence):
|
||||
@@ -78,34 +79,53 @@ class SQLiteFlowPersistence(FlowPersistence):
|
||||
flow_uuid: Unique identifier for the flow instance
|
||||
method_name: Name of the method that just completed
|
||||
state_data: Current state data (either dict or Pydantic model)
|
||||
"""
|
||||
# Convert state_data to dict, handling both Pydantic and dict cases
|
||||
if isinstance(state_data, BaseModel):
|
||||
state_dict = dict(state_data) # Use dict() for better type compatibility
|
||||
elif isinstance(state_data, dict):
|
||||
state_dict = state_data
|
||||
else:
|
||||
raise ValueError(
|
||||
f"state_data must be either a Pydantic BaseModel or dict, got {type(state_data)}"
|
||||
)
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO flow_states (
|
||||
flow_uuid,
|
||||
method_name,
|
||||
timestamp,
|
||||
state_json
|
||||
) VALUES (?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
flow_uuid,
|
||||
method_name,
|
||||
datetime.now(timezone.utc).isoformat(),
|
||||
json.dumps(state_dict),
|
||||
),
|
||||
)
|
||||
Raises:
|
||||
ValueError: If state_data is neither a dict nor a BaseModel
|
||||
RuntimeError: If database operations fail
|
||||
TypeError: If JSON serialization fails
|
||||
"""
|
||||
try:
|
||||
# Convert state_data to a JSON-serializable dict using the helper method
|
||||
state_dict = to_serializable(state_data)
|
||||
|
||||
# Try to serialize to JSON to catch any serialization issues early
|
||||
try:
|
||||
state_json = json.dumps(state_dict)
|
||||
except (TypeError, ValueError, OverflowError) as json_err:
|
||||
raise TypeError(
|
||||
f"Failed to serialize state to JSON: {json_err}"
|
||||
) from json_err
|
||||
|
||||
# Perform database operation with error handling
|
||||
try:
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO flow_states (
|
||||
flow_uuid,
|
||||
method_name,
|
||||
timestamp,
|
||||
state_json
|
||||
) VALUES (?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
flow_uuid,
|
||||
method_name,
|
||||
datetime.now(timezone.utc).isoformat(),
|
||||
state_json,
|
||||
),
|
||||
)
|
||||
except sqlite3.Error as db_err:
|
||||
raise RuntimeError(f"Database operation failed: {db_err}") from db_err
|
||||
|
||||
except Exception as e:
|
||||
# Log the error but don't crash the application
|
||||
import logging
|
||||
|
||||
logging.error(f"Failed to save flow state: {e}")
|
||||
# Re-raise to allow caller to handle or ignore
|
||||
raise
|
||||
|
||||
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
|
||||
"""Load the most recent state for a given flow UUID.
|
||||
|
||||
@@ -1,36 +1,16 @@
|
||||
import json
|
||||
from datetime import date, datetime
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.flow import Flow
|
||||
|
||||
SerializablePrimitive = Union[str, int, float, bool, None]
|
||||
Serializable = Union[
|
||||
SerializablePrimitive, List["Serializable"], Dict[str, "Serializable"]
|
||||
]
|
||||
|
||||
|
||||
def export_state(flow: Flow) -> dict[str, Serializable]:
|
||||
"""Exports the Flow's internal state as JSON-compatible data structures.
|
||||
|
||||
Performs a one-way transformation of a Flow's state into basic Python types
|
||||
that can be safely serialized to JSON. To prevent infinite recursion with
|
||||
circular references, the conversion is limited to a depth of 5 levels.
|
||||
|
||||
Args:
|
||||
flow: The Flow object whose state needs to be exported
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: The transformed state using JSON-compatible Python
|
||||
types.
|
||||
"""
|
||||
result = to_serializable(flow._state)
|
||||
assert isinstance(result, dict)
|
||||
return result
|
||||
|
||||
|
||||
def to_serializable(
|
||||
obj: Any, max_depth: int = 5, _current_depth: int = 0
|
||||
) -> Serializable:
|
||||
@@ -52,6 +32,8 @@ def to_serializable(
|
||||
|
||||
if isinstance(obj, (str, int, float, bool, type(None))):
|
||||
return obj
|
||||
elif isinstance(obj, Enum):
|
||||
return obj.value
|
||||
elif isinstance(obj, (date, datetime)):
|
||||
return obj.isoformat()
|
||||
elif isinstance(obj, (list, tuple, set)):
|
||||
|
||||
@@ -2,7 +2,6 @@ import datetime
|
||||
import inspect
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import threading
|
||||
import uuid
|
||||
from concurrent.futures import Future
|
||||
@@ -50,7 +49,6 @@ from crewai.utilities.events import (
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.i18n import I18N
|
||||
from crewai.utilities.printer import Printer
|
||||
from crewai.utilities.string_utils import interpolate_only
|
||||
|
||||
|
||||
class Task(BaseModel):
|
||||
@@ -509,9 +507,7 @@ class Task(BaseModel):
|
||||
return
|
||||
|
||||
try:
|
||||
self.description = interpolate_only(
|
||||
input_string=self._original_description, inputs=inputs
|
||||
)
|
||||
self.description = self._original_description.format(**inputs)
|
||||
except KeyError as e:
|
||||
raise ValueError(
|
||||
f"Missing required template variable '{e.args[0]}' in description"
|
||||
@@ -520,7 +516,7 @@ class Task(BaseModel):
|
||||
raise ValueError(f"Error interpolating description: {str(e)}") from e
|
||||
|
||||
try:
|
||||
self.expected_output = interpolate_only(
|
||||
self.expected_output = self.interpolate_only(
|
||||
input_string=self._original_expected_output, inputs=inputs
|
||||
)
|
||||
except (KeyError, ValueError) as e:
|
||||
@@ -528,7 +524,7 @@ class Task(BaseModel):
|
||||
|
||||
if self.output_file is not None:
|
||||
try:
|
||||
self.output_file = interpolate_only(
|
||||
self.output_file = self.interpolate_only(
|
||||
input_string=self._original_output_file, inputs=inputs
|
||||
)
|
||||
except (KeyError, ValueError) as e:
|
||||
@@ -559,6 +555,72 @@ class Task(BaseModel):
|
||||
f"\n\n{conversation_instruction}\n\n{conversation_history}"
|
||||
)
|
||||
|
||||
def interpolate_only(
|
||||
self,
|
||||
input_string: Optional[str],
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]],
|
||||
) -> str:
|
||||
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
|
||||
|
||||
Args:
|
||||
input_string: The string containing template variables to interpolate.
|
||||
Can be None or empty, in which case an empty string is returned.
|
||||
inputs: Dictionary mapping template variables to their values.
|
||||
Supported value types are strings, integers, floats, and dicts/lists
|
||||
containing only these types and other nested dicts/lists.
|
||||
|
||||
Returns:
|
||||
The interpolated string with all template variables replaced with their values.
|
||||
Empty string if input_string is None or empty.
|
||||
|
||||
Raises:
|
||||
ValueError: If a value contains unsupported types
|
||||
"""
|
||||
|
||||
# Validation function for recursive type checking
|
||||
def validate_type(value: Any) -> None:
|
||||
if value is None:
|
||||
return
|
||||
if isinstance(value, (str, int, float, bool)):
|
||||
return
|
||||
if isinstance(value, (dict, list)):
|
||||
for item in value.values() if isinstance(value, dict) else value:
|
||||
validate_type(item)
|
||||
return
|
||||
raise ValueError(
|
||||
f"Unsupported type {type(value).__name__} in inputs. "
|
||||
"Only str, int, float, bool, dict, and list are allowed."
|
||||
)
|
||||
|
||||
# Validate all input values
|
||||
for key, value in inputs.items():
|
||||
try:
|
||||
validate_type(value)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid value for key '{key}': {str(e)}") from e
|
||||
|
||||
if input_string is None or not input_string:
|
||||
return ""
|
||||
if "{" not in input_string and "}" not in input_string:
|
||||
return input_string
|
||||
if not inputs:
|
||||
raise ValueError(
|
||||
"Inputs dictionary cannot be empty when interpolating variables"
|
||||
)
|
||||
try:
|
||||
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
|
||||
|
||||
for key in inputs.keys():
|
||||
escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}")
|
||||
|
||||
return escaped_string.format(**inputs)
|
||||
except KeyError as e:
|
||||
raise KeyError(
|
||||
f"Template variable '{e.args[0]}' not found in inputs dictionary"
|
||||
) from e
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Error during string interpolation: {str(e)}") from e
|
||||
|
||||
def increment_tools_errors(self) -> None:
|
||||
"""Increment the tools errors counter."""
|
||||
self.tools_errors += 1
|
||||
|
||||
@@ -2,13 +2,12 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import warnings
|
||||
from contextlib import contextmanager
|
||||
from importlib.metadata import version
|
||||
from typing import TYPE_CHECKING, Any, Optional, Sequence
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
|
||||
@contextmanager
|
||||
@@ -23,7 +22,7 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
|
||||
OTLPSpanExporter, # noqa: E402
|
||||
)
|
||||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
|
||||
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider # noqa: E402
|
||||
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
|
||||
from opentelemetry.trace import Span, Status, StatusCode # noqa: E402
|
||||
|
||||
@@ -32,62 +31,6 @@ if TYPE_CHECKING:
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
# A custom BatchSpanProcessor that catches and suppresses all exceptions
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SafeBatchSpanProcessor(BatchSpanProcessor):
|
||||
"""A wrapper around BatchSpanProcessor that suppresses all exceptions.
|
||||
|
||||
This processor ensures that telemetry operations do not disrupt user code
|
||||
by catching and suppressing connection and timeout errors that might occur
|
||||
during span export operations.
|
||||
|
||||
It logs suppressed errors at the debug level for diagnostic purposes without
|
||||
propagating them to calling code.
|
||||
"""
|
||||
|
||||
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
|
||||
"""Override force_flush to catch and suppress all exceptions.
|
||||
|
||||
Args:
|
||||
timeout_millis: The maximum amount of time to wait for spans to be exported.
|
||||
|
||||
Returns:
|
||||
bool: True if the flush was successful, False otherwise.
|
||||
"""
|
||||
try:
|
||||
return super().force_flush(timeout_millis)
|
||||
except ConnectionError as e:
|
||||
logger.debug(f"Suppressed telemetry force_flush connection error: {str(e)}")
|
||||
return False
|
||||
except TimeoutError as e:
|
||||
logger.debug(f"Suppressed telemetry force_flush timeout: {str(e)}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.debug(f"Unexpected telemetry force_flush error: {str(e)}")
|
||||
return False
|
||||
|
||||
def export(self, spans: Sequence[ReadableSpan]) -> None:
|
||||
"""Override export to catch and suppress all exceptions.
|
||||
|
||||
Args:
|
||||
spans: The spans to export.
|
||||
"""
|
||||
try:
|
||||
if hasattr(super(), 'export'):
|
||||
super().export(spans)
|
||||
else:
|
||||
# Call the exporter directly if super().export doesn't exist
|
||||
self._span_exporter.export(spans)
|
||||
except ConnectionError as e:
|
||||
logger.debug(f"Suppressed telemetry export connection error: {str(e)}")
|
||||
except TimeoutError as e:
|
||||
logger.debug(f"Suppressed telemetry export timeout: {str(e)}")
|
||||
except Exception as e:
|
||||
logger.debug(f"Unexpected telemetry export error: {str(e)}")
|
||||
|
||||
|
||||
|
||||
class Telemetry:
|
||||
"""A class to handle anonymous telemetry for the crewai package.
|
||||
|
||||
@@ -116,7 +59,7 @@ class Telemetry:
|
||||
with suppress_warnings():
|
||||
self.provider = TracerProvider(resource=self.resource)
|
||||
|
||||
processor = SafeBatchSpanProcessor(
|
||||
processor = BatchSpanProcessor(
|
||||
OTLPSpanExporter(
|
||||
endpoint=f"{telemetry_endpoint}/v1/traces",
|
||||
timeout=30,
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
import re
|
||||
from typing import TYPE_CHECKING, List
|
||||
from typing import List
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
|
||||
def aggregate_raw_outputs_from_task_outputs(task_outputs: List["TaskOutput"]) -> str:
|
||||
def aggregate_raw_outputs_from_task_outputs(task_outputs: List[TaskOutput]) -> str:
|
||||
"""Generate string context from the task outputs."""
|
||||
dividers = "\n\n----------\n\n"
|
||||
|
||||
@@ -15,7 +13,7 @@ def aggregate_raw_outputs_from_task_outputs(task_outputs: List["TaskOutput"]) ->
|
||||
return context
|
||||
|
||||
|
||||
def aggregate_raw_outputs_from_tasks(tasks: List["Task"]) -> str:
|
||||
def aggregate_raw_outputs_from_tasks(tasks: List[Task]) -> str:
|
||||
"""Generate string context from the tasks."""
|
||||
task_outputs = [task.output for task in tasks if task.output is not None]
|
||||
|
||||
|
||||
@@ -1,82 +0,0 @@
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
|
||||
def interpolate_only(
|
||||
input_string: Optional[str],
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]],
|
||||
) -> str:
|
||||
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
|
||||
Only interpolates placeholders that follow the pattern {variable_name} where
|
||||
variable_name starts with a letter/underscore and contains only letters, numbers, and underscores.
|
||||
|
||||
Args:
|
||||
input_string: The string containing template variables to interpolate.
|
||||
Can be None or empty, in which case an empty string is returned.
|
||||
inputs: Dictionary mapping template variables to their values.
|
||||
Supported value types are strings, integers, floats, and dicts/lists
|
||||
containing only these types and other nested dicts/lists.
|
||||
|
||||
Returns:
|
||||
The interpolated string with all template variables replaced with their values.
|
||||
Empty string if input_string is None or empty.
|
||||
|
||||
Raises:
|
||||
ValueError: If a value contains unsupported types or a template variable is missing
|
||||
"""
|
||||
|
||||
# Validation function for recursive type checking
|
||||
def validate_type(value: Any) -> None:
|
||||
if value is None:
|
||||
return
|
||||
if isinstance(value, (str, int, float, bool)):
|
||||
return
|
||||
if isinstance(value, (dict, list)):
|
||||
for item in value.values() if isinstance(value, dict) else value:
|
||||
validate_type(item)
|
||||
return
|
||||
raise ValueError(
|
||||
f"Unsupported type {type(value).__name__} in inputs. "
|
||||
"Only str, int, float, bool, dict, and list are allowed."
|
||||
)
|
||||
|
||||
# Validate all input values
|
||||
for key, value in inputs.items():
|
||||
try:
|
||||
validate_type(value)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Invalid value for key '{key}': {str(e)}") from e
|
||||
|
||||
if input_string is None or not input_string:
|
||||
return ""
|
||||
if "{" not in input_string and "}" not in input_string:
|
||||
return input_string
|
||||
if not inputs:
|
||||
raise ValueError(
|
||||
"Inputs dictionary cannot be empty when interpolating variables"
|
||||
)
|
||||
|
||||
# The regex pattern to find valid variable placeholders
|
||||
# Matches {variable_name} where variable_name starts with a letter/underscore
|
||||
# and contains only letters, numbers, and underscores
|
||||
pattern = r"\{([A-Za-z_][A-Za-z0-9_]*)\}"
|
||||
|
||||
# Find all matching variables in the input string
|
||||
variables = re.findall(pattern, input_string)
|
||||
result = input_string
|
||||
|
||||
# Check if all variables exist in inputs
|
||||
missing_vars = [var for var in variables if var not in inputs]
|
||||
if missing_vars:
|
||||
raise KeyError(
|
||||
f"Template variable '{missing_vars[0]}' not found in inputs dictionary"
|
||||
)
|
||||
|
||||
# Replace each variable with its value
|
||||
for var in variables:
|
||||
if var in inputs:
|
||||
placeholder = "{" + var + "}"
|
||||
value = str(inputs[var])
|
||||
result = result.replace(placeholder, value)
|
||||
|
||||
return result
|
||||
@@ -15,7 +15,6 @@ from crewai import Agent, Crew, Process, Task
|
||||
from crewai.tasks.conditional_task import ConditionalTask
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.utilities.converter import Converter
|
||||
from crewai.utilities.string_utils import interpolate_only
|
||||
|
||||
|
||||
def test_task_tool_reflect_agent_tools():
|
||||
@@ -823,7 +822,7 @@ def test_interpolate_only():
|
||||
|
||||
# Test JSON structure preservation
|
||||
json_string = '{"info": "Look at {placeholder}", "nested": {"val": "{nestedVal}"}}'
|
||||
result = interpolate_only(
|
||||
result = task.interpolate_only(
|
||||
input_string=json_string,
|
||||
inputs={"placeholder": "the data", "nestedVal": "something else"},
|
||||
)
|
||||
@@ -834,18 +833,20 @@ def test_interpolate_only():
|
||||
|
||||
# Test normal string interpolation
|
||||
normal_string = "Hello {name}, welcome to {place}!"
|
||||
result = interpolate_only(
|
||||
result = task.interpolate_only(
|
||||
input_string=normal_string, inputs={"name": "John", "place": "CrewAI"}
|
||||
)
|
||||
assert result == "Hello John, welcome to CrewAI!"
|
||||
|
||||
# Test empty string
|
||||
result = interpolate_only(input_string="", inputs={"unused": "value"})
|
||||
result = task.interpolate_only(input_string="", inputs={"unused": "value"})
|
||||
assert result == ""
|
||||
|
||||
# Test string with no placeholders
|
||||
no_placeholders = "Hello, this is a test"
|
||||
result = interpolate_only(input_string=no_placeholders, inputs={"unused": "value"})
|
||||
result = task.interpolate_only(
|
||||
input_string=no_placeholders, inputs={"unused": "value"}
|
||||
)
|
||||
assert result == no_placeholders
|
||||
|
||||
|
||||
@@ -857,7 +858,7 @@ def test_interpolate_only_with_dict_inside_expected_output():
|
||||
)
|
||||
|
||||
json_string = '{"questions": {"main_question": "What is the user\'s name?", "secondary_question": "What is the user\'s age?"}}'
|
||||
result = interpolate_only(
|
||||
result = task.interpolate_only(
|
||||
input_string=json_string,
|
||||
inputs={
|
||||
"questions": {
|
||||
@@ -871,16 +872,18 @@ def test_interpolate_only_with_dict_inside_expected_output():
|
||||
assert result == json_string
|
||||
|
||||
normal_string = "Hello {name}, welcome to {place}!"
|
||||
result = interpolate_only(
|
||||
result = task.interpolate_only(
|
||||
input_string=normal_string, inputs={"name": "John", "place": "CrewAI"}
|
||||
)
|
||||
assert result == "Hello John, welcome to CrewAI!"
|
||||
|
||||
result = interpolate_only(input_string="", inputs={"unused": "value"})
|
||||
result = task.interpolate_only(input_string="", inputs={"unused": "value"})
|
||||
assert result == ""
|
||||
|
||||
no_placeholders = "Hello, this is a test"
|
||||
result = interpolate_only(input_string=no_placeholders, inputs={"unused": "value"})
|
||||
result = task.interpolate_only(
|
||||
input_string=no_placeholders, inputs={"unused": "value"}
|
||||
)
|
||||
assert result == no_placeholders
|
||||
|
||||
|
||||
@@ -1082,12 +1085,12 @@ def test_interpolate_with_list_of_strings():
|
||||
# Test simple list of strings
|
||||
input_str = "Available items: {items}"
|
||||
inputs = {"items": ["apple", "banana", "cherry"]}
|
||||
result = interpolate_only(input_str, inputs)
|
||||
result = task.interpolate_only(input_str, inputs)
|
||||
assert result == f"Available items: {inputs['items']}"
|
||||
|
||||
# Test empty list
|
||||
empty_list_input = {"items": []}
|
||||
result = interpolate_only(input_str, empty_list_input)
|
||||
result = task.interpolate_only(input_str, empty_list_input)
|
||||
assert result == "Available items: []"
|
||||
|
||||
|
||||
@@ -1103,7 +1106,7 @@ def test_interpolate_with_list_of_dicts():
|
||||
{"name": "Bob", "age": 25, "skills": ["Java", "Cloud"]},
|
||||
]
|
||||
}
|
||||
result = interpolate_only("{people}", input_data)
|
||||
result = task.interpolate_only("{people}", input_data)
|
||||
|
||||
parsed_result = eval(result)
|
||||
assert isinstance(parsed_result, list)
|
||||
@@ -1135,7 +1138,7 @@ def test_interpolate_with_nested_structures():
|
||||
],
|
||||
}
|
||||
}
|
||||
result = interpolate_only("{company}", input_data)
|
||||
result = task.interpolate_only("{company}", input_data)
|
||||
parsed = eval(result)
|
||||
|
||||
assert parsed["name"] == "TechCorp"
|
||||
@@ -1158,7 +1161,7 @@ def test_interpolate_with_special_characters():
|
||||
"empty": "",
|
||||
}
|
||||
}
|
||||
result = interpolate_only("{special_data}", input_data)
|
||||
result = task.interpolate_only("{special_data}", input_data)
|
||||
parsed = eval(result)
|
||||
|
||||
assert parsed["quotes"] == """This has "double" and 'single' quotes"""
|
||||
@@ -1185,7 +1188,7 @@ def test_interpolate_mixed_types():
|
||||
},
|
||||
}
|
||||
}
|
||||
result = interpolate_only("{data}", input_data)
|
||||
result = task.interpolate_only("{data}", input_data)
|
||||
parsed = eval(result)
|
||||
|
||||
assert parsed["name"] == "Test Dataset"
|
||||
@@ -1213,7 +1216,7 @@ def test_interpolate_complex_combination():
|
||||
},
|
||||
]
|
||||
}
|
||||
result = interpolate_only("{report}", input_data)
|
||||
result = task.interpolate_only("{report}", input_data)
|
||||
parsed = eval(result)
|
||||
|
||||
assert len(parsed) == 2
|
||||
@@ -1230,7 +1233,7 @@ def test_interpolate_invalid_type_validation():
|
||||
|
||||
# Test with invalid top-level type
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
interpolate_only("{data}", {"data": set()}) # type: ignore we are purposely testing this failure
|
||||
task.interpolate_only("{data}", {"data": set()}) # type: ignore we are purposely testing this failure
|
||||
|
||||
assert "Unsupported type set" in str(excinfo.value)
|
||||
|
||||
@@ -1243,7 +1246,7 @@ def test_interpolate_invalid_type_validation():
|
||||
}
|
||||
}
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
interpolate_only("{data}", {"data": invalid_nested})
|
||||
task.interpolate_only("{data}", {"data": invalid_nested})
|
||||
assert "Unsupported type set" in str(excinfo.value)
|
||||
|
||||
|
||||
@@ -1262,22 +1265,24 @@ def test_interpolate_custom_object_validation():
|
||||
|
||||
# Test with custom object at top level
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
interpolate_only("{obj}", {"obj": CustomObject(5)}) # type: ignore we are purposely testing this failure
|
||||
task.interpolate_only("{obj}", {"obj": CustomObject(5)}) # type: ignore we are purposely testing this failure
|
||||
assert "Unsupported type CustomObject" in str(excinfo.value)
|
||||
|
||||
# Test with nested custom object in dictionary
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
interpolate_only("{data}", {"data": {"valid": 1, "invalid": CustomObject(5)}})
|
||||
task.interpolate_only(
|
||||
"{data}", {"data": {"valid": 1, "invalid": CustomObject(5)}}
|
||||
)
|
||||
assert "Unsupported type CustomObject" in str(excinfo.value)
|
||||
|
||||
# Test with nested custom object in list
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
interpolate_only("{data}", {"data": [1, "valid", CustomObject(5)]})
|
||||
task.interpolate_only("{data}", {"data": [1, "valid", CustomObject(5)]})
|
||||
assert "Unsupported type CustomObject" in str(excinfo.value)
|
||||
|
||||
# Test with deeply nested custom object
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
interpolate_only(
|
||||
task.interpolate_only(
|
||||
"{data}", {"data": {"level1": {"level2": [{"level3": CustomObject(5)}]}}}
|
||||
)
|
||||
assert "Unsupported type CustomObject" in str(excinfo.value)
|
||||
@@ -1301,7 +1306,7 @@ def test_interpolate_valid_complex_types():
|
||||
}
|
||||
|
||||
# Should not raise any errors
|
||||
result = interpolate_only("{data}", {"data": valid_data})
|
||||
result = task.interpolate_only("{data}", {"data": valid_data})
|
||||
parsed = eval(result)
|
||||
assert parsed["name"] == "Valid Dataset"
|
||||
assert parsed["stats"]["nested"]["deeper"]["b"] == 2.5
|
||||
@@ -1314,16 +1319,16 @@ def test_interpolate_edge_cases():
|
||||
)
|
||||
|
||||
# Test empty dict and list
|
||||
assert interpolate_only("{}", {"data": {}}) == "{}"
|
||||
assert interpolate_only("[]", {"data": []}) == "[]"
|
||||
assert task.interpolate_only("{}", {"data": {}}) == "{}"
|
||||
assert task.interpolate_only("[]", {"data": []}) == "[]"
|
||||
|
||||
# Test numeric types
|
||||
assert interpolate_only("{num}", {"num": 42}) == "42"
|
||||
assert interpolate_only("{num}", {"num": 3.14}) == "3.14"
|
||||
assert task.interpolate_only("{num}", {"num": 42}) == "42"
|
||||
assert task.interpolate_only("{num}", {"num": 3.14}) == "3.14"
|
||||
|
||||
# Test boolean values (valid JSON types)
|
||||
assert interpolate_only("{flag}", {"flag": True}) == "True"
|
||||
assert interpolate_only("{flag}", {"flag": False}) == "False"
|
||||
assert task.interpolate_only("{flag}", {"flag": True}) == "True"
|
||||
assert task.interpolate_only("{flag}", {"flag": False}) == "False"
|
||||
|
||||
|
||||
def test_interpolate_valid_types():
|
||||
@@ -1341,7 +1346,7 @@ def test_interpolate_valid_types():
|
||||
"nested": {"flag": True, "empty": None},
|
||||
}
|
||||
|
||||
result = interpolate_only("{data}", {"data": valid_data})
|
||||
result = task.interpolate_only("{data}", {"data": valid_data})
|
||||
parsed = eval(result)
|
||||
|
||||
assert parsed["active"] is True
|
||||
|
||||
@@ -1,86 +0,0 @@
|
||||
import os
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||
|
||||
from crewai.telemetry.telemetry import SafeBatchSpanProcessor, Telemetry
|
||||
|
||||
|
||||
class TestTelemetry:
|
||||
"""Test suite for Telemetry functionality focusing on error handling and span processing."""
|
||||
|
||||
def test_safe_batch_span_processor(self):
|
||||
"""Test that SafeBatchSpanProcessor properly suppresses exceptions."""
|
||||
# Create a mock exporter that will be used by the processor
|
||||
mock_exporter = Mock()
|
||||
|
||||
# Create a SafeBatchSpanProcessor with the mock exporter
|
||||
processor = SafeBatchSpanProcessor(mock_exporter)
|
||||
|
||||
# Test force_flush with an exception
|
||||
with patch.object(BatchSpanProcessor, 'force_flush', side_effect=ConnectionError("Test error")):
|
||||
# This should not raise an exception
|
||||
processor.force_flush()
|
||||
|
||||
# Test that the processor's export method suppresses exceptions
|
||||
with patch.object(mock_exporter, 'export', side_effect=ConnectionError("Test error")):
|
||||
# This should not raise an exception
|
||||
processor.export([])
|
||||
|
||||
def test_telemetry_with_connection_error(self):
|
||||
"""Test that telemetry connection errors are properly handled in real usage."""
|
||||
# Make sure telemetry is enabled for the test
|
||||
os.environ["OTEL_SDK_DISABLED"] = "false"
|
||||
|
||||
# Create a telemetry instance
|
||||
telemetry = Telemetry()
|
||||
|
||||
# Verify telemetry is initialized
|
||||
assert telemetry.ready is True
|
||||
|
||||
# Test a real telemetry operation
|
||||
# This should not raise an exception even if there are connection issues
|
||||
telemetry.flow_creation_span("test_flow")
|
||||
|
||||
# Reset environment variables
|
||||
os.environ["OTEL_SDK_DISABLED"] = "true"
|
||||
|
||||
def test_safe_batch_span_processor_with_timeout(self):
|
||||
"""Test that SafeBatchSpanProcessor properly handles timeout errors."""
|
||||
# Create a mock exporter that will be used by the processor
|
||||
mock_exporter = Mock()
|
||||
|
||||
# Create a SafeBatchSpanProcessor with the mock exporter
|
||||
processor = SafeBatchSpanProcessor(mock_exporter)
|
||||
|
||||
# Test force_flush with a timeout error
|
||||
with patch.object(BatchSpanProcessor, 'force_flush', side_effect=TimeoutError("Test timeout")):
|
||||
# This should not raise an exception
|
||||
processor.force_flush()
|
||||
|
||||
# Test that the processor's export method suppresses timeout exceptions
|
||||
with patch.object(mock_exporter, 'export', side_effect=TimeoutError("Test timeout")):
|
||||
# This should not raise an exception
|
||||
processor.export([])
|
||||
|
||||
def test_safe_batch_span_processor_with_valid_data(self):
|
||||
"""Test SafeBatchSpanProcessor normal operation with valid data."""
|
||||
# Create a mock exporter that will be used by the processor
|
||||
mock_exporter = Mock()
|
||||
|
||||
# Create a SafeBatchSpanProcessor with the mock exporter
|
||||
processor = SafeBatchSpanProcessor(mock_exporter)
|
||||
|
||||
# Test force_flush with no exception
|
||||
with patch.object(BatchSpanProcessor, 'force_flush', return_value=None):
|
||||
# This should complete normally
|
||||
processor.force_flush()
|
||||
|
||||
# Mock some valid spans
|
||||
mock_spans = [Mock() for _ in range(3)]
|
||||
|
||||
# Test that the processor's export method works with valid data
|
||||
with patch.object(mock_exporter, 'export', return_value=None):
|
||||
# This should complete normally
|
||||
processor.export(mock_spans)
|
||||
@@ -1,35 +1,17 @@
|
||||
import json
|
||||
import os
|
||||
from typing import Dict, List, Optional
|
||||
from datetime import date, datetime
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Union, cast
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
import pytest
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.llm import LLM
|
||||
from crewai.utilities.converter import (
|
||||
Converter,
|
||||
ConverterError,
|
||||
convert_to_model,
|
||||
convert_with_instructions,
|
||||
create_converter,
|
||||
generate_model_description,
|
||||
get_conversion_instructions,
|
||||
handle_partial_json,
|
||||
validate_model,
|
||||
)
|
||||
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
|
||||
from crewai.flow.state_utils import _to_serializable_key, to_serializable, to_string
|
||||
|
||||
|
||||
# Sample Pydantic models for testing
|
||||
class EmailResponse(BaseModel):
|
||||
previous_message_content: str
|
||||
|
||||
|
||||
class EmailResponses(BaseModel):
|
||||
responses: list[EmailResponse]
|
||||
|
||||
|
||||
class SimpleModel(BaseModel):
|
||||
name: str
|
||||
age: int
|
||||
@@ -52,560 +34,190 @@ class Person(BaseModel):
|
||||
address: Address
|
||||
|
||||
|
||||
class CustomConverter(Converter):
|
||||
pass
|
||||
class Color(Enum):
|
||||
RED = "red"
|
||||
GREEN = "green"
|
||||
BLUE = "blue"
|
||||
|
||||
|
||||
# Fixtures
|
||||
@pytest.fixture
|
||||
def mock_agent():
|
||||
agent = Mock()
|
||||
agent.function_calling_llm = None
|
||||
agent.llm = Mock()
|
||||
return agent
|
||||
class EnumModel(BaseModel):
|
||||
name: str
|
||||
color: Color
|
||||
|
||||
|
||||
# Tests for convert_to_model
|
||||
def test_convert_to_model_with_valid_json():
|
||||
result = '{"name": "John", "age": 30}'
|
||||
output = convert_to_model(result, SimpleModel, None, None)
|
||||
assert isinstance(output, SimpleModel)
|
||||
assert output.name == "John"
|
||||
assert output.age == 30
|
||||
class OptionalModel(BaseModel):
|
||||
name: str
|
||||
age: Optional[int]
|
||||
|
||||
|
||||
def test_convert_to_model_with_invalid_json():
|
||||
result = '{"name": "John", "age": "thirty"}'
|
||||
with patch("crewai.utilities.converter.handle_partial_json") as mock_handle:
|
||||
mock_handle.return_value = "Fallback result"
|
||||
output = convert_to_model(result, SimpleModel, None, None)
|
||||
assert output == "Fallback result"
|
||||
class ListModel(BaseModel):
|
||||
items: List[int]
|
||||
|
||||
|
||||
def test_convert_to_model_with_no_model():
|
||||
result = "Plain text"
|
||||
output = convert_to_model(result, None, None, None)
|
||||
assert output == "Plain text"
|
||||
class UnionModel(BaseModel):
|
||||
field: Union[int, str, None]
|
||||
|
||||
|
||||
def test_convert_to_model_with_special_characters():
|
||||
json_string_test = """
|
||||
{
|
||||
"responses": [
|
||||
{
|
||||
"previous_message_content": "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on"
|
||||
}
|
||||
]
|
||||
# Tests for to_serializable function
|
||||
def test_to_serializable_primitives():
|
||||
"""Test serialization of primitive types."""
|
||||
assert to_serializable("test string") == "test string"
|
||||
assert to_serializable(42) == 42
|
||||
assert to_serializable(3.14) == 3.14
|
||||
assert to_serializable(True) == True
|
||||
assert to_serializable(None) is None
|
||||
|
||||
|
||||
def test_to_serializable_dates():
|
||||
"""Test serialization of date and datetime objects."""
|
||||
test_date = date(2023, 1, 15)
|
||||
test_datetime = datetime(2023, 1, 15, 10, 30, 45)
|
||||
|
||||
assert to_serializable(test_date) == "2023-01-15"
|
||||
assert to_serializable(test_datetime) == "2023-01-15T10:30:45"
|
||||
|
||||
|
||||
def test_to_serializable_collections():
|
||||
"""Test serialization of lists, tuples, and sets."""
|
||||
test_list = [1, "two", 3.0]
|
||||
test_tuple = (4, "five", 6.0)
|
||||
test_set = {7, "eight", 9.0}
|
||||
|
||||
assert to_serializable(test_list) == [1, "two", 3.0]
|
||||
assert to_serializable(test_tuple) == [4, "five", 6.0]
|
||||
|
||||
# For sets, we can't rely on order, so we'll verify differently
|
||||
serialized_set = to_serializable(test_set)
|
||||
assert isinstance(serialized_set, list)
|
||||
assert len(serialized_set) == 3
|
||||
assert 7 in serialized_set
|
||||
assert "eight" in serialized_set
|
||||
assert 9.0 in serialized_set
|
||||
|
||||
|
||||
def test_to_serializable_dict():
|
||||
"""Test serialization of dictionaries."""
|
||||
test_dict = {"a": 1, "b": "two", "c": [3, 4, 5]}
|
||||
|
||||
assert to_serializable(test_dict) == {"a": 1, "b": "two", "c": [3, 4, 5]}
|
||||
|
||||
|
||||
def test_to_serializable_pydantic_models():
|
||||
"""Test serialization of Pydantic models."""
|
||||
simple = SimpleModel(name="John", age=30)
|
||||
|
||||
assert to_serializable(simple) == {"name": "John", "age": 30}
|
||||
|
||||
|
||||
def test_to_serializable_nested_models():
|
||||
"""Test serialization of nested Pydantic models."""
|
||||
simple = SimpleModel(name="John", age=30)
|
||||
nested = NestedModel(id=1, data=simple)
|
||||
|
||||
assert to_serializable(nested) == {"id": 1, "data": {"name": "John", "age": 30}}
|
||||
|
||||
|
||||
def test_to_serializable_complex_model():
|
||||
"""Test serialization of a complex model with nested structures."""
|
||||
person = Person(
|
||||
name="Jane",
|
||||
age=28,
|
||||
address=Address(street="123 Main St", city="Anytown", zip_code="12345"),
|
||||
)
|
||||
|
||||
assert to_serializable(person) == {
|
||||
"name": "Jane",
|
||||
"age": 28,
|
||||
"address": {"street": "123 Main St", "city": "Anytown", "zip_code": "12345"},
|
||||
}
|
||||
"""
|
||||
output = convert_to_model(json_string_test, EmailResponses, None, None)
|
||||
assert isinstance(output, EmailResponses)
|
||||
assert len(output.responses) == 1
|
||||
assert (
|
||||
output.responses[0].previous_message_content
|
||||
== "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on"
|
||||
)
|
||||
|
||||
|
||||
def test_convert_to_model_with_escaped_special_characters():
|
||||
json_string_test = json.dumps(
|
||||
{
|
||||
"responses": [
|
||||
{
|
||||
"previous_message_content": "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on"
|
||||
}
|
||||
]
|
||||
}
|
||||
)
|
||||
output = convert_to_model(json_string_test, EmailResponses, None, None)
|
||||
assert isinstance(output, EmailResponses)
|
||||
assert len(output.responses) == 1
|
||||
assert (
|
||||
output.responses[0].previous_message_content
|
||||
== "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on"
|
||||
)
|
||||
def test_to_serializable_enum():
|
||||
"""Test serialization of Enum values."""
|
||||
model = EnumModel(name="ColorTest", color=Color.RED)
|
||||
|
||||
assert to_serializable(model) == {"name": "ColorTest", "color": "red"}
|
||||
|
||||
def test_convert_to_model_with_multiple_special_characters():
|
||||
json_string_test = """
|
||||
{
|
||||
"responses": [
|
||||
{
|
||||
"previous_message_content": "Line 1\r\nLine 2\tTabbed\nLine 3\r\n\rEscaped newline"
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
output = convert_to_model(json_string_test, EmailResponses, None, None)
|
||||
assert isinstance(output, EmailResponses)
|
||||
assert len(output.responses) == 1
|
||||
assert (
|
||||
output.responses[0].previous_message_content
|
||||
== "Line 1\r\nLine 2\tTabbed\nLine 3\r\n\rEscaped newline"
|
||||
)
|
||||
|
||||
def test_to_serializable_optional_fields():
|
||||
"""Test serialization of models with optional fields."""
|
||||
model_with_age = OptionalModel(name="WithAge", age=25)
|
||||
model_without_age = OptionalModel(name="WithoutAge", age=None)
|
||||
|
||||
# Tests for validate_model
|
||||
def test_validate_model_pydantic_output():
|
||||
result = '{"name": "Alice", "age": 25}'
|
||||
output = validate_model(result, SimpleModel, False)
|
||||
assert isinstance(output, SimpleModel)
|
||||
assert output.name == "Alice"
|
||||
assert output.age == 25
|
||||
assert to_serializable(model_with_age) == {"name": "WithAge", "age": 25}
|
||||
assert to_serializable(model_without_age) == {"name": "WithoutAge", "age": None}
|
||||
|
||||
|
||||
def test_validate_model_json_output():
|
||||
result = '{"name": "Bob", "age": 40}'
|
||||
output = validate_model(result, SimpleModel, True)
|
||||
assert isinstance(output, dict)
|
||||
assert output == {"name": "Bob", "age": 40}
|
||||
def test_to_serializable_list_field():
|
||||
"""Test serialization of models with list fields."""
|
||||
model = ListModel(items=[1, 2, 3, 4, 5])
|
||||
|
||||
assert to_serializable(model) == {"items": [1, 2, 3, 4, 5]}
|
||||
|
||||
# Tests for handle_partial_json
|
||||
def test_handle_partial_json_with_valid_partial():
|
||||
result = 'Some text {"name": "Charlie", "age": 35} more text'
|
||||
output = handle_partial_json(result, SimpleModel, False, None)
|
||||
assert isinstance(output, SimpleModel)
|
||||
assert output.name == "Charlie"
|
||||
assert output.age == 35
|
||||
|
||||
def test_to_serializable_union_field():
|
||||
"""Test serialization of models with union fields."""
|
||||
model_int = UnionModel(field=42)
|
||||
model_str = UnionModel(field="test")
|
||||
model_none = UnionModel(field=None)
|
||||
|
||||
def test_handle_partial_json_with_invalid_partial(mock_agent):
|
||||
result = "No valid JSON here"
|
||||
with patch("crewai.utilities.converter.convert_with_instructions") as mock_convert:
|
||||
mock_convert.return_value = "Converted result"
|
||||
output = handle_partial_json(result, SimpleModel, False, mock_agent)
|
||||
assert output == "Converted result"
|
||||
assert to_serializable(model_int) == {"field": 42}
|
||||
assert to_serializable(model_str) == {"field": "test"}
|
||||
assert to_serializable(model_none) == {"field": None}
|
||||
|
||||
|
||||
# Tests for convert_with_instructions
|
||||
@patch("crewai.utilities.converter.create_converter")
|
||||
@patch("crewai.utilities.converter.get_conversion_instructions")
|
||||
def test_convert_with_instructions_success(
|
||||
mock_get_instructions, mock_create_converter, mock_agent
|
||||
):
|
||||
mock_get_instructions.return_value = "Instructions"
|
||||
mock_converter = Mock()
|
||||
mock_converter.to_pydantic.return_value = SimpleModel(name="David", age=50)
|
||||
mock_create_converter.return_value = mock_converter
|
||||
def test_to_serializable_max_depth():
|
||||
"""Test max depth parameter to prevent infinite recursion."""
|
||||
# Create recursive structure
|
||||
a: Dict[str, Any] = {"name": "a"}
|
||||
b: Dict[str, Any] = {"name": "b", "ref": a}
|
||||
a["ref"] = b # Create circular reference
|
||||
|
||||
result = "Some text to convert"
|
||||
output = convert_with_instructions(result, SimpleModel, False, mock_agent)
|
||||
result = to_serializable(a, max_depth=3)
|
||||
|
||||
assert isinstance(output, SimpleModel)
|
||||
assert output.name == "David"
|
||||
assert output.age == 50
|
||||
assert isinstance(result, dict)
|
||||
assert "name" in result
|
||||
assert "ref" in result
|
||||
assert isinstance(result["ref"], dict)
|
||||
assert "ref" in result["ref"]
|
||||
assert isinstance(result["ref"]["ref"], dict)
|
||||
# At depth 3, it should convert to string
|
||||
assert isinstance(result["ref"]["ref"]["ref"], str)
|
||||
|
||||
|
||||
@patch("crewai.utilities.converter.create_converter")
|
||||
@patch("crewai.utilities.converter.get_conversion_instructions")
|
||||
def test_convert_with_instructions_failure(
|
||||
mock_get_instructions, mock_create_converter, mock_agent
|
||||
):
|
||||
mock_get_instructions.return_value = "Instructions"
|
||||
mock_converter = Mock()
|
||||
mock_converter.to_pydantic.return_value = ConverterError("Conversion failed")
|
||||
mock_create_converter.return_value = mock_converter
|
||||
def test_to_serializable_non_serializable():
|
||||
"""Test serialization of objects that aren't directly JSON serializable."""
|
||||
|
||||
result = "Some text to convert"
|
||||
with patch("crewai.utilities.converter.Printer") as mock_printer:
|
||||
output = convert_with_instructions(result, SimpleModel, False, mock_agent)
|
||||
assert output == result
|
||||
mock_printer.return_value.print.assert_called_once()
|
||||
class CustomObject:
|
||||
def __repr__(self):
|
||||
return "CustomObject()"
|
||||
|
||||
obj = CustomObject()
|
||||
|
||||
# Tests for get_conversion_instructions
|
||||
def test_get_conversion_instructions_gpt():
|
||||
llm = LLM(model="gpt-4o-mini")
|
||||
with patch.object(LLM, "supports_function_calling") as supports_function_calling:
|
||||
supports_function_calling.return_value = True
|
||||
instructions = get_conversion_instructions(SimpleModel, llm)
|
||||
model_schema = PydanticSchemaParser(model=SimpleModel).get_schema()
|
||||
expected_instructions = (
|
||||
"Please convert the following text into valid JSON.\n\n"
|
||||
"Output ONLY the valid JSON and nothing else.\n\n"
|
||||
"The JSON must follow this schema exactly:\n```json\n"
|
||||
f"{model_schema}\n```"
|
||||
)
|
||||
assert instructions == expected_instructions
|
||||
# Should convert to string representation
|
||||
assert to_serializable(obj) == "CustomObject()"
|
||||
|
||||
|
||||
def test_get_conversion_instructions_non_gpt():
|
||||
llm = LLM(model="ollama/llama3.1", base_url="http://localhost:11434")
|
||||
with patch.object(LLM, "supports_function_calling", return_value=False):
|
||||
instructions = get_conversion_instructions(SimpleModel, llm)
|
||||
assert '"name": str' in instructions
|
||||
assert '"age": int' in instructions
|
||||
def test_to_string_conversion():
|
||||
"""Test the to_string function."""
|
||||
test_dict = {"name": "Test", "values": [1, 2, 3]}
|
||||
|
||||
# Should convert to a JSON string
|
||||
assert to_string(test_dict) == '{"name": "Test", "values": [1, 2, 3]}'
|
||||
|
||||
# Tests for is_gpt
|
||||
def test_supports_function_calling_true():
|
||||
llm = LLM(model="gpt-4o")
|
||||
assert llm.supports_function_calling() is True
|
||||
# None should return None
|
||||
assert to_string(None) is None
|
||||
|
||||
|
||||
def test_supports_function_calling_false():
|
||||
llm = LLM(model="non-existent-model")
|
||||
assert llm.supports_function_calling() is False
|
||||
def test_to_serializable_key():
|
||||
"""Test serialization of dictionary keys."""
|
||||
# String and int keys are converted to strings
|
||||
assert _to_serializable_key("test") == "test"
|
||||
assert _to_serializable_key(42) == "42"
|
||||
|
||||
|
||||
def test_create_converter_with_mock_agent():
|
||||
mock_agent = MagicMock()
|
||||
mock_agent.get_output_converter.return_value = MagicMock(spec=Converter)
|
||||
|
||||
converter = create_converter(
|
||||
agent=mock_agent,
|
||||
llm=Mock(),
|
||||
text="Sample",
|
||||
model=SimpleModel,
|
||||
instructions="Convert",
|
||||
)
|
||||
|
||||
assert isinstance(converter, Converter)
|
||||
mock_agent.get_output_converter.assert_called_once()
|
||||
|
||||
|
||||
def test_create_converter_with_custom_converter():
|
||||
converter = create_converter(
|
||||
converter_cls=CustomConverter,
|
||||
llm=LLM(model="gpt-4o-mini"),
|
||||
text="Sample",
|
||||
model=SimpleModel,
|
||||
instructions="Convert",
|
||||
)
|
||||
|
||||
assert isinstance(converter, CustomConverter)
|
||||
|
||||
|
||||
def test_create_converter_fails_without_agent_or_converter_cls():
|
||||
with pytest.raises(
|
||||
ValueError, match="Either agent or converter_cls must be provided"
|
||||
):
|
||||
create_converter(
|
||||
llm=Mock(), text="Sample", model=SimpleModel, instructions="Convert"
|
||||
)
|
||||
|
||||
|
||||
def test_generate_model_description_simple_model():
|
||||
description = generate_model_description(SimpleModel)
|
||||
expected_description = '{\n "name": str,\n "age": int\n}'
|
||||
assert description == expected_description
|
||||
|
||||
|
||||
def test_generate_model_description_nested_model():
|
||||
description = generate_model_description(NestedModel)
|
||||
expected_description = (
|
||||
'{\n "id": int,\n "data": {\n "name": str,\n "age": int\n}\n}'
|
||||
)
|
||||
assert description == expected_description
|
||||
|
||||
|
||||
def test_generate_model_description_optional_field():
|
||||
class ModelWithOptionalField(BaseModel):
|
||||
name: Optional[str]
|
||||
age: int
|
||||
|
||||
description = generate_model_description(ModelWithOptionalField)
|
||||
expected_description = '{\n "name": Optional[str],\n "age": int\n}'
|
||||
assert description == expected_description
|
||||
|
||||
|
||||
def test_generate_model_description_list_field():
|
||||
class ModelWithListField(BaseModel):
|
||||
items: List[int]
|
||||
|
||||
description = generate_model_description(ModelWithListField)
|
||||
expected_description = '{\n "items": List[int]\n}'
|
||||
assert description == expected_description
|
||||
|
||||
|
||||
def test_generate_model_description_dict_field():
|
||||
class ModelWithDictField(BaseModel):
|
||||
attributes: Dict[str, int]
|
||||
|
||||
description = generate_model_description(ModelWithDictField)
|
||||
expected_description = '{\n "attributes": Dict[str, int]\n}'
|
||||
assert description == expected_description
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_convert_with_instructions():
|
||||
llm = LLM(model="gpt-4o-mini")
|
||||
sample_text = "Name: Alice, Age: 30"
|
||||
|
||||
instructions = get_conversion_instructions(SimpleModel, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
text=sample_text,
|
||||
model=SimpleModel,
|
||||
instructions=instructions,
|
||||
)
|
||||
|
||||
# Act
|
||||
output = converter.to_pydantic()
|
||||
|
||||
# Assert
|
||||
assert isinstance(output, SimpleModel)
|
||||
assert output.name == "Alice"
|
||||
assert output.age == 30
|
||||
|
||||
|
||||
# Skip tests that call external APIs when running in CI/CD
|
||||
skip_external_api = pytest.mark.skipif(
|
||||
os.getenv("CI") is not None, reason="Skipping tests that call external API in CI/CD"
|
||||
)
|
||||
|
||||
|
||||
@skip_external_api
|
||||
@pytest.mark.vcr(filter_headers=["authorization"], record_mode="once")
|
||||
def test_converter_with_llama3_2_model():
|
||||
llm = LLM(model="ollama/llama3.2:3b", base_url="http://localhost:11434")
|
||||
sample_text = "Name: Alice Llama, Age: 30"
|
||||
instructions = get_conversion_instructions(SimpleModel, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
text=sample_text,
|
||||
model=SimpleModel,
|
||||
instructions=instructions,
|
||||
)
|
||||
output = converter.to_pydantic()
|
||||
assert isinstance(output, SimpleModel)
|
||||
assert output.name == "Alice Llama"
|
||||
assert output.age == 30
|
||||
|
||||
|
||||
@skip_external_api
|
||||
@pytest.mark.vcr(filter_headers=["authorization"], record_mode="once")
|
||||
def test_converter_with_llama3_1_model():
|
||||
llm = LLM(model="ollama/llama3.1", base_url="http://localhost:11434")
|
||||
sample_text = "Name: Alice Llama, Age: 30"
|
||||
instructions = get_conversion_instructions(SimpleModel, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
text=sample_text,
|
||||
model=SimpleModel,
|
||||
instructions=instructions,
|
||||
)
|
||||
output = converter.to_pydantic()
|
||||
assert isinstance(output, SimpleModel)
|
||||
assert output.name == "Alice Llama"
|
||||
assert output.age == 30
|
||||
|
||||
|
||||
# Skip tests that call external APIs when running in CI/CD
|
||||
skip_external_api = pytest.mark.skipif(
|
||||
os.getenv("CI") is not None, reason="Skipping tests that call external API in CI/CD"
|
||||
)
|
||||
|
||||
|
||||
@skip_external_api
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_converter_with_nested_model():
|
||||
llm = LLM(model="gpt-4o-mini")
|
||||
sample_text = "Name: John Doe\nAge: 30\nAddress: 123 Main St, Anytown, 12345"
|
||||
|
||||
instructions = get_conversion_instructions(Person, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
text=sample_text,
|
||||
model=Person,
|
||||
instructions=instructions,
|
||||
)
|
||||
|
||||
output = converter.to_pydantic()
|
||||
|
||||
assert isinstance(output, Person)
|
||||
assert output.name == "John Doe"
|
||||
assert output.age == 30
|
||||
assert isinstance(output.address, Address)
|
||||
assert output.address.street == "123 Main St"
|
||||
assert output.address.city == "Anytown"
|
||||
assert output.address.zip_code == "12345"
|
||||
|
||||
|
||||
# Tests for error handling
|
||||
def test_converter_error_handling():
|
||||
llm = Mock(spec=LLM)
|
||||
llm.supports_function_calling.return_value = False
|
||||
llm.call.return_value = "Invalid JSON"
|
||||
sample_text = "Name: Alice, Age: 30"
|
||||
|
||||
instructions = get_conversion_instructions(SimpleModel, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
text=sample_text,
|
||||
model=SimpleModel,
|
||||
instructions=instructions,
|
||||
)
|
||||
|
||||
with pytest.raises(ConverterError) as exc_info:
|
||||
output = converter.to_pydantic()
|
||||
|
||||
assert "Failed to convert text into a Pydantic model" in str(exc_info.value)
|
||||
|
||||
|
||||
# Tests for retry logic
|
||||
def test_converter_retry_logic():
|
||||
llm = Mock(spec=LLM)
|
||||
llm.supports_function_calling.return_value = False
|
||||
llm.call.side_effect = [
|
||||
"Invalid JSON",
|
||||
"Still invalid",
|
||||
'{"name": "Retry Alice", "age": 30}',
|
||||
]
|
||||
sample_text = "Name: Retry Alice, Age: 30"
|
||||
|
||||
instructions = get_conversion_instructions(SimpleModel, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
text=sample_text,
|
||||
model=SimpleModel,
|
||||
instructions=instructions,
|
||||
max_attempts=3,
|
||||
)
|
||||
|
||||
output = converter.to_pydantic()
|
||||
|
||||
assert isinstance(output, SimpleModel)
|
||||
assert output.name == "Retry Alice"
|
||||
assert output.age == 30
|
||||
assert llm.call.call_count == 3
|
||||
|
||||
|
||||
# Tests for optional fields
|
||||
def test_converter_with_optional_fields():
|
||||
class OptionalModel(BaseModel):
|
||||
name: str
|
||||
age: Optional[int]
|
||||
|
||||
llm = Mock(spec=LLM)
|
||||
llm.supports_function_calling.return_value = False
|
||||
# Simulate the LLM's response with 'age' explicitly set to null
|
||||
llm.call.return_value = '{"name": "Bob", "age": null}'
|
||||
sample_text = "Name: Bob, age: None"
|
||||
|
||||
instructions = get_conversion_instructions(OptionalModel, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
text=sample_text,
|
||||
model=OptionalModel,
|
||||
instructions=instructions,
|
||||
)
|
||||
|
||||
output = converter.to_pydantic()
|
||||
|
||||
assert isinstance(output, OptionalModel)
|
||||
assert output.name == "Bob"
|
||||
assert output.age is None
|
||||
|
||||
|
||||
# Tests for list fields
|
||||
def test_converter_with_list_field():
|
||||
class ListModel(BaseModel):
|
||||
items: List[int]
|
||||
|
||||
llm = Mock(spec=LLM)
|
||||
llm.supports_function_calling.return_value = False
|
||||
llm.call.return_value = '{"items": [1, 2, 3]}'
|
||||
sample_text = "Items: 1, 2, 3"
|
||||
|
||||
instructions = get_conversion_instructions(ListModel, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
text=sample_text,
|
||||
model=ListModel,
|
||||
instructions=instructions,
|
||||
)
|
||||
|
||||
output = converter.to_pydantic()
|
||||
|
||||
assert isinstance(output, ListModel)
|
||||
assert output.items == [1, 2, 3]
|
||||
|
||||
|
||||
# Tests for enums
|
||||
from enum import Enum
|
||||
|
||||
|
||||
def test_converter_with_enum():
|
||||
class Color(Enum):
|
||||
RED = "red"
|
||||
GREEN = "green"
|
||||
BLUE = "blue"
|
||||
|
||||
class EnumModel(BaseModel):
|
||||
name: str
|
||||
color: Color
|
||||
|
||||
llm = Mock(spec=LLM)
|
||||
llm.supports_function_calling.return_value = False
|
||||
llm.call.return_value = '{"name": "Alice", "color": "red"}'
|
||||
sample_text = "Name: Alice, Color: Red"
|
||||
|
||||
instructions = get_conversion_instructions(EnumModel, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
text=sample_text,
|
||||
model=EnumModel,
|
||||
instructions=instructions,
|
||||
)
|
||||
|
||||
output = converter.to_pydantic()
|
||||
|
||||
assert isinstance(output, EnumModel)
|
||||
assert output.name == "Alice"
|
||||
assert output.color == Color.RED
|
||||
|
||||
|
||||
# Tests for ambiguous input
|
||||
def test_converter_with_ambiguous_input():
|
||||
llm = Mock(spec=LLM)
|
||||
llm.supports_function_calling.return_value = False
|
||||
llm.call.return_value = '{"name": "Charlie", "age": "Not an age"}'
|
||||
sample_text = "Charlie is thirty years old"
|
||||
|
||||
instructions = get_conversion_instructions(SimpleModel, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
text=sample_text,
|
||||
model=SimpleModel,
|
||||
instructions=instructions,
|
||||
)
|
||||
|
||||
with pytest.raises(ConverterError) as exc_info:
|
||||
output = converter.to_pydantic()
|
||||
|
||||
assert "failed to convert text into a pydantic model" in str(exc_info.value).lower()
|
||||
|
||||
|
||||
# Tests for function calling support
|
||||
def test_converter_with_function_calling():
|
||||
llm = Mock(spec=LLM)
|
||||
llm.supports_function_calling.return_value = True
|
||||
|
||||
instructor = Mock()
|
||||
instructor.to_pydantic.return_value = SimpleModel(name="Eve", age=35)
|
||||
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
text="Name: Eve, Age: 35",
|
||||
model=SimpleModel,
|
||||
instructions="Convert this text.",
|
||||
)
|
||||
converter._create_instructor = Mock(return_value=instructor)
|
||||
|
||||
output = converter.to_pydantic()
|
||||
|
||||
assert isinstance(output, SimpleModel)
|
||||
assert output.name == "Eve"
|
||||
assert output.age == 35
|
||||
instructor.to_pydantic.assert_called_once()
|
||||
|
||||
|
||||
def test_generate_model_description_union_field():
|
||||
class UnionModel(BaseModel):
|
||||
field: int | str | None
|
||||
|
||||
description = generate_model_description(UnionModel)
|
||||
expected_description = '{\n "field": int | str | None\n}'
|
||||
assert description == expected_description
|
||||
# Complex objects are converted to a unique string
|
||||
obj = object()
|
||||
key_str = _to_serializable_key(obj)
|
||||
assert isinstance(key_str, str)
|
||||
assert "key_" in key_str
|
||||
assert "object" in key_str
|
||||
|
||||
@@ -1,187 +0,0 @@
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.utilities.string_utils import interpolate_only
|
||||
|
||||
|
||||
class TestInterpolateOnly:
|
||||
"""Tests for the interpolate_only function in string_utils.py."""
|
||||
|
||||
def test_basic_variable_interpolation(self):
|
||||
"""Test basic variable interpolation works correctly."""
|
||||
template = "Hello, {name}! Welcome to {company}."
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
|
||||
"name": "Alice",
|
||||
"company": "CrewAI",
|
||||
}
|
||||
|
||||
result = interpolate_only(template, inputs)
|
||||
|
||||
assert result == "Hello, Alice! Welcome to CrewAI."
|
||||
|
||||
def test_multiple_occurrences_of_same_variable(self):
|
||||
"""Test that multiple occurrences of the same variable are replaced."""
|
||||
template = "{name} is using {name}'s account."
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
|
||||
"name": "Bob"
|
||||
}
|
||||
|
||||
result = interpolate_only(template, inputs)
|
||||
|
||||
assert result == "Bob is using Bob's account."
|
||||
|
||||
def test_json_structure_preservation(self):
|
||||
"""Test that JSON structures are preserved and not interpolated incorrectly."""
|
||||
template = """
|
||||
Instructions for {agent}:
|
||||
|
||||
Please return the following object:
|
||||
|
||||
{"name": "person's name", "age": 25, "skills": ["coding", "testing"]}
|
||||
"""
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
|
||||
"agent": "DevAgent"
|
||||
}
|
||||
|
||||
result = interpolate_only(template, inputs)
|
||||
|
||||
assert "Instructions for DevAgent:" in result
|
||||
assert (
|
||||
'{"name": "person\'s name", "age": 25, "skills": ["coding", "testing"]}'
|
||||
in result
|
||||
)
|
||||
|
||||
def test_complex_nested_json(self):
|
||||
"""Test with complex JSON structures containing curly braces."""
|
||||
template = """
|
||||
{agent} needs to process:
|
||||
{
|
||||
"config": {
|
||||
"nested": {
|
||||
"value": 42
|
||||
},
|
||||
"arrays": [1, 2, {"inner": "value"}]
|
||||
}
|
||||
}
|
||||
"""
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
|
||||
"agent": "DataProcessor"
|
||||
}
|
||||
|
||||
result = interpolate_only(template, inputs)
|
||||
|
||||
assert "DataProcessor needs to process:" in result
|
||||
assert '"nested": {' in result
|
||||
assert '"value": 42' in result
|
||||
assert '[1, 2, {"inner": "value"}]' in result
|
||||
|
||||
def test_missing_variable(self):
|
||||
"""Test that an error is raised when a required variable is missing."""
|
||||
template = "Hello, {name}!"
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
|
||||
"not_name": "Alice"
|
||||
}
|
||||
|
||||
with pytest.raises(KeyError) as excinfo:
|
||||
interpolate_only(template, inputs)
|
||||
|
||||
assert "template variable" in str(excinfo.value).lower()
|
||||
assert "name" in str(excinfo.value)
|
||||
|
||||
def test_invalid_input_types(self):
|
||||
"""Test that an error is raised with invalid input types."""
|
||||
template = "Hello, {name}!"
|
||||
# Using Any for this test since we're intentionally testing an invalid type
|
||||
inputs: Dict[str, Any] = {"name": object()} # Object is not a valid input type
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
interpolate_only(template, inputs)
|
||||
|
||||
assert "unsupported type" in str(excinfo.value).lower()
|
||||
|
||||
def test_empty_input_string(self):
|
||||
"""Test handling of empty or None input string."""
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
|
||||
"name": "Alice"
|
||||
}
|
||||
|
||||
assert interpolate_only("", inputs) == ""
|
||||
assert interpolate_only(None, inputs) == ""
|
||||
|
||||
def test_no_variables_in_template(self):
|
||||
"""Test a template with no variables to replace."""
|
||||
template = "This is a static string with no variables."
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
|
||||
"name": "Alice"
|
||||
}
|
||||
|
||||
result = interpolate_only(template, inputs)
|
||||
|
||||
assert result == template
|
||||
|
||||
def test_variable_name_starting_with_underscore(self):
|
||||
"""Test variables starting with underscore are replaced correctly."""
|
||||
template = "Variable: {_special_var}"
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
|
||||
"_special_var": "Special Value"
|
||||
}
|
||||
|
||||
result = interpolate_only(template, inputs)
|
||||
|
||||
assert result == "Variable: Special Value"
|
||||
|
||||
def test_preserves_non_matching_braces(self):
|
||||
"""Test that non-matching braces patterns are preserved."""
|
||||
template = (
|
||||
"This {123} and {!var} should not be replaced but {valid_var} should."
|
||||
)
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
|
||||
"valid_var": "works"
|
||||
}
|
||||
|
||||
result = interpolate_only(template, inputs)
|
||||
|
||||
assert (
|
||||
result == "This {123} and {!var} should not be replaced but works should."
|
||||
)
|
||||
|
||||
def test_complex_mixed_scenario(self):
|
||||
"""Test a complex scenario with both valid variables and JSON structures."""
|
||||
template = """
|
||||
{agent_name} is working on task {task_id}.
|
||||
|
||||
Instructions:
|
||||
1. Process the data
|
||||
2. Return results as:
|
||||
|
||||
{
|
||||
"taskId": "{task_id}",
|
||||
"results": {
|
||||
"processed_by": "agent_name",
|
||||
"status": "complete",
|
||||
"values": [1, 2, 3]
|
||||
}
|
||||
}
|
||||
"""
|
||||
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
|
||||
"agent_name": "AnalyticsAgent",
|
||||
"task_id": "T-12345",
|
||||
}
|
||||
|
||||
result = interpolate_only(template, inputs)
|
||||
|
||||
assert "AnalyticsAgent is working on task T-12345" in result
|
||||
assert '"taskId": "T-12345"' in result
|
||||
assert '"processed_by": "agent_name"' in result # This shouldn't be replaced
|
||||
assert '"values": [1, 2, 3]' in result
|
||||
|
||||
def test_empty_inputs_dictionary(self):
|
||||
"""Test that an error is raised with empty inputs dictionary."""
|
||||
template = "Hello, {name}!"
|
||||
inputs: Dict[str, Any] = {}
|
||||
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
interpolate_only(template, inputs)
|
||||
|
||||
assert "inputs dictionary cannot be empty" in str(excinfo.value).lower()
|
||||
Reference in New Issue
Block a user