This commit is contained in:
Mike Plachta
2025-02-27 08:44:43 -08:00
parent 7c16b7d284
commit f1187c5469
5 changed files with 136 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
from .s3 import S3ReaderTool, S3WriterTool
__all__ = ['S3ReaderTool', 'S3WriterTool']

View File

@@ -0,0 +1,52 @@
# AWS S3 Tools
## Description
These tools provide a way to interact with Amazon S3, a cloud storage service.
## Installation
Install the crewai_tools package
```shell
pip install 'crewai[tools]'
```
## AWS Connectivity
The tools use `boto3` to connect to AWS S3.
To configure your environment to use AWS IAM roles, see the [AWS IAM Roles documentation](https://docs.aws.amazon.com/sdk-for-python/v1/developer-guide/iam-roles.html#creating-an-iam-role).
Set the following environment variables:
- `CREW_AWS_REGION`
- `CREW_AWS_ACCESS_KEY_ID`
- `CREW_AWS_SEC_ACCESS_KEY`
## Usage
To use the AWS S3 tools in your CrewAI agents, import the necessary tools and include them in your agent's configuration:
```python
from crewai_tools.aws.s3 import S3ReaderTool, S3WriterTool
# For reading from S3
@agent
def file_retriever(self) -> Agent:
return Agent(
config=self.agents_config['file_retriever'],
verbose=True,
tools=[S3ReaderTool()]
)
# For writing to S3
@agent
def file_uploader(self) -> Agent:
return Agent(
config=self.agents_config['file_uploader'],
verbose=True,
tools=[S3WriterTool()]
)
```
These tools can be used to read from and write to S3 buckets within your CrewAI workflows. Make sure you have properly configured your AWS credentials as mentioned in the AWS Connectivity section above.

View File

@@ -0,0 +1,2 @@
from .reader_tool import S3ReaderTool
from .writer_tool import S3WriterTool

View File

@@ -0,0 +1,42 @@
from typing import Type
import os
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import boto3
from botocore.exceptions import ClientError
class S3ReaderToolInput(BaseModel):
    """Input schema for S3ReaderTool."""
    # Full S3 URI of the object to read, in the form 's3://bucket-name/object-key'.
    file_path: str = Field(..., description="S3 file path (e.g., 's3://bucket-name/file-name')")
class S3ReaderTool(BaseTool):
    """Tool that reads a UTF-8 text object from Amazon S3.

    Credentials are read from the CREW_AWS_REGION, CREW_AWS_ACCESS_KEY_ID,
    and CREW_AWS_SEC_ACCESS_KEY environment variables; when the key variables
    are unset, boto3 falls back to its default credential chain.
    """
    name: str = "S3 Reader Tool"
    description: str = "Reads a file from Amazon S3 given an S3 file path"
    args_schema: Type[BaseModel] = S3ReaderToolInput

    def _run(self, file_path: str) -> str:
        """Return the decoded contents of the object at ``file_path``.

        On failure (malformed path or S3 error), an error-message string is
        returned instead of raising, so the calling agent can react to it.
        """
        try:
            bucket_name, object_key = self._parse_s3_path(file_path)
            s3 = boto3.client(
                's3',
                region_name=os.getenv('CREW_AWS_REGION', 'us-east-1'),
                aws_access_key_id=os.getenv('CREW_AWS_ACCESS_KEY_ID'),
                aws_secret_access_key=os.getenv('CREW_AWS_SEC_ACCESS_KEY')
            )
            # Read file content from S3; assumes the object is UTF-8 text.
            response = s3.get_object(Bucket=bucket_name, Key=object_key)
            return response['Body'].read().decode('utf-8')
        except ValueError as e:
            # Malformed s3:// path — previously this escaped as an IndexError.
            return f"Error reading file from S3: {str(e)}"
        except ClientError as e:
            return f"Error reading file from S3: {str(e)}"

    def _parse_s3_path(self, file_path: str) -> tuple:
        """Split an 's3://bucket/key' path into ``(bucket, key)``.

        Raises:
            ValueError: if the path lacks a bucket name or an object key.
        """
        # removeprefix strips only a leading 's3://'; str.replace would also
        # mangle any 's3://' substring occurring later in the object key.
        path = file_path.removeprefix("s3://")
        bucket_name, _, object_key = path.partition("/")
        if not bucket_name or not object_key:
            raise ValueError(
                f"Expected an S3 path like 's3://bucket-name/file-name', got {file_path!r}"
            )
        return bucket_name, object_key

View File

@@ -0,0 +1,37 @@
from typing import Type
import os
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import boto3
from botocore.exceptions import ClientError
class S3WriterToolInput(BaseModel):
    """Input schema for S3WriterTool."""
    # Full S3 URI of the object to write, in the form 's3://bucket-name/object-key'.
    file_path: str = Field(..., description="S3 file path (e.g., 's3://bucket-name/file-name')")
    # Text content to store; it is UTF-8 encoded before upload.
    content: str = Field(..., description="Content to write to the file")
class S3WriterTool(BaseTool):
    """Tool that writes UTF-8 text content to an object in Amazon S3.

    Credentials are read from the CREW_AWS_REGION, CREW_AWS_ACCESS_KEY_ID,
    and CREW_AWS_SEC_ACCESS_KEY environment variables; when the key variables
    are unset, boto3 falls back to its default credential chain.
    """
    name: str = "S3 Writer Tool"
    description: str = "Writes content to a file in Amazon S3 given an S3 file path"
    args_schema: Type[BaseModel] = S3WriterToolInput

    def _run(self, file_path: str, content: str) -> str:
        """Upload ``content`` to the object at ``file_path``.

        On failure (malformed path or S3 error), an error-message string is
        returned instead of raising, so the calling agent can react to it.
        """
        try:
            bucket_name, object_key = self._parse_s3_path(file_path)
            s3 = boto3.client(
                's3',
                region_name=os.getenv('CREW_AWS_REGION', 'us-east-1'),
                aws_access_key_id=os.getenv('CREW_AWS_ACCESS_KEY_ID'),
                aws_secret_access_key=os.getenv('CREW_AWS_SEC_ACCESS_KEY')
            )
            s3.put_object(Bucket=bucket_name, Key=object_key, Body=content.encode('utf-8'))
            return f"Successfully wrote content to {file_path}"
        except ValueError as e:
            # Malformed s3:// path — previously this escaped as an IndexError.
            return f"Error writing file to S3: {str(e)}"
        except ClientError as e:
            return f"Error writing file to S3: {str(e)}"

    def _parse_s3_path(self, file_path: str) -> tuple:
        """Split an 's3://bucket/key' path into ``(bucket, key)``.

        Raises:
            ValueError: if the path lacks a bucket name or an object key.
        """
        # removeprefix strips only a leading 's3://'; str.replace would also
        # mangle any 's3://' substring occurring later in the object key.
        path = file_path.removeprefix("s3://")
        bucket_name, _, object_key = path.partition("/")
        if not bucket_name or not object_key:
            raise ValueError(
                f"Expected an S3 path like 's3://bucket-name/file-name', got {file_path!r}"
            )
        return bucket_name, object_key