This commit is contained in:
Brandon Hancock
2024-11-05 12:04:58 -05:00
parent dc314c1151
commit a8a2f80616
7 changed files with 163 additions and 34 deletions

View File

@@ -0,0 +1,112 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List
import numpy as np
from crewai.knowledge.embedder.base_embedder import BaseEmbedder
class BaseKnowledgeSource(ABC):
"""Abstract base class for knowledge bases"""
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.chunks: List[str] = []
self.chunk_embeddings: Dict[int, np.ndarray] = {}
@abstractmethod
def query(self, query: str) -> str:
"""Query the knowledge base and return relevant information"""
pass
@abstractmethod
def add(self, content: Any) -> None:
"""Process and store content in the knowledge base"""
pass
def embed(self, embedder: BaseEmbedder, new_chunks: List[str]) -> None:
"""Embed chunks and store them"""
if not new_chunks:
return
# Get embeddings for new chunks
embeddings = embedder.embed_texts(new_chunks)
# Store embeddings with their corresponding chunks
start_idx = len(self.chunks)
for i, embedding in enumerate(embeddings):
self.chunk_embeddings[start_idx + i] = embedding
def _chunk_text(self, text: str) -> List[str]:
"""Split text into chunks with overlap"""
chunks = []
start = 0
text_length = len(text)
while start < text_length:
# Get the chunk of size chunk_size
end = start + self.chunk_size
if end >= text_length:
# If we're at the end, just take the rest
chunks.append(text[start:].strip())
break
# Look for a good breaking point
# Priority: double newline > single newline > period > space
break_chars = ["\n\n", "\n", ". ", " "]
chunk_end = end
for break_char in break_chars:
# Look for the break_char in a window around the end point
window_start = max(start + self.chunk_size - 100, start)
window_end = min(start + self.chunk_size + 100, text_length)
window_text = text[window_start:window_end]
# Find the last occurrence of the break_char in the window
last_break = window_text.rfind(break_char)
if last_break != -1:
chunk_end = window_start + last_break + len(break_char)
break
# Add the chunk
chunk = text[start:chunk_end].strip()
if chunk: # Only add non-empty chunks
chunks.append(chunk)
# Move the start pointer, accounting for overlap
start = max(
start + self.chunk_size - self.chunk_overlap,
chunk_end - self.chunk_overlap,
)
return chunks
def _find_similar_chunks(
self, embedder: BaseEmbedder, query: str, top_k: int = 3
) -> List[str]:
"""Find the most similar chunks to a query using embeddings"""
if not self.chunks:
return []
# Get query embedding
query_embedding = embedder.embed_text(query)
# Calculate similarities with all chunks
similarities = []
for idx, chunk_embedding in self.chunk_embeddings.items():
similarity = np.dot(query_embedding, chunk_embedding)
similarities.append((similarity, idx))
# Sort by similarity and get top_k chunks
similarities.sort(reverse=True)
top_chunks = []
for _, idx in similarities[:top_k]:
top_chunks.append(self.chunks[idx])
return top_chunks