added additional sources

This commit is contained in:
Brandon Hancock
2024-11-06 16:41:17 -05:00
parent 6131dbac4f
commit 617ee989cd
10 changed files with 424 additions and 4 deletions

View File

View File

@@ -0,0 +1,82 @@
import os
from typing import List, Optional
import numpy as np
from openai import OpenAI
from .base_embedder import BaseEmbedder
class OllamaEmbedder(BaseEmbedder):
"""
A wrapper class for text embedding models using Ollama's API
"""
def __init__(
self,
model_name: str,
api_key: Optional[str] = None,
base_url: str = "http://localhost:11434/v1",
):
"""
Initialize the embedding model
Args:
model_name: Name of the model to use
api_key: API key (defaults to 'ollama' or environment variable 'OLLAMA_API_KEY')
base_url: Base URL for the Ollama API (default is 'http://localhost:11434/v1')
"""
self.model_name = model_name
self.api_key = api_key or os.getenv("OLLAMA_API_KEY") or "ollama"
self.base_url = base_url
self.client = OpenAI(base_url=self.base_url, api_key=self.api_key)
def embed_chunks(self, chunks: List[str]) -> List[np.ndarray]:
"""
Generate embeddings for a list of text chunks
Args:
chunks: List of text chunks to embed
Returns:
List of embeddings
"""
return self.embed_texts(chunks)
def embed_texts(self, texts: List[str]) -> List[np.ndarray]:
"""
Generate embeddings for a list of texts
Args:
texts: List of texts to embed
Returns:
List of embeddings
"""
embeddings = []
max_batch_size = 2048 # Adjust batch size if necessary
for i in range(0, len(texts), max_batch_size):
batch = texts[i : i + max_batch_size]
response = self.client.embeddings.create(input=batch, model=self.model_name)
batch_embeddings = [np.array(item.embedding) for item in response.data]
embeddings.extend(batch_embeddings)
return embeddings
def embed_text(self, text: str) -> np.ndarray:
"""
Generate embedding for a single text
Args:
text: Text to embed
Returns:
Embedding array
"""
return self.embed_texts([text])[0]
@property
def dimension(self) -> int:
"""Get the dimension of the embeddings"""
# Embedding dimensions may vary; we'll determine it dynamically
test_embed = self.embed_text("test")
return len(test_embed)

View File

@@ -0,0 +1,85 @@
import os
from typing import List, Optional
import numpy as np
from openai import OpenAI
from .base_embedder import BaseEmbedder
class OpenAIEmbedder(BaseEmbedder):
"""
A wrapper class for text embedding models using OpenAI's Embedding API
"""
def __init__(
self,
model_name: str = "text-embedding-ada-002",
api_key: Optional[str] = None,
):
"""
Initialize the embedding model
Args:
model_name: Name of the model to use
api_key: OpenAI API key
"""
self.model_name = model_name
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
if not self.api_key:
raise ValueError(
"OpenAI API key must be provided or set in the environment variable 'OPENAI_API_KEY'"
)
self.client = OpenAI(
api_key=self.api_key,
base_url="http://localhost:11434/v1",
)
def embed_chunks(self, chunks: List[str]) -> List[np.ndarray]:
"""
Generate embeddings for a list of text chunks
Args:
chunks: List of text chunks to embed
Returns:
List of embeddings
"""
return self.embed_texts(chunks)
def embed_texts(self, texts: List[str]) -> List[np.ndarray]:
"""
Generate embeddings for a list of texts
Args:
texts: List of texts to embed
Returns:
List of embeddings
"""
embeddings = []
max_batch_size = 2048 # OpenAI recommends smaller batch sizes
for i in range(0, len(texts), max_batch_size):
batch = texts[i : i + max_batch_size]
response = self.client.embeddings.create(input=batch, model=self.model_name)
batch_embeddings = [np.array(data.embedding) for data in response.data]
embeddings.extend(batch_embeddings)
return embeddings
def embed_text(self, text: str) -> np.ndarray:
"""
Generate embedding fors a single text
Args:
text: Text to embed
Returns:
Embedding array
"""
return self.embed_texts([text])[0]
@property
def dimension(self) -> int:
"""Get the dimension of the embeddings"""
# For OpenAI's text-embedding-ada-002, the dimension is 1536
return 1536

