mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
Adding tooling to use Amazon Bedrock Agents as enternal agent, enbaling distributed agentic capabilities
This commit is contained in:
181
src/crewai_tools/aws/bedrock/agents/README.md
Normal file
181
src/crewai_tools/aws/bedrock/agents/README.md
Normal file
@@ -0,0 +1,181 @@
|
|||||||
|
# BedrockInvokeAgentTool
|
||||||
|
|
||||||
|
The `BedrockInvokeAgentTool` enables CrewAI agents to invoke Amazon Bedrock Agents and leverage their capabilities within your workflows.
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pip install 'crewai[tools]'
|
||||||
|
```
|
||||||
|
|
||||||
|
## Requirements
|
||||||
|
|
||||||
|
- AWS credentials configured (either through environment variables or AWS CLI)
|
||||||
|
- `boto3` and `python-dotenv` packages
|
||||||
|
- Access to Amazon Bedrock Agents
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
Here's how to use the tool with a CrewAI agent:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from crewai import Agent, Task, Crew
|
||||||
|
from crewai_tools.aws.bedrock.agents.invoke_agent_tool import BedrockInvokeAgentTool
|
||||||
|
|
||||||
|
# Initialize the tool
|
||||||
|
agent_tool = BedrockInvokeAgentTool(
|
||||||
|
agent_id="your-agent-id",
|
||||||
|
agent_alias_id="your-agent-alias-id"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a CrewAI agent that uses the tool
|
||||||
|
aws_expert = Agent(
|
||||||
|
role='AWS Service Expert',
|
||||||
|
goal='Help users understand AWS services and quotas',
|
||||||
|
backstory='I am an expert in AWS services and can provide detailed information about them.',
|
||||||
|
tools=[agent_tool],
|
||||||
|
verbose=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a task for the agent
|
||||||
|
quota_task = Task(
|
||||||
|
description="Find out the current service quotas for EC2 in us-west-2 and explain any recent changes.",
|
||||||
|
agent=aws_expert
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a crew with the agent
|
||||||
|
crew = Crew(
|
||||||
|
agents=[aws_expert],
|
||||||
|
tasks=[quota_task],
|
||||||
|
verbose=2
|
||||||
|
)
|
||||||
|
|
||||||
|
# Run the crew
|
||||||
|
result = crew.kickoff()
|
||||||
|
print(result)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Tool Arguments
|
||||||
|
|
||||||
|
| Argument | Type | Required | Default | Description |
|
||||||
|
|----------|------|----------|---------|-------------|
|
||||||
|
| agent_id | str | Yes | None | The unique identifier of the Bedrock agent |
|
||||||
|
| agent_alias_id | str | Yes | None | The unique identifier of the agent alias |
|
||||||
|
| session_id | str | No | timestamp | The unique identifier of the session |
|
||||||
|
| enable_trace | bool | No | False | Whether to enable trace for debugging |
|
||||||
|
| end_session | bool | No | False | Whether to end the session after invocation |
|
||||||
|
| description | str | No | None | Custom description for the tool |
|
||||||
|
|
||||||
|
## Environment Variables
|
||||||
|
|
||||||
|
```bash
|
||||||
|
BEDROCK_AGENT_ID=your-agent-id # Alternative to passing agent_id
|
||||||
|
BEDROCK_AGENT_ALIAS_ID=your-agent-alias-id # Alternative to passing agent_alias_id
|
||||||
|
AWS_REGION=your-aws-region # Defaults to us-west-2
|
||||||
|
AWS_ACCESS_KEY_ID=your-access-key # Required for AWS authentication
|
||||||
|
AWS_SECRET_ACCESS_KEY=your-secret-key # Required for AWS authentication
|
||||||
|
```
|
||||||
|
|
||||||
|
## Advanced Usage
|
||||||
|
|
||||||
|
### Multi-Agent Workflow with Session Management
|
||||||
|
|
||||||
|
```python
|
||||||
|
from crewai import Agent, Task, Crew, Process
|
||||||
|
from crewai_tools.aws.bedrock.agents.invoke_agent_tool import BedrockInvokeAgentTool
|
||||||
|
|
||||||
|
# Initialize tools with session management
|
||||||
|
initial_tool = BedrockInvokeAgentTool(
|
||||||
|
agent_id="your-agent-id",
|
||||||
|
agent_alias_id="your-agent-alias-id",
|
||||||
|
session_id="custom-session-id"
|
||||||
|
)
|
||||||
|
|
||||||
|
followup_tool = BedrockInvokeAgentTool(
|
||||||
|
agent_id="your-agent-id",
|
||||||
|
agent_alias_id="your-agent-alias-id",
|
||||||
|
session_id="custom-session-id"
|
||||||
|
)
|
||||||
|
|
||||||
|
final_tool = BedrockInvokeAgentTool(
|
||||||
|
agent_id="your-agent-id",
|
||||||
|
agent_alias_id="your-agent-alias-id",
|
||||||
|
session_id="custom-session-id",
|
||||||
|
end_session=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create agents for different stages
|
||||||
|
researcher = Agent(
|
||||||
|
role='AWS Service Researcher',
|
||||||
|
goal='Gather information about AWS services',
|
||||||
|
backstory='I am specialized in finding detailed AWS service information.',
|
||||||
|
tools=[initial_tool]
|
||||||
|
)
|
||||||
|
|
||||||
|
analyst = Agent(
|
||||||
|
role='Service Compatibility Analyst',
|
||||||
|
goal='Analyze service compatibility and requirements',
|
||||||
|
backstory='I analyze AWS services for compatibility and integration possibilities.',
|
||||||
|
tools=[followup_tool]
|
||||||
|
)
|
||||||
|
|
||||||
|
summarizer = Agent(
|
||||||
|
role='Technical Documentation Writer',
|
||||||
|
goal='Create clear technical summaries',
|
||||||
|
backstory='I specialize in creating clear, concise technical documentation.',
|
||||||
|
tools=[final_tool]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create tasks
|
||||||
|
research_task = Task(
|
||||||
|
description="Find all available AWS services in us-west-2 region.",
|
||||||
|
agent=researcher
|
||||||
|
)
|
||||||
|
|
||||||
|
analysis_task = Task(
|
||||||
|
description="Analyze which services support IPv6 and their implementation requirements.",
|
||||||
|
agent=analyst
|
||||||
|
)
|
||||||
|
|
||||||
|
summary_task = Task(
|
||||||
|
description="Create a summary of IPv6-compatible services and their key features.",
|
||||||
|
agent=summarizer
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a crew with the agents and tasks
|
||||||
|
crew = Crew(
|
||||||
|
agents=[researcher, analyst, summarizer],
|
||||||
|
tasks=[research_task, analysis_task, summary_task],
|
||||||
|
process=Process.sequential,
|
||||||
|
verbose=2
|
||||||
|
)
|
||||||
|
|
||||||
|
# Run the crew
|
||||||
|
result = crew.kickoff()
|
||||||
|
```
|
||||||
|
|
||||||
|
## Use Cases
|
||||||
|
|
||||||
|
### Hybrid Multi-Agent Collaborations
|
||||||
|
- Create workflows where CrewAI agents collaborate with managed Bedrock agents running as services in AWS
|
||||||
|
- Enable scenarios where sensitive data processing happens within your AWS environment while other agents operate externally
|
||||||
|
- Bridge on-premises CrewAI agents with cloud-based Bedrock agents for distributed intelligence workflows
|
||||||
|
|
||||||
|
### Data Sovereignty and Compliance
|
||||||
|
- Keep data-sensitive agentic workflows within your AWS environment while allowing external CrewAI agents to orchestrate tasks
|
||||||
|
- Maintain compliance with data residency requirements by processing sensitive information only within your AWS account
|
||||||
|
- Enable secure multi-agent collaborations where some agents cannot access your organization's private data
|
||||||
|
|
||||||
|
### Seamless AWS Service Integration
|
||||||
|
- Access any AWS service through Amazon Bedrock Actions without writing complex integration code
|
||||||
|
- Enable CrewAI agents to interact with AWS services through natural language requests
|
||||||
|
- Leverage pre-built Bedrock agent capabilities to interact with AWS services like Bedrock Knowledge Bases, Lambda, and more
|
||||||
|
|
||||||
|
### Scalable Hybrid Agent Architectures
|
||||||
|
- Offload computationally intensive tasks to managed Bedrock agents while lightweight tasks run in CrewAI
|
||||||
|
- Scale agent processing by distributing workloads between local CrewAI agents and cloud-based Bedrock agents
|
||||||
|
|
||||||
|
### Cross-Organizational Agent Collaboration
|
||||||
|
- Enable secure collaboration between your organization's CrewAI agents and partner organizations' Bedrock agents
|
||||||
|
- Create workflows where external expertise from Bedrock agents can be incorporated without exposing sensitive data
|
||||||
|
- Build agent ecosystems that span organizational boundaries while maintaining security and data control
|
||||||
140
src/crewai_tools/aws/bedrock/agents/invoke_agent_tool.py
Normal file
140
src/crewai_tools/aws/bedrock/agents/invoke_agent_tool.py
Normal file
@@ -0,0 +1,140 @@
|
|||||||
|
from typing import Type, Optional, Dict, Any
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import uuid
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
from crewai.tools import BaseTool
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
import boto3
|
||||||
|
from botocore.exceptions import ClientError
|
||||||
|
|
||||||
|
# Load environment variables from .env file
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
|
|
||||||
|
class BedrockInvokeAgentToolInput(BaseModel):
|
||||||
|
"""Input schema for BedrockInvokeAgentTool."""
|
||||||
|
query: str = Field(..., description="The query to send to the agent")
|
||||||
|
|
||||||
|
|
||||||
|
class BedrockInvokeAgentTool(BaseTool):
|
||||||
|
name: str = "Bedrock Agent Invoke Tool"
|
||||||
|
description: str = "An agent responsible for policy analysis."
|
||||||
|
args_schema: Type[BaseModel] = BedrockInvokeAgentToolInput
|
||||||
|
agent_id: str = None
|
||||||
|
agent_alias_id: str = None
|
||||||
|
session_id: str = None
|
||||||
|
enable_trace: bool = False
|
||||||
|
end_session: bool = False
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
agent_id: str = None,
|
||||||
|
agent_alias_id: str = None,
|
||||||
|
session_id: str = None,
|
||||||
|
enable_trace: bool = False,
|
||||||
|
end_session: bool = False,
|
||||||
|
description: Optional[str] = None,
|
||||||
|
**kwargs
|
||||||
|
):
|
||||||
|
"""Initialize the BedrockInvokeAgentTool with agent configuration.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent_id (str): The unique identifier of the Bedrock agent
|
||||||
|
agent_alias_id (str): The unique identifier of the agent alias
|
||||||
|
session_id (str): The unique identifier of the session
|
||||||
|
enable_trace (bool): Whether to enable trace for the agent invocation
|
||||||
|
end_session (bool): Whether to end the session with the agent
|
||||||
|
description (Optional[str]): Custom description for the tool
|
||||||
|
"""
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
|
# Get values from environment variables if not provided
|
||||||
|
self.agent_id = agent_id or os.getenv('BEDROCK_AGENT_ID')
|
||||||
|
self.agent_alias_id = agent_alias_id or os.getenv('BEDROCK_AGENT_ALIAS_ID')
|
||||||
|
self.session_id = session_id or str(int(time.time())) # Use timestamp as session ID if not provided
|
||||||
|
self.enable_trace = enable_trace
|
||||||
|
self.end_session = end_session
|
||||||
|
|
||||||
|
# Update the description if provided
|
||||||
|
if description:
|
||||||
|
self.description = description
|
||||||
|
|
||||||
|
def _run(self, query: str) -> str:
|
||||||
|
try:
|
||||||
|
# Initialize the Bedrock Agent Runtime client
|
||||||
|
bedrock_agent = boto3.client(
|
||||||
|
"bedrock-agent-runtime",
|
||||||
|
region_name=os.getenv('AWS_REGION', os.getenv('AWS_DEFAULT_REGION', 'us-west-2'))
|
||||||
|
)
|
||||||
|
|
||||||
|
# Format the prompt with current time
|
||||||
|
current_utc = datetime.now(timezone.utc)
|
||||||
|
prompt = f"""
|
||||||
|
The current time is: {current_utc}
|
||||||
|
|
||||||
|
Below is the users query or task. Complete it and answer it consicely and to the point:
|
||||||
|
{query}
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Invoke the agent
|
||||||
|
response = bedrock_agent.invoke_agent(
|
||||||
|
agentId=self.agent_id,
|
||||||
|
agentAliasId=self.agent_alias_id,
|
||||||
|
sessionId=self.session_id,
|
||||||
|
inputText=prompt,
|
||||||
|
enableTrace=self.enable_trace,
|
||||||
|
endSession=self.end_session
|
||||||
|
)
|
||||||
|
|
||||||
|
# Process the response
|
||||||
|
completion = ""
|
||||||
|
|
||||||
|
# Check if response contains a completion field
|
||||||
|
if 'completion' in response:
|
||||||
|
# Process streaming response format
|
||||||
|
for event in response.get('completion', []):
|
||||||
|
if 'chunk' in event and 'bytes' in event['chunk']:
|
||||||
|
chunk_bytes = event['chunk']['bytes']
|
||||||
|
if isinstance(chunk_bytes, (bytes, bytearray)):
|
||||||
|
completion += chunk_bytes.decode('utf-8')
|
||||||
|
else:
|
||||||
|
completion += str(chunk_bytes)
|
||||||
|
|
||||||
|
# If no completion found in streaming format, try direct format
|
||||||
|
if not completion and 'chunk' in response and 'bytes' in response['chunk']:
|
||||||
|
chunk_bytes = response['chunk']['bytes']
|
||||||
|
if isinstance(chunk_bytes, (bytes, bytearray)):
|
||||||
|
completion = chunk_bytes.decode('utf-8')
|
||||||
|
else:
|
||||||
|
completion = str(chunk_bytes)
|
||||||
|
|
||||||
|
# If still no completion, return debug info
|
||||||
|
if not completion:
|
||||||
|
debug_info = {
|
||||||
|
"error": "Could not extract completion from response",
|
||||||
|
"response_keys": list(response.keys())
|
||||||
|
}
|
||||||
|
|
||||||
|
# Add more debug info
|
||||||
|
if 'chunk' in response:
|
||||||
|
debug_info["chunk_keys"] = list(response['chunk'].keys())
|
||||||
|
|
||||||
|
return json.dumps(debug_info, indent=2)
|
||||||
|
|
||||||
|
return completion
|
||||||
|
|
||||||
|
except ClientError as e:
|
||||||
|
error_code = "Unknown"
|
||||||
|
error_message = str(e)
|
||||||
|
|
||||||
|
# Try to extract error code if available
|
||||||
|
if hasattr(e, 'response') and 'Error' in e.response and 'Code' in e.response['Error']:
|
||||||
|
error_code = e.response['Error']['Code']
|
||||||
|
|
||||||
|
return f"Error invoking Bedrock Agent ({error_code}): {error_message}"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error: {str(e)}"
|
||||||
Reference in New Issue
Block a user