From 35f693cf68d623066251fca268ab1c64f420bfca Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 20 May 2026 01:43:48 +0800 Subject: [PATCH] chore: tighten typing across plus_api client Adds typed containers for wire payloads, literal aliases for HTTP method and log type, and Ffnal markers on resource constants. Updates upstream returns in project_utils.py and deploy/main.py to match the new contracts. --- lib/cli/src/crewai_cli/deploy/main.py | 5 +- lib/crewai-core/src/crewai_core/plus_api.py | 202 +++++++++++++++--- .../listeners/tracing/trace_batch_manager.py | 59 ++--- .../src/crewai/utilities/project_utils.py | 39 ++-- 4 files changed, 232 insertions(+), 73 deletions(-) diff --git a/lib/cli/src/crewai_cli/deploy/main.py b/lib/cli/src/crewai_cli/deploy/main.py index 606bf1c16..2e44f87a6 100644 --- a/lib/cli/src/crewai_cli/deploy/main.py +++ b/lib/cli/src/crewai_cli/deploy/main.py @@ -1,5 +1,6 @@ from typing import Any +from crewai_core.plus_api import CreateCrewPayload from rich.console import Console from crewai_cli import git @@ -161,7 +162,7 @@ class DeployCommand(BaseCommand, PlusAPIMixin): self, env_vars: dict[str, str], remote_repo_url: str, - ) -> dict[str, Any]: + ) -> CreateCrewPayload: """ Create the payload for crew creation. @@ -172,6 +173,8 @@ class DeployCommand(BaseCommand, PlusAPIMixin): Returns: Dict[str, Any]: The payload for crew creation. """ + if not self.project_name: + raise ValueError("project_name is required to create a deployment payload") return { "deploy": { "name": self.project_name, diff --git a/lib/crewai-core/src/crewai_core/plus_api.py b/lib/crewai-core/src/crewai_core/plus_api.py index 39f34e1b8..9c107f2d3 100644 --- a/lib/crewai-core/src/crewai_core/plus_api.py +++ b/lib/crewai-core/src/crewai_core/plus_api.py @@ -3,36 +3,161 @@ from __future__ import annotations import os -from typing import Any +from typing import Any, Final, Literal, TypedDict, cast from urllib.parse import urljoin import httpx +from typing_extensions import NotRequired from crewai_core.constants import DEFAULT_CREWAI_ENTERPRISE_URL from crewai_core.settings import Settings from crewai_core.version import get_crewai_version +HttpMethod = Literal["GET", "POST", "PATCH", "DELETE"] + + +class AvailableExport(TypedDict): + name: str + + +class EnvVarEntry(TypedDict): + name: str + description: str + required: bool + default: str | None + + +class ToolMetadata(TypedDict): + name: str + module: str + humanized_name: str + description: str + run_params_schema: dict[str, Any] + init_params_schema: dict[str, Any] + env_vars: list[EnvVarEntry] + + +class ToolsMetadataPayload(TypedDict): + package: str + tools: list[ToolMetadata] | None + + +class PublishToolPayload(TypedDict): + handle: str + public: bool + version: str + file: str + description: str | None + available_exports: list[AvailableExport] | None + tools_metadata: ToolsMetadataPayload | None + + +class CrewDeploymentSpec(TypedDict): + name: str + repo_clone_url: str + env: dict[str, str] + + +class CreateCrewPayload(TypedDict): + deploy: CrewDeploymentSpec + + +class _WithUserIdentifier(TypedDict): + user_identifier: NotRequired[str] + + +class LoginPayload(_WithUserIdentifier): + pass + + +class TraceExecutionContext(TypedDict): + crew_fingerprint: str | None + crew_name: str | None + flow_name: str | None + crewai_version: str + privacy_level: str + + +class TraceExecutionMetadata(TypedDict): + expected_duration_estimate: int + agent_count: int + task_count: int + flow_method_count: int + execution_started_at: str + + +class TraceBatchInitPayload(_WithUserIdentifier): + trace_id: str + execution_type: str + execution_context: TraceExecutionContext + execution_metadata: TraceExecutionMetadata + ephemeral_trace_id: NotRequired[str] + + +class TraceBatchMetadata(TypedDict): + events_count: int + batch_sequence: int + is_final_batch: bool + + +class TraceEventsPayload(TypedDict): + events: list[dict[str, Any]] + batch_metadata: TraceBatchMetadata + + +class TraceFinalizePayload(TypedDict): + status: Literal["completed"] + duration_ms: float | None + final_event_count: int + + +class TraceFailedPayload(TypedDict): + status: Literal["failed"] + failure_reason: str + + +Headers = TypedDict( + "Headers", + { + "Content-Type": str, + "User-Agent": str, + "X-Crewai-Version": str, + "Authorization": NotRequired[str], + "X-Crewai-Organization-Id": NotRequired[str], + }, +) + + +class RequestKwargs(TypedDict): + headers: dict[str, str] + json: NotRequired[Any] + params: NotRequired[dict[str, str]] + timeout: NotRequired[float] + + class PlusAPI: """Client for working with the CrewAI+ API.""" - TOOLS_RESOURCE = "/crewai_plus/api/v1/tools" - ORGANIZATIONS_RESOURCE = "/crewai_plus/api/v1/me/organizations" - CREWS_RESOURCE = "/crewai_plus/api/v1/crews" - AGENTS_RESOURCE = "/crewai_plus/api/v1/agents" - TRACING_RESOURCE = "/crewai_plus/api/v1/tracing" - EPHEMERAL_TRACING_RESOURCE = "/crewai_plus/api/v1/tracing/ephemeral" - INTEGRATIONS_RESOURCE = "/crewai_plus/api/v1/integrations" + TOOLS_RESOURCE: Final = "/crewai_plus/api/v1/tools" + ORGANIZATIONS_RESOURCE: Final = "/crewai_plus/api/v1/me/organizations" + CREWS_RESOURCE: Final = "/crewai_plus/api/v1/crews" + AGENTS_RESOURCE: Final = "/crewai_plus/api/v1/agents" + TRACING_RESOURCE: Final = "/crewai_plus/api/v1/tracing" + EPHEMERAL_TRACING_RESOURCE: Final = "/crewai_plus/api/v1/tracing/ephemeral" + INTEGRATIONS_RESOURCE: Final = "/crewai_plus/api/v1/integrations" def __init__(self, api_key: str | None = None) -> None: + version = get_crewai_version() self.api_key = api_key - self.headers = { + self.headers: Headers = { "Content-Type": "application/json", - "User-Agent": f"CrewAI-CLI/{get_crewai_version()}", - "X-Crewai-Version": get_crewai_version(), + "User-Agent": f"CrewAI-CLI/{version}", + "X-Crewai-Version": version, } if api_key: self.headers["Authorization"] = f"Bearer {api_key}" + settings = Settings() if settings.org_uuid: self.headers["X-Crewai-Organization-Id"] = settings.org_uuid @@ -44,17 +169,30 @@ class PlusAPI: ) def _make_request( - self, method: str, endpoint: str, **kwargs: Any + self, + method: HttpMethod, + endpoint: str, + *, + json: Any = None, + params: dict[str, str] | None = None, + timeout: float | None = None, + verify: bool = True, ) -> httpx.Response: url = urljoin(self.base_url, endpoint) - verify = kwargs.pop("verify", True) + request_kwargs: RequestKwargs = {"headers": cast(dict[str, str], self.headers)} + if json is not None: + request_kwargs["json"] = json + if params is not None: + request_kwargs["params"] = params + if timeout is not None: + request_kwargs["timeout"] = timeout with httpx.Client(trust_env=False, verify=verify) as client: - return client.request(method, url, headers=self.headers, **kwargs) + return client.request(method, url, **request_kwargs) def login_to_tool_repository( self, user_identifier: str | None = None ) -> httpx.Response: - payload = {} + payload: LoginPayload = {} if user_identifier: payload["user_identifier"] = user_identifier return self._make_request("POST", f"{self.TOOLS_RESOURCE}/login", json=payload) @@ -65,7 +203,7 @@ class PlusAPI: async def get_agent(self, handle: str) -> httpx.Response: url = urljoin(self.base_url, f"{self.AGENTS_RESOURCE}/{handle}") async with httpx.AsyncClient() as client: - return await client.get(url, headers=self.headers) + return await client.get(url, headers=cast(dict[str, str], self.headers)) def publish_tool( self, @@ -74,10 +212,10 @@ class PlusAPI: version: str, description: str | None, encoded_file: str, - available_exports: list[dict[str, Any]] | None = None, - tools_metadata: list[dict[str, Any]] | None = None, + available_exports: list[AvailableExport] | None = None, + tools_metadata: list[ToolMetadata] | None = None, ) -> httpx.Response: - params = { + params: PublishToolPayload = { "handle": handle, "public": is_public, "version": version, @@ -129,13 +267,13 @@ class PlusAPI: def list_crews(self) -> httpx.Response: return self._make_request("GET", self.CREWS_RESOURCE) - def create_crew(self, payload: dict[str, Any]) -> httpx.Response: + def create_crew(self, payload: CreateCrewPayload) -> httpx.Response: return self._make_request("POST", self.CREWS_RESOURCE, json=payload) def get_organizations(self) -> httpx.Response: return self._make_request("GET", self.ORGANIZATIONS_RESOURCE) - def initialize_trace_batch(self, payload: dict[str, Any]) -> httpx.Response: + def initialize_trace_batch(self, payload: TraceBatchInitPayload) -> httpx.Response: return self._make_request( "POST", f"{self.TRACING_RESOURCE}/batches", @@ -144,7 +282,7 @@ class PlusAPI: ) def initialize_ephemeral_trace_batch( - self, payload: dict[str, Any] + self, payload: TraceBatchInitPayload ) -> httpx.Response: return self._make_request( "POST", @@ -153,7 +291,7 @@ class PlusAPI: ) def send_trace_events( - self, trace_batch_id: str, payload: dict[str, Any] + self, trace_batch_id: str, payload: TraceEventsPayload ) -> httpx.Response: return self._make_request( "POST", @@ -163,7 +301,7 @@ class PlusAPI: ) def send_ephemeral_trace_events( - self, trace_batch_id: str, payload: dict[str, Any] + self, trace_batch_id: str, payload: TraceEventsPayload ) -> httpx.Response: return self._make_request( "POST", @@ -173,7 +311,7 @@ class PlusAPI: ) def finalize_trace_batch( - self, trace_batch_id: str, payload: dict[str, Any] + self, trace_batch_id: str, payload: TraceFinalizePayload ) -> httpx.Response: return self._make_request( "PATCH", @@ -183,7 +321,7 @@ class PlusAPI: ) def finalize_ephemeral_trace_batch( - self, trace_batch_id: str, payload: dict[str, Any] + self, trace_batch_id: str, payload: TraceFinalizePayload ) -> httpx.Response: return self._make_request( "PATCH", @@ -195,20 +333,28 @@ class PlusAPI: def mark_trace_batch_as_failed( self, trace_batch_id: str, error_message: str ) -> httpx.Response: + payload: TraceFailedPayload = { + "status": "failed", + "failure_reason": error_message, + } return self._make_request( "PATCH", f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}", - json={"status": "failed", "failure_reason": error_message}, + json=payload, timeout=30, ) def mark_ephemeral_trace_batch_as_failed( self, trace_batch_id: str, error_message: str ) -> httpx.Response: + payload: TraceFailedPayload = { + "status": "failed", + "failure_reason": error_message, + } return self._make_request( "PATCH", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}", - json={"status": "failed", "failure_reason": error_message}, + json=payload, timeout=30, ) diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py index 72dbb21a2..0cfe227ac 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_batch_manager.py @@ -6,6 +6,14 @@ import time from typing import Any import uuid +from crewai_core.plus_api import ( + TraceBatchInitPayload, + TraceBatchMetadata, + TraceEventsPayload, + TraceExecutionContext, + TraceExecutionMetadata, + TraceFinalizePayload, +) from crewai_core.settings import Settings from rich.console import Console from rich.panel import Panel @@ -123,25 +131,27 @@ class TraceBatchManager: return None try: - payload = { + execution_context: TraceExecutionContext = { + "crew_fingerprint": execution_metadata.get("crew_fingerprint"), + "crew_name": execution_metadata.get("crew_name", None), + "flow_name": execution_metadata.get("flow_name", None), + "crewai_version": self.current_batch.version, + "privacy_level": user_context.get("privacy_level", "standard"), + } + execution_metadata_payload: TraceExecutionMetadata = { + "expected_duration_estimate": execution_metadata.get( + "expected_duration_estimate", 300 + ), + "agent_count": execution_metadata.get("agent_count", 0), + "task_count": execution_metadata.get("task_count", 0), + "flow_method_count": execution_metadata.get("flow_method_count", 0), + "execution_started_at": datetime.now(timezone.utc).isoformat(), + } + payload: TraceBatchInitPayload = { "trace_id": self.current_batch.batch_id, "execution_type": execution_metadata.get("execution_type", "crew"), - "execution_context": { - "crew_fingerprint": execution_metadata.get("crew_fingerprint"), - "crew_name": execution_metadata.get("crew_name", None), - "flow_name": execution_metadata.get("flow_name", None), - "crewai_version": self.current_batch.version, - "privacy_level": user_context.get("privacy_level", "standard"), - }, - "execution_metadata": { - "expected_duration_estimate": execution_metadata.get( - "expected_duration_estimate", 300 - ), - "agent_count": execution_metadata.get("agent_count", 0), - "task_count": execution_metadata.get("task_count", 0), - "flow_method_count": execution_metadata.get("flow_method_count", 0), - "execution_started_at": datetime.now(timezone.utc).isoformat(), - }, + "execution_context": execution_context, + "execution_metadata": execution_metadata_payload, } if use_ephemeral: payload["ephemeral_trace_id"] = self.current_batch.batch_id @@ -264,13 +274,14 @@ class TraceBatchManager: if not self.plus_api or not self.trace_batch_id or not self.event_buffer: return 500 try: - payload = { + batch_metadata: TraceBatchMetadata = { + "events_count": len(self.event_buffer), + "batch_sequence": 1, + "is_final_batch": False, + } + payload: TraceEventsPayload = { "events": [event.to_dict() for event in self.event_buffer], - "batch_metadata": { - "events_count": len(self.event_buffer), - "batch_sequence": 1, - "is_final_batch": False, - }, + "batch_metadata": batch_metadata, } response = ( @@ -364,7 +375,7 @@ class TraceBatchManager: return try: - payload = { + payload: TraceFinalizePayload = { "status": "completed", "duration_ms": self.calculate_duration("execution"), "final_event_count": events_count, diff --git a/lib/crewai/src/crewai/utilities/project_utils.py b/lib/crewai/src/crewai/utilities/project_utils.py index c22b85a3c..a06bccdaf 100644 --- a/lib/crewai/src/crewai/utilities/project_utils.py +++ b/lib/crewai/src/crewai/utilities/project_utils.py @@ -13,6 +13,7 @@ import sys import types from typing import Any, cast, get_type_hints +from crewai_core.plus_api import AvailableExport, EnvVarEntry, ToolMetadata from crewai_core.project import ( get_project_description as get_project_description, get_project_name as get_project_name, @@ -279,7 +280,7 @@ def is_valid_tool(obj: Any) -> bool: return isinstance(obj, Tool) -def extract_available_exports(dir_path: str = "src") -> list[dict[str, Any]]: +def extract_available_exports(dir_path: str = "src") -> list[AvailableExport]: """Extract available tool classes from the project's __init__.py files. Only includes classes that inherit from BaseTool or functions decorated with @tool. @@ -338,7 +339,7 @@ def _load_module_from_file( sys.modules.pop(module_name, None) -def _load_tools_from_init(init_file: Path) -> list[dict[str, Any]]: +def _load_tools_from_init(init_file: Path) -> list[AvailableExport]: """Load and validate tools from a given __init__.py file.""" try: with _load_module_from_file(init_file) as module: @@ -392,7 +393,7 @@ def _print_no_tools_warning() -> None: ) -def extract_tools_metadata(dir_path: str = "src") -> list[dict[str, Any]]: +def extract_tools_metadata(dir_path: str = "src") -> list[ToolMetadata]: """ Extract rich metadata from tool classes in the project. @@ -404,7 +405,7 @@ def extract_tools_metadata(dir_path: str = "src") -> list[dict[str, Any]]: - init_params_schema: JSON Schema for __init__ params (filtered) - env_vars: List of environment variable dicts """ - tools_metadata: list[dict[str, Any]] = [] + tools_metadata: list[ToolMetadata] = [] for init_file in Path(dir_path).glob("**/__init__.py"): tools = _extract_tool_metadata_from_init(init_file) @@ -413,7 +414,7 @@ def extract_tools_metadata(dir_path: str = "src") -> list[dict[str, Any]]: return tools_metadata -def _extract_tool_metadata_from_init(init_file: Path) -> list[dict[str, Any]]: +def _extract_tool_metadata_from_init(init_file: Path) -> list[ToolMetadata]: """ Load module from init file and extract metadata from valid tool classes. """ @@ -428,7 +429,7 @@ def _extract_tool_metadata_from_init(init_file: Path) -> list[dict[str, Any]]: if not exported_names: return [] - tools_metadata = [] + tools_metadata: list[ToolMetadata] = [] for name in exported_names: obj = getattr(module, name, None) if obj is None or not ( @@ -446,7 +447,7 @@ def _extract_tool_metadata_from_init(init_file: Path) -> list[dict[str, Any]]: return [] -def _extract_single_tool_metadata(tool_class: type) -> dict[str, Any] | None: +def _extract_single_tool_metadata(tool_class: type) -> ToolMetadata | None: """ Extract metadata from a single tool class. """ @@ -470,19 +471,17 @@ def _extract_single_tool_metadata(tool_class: type) -> dict[str, Any] | None: except (TypeError, ValueError): module = tool_class.__module__ - return { - "name": tool_class.__name__, - "module": module, - "humanized_name": _extract_field_default( - fields.get("name"), fallback=tool_class.__name__ + return ToolMetadata( + name=tool_class.__name__, + module=module, + humanized_name=str( + _extract_field_default(fields.get("name"), fallback=tool_class.__name__) ), - "description": str( - _extract_field_default(fields.get("description")) - ).strip(), - "run_params_schema": _extract_run_params_schema(fields.get("args_schema")), - "init_params_schema": _extract_init_params_schema(tool_class), - "env_vars": _extract_env_vars(fields.get("env_vars")), - } + description=str(_extract_field_default(fields.get("description"))).strip(), + run_params_schema=_extract_run_params_schema(fields.get("args_schema")), + init_params_schema=_extract_init_params_schema(tool_class), + env_vars=_extract_env_vars(fields.get("env_vars")), + ) except Exception: return None @@ -597,7 +596,7 @@ def _extract_init_params_schema(tool_class: type) -> dict[str, Any]: return {} -def _extract_env_vars(env_vars_field: dict[str, Any] | None) -> list[dict[str, Any]]: +def _extract_env_vars(env_vars_field: dict[str, Any] | None) -> list[EnvVarEntry]: """ Extract environment variable definitions from env_vars field. """