|
|
|
from pathlib import Path |
|
from typing import List, Dict, Any |
|
from PyPDF2 import PdfReader |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
import asyncio |
|
from concurrent.futures import ThreadPoolExecutor |
|
import logging |
|
from config.config import settings |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
class PDFService:
    """Split PDF files into text chunks and run similarity search over them.

    Chunks are kept in memory in ``self.pdf_chunks``; a FAISS ``IndexFlatL2``
    over their embeddings is built lazily on the first search and cached in
    ``self.faiss_index``.
    """

    def __init__(self, model_service):
        """Store the embedder and configure the text splitter.

        Args:
            model_service: Object exposing an ``embedder`` attribute. The
                embedder is called as ``encode(texts, convert_to_tensor=True)``
                and the result is converted with ``.cpu().detach().numpy()``
                (presumably a sentence-transformers model -- confirm).
        """
        self.embedder = model_service.embedder
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=settings.CHUNK_SIZE,
            chunk_overlap=settings.CHUNK_OVERLAP,
        )
        # Chunk dicts produced by the most recent index_pdfs() call.
        self.pdf_chunks = []
        # FAISS index over self.pdf_chunks, or None until a search builds it.
        self.faiss_index = None

    def _extract_chunks(self, pdf_file: Path) -> List[Dict[str, Any]]:
        """Read one PDF and return its chunk records (empty list on failure)."""
        try:
            reader = PdfReader(str(pdf_file))
            metadata = reader.metadata
            # Extract each page exactly once; extraction is the expensive step.
            page_texts = (page.extract_text() for page in reader.pages)
            full_text = " ".join(text for text in page_texts if text)
            chunks = self.text_splitter.split_text(full_text)
        except Exception as exc:
            # Deliberate best effort: one unreadable PDF must not abort the
            # whole indexing run, so log (with traceback) and contribute nothing.
            logger.exception("Error processing PDF %s: %s", pdf_file, exc)
            return []

        return [
            {
                'text': chunk,
                'source': pdf_file.name,
                'metadata': metadata,
                'chunk_index': i,
            }
            for i, chunk in enumerate(chunks)
        ]

    async def index_pdfs(
        self, pdf_folder: Path = settings.PDF_FOLDER
    ) -> List[Dict[str, Any]]:
        """Chunk every ``.pdf`` file directly inside ``pdf_folder``.

        Files are parsed concurrently on a thread pool so the blocking PDF
        work does not run on the event loop. The result replaces
        ``self.pdf_chunks`` and discards any previously built FAISS index.

        Args:
            pdf_folder: Directory to scan (not recursive); the suffix match
                is case-insensitive.

        Returns:
            A list of dicts with keys ``text``, ``source`` (file name),
            ``metadata`` (the reader's metadata object) and ``chunk_index``.
            Files that fail to parse contribute no entries.
        """
        pdf_files = [f for f in pdf_folder.iterdir() if f.suffix.lower() == ".pdf"]

        loop = asyncio.get_running_loop()
        # ThreadPoolExecutor is a *synchronous* context manager; awaiting the
        # gathered futures inside the block means shutdown has nothing to wait on.
        with ThreadPoolExecutor() as executor:
            per_file = await asyncio.gather(
                *(
                    loop.run_in_executor(executor, self._extract_chunks, pdf_file)
                    for pdf_file in pdf_files
                )
            )

        all_texts = [chunk for file_chunks in per_file for chunk in file_chunks]

        self.pdf_chunks = all_texts
        # The cached index refers to the previous chunk list by position;
        # drop it so the next search rebuilds it against the new chunks.
        self.faiss_index = None
        return all_texts

    async def search_pdfs(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return the chunks whose embeddings are closest to ``query``.

        Indexes the default PDF folder first if no chunks are loaded, and
        builds the FAISS index on first use.

        Args:
            query: Text to embed and search for.
            top_k: Maximum number of chunks to return.

        Returns:
            Copies of the matching chunk dicts, each with an added ``score``
            key holding the distance reported by the ``IndexFlatL2`` index
            (smaller means closer). Empty when there is nothing indexed or
            ``top_k`` is not positive.
        """
        if not self.pdf_chunks:
            await self.index_pdfs()

        # Nothing to search (empty folder / all PDFs failed) or nothing asked for.
        if not self.pdf_chunks or top_k <= 0:
            return []

        if self.faiss_index is None:
            # Imported here because the module's import block does not bring
            # `faiss` into scope; only needed when the index is actually built.
            import faiss

            chunk_embeddings = self.embedder.encode(
                [chunk['text'] for chunk in self.pdf_chunks],
                convert_to_tensor=True,
            ).cpu().detach().numpy()

            dimension = chunk_embeddings.shape[1]
            index = faiss.IndexFlatL2(dimension)
            index.add(chunk_embeddings)
            self.faiss_index = index

        query_embedding = self.embedder.encode(
            [query], convert_to_tensor=True
        ).cpu().detach().numpy()

        # Never ask for more neighbours than there are chunks.
        k = min(top_k, len(self.pdf_chunks))
        distances, indices = self.faiss_index.search(query_embedding, k)

        results = []
        for distance, idx in zip(distances[0], indices[0]):
            # FAISS pads missing neighbours with -1; indexing the list with
            # that would silently return the last chunk.
            if idx < 0:
                continue
            chunk = self.pdf_chunks[int(idx)].copy()
            chunk['score'] = float(distance)
            results.append(chunk)

        return results
|
|
|
|