Files
crewAI/lib/cli/tests/test_kickoff_flow.py
Lorenze Jay 9b31226494
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Track conversational flow turn usage in telemetry (#6324)
* Track conversational flow turn usage in telemetry

* adjusted name to flow:conversation_turn

* only mark on turn completed event

* ensure tui also emits these events
2026-06-25 11:02:07 -07:00

101 lines
2.6 KiB
Python

from __future__ import annotations
import sys
from crewai_cli import kickoff_flow
def test_loads_conversational_flow_from_kickoff_script(tmp_path, monkeypatch) -> None:
    """Check that the loader returns a conversational flow found on disk.

    A throwaway project is laid out under ``tmp_path``: a ``src/demo_chat``
    package whose ``main.py`` declares ``DemoChatFlow`` with
    ``conversational = True``, plus a ``pyproject.toml`` whose ``kickoff``
    script entry points at ``demo_chat.main``. With the working directory
    switched to that project, the loader must hand back a ``DemoChatFlow``
    instance.
    """
    main_source = "\n".join(
        [
            "from crewai.flow import Flow",
            "",
            "class DemoChatFlow(Flow):",
            " conversational = True",
        ]
    )
    pyproject_source = "\n".join(
        [
            "[project]",
            'name = "demo-chat"',
            "[project.scripts]",
            'kickoff = "demo_chat.main:kickoff"',
        ]
    )

    # Lay out the on-disk project the loader is expected to inspect.
    pkg_root = tmp_path / "src" / "demo_chat"
    pkg_root.mkdir(parents=True)
    pkg_root.joinpath("__init__.py").write_text("")
    pkg_root.joinpath("main.py").write_text(main_source)
    tmp_path.joinpath("pyproject.toml").write_text(pyproject_source)

    monkeypatch.chdir(tmp_path)

    # Evict any previously imported copy so the freshly written files are used.
    for stale_module in ("demo_chat.main", "demo_chat"):
        sys.modules.pop(stale_module, None)

    loaded = kickoff_flow._load_conversational_flow_from_kickoff_script()

    assert loaded is not None
    assert type(loaded).__name__ == "DemoChatFlow"
    assert loaded.conversational is True
def test_kickoff_flow_falls_back_to_uv_when_no_conversational_flow(
    monkeypatch,
) -> None:
    """Check that ``kickoff_flow`` runs ``uv run kickoff`` when no flow loads.

    The conversational-flow loader is stubbed to return ``None`` and
    ``subprocess.run`` is replaced with a recorder, so the test can assert
    on the exact command that was issued.
    """
    recorded_commands: list[list[str]] = []

    class Result:
        # Minimal stand-in for the object returned by ``subprocess.run``.
        stderr = ""

    def fake_run(command, capture_output, text, check):
        recorded_commands.append(command)
        return Result()

    def no_conversational_flow():
        return None

    monkeypatch.setattr(kickoff_flow.subprocess, "run", fake_run)
    monkeypatch.setattr(
        kickoff_flow,
        "_load_conversational_flow_from_kickoff_script",
        no_conversational_flow,
    )

    kickoff_flow.kickoff_flow()

    expected_commands = [["uv", "run", "kickoff"]]
    assert recorded_commands == expected_commands
def test_run_conversational_flow_tui_initializes_event_listener(monkeypatch) -> None:
    """Check the TUI runner builds the event listener before the app runs.

    ``EventListener`` and ``CrewRunApp`` are swapped for fakes that log
    their construction and ``run`` call. The test asserts the logged order
    is listener, app, run, and that the runner returns the fake app's
    ``_crew_result`` value.
    """
    timeline: list[str] = []

    class FakeEventListener:
        def __init__(self) -> None:
            timeline.append("listener")

    class FakeCrewRunApp:
        def __init__(self, *, crew_name: str, conversational: bool) -> None:
            timeline.append("app")
            self.crew_name = crew_name
            self.conversational = conversational
            # Canned attributes set on the fake app instance.
            self._flow = None
            self._status = "completed"
            self._crew_result = "done"

        def run(self) -> None:
            timeline.append("run")

    class DemoFlow:
        name = "Demo"

    replacements = (
        ("crewai.events.event_listener.EventListener", FakeEventListener),
        ("crewai_cli.crew_run_tui.CrewRunApp", FakeCrewRunApp),
    )
    for dotted_target, fake in replacements:
        monkeypatch.setattr(dotted_target, fake)

    outcome = kickoff_flow._run_conversational_flow_tui(DemoFlow())

    assert outcome == "done"
    assert timeline == ["listener", "app", "run"]