View File

@@ -0,0 +1,38 @@
import csv
from typing import List
from crewai.knowledge.embedder.base_embedder import BaseEmbedder
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
class CSVKnowledgeSource(BaseFileKnowledgeSource):
"""A knowledge source that stores and queries CSV file content using embeddings."""
def load_content(self) -> str:
"""Load and preprocess CSV file content."""
super().load_content() # Validate the file path
with open(self.file_path, "r", encoding="utf-8") as csvfile:
reader = csv.reader(csvfile)
content = ""
for row in reader:
content += " ".join(row) + "\n"
return content
def add(self, embedder: BaseEmbedder) -> None:
"""
Add CSV file content to the knowledge source, chunk it, compute embeddings,
and save the embeddings.
"""
new_chunks = self._chunk_text(self.content)
self.chunks.extend(new_chunks)
# Compute embeddings for the new chunks
new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings
self.chunk_embeddings.extend(new_embeddings)
def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks."""
return [
text[i : i + self.chunk_size]
for i in range(0, len(text), self.chunk_size - self.chunk_overlap)
]

View File

@@ -0,0 +1,48 @@
from typing import List
from crewai.knowledge.embedder.base_embedder import BaseEmbedder
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
class ExcelKnowledgeSource(BaseFileKnowledgeSource):
"""A knowledge source that stores and queries Excel file content using embeddings."""
def load_content(self) -> str:
"""Load and preprocess Excel file content."""
super().load_content() # Validate the file path
pd = self._import_dependencies()
df = pd.read_excel(self.file_path)
content = df.to_csv(index=False)
return content
def _import_dependencies(self):
"""Dynamically import dependencies."""
try:
import openpyxl
import pandas as pd
return pd
except ImportError as e:
missing_package = str(e).split()[-1]
raise ImportError(
f"{missing_package} is not installed. Please install it with: pip install {missing_package}"
)
def add(self, embedder: BaseEmbedder) -> None:
"""
Add Excel file content to the knowledge source, chunk it, compute embeddings,
and save the embeddings.
"""
new_chunks = self._chunk_text(self.content)
self.chunks.extend(new_chunks)
# Compute embeddings for the new chunks
new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings
self.chunk_embeddings.extend(new_embeddings)
def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks."""
return [
text[i : i + self.chunk_size]
for i in range(0, len(text), self.chunk_size - self.chunk_overlap)
]

View File

@@ -0,0 +1,50 @@
import json
from typing import Any, List
from crewai.knowledge.embedder.base_embedder import BaseEmbedder
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
class JSONKnowledgeSource(BaseFileKnowledgeSource):
"""A knowledge source that stores and queries JSON file content using embeddings."""
def load_content(self) -> str:
"""Load and preprocess JSON file content."""
super().load_content() # Validate the file path
with open(self.file_path, "r", encoding="utf-8") as json_file:
data = json.load(json_file)
content = self._json_to_text(data)
return content
def _json_to_text(self, data: Any, level: int = 0) -> str:
"""Recursively convert JSON data to a text representation."""
text = ""
indent = " " * level
if isinstance(data, dict):
for key, value in data.items():
text += f"{indent}{key}: {self._json_to_text(value, level + 1)}\n"
elif isinstance(data, list):
for item in data:
text += f"{indent}- {self._json_to_text(item, level + 1)}\n"
else:
text += f"{str(data)}"
return text
def add(self, embedder: BaseEmbedder) -> None:
"""
Add JSON file content to the knowledge source, chunk it, compute embeddings,
and save the embeddings.
"""
new_chunks = self._chunk_text(self.content)
self.chunks.extend(new_chunks)
# Compute embeddings for the new chunks
new_embeddings = embedder.embed_chunks(new_chunks)
# Save the embeddings
self.chunk_embeddings.extend(new_embeddings)
def _chunk_text(self, text: str) -> List[str]:
"""Utility method to split text into chunks."""
return [
text[i : i + self.chunk_size]
for i in range(0, len(text), self.chunk_size - self.chunk_overlap)
]