diff --git a/src/crewai_tools/aws/bedrock/knowledge_base/README.md b/src/crewai_tools/aws/bedrock/knowledge_base/README.md new file mode 100644 index 000000000..6da54f848 --- /dev/null +++ b/src/crewai_tools/aws/bedrock/knowledge_base/README.md @@ -0,0 +1,159 @@ +# BedrockKBRetrieverTool + +The `BedrockKBRetrieverTool` enables CrewAI agents to retrieve information from Amazon Bedrock Knowledge Bases using natural language queries. + +## Installation + +```bash +pip install 'crewai[tools]' +``` + +## Requirements + +- AWS credentials configured (either through environment variables or AWS CLI) +- `boto3` and `python-dotenv` packages +- Access to Amazon Bedrock Knowledge Base + +## Usage + +Here's how to use the tool with a CrewAI agent: + +```python +from crewai import Agent, Task, Crew +from crewai_tools.aws.bedrock.knowledge_base.retriever_tool import BedrockKBRetrieverTool + +# Initialize the tool (the ID must be 1-10 letters or digits, no hyphens) +kb_tool = BedrockKBRetrieverTool( + knowledge_base_id="YOURKBID01", + number_of_results=5 +) + +# Create a CrewAI agent that uses the tool +researcher = Agent( + role='Knowledge Base Researcher', + goal='Find information about company policies', + backstory='I am a researcher specialized in retrieving and analyzing company documentation.', + tools=[kb_tool], + verbose=True +) + +# Create a task for the agent +research_task = Task( + description="Find our company's remote work policy and summarize the key points.", + expected_output="A concise summary of the key points of the remote work policy.", + agent=researcher +) + +# Create a crew with the agent +crew = Crew( + agents=[researcher], + tasks=[research_task], + verbose=True +) + +# Run the crew +result = crew.kickoff() +print(result) +``` + +## Tool Arguments + +| Argument | Type | Required | Default | Description | +|----------|------|----------|---------|-------------| +| knowledge_base_id | str | Yes (unless `BEDROCK_KB_ID` is set) | None | The unique identifier of the knowledge base (1-10 alphanumeric characters); falls back to the `BEDROCK_KB_ID` environment variable when omitted | +| number_of_results | int | No | 5 | Maximum number of results to return (ignored when `retrieval_configuration` is supplied) | +| retrieval_configuration | dict | No 
| None | Custom configurations for the knowledge base query | +| guardrail_configuration | dict | No | None | Content filtering settings | +| next_token | str | No | None | Token for pagination | + +## Environment Variables + +```bash +BEDROCK_KB_ID=your-knowledge-base-id # Alternative to passing knowledge_base_id +AWS_REGION=your-aws-region # Falls back to AWS_DEFAULT_REGION, then us-east-1 +AWS_ACCESS_KEY_ID=your-access-key # Only needed if credentials are not configured another way (e.g. AWS CLI) +AWS_SECRET_ACCESS_KEY=your-secret-key # Only needed if credentials are not configured another way (e.g. AWS CLI) +``` + +## Response Format + +The tool returns results as a JSON string. When nothing matches, the `results` list is replaced by a `message` field; `nextToken` and `guardrailAction` appear only when the service returns them. If the AWS call fails with a client error, a plain-text error message is returned instead of JSON. + +```json +{ + "results": [ + { + "content": "Retrieved text content", + "content_type": "text", + "source_type": "S3", + "source_uri": "s3://bucket/document.pdf", + "score": 0.95, + "metadata": { + "additional": "metadata" + } + } + ], + "nextToken": "pagination-token", + "guardrailAction": "NONE" +} +``` + +## Advanced Usage + +### Custom Retrieval Configuration + +```python +kb_tool = BedrockKBRetrieverTool( + knowledge_base_id="YOURKBID01", + retrieval_configuration={ + "vectorSearchConfiguration": { + "numberOfResults": 10, + "overrideSearchType": "HYBRID" + } + } +) + +policy_expert = Agent( + role='Policy Expert', + goal='Analyze company policies in detail', + backstory='I am an expert in corporate policy analysis with deep knowledge of regulatory requirements.', + tools=[kb_tool] +) +``` + +## Supported Data Sources + +- Amazon S3 +- Confluence +- Salesforce +- SharePoint +- Web pages +- Custom document locations +- Amazon Kendra +- SQL databases + +## Use Cases + +### Enterprise Knowledge Integration +- Enable CrewAI agents to access your organization's proprietary knowledge without exposing sensitive data +- Allow agents to make decisions based on your company's specific policies, procedures, and documentation +- Create agents that can answer questions based on your internal documentation while maintaining data security + +### Specialized Domain Knowledge +- Connect CrewAI agents to 
import json
import os
import re
from typing import Type, Optional, List, Dict, Any

