diff --git a/lib/crewai/src/crewai/lite_agent.py b/lib/crewai/src/crewai/lite_agent.py index 21c9089f3..f91d6d779 100644 --- a/lib/crewai/src/crewai/lite_agent.py +++ b/lib/crewai/src/crewai/lite_agent.py @@ -2,8 +2,10 @@ from __future__ import annotations import asyncio from collections.abc import Callable +from functools import wraps import inspect import json +from types import MethodType from typing import ( TYPE_CHECKING, Any, @@ -30,6 +32,8 @@ from typing_extensions import Self if TYPE_CHECKING: from crewai_files import FileInput + from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig + from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess from crewai.agents.cache.cache_handler import CacheHandler @@ -84,6 +88,80 @@ from crewai.utilities.tool_utils import execute_tool_and_check_finality from crewai.utilities.types import LLMMessage +def _kickoff_with_a2a_support( + agent: LiteAgent, + original_kickoff: Callable[..., LiteAgentOutput], + messages: str | list[LLMMessage], + response_format: type[BaseModel] | None, + input_files: dict[str, FileInput] | None, + extension_registry: Any, +) -> LiteAgentOutput: + """Wrap kickoff with A2A delegation using Task adapter. + + Args: + agent: The LiteAgent instance. + original_kickoff: The original kickoff method. + messages: Input messages. + response_format: Optional response format. + input_files: Optional input files. + extension_registry: A2A extension registry. + + Returns: + LiteAgentOutput from either local execution or A2A delegation. + """ + from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model + from crewai.a2a.wrapper import _execute_task_with_a2a + from crewai.task import Task + + a2a_agents, agent_response_model = get_a2a_agents_and_response_model(agent.a2a) + + if not a2a_agents: + return original_kickoff(messages, response_format, input_files) + + if isinstance(messages, str): + description = messages + else: + content = next( + (m["content"] for m in reversed(messages) if m["role"] == "user"), + None, + ) + description = content if isinstance(content, str) else "" + + if not description: + return original_kickoff(messages, response_format, input_files) + + fake_task = Task( + description=description, + agent=agent, + expected_output="Result from A2A delegation", + ) + + def task_to_kickoff_adapter( + self: Any, task: Task, context: str | None, tools: list[Any] | None + ) -> str: + result = original_kickoff(messages, response_format, input_files) + return result.raw + + result_str = _execute_task_with_a2a( + self=agent, # type: ignore[arg-type] + a2a_agents=a2a_agents, + original_fn=task_to_kickoff_adapter, + task=fake_task, + agent_response_model=agent_response_model, + context=None, + tools=None, + extension_registry=extension_registry, + ) + + return LiteAgentOutput( + raw=result_str, + pydantic=None, + agent_role=agent.role, + usage_metrics=None, + messages=[], + ) + + class LiteAgent(FlowTrackable, BaseModel): """ A lightweight agent that can process messages and use tools. @@ -154,6 +232,17 @@ class LiteAgent(FlowTrackable, BaseModel): guardrail_max_retries: int = Field( default=3, description="Maximum number of retries when guardrail fails" ) + a2a: ( + list[A2AConfig | A2AServerConfig | A2AClientConfig] + | A2AConfig + | A2AServerConfig + | A2AClientConfig + | None + ) = Field( + default=None, + description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. " + "Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of configurations.", + ) tools_results: list[dict[str, Any]] = Field( default_factory=list, description="Results of the tools used by the agent." ) @@ -209,6 +298,52 @@ class LiteAgent(FlowTrackable, BaseModel): return self + @model_validator(mode="after") + def setup_a2a_support(self) -> Self: + """Setup A2A extensions and server methods if a2a config exists.""" + if self.a2a: + from crewai.a2a.config import A2AClientConfig, A2AConfig + from crewai.a2a.extensions.registry import ( + create_extension_registry_from_config, + ) + from crewai.a2a.utils.agent_card import inject_a2a_server_methods + + configs = self.a2a if isinstance(self.a2a, list) else [self.a2a] + client_configs = [ + config + for config in configs + if isinstance(config, (A2AConfig, A2AClientConfig)) + ] + + extension_registry = ( + create_extension_registry_from_config(client_configs) + if client_configs + else create_extension_registry_from_config([]) + ) + extension_registry.inject_all_tools(self) # type: ignore[arg-type] + inject_a2a_server_methods(self) # type: ignore[arg-type] + + original_kickoff = self.kickoff + + @wraps(original_kickoff) + def kickoff_with_a2a( + messages: str | list[LLMMessage], + response_format: type[BaseModel] | None = None, + input_files: dict[str, FileInput] | None = None, + ) -> LiteAgentOutput: + return _kickoff_with_a2a_support( + self, + original_kickoff, + messages, + response_format, + input_files, + extension_registry, + ) + + object.__setattr__(self, "kickoff", MethodType(kickoff_with_a2a, self)) + + return self + @model_validator(mode="after") def ensure_guardrail_is_callable(self) -> Self: if callable(self.guardrail): @@ -626,7 +761,9 @@ class LiteAgent(FlowTrackable, BaseModel): except Exception as e: raise e - formatted_answer = process_llm_response(answer, self.use_stop_words) + formatted_answer = process_llm_response( + cast(str, answer), self.use_stop_words + ) if isinstance(formatted_answer, AgentAction): try: @@ -709,3 +846,21 @@ class LiteAgent(FlowTrackable, BaseModel): ) -> None: """Append a message to the message list with the given role.""" self._messages.append(format_message_for_llm(text, role=role)) + + +try: + from crewai.a2a.config import ( + A2AClientConfig as _A2AClientConfig, + A2AConfig as _A2AConfig, + A2AServerConfig as _A2AServerConfig, + ) + + LiteAgent.model_rebuild( + _types_namespace={ + "A2AConfig": _A2AConfig, + "A2AClientConfig": _A2AClientConfig, + "A2AServerConfig": _A2AServerConfig, + } + ) +except ImportError: + pass