Chris4K committed on
Commit f91204a · verified · 1 Parent(s): f7c4a82

Create services/data_service.py

Files changed (1):
  services/data_service.py +104 -0
services/data_service.py ADDED
@@ -0,0 +1,104 @@
+# services/data_service.py
+from io import StringIO
+from typing import List, Dict, Any
+
+import pandas as pd
+import faiss
+import aiohttp
+from datetime import datetime
+import logging
+
+from config.config import settings
+
+logger = logging.getLogger(__name__)
+
+
+class DataService:
+    def __init__(self, model_service):
+        self.embedder = model_service.embedder
+        self.cache = {}
+        self.last_update = None
+        self.faiss_index = None
+        self.data_cleaned = None
+
+    async def fetch_csv_data(self) -> pd.DataFrame:
+        async with aiohttp.ClientSession() as session:
+            for attempt in range(settings.MAX_RETRIES):
+                try:
+                    async with session.get(settings.CSV_URL) as response:
+                        response.raise_for_status()
+                        content = await response.text()
+                        return pd.read_csv(StringIO(content), sep='|')
+                except Exception as e:
+                    logger.error(f"Attempt {attempt + 1} failed: {e}")
+                    if attempt == settings.MAX_RETRIES - 1:
+                        raise
+
+    async def prepare_data_and_index(self) -> tuple:
+        current_time = datetime.now()
+
+        # Serve from the in-memory cache while it is still fresh
+        if (self.last_update and
+                (current_time - self.last_update).total_seconds() < settings.CACHE_DURATION and
+                self.cache):
+            return self.cache['data'], self.cache['index']
+
+        data = await self.fetch_csv_data()
+
+        # Data cleaning and preparation
+        columns_to_keep = [
+            'ID', 'Name', 'Description', 'Price',
+            'ProductCategory', 'Grammage',
+            'BasePriceText', 'Rating', 'RatingCount',
+            'Ingredients', 'CreationDate', 'Keywords', 'Brand'
+        ]
+
+        self.data_cleaned = data[columns_to_keep].copy()
+        # Replace any character outside the whitelist with a space
+        self.data_cleaned['Description'] = self.data_cleaned['Description'].str.replace(
+            r'[^\w\s.,;:\'/?!€$%&()\[\]{}<>|=+\\-]', ' ', regex=True
+        )
+
+        # Combine the searchable fields, weighting Name by repetition
+        self.data_cleaned['combined_text'] = self.data_cleaned.apply(
+            lambda row: (
+                f"{row['Name']} {row['Name']} "  # double weight for name
+                f"{row['Description']} "
+                f"{row['Keywords'] if pd.notnull(row['Keywords']) else ''} "
+                f"{row['ProductCategory'] if pd.notnull(row['ProductCategory']) else ''}"
+            ).strip(),
+            axis=1
+        )
+
+        # Create FAISS index over the combined-text embeddings
+        embeddings = self.embedder.encode(
+            self.data_cleaned['combined_text'].tolist(),
+            convert_to_tensor=True,
+            show_progress_bar=True
+        ).cpu().detach().numpy()
+
+        d = embeddings.shape[1]
+        self.faiss_index = faiss.IndexFlatL2(d)
+        self.faiss_index.add(embeddings)
+
+        # Update cache
+        self.cache = {
+            'data': self.data_cleaned,
+            'index': self.faiss_index
+        }
+        self.last_update = current_time
+
+        return self.data_cleaned, self.faiss_index
+
+    async def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
+        if self.faiss_index is None:
+            await self.prepare_data_and_index()
+
+        query_embedding = self.embedder.encode(
+            [query], convert_to_tensor=True
+        ).cpu().detach().numpy()
+        distances, indices = self.faiss_index.search(query_embedding, top_k)
+
+        results = []
+        for i, idx in enumerate(indices[0]):
+            product = self.data_cleaned.iloc[int(idx)].to_dict()
+            product['score'] = float(distances[0][i])  # L2 distance: lower is closer
+            results.append(product)
+
+        return results
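
A minimal usage sketch follows, for orientation only. It assumes config.config.settings defines CSV_URL, MAX_RETRIES, and CACHE_DURATION, and that the model_service passed to DataService exposes a sentence-transformers model as .embedder; the ModelService class and the query below are hypothetical stand-ins, not part of this commit.

# usage_sketch.py -- illustrative only, not part of the committed file
import asyncio
from sentence_transformers import SentenceTransformer
from services.data_service import DataService

class ModelService:
    # Hypothetical: the commit only requires that .embedder exists
    def __init__(self):
        self.embedder = SentenceTransformer('all-MiniLM-L6-v2')

async def main():
    service = DataService(ModelService())
    # First call fetches the CSV, builds the FAISS index, and caches both
    results = await service.search("gluten-free granola", top_k=3)
    for product in results:
        # score is an L2 distance, so smaller values mean closer matches
        print(product['Name'], product['score'])

asyncio.run(main())

Since IndexFlatL2 ranks by Euclidean distance, lower scores indicate closer matches; if cosine similarity were preferred, the embeddings could be L2-normalized and indexed with faiss.IndexFlatIP instead.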