File size: 8,060 Bytes
dae6371 27090a6 421602f 9a9379e dae6371 27090a6 421602f 27090a6 421602f 27090a6 421602f 27090a6 421602f 27090a6 421602f 27090a6 9a9379e 652b0cf 9a9379e 421602f 9a9379e 421602f 9a9379e b25204d 9a9379e 421602f 652b0cf 9a9379e 53e6d17 9a9379e 53e6d17 9a9379e 27090a6 421602f 652b0cf 421602f dae6371 9a9379e 421602f 652b0cf 9a9379e 53e6d17 dae6371 652b0cf dae6371 652b0cf 27090a6 dae6371 27090a6 dae6371 9a9379e 421602f dae6371 f7cdb6c dae6371 27090a6 dae6371 9a9379e b25204d |
|
import gradio as gr
from sentence_transformers import SentenceTransformer, util
import json
import os
import time
import threading
import queue
# Загружаем модель
model_name = "HIT-TMG/KaLM-embedding-multilingual-mini-instruct-v1"
model = SentenceTransformer(model_name)
# Имя файла для сохранения эмбеддингов
embeddings_file = f"movie_embeddings_{model_name.replace('/', '_')}.json"
# Имя файла для сохранения эмбеддингов запросов
query_embeddings_file = f"query_embeddings_{model_name.replace('/', '_')}.json"
# Загружаем данные из файла movies.json
try:
with open("movies.json", "r", encoding="utf-8") as f:
movies_data = json.load(f)
except FileNotFoundError:
print("Ошибка: Файл movies.json не найден.")
movies_data = []
# Загружаем эмбеддинги фильмов
if os.path.exists(embeddings_file):
with open(embeddings_file, "r", encoding="utf-8") as f:
movie_embeddings = json.load(f)
print("Загружены эмбеддинги фильмов из файла.")
else:
movie_embeddings = {}
# Загружаем эмбеддинги запросов
if os.path.exists(query_embeddings_file):
with open(query_embeddings_file, "r", encoding="utf-8") as f:
query_embeddings = json.load(f)
print("Загружены эмбеддинги запросов из файла.")
else:
query_embeddings = {}
# Очередь для необработанных фильмов
movies_queue = queue.Queue()
for movie in movies_data:
if movie["name"] not in movie_embeddings:
movies_queue.put(movie)
# Флаг, указывающий, что обработка фильмов завершена
processing_complete = False
# Блокировка для доступа к movie_embeddings
movie_embeddings_lock = threading.Lock()
def encode_string(text):
"""Кодирует строку в эмбеддинг."""
return model.encode(text, convert_to_tensor=True)
def process_movies():
"""
Обрабатывает фильмы из очереди, создавая для них эмбеддинги.
"""
global processing_complete
while True:
try:
movie = movies_queue.get(timeout=1) # Тайм-аут 1 секунда
except queue.Empty:
print("Очередь фильмов пуста.")
processing_complete = True
break
title = movie["name"]
print(f"Создается эмбеддинг для фильма '{title}'...")
embedding_string = f"Название: {movie['name']}\nГод: {movie['year']}\nЖанры: {movie['genresList']}\nОписание: {movie['description']}"
embedding = encode_string(embedding_string).tolist()
with movie_embeddings_lock:
movie_embeddings[title] = embedding
# Сохраняем эмбеддинги в файл после обработки каждого фильма
with open(embeddings_file, "w", encoding="utf-8") as f:
json.dump(movie_embeddings, f, ensure_ascii=False, indent=4)
print(f"Эмбеддинг для фильма '{title}' создан и сохранен.")
print("Обработка фильмов завершена.")
def get_query_embedding(query):
"""
Возвращает эмбеддинг для запроса. Если эмбеддинг уже создан, возвращает его из словаря.
Иначе создает эмбеддинг, сохраняет его и возвращает.
"""
if query in query_embeddings:
print(f"Эмбеддинг для запроса '{query}' уже существует.")
return query_embeddings[query]
else:
print(f"Создается эмбеддинг для запроса '{query}'...")
embedding = encode_string(query).tolist()
query_embeddings[query] = embedding
# Сохраняем эмбеддинги запросов в файл
with open(query_embeddings_file, "w", encoding="utf-8") as f:
json.dump(query_embeddings, f, ensure_ascii=False, indent=4)
print(f"Эмбеддинг для запроса '{query}' создан и сохранен.")
return embedding
def search_movies(query, top_k=3):
"""
Ищет наиболее похожие фильмы по запросу.
Args:
query: Текстовый запрос.
top_k: Количество возвращаемых результатов.
Returns:
Строку с результатами поиска в формате HTML.
"""
start_time = time.time()
print(f"\n\033[1mПоиск по запросу: '{query}'\033[0m")
query_embedding_tensor = encode_string(query)
with movie_embeddings_lock:
# Делаем копию словаря, чтобы избежать ошибок при изменении оригинала во время итерации
current_movie_embeddings = movie_embeddings.copy()
if not current_movie_embeddings:
return "<p>Пока что нет обработанных фильмов. Попробуйте позже.</p>"
# Создаем словарь с описаниями фильмов
movie_descriptions = {}
for movie in movies_data:
movie_descriptions[movie["name"]] = f"Название: {movie['name']}\nГод: {movie['year']}\nЖанры: {movie['genresList']}\nОписание: {movie['description']}"
# Вычисляем косинусное сходство и сохраняем в список кортежей (название фильма, сходство)
similarity_scores = []
for title, embedding in current_movie_embeddings.items():
similarity = util.pytorch_cos_sim(query_embedding_tensor, encode_string(movie_descriptions[title]))[0][0].item()
similarity_scores.append((title, similarity))
# Сортируем фильмы по убыванию сходства
sorted_movies = sorted(similarity_scores, key=lambda item: item[1], reverse=True)
results_html = ""
for title, score in sorted_movies[:top_k]:
# Ищем полное описание фильма в исходных данных
for movie in movies_data:
if movie["name"] == title:
description = movie["description"]
year = movie["year"]
genres = movie["genresList"]
break
results_html += f"<h3><b>{title} ({year})</b></h3>"
results_html += f"<p><b>Жанры:</b> {genres}</p>"
results_html += f"<p><b>Описание:</b> {description}</p>"
results_html += f"<p><b>Сходство:</b> {score:.4f}</p>"
results_html += "<hr>"
end_time = time.time()
execution_time = end_time - start_time
print(f"Поиск завершен за {execution_time:.4f} секунд.")
return results_html
# Поток для обработки фильмов
processing_thread = threading.Thread(target=process_movies)
# Создаем интерфейс Gradio
iface = gr.Interface(
fn=search_movies,
inputs=gr.Textbox(label="Введите запрос:"),
outputs=gr.HTML(label="Результаты поиска:"),
title="Поиск фильмов по описанию",
description="Введите запрос, и система найдет наиболее похожие фильмы по их описаниям.",
examples=[
["Фильм про ограбление"],
["Комедия 2019 года"],
["Фантастика про космос"],
],
)
# Запускаем поток для обработки фильмов
processing_thread.start()
# Запускаем приложение
iface.queue()
iface.launch() |