import gradio as gr from sentence_transformers import SentenceTransformer, util import os import time import threading import queue import torch import psycopg2 import zlib import numpy as np from urllib.parse import urlparse import logging from sklearn.preprocessing import normalize from concurrent.futures import ThreadPoolExecutor import requests import voyageai # Настройка логирования logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Настройки базы данных PostgreSQL DATABASE_URL = os.environ.get("DATABASE_URL") if DATABASE_URL is None: raise ValueError("DATABASE_URL environment variable not set.") parsed_url = urlparse(DATABASE_URL) db_params = { "host": parsed_url.hostname, "port": parsed_url.port, "database": parsed_url.path.lstrip("/"), "user": parsed_url.username, "password": parsed_url.password, "sslmode": "require" } # Загружаем модель эмбеддингов model_name = "BAAI/bge-m3" logging.info(f"Загрузка модели {model_name}...") model = SentenceTransformer(model_name) logging.info("Модель загружена успешно.") # Voyage AI API Key VOYAGE_API_KEY = os.environ.get("VOYAGE_API_KEY") if VOYAGE_API_KEY is None: raise ValueError("VOYAGE_API_KEY environment variable not set.") # Инициализация клиента Voyage AI vo = voyageai.Client(api_key=VOYAGE_API_KEY) # Имена таблиц embeddings_table = "movie_embeddings" query_cache_table = "query_cache" # Максимальный размер таблицы кэша запросов в байтах (50MB) MAX_CACHE_SIZE = 50 * 1024 * 1024 # Загружаем данные из файла movies.json try: import json with open("movies.json", "r", encoding="utf-8") as f: movies_data = json.load(f) logging.info(f"Загружено {len(movies_data)} фильмов из movies.json") except FileNotFoundError: logging.error("Ошибка: Файл movies.json не найден.") movies_data = [] # Очередь для необработанных фильмов movies_queue = queue.Queue() # Флаг, указывающий, что обработка фильмов завершена processing_complete = False # Флаг, указывающий, что выполняется поиск search_in_progress = False # Блокировка для доступа к базе данных db_lock = threading.Lock() # Размер пакета для обработки эмбеддингов batch_size = 32 # Количество потоков для параллельной обработки num_threads = 5 # Количество потоков для параллельного реранкинга rerank_threads = 3 # Ограничено лимитом RPM # Лимиты Voyage AI (запросов в минуту, токенов в минуту) - БЕСПЛАТНЫЙ АККАУНТ RPM_LIMIT = 3 TPM_LIMIT = 10000 # Переменные для отслеживания текущего использования current_rpm = 0 current_tpm = 0 last_reset_time = time.time() # Среднее количество токенов на описание фильма (можно вычислить один раз при запуске) avg_tokens_per_movie = 150 # Замените на более точное значение, если оно известно def get_db_connection(): """Устанавливает соединение с базой данных.""" try: conn = psycopg2.connect(**db_params) return conn except Exception as e: logging.error(f"Ошибка подключения к базе данных: {e}") return None def setup_database(): """Настраивает базу данных: создает расширение, таблицы и индексы.""" conn = get_db_connection() if conn is None: return try: with conn.cursor() as cur: # Создаем расширение pgvector если его нет cur.execute("CREATE EXTENSION IF NOT EXISTS vector;") # Удаляем существующие таблицы если они есть # cur.execute(f"DROP TABLE IF EXISTS {embeddings_table}, {query_cache_table};") # Создаем таблицу для хранения эмбеддингов фильмов cur.execute(f""" CREATE TABLE {embeddings_table} ( movie_id INTEGER PRIMARY KEY, embedding_crc32 BIGINT, string_crc32 BIGINT, model_name TEXT, embedding vector(1024) ); CREATE INDEX ON {embeddings_table} (string_crc32); """) # Создаем таблицу для кэширования запросов cur.execute(f""" CREATE TABLE {query_cache_table} ( query_crc32 BIGINT PRIMARY KEY, query TEXT, model_name TEXT, embedding vector(1024), created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX ON {query_cache_table} (query_crc32); CREATE INDEX ON {query_cache_table} (created_at); """) conn.commit() logging.info("База данных успешно настроена.") except Exception as e: logging.error(f"Ошибка при настройке базы данных: {e}") conn.rollback() finally: conn.close() # Настраиваем базу данных при запуске setup_database() def calculate_crc32(text): """Вычисляет CRC32 для строки.""" return zlib.crc32(text.encode('utf-8')) & 0xFFFFFFFF def encode_string(text): """Кодирует строку в эмбеддинг.""" embedding = model.encode(text, convert_to_tensor=True, normalize_embeddings=True) return embedding.cpu().numpy() def get_movies_without_embeddings(): """Получает список фильмов, для которых нужно создать эмбеддинги.""" conn = get_db_connection() if conn is None: return [] movies_to_process = [] try: with conn.cursor() as cur: # Получаем список ID фильмов, которые уже есть в базе cur.execute(f"SELECT movie_id FROM {embeddings_table}") existing_ids = {row[0] for row in cur.fetchall()} # Фильтруем только те фильмы, которых нет в базе for movie in movies_data: if movie['id'] not in existing_ids: movies_to_process.append(movie) logging.info(f"Найдено {len(movies_to_process)} фильмов для обработки.") except Exception as e: logging.error(f"Ошибка при получении списка фильмов для обработки: {e}") finally: conn.close() return movies_to_process def get_embedding_from_db(conn, table_name, crc32_column, crc32_value, model_name): """Получает эмбеддинг из базы данных.""" try: with conn.cursor() as cur: cur.execute(f"SELECT embedding FROM {table_name} WHERE {crc32_column} = %s AND model_name = %s", (crc32_value, model_name)) result = cur.fetchone() if result and result[0]: # Нормализуем эмбеддинг после извлечения из БД return normalize(np.array(result[0]).reshape(1, -1))[0] except Exception as e: logging.error(f"Ошибка при получении эмбеддинга из БД: {e}") return None def insert_embedding(conn, table_name, movie_id, embedding_crc32, string_crc32, embedding): """Вставляет эмбеддинг в базу данных.""" try: # Нормализуем эмбеддинг перед сохранением normalized_embedding = normalize(embedding.reshape(1, -1))[0] with conn.cursor() as cur: cur.execute(f""" INSERT INTO {table_name} (movie_id, embedding_crc32, string_crc32, model_name, embedding) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (movie_id) DO NOTHING """, (movie_id, embedding_crc32, string_crc32, model_name, normalized_embedding.tolist())) conn.commit() return True except Exception as e: logging.error(f"Ошибка при вставке эмбеддинга: {e}") conn.rollback() return False def process_batch(batch): """Обрабатывает пакет фильмов, создавая для них эмбеддинги.""" conn = get_db_connection() if conn is None: return try: for movie in batch: embedding_string = f"Название: {movie['name']}\nГод: {movie['year']}\nЖанры: {movie['genreslist']}\nОписание: {movie['description']}" string_crc32 = calculate_crc32(embedding_string) # Проверяем существующий эмбеддинг existing_embedding = get_embedding_from_db(conn, embeddings_table, "string_crc32", string_crc32, model_name) if existing_embedding is None: embedding = encode_string(embedding_string) embedding_crc32 = calculate_crc32(str(embedding.tolist())) if insert_embedding(conn, embeddings_table, movie['id'], embedding_crc32, string_crc32, embedding): logging.info(f"Сохранен эмбеддинг для '{movie['name']}'") else: logging.error(f"Ошибка сохранения эмбеддинга для '{movie['name']}'") else: logging.info(f"Эмбеддинг для '{movie['name']}' уже существует") except Exception as e: logging.error(f"Ошибка при обработке пакета фильмов: {e}") finally: conn.close() def process_movies(): """Обрабатывает фильмы, создавая для них эмбеддинги.""" global processing_complete logging.info("Начало обработки фильмов.") # Получаем список фильмов, которые нужно обработать movies_to_process = get_movies_without_embeddings() if not movies_to_process: logging.info("Все фильмы уже обработаны.") processing_complete = True return # Добавляем фильмы в очередь for movie in movies_to_process: movies_queue.put(movie) with ThreadPoolExecutor(max_workers=num_threads) as executor: try: while not movies_queue.empty(): if search_in_progress: time.sleep(1) continue batch = [] while not movies_queue.empty() and len(batch) < batch_size: try: movie = movies_queue.get_nowait() batch.append(movie) except queue.Empty: break if not batch: break executor.submit(process_batch, batch) logging.info(f"Отправлен на обработку пакет из {len(batch)} фильмов.") except Exception as e: logging.error(f"Ошибка при обработке фильмов: {e}") processing_complete = True logging.info("Обработка фильмов завершена") def get_movie_embeddings(conn): """Загружает все эмбеддинги фильмов из базы данных.""" movie_embeddings = {} try: with conn.cursor() as cur: cur.execute(f"SELECT movie_id, embedding FROM {embeddings_table}") for movie_id, embedding in cur.fetchall(): # Находим название фильма по ID for movie in movies_data: if movie['id'] == movie_id: movie_embeddings[movie['name']] = normalize(np.array(embedding).reshape(1, -1))[0] break logging.info(f"Загружено {len(movie_embeddings)} эмбеддингов фильмов.") except Exception as e: logging.error(f"Ошибка при загрузке эмбеддингов фильмов: {e}") return movie_embeddings def check_and_wait_for_limits(): """Проверяет лимиты RPM и TPM и ожидает, если они исчерпаны.""" global current_rpm, current_tpm, last_reset_time elapsed_time = time.time() - last_reset_time if elapsed_time >= 60: current_rpm = 0 current_tpm = 0 last_reset_time = time.time() logging.info("Лимиты RPM и TPM сброшены.") if current_rpm >= RPM_LIMIT or current_tpm >= TPM_LIMIT: wait_time = 60 - elapsed_time logging.warning(f"Превышены лимиты RPM ({current_rpm}/{RPM_LIMIT}) или TPM ({current_tpm}/{TPM_LIMIT}). Ожидание {wait_time:.2f} секунд...") time.sleep(max(0, wait_time)) current_rpm = 0 current_tpm = 0 last_reset_time = time.time() logging.info("Лимиты RPM и TPM сброшены после ожидания.") def create_optimized_batches(query, results, max_tokens_per_batch=TPM_LIMIT): """Создает батчи для реранкинга, оптимизированные по количеству токенов.""" global avg_tokens_per_movie batches = [] current_batch = [] current_batch_tokens = 0 query_tokens = vo.count_tokens([query], model="rerank-2") for movie_id, _ in results: movie = next((m for m in movies_data if m['id'] == movie_id), None) if movie: movie_info = f"Название: {movie['name']}\nГод: {movie['year']}\nЖанры: {movie['genreslist']}\nОписание: {movie['description']}" # Считаем токены, но не отправляем запрос если лимит уже исчерпан estimated_movie_tokens = avg_tokens_per_movie if (current_batch_tokens + query_tokens + estimated_movie_tokens) <= max_tokens_per_batch: current_batch.append((movie_id, _)) current_batch_tokens += estimated_movie_tokens else: batches.append(current_batch) current_batch = [(movie_id, _)] current_batch_tokens = estimated_movie_tokens if current_batch: batches.append(current_batch) return batches def rerank_batch_voyage(query, batch): """Переранжирует пакет результатов с помощью Voyage AI.""" global current_rpm, current_tpm check_and_wait_for_limits() url = "https://api.voyageai.com/v1/rerank" headers = { "Authorization": f"Bearer {VOYAGE_API_KEY}", "content-type": "application/json" } documents = [] movie_ids = [] for movie_id, _ in batch: movie = next((m for m in movies_data if m['id'] == movie_id), None) if movie: movie_info = f"Название: {movie['name']}\nГод: {movie['year']}\nЖанры: {movie['genreslist']}\nОписание: {movie['description']}" documents.append(movie_info) movie_ids.append(movie_id) payload = { "query": query, "documents": documents, "model": "rerank-2", # Можно использовать rerank-2-lite для более быстрой, но менее точной модели "return_documents": False, "truncation": True } try: batch_tokens = vo.count_tokens([query] + documents, model="rerank-2") current_rpm += 1 current_tpm += batch_tokens logging.info(f"Отправка запроса к Voyage AI. RPM: {current_rpm}/{RPM_LIMIT}, TPM: {current_tpm}/{TPM_LIMIT}, Токенов в запросе: {batch_tokens}") response = requests.post(url, headers=headers, json=payload) response.raise_for_status() # Проверка на ошибки HTTP response_json = response.json() reranked_results = [] for item in response_json['data']: reranked_results.append((movie_ids[item['index']], item['relevance_score'])) logging.info(f"Voyage AI: Успешно переранжирован батч. Задействовано токенов: {response_json['usage']['total_tokens']}") return reranked_results except requests.exceptions.RequestException as e: logging.error(f"Ошибка запроса к Voyage AI: {e}") if response.status_code == 429: # Too Many Requests logging.warning("Слишком много запросов к Voyage AI. Ожидание сброса лимитов...") check_and_wait_for_limits() return rerank_batch_voyage(query, batch) # Повторная попытка после ожидания return [] except KeyError as e: logging.error(f"Ошибка обработки ответа от Voyage AI: {e}. Полный ответ: {response_json}") return [] def rerank_results(query, results): """Переранжирует результаты поиска с помощью Voyage AI.""" logging.info(f"Начало переранжирования для запроса: '{query}'") # Создаем оптимизированные батчи batches = create_optimized_batches(query, results) reranked_results = [] with ThreadPoolExecutor(max_workers=rerank_threads) as executor: futures = [] batch_num = 0 for batch in batches: logging.info(f"Отправка на переранжирование батча {batch_num+1} ({len(batch)} фильмов)") future = executor.submit(rerank_batch_voyage, query, batch) futures.append(future) batch_num += 1 # Сбор результатов for i, future in enumerate(futures): try: batch_result = future.result() reranked_results.extend(batch_result) logging.info(f"Завершен реранк батча {i+1}") except Exception as e: logging.error(f"Ошибка при переранжировании батча {i+1}: {e}") reranked_results = sorted(reranked_results, key=lambda x: x[1], reverse=True) logging.info("Переранжирование завершено.") return reranked_results def search_movies(query, top_k=20): """Выполняет поиск фильмов по запросу.""" global search_in_progress search_in_progress = True start_time = time.time() try: conn = get_db_connection() if conn is None: return "

