Compare commits

...

3 Commits

Author SHA1 Message Date
Greyson LaLonde
c1776aca8a feat: add utils for deferred imports 2026-01-13 11:27:02 -05:00
Koushiv
8f99fa76ed feat: additional a2a transports
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Co-authored-by: Koushiv Sadhukhan <koushiv.777@gmail.com>
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-01-12 12:03:06 -05:00
GininDenis
17e3fcbe1f fix: unlink task in execution spans
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-01-12 02:58:42 -05:00
6 changed files with 426 additions and 10 deletions

View File

@@ -91,6 +91,10 @@ The `A2AConfig` class accepts the following parameters:
Update mechanism for receiving task status. Options: `StreamingConfig`, `PollingConfig`, or `PushNotificationConfig`.
</ParamField>
<ParamField path="transport_protocol" type="Literal['JSONRPC', 'GRPC', 'HTTP+JSON']" default="JSONRPC">
Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`.
</ParamField>
## Authentication
For A2A agents that require authentication, use one of the provided auth schemes:

View File

@@ -5,7 +5,7 @@ This module is separate from experimental.a2a to avoid circular imports.
from __future__ import annotations
from typing import Annotated, Any, ClassVar
from typing import Annotated, Any, ClassVar, Literal
from pydantic import (
BaseModel,
@@ -53,6 +53,7 @@ class A2AConfig(BaseModel):
fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
trust_remote_completion_status: If True, return A2A agent's result directly when completed.
updates: Update mechanism config.
transport_protocol: A2A transport protocol (grpc, jsonrpc, http+json).
"""
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@@ -82,3 +83,7 @@ class A2AConfig(BaseModel):
default_factory=_get_default_update_config,
description="Update mechanism config",
)
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
default="JSONRPC",
description="Specified mode of A2A transport protocol",
)

View File

@@ -7,7 +7,7 @@ from collections.abc import AsyncIterator, MutableMapping
from contextlib import asynccontextmanager
from functools import lru_cache
import time
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal
import uuid
from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory
@@ -18,7 +18,6 @@ from a2a.types import (
PushNotificationConfig as A2APushNotificationConfig,
Role,
TextPart,
TransportProtocol,
)
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
@@ -259,6 +258,7 @@ async def _afetch_agent_card_impl(
def execute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -282,6 +282,23 @@ def execute_a2a_delegation(
use aexecute_a2a_delegation directly.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest)
timeout: Request timeout in seconds
task_description: The task to delegate
context: Optional context information
context_id: Context ID for correlating messages/tasks
task_id: Specific task identifier
reference_task_ids: List of related task IDs
metadata: Additional metadata (external_id, request_id, etc.)
extensions: Protocol extensions for custom fields
conversation_history: Previous Message objects from conversation
agent_id: Agent identifier for logging
agent_role: Role of the CrewAI agent delegating the task
agent_branch: Optional agent tree branch for logging
response_model: Optional Pydantic model for structured outputs
turn_number: Optional turn number for multi-turn conversations
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
@@ -323,6 +340,7 @@ def execute_a2a_delegation(
agent_role=agent_role,
agent_branch=agent_branch,
response_model=response_model,
transport_protocol=transport_protocol,
turn_number=turn_number,
updates=updates,
)
@@ -333,6 +351,7 @@ def execute_a2a_delegation(
async def aexecute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -356,6 +375,23 @@ async def aexecute_a2a_delegation(
in an async context (e.g., with Crew.akickoff() or agent.aexecute_task()).
Args:
endpoint: A2A agent endpoint URL
transport_protocol: Optional A2A transport protocol (grpc, jsonrpc, http+json)
auth: Optional AuthScheme for authentication
timeout: Request timeout in seconds
task_description: Task to delegate
context: Optional context
context_id: Context ID for correlation
task_id: Specific task identifier
reference_task_ids: Related task IDs
metadata: Additional metadata
extensions: Protocol extensions
conversation_history: Previous Message objects
turn_number: Current turn number
agent_branch: Agent tree branch for logging
agent_id: Agent identifier for logging
agent_role: Agent role for logging
response_model: Optional Pydantic model for structured outputs
endpoint: A2A agent endpoint URL.
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
@@ -414,6 +450,7 @@ async def aexecute_a2a_delegation(
agent_role=agent_role,
response_model=response_model,
updates=updates,
transport_protocol=transport_protocol,
)
crewai_event_bus.emit(
@@ -431,6 +468,7 @@ async def aexecute_a2a_delegation(
async def _aexecute_a2a_delegation_impl(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
auth: AuthScheme | None,
timeout: int,
task_description: str,
@@ -524,7 +562,6 @@ async def _aexecute_a2a_delegation_impl(
extensions=extensions,
)
transport_protocol = TransportProtocol("JSONRPC")
new_messages: list[Message] = [*conversation_history, message]
crewai_event_bus.emit(
None,
@@ -596,7 +633,7 @@ async def _aexecute_a2a_delegation_impl(
@asynccontextmanager
async def _create_a2a_client(
agent_card: AgentCard,
transport_protocol: TransportProtocol,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
timeout: int,
headers: MutableMapping[str, str],
streaming: bool,
@@ -640,7 +677,7 @@ async def _create_a2a_client(
config = ClientConfig(
httpx_client=httpx_client,
supported_transports=[str(transport_protocol.value)],
supported_transports=[transport_protocol],
streaming=streaming and not use_polling,
polling=use_polling,
accepted_output_modes=["application/json"],

View File

@@ -771,6 +771,7 @@ def _delegate_to_a2a(
response_model=agent_config.response_model,
turn_number=turn_num + 1,
updates=agent_config.updates,
transport_protocol=agent_config.transport_protocol,
)
conversation_history = a2a_result.get("history", [])
@@ -1085,6 +1086,7 @@ async def _adelegate_to_a2a(
agent_branch=agent_branch,
response_model=agent_config.response_model,
turn_number=turn_num + 1,
transport_protocol=agent_config.transport_protocol,
updates=agent_config.updates,
)

View File

@@ -209,10 +209,9 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
# Handle telemetry
span = self.execution_spans.get(source)
span = self.execution_spans.pop(source, None)
if span:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
# Pass task name if it exists
task_name = get_task_name(source)
@@ -222,11 +221,10 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
span = self.execution_spans.get(source)
span = self.execution_spans.pop(source, None)
if span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
# Pass task name if it exists
task_name = get_task_name(source)

View File

@@ -0,0 +1,370 @@
"""Lazy loader for Python packages.
Makes it easy to load subpackages and functions on demand.
Pulled from https://github.com/scientific-python/lazy-loader/blob/main/src/lazy_loader/__init__.py,
modernized a little.
"""
import ast
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
import importlib
import importlib.metadata
import importlib.util
import inspect
import os
from pathlib import Path
import sys
import threading
import types
from typing import Any, NoReturn
import warnings
import packaging.requirements
_threadlock = threading.Lock()
@dataclass(frozen=True, slots=True)
class _FrameData:
"""Captured stack frame information for delayed error reporting."""
filename: str
lineno: int
function: str
code_context: Sequence[str] | None
def attach(
package_name: str,
submodules: set[str] | None = None,
submod_attrs: dict[str, list[str]] | None = None,
) -> tuple[Callable[[str], Any], Callable[[], list[str]], list[str]]:
"""Attach lazily loaded submodules, functions, or other attributes.
Replaces a package's `__getattr__`, `__dir__`, and `__all__` such that
imports work normally but occur upon first use.
Example:
__getattr__, __dir__, __all__ = lazy.attach(
__name__, ["mysubmodule"], {"foo": ["someattr"]}
)
Args:
package_name: The package name, typically ``__name__``.
submodules: Set of submodule names to attach.
submod_attrs: Mapping of submodule names to lists of attributes.
These attributes are imported as they are used.
Returns:
A tuple of (__getattr__, __dir__, __all__) to assign in the package.
"""
submod_attrs = submod_attrs or {}
submodules = set(submodules) if submodules else set()
attr_to_modules = {
attr: mod for mod, attrs in submod_attrs.items() for attr in attrs
}
__all__ = sorted(submodules | attr_to_modules.keys())
def __getattr__(name: str) -> Any: # noqa: N807
if name in submodules:
return importlib.import_module(f"{package_name}.{name}")
if name in attr_to_modules:
submod_path = f"{package_name}.{attr_to_modules[name]}"
submod = importlib.import_module(submod_path)
attr = getattr(submod, name)
# If the attribute lives in a file (module) with the same
# name as the attribute, ensure that the attribute and *not*
# the module is accessible on the package.
if name == attr_to_modules[name]:
pkg = sys.modules[package_name]
pkg.__dict__[name] = attr
return attr
raise AttributeError(f"No {package_name} attribute {name}")
def __dir__() -> list[str]: # noqa: N807
return __all__.copy()
if os.environ.get("EAGER_IMPORT"):
for attr in set(attr_to_modules.keys()) | submodules:
__getattr__(attr)
return __getattr__, __dir__, __all__.copy()
class DelayedImportErrorModule(types.ModuleType):
"""Module type that delays raising ModuleNotFoundError until attribute access.
Captures stack frame data to provide helpful error messages showing where
the original import was attempted.
"""
def __init__(
self,
frame_data: _FrameData,
*args: Any,
message: str,
**kwargs: Any,
) -> None:
"""Initialize the delayed error module.
Args:
frame_data: Captured frame information for error reporting.
*args: Positional arguments passed to ModuleType.
message: The error message to display when accessed.
**kwargs: Keyword arguments passed to ModuleType.
"""
self._frame_data = frame_data
self._message = message
super().__init__(*args, **kwargs)
def __getattr__(self, name: str) -> NoReturn:
"""Raise ModuleNotFoundError with detailed context on any attribute access."""
frame = self._frame_data
code = "".join(frame.code_context) if frame.code_context else ""
raise ModuleNotFoundError(
f"{self._message}\n\n"
"This error is lazily reported, having originally occurred in\n"
f" File {frame.filename}, line {frame.lineno}, in {frame.function}\n\n"
f"----> {code.strip()}"
)
def load(
fullname: str,
*,
require: str | None = None,
error_on_import: bool = False,
suppress_warning: bool = False,
) -> types.ModuleType:
"""Return a lazily imported proxy for a module.
The proxy module delays actual import until first attribute access.
Example:
np = lazy.load("numpy")
def myfunc():
np.norm(...)
Warning:
Lazily loading subpackages causes the parent package to be eagerly
loaded. Use `lazy_loader.attach` instead for subpackages.
Args:
fullname: The full name of the module to import (e.g., "scipy").
require: A PEP-508 dependency requirement (e.g., "numpy >=1.24").
If specified, raises an error if the installed version doesn't match.
error_on_import: If True, raise import errors immediately.
If False (default), delay errors until module is accessed.
suppress_warning: If True, suppress the warning when loading subpackages.
Returns:
A proxy module that loads on first attribute access.
"""
with _threadlock:
module = sys.modules.get(fullname)
# Most common, short-circuit
if module is not None and require is None:
return module
have_module = module is not None
if not suppress_warning and "." in fullname:
msg = (
"subpackages can technically be lazily loaded, but it causes the "
"package to be eagerly loaded even if it is already lazily loaded. "
"So, you probably shouldn't use subpackages with this lazy feature."
)
warnings.warn(msg, RuntimeWarning, stacklevel=2)
spec = None
if not have_module:
spec = importlib.util.find_spec(fullname)
have_module = spec is not None
if not have_module:
not_found_message = f"No module named '{fullname}'"
elif require is not None:
try:
have_module = _check_requirement(require)
except ModuleNotFoundError as e:
raise ValueError(
f"Found module '{fullname}' but cannot test "
"requirement '{require}'. "
"Requirements must match distribution name, not module name."
) from e
not_found_message = f"No distribution can be found matching '{require}'"
if not have_module:
if error_on_import:
raise ModuleNotFoundError(not_found_message)
parent = inspect.stack()[1]
frame_data = _FrameData(
filename=parent.filename,
lineno=parent.lineno,
function=parent.function,
code_context=parent.code_context,
)
del parent
return DelayedImportErrorModule(
frame_data,
"DelayedImportErrorModule",
message=not_found_message,
)
if spec is not None:
module = importlib.util.module_from_spec(spec)
sys.modules[fullname] = module
if spec.loader is not None:
loader = importlib.util.LazyLoader(spec.loader)
loader.exec_module(module)
if module is None:
raise ModuleNotFoundError(f"No module named '{fullname}'")
return module
def _check_requirement(require: str) -> bool:
"""Verify that a package requirement is satisfied.
Args:
require: A dependency requirement as defined in PEP-508.
Returns:
True if the installed version matches the requirement, False otherwise.
Raises:
ModuleNotFoundError: If the package is not installed.
"""
req = packaging.requirements.Requirement(require)
return req.specifier.contains(
importlib.metadata.version(req.name),
prereleases=True,
)
@dataclass
class _StubVisitor(ast.NodeVisitor):
"""AST visitor to parse a stub file for submodules and submod_attrs."""
_submodules: set[str] = field(default_factory=set)
_submod_attrs: dict[str, list[str]] = field(default_factory=dict)
def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
"""Visit an ImportFrom node and extract submodule/attribute information.
Args:
node: The AST ImportFrom node to visit.
Raises:
ValueError: If the import is not a relative import or uses star import.
"""
if node.level != 1:
raise ValueError(
"Only within-module imports are supported (`from .* import`)"
)
names = [alias.name for alias in node.names]
if node.module:
if "*" in names:
raise ValueError(
f"lazy stub loader does not support star import "
f"`from {node.module} import *`"
)
self._submod_attrs.setdefault(node.module, []).extend(names)
else:
self._submodules.update(names)
def attach_stub(
package_name: str,
filename: str,
) -> tuple[Callable[[str], Any], Callable[[], list[str]], list[str]]:
"""Attach lazily loaded submodules and functions from a type stub.
Parses a `.pyi` stub file to infer submodules and attributes. This allows
static type checkers to find imports while providing lazy loading at runtime.
Args:
package_name: The package name, typically ``__name__``.
filename: Path to `.py` file with an adjacent `.pyi` file.
Typically use ``__file__``.
Returns:
A tuple of (__getattr__, __dir__, __all__) to assign in the package.
Raises:
ValueError: If stub file is not found or contains invalid imports.
"""
path = Path(filename)
stubfile = path if path.suffix == ".pyi" else path.with_suffix(".pyi")
if not stubfile.exists():
raise ValueError(f"Cannot load imports from non-existent stub {stubfile!r}")
visitor = _StubVisitor()
visitor.visit(ast.parse(stubfile.read_text()))
return attach(package_name, visitor._submodules, visitor._submod_attrs)
def lazy_exports_stub(package_name: str, filename: str) -> None:
"""Install lazy loading on a module based on its .pyi stub file.
Parses the adjacent `.pyi` stub file to determine what to export lazily.
Type checkers see the stub, runtime gets lazy loading.
Example:
# __init__.py
from crewai.utilities.lazy import lazy_exports_stub
lazy_exports_stub(__name__, __file__)
# __init__.pyi
from .config import ChromaDBConfig, ChromaDBSettings
from .types import EmbeddingType
Args:
package_name: The package name, typically ``__name__``.
filename: Path to the module file, typically ``__file__``.
"""
__getattr__, __dir__, __all__ = attach_stub(package_name, filename)
module = sys.modules[package_name]
module.__getattr__ = __getattr__ # type: ignore[method-assign]
module.__dir__ = __dir__ # type: ignore[method-assign]
module.__dict__["__all__"] = __all__
def lazy_exports(
package_name: str,
submod_attrs: dict[str, list[str]],
submodules: set[str] | None = None,
) -> None:
"""Install lazy loading on a module.
Example:
from crewai.utilities.lazy import lazy_exports
lazy_exports(__name__, {
'config': ['ChromaDBConfig', 'ChromaDBSettings'],
'types': ['EmbeddingType'],
})
Args:
package_name: The package name, typically ``__name__``.
submod_attrs: Mapping of submodule names to lists of attributes.
submodules: Optional set of submodule names to expose directly.
"""
__getattr__, __dir__, __all__ = attach(package_name, submodules, submod_attrs)
module = sys.modules[package_name]
module.__getattr__ = __getattr__ # type: ignore[method-assign]
module.__dir__ = __dir__ # type: ignore[method-assign]
module.__dict__["__all__"] = __all__