import logging
from datetime import datetime
from io import StringIO
from typing import Any, Dict, List

import aiohttp
import faiss
import numpy as np
import pandas as pd

from config.config import settings

logger = logging.getLogger(__name__)
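
# `settings` is expected to provide CSV_URL (str), MAX_RETRIES (int), and
# CACHE_DURATION (a duration in seconds); all three are used below.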


class DataService:
    """Fetches the product feed, embeds it, and serves FAISS similarity search."""

    def __init__(self, model_service):
        self.embedder = model_service.embedder
        self.cache = {}
        self.last_update = None
        self.faiss_index = None
        self.data_cleaned = None

    async def fetch_csv_data(self) -> pd.DataFrame:
        """Download the pipe-delimited product feed, retrying on failure."""
        async with aiohttp.ClientSession() as session:
            for attempt in range(settings.MAX_RETRIES):
                try:
                    async with session.get(settings.CSV_URL) as response:
                        # Treat non-2xx responses as failures so they are
                        # retried instead of silently falling through.
                        response.raise_for_status()
                        content = await response.text()
                        return pd.read_csv(StringIO(content), sep='|')
                except Exception as e:
                    # Retries fire immediately; there is no backoff delay.
                    logger.error(f"Attempt {attempt + 1} failed: {e}")
                    if attempt == settings.MAX_RETRIES - 1:
                        raise

    async def prepare_data_and_index(self) -> tuple:
        current_time = datetime.now()

        # Serve from the in-memory cache while it is still fresh.
        # total_seconds() is used instead of .seconds, which ignores the
        # days component and wraps at 24 hours.
        if (self.last_update and
                (current_time - self.last_update).total_seconds() < settings.CACHE_DURATION and
                self.cache):
            return self.cache['data'], self.cache['index']
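
        # Note: this is a single-entry cache with no locking, so overlapping
        # callers can miss simultaneously and rebuild the index twice. An
        # asyncio.Lock around the rebuild is a common guard if that matters.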

        data = await self.fetch_csv_data()

        # Keep only the columns the search pipeline actually uses.
        columns_to_keep = [
            'ID', 'Name', 'Description', 'Price',
            'ProductCategory', 'Grammage',
            'BasePriceText', 'Rating', 'RatingCount',
            'Ingredients', 'CreationDate', 'Keywords', 'Brand'
        ]

        self.data_cleaned = data[columns_to_keep].copy()
        # Replace anything outside the whitelist of word characters,
        # whitespace, punctuation, and currency symbols with a space.
        self.data_cleaned['Description'] = self.data_cleaned['Description'].str.replace(
            r'[^\w\s.,;:\'/?!€$%&()\[\]{}<>|=+\\-]', ' ', regex=True
        )

        # Build the text that gets embedded. Name is repeated to weight it
        # more heavily; Keywords and ProductCategory may be NaN.
        self.data_cleaned['combined_text'] = self.data_cleaned.apply(
            lambda row: (
                f"{row['Name']} {row['Name']} "
                f"{row['Description']} "
                f"{row['Keywords'] if pd.notnull(row['Keywords']) else ''} "
                f"{row['ProductCategory'] if pd.notnull(row['ProductCategory']) else ''}"
            ).strip(),
            axis=1
        )
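
        # Note: a row-wise apply makes one Python-level call per row; for very
        # large feeds the same text can usually be built with vectorized string
        # concatenation (with fillna('') on the optional columns).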

        embeddings = self.embedder.encode(
            self.data_cleaned['combined_text'].tolist(),
            convert_to_tensor=True,
            show_progress_bar=True
        ).detach().cpu().numpy()

        # FAISS expects a contiguous float32 matrix of shape (n, d).
        embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
        d = embeddings.shape[1]
        self.faiss_index = faiss.IndexFlatL2(d)
        self.faiss_index.add(embeddings)
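
        # Note: IndexFlatL2 ranks by (squared) Euclidean distance. If cosine
        # similarity is preferred, a common alternative is to L2-normalize the
        # vectors (faiss.normalize_L2) and use faiss.IndexFlatIP instead.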

        self.cache = {
            'data': self.data_cleaned,
            'index': self.faiss_index
        }
        self.last_update = current_time

        return self.data_cleaned, self.faiss_index

    async def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        # Build the index lazily on first use.
        if self.faiss_index is None:
            await self.prepare_data_and_index()

        query_embedding = self.embedder.encode(
            [query], convert_to_tensor=True
        ).detach().cpu().numpy()
        query_embedding = np.ascontiguousarray(query_embedding, dtype=np.float32)
        distances, indices = self.faiss_index.search(query_embedding, top_k)

        results = []
        for i, idx in enumerate(indices[0]):
            # FAISS pads with -1 when fewer than top_k vectors are indexed.
            if idx == -1:
                continue
            product = self.data_cleaned.iloc[idx].to_dict()
            # IndexFlatL2 scores are squared L2 distances: lower is better.
            product['score'] = float(distances[0][i])
            results.append(product)

        return results
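

# Usage sketch (illustrative): DataService only assumes that `model_service`
# exposes an `.embedder` with a sentence-transformers style `encode()` method.
# The ModelService class and model name below are assumptions for the demo,
# not part of this module.
if __name__ == "__main__":
    import asyncio

    from sentence_transformers import SentenceTransformer

    class ModelService:
        def __init__(self):
            # Any sentence-transformers model works; this one is small.
            self.embedder = SentenceTransformer("all-MiniLM-L6-v2")

    async def main():
        service = DataService(ModelService())
        results = await service.search("organic peanut butter", top_k=3)
        for product in results:
            # Lower score means a closer match (squared L2 distance).
            print(product["Name"], product["score"])

    asyncio.run(main())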