Ошибка подключения к базе данных

" query_crc32 = calculate_crc32(query) query_embedding = get_embedding_from_db(conn, query_cache_table, "query_crc32", query_crc32, model_name) if query_embedding is None: query_embedding = encode_string(query) try: with conn.cursor() as cur: cur.execute(f""" INSERT INTO {query_cache_table} (query_crc32, query, model_name, embedding) VALUES (%s, %s, %s, %s) ON CONFLICT (query_crc32) DO NOTHING """, (query_crc32, query, model_name, query_embedding.tolist())) conn.commit() logging.info(f"Сохранен новый эмбеддинг запроса: {query}") except Exception as e: logging.error(f"Ошибка при сохранении эмбеддинга запроса: {e}") conn.rollback() # Используем косинусное расстояние для поиска try: with conn.cursor() as cur: cur.execute(f""" WITH query_embedding AS ( SELECT embedding FROM {query_cache_table} WHERE query_crc32 = %s ) SELECT m.movie_id, 1 - (m.embedding <=> (SELECT embedding FROM query_embedding)) as similarity FROM {embeddings_table} m, query_embedding ORDER BY similarity DESC LIMIT %s """, (query_crc32, int(top_k * 2))) # Увеличиваем лимит до * 2 results = cur.fetchall() logging.info(f"Найдено {len(results)} предварительных результатов поиска.") except Exception as e: logging.error(f"Ошибка при выполнении поискового запроса: {e}") results = [] # Переранжируем результаты reranked_results = rerank_results(query, results) output = "" for movie_id, score in reranked_results[:top_k]: # Находим фильм по ID movie = next((m for m in movies_data if m['id'] == movie_id), None) if movie: output += f"

{movie['name']} ({movie['year']})

\n" output += f"

Жанры: {movie['genreslist']}

\n" output += f"

Описание: {movie['description']}

\n" output += f"

Релевантность (Voyage AI reranker score): {score:.4f}

\n" output += "
\n" search_time = time.time() - start_time logging.info(f"Поиск выполнен за {search_time:.2f} секунд.") return f"

Время поиска: {search_time:.2f} сек

{output}" except Exception as e: logging.error(f"Ошибка при выполнении поиска: {e}") return "

Произошла ошибка при выполнении поиска.

" finally: if conn: conn.close() search_in_progress = False # Запускаем обработку фильмов в отдельном потоке processing_thread = threading.Thread(target=process_movies) processing_thread.start() # Создаем интерфейс Gradio iface = gr.Interface( fn=search_movies, inputs=gr.Textbox(lines=2, placeholder="Введите запрос для поиска фильмов..."), outputs=gr.HTML(label="Результаты поиска"), title="Семантический поиск фильмов", description="Введите описание фильма, который вы ищете, и система найдет наиболее похожие фильмы." ) # Запускаем интерфейс iface.launch()