import boto3
from botocore.exceptions import ClientError
from crewai.tools import BaseTool
from dotenv import load_dotenv
from pydantic import BaseModel, Field

# Load environment variables from .env file
load_dotenv()

# ASCII letters/digits only, 1-10 characters. str.isalnum() is deliberately not
# used because it also accepts non-ASCII letters and digits.
_KB_ID_PATTERN = re.compile(r"[0-9a-zA-Z]{1,10}")

# 1-2048 characters with no whitespace of any kind (not just a literal space).
_NEXT_TOKEN_PATTERN = re.compile(r"\S{1,2048}")

# (location key in the API result, field holding the source reference, label
# used for source_type when the result carries no explicit location type).
# Order is preserved from the original lookup table; the first key present wins.
_LOCATION_SOURCES = (
    ('s3Location', 'uri', 'S3'),
    ('confluenceLocation', 'url', 'Confluence'),
    ('salesforceLocation', 'url', 'Salesforce'),
    ('sharePointLocation', 'url', 'SharePoint'),
    ('webLocation', 'url', 'Web'),
    ('customDocumentLocation', 'id', 'CustomDocument'),
    ('kendraDocumentLocation', 'uri', 'KendraDocument'),
    ('sqlLocation', 'query', 'SQL'),
)


def _format_retrieval_result(result: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one entry of ``retrievalResults`` into the tool's output shape.

    Args:
        result: A single retrieval result dictionary from the retrieve response.

    Returns:
        A dictionary with ``content``, ``content_type``, ``source_type`` and
        ``source_uri`` keys, plus ``score`` (when not None) and ``metadata``
        (when non-empty).
    """
    # `or {}` guards against explicit nulls, which would otherwise break .get().
    content = result.get('content') or {}
    location = result.get('location') or {}

    source_type = location.get('type', 'unknown')
    source_uri = None
    for location_key, uri_field, label in _LOCATION_SOURCES:
        if location_key in location:
            source_uri = (location[location_key] or {}).get(uri_field)
            # Only fall back to our own label when the API gave no usable type.
            if not source_type or source_type == 'unknown':
                source_type = label
            break

    formatted = {
        'content': content.get('text', ''),
        'content_type': content.get('type', 'text'),
        'source_type': source_type,
        'source_uri': source_uri,
    }

    score = result.get('score')
    if score is not None:
        formatted['score'] = score

    metadata = result.get('metadata')
    if metadata:
        formatted['metadata'] = metadata

    return formatted


class BedrockKBRetrieverToolInput(BaseModel):
    """Input schema for BedrockKBRetrieverTool."""

    query: str = Field(..., description="The query to retrieve information from the knowledge base")


class BedrockKBRetrieverTool(BaseTool):
    """Tool that queries an Amazon Bedrock Knowledge Base and returns the matches as JSON text."""

    name: str = "Bedrock Knowledge Base Retriever Tool"
    description: str = "Retrieves information from an Amazon Bedrock Knowledge Base given a query"
    args_schema: Type[BaseModel] = BedrockKBRetrieverToolInput
    # Optional at the type level because the value may come from BEDROCK_KB_ID;
    # _validate_parameters() still requires it to be set after __init__.
    knowledge_base_id: Optional[str] = None
    number_of_results: Optional[int] = 5
    retrieval_configuration: Optional[Dict[str, Any]] = None
    guardrail_configuration: Optional[Dict[str, Any]] = None
    next_token: Optional[str] = None

    def __init__(
        self,
        knowledge_base_id: Optional[str] = None,
        number_of_results: Optional[int] = 5,
        retrieval_configuration: Optional[Dict[str, Any]] = None,
        guardrail_configuration: Optional[Dict[str, Any]] = None,
        next_token: Optional[str] = None,
        **kwargs
    ):
        """Initialize the BedrockKBRetrieverTool with knowledge base configuration.

        Args:
            knowledge_base_id: Identifier of the knowledge base to query
                (1-10 characters, pattern ``^[0-9a-zA-Z]+$``). When omitted, the
                ``BEDROCK_KB_ID`` environment variable is used.
            number_of_results: Maximum number of results to return. Only used to
                build a default ``retrieval_configuration`` when none is given.
                Defaults to 5.
            retrieval_configuration: Configuration for the knowledge base query
                and retrieval process. When provided it is used as-is and
                ``number_of_results`` is not merged into it. Defaults to None.
            guardrail_configuration: Guardrail settings. Defaults to None.
            next_token: Token for retrieving the next batch of results.
                Defaults to None.
            **kwargs: Forwarded unchanged to ``BaseTool.__init__``.

        Raises:
            ValueError: If ``knowledge_base_id`` or ``next_token`` is invalid.
        """
        super().__init__(**kwargs)

        # Get knowledge_base_id from environment variable if not provided
        self.knowledge_base_id = knowledge_base_id or os.getenv('BEDROCK_KB_ID')
        self.number_of_results = number_of_results

        # Build a default retrieval configuration from number_of_results only
        # when the caller did not supply one.
        if retrieval_configuration is None and number_of_results is not None:
            self.retrieval_configuration = {
                "vectorSearchConfiguration": {
                    "numberOfResults": number_of_results
                }
            }
        else:
            self.retrieval_configuration = retrieval_configuration

        self.guardrail_configuration = guardrail_configuration
        self.next_token = next_token

        # Validate parameters
        self._validate_parameters()

        # Update the description to include the knowledge base details
        self.description = f"Retrieves information from Amazon Bedrock Knowledge Base '{self.knowledge_base_id}' given a query"

    def _validate_parameters(self):
        """Validate the configured identifiers before any request is made.

        Raises:
            ValueError: If ``knowledge_base_id`` is missing, is not a string, or
                is not 1-10 ASCII alphanumeric characters; or if a non-empty
                ``next_token`` is longer than 2048 characters or contains
                whitespace.
        """
        kb_id = self.knowledge_base_id
        if not isinstance(kb_id, str) or _KB_ID_PATTERN.fullmatch(kb_id) is None:
            raise ValueError("knowledge_base_id must be 1-10 alphanumeric characters")

        # An empty or None token means "no pagination" and is skipped, as before.
        if self.next_token and _NEXT_TOKEN_PATTERN.fullmatch(self.next_token) is None:
            raise ValueError("next_token must be 1-2048 characters and match pattern ^\\S*$")

    def _run(self, query: str) -> str:
        """Query the knowledge base and return the formatted results.

        Args:
            query: Natural-language text to search the knowledge base for.

        Returns:
            A JSON string holding ``results`` (or a ``message`` when nothing was
            found) plus ``nextToken`` and ``guardrailAction`` when the response
            contains them. If the AWS call raises ``ClientError``, a plain-text
            error message is returned instead.
        """
        # Prepare the request parameters
        retrieve_params = {
            'knowledgeBaseId': self.knowledge_base_id,
            'retrievalQuery': {
                'text': query
            }
        }

        # Add optional parameters if provided
        if self.retrieval_configuration:
            retrieve_params['retrievalConfiguration'] = self.retrieval_configuration

        if self.guardrail_configuration:
            retrieve_params['guardrailConfiguration'] = self.guardrail_configuration

        if self.next_token:
            retrieve_params['nextToken'] = self.next_token

        # Keep the try body limited to the calls that talk to AWS.
        try:
            bedrock_agent_runtime = boto3.client(
                'bedrock-agent-runtime',
                region_name=os.getenv('AWS_REGION', os.getenv('AWS_DEFAULT_REGION', 'us-east-1')),
                # AWS SDK will automatically use AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from environment
            )
            response = bedrock_agent_runtime.retrieve(**retrieve_params)
        except ClientError as e:
            return f"Error retrieving from Bedrock Knowledge Base: {str(e)}"

        results = [
            _format_retrieval_result(result)
            for result in response.get('retrievalResults', [])
        ]

        response_object = {}
        if results:
            response_object["results"] = results
        else:
            response_object["message"] = "No results found for the given query."

        # Pass pagination and guardrail information through when present.
        if "nextToken" in response:
            response_object["nextToken"] = response["nextToken"]

        if "guardrailAction" in response:
            response_object["guardrailAction"] = response["guardrailAction"]

        # Return the results as a JSON string
        return json.dumps(response_object, indent=2)