mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-05 06:59:23 +00:00
Add expressions to FlowDefinition actions (#6145)
* Add expressions to FlowDefinition actions
Let definitions compute values without Python. A new `call: expression`
action evaluates a Common Expression Language (CEL) expression, and tool
`with:` blocks now render `${...}` CEL templates.
Example 1:
```yaml
decide:
do:
call: expression
expr: "state.score >= 80 ? 'qualified' : 'nurture'"
router: true
emit: [qualified, nurture]
```
Example 2:
```yaml
search:
do:
call: tool
ref: my.pkg:SearchTool
with:
search_query: "${outputs.build_query.query + ' news'}"
max_results: "${state.limit}"
```
* Address code review comments
* Address code review comments
* Fix linting offenses
* Address code review comments
* Fix scrapgraph issue
This commit is contained in:
@@ -13,8 +13,8 @@ from crewai_core import (
|
||||
user_data,
|
||||
version,
|
||||
)
|
||||
import pytest
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
import pytest
|
||||
|
||||
|
||||
def test_version_returns_string() -> None:
|
||||
|
||||
@@ -63,7 +63,7 @@ spider-client = [
|
||||
"spider-client>=0.1.25",
|
||||
]
|
||||
scrapegraph-py = [
|
||||
"scrapegraph-py>=1.9.0",
|
||||
"scrapegraph-py>=1.9.0,<2",
|
||||
]
|
||||
linkup-sdk = [
|
||||
"linkup-sdk>=0.2.2",
|
||||
|
||||
@@ -33,6 +33,7 @@ dependencies = [
|
||||
"appdirs~=1.4.4",
|
||||
"jsonref~=1.1.0",
|
||||
"json-repair~=0.25.2",
|
||||
"cel-python>=0.5.0,<0.6",
|
||||
"tomli-w~=1.1.0",
|
||||
"tomli~=2.0.2",
|
||||
"json5~=0.10.0",
|
||||
|
||||
@@ -158,7 +158,6 @@ class EventListener(BaseEventListener):
|
||||
trace_listener.formatter = self.formatter
|
||||
|
||||
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None:
|
||||
|
||||
@crewai_event_bus.on(CCEnvEvent)
|
||||
def on_cc_env(_: Any, event: CCEnvEvent) -> None:
|
||||
self._telemetry.env_context_span(event.type)
|
||||
|
||||
@@ -146,6 +146,10 @@ class _ConversationalMixin:
|
||||
def kickoff(self, *args: Any, **kwargs: Any) -> Any:
|
||||
pass
|
||||
|
||||
@property
|
||||
def method_outputs(self) -> list[Any]:
|
||||
pass
|
||||
|
||||
def conversation_start(self) -> str | None:
|
||||
"""Return the current user message for conversational route selection.
|
||||
|
||||
@@ -1033,7 +1037,8 @@ class _ConversationalMixin:
|
||||
# of warning about an empty scope stack.
|
||||
started_id = getattr(self, "_deferred_flow_started_event_id", None)
|
||||
if started_id:
|
||||
last_output = self._method_outputs[-1] if self._method_outputs else None
|
||||
method_outputs = self.method_outputs
|
||||
last_output = method_outputs[-1] if method_outputs else None
|
||||
restore_event_scope(((started_id, "flow_started"),))
|
||||
try:
|
||||
crewai_event_bus.emit(
|
||||
|
||||
@@ -35,6 +35,7 @@ __all__ = [
|
||||
"FlowDefinition",
|
||||
"FlowDefinitionCondition",
|
||||
"FlowDefinitionDiagnostic",
|
||||
"FlowExpressionActionDefinition",
|
||||
"FlowHumanFeedbackDefinition",
|
||||
"FlowMethodDefinition",
|
||||
"FlowPersistenceDefinition",
|
||||
@@ -163,7 +164,18 @@ class FlowToolActionDefinition(BaseModel):
|
||||
with_: dict[str, Any] | None = Field(default=None, alias="with")
|
||||
|
||||
|
||||
FlowActionDefinition = FlowCodeActionDefinition | FlowToolActionDefinition
|
||||
class FlowExpressionActionDefinition(BaseModel):
|
||||
"""A Flow method action that evaluates a CEL expression."""
|
||||
|
||||
model_config = ConfigDict(extra="forbid")
|
||||
|
||||
call: TypingLiteral["expression"]
|
||||
expr: str
|
||||
|
||||
|
||||
FlowActionDefinition = (
|
||||
FlowCodeActionDefinition | FlowToolActionDefinition | FlowExpressionActionDefinition
|
||||
)
|
||||
|
||||
|
||||
class FlowMethodDefinition(BaseModel):
|
||||
|
||||
@@ -962,7 +962,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
}
|
||||
self._restored_from_checkpoint = True
|
||||
if self.checkpoint_method_outputs is not None:
|
||||
self._method_outputs = list(self.checkpoint_method_outputs)
|
||||
self._method_outputs = [
|
||||
entry
|
||||
if isinstance(entry, dict) and "method" in entry and "output" in entry
|
||||
else {"method": "", "output": entry}
|
||||
for entry in self.checkpoint_method_outputs
|
||||
]
|
||||
if self.checkpoint_method_counts is not None:
|
||||
self._method_execution_counts = {
|
||||
FlowMethodName(k): v for k, v in self.checkpoint_method_counts.items()
|
||||
@@ -1649,6 +1654,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
metadata=context.metadata,
|
||||
)
|
||||
collapsed_outcome = result.outcome
|
||||
resumed_method_output = (
|
||||
result.output
|
||||
if emit and isinstance(result, HumanFeedbackResult)
|
||||
else result
|
||||
)
|
||||
|
||||
self._completed_methods.add(FlowMethodName(context.method_name))
|
||||
|
||||
@@ -1677,9 +1687,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
|
||||
self._is_execution_resuming = False
|
||||
|
||||
self._method_outputs.append(
|
||||
{"method": context.method_name, "output": resumed_method_output}
|
||||
)
|
||||
|
||||
try:
|
||||
if emit and collapsed_outcome:
|
||||
self._method_outputs.append(collapsed_outcome)
|
||||
await self._execute_listeners(
|
||||
FlowMethodName(collapsed_outcome),
|
||||
result,
|
||||
@@ -1725,7 +1738,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
return e
|
||||
raise
|
||||
|
||||
final_result = self._method_outputs[-1] if self._method_outputs else result
|
||||
method_outputs = self.method_outputs
|
||||
final_result = (
|
||||
method_outputs[-1]
|
||||
if method_outputs
|
||||
else (resumed_method_output if emit else result)
|
||||
)
|
||||
|
||||
if self._event_futures:
|
||||
await asyncio.gather(
|
||||
@@ -1906,7 +1924,13 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
@property
|
||||
def method_outputs(self) -> list[Any]:
|
||||
"""Returns the list of all outputs from executed methods."""
|
||||
return self._method_outputs
|
||||
outputs: list[Any] = []
|
||||
for entry in self._method_outputs:
|
||||
if isinstance(entry, dict) and "output" in entry:
|
||||
outputs.append(entry["output"])
|
||||
else:
|
||||
outputs.append(entry)
|
||||
return outputs
|
||||
|
||||
@property
|
||||
def flow_id(self) -> str:
|
||||
@@ -2540,7 +2564,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# Clear the resumption flag after initial execution completes
|
||||
self._is_execution_resuming = False
|
||||
|
||||
final_output = self._method_outputs[-1] if self._method_outputs else None
|
||||
method_outputs = self.method_outputs
|
||||
final_output = method_outputs[-1] if method_outputs else None
|
||||
|
||||
if self._event_futures:
|
||||
await asyncio.gather(
|
||||
@@ -2695,7 +2720,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
if start_method_name in self._completed_methods:
|
||||
if self._is_execution_resuming:
|
||||
# During resumption, skip execution but continue listeners
|
||||
last_output = self._method_outputs[-1] if self._method_outputs else None
|
||||
method_outputs = self.method_outputs
|
||||
last_output = method_outputs[-1] if method_outputs else None
|
||||
await self._execute_listeners(start_method_name, last_output)
|
||||
return
|
||||
# For cyclic flows, clear from completed to allow re-execution
|
||||
@@ -2825,7 +2851,7 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
method_name, method_definition.human_feedback, result
|
||||
)
|
||||
|
||||
self._method_outputs.append(result)
|
||||
self._method_outputs.append({"method": str(method_name), "output": result})
|
||||
|
||||
# For @human_feedback methods with emit, the result is the collapsed outcome
|
||||
# (e.g., "approved") used for routing. But we want the actual method output
|
||||
@@ -2833,8 +2859,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
# if a stashed output exists. Dict-based stash is concurrency-safe and
|
||||
# handles None return values (presence in dict = stashed, not value).
|
||||
if method_name in self._human_feedback_method_outputs:
|
||||
self._method_outputs[-1] = self._human_feedback_method_outputs.pop(
|
||||
method_name
|
||||
self._method_outputs[-1]["output"] = (
|
||||
self._human_feedback_method_outputs.pop(method_name)
|
||||
)
|
||||
|
||||
self._method_execution_counts[method_name] = (
|
||||
@@ -3560,7 +3586,6 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
|
||||
def _resolve_feedback_provider(
|
||||
self, feedback_definition: FlowHumanFeedbackDefinition
|
||||
) -> Any:
|
||||
|
||||
provider = feedback_definition.provider
|
||||
if isinstance(provider, str):
|
||||
provider = resolve_instance_ref(provider, field="human_feedback.provider")
|
||||
|
||||
144
lib/crewai/src/crewai/flow/runtime/_expressions.py
Normal file
144
lib/crewai/src/crewai/flow/runtime/_expressions.py
Normal file
@@ -0,0 +1,144 @@
|
||||
"""Runtime expression support for FlowDefinition CEL expressions."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
import dataclasses
|
||||
from itertools import pairwise
|
||||
import json
|
||||
import re
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.flow.runtime import Flow
|
||||
|
||||
|
||||
_EXPRESSION_PATTERN = re.compile(r"\$\{([^{}]*)\}")
|
||||
|
||||
__all__ = ["FlowExpressionError", "evaluate_expression", "render_with_block"]
|
||||
|
||||
|
||||
class FlowExpressionError(ValueError):
|
||||
"""A FlowDefinition expression failed to parse or evaluate."""
|
||||
|
||||
|
||||
def render_with_block(flow: Flow[Any], value: Any) -> Any:
|
||||
"""Render CEL expressions inside a FlowDefinition ``with:`` payload."""
|
||||
context = _expression_context(flow)
|
||||
return _render_value(value, context)
|
||||
|
||||
|
||||
def evaluate_expression(flow: Flow[Any], expression: str) -> Any:
|
||||
"""Evaluate a FlowDefinition CEL expression against runtime context."""
|
||||
expression = expression.strip()
|
||||
if not expression:
|
||||
raise FlowExpressionError("empty CEL expression")
|
||||
return _eval_cel(expression, _expression_context(flow))
|
||||
|
||||
|
||||
def _expression_context(flow: Flow[Any]) -> dict[str, Any]:
|
||||
return {
|
||||
"state": flow._copy_and_serialize_state(),
|
||||
"outputs": _outputs_by_name(flow._method_outputs),
|
||||
}
|
||||
|
||||
|
||||
def _outputs_by_name(method_outputs: list[Any]) -> dict[str, Any]:
|
||||
outputs: dict[str, Any] = {}
|
||||
for entry in method_outputs:
|
||||
method = ""
|
||||
output = entry
|
||||
if isinstance(entry, dict) and "output" in entry:
|
||||
method = str(entry.get("method", ""))
|
||||
output = entry["output"]
|
||||
output = copy.deepcopy(output)
|
||||
if isinstance(output, BaseModel):
|
||||
output = output.model_dump(mode="json")
|
||||
elif dataclasses.is_dataclass(output) and not isinstance(output, type):
|
||||
output = dataclasses.asdict(output)
|
||||
outputs[method] = output
|
||||
return outputs
|
||||
|
||||
|
||||
def _render_value(value: Any, context: dict[str, Any]) -> Any:
|
||||
if isinstance(value, str):
|
||||
return _render_string(value, context)
|
||||
if isinstance(value, dict):
|
||||
return {key: _render_value(item, context) for key, item in value.items()}
|
||||
if isinstance(value, list):
|
||||
return [_render_value(item, context) for item in value]
|
||||
return value
|
||||
|
||||
|
||||
def _render_string(value: str, context: dict[str, Any]) -> Any:
|
||||
matches = list(_EXPRESSION_PATTERN.finditer(value))
|
||||
if not matches:
|
||||
_raise_for_invalid_interpolation(value)
|
||||
return value
|
||||
|
||||
_raise_for_literal_braces(value[: matches[0].start()])
|
||||
for previous, current in pairwise(matches):
|
||||
_raise_for_literal_braces(value[previous.end() : current.start()])
|
||||
_raise_for_literal_braces(value[matches[-1].end() :])
|
||||
|
||||
if len(matches) == 1 and matches[0].span() == (0, len(value)):
|
||||
expression = matches[0].group(1).strip()
|
||||
if not expression:
|
||||
raise FlowExpressionError("empty CEL expression in with block")
|
||||
return _eval_cel(expression, context)
|
||||
|
||||
rendered: list[str] = []
|
||||
position = 0
|
||||
for match in matches:
|
||||
start, end = match.span()
|
||||
literal = value[position:start]
|
||||
rendered.append(literal)
|
||||
|
||||
expression = match.group(1).strip()
|
||||
if not expression:
|
||||
raise FlowExpressionError("empty CEL expression in with block")
|
||||
result = _eval_cel(expression, context)
|
||||
rendered.append(result if isinstance(result, str) else json.dumps(result))
|
||||
position = end
|
||||
|
||||
literal = value[position:]
|
||||
rendered.append(literal)
|
||||
|
||||
return "".join(rendered)
|
||||
|
||||
|
||||
def _raise_for_invalid_interpolation(value: str) -> None:
|
||||
if "${" not in value:
|
||||
return
|
||||
raise FlowExpressionError(
|
||||
"invalid CEL interpolation in with block: expressions must be enclosed "
|
||||
"as ${...} and cannot contain braces"
|
||||
)
|
||||
|
||||
|
||||
def _raise_for_literal_braces(value: str) -> None:
|
||||
if "{" not in value and "}" not in value:
|
||||
return
|
||||
raise FlowExpressionError(
|
||||
"invalid CEL interpolation in with block: expressions must be enclosed "
|
||||
"as ${...} and cannot contain braces"
|
||||
)
|
||||
|
||||
|
||||
def _eval_cel(expression: str, context: dict[str, Any]) -> Any:
|
||||
try:
|
||||
from celpy import Environment
|
||||
from celpy.adapter import CELJSONEncoder, json_to_cel
|
||||
from celpy.evaluation import Context
|
||||
|
||||
environment = Environment()
|
||||
program = environment.program(environment.compile(expression))
|
||||
result = program.evaluate(cast(Context, json_to_cel(context)))
|
||||
return json.loads(json.dumps(result, cls=CELJSONEncoder))
|
||||
except Exception as e:
|
||||
raise FlowExpressionError(
|
||||
f"failed to evaluate CEL expression {expression!r}: {e}"
|
||||
) from e
|
||||
@@ -16,8 +16,10 @@ from typing import TYPE_CHECKING, Any, cast
|
||||
from crewai.flow.flow_definition import (
|
||||
FlowActionDefinition,
|
||||
FlowCodeActionDefinition,
|
||||
FlowExpressionActionDefinition,
|
||||
FlowToolActionDefinition,
|
||||
)
|
||||
from crewai.flow.runtime._expressions import evaluate_expression, render_with_block
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -68,7 +70,7 @@ def _resolve_code_action(
|
||||
|
||||
|
||||
def _resolve_tool_action(
|
||||
_flow: Flow[Any], action: FlowToolActionDefinition
|
||||
flow: Flow[Any], action: FlowToolActionDefinition
|
||||
) -> Callable[..., Any]:
|
||||
target = resolve_ref(action.ref, field="do")
|
||||
from crewai.tools import BaseTool
|
||||
@@ -89,15 +91,26 @@ def _resolve_tool_action(
|
||||
tool_kwargs = action.with_ or {}
|
||||
|
||||
def run_tool(*_args: Any, **_kwargs: Any) -> Any:
|
||||
return tool.run(**tool_kwargs)
|
||||
return tool.run(**render_with_block(flow, tool_kwargs))
|
||||
|
||||
return run_tool
|
||||
|
||||
|
||||
def _resolve_expression_action(
|
||||
flow: Flow[Any], action: FlowExpressionActionDefinition
|
||||
) -> Callable[..., Any]:
|
||||
def run_expression(*_args: Any, **_kwargs: Any) -> Any:
|
||||
return evaluate_expression(flow, action.expr)
|
||||
|
||||
return run_expression
|
||||
|
||||
|
||||
def resolve_action(flow: Flow[Any], action: FlowActionDefinition) -> Callable[..., Any]:
|
||||
"""Turn one `do:` action into the callable the flow runs for that node."""
|
||||
if action.call == "code":
|
||||
return _resolve_code_action(flow, action)
|
||||
if action.call == "tool":
|
||||
return _resolve_tool_action(flow, action)
|
||||
if action.call == "expression":
|
||||
return _resolve_expression_action(flow, action)
|
||||
raise ValueError(f"unknown call type {action.call!r}")
|
||||
|
||||
@@ -21,7 +21,7 @@ from unittest.mock import MagicMock, patch
|
||||
import pytest
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.flow import Flow, start, listen, human_feedback
|
||||
from crewai.flow import Flow, HumanFeedbackResult, start, listen, human_feedback
|
||||
from crewai.flow.async_feedback import (
|
||||
ConsoleProvider,
|
||||
HumanFeedbackPending,
|
||||
@@ -615,6 +615,45 @@ class TestFlowResumeWithFeedback:
|
||||
|
||||
assert persistence.load_pending_feedback("resume-test-123") is None
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_terminal_resume_without_emit_returns_feedback_result(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Terminal resumed non-emit methods return the full feedback result."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(message="Review this:", metadata={"stage": "draft"})
|
||||
def generate(self):
|
||||
return {"content": "generated content"}
|
||||
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="terminal-non-emit-test-123",
|
||||
flow_class="test.TestFlow",
|
||||
method_name="generate",
|
||||
method_output={"content": "generated content"},
|
||||
message="Review this:",
|
||||
metadata={"stage": "draft"},
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="terminal-non-emit-test-123",
|
||||
context=context,
|
||||
state_data={"id": "terminal-non-emit-test-123"},
|
||||
)
|
||||
|
||||
flow = TestFlow.from_pending("terminal-non-emit-test-123", persistence)
|
||||
result = flow.resume("looks good!")
|
||||
|
||||
assert isinstance(result, HumanFeedbackResult)
|
||||
assert result.output == {"content": "generated content"}
|
||||
assert result.feedback == "looks good!"
|
||||
assert result.outcome is None
|
||||
assert result.metadata == {"stage": "draft"}
|
||||
assert flow.method_outputs == [result]
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_resume_routing(self, mock_emit: MagicMock) -> None:
|
||||
"""Test resume with routing."""
|
||||
@@ -667,6 +706,93 @@ class TestFlowResumeWithFeedback:
|
||||
assert flow.last_human_feedback.outcome == "approved"
|
||||
assert flow.result_path == "approved"
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_terminal_resume_with_emit_returns_method_output(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Terminal resumed emit methods return the original method output."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
method_output = {"content": "original content", "status": "ready"}
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def review(self):
|
||||
return method_output
|
||||
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="terminal-route-test-123",
|
||||
flow_class="test.TestFlow",
|
||||
method_name="review",
|
||||
method_output=method_output,
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="terminal-route-test-123",
|
||||
context=context,
|
||||
state_data={"id": "terminal-route-test-123"},
|
||||
)
|
||||
|
||||
flow = TestFlow.from_pending("terminal-route-test-123", persistence)
|
||||
|
||||
with patch.object(flow, "_collapse_to_outcome", return_value="approved"):
|
||||
result = flow.resume("yes, this looks great")
|
||||
|
||||
assert result == method_output
|
||||
assert flow.method_outputs == [method_output]
|
||||
assert flow.last_human_feedback.outcome == "approved"
|
||||
|
||||
@patch("crewai.flow.runtime.crewai_event_bus.emit")
|
||||
def test_resume_records_method_output_before_downstream_listeners(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Downstream listeners can read outputs from the resumed method."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(message="Review:")
|
||||
def review(self):
|
||||
return "generated content"
|
||||
|
||||
@listen(review)
|
||||
def downstream(self, result):
|
||||
self.state["seen_outputs"] = self.method_outputs
|
||||
return f"downstream:{result.output}"
|
||||
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="listener-output-test-123",
|
||||
flow_class="test.TestFlow",
|
||||
method_name="review",
|
||||
method_output="generated content",
|
||||
message="Review:",
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="listener-output-test-123",
|
||||
context=context,
|
||||
state_data={"id": "listener-output-test-123"},
|
||||
)
|
||||
|
||||
flow = TestFlow.from_pending("listener-output-test-123", persistence)
|
||||
result = flow.resume("looks good")
|
||||
|
||||
assert result == "downstream:generated content"
|
||||
assert len(flow.state["seen_outputs"]) == 1
|
||||
seen_output = flow.state["seen_outputs"][0]
|
||||
assert isinstance(seen_output, HumanFeedbackResult)
|
||||
assert seen_output.output == "generated content"
|
||||
assert seen_output.feedback == "looks good"
|
||||
|
||||
|
||||
# Integration Tests with @human_feedback decorator
|
||||
|
||||
|
||||
@@ -617,6 +617,44 @@ class TestKickoffFromCheckpoint:
|
||||
|
||||
|
||||
|
||||
class TestLegacyMethodOutputsRestore:
|
||||
def test_restore_wraps_legacy_plain_value_outputs(self) -> None:
|
||||
flow = Flow()
|
||||
flow._method_outputs = ["first", "second"]
|
||||
state = RuntimeState(root=[flow])
|
||||
state._provider = JsonProvider()
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
loc = state.checkpoint(d)
|
||||
cfg = CheckpointConfig(restore_from=loc)
|
||||
restored = Flow.from_checkpoint(cfg)
|
||||
|
||||
assert restored.method_outputs == ["first", "second"]
|
||||
|
||||
def test_restore_legacy_outputs_evaluates_expressions(self) -> None:
|
||||
from crewai.flow.runtime._expressions import _expression_context
|
||||
|
||||
flow = Flow()
|
||||
flow._method_outputs = ["legacy"]
|
||||
state = RuntimeState(root=[flow])
|
||||
state._provider = JsonProvider()
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
loc = state.checkpoint(d)
|
||||
cfg = CheckpointConfig(restore_from=loc)
|
||||
restored = Flow.from_checkpoint(cfg)
|
||||
|
||||
context = _expression_context(restored)
|
||||
assert context["outputs"] == {"": "legacy"}
|
||||
|
||||
def test_raw_legacy_outputs_remain_readable(self) -> None:
|
||||
from crewai.flow.runtime._expressions import _expression_context
|
||||
|
||||
flow = Flow()
|
||||
flow._method_outputs = ["legacy"]
|
||||
|
||||
assert flow.method_outputs == ["legacy"]
|
||||
assert _expression_context(flow)["outputs"] == {"": "legacy"}
|
||||
|
||||
|
||||
class TestAgentCheckpoint:
|
||||
def _make_agent_state(self) -> RuntimeState:
|
||||
agent = Agent(role="r", goal="g", backstory="b", llm="gpt-4o-mini")
|
||||
|
||||
@@ -44,6 +44,7 @@ def test_flow_public_exports_are_explicit():
|
||||
"FlowDefinition",
|
||||
"FlowDefinitionCondition",
|
||||
"FlowDefinitionDiagnostic",
|
||||
"FlowExpressionActionDefinition",
|
||||
"FlowHumanFeedbackDefinition",
|
||||
"FlowMethodDefinition",
|
||||
"FlowPersistenceDefinition",
|
||||
|
||||
@@ -36,6 +36,14 @@ class StaticSearchTool(BaseTool):
|
||||
return f"{prefix}:{search_query}"
|
||||
|
||||
|
||||
class TypedInputsTool(BaseTool):
|
||||
name: str = "TypedInputsTool"
|
||||
description: str = "Returns typed input details."
|
||||
|
||||
def _run(self, count: int, include_domains: list[str]) -> str:
|
||||
return f"{count}:{','.join(include_domains)}"
|
||||
|
||||
|
||||
class ChainFlow(Flow):
|
||||
@start()
|
||||
def begin(self):
|
||||
@@ -52,6 +60,13 @@ class ChainFlow(Flow):
|
||||
return f"confirmed:{self.state['confirmed']}"
|
||||
|
||||
|
||||
class ToolInputFlow(Flow):
|
||||
@start()
|
||||
def build_query(self):
|
||||
self.state["prefix"] = "found"
|
||||
return {"query": "ai agents", "suffix": " news"}
|
||||
|
||||
|
||||
CHAIN_YAML = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: ChainFlow
|
||||
@@ -545,6 +560,254 @@ def test_tool_action_round_trips_with_inputs():
|
||||
assert Flow.from_definition(definition).kickoff() == "search:ai agents"
|
||||
|
||||
|
||||
def test_tool_action_renders_cel_inputs_at_runtime():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: ToolFlow
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
call: code
|
||||
ref: {__name__}:ChainFlow.begin
|
||||
start: true
|
||||
search:
|
||||
do:
|
||||
call: tool
|
||||
ref: {__name__}:StaticSearchTool
|
||||
with:
|
||||
search_query: "${{state.begin_ran ? state.topic + ' agents' : 'missing'}}"
|
||||
prefix: found
|
||||
listen: begin
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff(inputs={"topic": "ai"}) == "found:ai agents"
|
||||
|
||||
|
||||
def test_tool_action_rejects_braces_in_embedded_cel_input():
|
||||
definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "ToolFlow",
|
||||
"methods": {
|
||||
"search": {
|
||||
"start": True,
|
||||
"do": {
|
||||
"call": "tool",
|
||||
"ref": f"{__name__}:StaticSearchTool",
|
||||
"with": {
|
||||
"search_query": "wrapped ${'a}b'} value",
|
||||
"prefix": "${'p}x'}",
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="cannot contain braces"):
|
||||
Flow.from_definition(definition).kickoff()
|
||||
|
||||
|
||||
def test_tool_action_rejects_braces_in_full_cel_input():
|
||||
definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "ToolFlow",
|
||||
"methods": {
|
||||
"search": {
|
||||
"start": True,
|
||||
"do": {
|
||||
"call": "tool",
|
||||
"ref": f"{__name__}:StaticSearchTool",
|
||||
"with": {
|
||||
"search_query": "${{'query': 'ai agents'}.query}",
|
||||
"prefix": "found",
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError, match="cannot contain braces"):
|
||||
Flow.from_definition(definition).kickoff()
|
||||
|
||||
|
||||
def test_tool_action_renders_latest_output_by_method_name():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: ToolFlow
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
call: code
|
||||
ref: {__name__}:ChainFlow.begin
|
||||
start: true
|
||||
search:
|
||||
do:
|
||||
call: tool
|
||||
ref: {__name__}:StaticSearchTool
|
||||
with:
|
||||
search_query: "${{outputs.begin + ' agents'}}"
|
||||
listen: begin
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff() == "search:hello agents"
|
||||
|
||||
|
||||
def test_tool_action_uses_state_and_outputs_in_full_yaml_example():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: ToolFlow
|
||||
methods:
|
||||
build_query:
|
||||
do:
|
||||
call: code
|
||||
ref: {__name__}:ToolInputFlow.build_query
|
||||
start: true
|
||||
search:
|
||||
do:
|
||||
call: tool
|
||||
ref: {__name__}:StaticSearchTool
|
||||
with:
|
||||
search_query: "${{outputs.build_query.query + outputs.build_query.suffix}}"
|
||||
prefix: "${{state.prefix}}"
|
||||
listen: build_query
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert flow.kickoff() == "found:ai agents news"
|
||||
|
||||
|
||||
def test_tool_action_preserves_whole_expression_value_types():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: ToolFlow
|
||||
methods:
|
||||
typed:
|
||||
do:
|
||||
call: tool
|
||||
ref: {__name__}:TypedInputsTool
|
||||
with:
|
||||
count: "${{state.limit}}"
|
||||
include_domains: "${{state.domains}}"
|
||||
start: true
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
assert (
|
||||
flow.kickoff(inputs={"limit": 2, "domains": ["crewai.com", "example.com"]})
|
||||
== "2:crewai.com,example.com"
|
||||
)
|
||||
|
||||
|
||||
def test_tool_action_reports_invalid_cel_expression():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: ToolFlow
|
||||
methods:
|
||||
search:
|
||||
do:
|
||||
call: tool
|
||||
ref: {__name__}:StaticSearchTool
|
||||
with:
|
||||
search_query: "${{state.}}"
|
||||
start: true
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
with pytest.raises(ValueError, match="failed to evaluate CEL expression"):
|
||||
flow.kickoff()
|
||||
|
||||
|
||||
def test_expression_action_round_trips():
|
||||
definition = FlowDefinition.from_dict(
|
||||
{
|
||||
"schema": "crewai.flow/v1",
|
||||
"name": "ExpressionFlow",
|
||||
"methods": {
|
||||
"classify": {
|
||||
"start": True,
|
||||
"do": {
|
||||
"call": "expression",
|
||||
"expr": "state.score >= 80 ? 'qualified' : 'nurture'",
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
assert definition.to_dict()["methods"]["classify"]["do"] == {
|
||||
"call": "expression",
|
||||
"expr": "state.score >= 80 ? 'qualified' : 'nurture'",
|
||||
}
|
||||
assert Flow.from_definition(definition).kickoff(inputs={"score": 90}) == "qualified"
|
||||
|
||||
|
||||
def test_expression_action_can_route_like_if_else():
|
||||
yaml_str = f"""
|
||||
schema: crewai.flow/v1
|
||||
name: ExpressionRouterFlow
|
||||
methods:
|
||||
begin:
|
||||
do:
|
||||
call: code
|
||||
ref: {__name__}:ChainFlow.begin
|
||||
start: true
|
||||
decide:
|
||||
do:
|
||||
call: expression
|
||||
expr: "state.direction == 'left' ? 'left' : 'right'"
|
||||
listen: begin
|
||||
router: true
|
||||
emit: [left, right]
|
||||
take_left:
|
||||
do:
|
||||
call: code
|
||||
ref: {__name__}:RouteFlow.take_left
|
||||
listen: left
|
||||
take_right:
|
||||
do:
|
||||
call: code
|
||||
ref: {__name__}:RouteFlow.take_right
|
||||
listen: right
|
||||
"""
|
||||
|
||||
definition = FlowDefinition.from_yaml(yaml_str)
|
||||
|
||||
assert Flow.from_definition(definition).kickoff(
|
||||
inputs={"direction": "left"}
|
||||
) == "took-left"
|
||||
assert Flow.from_definition(definition).kickoff(
|
||||
inputs={"direction": "right"}
|
||||
) == "took-right"
|
||||
|
||||
|
||||
def test_expression_action_reports_invalid_cel_expression():
|
||||
yaml_str = """
|
||||
schema: crewai.flow/v1
|
||||
name: ExpressionFlow
|
||||
methods:
|
||||
classify:
|
||||
do:
|
||||
call: expression
|
||||
expr: "state."
|
||||
start: true
|
||||
"""
|
||||
|
||||
flow = Flow.from_definition(FlowDefinition.from_yaml(yaml_str))
|
||||
|
||||
with pytest.raises(ValueError, match="failed to evaluate CEL expression"):
|
||||
flow.kickoff()
|
||||
|
||||
|
||||
def test_tool_action_requires_module_qualname_ref():
|
||||
definition = FlowDefinition.from_dict(
|
||||
{
|
||||
|
||||
@@ -860,9 +860,9 @@ class TestHumanFeedbackFinalOutputPreservation:
|
||||
):
|
||||
flow.kickoff()
|
||||
|
||||
# _method_outputs should contain the real output
|
||||
assert len(flow._method_outputs) == 1
|
||||
assert flow._method_outputs[0] == {"data": "real output"}
|
||||
# method_outputs should contain the real output
|
||||
assert flow.method_outputs == [{"data": "real output"}]
|
||||
assert flow._method_outputs[0]["method"] == "generate"
|
||||
|
||||
@patch("builtins.input", return_value="looks good")
|
||||
@patch("builtins.print")
|
||||
|
||||
Reference in New Issue
Block a user