|
import json
import os
import queue
import threading

# `spaces` stays ahead of torch, exactly as in the original import order.
import spaces
import gradio as gr
import numpy as np
import torch
import torch.nn.functional as F
from torch import Tensor
from transformers import AutoTokenizer, AutoModel
|
|
|
# Markdown heading shown at the top of the page (rendered via gr.Markdown in app_interface).
title = """
# 👋🏻Welcome to 🙋🏻♂️Tonic's 🐣e5-mistral🛌🏻Embeddings """
# Markdown/HTML blurb rendered directly below the title: model link, "duplicate
# this Space" badge and community links.
description = """
You can use this ZeroGPU Space to test out the current model [intfloat/e5-mistral-7b-instruct](https://huggingface.co./intfloat/e5-mistral-7b-instruct). 🐣e5-mistral🛌🏻 has a larger context🪟window, a different prompting/return🛠️mechanism and generally better results than other embedding models. use it via API to create embeddings or try out the sentence similarity to see how various optimization parameters affect performance.
You can also use 🐣e5-mistral🛌🏻 by cloning this space. 🧬🔬🔍 Simply click here: <a style="display:inline-block" href="https://huggingface.co./spaces/Tonic/e5?duplicate=true"><img src="https://img.shields.io/badge/-Duplicate%20Space-blue?labelColor=white&style=flat&logo=&logoWidth=14" alt="Duplicate Space"></a></h3>
Join us : 🌟TeamTonic🌟 is always making cool demos! Join our active builder's🛠️community 👻 [![Join us on Discord](https://img.shields.io/discord/1109943800132010065?label=Discord&logo=discord&style=flat-square)](https://discord.gg/GWpVpekp) On 🤗Huggingface: [TeamTonic](https://huggingface.co./TeamTonic) & [MultiTransformer](https://huggingface.co./MultiTransformer) On 🌐Github: [Tonic-AI](https://github.com/tonic-ai) & contribute to 🌟 [DataTonic](https://github.com/Tonic-AI/DataTonic) 🤗Big thanks to Yuvi Sharma and all the folks at huggingface for the community grant 🤗
"""
|
# Set PyTorch's CUDA allocator option (max_split_size_mb:30) through its
# environment variable.
os.environ.update({'PYTORCH_CUDA_ALLOC_CONF': 'max_split_size_mb:30'})

# Run on the GPU when torch reports one, otherwise on the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)
|
|
|
# (task name, instruction) pairs. The instruction is the text inserted after
# "Instruct:" when a prompt is built for the model. Order matters: the first
# entry is used as the default selection of the task dropdown.
_TASK_INSTRUCTIONS = (
    ('ArguAna', 'Given a claim, find documents that refute the claim'),
    ('ClimateFEVER', 'Given a claim about climate change, retrieve documents that support or refute the claim'),
    ('DBPedia', 'Given a query, retrieve relevant entity descriptions from DBPedia'),
    ('FEVER', 'Given a claim, retrieve documents that support or refute the claim'),
    ('FiQA2018', 'Given a financial question, retrieve user replies that best answer the question'),
    ('HotpotQA', 'Given a multi-hop question, retrieve documents that can help answer the question'),
    ('MSMARCO', 'Given a web search query, retrieve relevant passages that answer the query'),
    ('NFCorpus', 'Given a question, retrieve relevant documents that best answer the question'),
    ('NQ', 'Given a question, retrieve Wikipedia passages that answer the question'),
    ('QuoraRetrieval', 'Given a question, retrieve questions that are semantically equivalent to the given question'),
    ('SCIDOCS', 'Given a scientific paper title, retrieve paper abstracts that are cited by the given paper'),
    ('SciFact', 'Given a scientific claim, retrieve documents that support or refute the claim'),
    ('Touche2020', 'Given a question, retrieve detailed and persuasive arguments that answer the question'),
    ('TRECCOVID', 'Given a query on COVID-19, retrieve documents that answer the query'),
)

