mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-03-22 11:48:16 +00:00
Compare commits
1 Commits
feat/cli-l
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
09b84dd2b0 |
@@ -22,7 +22,6 @@ from crewai.cli.replay_from_task import replay_task_command
|
||||
from crewai.cli.reset_memories_command import reset_memories_command
|
||||
from crewai.cli.run_crew import run_crew
|
||||
from crewai.cli.settings.main import SettingsCommand
|
||||
from crewai.cli.shared.token_manager import TokenManager
|
||||
from crewai.cli.tools.main import ToolCommand
|
||||
from crewai.cli.train_crew import train_crew
|
||||
from crewai.cli.triggers.main import TriggersCommand
|
||||
@@ -35,7 +34,7 @@ from crewai.memory.storage.kickoff_task_outputs_storage import (
|
||||
|
||||
@click.group()
|
||||
@click.version_option(get_version("crewai"))
|
||||
def crewai() -> None:
|
||||
def crewai():
|
||||
"""Top-level command group for crewai."""
|
||||
|
||||
|
||||
@@ -46,7 +45,7 @@ def crewai() -> None:
|
||||
),
|
||||
)
|
||||
@click.argument("uv_args", nargs=-1, type=click.UNPROCESSED)
|
||||
def uv(uv_args: tuple[str, ...]) -> None:
|
||||
def uv(uv_args):
|
||||
"""A wrapper around uv commands that adds custom tool authentication through env vars."""
|
||||
env = os.environ.copy()
|
||||
try:
|
||||
@@ -84,9 +83,7 @@ def uv(uv_args: tuple[str, ...]) -> None:
|
||||
@click.argument("name")
|
||||
@click.option("--provider", type=str, help="The provider to use for the crew")
|
||||
@click.option("--skip_provider", is_flag=True, help="Skip provider validation")
|
||||
def create(
|
||||
type: str, name: str, provider: str | None, skip_provider: bool = False
|
||||
) -> None:
|
||||
def create(type, name, provider, skip_provider=False):
|
||||
"""Create a new crew, or flow."""
|
||||
if type == "crew":
|
||||
create_crew(name, provider, skip_provider)
|
||||
@@ -100,7 +97,7 @@ def create(
|
||||
@click.option(
|
||||
"--tools", is_flag=True, help="Show the installed version of crewai tools"
|
||||
)
|
||||
def version(tools: bool) -> None:
|
||||
def version(tools):
|
||||
"""Show the installed version of crewai."""
|
||||
try:
|
||||
crewai_version = get_version("crewai")
|
||||
@@ -131,7 +128,7 @@ def version(tools: bool) -> None:
|
||||
default="trained_agents_data.pkl",
|
||||
help="Path to a custom file for training",
|
||||
)
|
||||
def train(n_iterations: int, filename: str) -> None:
|
||||
def train(n_iterations: int, filename: str):
|
||||
"""Train the crew."""
|
||||
click.echo(f"Training the Crew for {n_iterations} iterations")
|
||||
train_crew(n_iterations, filename)
|
||||
@@ -337,7 +334,7 @@ def memory(
|
||||
default="gpt-4o-mini",
|
||||
help="LLM Model to run the tests on the Crew. For now only accepting only OpenAI models.",
|
||||
)
|
||||
def test(n_iterations: int, model: str) -> None:
|
||||
def test(n_iterations: int, model: str):
|
||||
"""Test the crew and evaluate the results."""
|
||||
click.echo(f"Testing the crew for {n_iterations} iterations with model {model}")
|
||||
evaluate_crew(n_iterations, model)
|
||||
@@ -350,62 +347,46 @@ def test(n_iterations: int, model: str) -> None:
|
||||
)
|
||||
)
|
||||
@click.pass_context
|
||||
def install(context: click.Context) -> None:
|
||||
def install(context):
|
||||
"""Install the Crew."""
|
||||
install_crew(context.args)
|
||||
|
||||
|
||||
@crewai.command()
|
||||
def run() -> None:
|
||||
def run():
|
||||
"""Run the Crew."""
|
||||
run_crew()
|
||||
|
||||
|
||||
@crewai.command()
|
||||
def update() -> None:
|
||||
def update():
|
||||
"""Update the pyproject.toml of the Crew project to use uv."""
|
||||
update_crew()
|
||||
|
||||
|
||||
@crewai.command()
|
||||
def login() -> None:
|
||||
def login():
|
||||
"""Sign Up/Login to CrewAI AMP."""
|
||||
Settings().clear_user_settings()
|
||||
AuthenticationCommand().login()
|
||||
|
||||
|
||||
@crewai.command()
|
||||
@click.option(
|
||||
"--reset", is_flag=True, help="Also reset all CLI configuration to defaults"
|
||||
)
|
||||
def logout(reset: bool) -> None:
|
||||
"""Logout from CrewAI AMP."""
|
||||
settings = Settings()
|
||||
if reset:
|
||||
settings.reset()
|
||||
click.echo("Successfully logged out and reset all CLI configuration.")
|
||||
else:
|
||||
TokenManager().clear_tokens()
|
||||
settings.clear_user_settings()
|
||||
click.echo("Successfully logged out from CrewAI AMP.")
|
||||
|
||||
|
||||
# DEPLOY CREWAI+ COMMANDS
|
||||
@crewai.group()
|
||||
def deploy() -> None:
|
||||
def deploy():
|
||||
"""Deploy the Crew CLI group."""
|
||||
|
||||
|
||||
@deploy.command(name="create")
|
||||
@click.option("-y", "--yes", is_flag=True, help="Skip the confirmation prompt")
|
||||
def deploy_create(yes: bool) -> None:
|
||||
def deploy_create(yes: bool):
|
||||
"""Create a Crew deployment."""
|
||||
deploy_cmd = DeployCommand()
|
||||
deploy_cmd.create_crew(yes)
|
||||
|
||||
|
||||
@deploy.command(name="list")
|
||||
def deploy_list() -> None:
|
||||
def deploy_list():
|
||||
"""List all deployments."""
|
||||
deploy_cmd = DeployCommand()
|
||||
deploy_cmd.list_crews()
|
||||
@@ -413,7 +394,7 @@ def deploy_list() -> None:
|
||||
|
||||
@deploy.command(name="push")
|
||||
@click.option("-u", "--uuid", type=str, help="Crew UUID parameter")
|
||||
def deploy_push(uuid: str | None) -> None:
|
||||
def deploy_push(uuid: str | None):
|
||||
"""Deploy the Crew."""
|
||||
deploy_cmd = DeployCommand()
|
||||
deploy_cmd.deploy(uuid=uuid)
|
||||
@@ -421,7 +402,7 @@ def deploy_push(uuid: str | None) -> None:
|
||||
|
||||
@deploy.command(name="status")
|
||||
@click.option("-u", "--uuid", type=str, help="Crew UUID parameter")
|
||||
def deply_status(uuid: str | None) -> None:
|
||||
def deply_status(uuid: str | None):
|
||||
"""Get the status of a deployment."""
|
||||
deploy_cmd = DeployCommand()
|
||||
deploy_cmd.get_crew_status(uuid=uuid)
|
||||
@@ -429,7 +410,7 @@ def deply_status(uuid: str | None) -> None:
|
||||
|
||||
@deploy.command(name="logs")
|
||||
@click.option("-u", "--uuid", type=str, help="Crew UUID parameter")
|
||||
def deploy_logs(uuid: str | None) -> None:
|
||||
def deploy_logs(uuid: str | None):
|
||||
"""Get the logs of a deployment."""
|
||||
deploy_cmd = DeployCommand()
|
||||
deploy_cmd.get_crew_logs(uuid=uuid)
|
||||
@@ -437,27 +418,27 @@ def deploy_logs(uuid: str | None) -> None:
|
||||
|
||||
@deploy.command(name="remove")
|
||||
@click.option("-u", "--uuid", type=str, help="Crew UUID parameter")
|
||||
def deploy_remove(uuid: str | None) -> None:
|
||||
def deploy_remove(uuid: str | None):
|
||||
"""Remove a deployment."""
|
||||
deploy_cmd = DeployCommand()
|
||||
deploy_cmd.remove_crew(uuid=uuid)
|
||||
|
||||
|
||||
@crewai.group()
|
||||
def tool() -> None:
|
||||
def tool():
|
||||
"""Tool Repository related commands."""
|
||||
|
||||
|
||||
@tool.command(name="create")
|
||||
@click.argument("handle")
|
||||
def tool_create(handle: str) -> None:
|
||||
def tool_create(handle: str):
|
||||
tool_cmd = ToolCommand()
|
||||
tool_cmd.create(handle)
|
||||
|
||||
|
||||
@tool.command(name="install")
|
||||
@click.argument("handle")
|
||||
def tool_install(handle: str) -> None:
|
||||
def tool_install(handle: str):
|
||||
tool_cmd = ToolCommand()
|
||||
tool_cmd.login()
|
||||
tool_cmd.install(handle)
|
||||
@@ -473,26 +454,26 @@ def tool_install(handle: str) -> None:
|
||||
)
|
||||
@click.option("--public", "is_public", flag_value=True, default=False)
|
||||
@click.option("--private", "is_public", flag_value=False)
|
||||
def tool_publish(is_public: bool, force: bool) -> None:
|
||||
def tool_publish(is_public: bool, force: bool):
|
||||
tool_cmd = ToolCommand()
|
||||
tool_cmd.login()
|
||||
tool_cmd.publish(is_public, force)
|
||||
|
||||
|
||||
@crewai.group()
|
||||
def flow() -> None:
|
||||
def flow():
|
||||
"""Flow related commands."""
|
||||
|
||||
|
||||
@flow.command(name="kickoff")
|
||||
def flow_run() -> None:
|
||||
def flow_run():
|
||||
"""Kickoff the Flow."""
|
||||
click.echo("Running the Flow")
|
||||
kickoff_flow()
|
||||
|
||||
|
||||
@flow.command(name="plot")
|
||||
def flow_plot() -> None:
|
||||
def flow_plot():
|
||||
"""Plot the Flow."""
|
||||
click.echo("Plotting the Flow")
|
||||
plot_flow()
|
||||
@@ -500,19 +481,19 @@ def flow_plot() -> None:
|
||||
|
||||
@flow.command(name="add-crew")
|
||||
@click.argument("crew_name")
|
||||
def flow_add_crew(crew_name: str) -> None:
|
||||
def flow_add_crew(crew_name):
|
||||
"""Add a crew to an existing flow."""
|
||||
click.echo(f"Adding crew {crew_name} to the flow")
|
||||
add_crew_to_flow(crew_name)
|
||||
|
||||
|
||||
@crewai.group()
|
||||
def triggers() -> None:
|
||||
def triggers():
|
||||
"""Trigger related commands. Use 'crewai triggers list' to see available triggers, or 'crewai triggers run app_slug/trigger_slug' to execute."""
|
||||
|
||||
|
||||
@triggers.command(name="list")
|
||||
def triggers_list() -> None:
|
||||
def triggers_list():
|
||||
"""List all available triggers from integrations."""
|
||||
triggers_cmd = TriggersCommand()
|
||||
triggers_cmd.list_triggers()
|
||||
@@ -520,14 +501,14 @@ def triggers_list() -> None:
|
||||
|
||||
@triggers.command(name="run")
|
||||
@click.argument("trigger_path")
|
||||
def triggers_run(trigger_path: str) -> None:
|
||||
def triggers_run(trigger_path: str):
|
||||
"""Execute crew with trigger payload. Format: app_slug/trigger_slug"""
|
||||
triggers_cmd = TriggersCommand()
|
||||
triggers_cmd.execute_with_trigger(trigger_path)
|
||||
|
||||
|
||||
@crewai.command()
|
||||
def chat() -> None:
|
||||
def chat():
|
||||
"""
|
||||
Start a conversation with the Crew, collecting user-supplied inputs,
|
||||
and using the Chat LLM to generate responses.
|
||||
@@ -540,12 +521,12 @@ def chat() -> None:
|
||||
|
||||
|
||||
@crewai.group(invoke_without_command=True)
|
||||
def org() -> None:
|
||||
def org():
|
||||
"""Organization management commands."""
|
||||
|
||||
|
||||
@org.command("list")
|
||||
def org_list() -> None:
|
||||
def org_list():
|
||||
"""List available organizations."""
|
||||
org_command = OrganizationCommand()
|
||||
org_command.list()
|
||||
@@ -553,39 +534,39 @@ def org_list() -> None:
|
||||
|
||||
@org.command()
|
||||
@click.argument("id")
|
||||
def switch(id: str) -> None:
|
||||
def switch(id):
|
||||
"""Switch to a specific organization."""
|
||||
org_command = OrganizationCommand()
|
||||
org_command.switch(id)
|
||||
|
||||
|
||||
@org.command()
|
||||
def current() -> None:
|
||||
def current():
|
||||
"""Show current organization when 'crewai org' is called without subcommands."""
|
||||
org_command = OrganizationCommand()
|
||||
org_command.current()
|
||||
|
||||
|
||||
@crewai.group()
|
||||
def enterprise() -> None:
|
||||
def enterprise():
|
||||
"""Enterprise Configuration commands."""
|
||||
|
||||
|
||||
@enterprise.command("configure")
|
||||
@click.argument("enterprise_url")
|
||||
def enterprise_configure(enterprise_url: str) -> None:
|
||||
def enterprise_configure(enterprise_url: str):
|
||||
"""Configure CrewAI AMP OAuth2 settings from the provided Enterprise URL."""
|
||||
enterprise_command = EnterpriseConfigureCommand()
|
||||
enterprise_command.configure(enterprise_url)
|
||||
|
||||
|
||||
@crewai.group()
|
||||
def config() -> None:
|
||||
def config():
|
||||
"""CLI Configuration commands."""
|
||||
|
||||
|
||||
@config.command("list")
|
||||
def config_list() -> None:
|
||||
def config_list():
|
||||
"""List all CLI configuration parameters."""
|
||||
config_command = SettingsCommand()
|
||||
config_command.list()
|
||||
@@ -594,26 +575,26 @@ def config_list() -> None:
|
||||
@config.command("set")
|
||||
@click.argument("key")
|
||||
@click.argument("value")
|
||||
def config_set(key: str, value: str) -> None:
|
||||
def config_set(key: str, value: str):
|
||||
"""Set a CLI configuration parameter."""
|
||||
config_command = SettingsCommand()
|
||||
config_command.set(key, value)
|
||||
|
||||
|
||||
@config.command("reset")
|
||||
def config_reset() -> None:
|
||||
def config_reset():
|
||||
"""Reset all CLI configuration parameters to default values."""
|
||||
config_command = SettingsCommand()
|
||||
config_command.reset_all_settings()
|
||||
|
||||
|
||||
@crewai.group()
|
||||
def env() -> None:
|
||||
def env():
|
||||
"""Environment variable commands."""
|
||||
|
||||
|
||||
@env.command("view")
|
||||
def env_view() -> None:
|
||||
def env_view():
|
||||
"""View tracing-related environment variables."""
|
||||
import os
|
||||
from pathlib import Path
|
||||
@@ -691,12 +672,12 @@ def env_view() -> None:
|
||||
|
||||
|
||||
@crewai.group()
|
||||
def traces() -> None:
|
||||
def traces():
|
||||
"""Trace collection management commands."""
|
||||
|
||||
|
||||
@traces.command("enable")
|
||||
def traces_enable() -> None:
|
||||
def traces_enable():
|
||||
"""Enable trace collection for crew/flow executions."""
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
@@ -719,7 +700,7 @@ def traces_enable() -> None:
|
||||
|
||||
|
||||
@traces.command("disable")
|
||||
def traces_disable() -> None:
|
||||
def traces_disable():
|
||||
"""Disable trace collection for crew/flow executions."""
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
@@ -742,7 +723,7 @@ def traces_disable() -> None:
|
||||
|
||||
|
||||
@traces.command("status")
|
||||
def traces_status() -> None:
|
||||
def traces_status():
|
||||
"""Show current trace collection status."""
|
||||
import os
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ import click
|
||||
from crewai.telemetry import Telemetry
|
||||
|
||||
|
||||
def create_flow(name: str) -> None:
|
||||
def create_flow(name):
|
||||
"""Create a new flow."""
|
||||
folder_name = name.replace(" ", "_").replace("-", "_").lower()
|
||||
class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "")
|
||||
@@ -49,7 +49,7 @@ def create_flow(name: str) -> None:
|
||||
"poem_crew",
|
||||
]
|
||||
|
||||
def process_file(src_file: Path, dst_file: Path) -> None:
|
||||
def process_file(src_file, dst_file):
|
||||
if src_file.suffix in [".pyc", ".pyo", ".pyd"]:
|
||||
return
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
|
||||
A class to handle deployment-related operations for CrewAI projects.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the DeployCommand with project name and API client.
|
||||
"""
|
||||
@@ -67,7 +67,7 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
|
||||
Args:
|
||||
uuid (Optional[str]): The UUID of the crew to deploy.
|
||||
"""
|
||||
self._telemetry.start_deployment_span(uuid)
|
||||
self._start_deployment_span = self._telemetry.start_deployment_span(uuid)
|
||||
console.print("Starting deployment...", style="bold blue")
|
||||
if uuid:
|
||||
response = self.plus_api_client.deploy_by_uuid(uuid)
|
||||
@@ -84,7 +84,9 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
|
||||
"""
|
||||
Create a new crew deployment.
|
||||
"""
|
||||
self._telemetry.create_crew_deployment_span()
|
||||
self._create_crew_deployment_span = (
|
||||
self._telemetry.create_crew_deployment_span()
|
||||
)
|
||||
console.print("Creating deployment...", style="bold blue")
|
||||
env_vars = fetch_and_json_env_file()
|
||||
|
||||
@@ -234,7 +236,7 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
|
||||
uuid (Optional[str]): The UUID of the crew to get logs for.
|
||||
log_type (str): The type of logs to retrieve (default: "deployment").
|
||||
"""
|
||||
self._telemetry.get_crew_logs_span(uuid, log_type)
|
||||
self._get_crew_logs_span = self._telemetry.get_crew_logs_span(uuid, log_type)
|
||||
console.print(f"Fetching {log_type} logs...", style="bold blue")
|
||||
|
||||
if uuid:
|
||||
@@ -255,7 +257,7 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
|
||||
Args:
|
||||
uuid (Optional[str]): The UUID of the crew to remove.
|
||||
"""
|
||||
self._telemetry.remove_crew_span(uuid)
|
||||
self._remove_crew_span = self._telemetry.remove_crew_span(uuid)
|
||||
console.print("Removing deployment...", style="bold blue")
|
||||
|
||||
if uuid:
|
||||
|
||||
@@ -16,7 +16,7 @@ class TriggersCommand(BaseCommand, PlusAPIMixin):
|
||||
A class to handle trigger-related operations for CrewAI projects.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
def __init__(self):
|
||||
BaseCommand.__init__(self)
|
||||
PlusAPIMixin.__init__(self, telemetry=self._telemetry)
|
||||
|
||||
|
||||
@@ -1315,7 +1315,25 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
context = self._pending_feedback_context
|
||||
emit = context.emit
|
||||
default_outcome = context.default_outcome
|
||||
llm = context.llm
|
||||
|
||||
# Try to get the live LLM from the re-imported decorator instead of the
|
||||
# serialized string. When a flow pauses for HITL and resumes (possibly in
|
||||
# a different process), context.llm only contains a model string like
|
||||
# 'gemini/gemini-3-flash-preview'. This loses credentials, project,
|
||||
# location, safety_settings, and client_params. By looking up the method
|
||||
# on the re-imported flow class, we can retrieve the fully-configured LLM
|
||||
# that was passed to the @human_feedback decorator.
|
||||
llm = context.llm # fallback to serialized string
|
||||
method = self._methods.get(FlowMethodName(context.method_name))
|
||||
if method is not None:
|
||||
live_llm = getattr(method, "_hf_llm", None)
|
||||
if live_llm is not None:
|
||||
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
|
||||
|
||||
# Only use live LLM if it's a BaseLLM instance (not a string)
|
||||
# String values offer no benefit over the serialized context.llm
|
||||
if isinstance(live_llm, BaseLLMClass):
|
||||
llm = live_llm
|
||||
|
||||
# Determine outcome
|
||||
collapsed_outcome: str | None = None
|
||||
|
||||
@@ -75,6 +75,7 @@ class FlowMethod(Generic[P, R]):
|
||||
"__is_router__",
|
||||
"__router_paths__",
|
||||
"__human_feedback_config__",
|
||||
"_hf_llm", # Live LLM object for HITL resume
|
||||
]:
|
||||
if hasattr(meth, attr):
|
||||
setattr(self, attr, getattr(meth, attr))
|
||||
|
||||
@@ -572,6 +572,14 @@ def human_feedback(
|
||||
wrapper.__is_router__ = True
|
||||
wrapper.__router_paths__ = list(emit)
|
||||
|
||||
# Stash the live LLM object for HITL resume to retrieve.
|
||||
# When a flow pauses for human feedback and later resumes (possibly in a
|
||||
# different process), the serialized context only contains a model string.
|
||||
# By storing the original LLM on the wrapper, resume_async can retrieve
|
||||
# the fully-configured LLM (with credentials, project, safety_settings, etc.)
|
||||
# instead of creating a bare LLM from just the model string.
|
||||
wrapper._hf_llm = llm
|
||||
|
||||
return wrapper # type: ignore[no-any-return]
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -1216,3 +1216,275 @@ class TestAsyncHumanFeedbackEdgeCases:
|
||||
|
||||
assert flow.last_human_feedback.outcome == "approved"
|
||||
assert flow.last_human_feedback.feedback == ""
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tests for _hf_llm attribute and live LLM resolution on resume
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestLiveLLMPreservationOnResume:
|
||||
"""Tests for preserving the full LLM config across HITL resume."""
|
||||
|
||||
def test_hf_llm_attribute_set_on_wrapper_with_basellm(self) -> None:
|
||||
"""Test that _hf_llm is set on the wrapper when llm is a BaseLLM instance."""
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
# Create a mock BaseLLM object
|
||||
mock_llm = MagicMock(spec=BaseLLM)
|
||||
mock_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=mock_llm,
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_hf_llm")
|
||||
assert method._hf_llm is mock_llm
|
||||
|
||||
def test_hf_llm_attribute_set_on_wrapper_with_string(self) -> None:
|
||||
"""Test that _hf_llm is set on the wrapper even when llm is a string."""
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_hf_llm")
|
||||
assert method._hf_llm == "gpt-4o-mini"
|
||||
|
||||
@patch("crewai.flow.flow.crewai_event_bus.emit")
|
||||
def test_resume_async_uses_live_basellm_over_serialized_string(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that resume_async uses the live BaseLLM from decorator instead of serialized string.
|
||||
|
||||
This is the main bug fix: when a flow resumes, it should use the fully-configured
|
||||
LLM from the re-imported decorator (with credentials, project, etc.) instead of
|
||||
creating a new LLM from just the model string.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
# Create a mock BaseLLM with full config (simulating Gemini with service account)
|
||||
live_llm = MagicMock(spec=BaseLLM)
|
||||
live_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
result_path: str = ""
|
||||
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm=live_llm, # Full LLM object with credentials
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
@listen("approved")
|
||||
def handle_approved(self):
|
||||
self.result_path = "approved"
|
||||
return "Approved!"
|
||||
|
||||
# Save pending feedback with just a model STRING (simulating serialization)
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="live-llm-test",
|
||||
flow_class="TestFlow",
|
||||
method_name="review",
|
||||
method_output="content",
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gemini/gemini-3-flash", # Serialized string, NOT the live object
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="live-llm-test",
|
||||
context=context,
|
||||
state_data={"id": "live-llm-test"},
|
||||
)
|
||||
|
||||
# Restore flow - this re-imports the class with the live LLM
|
||||
flow = TestFlow.from_pending("live-llm-test", persistence)
|
||||
|
||||
# Mock _collapse_to_outcome to capture what LLM it receives
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
captured_llm.append(llm)
|
||||
return "approved"
|
||||
|
||||
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||
flow.resume("looks good!")
|
||||
|
||||
# The key assertion: _collapse_to_outcome received the LIVE BaseLLM object,
|
||||
# NOT the serialized string. The live_llm was captured at class definition
|
||||
# time and stored on the method wrapper as _hf_llm.
|
||||
assert len(captured_llm) == 1
|
||||
# Verify it's the same object that was passed to the decorator
|
||||
# (which is stored on the method's _hf_llm attribute)
|
||||
method = flow._methods.get("review")
|
||||
assert method is not None
|
||||
assert captured_llm[0] is method._hf_llm
|
||||
# And verify it's a BaseLLM instance, not a string
|
||||
assert isinstance(captured_llm[0], BaseLLM)
|
||||
|
||||
@patch("crewai.flow.flow.crewai_event_bus.emit")
|
||||
def test_resume_async_falls_back_to_serialized_string_when_no_hf_llm(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that resume_async falls back to context.llm when _hf_llm is not available.
|
||||
|
||||
This ensures backward compatibility with flows that were paused before this fix.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
# Save pending feedback
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="fallback-test",
|
||||
flow_class="TestFlow",
|
||||
method_name="review",
|
||||
method_output="content",
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="fallback-test",
|
||||
context=context,
|
||||
state_data={"id": "fallback-test"},
|
||||
)
|
||||
|
||||
flow = TestFlow.from_pending("fallback-test", persistence)
|
||||
|
||||
# Remove _hf_llm to simulate old decorator without this attribute
|
||||
method = flow._methods.get("review")
|
||||
if hasattr(method, "_hf_llm"):
|
||||
delattr(method, "_hf_llm")
|
||||
|
||||
# Mock _collapse_to_outcome to capture what LLM it receives
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
captured_llm.append(llm)
|
||||
return "approved"
|
||||
|
||||
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||
flow.resume("looks good!")
|
||||
|
||||
# Should fall back to the serialized string
|
||||
assert len(captured_llm) == 1
|
||||
assert captured_llm[0] == "gpt-4o-mini"
|
||||
|
||||
@patch("crewai.flow.flow.crewai_event_bus.emit")
|
||||
def test_resume_async_uses_string_from_context_when_hf_llm_is_string(
|
||||
self, mock_emit: MagicMock
|
||||
) -> None:
|
||||
"""Test that when _hf_llm is a string (not BaseLLM), we still use context.llm.
|
||||
|
||||
String LLM values offer no benefit over the serialized context.llm,
|
||||
so we don't prefer them.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = os.path.join(tmpdir, "test_flows.db")
|
||||
persistence = SQLiteFlowPersistence(db_path)
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini", # String LLM
|
||||
)
|
||||
def review(self):
|
||||
return "content"
|
||||
|
||||
# Save pending feedback
|
||||
context = PendingFeedbackContext(
|
||||
flow_id="string-llm-test",
|
||||
flow_class="TestFlow",
|
||||
method_name="review",
|
||||
method_output="content",
|
||||
message="Approve?",
|
||||
emit=["approved", "rejected"],
|
||||
llm="gpt-4o-mini",
|
||||
)
|
||||
persistence.save_pending_feedback(
|
||||
flow_uuid="string-llm-test",
|
||||
context=context,
|
||||
state_data={"id": "string-llm-test"},
|
||||
)
|
||||
|
||||
flow = TestFlow.from_pending("string-llm-test", persistence)
|
||||
|
||||
# Verify _hf_llm is a string
|
||||
method = flow._methods.get("review")
|
||||
assert method._hf_llm == "gpt-4o-mini"
|
||||
|
||||
# Mock _collapse_to_outcome to capture what LLM it receives
|
||||
captured_llm = []
|
||||
|
||||
def capture_llm(feedback, outcomes, llm):
|
||||
captured_llm.append(llm)
|
||||
return "approved"
|
||||
|
||||
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
|
||||
flow.resume("looks good!")
|
||||
|
||||
# Should use context.llm since _hf_llm is a string (not BaseLLM)
|
||||
assert len(captured_llm) == 1
|
||||
assert captured_llm[0] == "gpt-4o-mini"
|
||||
|
||||
def test_hf_llm_set_for_async_wrapper(self) -> None:
|
||||
"""Test that _hf_llm is set on async wrapper functions."""
|
||||
import asyncio
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
|
||||
mock_llm = MagicMock(spec=BaseLLM)
|
||||
mock_llm.model = "gemini/gemini-3-flash"
|
||||
|
||||
class TestFlow(Flow):
|
||||
@start()
|
||||
@human_feedback(
|
||||
message="Review:",
|
||||
emit=["approved", "rejected"],
|
||||
llm=mock_llm,
|
||||
)
|
||||
async def async_review(self):
|
||||
return "content"
|
||||
|
||||
flow = TestFlow()
|
||||
method = flow._methods.get("async_review")
|
||||
assert method is not None
|
||||
assert hasattr(method, "_hf_llm")
|
||||
assert method._hf_llm is mock_llm
|
||||
|
||||
Reference in New Issue
Block a user