Spaces:
Paused
Paused
import streamlit as st | |
from transformers import AutoModelForCausalLM, AutoTokenizer | |
from transformers.utils import is_flash_attn_2_available | |
from transformers import BitsAndBytesConfig | |
import pandas as pd | |
import os | |
import torch | |
import numpy as np | |
from scipy import sparse | |
from sklearn.metrics.pairwise import cosine_similarity | |
from scipy import sparse | |
from langchain_community.vectorstores import Chroma | |
from langchain_community.embeddings.sentence_transformer import ( | |
SentenceTransformerEmbeddings, | |
) | |
# SET TO WIDE LAYOUT | |
st.set_page_config(layout="wide") | |
#_______________________________________________SET VARIABLES_____________________________________________________ | |
MODEL_ID = 'google/gemma-2b-it' | |
CHUNK_SIZE = 1000 | |
OVERLAP_SIZE = 100 | |
EMBEDDING = "all-MiniLM-L6-v2" | |
COLLECTION_NAME = f'vb_summarizer_{EMBEDDING}_test' | |
CHROMA_DATA_PATH = 'feedback_360' | |
#_______________________________________________LOAD MODELS_____________________________________________________ | |
# LOAD MODEL | |
def load_model(model_id) : | |
HF_TOKEN = os.environ['HF_TOKEN'] | |
print(torch.backends.mps.is_available()) | |
#device = torch.device("mps") if torch.backends.mps.is_available() else "cpu" | |
device = 'cpu' | |
print(device) | |
if device=='cpu' : | |
print('Warning! No GPU available') | |
# IMPORT MODEL | |
print(model_id) | |
quantization_config = BitsAndBytesConfig(load_in_4bit=True, | |
bnb_4bit_compute_dtype=torch.float16) | |
# if (is_flash_attn_2_available()) and (torch.cuda.get_device_capability(0)[0] >= 8): | |
# attn_implementation = "flash_attention_2" | |
# else: | |
# attn_implementation = "sdpa" | |
# print(f"[INFO] Using attention implementation: {attn_implementation}") | |
tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_id, token=HF_TOKEN) | |
llm_model = AutoModelForCausalLM.from_pretrained(pretrained_model_name_or_path=model_id, | |
token=HF_TOKEN, | |
torch_dtype=torch.float16, | |
#quantization_config=quantization_config if quantization_config else None, | |
low_cpu_mem_usage=False,) # use full memory | |
#attn_implementation=attn_implementation) # which attention version to use | |
llm_model.to(device) | |
return llm_model, tokenizer, device | |
# LOAD VECTORSTORE | |
def load_data(embedding) : | |
# CREATE EMBEDDING | |
embedding_function = SentenceTransformerEmbeddings(model_name=embedding) | |
db3 = Chroma(collection_name = COLLECTION_NAME, persist_directory="./chroma", embedding_function = embedding_function) | |
return db3 | |
# Create a text element and let the reader know the data is loading. | |
model_load_state = st.text('Loading model...') | |
# Load 10,000 rows of data into the dataframe. | |
llm_model, tokenizer, device = load_model(MODEL_ID) | |
# Notify the reader that the data was successfully loaded. | |
model_load_state.text('Loading model...done!') | |
# Create a text element and let the reader know the data is loading. | |
data_load_state = st.text('Loading data...') | |
# Load 10,000 rows of data into the dataframe. | |
vectorstore = load_data(EMBEDDING) | |
# Notify the reader that the data was successfully loaded. | |
data_load_state.text('Loading data...done!') | |
#_______________________________________________SUMMARIZATION_____________________________________________________ | |
# INFERENCE | |
# def prompt_formatter(reviews, type_of_doc): | |
# return f"""You are a summarization bot. | |
# You will receive {type_of_doc} and you will extract all relevant information from {type_of_doc} and return one paragraph in which you will summarize what was said. | |
# {type_of_doc} are listed below under inputs. | |
# Inputs: {reviews} | |
# Answer : | |
# """ | |
# def prompt_formatter(reviews, type_of_doc): | |
# return f"""You are a summarization bot. | |
# You will receive {type_of_doc} and you will summarize what was said in the input. | |
# {type_of_doc} are listed below under inputs. | |
# Inputs: {reviews} | |
# Answer : | |
# """ | |
def prompt_formatter(reviews): | |
return f"""You are a summarization bot. | |
You will receive reviews of Clockify from different users. | |
You will summarize what these reviews said while keeping the information about each of the user. | |
Reviews are listed below. | |
Reviews: {reviews} | |
Answer : | |
""" | |
def mirror_mirror(inputs, prompt_formatter, tokenizer): | |
print('Mirror_mirror') | |
prompt = prompt_formatter(inputs) | |
input_ids = tokenizer(prompt, return_tensors="pt").to(device) | |
outputs = llm_model.generate(**input_ids, | |
temperature=0.3, | |
do_sample=True, | |
max_new_tokens=275) | |
output_text = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
return prompt, output_text.replace(prompt, '') | |
def summarization(example : str, results_df : pd.DataFrame = pd.DataFrame()) -> pd.DataFrame : | |
# INFERENCE | |
results = [] | |
for cnt in range(0,2) : | |
prompt, result = mirror_mirror(example, prompt_formatter, tokenizer) | |
list_temp = [result, example] | |
tokenized = tokenizer(list_temp, return_tensors="pt", padding = True) | |
A = tokenized.input_ids.numpy() | |
A = sparse.csr_matrix(A) | |
score = cosine_similarity(A)[0,1] | |
#print(cosine_similarity(A)[0,1]) | |
#print(cosine_similarity(A)[1,0]) | |
print(score) | |
if score>0.1 : | |
fin_result = result | |
max_score = score | |
break | |
results.append(result) | |
#print(result+'\n\n') | |
# tokenize results and example together | |
try : | |
fin_result | |
except : | |
# if fin_result not already defined, use the best of available results | |
# add example to results so tokenization is done together (due to padding limitations) | |
results.append(example) | |
tokenized = tokenizer(results, return_tensors="pt", padding = True) | |
A = tokenized.input_ids.numpy() | |
A = sparse.csr_matrix(A) | |
# calculate cosine similarity of each pair | |
# keep only example X result column | |
scores = cosine_similarity(A)[:,2] | |
# final result is the one with greaters cos_score | |
fin_result = results[np.argmax(scores)] | |
max_score = max(scores) | |
#print(fin_result) | |
# save final result and its attributes | |
row = pd.DataFrame({'model' : MODEL_ID, 'prompt' : prompt, 'reviews' : example, 'summarization' : fin_result, 'score' : [max_score] }) | |
results_df = pd.concat([results_df,row], ignore_index = True) | |
return results_df | |
def create_filter(group:str=None, platform:str=None, ReviewerPosition:str=None, Industry:str=None, CompanySize:str=None, | |
UsagePeriod:str=None, LinkedinVerified:str=None, Date:str=None, Rating:str=None) : | |
keys = ['group', 'Platform', 'ReviewerPosition', 'Industry', 'CompanySize', | |
'UsagePeriod', 'LinkedinVerified', 'Date', 'Rating'] | |
input_keys = [group,platform, ReviewerPosition, Industry, CompanySize, UsagePeriod, LinkedinVerified, Date, Rating] | |
# create filter dict | |
filter_dict = {} | |
for key, in_key in zip(keys, input_keys) : | |
if not in_key == None and not in_key == ' ': | |
filter_dict[key] = {'$eq' : in_key} | |
print(filter_dict) | |
return filter_dict | |
#_______________________________________________UI_____________________________________________________ | |
st.title("Mirror, mirror, on the cloud, what do Clockify users say aloud?") | |
st.subheader("--Clockify review summarizer--") | |
col1, col2, col3 = st.columns(3, gap = 'small') | |
with col1: | |
platform = st.selectbox(label = 'Platform', | |
options = [' ', 'Capterra', 'Chrome Extension', 'GetApp', 'AppStore', 'GooglePlay', | |
'Firefox Extension', 'JIRA Plugin', 'Trustpilot', 'G2', | |
'TrustRadius'] | |
) | |
with col2: | |
company_size = st.selectbox(label = 'Company Size', | |
options = [' ', '1-10 employees', 'Self-employed', 'self-employed', | |
'Small-Business(50 or fewer emp.)', '51-200 employees', | |
'Mid-Market(51-1000 emp.)', '11-50 employees', | |
'501-1,000 employees', '10,001+ employees', '201-500 employees', | |
'1,001-5,000 employees', '5,001-10,000 employees', | |
'Enterprise(> 1000 emp.)', 'Unknown', '1001-5000 employees'] | |
) | |
with col3: | |
linkedin_verified = st.selectbox(label = 'Linkedin Verified', | |
options = [' ', 'True', 'False'], | |
placeholder = 'Choose an option' | |
) | |
num_to_return = int(st.number_input(label = 'Number of documents to return', min_value = 2, max_value = 50, step = 1)) | |
# group = st.selectbox(label = 'Review Platform Group', | |
# options = ['Software Review Platforms', 'Browser Extension Stores', 'Mobile App Stores', 'Plugin Marketplace'] | |
# ) | |
default_value = "Clockify" | |
query = st.text_area("Query", default_value, height = 50) | |
#type_of_doc = st.text_area("Type of text", 'text', height = 25) | |
# result = '' | |
# score = '' | |
# reviews = '' | |
if 'result' not in st.session_state: | |
st.session_state['result'] = '' | |
if 'score' not in st.session_state: | |
st.session_state['score'] = '' | |
if 'reviews' not in st.session_state: | |
st.session_state['reviews'] = '' | |
col11, col21 = st.columns(2, gap = 'small') | |
with col11: | |
button_query = st.button('Conquer and query!') | |
with col21: | |
button_summarize = st.button('Summon the summarizer!') | |
if button_query : | |
print('Querying') | |
# create filter from drop-downs | |
filter_dict = create_filter(#group = group, | |
platform = platform, | |
CompanySize = company_size, | |
LinkedinVerified = linkedin_verified | |
) | |
# FILTER BY META | |
if filter_dict == {} : | |
retriever = vectorstore.as_retriever(search_kwargs = {"k": num_to_return}) | |
elif len(filter_dict.keys()) == 1 : | |
retriever = vectorstore.as_retriever(search_kwargs = {"k": num_to_return, | |
"filter": filter_dict}) | |
else : | |
retriever = vectorstore.as_retriever(search_kwargs = {"k": num_to_return, | |
"filter":{'$and': [{key : value} for key,value in filter_dict.items()]} | |
} | |
) | |
reviews = retriever.get_relevant_documents(query = query) | |
# only get page content | |
st.session_state['reviews'] = [review.page_content for review in reviews] | |
print(st.session_state['reviews']) | |
result = 'You may summarize now!' | |
if button_summarize : | |
print('Summarization in progress') | |
st.session_state['result'] = 'Summarization in progress' | |
results_df = summarization("\n".join(st.session_state['reviews'])) | |
# only one input | |
st.session_state['result'] = results_df.summarization[0] | |
score = results_df.score[0] | |
col12, col22 = st.columns(2, gap = 'small') | |
with col12: | |
chosen_reviews = st.text_area("Reviews to be summarized", "\n".join(st.session_state['reviews']), height = 275) | |
with col22: | |
summarized_text = st.text_area("Summarized text", st.session_state['result'], height = 275) | |
score = st.text_area("Cosine similarity score", st.session_state['score'], height = 25) | |
# max_length = st.sidebar.slider("Max Length", min_value = 10, max_value=30) | |
# temperature = st.sidebar.slider("Temperature", value = 1.0, min_value = 0.0, max_value=1.0, step=0.05) | |
# top_k = st.sidebar.slider("Top-k", min_value = 0, max_value=5, value = 0) | |
# top_p = st.sidebar.slider("Top-p", min_value = 0.0, max_value=1.0, step = 0.05, value = 0.9) | |
# num_return_sequences = st.sidebar.number_input('Number of Return Sequences', min_value=1, max_value=5, value=1, step=1)s |