# Mapping from task name to its instruction text.
tasks = dict(_TASK_INSTRUCTIONS)
|
|
|
|
|
|
|
# Queues connecting UI callbacks to the embedding worker: requests go in as
# (task, text) tuples, formatted responses come back out.
embedding_request_queue, embedding_response_queue = queue.Queue(), queue.Queue()

# Tokenizer and half-precision model, loaded once at import time onto `device`.
_MODEL_ID = 'intfloat/e5-mistral-7b-instruct'
tokenizer = AutoTokenizer.from_pretrained(_MODEL_ID)
model = AutoModel.from_pretrained(_MODEL_ID, torch_dtype=torch.float16, device_map=device)
|
|
|
def last_token_pool(last_hidden_states: Tensor, attention_mask: Tensor) -> Tensor:
    """Select one hidden-state vector per sequence: that of its last attended token.

    Args:
        last_hidden_states: Hidden states indexed as [row, position, ...].
        attention_mask: Mask indexed as [row, position].

    Returns:
        ``last_hidden_states[:, -1]`` when the last mask column sums to the
        number of rows (every sequence ends on an attended position, as with
        left padding). Otherwise each row is taken at position
        ``attention_mask.sum(dim=1) - 1``.
    """
    num_rows = attention_mask.shape[0]
    final_column_total = attention_mask[:, -1].sum()
    if final_column_total == num_rows:
        return last_hidden_states[:, -1]

    # Right-padded batch: locate the last attended position of every row.
    last_positions = attention_mask.sum(dim=1) - 1
    row_index = torch.arange(last_hidden_states.shape[0], device=last_hidden_states.device)
    return last_hidden_states[row_index, last_positions]
|
|
|
def clear_cuda_cache() -> None:
    """Call ``torch.cuda.empty_cache()``; used after each GPU-heavy step in this file."""
    torch.cuda.empty_cache()
|
|
|
def free_memory(*args):
    """Drop this function's own references to ``args``.

    Only the local references held inside this call are removed. Objects that
    the caller still references stay alive, so the caller's variables are
    unaffected.
    """
    del args
