# services/data_service.py
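"""Async product-data service.

Downloads a pipe-delimited product CSV, cleans and combines its text fields,
embeds each product into a FAISS L2 index, and answers semantic search
queries. The prepared data and index are cached in memory and refreshed once
settings.CACHE_DURATION seconds have elapsed.
"""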
import io
import logging
from datetime import datetime
from typing import Any, Dict, List

import aiohttp
import faiss
import numpy as np
import pandas as pd

from config.config import settings

logger = logging.getLogger(__name__)
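
# Configuration consumed from config/config.py (values assumed, not shown here):
#   CSV_URL        - source URL of the pipe-delimited product feed
#   MAX_RETRIES    - number of download attempts before giving up
#   CACHE_DURATION - cache lifetime in seconds
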
class DataService:
    """Loads product data, builds a FAISS index, and answers semantic queries."""

    def __init__(self, model_service):
        self.embedder = model_service.embedder  # sentence-embedding model
        self.cache = {}
        self.last_update = None
        self.faiss_index = None
        self.data_cleaned = None
    async def fetch_csv_data(self) -> pd.DataFrame:
        """Download the pipe-delimited CSV, retrying up to settings.MAX_RETRIES times."""
        async with aiohttp.ClientSession() as session:
            for attempt in range(settings.MAX_RETRIES):
                try:
                    async with session.get(settings.CSV_URL) as response:
                        # Treat any non-200 status as a failure so it is retried.
                        response.raise_for_status()
                        content = await response.text()
                        return pd.read_csv(io.StringIO(content), sep='|')
                except Exception as e:
                    logger.error(f"Attempt {attempt + 1} failed: {e}")
                    if attempt == settings.MAX_RETRIES - 1:
                        raise
    async def prepare_data_and_index(self) -> tuple:
        current_time = datetime.now()
        # Serve from cache while it is still fresh. Use total_seconds() rather
        # than .seconds, which wraps around at day boundaries.
        if (self.last_update and
                (current_time - self.last_update).total_seconds() < settings.CACHE_DURATION and
                self.cache):
            return self.cache['data'], self.cache['index']
        data = await self.fetch_csv_data()

        # Data cleaning and preparation
        columns_to_keep = [
            'ID', 'Name', 'Description', 'Price',
            'ProductCategory', 'Grammage',
            'BasePriceText', 'Rating', 'RatingCount',
            'Ingredients', 'CreationDate', 'Keywords', 'Brand'
        ]
        self.data_cleaned = data[columns_to_keep].copy()
        # Strip characters outside a whitelist of word characters and common punctuation.
        self.data_cleaned['Description'] = self.data_cleaned['Description'].str.replace(
            r'[^\w\s.,;:\'/?!€$%&()\[\]{}<>|=+\\-]', ' ', regex=True
        )

        # Combine text fields for embedding, repeating the name to weight it higher.
        self.data_cleaned['combined_text'] = self.data_cleaned.apply(
            lambda row: (
                f"{row['Name']} {row['Name']} "  # double weight for the name
                f"{row['Description'] if pd.notnull(row['Description']) else ''} "
                f"{row['Keywords'] if pd.notnull(row['Keywords']) else ''} "
                f"{row['ProductCategory'] if pd.notnull(row['ProductCategory']) else ''}"
            ).strip(),
            axis=1
        )
        # Embed every product and build the FAISS index. FAISS expects a
        # contiguous float32 array.
        embeddings = self.embedder.encode(
            self.data_cleaned['combined_text'].tolist(),
            convert_to_tensor=True,
            show_progress_bar=True
        ).cpu().detach().numpy().astype(np.float32)
        d = embeddings.shape[1]
        self.faiss_index = faiss.IndexFlatL2(d)
        self.faiss_index.add(embeddings)
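        # Design note: IndexFlatL2 performs exact brute-force L2 search, which
        # is simple and accurate at modest catalog sizes; a much larger corpus
        # would typically call for an approximate index (e.g. IVF or HNSW).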
        # Update cache
        self.cache = {
            'data': self.data_cleaned,
            'index': self.faiss_index
        }
        self.last_update = current_time
        return self.data_cleaned, self.faiss_index
    async def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Return the top_k products closest to the query embedding."""
        if self.faiss_index is None:
            await self.prepare_data_and_index()
        query_embedding = (
            self.embedder.encode([query], convert_to_tensor=True)
            .cpu().detach().numpy().astype(np.float32)
        )
        distances, indices = self.faiss_index.search(query_embedding, top_k)
        results = []
        for i, idx in enumerate(indices[0]):
            if idx < 0:  # FAISS pads with -1 when fewer than top_k hits exist
                continue
            product = self.data_cleaned.iloc[idx].to_dict()
            product['score'] = float(distances[0][i])  # L2 distance; lower is closer
            results.append(product)
        return results
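

# --- Usage sketch (illustrative, not part of the service itself) ---
# A minimal way to exercise DataService, assuming a model service that exposes
# a SentenceTransformer-style `.embedder`. The stub class, the model name, and
# the query below are hypothetical stand-ins for the repo's real ModelService.
if __name__ == "__main__":
    import asyncio

    from sentence_transformers import SentenceTransformer

    class _StubModelService:
        def __init__(self):
            self.embedder = SentenceTransformer("all-MiniLM-L6-v2")

    async def _demo():
        service = DataService(_StubModelService())
        # The first call downloads the CSV and builds the index;
        # subsequent calls within CACHE_DURATION hit the in-memory cache.
        for product in await service.search("organic peanut butter", top_k=3):
            print(product['Name'], product['score'])

    asyncio.run(_demo())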