import os from typing import Any from urllib.parse import urljoin import httpx from crewai_cli.config import Settings from crewai_cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL from crewai_cli.version import get_crewai_version class PlusAPI: """ This class exposes methods for working with the CrewAI+ API. """ TOOLS_RESOURCE = "/crewai_plus/api/v1/tools" ORGANIZATIONS_RESOURCE = "/crewai_plus/api/v1/me/organizations" CREWS_RESOURCE = "/crewai_plus/api/v1/crews" AGENTS_RESOURCE = "/crewai_plus/api/v1/agents" TRACING_RESOURCE = "/crewai_plus/api/v1/tracing" EPHEMERAL_TRACING_RESOURCE = "/crewai_plus/api/v1/tracing/ephemeral" INTEGRATIONS_RESOURCE = "/crewai_plus/api/v1/integrations" def __init__(self, api_key: str | None = None) -> None: self.api_key = api_key self.headers = { "Content-Type": "application/json", "User-Agent": f"CrewAI-CLI/{get_crewai_version()}", "X-Crewai-Version": get_crewai_version(), } settings = Settings() if settings.org_uuid: self.headers["X-Crewai-Organization-Id"] = settings.org_uuid if api_key: self.headers["Authorization"] = f"Bearer {api_key}" self.base_url = ( os.getenv("CREWAI_PLUS_URL") or str(settings.enterprise_base_url) or DEFAULT_CREWAI_ENTERPRISE_URL ) def _make_request( self, method: str, endpoint: str, **kwargs: Any ) -> httpx.Response: url = urljoin(self.base_url, endpoint) verify = kwargs.pop("verify", True) with httpx.Client(trust_env=False, verify=verify) as client: return client.request(method, url, headers=self.headers, **kwargs) def login_to_tool_repository( self, user_identifier: str | None = None ) -> httpx.Response: payload = {} if user_identifier: payload["user_identifier"] = user_identifier return self._make_request("POST", f"{self.TOOLS_RESOURCE}/login", json=payload) def get_tool(self, handle: str) -> httpx.Response: return self._make_request("GET", f"{self.TOOLS_RESOURCE}/{handle}") async def get_agent(self, handle: str) -> httpx.Response: url = urljoin(self.base_url, f"{self.AGENTS_RESOURCE}/{handle}") async with httpx.AsyncClient() as client: return await client.get(url, headers=self.headers) def publish_tool( self, handle: str, is_public: bool, version: str, description: str | None, encoded_file: str, available_exports: list[dict[str, Any]] | None = None, tools_metadata: list[dict[str, Any]] | None = None, ) -> httpx.Response: params = { "handle": handle, "public": is_public, "version": version, "file": encoded_file, "description": description, "available_exports": available_exports, "tools_metadata": {"package": handle, "tools": tools_metadata} if tools_metadata is not None else None, } return self._make_request("POST", f"{self.TOOLS_RESOURCE}", json=params) def deploy_by_name(self, project_name: str) -> httpx.Response: return self._make_request( "POST", f"{self.CREWS_RESOURCE}/by-name/{project_name}/deploy" ) def deploy_by_uuid(self, uuid: str) -> httpx.Response: return self._make_request("POST", f"{self.CREWS_RESOURCE}/{uuid}/deploy") def crew_status_by_name(self, project_name: str) -> httpx.Response: return self._make_request( "GET", f"{self.CREWS_RESOURCE}/by-name/{project_name}/status" ) def crew_status_by_uuid(self, uuid: str) -> httpx.Response: return self._make_request("GET", f"{self.CREWS_RESOURCE}/{uuid}/status") def crew_by_name( self, project_name: str, log_type: str = "deployment" ) -> httpx.Response: return self._make_request( "GET", f"{self.CREWS_RESOURCE}/by-name/{project_name}/logs/{log_type}" ) def crew_by_uuid(self, uuid: str, log_type: str = "deployment") -> httpx.Response: return self._make_request( "GET", f"{self.CREWS_RESOURCE}/{uuid}/logs/{log_type}" ) def delete_crew_by_name(self, project_name: str) -> httpx.Response: return self._make_request( "DELETE", f"{self.CREWS_RESOURCE}/by-name/{project_name}" ) def delete_crew_by_uuid(self, uuid: str) -> httpx.Response: return self._make_request("DELETE", f"{self.CREWS_RESOURCE}/{uuid}") def list_crews(self) -> httpx.Response: return self._make_request("GET", self.CREWS_RESOURCE) def create_crew(self, payload: dict[str, Any]) -> httpx.Response: return self._make_request("POST", self.CREWS_RESOURCE, json=payload) def get_organizations(self) -> httpx.Response: return self._make_request("GET", self.ORGANIZATIONS_RESOURCE) def initialize_trace_batch(self, payload: dict[str, Any]) -> httpx.Response: return self._make_request( "POST", f"{self.TRACING_RESOURCE}/batches", json=payload, timeout=30, ) def initialize_ephemeral_trace_batch( self, payload: dict[str, Any] ) -> httpx.Response: return self._make_request( "POST", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches", json=payload, ) def send_trace_events( self, trace_batch_id: str, payload: dict[str, Any] ) -> httpx.Response: return self._make_request( "POST", f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/events", json=payload, timeout=30, ) def send_ephemeral_trace_events( self, trace_batch_id: str, payload: dict[str, Any] ) -> httpx.Response: return self._make_request( "POST", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/events", json=payload, timeout=30, ) def finalize_trace_batch( self, trace_batch_id: str, payload: dict[str, Any] ) -> httpx.Response: return self._make_request( "PATCH", f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize", json=payload, timeout=30, ) def finalize_ephemeral_trace_batch( self, trace_batch_id: str, payload: dict[str, Any] ) -> httpx.Response: return self._make_request( "PATCH", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/finalize", json=payload, timeout=30, ) def mark_trace_batch_as_failed( self, trace_batch_id: str, error_message: str ) -> httpx.Response: return self._make_request( "PATCH", f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}", json={"status": "failed", "failure_reason": error_message}, timeout=30, ) def mark_ephemeral_trace_batch_as_failed( self, trace_batch_id: str, error_message: str ) -> httpx.Response: return self._make_request( "PATCH", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}", json={"status": "failed", "failure_reason": error_message}, timeout=30, ) def get_mcp_configs(self, slugs: list[str]) -> httpx.Response: """Get MCP server configurations for the given slugs.""" return self._make_request( "GET", f"{self.INTEGRATIONS_RESOURCE}/mcp_configs", params={"slugs": ",".join(slugs)}, timeout=30, ) def get_triggers(self) -> httpx.Response: """Get all available triggers from integrations.""" return self._make_request("GET", f"{self.INTEGRATIONS_RESOURCE}/apps") def get_trigger_payload(self, app_slug: str, trigger_slug: str) -> httpx.Response: """Get sample payload for a specific trigger.""" return self._make_request( "GET", f"{self.INTEGRATIONS_RESOURCE}/{app_slug}/{trigger_slug}/payload" )