|
|
|
def load_corpus_from_json(file_path):
    """Read a JSON file and return the parsed content.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` produces for the file content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the host's
    # default locale encoding.
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)
|
|
|
|
|
def embedding_worker():
    """Serve embedding requests from ``embedding_request_queue`` until a ``None`` sentinel arrives.

    Each request is a ``(selected_task, input_text)`` tuple. Exactly one
    response is put on ``embedding_response_queue`` per request: the formatted
    embedding payload on success, or ``{"error": <message>}`` on failure.
    Failures are reported instead of raised so that a single bad request
    neither kills the worker thread nor leaves the waiting caller blocked.
    """
    while True:
        item = embedding_request_queue.get()
        try:
            if item is None:
                # Shutdown sentinel: leave the loop (``finally`` still runs).
                break
            try:
                selected_task, input_text = item
                embeddings = compute_embeddings(selected_task, input_text)
                if isinstance(embeddings, str):
                    # compute_embeddings reports an unknown task as a plain
                    # string; surface it as an error, not as an embedding.
                    response = {"error": embeddings}
                else:
                    response = format_response(embeddings)
            except Exception as exc:
                print(f"Embedding request failed: {exc}")
                response = {"error": f"Embedding request failed: {exc}"}
            embedding_response_queue.put(response)
        finally:
            # Always balance the get() above, including for the sentinel and
            # for failed requests, so queue.join() cannot hang.
            embedding_request_queue.task_done()
            clear_cuda_cache()
|
|
|
# Start the embedding worker at import time. It runs as a daemon thread, so it
# does not keep the process alive on shutdown.
threading.Thread(target=embedding_worker, daemon=True).start()
|
|
|
|
|
@spaces.GPU
def compute_embeddings(selected_task, input_text):
    """Embed one text with the instruction that belongs to ``selected_task``.

    Args:
        selected_task: Key into ``tasks`` selecting the instruction text.
        input_text: The text to embed.

    Returns:
        The L2-normalised embedding as a nested list with one row, or an
        error-message string when ``selected_task`` is not a key of ``tasks``.
    """
    try:
        task_description = tasks[selected_task]
    except KeyError:
        print(f"Selected task not found: {selected_task}")
        return f"Error: Task '{selected_task}' not found. Please select a valid task."

    max_length = 2048
    processed_texts = [f'Instruct: {task_description}\nQuery: {input_text}']

    # Tokenise one token short of the limit, append EOS by hand, then pad, so
    # that the last attended token of every sequence is EOS.
    batch_dict = tokenizer(processed_texts, max_length=max_length - 1, return_attention_mask=False, padding=False, truncation=True)
    batch_dict['input_ids'] = [input_ids + [tokenizer.eos_token_id] for input_ids in batch_dict['input_ids']]
    batch_dict = tokenizer.pad(batch_dict, padding=True, return_attention_mask=True, return_tensors='pt')
    batch_dict = {k: v.to(device) for k, v in batch_dict.items()}

    # Inference only: skip autograd bookkeeping so no graph and no
    # intermediate activations are retained for this forward pass.
    with torch.no_grad():
        outputs = model(**batch_dict)
        embeddings = last_token_pool(outputs.last_hidden_state, batch_dict['attention_mask'])
        embeddings = F.normalize(embeddings, p=2, dim=1)

    embeddings_list = embeddings.detach().cpu().numpy().tolist()
    clear_cuda_cache()
    return embeddings_list
|
|
|
@spaces.GPU
def decode_embedding(embedding_str):
    """Parse a comma-separated string of numbers and hand its first value to ``tokenizer.decode``.

    Any exception raised along the way is caught and returned as the string
    ``"Error in decoding: <message>"``.

    NOTE(review): embedding values are floats, not token ids, so it is unclear
    what decoding them is meant to produce — confirm the intent of this
    function. No call site is visible in this file.
    """
    try:
        # One float per comma-separated piece.
        embedding = [float(num) for num in embedding_str.split(',')]
        embedding_tensor = torch.tensor(embedding, dtype=torch.float16, device=device)
        # Only the first element of the parsed vector is passed on.
        decoded_embedding = tokenizer.decode(embedding_tensor[0], skip_special_tokens=True)
        # NOTE(review): `tokenizer.decode` presumably returns a str, in which
        # case `.cpu()` raises and the except branch below is always taken — confirm.
        return decoded_embedding.cpu().numpy().tolist()
    except Exception as e:
        return f"Error in decoding: {str(e)}"
|
|
|
@spaces.GPU
def compute_similarity(selected_task, sentence1, sentence2, extra_sentence1, extra_sentence2):
    """Score three sentences against a focus sentence by cosine similarity.

    Args:
        selected_task: Key into ``tasks`` selecting the instruction text.
        sentence1: The focus sentence every other sentence is compared with.
        sentence2: First sentence to compare.
        extra_sentence1: Second sentence to compare.
        extra_sentence2: Third sentence to compare.

    Returns:
        A dict with keys ``"Similarity 1-2"``, ``"Similarity 1-3"`` and
        ``"Similarity 1-4"``, or an error-message string when
        ``selected_task`` is not a key of ``tasks``.
    """
    if selected_task not in tasks:
        print(f"Selected task not found: {selected_task}")
        return f"Error: Task '{selected_task}' not found. Please select a valid task."

    focus_embedding = compute_embeddings(selected_task, sentence1)

    # The embeddings are handed to compute_cosine_similarity as plain lists;
    # it builds its own tensors, so no device copies are made here.
    similarity_scores = {}
    candidates = (sentence2, extra_sentence1, extra_sentence2)
    for position, candidate in enumerate(candidates, start=2):
        candidate_embedding = compute_embeddings(selected_task, candidate)
        similarity_scores[f"Similarity 1-{position}"] = compute_cosine_similarity(
            focus_embedding, candidate_embedding
        )

    clear_cuda_cache()
    return similarity_scores
|
|
|
@spaces.GPU
def compute_cosine_similarity(emb1, emb2):
    """Return the cosine similarity of two embeddings as a Python float.

    Both inputs are turned into half-precision tensors on ``device`` before
    ``F.cosine_similarity`` is applied. ``.item()`` requires the result to
    hold exactly one value.
    """
    first, second = (torch.tensor(values).to(device).half() for values in (emb1, emb2))
    score = F.cosine_similarity(first, second)
    similarity = score.item()
    free_memory(first, second)
    clear_cuda_cache()
    return similarity
|
|
|
|
|
@spaces.GPU
def compute_embeddings_batch(input_texts, selected_task=None):
    """Embed several texts in a single forward pass.

    Args:
        input_texts: Iterable of texts to embed; must not be empty.
        selected_task: Optional key into ``tasks``. When given, every text is
            wrapped as ``'Instruct: <instruction>\\nQuery: <text>'``. When
            omitted (the default), texts are embedded as-is, without an
            instruction prefix.

    Returns:
        A NumPy array with one L2-normalised embedding row per input text.

    Raises:
        ValueError: If ``input_texts`` is empty or ``selected_task`` is not a
            key of ``tasks``.
    """
    texts = list(input_texts)
    if not texts:
        raise ValueError("input_texts must contain at least one text")

    if selected_task is None:
        processed_texts = [str(text) for text in texts]
    else:
        try:
            task_description = tasks[selected_task]
        except KeyError:
            raise ValueError(
                f"Task '{selected_task}' not found. Please select a valid task."
            ) from None
        processed_texts = [f'Instruct: {task_description}\nQuery: {text}' for text in texts]

    # NOTE(review): compute_embeddings uses 2048 here; confirm whether 2042 is intentional.
    max_length = 2042

    # Tokenise one token short of the limit, append EOS by hand, then pad, so
    # that the last attended token of every sequence is EOS.
    batch_dict = tokenizer(processed_texts, max_length=max_length - 1, return_attention_mask=False, padding=False, truncation=True)
    batch_dict['input_ids'] = [input_ids + [tokenizer.eos_token_id] for input_ids in batch_dict['input_ids']]
    batch_dict = tokenizer.pad(batch_dict, padding=True, return_attention_mask=True, return_tensors='pt')
    batch_dict = {k: v.to(device) for k, v in batch_dict.items()}

    # Inference only: skip autograd bookkeeping for the forward pass.
    with torch.no_grad():
        outputs = model(**batch_dict)
        embeddings = last_token_pool(outputs.last_hidden_state, batch_dict['attention_mask'])
        embeddings = F.normalize(embeddings, p=2, dim=1)

    # Copy the result to host memory before emptying the CUDA cache.
    result = embeddings.detach().cpu().numpy()
    clear_cuda_cache()
    return result
|
|
|
def semantic_search(query_embedding, corpus_embeddings, top_k=5):
    """Rank corpus embeddings by dot product with a query embedding.

    Args:
        query_embedding: The query vector (array or plain sequence).
        corpus_embeddings: One embedding per row (array or nested sequence).
        top_k: Maximum number of results; values <= 0 yield no results.

    Returns:
        A tuple ``(indices, scores)`` of NumPy arrays: the row indices of the
        best-scoring corpus entries in descending score order, and their
        scores in the same order. Both are empty for an empty corpus.
    """
    corpus = np.asarray(corpus_embeddings)
    query = np.asarray(query_embedding)

    if corpus.size == 0 or top_k <= 0:
        return np.array([], dtype=int), np.array([], dtype=float)

    scores = np.dot(corpus, query.T).flatten()
    # argsort is ascending; reverse it and keep the first top_k entries.
    top_k_indices = np.argsort(scores)[::-1][:top_k]
    return top_k_indices, scores[top_k_indices]
|
|
|
def search_similar_sentences(input_question, corpus_sentences, corpus_embeddings):
    """Return the corpus sentences most similar to ``input_question``.

    Args:
        input_question: The question text to embed and search with.
        corpus_sentences: Sentences, indexable by the row numbers of
            ``corpus_embeddings``.
        corpus_embeddings: One embedding per corpus sentence.

    Returns:
        A list of ``(sentence, score)`` tuples, best match first.
    """
    question_embedding = compute_embeddings_batch([input_question])[0]
    top_k_indices, top_k_scores = semantic_search(question_embedding, corpus_embeddings)
    # The scores are already ordered like the indices, so pair them
    # positionally. Indexing the scores by corpus index would pick the wrong
    # score or run out of range.
    return [
        (corpus_sentences[int(index)], float(score))
        for index, score in zip(top_k_indices, top_k_scores)
    ]
|
|
|
|
|
def format_response(embeddings):
    """Wrap ``embeddings`` in a response dict with ``data``, ``model``, ``object`` and ``usage`` keys.

    ``embeddings`` is stored unchanged as the single entry of ``data``. The
    token counts under ``usage`` are the fixed value 17, not a measurement of
    the actual input.
    """
    entry = {"embedding": embeddings, "index": 0, "object": "embedding"}
    token_usage = dict(prompt_tokens=17, total_tokens=17)
    return dict(data=[entry], model="e5-mistral", object="list", usage=token_usage)
|
|
|
# Requests and responses are matched purely by queue order, so only one
# request/response round trip may be in flight at a time.
_embedding_round_trip_lock = threading.Lock()


def generate_and_format_embeddings(selected_task, input_text):
    """Submit one embedding request to the worker queue and wait for its response.

    Args:
        selected_task: Task name forwarded to the worker.
        input_text: Text forwarded to the worker.

    Returns:
        The object the worker put on ``embedding_response_queue`` for this
        request.
    """
    # Hold the lock across put + get. Without it, two concurrent callers can
    # both be waiting on the response queue and receive each other's result.
    with _embedding_round_trip_lock:
        embedding_request_queue.put((selected_task, input_text))
        response = embedding_response_queue.get()
        embedding_response_queue.task_done()
    clear_cuda_cache()
    return response
|
|
|
|
|
def app_interface():
    """Build the Gradio Blocks UI and return it without launching it.

    The UI has a task dropdown plus five tabs: embedding generation, sentence
    similarity, corpus loading, semantic search and a connector-style JSON
    output. The loaded corpus lives in variables of this function that the
    nested callbacks share, so it is common to everyone using the returned
    Blocks instance.

    Returns:
        The constructed ``gr.Blocks`` object.
    """
    # Corpus state shared by the load_corpus / perform_search callbacks below.
    corpus_sentences = []
    corpus_embeddings = None

    task_names = list(tasks.keys())

    with gr.Blocks() as demo:
        gr.Markdown(title)
        gr.Markdown(description)
        with gr.Row():
            task_dropdown = gr.Dropdown(task_names, label="Select a Task", value=task_names[0])

        with gr.Tab("Embedding Generation"):
            input_text_box = gr.Textbox(label="📖Input Text")
            compute_button = gr.Button("Try🐣🛌🏻e5")
            output_display = gr.Textbox(label="🐣e5-mistral🛌🏻 Embeddings")
            compute_button.click(
                fn=compute_embeddings,
                inputs=[task_dropdown, input_text_box],
                outputs=output_display
            )

        with gr.Tab("Sentence Similarity"):
            sentence1_box = gr.Textbox(label="'Focus Sentence' - The 'Subject'")
            sentence2_box = gr.Textbox(label="'Input Sentence' - 1")
            extra_sentence1_box = gr.Textbox(label="'Input Sentence' - 2")
            extra_sentence2_box = gr.Textbox(label="'Input Sentence' - 3")
            similarity_button = gr.Button("Compute Similarity")
            similarity_output = gr.Textbox(label="🐣e5-mistral🛌🏻 Similarity Scores")
            similarity_button.click(
                fn=compute_similarity,
                inputs=[task_dropdown, sentence1_box, sentence2_box, extra_sentence1_box, extra_sentence2_box],
                outputs=similarity_output
            )

        with gr.Tab("Load Corpus"):
            json_uploader = gr.File(label="Upload JSON File")
            load_corpus_button = gr.Button("Load Corpus")
            corpus_status = gr.Textbox(label="Corpus Status", value="Corpus not loaded")

            def load_corpus(file_info):
                """Load the uploaded JSON corpus, embed it and store both in the shared state."""
                # `nonlocal`, not `global`: perform_search reads these closure
                # variables, so they are the ones that must be rebound.
                nonlocal corpus_sentences, corpus_embeddings
                if file_info is None:
                    return "No file uploaded. Please upload a JSON file."
                # The upload may arrive as a path string, as an object with a
                # `.name` attribute, or as a dict with a 'name' key.
                if isinstance(file_info, dict):
                    file_path = file_info.get('name')
                else:
                    file_path = getattr(file_info, 'name', file_info)
                try:
                    sentences = load_corpus_from_json(file_path)
                    embeddings = compute_embeddings_batch(sentences)
                except Exception as e:
                    return "Error loading corpus: {}".format(e)
                # Publish only after both steps succeeded, so a failed load
                # never leaves sentences and embeddings out of sync.
                corpus_sentences, corpus_embeddings = sentences, embeddings
                return "Corpus loaded successfully with {} sentences.".format(len(corpus_sentences))

            load_corpus_button.click(
                fn=load_corpus,
                inputs=json_uploader,
                outputs=corpus_status
            )

        with gr.Tab("Semantic Search"):
            input_question_box = gr.Textbox(label="Enter your question")
            search_button = gr.Button("Search")
            search_results_output = gr.Textbox(label="Search Results")

            def perform_search(input_question):
                """Search the loaded corpus, or report that none is loaded."""
                # Use len() for the embeddings: the truth value of a
                # multi-element NumPy array is ambiguous and raises.
                if not corpus_sentences or corpus_embeddings is None or len(corpus_embeddings) == 0:
                    return "Corpus is not loaded. Please load a corpus first."
                return search_similar_sentences(input_question, corpus_sentences, corpus_embeddings)

            search_button.click(
                fn=perform_search,
                inputs=input_question_box,
                outputs=search_results_output
            )

        with gr.Tab("Connector-like Embeddings"):
            with gr.Row():
                input_text_box_connector = gr.Textbox(label="Input Text", placeholder="Enter text or array of texts")
                # The selected value is used as the task name, so the default
                # must be one of the offered choices.
                model_dropdown_connector = gr.Dropdown(label="Model", choices=task_names, value=task_names[0])
                encoding_format_connector = gr.Radio(label="Encoding Format", choices=["float", "base64"], value="float")
                user_connector = gr.Textbox(label="User", placeholder="Enter user identifier (optional)")
                submit_button_connector = gr.Button("Generate Embeddings")
            output_display_connector = gr.JSON(label="Embeddings Output")
            submit_button_connector.click(
                fn=generate_and_format_embeddings,
                inputs=[model_dropdown_connector, input_text_box_connector],
                outputs=output_display_connector
            )

    return demo
|
|
|
# Build the UI once, so the queue is enabled on the same Blocks instance that
# is launched. Calling app_interface() twice would queue a discarded instance.
demo = app_interface()
demo.queue()
demo.launch()