From d47adfc34ae9e9b40b27b5744de0f0879609bd53 Mon Sep 17 00:00:00 2001 From: Raju Rangan Date: Tue, 11 Mar 2025 10:21:30 -0400 Subject: [PATCH] Adding tooling to use Amazon Bedrock Agents as enternal agent, enbaling distributed agentic capabilities --- src/crewai_tools/aws/bedrock/agents/README.md | 181 ++++++++++++++++++ .../aws/bedrock/agents/invoke_agent_tool.py | 140 ++++++++++++++ 2 files changed, 321 insertions(+) create mode 100644 src/crewai_tools/aws/bedrock/agents/README.md create mode 100644 src/crewai_tools/aws/bedrock/agents/invoke_agent_tool.py diff --git a/src/crewai_tools/aws/bedrock/agents/README.md b/src/crewai_tools/aws/bedrock/agents/README.md new file mode 100644 index 000000000..7aa43b65d --- /dev/null +++ b/src/crewai_tools/aws/bedrock/agents/README.md @@ -0,0 +1,181 @@ +# BedrockInvokeAgentTool + +The `BedrockInvokeAgentTool` enables CrewAI agents to invoke Amazon Bedrock Agents and leverage their capabilities within your workflows. + +## Installation + +```bash +pip install 'crewai[tools]' +``` + +## Requirements + +- AWS credentials configured (either through environment variables or AWS CLI) +- `boto3` and `python-dotenv` packages +- Access to Amazon Bedrock Agents + +## Usage + +Here's how to use the tool with a CrewAI agent: + +```python +from crewai import Agent, Task, Crew +from crewai_tools.aws.bedrock.agents.invoke_agent_tool import BedrockInvokeAgentTool + +# Initialize the tool +agent_tool = BedrockInvokeAgentTool( + agent_id="your-agent-id", + agent_alias_id="your-agent-alias-id" +) + +# Create a CrewAI agent that uses the tool +aws_expert = Agent( + role='AWS Service Expert', + goal='Help users understand AWS services and quotas', + backstory='I am an expert in AWS services and can provide detailed information about them.', + tools=[agent_tool], + verbose=True +) + +# Create a task for the agent +quota_task = Task( + description="Find out the current service quotas for EC2 in us-west-2 and explain any recent changes.", + agent=aws_expert +) + +# Create a crew with the agent +crew = Crew( + agents=[aws_expert], + tasks=[quota_task], + verbose=2 +) + +# Run the crew +result = crew.kickoff() +print(result) +``` + +## Tool Arguments + +| Argument | Type | Required | Default | Description | +|----------|------|----------|---------|-------------| +| agent_id | str | Yes | None | The unique identifier of the Bedrock agent | +| agent_alias_id | str | Yes | None | The unique identifier of the agent alias | +| session_id | str | No | timestamp | The unique identifier of the session | +| enable_trace | bool | No | False | Whether to enable trace for debugging | +| end_session | bool | No | False | Whether to end the session after invocation | +| description | str | No | None | Custom description for the tool | + +## Environment Variables + +```bash +BEDROCK_AGENT_ID=your-agent-id # Alternative to passing agent_id +BEDROCK_AGENT_ALIAS_ID=your-agent-alias-id # Alternative to passing agent_alias_id +AWS_REGION=your-aws-region # Defaults to us-west-2 +AWS_ACCESS_KEY_ID=your-access-key # Required for AWS authentication +AWS_SECRET_ACCESS_KEY=your-secret-key # Required for AWS authentication +``` + +## Advanced Usage + +### Multi-Agent Workflow with Session Management + +```python +from crewai import Agent, Task, Crew, Process +from crewai_tools.aws.bedrock.agents.invoke_agent_tool import BedrockInvokeAgentTool + +# Initialize tools with session management +initial_tool = BedrockInvokeAgentTool( + agent_id="your-agent-id", + agent_alias_id="your-agent-alias-id", + session_id="custom-session-id" +) + +followup_tool = BedrockInvokeAgentTool( + agent_id="your-agent-id", + agent_alias_id="your-agent-alias-id", + session_id="custom-session-id" +) + +final_tool = BedrockInvokeAgentTool( + agent_id="your-agent-id", + agent_alias_id="your-agent-alias-id", + session_id="custom-session-id", + end_session=True +) + +# Create agents for different stages +researcher = Agent( + role='AWS Service Researcher', + goal='Gather information about AWS services', + backstory='I am specialized in finding detailed AWS service information.', + tools=[initial_tool] +) + +analyst = Agent( + role='Service Compatibility Analyst', + goal='Analyze service compatibility and requirements', + backstory='I analyze AWS services for compatibility and integration possibilities.', + tools=[followup_tool] +) + +summarizer = Agent( + role='Technical Documentation Writer', + goal='Create clear technical summaries', + backstory='I specialize in creating clear, concise technical documentation.', + tools=[final_tool] +) + +# Create tasks +research_task = Task( + description="Find all available AWS services in us-west-2 region.", + agent=researcher +) + +analysis_task = Task( + description="Analyze which services support IPv6 and their implementation requirements.", + agent=analyst +) + +summary_task = Task( + description="Create a summary of IPv6-compatible services and their key features.", + agent=summarizer +) + +# Create a crew with the agents and tasks +crew = Crew( + agents=[researcher, analyst, summarizer], + tasks=[research_task, analysis_task, summary_task], + process=Process.sequential, + verbose=2 +) + +# Run the crew +result = crew.kickoff() +``` + +## Use Cases + +### Hybrid Multi-Agent Collaborations +- Create workflows where CrewAI agents collaborate with managed Bedrock agents running as services in AWS +- Enable scenarios where sensitive data processing happens within your AWS environment while other agents operate externally +- Bridge on-premises CrewAI agents with cloud-based Bedrock agents for distributed intelligence workflows + +### Data Sovereignty and Compliance +- Keep data-sensitive agentic workflows within your AWS environment while allowing external CrewAI agents to orchestrate tasks +- Maintain compliance with data residency requirements by processing sensitive information only within your AWS account +- Enable secure multi-agent collaborations where some agents cannot access your organization's private data + +### Seamless AWS Service Integration +- Access any AWS service through Amazon Bedrock Actions without writing complex integration code +- Enable CrewAI agents to interact with AWS services through natural language requests +- Leverage pre-built Bedrock agent capabilities to interact with AWS services like Bedrock Knowledge Bases, Lambda, and more + +### Scalable Hybrid Agent Architectures +- Offload computationally intensive tasks to managed Bedrock agents while lightweight tasks run in CrewAI +- Scale agent processing by distributing workloads between local CrewAI agents and cloud-based Bedrock agents + +### Cross-Organizational Agent Collaboration +- Enable secure collaboration between your organization's CrewAI agents and partner organizations' Bedrock agents +- Create workflows where external expertise from Bedrock agents can be incorporated without exposing sensitive data +- Build agent ecosystems that span organizational boundaries while maintaining security and data control \ No newline at end of file diff --git a/src/crewai_tools/aws/bedrock/agents/invoke_agent_tool.py b/src/crewai_tools/aws/bedrock/agents/invoke_agent_tool.py new file mode 100644 index 000000000..41ecad75b --- /dev/null +++ b/src/crewai_tools/aws/bedrock/agents/invoke_agent_tool.py @@ -0,0 +1,140 @@ +from typing import Type, Optional, Dict, Any +import os +import json +import uuid +import time +from datetime import datetime, timezone +from dotenv import load_dotenv + +from crewai.tools import BaseTool +from pydantic import BaseModel, Field +import boto3 +from botocore.exceptions import ClientError + +# Load environment variables from .env file +load_dotenv() + + +class BedrockInvokeAgentToolInput(BaseModel): + """Input schema for BedrockInvokeAgentTool.""" + query: str = Field(..., description="The query to send to the agent") + + +class BedrockInvokeAgentTool(BaseTool): + name: str = "Bedrock Agent Invoke Tool" + description: str = "An agent responsible for policy analysis." + args_schema: Type[BaseModel] = BedrockInvokeAgentToolInput + agent_id: str = None + agent_alias_id: str = None + session_id: str = None + enable_trace: bool = False + end_session: bool = False + + def __init__( + self, + agent_id: str = None, + agent_alias_id: str = None, + session_id: str = None, + enable_trace: bool = False, + end_session: bool = False, + description: Optional[str] = None, + **kwargs + ): + """Initialize the BedrockInvokeAgentTool with agent configuration. + + Args: + agent_id (str): The unique identifier of the Bedrock agent + agent_alias_id (str): The unique identifier of the agent alias + session_id (str): The unique identifier of the session + enable_trace (bool): Whether to enable trace for the agent invocation + end_session (bool): Whether to end the session with the agent + description (Optional[str]): Custom description for the tool + """ + super().__init__(**kwargs) + + # Get values from environment variables if not provided + self.agent_id = agent_id or os.getenv('BEDROCK_AGENT_ID') + self.agent_alias_id = agent_alias_id or os.getenv('BEDROCK_AGENT_ALIAS_ID') + self.session_id = session_id or str(int(time.time())) # Use timestamp as session ID if not provided + self.enable_trace = enable_trace + self.end_session = end_session + + # Update the description if provided + if description: + self.description = description + + def _run(self, query: str) -> str: + try: + # Initialize the Bedrock Agent Runtime client + bedrock_agent = boto3.client( + "bedrock-agent-runtime", + region_name=os.getenv('AWS_REGION', os.getenv('AWS_DEFAULT_REGION', 'us-west-2')) + ) + + # Format the prompt with current time + current_utc = datetime.now(timezone.utc) + prompt = f""" +The current time is: {current_utc} + +Below is the users query or task. Complete it and answer it consicely and to the point: +{query} +""" + + # Invoke the agent + response = bedrock_agent.invoke_agent( + agentId=self.agent_id, + agentAliasId=self.agent_alias_id, + sessionId=self.session_id, + inputText=prompt, + enableTrace=self.enable_trace, + endSession=self.end_session + ) + + # Process the response + completion = "" + + # Check if response contains a completion field + if 'completion' in response: + # Process streaming response format + for event in response.get('completion', []): + if 'chunk' in event and 'bytes' in event['chunk']: + chunk_bytes = event['chunk']['bytes'] + if isinstance(chunk_bytes, (bytes, bytearray)): + completion += chunk_bytes.decode('utf-8') + else: + completion += str(chunk_bytes) + + # If no completion found in streaming format, try direct format + if not completion and 'chunk' in response and 'bytes' in response['chunk']: + chunk_bytes = response['chunk']['bytes'] + if isinstance(chunk_bytes, (bytes, bytearray)): + completion = chunk_bytes.decode('utf-8') + else: + completion = str(chunk_bytes) + + # If still no completion, return debug info + if not completion: + debug_info = { + "error": "Could not extract completion from response", + "response_keys": list(response.keys()) + } + + # Add more debug info + if 'chunk' in response: + debug_info["chunk_keys"] = list(response['chunk'].keys()) + + return json.dumps(debug_info, indent=2) + + return completion + + except ClientError as e: + error_code = "Unknown" + error_message = str(e) + + # Try to extract error code if available + if hasattr(e, 'response') and 'Error' in e.response and 'Code' in e.response['Error']: + error_code = e.response['Error']['Code'] + + return f"Error invoking Bedrock Agent ({error_code}): {error_message}" + except Exception as e: + return f"Error: {str(e)}" \ No newline at end of file