mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-11 00:58:30 +00:00
Merge branch 'main' into feat/add-prompt-observability
This commit is contained in:
156
tests/flow/test_state_utils.py
Normal file
156
tests/flow/test_state_utils.py
Normal file
@@ -0,0 +1,156 @@
|
||||
from datetime import date, datetime
|
||||
from typing import List
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.flow import Flow
|
||||
from crewai.flow.state_utils import export_state
|
||||
|
||||
|
||||
class Address(BaseModel):
|
||||
street: str
|
||||
city: str
|
||||
country: str
|
||||
|
||||
|
||||
class Person(BaseModel):
|
||||
name: str
|
||||
age: int
|
||||
address: Address
|
||||
birthday: date
|
||||
skills: List[str]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_flow():
|
||||
def create_flow(state):
|
||||
flow = Mock(spec=Flow)
|
||||
flow._state = state
|
||||
return flow
|
||||
|
||||
return create_flow
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_input,expected",
|
||||
[
|
||||
({"text": "hello world"}, {"text": "hello world"}),
|
||||
({"number": 42}, {"number": 42}),
|
||||
({"decimal": 3.14}, {"decimal": 3.14}),
|
||||
({"flag": True}, {"flag": True}),
|
||||
({"empty": None}, {"empty": None}),
|
||||
({"list": [1, 2, 3]}, {"list": [1, 2, 3]}),
|
||||
({"tuple": (1, 2, 3)}, {"tuple": [1, 2, 3]}),
|
||||
({"set": {1, 2, 3}}, {"set": [1, 2, 3]}),
|
||||
({"nested": [1, [2, 3], {4, 5}]}, {"nested": [1, [2, 3], [4, 5]]}),
|
||||
],
|
||||
)
|
||||
def test_basic_serialization(mock_flow, test_input, expected):
|
||||
flow = mock_flow(test_input)
|
||||
result = export_state(flow)
|
||||
assert result == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"input_date,expected",
|
||||
[
|
||||
(date(2024, 1, 1), "2024-01-01"),
|
||||
(datetime(2024, 1, 1, 12, 30), "2024-01-01T12:30:00"),
|
||||
],
|
||||
)
|
||||
def test_temporal_serialization(mock_flow, input_date, expected):
|
||||
flow = mock_flow({"date": input_date})
|
||||
result = export_state(flow)
|
||||
assert result["date"] == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key,value,expected_key_type",
|
||||
[
|
||||
(("tuple", "key"), "value", str),
|
||||
(None, "value", str),
|
||||
(123, "value", str),
|
||||
("normal", "value", str),
|
||||
],
|
||||
)
|
||||
def test_dictionary_key_serialization(mock_flow, key, value, expected_key_type):
|
||||
flow = mock_flow({key: value})
|
||||
result = export_state(flow)
|
||||
assert len(result) == 1
|
||||
result_key = next(iter(result.keys()))
|
||||
assert isinstance(result_key, expected_key_type)
|
||||
assert result[result_key] == value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"callable_obj,expected_in_result",
|
||||
[
|
||||
(lambda x: x * 2, "lambda"),
|
||||
(str.upper, "upper"),
|
||||
],
|
||||
)
|
||||
def test_callable_serialization(mock_flow, callable_obj, expected_in_result):
|
||||
flow = mock_flow({"func": callable_obj})
|
||||
result = export_state(flow)
|
||||
assert isinstance(result["func"], str)
|
||||
assert expected_in_result in result["func"].lower()
|
||||
|
||||
|
||||
def test_pydantic_model_serialization(mock_flow):
|
||||
address = Address(street="123 Main St", city="Tech City", country="Pythonia")
|
||||
|
||||
person = Person(
|
||||
name="John Doe",
|
||||
age=30,
|
||||
address=address,
|
||||
birthday=date(1994, 1, 1),
|
||||
skills=["Python", "Testing"],
|
||||
)
|
||||
|
||||
flow = mock_flow(
|
||||
{
|
||||
"single_model": address,
|
||||
"nested_model": person,
|
||||
"model_list": [address, address],
|
||||
"model_dict": {"home": address},
|
||||
}
|
||||
)
|
||||
|
||||
result = export_state(flow)
|
||||
|
||||
assert result["single_model"]["street"] == "123 Main St"
|
||||
|
||||
assert result["nested_model"]["name"] == "John Doe"
|
||||
assert result["nested_model"]["address"]["city"] == "Tech City"
|
||||
assert result["nested_model"]["birthday"] == "1994-01-01"
|
||||
|
||||
assert len(result["model_list"]) == 2
|
||||
assert all(m["street"] == "123 Main St" for m in result["model_list"])
|
||||
assert result["model_dict"]["home"]["city"] == "Tech City"
|
||||
|
||||
|
||||
def test_depth_limit(mock_flow):
|
||||
"""Test max depth handling with a deeply nested structure"""
|
||||
|
||||
def create_nested(depth):
|
||||
if depth == 0:
|
||||
return "value"
|
||||
return {"next": create_nested(depth - 1)}
|
||||
|
||||
deep_structure = create_nested(10)
|
||||
flow = mock_flow(deep_structure)
|
||||
result = export_state(flow)
|
||||
|
||||
assert result == {
|
||||
"next": {
|
||||
"next": {
|
||||
"next": {
|
||||
"next": {
|
||||
"next": "{'next': {'next': {'next': {'next': {'next': 'value'}}}}}"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,18 @@
|
||||
"""Test Flow creation and execution basic functionality."""
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
|
||||
import pytest
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.flow.flow import Flow, and_, listen, or_, router, start
|
||||
from crewai.flow.flow_events import (
|
||||
FlowFinishedEvent,
|
||||
FlowStartedEvent,
|
||||
MethodExecutionFinishedEvent,
|
||||
MethodExecutionStartedEvent,
|
||||
)
|
||||
|
||||
|
||||
def test_simple_sequential_flow():
|
||||
@@ -398,3 +405,218 @@ def test_router_with_multiple_conditions():
|
||||
|
||||
# final_step should run after router_and
|
||||
assert execution_order.index("log_final_step") > execution_order.index("router_and")
|
||||
|
||||
|
||||
def test_unstructured_flow_event_emission():
|
||||
"""Test that the correct events are emitted during unstructured flow
|
||||
execution with all fields validated."""
|
||||
|
||||
class PoemFlow(Flow):
|
||||
@start()
|
||||
def prepare_flower(self):
|
||||
self.state["flower"] = "roses"
|
||||
return "foo"
|
||||
|
||||
@start()
|
||||
def prepare_color(self):
|
||||
self.state["color"] = "red"
|
||||
return "bar"
|
||||
|
||||
@listen(prepare_color)
|
||||
def write_first_sentence(self):
|
||||
return f"{self.state['flower']} are {self.state['color']}"
|
||||
|
||||
@listen(write_first_sentence)
|
||||
def finish_poem(self, first_sentence):
|
||||
separator = self.state.get("separator", "\n")
|
||||
return separator.join([first_sentence, "violets are blue"])
|
||||
|
||||
@listen(finish_poem)
|
||||
def save_poem_to_database(self):
|
||||
# A method without args/kwargs to ensure events are sent correctly
|
||||
pass
|
||||
|
||||
event_log = []
|
||||
|
||||
def handle_event(_, event):
|
||||
event_log.append(event)
|
||||
|
||||
flow = PoemFlow()
|
||||
flow.event_emitter.connect(handle_event)
|
||||
flow.kickoff(inputs={"separator": ", "})
|
||||
|
||||
assert isinstance(event_log[0], FlowStartedEvent)
|
||||
assert event_log[0].flow_name == "PoemFlow"
|
||||
assert event_log[0].inputs == {"separator": ", "}
|
||||
assert isinstance(event_log[0].timestamp, datetime)
|
||||
|
||||
# Asserting for concurrent start method executions in a for loop as you
|
||||
# can't guarantee ordering in asynchronous executions
|
||||
for i in range(1, 5):
|
||||
event = event_log[i]
|
||||
assert isinstance(event.state, dict)
|
||||
assert isinstance(event.state["id"], str)
|
||||
|
||||
if event.method_name == "prepare_flower":
|
||||
if isinstance(event, MethodExecutionStartedEvent):
|
||||
assert event.params == {}
|
||||
assert event.state["separator"] == ", "
|
||||
elif isinstance(event, MethodExecutionFinishedEvent):
|
||||
assert event.result == "foo"
|
||||
assert event.state["flower"] == "roses"
|
||||
assert event.state["separator"] == ", "
|
||||
else:
|
||||
assert False, "Unexpected event type for prepare_flower"
|
||||
elif event.method_name == "prepare_color":
|
||||
if isinstance(event, MethodExecutionStartedEvent):
|
||||
assert event.params == {}
|
||||
assert event.state["separator"] == ", "
|
||||
elif isinstance(event, MethodExecutionFinishedEvent):
|
||||
assert event.result == "bar"
|
||||
assert event.state["color"] == "red"
|
||||
assert event.state["separator"] == ", "
|
||||
else:
|
||||
assert False, "Unexpected event type for prepare_color"
|
||||
else:
|
||||
assert False, f"Unexpected method {event.method_name} in prepare events"
|
||||
|
||||
assert isinstance(event_log[5], MethodExecutionStartedEvent)
|
||||
assert event_log[5].method_name == "write_first_sentence"
|
||||
assert event_log[5].params == {}
|
||||
assert isinstance(event_log[5].state, dict)
|
||||
assert event_log[5].state["flower"] == "roses"
|
||||
assert event_log[5].state["color"] == "red"
|
||||
assert event_log[5].state["separator"] == ", "
|
||||
|
||||
assert isinstance(event_log[6], MethodExecutionFinishedEvent)
|
||||
assert event_log[6].method_name == "write_first_sentence"
|
||||
assert event_log[6].result == "roses are red"
|
||||
|
||||
assert isinstance(event_log[7], MethodExecutionStartedEvent)
|
||||
assert event_log[7].method_name == "finish_poem"
|
||||
assert event_log[7].params == {"_0": "roses are red"}
|
||||
assert isinstance(event_log[7].state, dict)
|
||||
assert event_log[7].state["flower"] == "roses"
|
||||
assert event_log[7].state["color"] == "red"
|
||||
|
||||
assert isinstance(event_log[8], MethodExecutionFinishedEvent)
|
||||
assert event_log[8].method_name == "finish_poem"
|
||||
assert event_log[8].result == "roses are red, violets are blue"
|
||||
|
||||
assert isinstance(event_log[9], MethodExecutionStartedEvent)
|
||||
assert event_log[9].method_name == "save_poem_to_database"
|
||||
assert event_log[9].params == {}
|
||||
assert isinstance(event_log[9].state, dict)
|
||||
assert event_log[9].state["flower"] == "roses"
|
||||
assert event_log[9].state["color"] == "red"
|
||||
|
||||
assert isinstance(event_log[10], MethodExecutionFinishedEvent)
|
||||
assert event_log[10].method_name == "save_poem_to_database"
|
||||
assert event_log[10].result is None
|
||||
|
||||
assert isinstance(event_log[11], FlowFinishedEvent)
|
||||
assert event_log[11].flow_name == "PoemFlow"
|
||||
assert event_log[11].result is None
|
||||
assert isinstance(event_log[11].timestamp, datetime)
|
||||
|
||||
|
||||
def test_structured_flow_event_emission():
|
||||
"""Test that the correct events are emitted during structured flow
|
||||
execution with all fields validated."""
|
||||
|
||||
class OnboardingState(BaseModel):
|
||||
name: str = ""
|
||||
sent: bool = False
|
||||
|
||||
class OnboardingFlow(Flow[OnboardingState]):
|
||||
@start()
|
||||
def user_signs_up(self):
|
||||
self.state.sent = False
|
||||
|
||||
@listen(user_signs_up)
|
||||
def send_welcome_message(self):
|
||||
self.state.sent = True
|
||||
return f"Welcome, {self.state.name}!"
|
||||
|
||||
event_log = []
|
||||
|
||||
def handle_event(_, event):
|
||||
event_log.append(event)
|
||||
|
||||
flow = OnboardingFlow()
|
||||
flow.event_emitter.connect(handle_event)
|
||||
flow.kickoff(inputs={"name": "Anakin"})
|
||||
|
||||
assert isinstance(event_log[0], FlowStartedEvent)
|
||||
assert event_log[0].flow_name == "OnboardingFlow"
|
||||
assert event_log[0].inputs == {"name": "Anakin"}
|
||||
assert isinstance(event_log[0].timestamp, datetime)
|
||||
|
||||
assert isinstance(event_log[1], MethodExecutionStartedEvent)
|
||||
assert event_log[1].method_name == "user_signs_up"
|
||||
|
||||
assert isinstance(event_log[2], MethodExecutionFinishedEvent)
|
||||
assert event_log[2].method_name == "user_signs_up"
|
||||
|
||||
assert isinstance(event_log[3], MethodExecutionStartedEvent)
|
||||
assert event_log[3].method_name == "send_welcome_message"
|
||||
assert event_log[3].params == {}
|
||||
assert getattr(event_log[3].state, "sent") is False
|
||||
|
||||
assert isinstance(event_log[4], MethodExecutionFinishedEvent)
|
||||
assert event_log[4].method_name == "send_welcome_message"
|
||||
assert getattr(event_log[4].state, "sent") is True
|
||||
assert event_log[4].result == "Welcome, Anakin!"
|
||||
|
||||
assert isinstance(event_log[5], FlowFinishedEvent)
|
||||
assert event_log[5].flow_name == "OnboardingFlow"
|
||||
assert event_log[5].result == "Welcome, Anakin!"
|
||||
assert isinstance(event_log[5].timestamp, datetime)
|
||||
|
||||
|
||||
def test_stateless_flow_event_emission():
|
||||
"""Test that the correct events are emitted stateless during flow execution
|
||||
with all fields validated."""
|
||||
|
||||
class StatelessFlow(Flow):
|
||||
@start()
|
||||
def init(self):
|
||||
pass
|
||||
|
||||
@listen(init)
|
||||
def process(self):
|
||||
return "Deeds will not be less valiant because they are unpraised."
|
||||
|
||||
event_log = []
|
||||
|
||||
def handle_event(_, event):
|
||||
event_log.append(event)
|
||||
|
||||
flow = StatelessFlow()
|
||||
flow.event_emitter.connect(handle_event)
|
||||
flow.kickoff()
|
||||
|
||||
assert isinstance(event_log[0], FlowStartedEvent)
|
||||
assert event_log[0].flow_name == "StatelessFlow"
|
||||
assert event_log[0].inputs is None
|
||||
assert isinstance(event_log[0].timestamp, datetime)
|
||||
|
||||
assert isinstance(event_log[1], MethodExecutionStartedEvent)
|
||||
assert event_log[1].method_name == "init"
|
||||
|
||||
assert isinstance(event_log[2], MethodExecutionFinishedEvent)
|
||||
assert event_log[2].method_name == "init"
|
||||
|
||||
assert isinstance(event_log[3], MethodExecutionStartedEvent)
|
||||
assert event_log[3].method_name == "process"
|
||||
|
||||
assert isinstance(event_log[4], MethodExecutionFinishedEvent)
|
||||
assert event_log[4].method_name == "process"
|
||||
|
||||
assert isinstance(event_log[5], FlowFinishedEvent)
|
||||
assert event_log[5].flow_name == "StatelessFlow"
|
||||
assert (
|
||||
event_log[5].result
|
||||
== "Deeds will not be less valiant because they are unpraised."
|
||||
)
|
||||
assert isinstance(event_log[5].timestamp, datetime)
|
||||
|
||||
Reference in New Issue
Block a user