import pandas as pd import numpy as np from zipfile import ZipFile import tensorflow as tf from tensorflow import keras from pathlib import Path import matplotlib.pyplot as plt import gradio as gr from huggingface_hub import from_pretrained_keras # Download the actual data from http://files.grouplens.org/datasets/movielens/ml-latest-small.zip" movielens_data_file_url = "http://files.grouplens.org/datasets/movielens/ml-latest-small.zip" movielens_zipped_file = keras.utils.get_file("ml-latest-small.zip", movielens_data_file_url, extract=False) keras_datasets_path = Path(movielens_zipped_file).parents[0] movielens_dir = keras_datasets_path / "ml-latest-small" # Only extract the data the first time the script is run. if not movielens_dir.exists(): with ZipFile(movielens_zipped_file, "r") as zip: # Extract files print("Extracting all the files now...") zip.extractall(path=keras_datasets_path) print("Done!") # Get the ratings file ratings_file = movielens_dir / "ratings.csv" df = pd.read_csv(ratings_file) # Make the encodings for users user_ids = df["userId"].unique().tolist() user2user_encoded = {x: i for i, x in enumerate(user_ids)} user_encoded2user = {i: x for i, x in enumerate(user_ids)} df["user"] = df["userId"].map(user2user_encoded) num_users = len(user2user_encoded) # Make the encodings for movies movie_ids = df["movieId"].unique().tolist() movie2movie_encoded = {x: i for i, x in enumerate(movie_ids)} movie_encoded2movie = {i: x for i, x in enumerate(movie_ids)} df["movie"] = df["movieId"].map(movie2movie_encoded) num_movies = len(movie_encoded2movie) # Set ratings type df["rating"] = df["rating"].values.astype(np.float32) # min and max ratings will be used to normalize the ratings later # min_rating = min(df["rating"]) # max_rating = max(df["rating"]) # Load model model = from_pretrained_keras('mindwrapped/collaborative-filtering-movielens') movie_df = pd.read_csv(movielens_dir / "movies.csv") def update_user(id): return get_top_rated_movies_from_user(id), get_recommendations(id) def get_top_rated_movies_from_user(id): decoded_id = user_encoded2user.get(id) # Get the top rated movies by this user movies_watched_by_user = df[df.userId == decoded_id] top_movies_user = ( movies_watched_by_user.sort_values(by="rating", ascending=False) .head(5) .movieId.values ) movie_df_rows = movie_df[movie_df["movieId"].isin(top_movies_user)] movie_df_rows = movie_df_rows.drop('movieId', axis=1) return movie_df_rows def random_user(): return update_user(np.random.randint(0, num_users-1)) def get_recommendations(id): decoded_id = user_encoded2user.get(id) # Get the top 10 recommended movies for this user movies_watched_by_user = df[df.userId == decoded_id] movies_not_watched = movie_df[ ~movie_df["movieId"].isin(movies_watched_by_user.movieId.values) ]["movieId"] movies_not_watched = list( set(movies_not_watched).intersection(set(movie2movie_encoded.keys())) ) movies_not_watched = [[movie2movie_encoded.get(x)] for x in movies_not_watched] # Encoded user id encoded_id = id # Create data [[user_id, movie_id],...] user_movie_array = np.hstack( ([[encoded_id]] * len(movies_not_watched), movies_not_watched) ) # Predict ratings for movies not watched ratings = model.predict(user_movie_array).flatten() # Get indices of top ten movies top_ratings_indices = ratings.argsort()[-10:][::-1] # Decode each movie recommended_movie_ids = [ movie_encoded2movie.get(movies_not_watched[x][0]) for x in top_ratings_indices ] recommended_movies = movie_df[movie_df["movieId"].isin(recommended_movie_ids)] recommended_movies = recommended_movies.drop('movieId', axis=1) return recommended_movies demo = gr.Blocks() with demo: gr.Markdown("""
Keras Example by Siddhartha Banerjee
Space by Scott Krstyen (mindwrapped)