feat: introduce trigger listing and execution commands for local development (#3643)

This commit is contained in:
Lucas Gomide
2025-10-06 11:20:49 -03:00
committed by GitHub
parent 29a0ac483f
commit 8b9186311f
9 changed files with 398 additions and 5 deletions

View File

@@ -1,6 +1,6 @@
from importlib.metadata import version as get_version
import os
import subprocess
from importlib.metadata import version as get_version
import click
@@ -28,6 +28,7 @@ from .reset_memories_command import reset_memories_command
from .run_crew import run_crew
from .tools.main import ToolCommand
from .train_crew import train_crew
from .triggers.main import TriggersCommand
from .update_crew import update_crew
@@ -392,6 +393,26 @@ def flow_add_crew(crew_name):
add_crew_to_flow(crew_name)
@crewai.group()
def triggers():
"""Trigger related commands. Use 'crewai triggers list' to see available triggers, or 'crewai triggers run app_slug/trigger_slug' to execute."""
@triggers.command(name="list")
def triggers_list():
"""List all available triggers from integrations."""
triggers_cmd = TriggersCommand()
triggers_cmd.list_triggers()
@triggers.command(name="run")
@click.argument("trigger_path")
def triggers_run(trigger_path: str):
"""Execute crew with trigger payload. Format: app_slug/trigger_slug"""
triggers_cmd = TriggersCommand()
triggers_cmd.execute_with_trigger(trigger_path)
@crewai.command()
def chat():
"""

View File

@@ -18,6 +18,7 @@ class PlusAPI:
AGENTS_RESOURCE = "/crewai_plus/api/v1/agents"
TRACING_RESOURCE = "/crewai_plus/api/v1/tracing"
EPHEMERAL_TRACING_RESOURCE = "/crewai_plus/api/v1/tracing/ephemeral"
INTEGRATIONS_RESOURCE = "/crewai_plus/api/v1/integrations"
def __init__(self, api_key: str) -> None:
self.api_key = api_key
@@ -176,3 +177,13 @@ class PlusAPI:
json={"status": "failed", "failure_reason": error_message},
timeout=30,
)
def get_triggers(self) -> requests.Response:
"""Get all available triggers from integrations."""
return self._make_request("GET", f"{self.INTEGRATIONS_RESOURCE}/apps")
def get_trigger_payload(self, app_slug: str, trigger_slug: str) -> requests.Response:
"""Get sample payload for a specific trigger."""
return self._make_request(
"GET", f"{self.INTEGRATIONS_RESOURCE}/{app_slug}/{trigger_slug}/payload"
)

View File

@@ -21,7 +21,7 @@ def run():
'topic': 'AI LLMs',
'current_year': str(datetime.now().year)
}
try:
{{crew_name}}().crew().kickoff(inputs=inputs)
except Exception as e:
@@ -60,9 +60,35 @@ def test():
"topic": "AI LLMs",
"current_year": str(datetime.now().year)
}
try:
{{crew_name}}().crew().test(n_iterations=int(sys.argv[1]), eval_llm=sys.argv[2], inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while testing the crew: {e}")
def run_with_trigger():
"""
Run the crew with trigger payload.
"""
import json
if len(sys.argv) < 2:
raise Exception("No trigger payload provided. Please provide JSON payload as argument.")
try:
trigger_payload = json.loads(sys.argv[1])
except json.JSONDecodeError:
raise Exception("Invalid JSON payload provided as argument")
inputs = {
"crewai_trigger_payload": trigger_payload,
"topic": "",
"current_year": ""
}
try:
result = {{crew_name}}().crew().kickoff(inputs=inputs)
return result
except Exception as e:
raise Exception(f"An error occurred while running the crew with trigger: {e}")

View File

@@ -14,6 +14,7 @@ run_crew = "{{folder_name}}.main:run"
train = "{{folder_name}}.main:train"
replay = "{{folder_name}}.main:replay"
test = "{{folder_name}}.main:test"
run_with_trigger = "{{folder_name}}.main:run_with_trigger"
[build-system]
requires = ["hatchling"]

View File

@@ -16,9 +16,16 @@ class PoemState(BaseModel):
class PoemFlow(Flow[PoemState]):
@start()
def generate_sentence_count(self):
def generate_sentence_count(self, crewai_trigger_payload: dict = None):
print("Generating sentence count")
self.state.sentence_count = randint(1, 5)
# Use trigger payload if available
if crewai_trigger_payload:
# Example: use trigger data to influence sentence count
self.state.sentence_count = crewai_trigger_payload.get('sentence_count', randint(1, 5))
print(f"Using trigger payload: {crewai_trigger_payload}")
else:
self.state.sentence_count = randint(1, 5)
@listen(generate_sentence_count)
def generate_poem(self):
@@ -49,5 +56,32 @@ def plot():
poem_flow.plot()
def run_with_trigger():
"""
Run the flow with trigger payload.
"""
import json
import sys
# Get trigger payload from command line argument
if len(sys.argv) < 2:
raise Exception("No trigger payload provided. Please provide JSON payload as argument.")
try:
trigger_payload = json.loads(sys.argv[1])
except json.JSONDecodeError:
raise Exception("Invalid JSON payload provided as argument")
# Create flow and kickoff with trigger payload
# The @start() methods will automatically receive crewai_trigger_payload parameter
poem_flow = PoemFlow()
try:
result = poem_flow.kickoff({"crewai_trigger_payload": trigger_payload})
return result
except Exception as e:
raise Exception(f"An error occurred while running the flow with trigger: {e}")
if __name__ == "__main__":
kickoff()

View File

@@ -12,6 +12,7 @@ dependencies = [
kickoff = "{{folder_name}}.main:kickoff"
run_crew = "{{folder_name}}.main:kickoff"
plot = "{{folder_name}}.main:plot"
run_with_trigger = "{{folder_name}}.main:run_with_trigger"
[build-system]
requires = ["hatchling"]

View File

@@ -0,0 +1,6 @@
"""Triggers command module for CrewAI CLI."""
from .main import TriggersCommand
__all__ = ["TriggersCommand"]

View File

@@ -0,0 +1,123 @@
import json
import subprocess
from typing import Any
from rich.console import Console
from rich.table import Table
from crewai.cli.command import BaseCommand, PlusAPIMixin
console = Console()
class TriggersCommand(BaseCommand, PlusAPIMixin):
"""
A class to handle trigger-related operations for CrewAI projects.
"""
def __init__(self):
BaseCommand.__init__(self)
PlusAPIMixin.__init__(self, telemetry=self._telemetry)
def list_triggers(self) -> None:
"""List all available triggers from integrations."""
try:
console.print("[bold blue]Fetching available triggers...[/bold blue]")
response = self.plus_api_client.get_triggers()
self._validate_response(response)
triggers_data = response.json()
self._display_triggers(triggers_data)
except Exception as e:
console.print(f"[bold red]Error fetching triggers: {e}[/bold red]")
raise SystemExit(1) from e
def execute_with_trigger(self, trigger_path: str) -> None:
"""Execute crew with trigger payload."""
try:
# Parse app_slug/trigger_slug
if "/" not in trigger_path:
console.print(
"[bold red]Error: Trigger must be in format 'app_slug/trigger_slug'[/bold red]"
)
raise SystemExit(1)
app_slug, trigger_slug = trigger_path.split("/", 1)
console.print(f"[bold blue]Fetching trigger payload for {app_slug}/{trigger_slug}...[/bold blue]")
response = self.plus_api_client.get_trigger_payload(app_slug, trigger_slug)
if response.status_code == 404:
error_data = response.json()
console.print(f"[bold red]Error: {error_data.get('error', 'Trigger not found')}[/bold red]")
raise SystemExit(1)
self._validate_response(response)
trigger_data = response.json()
self._display_trigger_info(trigger_data)
# Run crew with trigger payload
self._run_crew_with_payload(trigger_data.get("sample_payload", {}))
except Exception as e:
console.print(f"[bold red]Error executing crew with trigger: {e}[/bold red]")
raise SystemExit(1) from e
def _display_triggers(self, triggers_data: dict[str, Any]) -> None:
"""Display triggers in a formatted table."""
apps = triggers_data.get("apps", [])
if not apps:
console.print("[yellow]No triggers found.[/yellow]")
return
for app in apps:
app_name = app.get("name", "Unknown App")
app_slug = app.get("slug", "unknown")
is_connected = app.get("is_connected", False)
connection_status = "[green]✓ Connected[/green]" if is_connected else "[red]✗ Not Connected[/red]"
console.print(f"\n[bold cyan]{app_name}[/bold cyan] ({app_slug}) - {connection_status}")
console.print(f"[dim]{app.get('description', 'No description available')}[/dim]")
triggers = app.get("triggers", [])
if triggers:
table = Table(show_header=True, header_style="bold magenta")
table.add_column("Trigger", style="cyan")
table.add_column("Name", style="green")
table.add_column("Description", style="dim")
for trigger in triggers:
trigger_path = f"{app_slug}/{trigger.get('slug', 'unknown')}"
table.add_row(
trigger_path,
trigger.get("name", "Unknown"),
trigger.get("description", "No description")
)
console.print(table)
else:
console.print("[dim] No triggers available[/dim]")
def _display_trigger_info(self, trigger_data: dict[str, Any]) -> None:
"""Display trigger information before execution."""
sample_payload = trigger_data.get("sample_payload", {})
if sample_payload:
console.print("\n[bold yellow]Sample Payload:[/bold yellow]")
console.print(json.dumps(sample_payload, indent=2))
def _run_crew_with_payload(self, payload: dict[str, Any]) -> None:
"""Run the crew with the trigger payload using the run_with_trigger method."""
try:
subprocess.run( # noqa: S603
["uv", "run", "run_with_trigger", json.dumps(payload)], # noqa: S607
capture_output=False,
text=True,
check=True
)
except Exception as e:
raise SystemExit(1) from e

View File

@@ -0,0 +1,170 @@
import json
import subprocess
import unittest
from unittest.mock import Mock, patch
import requests
from crewai.cli.triggers.main import TriggersCommand
class TestTriggersCommand(unittest.TestCase):
@patch("crewai.cli.command.get_auth_token")
@patch("crewai.cli.command.PlusAPI")
def setUp(self, mock_plus_api, mock_get_auth_token):
self.mock_get_auth_token = mock_get_auth_token
self.mock_plus_api = mock_plus_api
self.mock_get_auth_token.return_value = "test_token"
self.triggers_command = TriggersCommand()
self.mock_client = self.triggers_command.plus_api_client
@patch("crewai.cli.triggers.main.console.print")
def test_list_triggers_success(self, mock_console_print):
mock_response = Mock(spec=requests.Response)
mock_response.status_code = 200
mock_response.ok = True
mock_response.json.return_value = {
"apps": [
{
"name": "Test App",
"slug": "test-app",
"description": "A test application",
"is_connected": True,
"triggers": [
{
"name": "Test Trigger",
"slug": "test-trigger",
"description": "A test trigger"
}
]
}
]
}
self.mock_client.get_triggers.return_value = mock_response
self.triggers_command.list_triggers()
self.mock_client.get_triggers.assert_called_once()
mock_console_print.assert_any_call("[bold blue]Fetching available triggers...[/bold blue]")
@patch("crewai.cli.triggers.main.console.print")
def test_list_triggers_no_apps(self, mock_console_print):
mock_response = Mock(spec=requests.Response)
mock_response.status_code = 200
mock_response.ok = True
mock_response.json.return_value = {"apps": []}
self.mock_client.get_triggers.return_value = mock_response
self.triggers_command.list_triggers()
mock_console_print.assert_any_call("[yellow]No triggers found.[/yellow]")
@patch("crewai.cli.triggers.main.console.print")
def test_list_triggers_api_error(self, mock_console_print):
self.mock_client.get_triggers.side_effect = Exception("API Error")
with self.assertRaises(SystemExit):
self.triggers_command.list_triggers()
mock_console_print.assert_any_call("[bold red]Error fetching triggers: API Error[/bold red]")
@patch("crewai.cli.triggers.main.console.print")
def test_execute_with_trigger_invalid_format(self, mock_console_print):
with self.assertRaises(SystemExit):
self.triggers_command.execute_with_trigger("invalid-format")
mock_console_print.assert_called_with(
"[bold red]Error: Trigger must be in format 'app_slug/trigger_slug'[/bold red]"
)
@patch("crewai.cli.triggers.main.console.print")
@patch.object(TriggersCommand, "_run_crew_with_payload")
def test_execute_with_trigger_success(self, mock_run_crew, mock_console_print):
mock_response = Mock(spec=requests.Response)
mock_response.status_code = 200
mock_response.ok = True
mock_response.json.return_value = {
"sample_payload": {"key": "value", "data": "test"}
}
self.mock_client.get_trigger_payload.return_value = mock_response
self.triggers_command.execute_with_trigger("test-app/test-trigger")
self.mock_client.get_trigger_payload.assert_called_once_with("test-app", "test-trigger")
mock_run_crew.assert_called_once_with({"key": "value", "data": "test"})
mock_console_print.assert_any_call(
"[bold blue]Fetching trigger payload for test-app/test-trigger...[/bold blue]"
)
@patch("crewai.cli.triggers.main.console.print")
def test_execute_with_trigger_not_found(self, mock_console_print):
mock_response = Mock(spec=requests.Response)
mock_response.status_code = 404
mock_response.json.return_value = {"error": "Trigger not found"}
self.mock_client.get_trigger_payload.return_value = mock_response
with self.assertRaises(SystemExit):
self.triggers_command.execute_with_trigger("test-app/nonexistent-trigger")
mock_console_print.assert_any_call("[bold red]Error: Trigger not found[/bold red]")
@patch("crewai.cli.triggers.main.console.print")
def test_execute_with_trigger_api_error(self, mock_console_print):
self.mock_client.get_trigger_payload.side_effect = Exception("API Error")
with self.assertRaises(SystemExit):
self.triggers_command.execute_with_trigger("test-app/test-trigger")
mock_console_print.assert_any_call(
"[bold red]Error executing crew with trigger: API Error[/bold red]"
)
@patch("subprocess.run")
def test_run_crew_with_payload_success(self, mock_subprocess):
payload = {"key": "value", "data": "test"}
mock_subprocess.return_value = None
self.triggers_command._run_crew_with_payload(payload)
mock_subprocess.assert_called_once_with(
["uv", "run", "run_with_trigger", json.dumps(payload)],
capture_output=False,
text=True,
check=True
)
@patch("subprocess.run")
def test_run_crew_with_payload_failure(self, mock_subprocess):
payload = {"key": "value"}
mock_subprocess.side_effect = subprocess.CalledProcessError(1, "uv")
with self.assertRaises(SystemExit):
self.triggers_command._run_crew_with_payload(payload)
@patch("subprocess.run")
def test_run_crew_with_payload_empty_payload(self, mock_subprocess):
payload = {}
mock_subprocess.return_value = None
self.triggers_command._run_crew_with_payload(payload)
mock_subprocess.assert_called_once_with(
["uv", "run", "run_with_trigger", "{}"],
capture_output=False,
text=True,
check=True
)
@patch("crewai.cli.triggers.main.console.print")
def test_execute_with_trigger_with_default_error_message(self, mock_console_print):
mock_response = Mock(spec=requests.Response)
mock_response.status_code = 404
mock_response.json.return_value = {}
self.mock_client.get_trigger_payload.return_value = mock_response
with self.assertRaises(SystemExit):
self.triggers_command.execute_with_trigger("test-app/test-trigger")
mock_console_print.assert_any_call("[bold red]Error: Trigger not found[/bold red]")