|
import spaces
|
|
import gradio as gr
|
|
from huggingface_hub import HfApi, ModelInfo, DatasetInfo, SpaceInfo, Collection
|
|
from huggingface_hub.hf_api import PaperInfo
|
|
from typing import Union
|
|
import gc
|
|
import pandas as pd
|
|
import datetime
|
|
import json
|
|
import re
|
|
from hfconstants import DS_SIZE_CATEGORIES, SPACE_HARDWARES, SPACE_STAGES, SPACE_STAGES_EMOJI, EMOJIS
|
|
|
|
@spaces.GPU
def dummy_gpu():
    """Do nothing; this function only carries the ``spaces.GPU`` decorator.

    NOTE(review): presumably kept so the hosting runtime finds at least one
    GPU-decorated function -- confirm against the deployment requirements.
    """
|
|
|
|
# One-letter abbreviation for each result type.
TYPES_SHORT = {
    "model": "M",
    "dataset": "D",
    "space": "S",
    "paper": "P",
    "collection": "C",
}

# Listing page for each result type.
TYPES_URL = {
    "model": "https://huggingface.co./models",
    "dataset": "https://huggingface.co./datasets",
    "space": "https://huggingface.co./spaces",
    "paper": "https://huggingface.co./papers",
    "collection": "https://huggingface.co./collections",
}

# Markdown legend of the form "[M=model](url) / [D=dataset](url) / ...".
TYPES_DESC = " / ".join(
    f"[{short}={kind}]({TYPES_URL.get(kind, 'https://hf.co')})"
    for kind, short in TYPES_SHORT.items()
)
|
|
|
|
# Column catalogue for the result table.
# Each value is [order, datatype, flag] with an optional fourth element:
#   [0] integer used to order the columns,
#   [1] column datatype name,
#   [2] boolean read by get_valid_labels() to select labels,
#   [3] column width string (when absent, Labels.set falls back to "10%").
RESULT_ITEMS = {
    "T":        [1, "str", True],
    "ID":       [2, "markdown", True, "40%"],
    "User":     [4, "str", False],
    "Name":     [5, "str", False],
    "URL":      [6, "str", False],
    "Status":   [7, "markdown", True],
    "Gated":    [8, "str", True],
    "Likes":    [10, "number", True],
    "DLs":      [12, "number", True],
    "AllDLs":   [13, "number", False],
    "Trending": [16, "number", True],
    "LastMod.": [17, "str", True],
    "Library":  [20, "markdown", False],
    "Pipeline": [21, "markdown", True],
    "SDK":      [24, "str", False],
    "Hardware": [25, "str", False],
    "Stage":    [26, "str", False],
    "Emoji":    [35, "str", False],
    "NFAA":     [40, "str", False],
}
|
|
|
|
# Maps a sort parameter name to the RESULT_ITEMS column holding that value.
SORT_PARAM_TO_ITEM = {
    "last_modified":      "LastMod.",
    "likes":              "Likes",
    "downloads":          "DLs",
    "downloads_all_time": "AllDLs",
    "trending_score":     "Trending",
}
|
|
|
|
def _load_json(path: str, default):
    """Return the parsed JSON content of *path*, or *default* on failure.

    A missing, unreadable or malformed file is reported with ``print`` and
    replaced by *default* so that importing the module never fails.
    """
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSON and decoding errors
        print(e)
        return default


# The two files are loaded independently so that a problem with one of them
# does not discard the data successfully read from the other.
TAGS = _load_json("tags.json", [])
SUBTAGS = _load_json("subtags.json", {})
|
|
|
|
def get_tags():
    """Return at most the first 1000 entries of ``TAGS``."""
    max_tags = 1000
    return TAGS[:max_tags]
|
|
|
|
def get_subtag_categories():
    """Return the keys of ``SUBTAGS`` as a list."""
    return [category for category in SUBTAGS.keys()]
|
|
|
|
def update_subtag_items(category: str):
    """Build a Gradio update listing the sub-tags of *category*.

    The choices always start with an empty entry, which is also the selected
    value; an unknown category yields only that empty entry.
    """
    options = ["", *SUBTAGS.get(category, [])]
    return gr.update(choices=options, value=options[0])
|
|
|
|
def update_subtags(tags: str, category: str, item: str):
    """Append ``"{category}:{item}"`` to the newline-separated *tags* text.

    Args:
        tags: Current tag text, one tag per line (may be empty).
        category: Sub-tag category used as the prefix.
        item: Sub-tag value; when empty nothing is appended.

    Returns:
        The updated tag text.
    """
    if not item:
        # Nothing selected: leave the text alone instead of adding a blank line.
        return tags or ""
    addtag = f"{category}:{item}"
    return f"{tags}\n{addtag}" if tags else addtag
|
|
|
|
def update_tags(tags: str, item: str):
    """Append *item* as a new line of the newline-separated *tags* text.

    Args:
        tags: Current tag text, one tag per line (may be empty).
        item: Tag to add; when empty or ``None`` nothing is appended.

    Returns:
        The updated tag text.
    """
    if not item:
        # Avoids appending a blank line (or the text "None") to the tag list.
        return tags or ""
    return f"{tags}\n{item}" if tags else item
|
|
|
|
def get_repo_type(repo_id: str):
    """Return ``"dataset"``, ``"space"`` or ``"model"`` for *repo_id*.

    The repo types are probed in that order with ``HfApi.repo_exists``;
    ``None`` is returned when none of the probes succeeds.

    Raises:
        Exception: if any of the probes raises.
    """
    try:
        hub = HfApi()
        if hub.repo_exists(repo_id=repo_id, repo_type="dataset"):
            return "dataset"
        if hub.repo_exists(repo_id=repo_id, repo_type="space"):
            return "space"
        if hub.repo_exists(repo_id=repo_id):
            return "model"
        return None
    except Exception as e:
        print(e)
        raise Exception(f"Repo not found: {repo_id} {e}")
|
|
|
|
def sort_dict(d: dict):
    """Return a new dict with the entries of *d* ordered by value, largest first.

    Entries with equal values keep their original relative order.
    """
    def by_value(pair):
        return pair[1]

    ordered_pairs = sorted(d.items(), key=by_value, reverse=True)
    return {k: v for k, v in ordered_pairs}
|
|
|
|
def get_repo_likers(repo_id: str, repo_type: str="model"):
    """Return the usernames of the users who liked the given repo.

    Raises:
        Exception: wrapping any error raised while querying the Hub.
    """
    try:
        likers = HfApi().list_repo_likers(repo_id=repo_id, repo_type=repo_type)
        return [liker.username for liker in likers]
    except Exception as e:
        print(e)
        raise Exception(e)
|
|
|
|
def get_liked_repos(users: list[str]):
    """Tally the repos liked by *users*.

    Returns:
        Three parallel lists ``(repo_ids, repo_types, counts)`` ordered by
        count, largest first.

    NOTE(review): the tally for a repo starts at 2 on its first occurrence
    (``get(id, 1) + 1``), so every count is one higher than the number of
    users who liked it -- confirm whether that offset is intended.

    Raises:
        Exception: wrapping any error raised while querying the Hub.
    """
    try:
        hub = HfApi()
        tally = {}
        kinds = {}
        for user in users:
            liked = hub.list_liked_repos(user=user)
            groups = (
                ("model", liked.models),
                ("dataset", liked.datasets),
                ("space", liked.spaces),
            )
            for kind, repo_ids in groups:
                for repo in repo_ids:
                    tally[repo] = tally.get(repo, 1) + 1
                    kinds[repo] = kind
        ranked = sort_dict(tally)
        repo_list = list(ranked)
        kind_list = [kinds[repo] for repo in repo_list]
        return repo_list, kind_list, list(ranked.values())
    except Exception as e:
        print(e)
        raise Exception(e)
|
|
|
|
def get_repo_collections(repo_id: str, repo_type: str="model", limit=10):
    """Tally the items of the collections that contain the given repo.

    Up to *limit* collections (sorted by upvotes) are fetched and every item
    they contain is counted.

    Returns:
        Three parallel lists ``(item_ids, item_types, counts)`` ordered by
        count, largest first. As written, a count starts at 2 on the first
        occurrence of an item.

    Raises:
        Exception: wrapping any error raised while querying the Hub.
    """
    try:
        hub = HfApi()
        prefix = {"dataset": "datasets", "space": "spaces"}.get(repo_type, "models")
        item = f"{prefix}/{repo_id}"
        tally = {}
        kinds = {}
        for summary in hub.list_collections(item=item, sort="upvotes", limit=limit):
            collection = hub.get_collection(collection_slug=summary.slug)
            for entry in collection.items:
                entry_id = entry.item_id
                tally[entry_id] = tally.get(entry_id, 1) + 1
                kinds[entry_id] = entry.item_type
        ranked = sort_dict(tally)
        id_list = list(ranked)
        kind_list = [kinds[entry_id] for entry_id in id_list]
        return id_list, kind_list, list(ranked.values())
    except Exception as e:
        print(e)
        raise Exception(e)
|
|
|
|
def get_users_collections(users: list[str], limit=10):
    """Tally the items of the collections owned by the first six *users*.

    For each of those users, up to *limit* collections (sorted by upvotes)
    are fetched and every item they contain is counted.

    Returns:
        Three parallel lists ``(item_ids, item_types, counts)`` ordered by
        count, largest first. As written, a count starts at 2 on the first
        occurrence of an item.

    Raises:
        Exception: wrapping any error raised while querying the Hub.
    """
    try:
        hub = HfApi()
        tally = {}
        kinds = {}
        for owner in users[0:6]:
            for summary in hub.list_collections(owner=owner, sort="upvotes", limit=limit):
                collection = hub.get_collection(collection_slug=summary.slug)
                for entry in collection.items:
                    entry_id = entry.item_id
                    tally[entry_id] = tally.get(entry_id, 1) + 1
                    kinds[entry_id] = entry.item_type
        ranked = sort_dict(tally)
        id_list = list(ranked)
        kind_list = [kinds[entry_id] for entry_id in id_list]
        return id_list, kind_list, list(ranked.values())
    except Exception as e:
        print(e)
        raise Exception(e)
|
|
|
|
def get_ref_repos(repo_id: str):
    """Score repos related to *repo_id*.

    Two sources are combined: repos liked by the first ten likers of
    *repo_id* (count weighted by 2) and items of collections containing
    *repo_id* (count weighted by 5). *repo_id* itself is removed from the
    result.

    Returns:
        Three parallel lists ``(ids, types, scores)`` ordered by score,
        largest first.
    """
    scores = {}
    kinds = {}

    def accumulate(result, weight):
        # result is an (ids, types, counts) triple of parallel lists.
        for ref, kind, count in zip(*result):
            scores[ref] = scores.get(ref, 0) + count * weight
            kinds[ref] = kind

    repo_type = get_repo_type(repo_id)
    likers = get_repo_likers(repo_id, repo_type)[0:10]
    accumulate(get_liked_repos(likers), 2)
    accumulate(get_repo_collections(repo_id, repo_type), 5)

    ranked = sort_dict(scores)
    ranked.pop(repo_id, None)
    ref_ids = list(ranked)
    ref_kinds = [kinds[ref] for ref in ref_ids]
    return ref_ids, ref_kinds, list(ranked.values())
|
|
|
|
def get_collections_by_repo(repo_id: str, repo_type: str="model", limit=100):
    """Return up to *limit* collections (sorted by upvotes) containing the repo.

    Raises:
        Exception: wrapping any error raised while querying the Hub.
    """
    try:
        prefix = {"dataset": "datasets", "space": "spaces"}.get(repo_type, "models")
        found = HfApi().list_collections(item=f"{prefix}/{repo_id}", sort="upvotes", limit=limit)
        return list(found)
    except Exception as e:
        print(e)
        raise Exception(e)
|
|
|
|
def get_collections_by_users(users: list[str], limit=100):
    """Return the collections owned by the first six *users*.

    Up to *limit* collections per user are requested, sorted by upvotes.

    Raises:
        Exception: wrapping any error raised while querying the Hub.
    """
    try:
        hub = HfApi()
        gathered = []
        for owner in users[0:6]:
            gathered.extend(hub.list_collections(owner=owner, sort="upvotes", limit=limit))
        return gathered
    except Exception as e:
        print(e)
        raise Exception(e)
|
|
|
|
def get_ref_collections(repo_id: str, limit=10):
    """Return collections related to *repo_id*, de-duplicated by slug.

    Combines the collections containing the repo with the collections owned
    by its first ten likers.

    Raises:
        Exception: wrapping any error raised by the helpers.
    """
    try:
        repo_type = get_repo_type(repo_id)
        likers = get_repo_likers(repo_id, repo_type)[0:10]
        found = get_collections_by_repo(repo_id, repo_type, limit) + get_collections_by_users(likers, limit)
        by_slug = {}
        for collection in found:
            # A repeated slug keeps its first position but the latest object.
            by_slug[collection.slug] = collection
        return list(by_slug.values())
    except Exception as e:
        print(e)
        raise Exception(e)
|
|
|
|
def get_collections(repo_id: str, repo_limit: int=100, user_limit: int=0):
    """Return collections for a repo id or a user id, de-duplicated by slug.

    When *repo_id* contains ``"/"`` it is treated as a repo: the collections
    containing it (up to *repo_limit*) are combined with the collections of
    its first ``user_limit + 1`` likers. Otherwise it is treated as a user
    name and that user's collections are returned.

    Raises:
        Exception: wrapping any error raised by the helpers.
    """
    try:
        if "/" not in repo_id:
            found = get_collections_by_users([repo_id], 50)
        else:
            repo_type = get_repo_type(repo_id)
            likers = get_repo_likers(repo_id, repo_type)[0:user_limit+1]
            found = get_collections_by_repo(repo_id, repo_type, repo_limit) + get_collections_by_users(likers, 50)
        by_slug = {}
        for collection in found:
            by_slug[collection.slug] = collection
        return list(by_slug.values())
    except Exception as e:
        print(e)
        raise Exception(e)
|
|
|
|
def str_to_list(s: str):
    """Split newline-separated text into a list of stripped, non-empty entries.

    Args:
        s: Text with one entry per line.

    Returns:
        The entries in order, without surrounding whitespace. Blank lines are
        dropped so that they are never passed on as empty tags or filters.
        Anything that is not a ``str`` yields an empty list.
    """
    if not isinstance(s, str):
        return []
    entries = (line.strip() for line in s.split("\n"))
    return [entry for entry in entries if entry]
|
|
|
|
def is_valid_arg(s: str):
    """Return True when *s* contains at least one non-empty entry.

    The text is split with ``str_to_list``; a result made only of empty
    strings (for example from ``""``) does not count as valid.
    """
    return any(str_to_list(s))
|
|
|
|
def get_labels():
    """Return every label defined in ``RESULT_ITEMS``, in definition order."""
    return [label for label in RESULT_ITEMS]
|
|
|
|
def get_valid_labels():
    """Return the labels of ``RESULT_ITEMS`` whose third field is truthy."""
    selected = []
    for label, spec in RESULT_ITEMS.items():
        if spec[2]:
            selected.append(label)
    return selected
|
|
|
|
def date_to_str(dt: datetime.datetime):
    """Format *dt* as ``YYYY-MM-DD HH:MM``."""
    minute_precision = '%Y-%m-%d %H:%M'
    return dt.strftime(minute_precision)
|
|
|
|
class Labels():
    """Registry of the table columns in use, with their dtype, order and width.

    Column definitions are read from the module-level ``RESULT_ITEMS``.
    """

    VALID_DTYPE = ["str", "number", "bool", "date", "markdown"]

    def __init__(self):
        self.types = {}   # label -> datatype name
        self.orders = {}  # label -> ordering integer
        self.widths = {}  # label -> width string

    def set(self, label: str):
        """Register *label* using its definition in ``RESULT_ITEMS``.

        Raises:
            Exception: if *label* is unknown or its datatype is not one of
                ``VALID_DTYPE``.
        """
        item = RESULT_ITEMS.get(label)
        if item is None:
            raise Exception(f"Invalid item: {label}")
        dtype = item[1]
        if dtype not in self.VALID_DTYPE:
            # Report the offending datatype (previously the builtin `type`
            # was interpolated by mistake).
            raise Exception(f"Invalid data type: {dtype}")
        self.types[label] = dtype
        self.orders[label] = item[0]
        # The width is optional in RESULT_ITEMS.
        self.widths[label] = item[3] if len(item) > 3 else "10%"

    def get(self):
        """Return ``(labels, label_types)`` sorted by the configured order."""
        labels = sorted(self.types, key=lambda x: self.orders[x])
        label_types = [self.types[s] for s in labels]
        return labels, label_types

    def get_widths(self):
        """Return a copy of the label -> width mapping."""
        return self.widths.copy()

    def get_null_value(self, type: str):
        """Return the placeholder used for a missing value of datatype *type*."""
        if type == "bool":
            return False
        if type == "number" or type == "date":
            return 0
        return ""
|
|
|
|
|
|
|
|
class HFSearchResult():
    """Collects Hugging Face Hub search hits and renders them for a Gradio table.

    Results are kept in three parallel lists (``item_list``,
    ``item_info_list`` and ``item_hide_flags``); every method that reorders
    or removes results keeps the three lists aligned.
    """

    def __init__(self):
        self.labels = Labels()         # columns seen so far
        self.current_item = {}         # row being assembled by add_item()
        self.current_item_info = None  # raw info object of the current row
        self.item_list = []            # one dict (label -> value) per result
        self.item_info_list = []       # raw info objects, parallel to item_list
        self.item_hide_flags = []      # True = row hidden by the value filter
        self.hide_labels = []          # columns hidden on request
        self.show_labels = []          # columns requested when not in phone mode
        self.filter_items = None       # columns to filter on, or None
        self.filters = None            # substrings to match, parallel to filter_items
        self.phone_mode = True
        gc.collect()

    def reset(self):
        """Discard all results and settings by re-running ``__init__``."""
        self.__init__()

    def set_mode(self, mode: str):
        """Select the layout: ``"Phone"`` or ``"PC"``; other values are ignored."""
        if mode == "Phone":
            self.phone_mode = True
        elif mode == "PC":
            self.phone_mode = False

    def get_show_labels(self):
        """Return the columns to display (only ``T`` and ``ID`` in phone mode)."""
        return ["T", "ID"] if self.phone_mode else self.show_labels

    def _set(self, data, label: str):
        """Register *label* and store *data* under it in the current row."""
        self.labels.set(label)
        self.current_item[label] = data

    def _next(self):
        """Commit the current row to the result lists and start a new one."""
        self.item_list.append(self.current_item.copy())
        self.current_item = {}
        self.item_info_list.append(self.current_item_info)
        self.current_item_info = None
        self.item_hide_flags.append(False)

    def add_item(self, i: Union[ModelInfo, DatasetInfo, SpaceInfo, PaperInfo, Collection]):
        """Convert one Hub info object into a result row.

        Objects of any other type are ignored.
        """
        self.current_item_info = i
        if isinstance(i, ModelInfo): kind = "model"
        elif isinstance(i, DatasetInfo): kind = "dataset"
        elif isinstance(i, SpaceInfo): kind = "space"
        elif isinstance(i, PaperInfo): kind = "paper"
        elif isinstance(i, Collection): kind = "collection"
        else: return
        self._set(kind, "T")
        self._set("", "Emoji")
        if kind in ["space", "model", "dataset"]:
            # partition() also copes with ids that have no "owner/" prefix,
            # where indexing the result of split("/") raised IndexError.
            owner, sep, name = i.id.partition("/")
            self._set(i.id, "ID")
            self._set(owner, "User")
            self._set(name if sep else i.id, "Name")
            if kind == "dataset": self._set(f"https://hf.co/datasets/{i.id}", "URL")
            elif kind == "space": self._set(f"https://hf.co/spaces/{i.id}", "URL")
            else: self._set(f"https://hf.co/{i.id}", "URL")
            if i.likes is not None: self._set(i.likes, "Likes")
            if i.last_modified is not None: self._set(date_to_str(i.last_modified), "LastMod.")
            if i.trending_score is not None: self._set(int(i.trending_score), "Trending")
            if i.tags is not None: self._set("True" if "not-for-all-audiences" in i.tags else "False", "NFAA")
            if kind in ["model", "dataset"]:
                if i.gated is not None: self._set(i.gated if i.gated else "off", "Gated")
                if i.downloads is not None: self._set(i.downloads, "DLs")
                if i.downloads_all_time is not None: self._set(i.downloads_all_time, "AllDLs")
            if kind == "model":
                if i.inference is not None: self._set(i.inference, "Status")
                if i.library_name is not None: self._set(i.library_name, "Library")
                if i.pipeline_tag is not None: self._set(i.pipeline_tag, "Pipeline")
            if kind == "space":
                if i.sdk is not None: self._set(i.sdk, "SDK")
                if i.runtime is not None:
                    self._set(i.runtime.hardware, "Hardware")
                    self._set(i.runtime.stage, "Stage")
                if i.card_data is not None:
                    card = i.card_data
                    # A card title replaces the repo name as the display name.
                    if card.title is not None: self._set(card.title, "Name")
        elif kind == "paper":
            self._set(i.id, "ID")
            self._set(f"https://hf.co/papers/{i.id}", "URL")
            if i.submitted_by is not None: self._set(i.submitted_by, "User")
            if i.title is not None: self._set(i.title, "Name")
            if i.submitted_at is not None: self._set(date_to_str(i.submitted_at), "LastMod.")
            if i.upvotes is not None: self._set(i.upvotes, "Likes")
        elif kind == "collection":
            self._set(i.slug, "ID")
            if i.owner is not None: self._set(i.owner["name"], "User")
            if i.title is not None: self._set(i.title, "Name")
            if i.last_updated is not None: self._set(date_to_str(i.last_updated), "LastMod.")
            if i.upvotes is not None: self._set(i.upvotes, "Likes")
            if i.url is not None: self._set(i.url, "URL")
        self._next()

    def search(self, repo_types: list, sort: str, sort_method: str, filter_str: str, search_str: str, author: str, tags: str, infer: str, gated: str, appr: list[str],
               size_categories: list, limit: int, hardware: list, stage: list, followed: str, fetch_detail: list, show_labels: list, ui_mode="PC"):
        """Query the Hub for the requested repo types and store the results.

        Raises:
            Exception: ``"Search error: ..."`` wrapping any failure, including
                an empty result.
        """
        try:
            self.reset()
            self.set_mode(ui_mode)
            self.show_labels = show_labels.copy()
            api = HfApi()
            kwargs = {}   # shared by models, datasets and spaces
            mkwargs = {}  # models only
            dkwargs = {}  # datasets only
            skwargs = {}  # spaces only
            ckwargs = {}  # collections only
            pkwargs = {}  # papers only
            if filter_str:
                # Blank lines are dropped so no empty filter reaches the API.
                filter_list = [s for s in str_to_list(filter_str) if s]
                if filter_list:
                    kwargs["filter"] = filter_list
                    ckwargs["item"] = filter_list
                    # The papers query is passed as a single string.
                    pkwargs["query"] = " ".join(filter_list)
            if search_str: kwargs["search"] = search_str
            if author:
                kwargs["author"] = author
                ckwargs["owner"] = author
            if tags and is_valid_arg(tags):
                tag_list = [s for s in str_to_list(tags) if s]
                if tag_list:
                    mkwargs["tags"] = tag_list
                    dkwargs["tags"] = list(tag_list)
            if limit > 0:
                kwargs["limit"] = limit
                ckwargs["limit"] = 100 if limit > 100 else limit
            if sort_method == "descending order": kwargs["direction"] = -1
            if gated == "gated":
                mkwargs["gated"] = True
                dkwargs["gated"] = True
            elif gated == "non-gated":
                mkwargs["gated"] = False
                dkwargs["gated"] = False
            mkwargs["sort"] = sort
            if len(size_categories) > 0: dkwargs["size_categories"] = size_categories
            if infer != "all": mkwargs["inference"] = infer
            if "model" in repo_types:
                models = api.list_models(full=True, cardData=True, **kwargs, **mkwargs)
                for model in models:
                    # Skip gated repos whose approval mode was not selected.
                    if model.gated and model.gated not in appr: continue
                    self.add_item(model)
            if "dataset" in repo_types:
                datasets = api.list_datasets(full=True, **kwargs, **dkwargs)
                for dataset in datasets:
                    if dataset.gated and dataset.gated not in appr: continue
                    self.add_item(dataset)
            if "space" in repo_types:
                if "Space Runtime" in fetch_detail:
                    spaces = api.list_spaces(expand=["cardData", "datasets", "disabled", "lastModified", "createdAt",
                                                     "likes", "models", "private", "runtime", "sdk", "sha", "tags", "trendingScore"], **kwargs, **skwargs)
                else: spaces = api.list_spaces(full=True, **kwargs, **skwargs)
                for space in spaces:
                    if space.gated and space.gated not in appr: continue
                    if space.runtime is not None:
                        # The hardware filter is applied to running spaces only.
                        if len(hardware) > 0 and space.runtime.stage == "RUNNING" and space.runtime.hardware not in hardware: continue
                        if len(stage) > 0 and space.runtime.stage not in stage: continue
                    self.add_item(space)
            if "paper" in repo_types:
                papers = api.list_papers(**pkwargs)
                for paper in papers:
                    self.add_item(paper)
            if "collection" in repo_types:
                cols = api.list_collections(**ckwargs)
                for col in cols:
                    self.add_item(col)
            if followed: self.followed_by(followed)
            self.sort(sort)
        except Exception as e:
            raise Exception(f"Search error: {e}") from e

    def search_collections(self, repo_id: str, sort: str, show_labels: list, repo_limit: int=100, user_limit: int=0, ui_mode="PC"):
        """Store the collections related to a repo id or user id.

        Raises:
            Exception: ``"Search error: ..."`` wrapping any failure.
        """
        try:
            self.reset()
            self.set_mode(ui_mode)
            self.show_labels = show_labels.copy()
            cols = get_collections(repo_id, repo_limit, user_limit)
            for col in cols:
                self.add_item(col)
            self.sort(sort)
        except Exception as e:
            raise Exception(f"Search error: {e}") from e

    def search_ref_repos(self, repo_id: str, repo_types: str, sort: str, show_labels: list, limit=10, ui_mode="PC"):
        """Store repos and/or collections related to *repo_id*.

        Only the first *limit* related repos are considered; those whose type
        is not in *repo_types* are skipped without being replaced.

        Raises:
            Exception: ``"Search error: ..."`` wrapping any failure.
        """
        try:
            self.reset()
            self.set_mode(ui_mode)
            self.show_labels = show_labels.copy()
            api = HfApi()
            if "model" in repo_types or "dataset" in repo_types or "space" in repo_types or "paper" in repo_types:
                repos, types, counts = get_ref_repos(repo_id)
                for n, (r, t) in enumerate(zip(repos, types)):
                    if n + 1 > limit: break
                    if t not in repo_types: continue
                    try:
                        info = api.repo_info(repo_id=r, repo_type=t)
                    except Exception as e:
                        # One repo that cannot be fetched (deleted, private,
                        # unsupported type) must not abort the whole search.
                        print(e)
                        continue
                    if info: self.add_item(info)
            if "collection" in repo_types:
                cols = get_ref_collections(repo_id, limit)
                for col in cols:
                    self.add_item(col)
            self.sort(sort)
        except Exception as e:
            raise Exception(f"Search error: {e}") from e

    def get(self):
        """Return ``(styled_table, shown_labels, shown_label_types)``."""
        labels, label_types = self.labels.get()
        self._do_filter()
        dflist = [[item.get(l, self.labels.get_null_value(t)) for l, t in zip(labels, label_types)]
                  for item, is_hide in zip(self.item_list, self.item_hide_flags) if not is_hide]
        df = self._to_pandas(dflist, labels)
        visible = self.get_show_labels()
        show_label_types = [t for l, t in zip(labels, label_types) if l not in self.hide_labels and l in visible]
        show_labels = [l for l in labels if l not in self.hide_labels and l in visible]
        return df, show_labels, show_label_types

    def _to_pandas(self, dflist: list, labels: list):
        """Build the styled pandas table for the visible rows.

        Returns a pandas ``Styler`` restricted to the shown columns.
        """
        def rank_df(sdf: pd.DataFrame, df: pd.DataFrame, col: str):
            # Colour the values at or above the 50th/75th/90th percentile.
            ranks = [(0.5, "gold"), (0.75, "orange"), (0.9, "orangered")]
            for t, color in ranks:
                sdf.loc[df[col] >= df[col].quantile(q=t), [col]] = f'color: {color}'
            return sdf

        def highlight_df(x: pd.DataFrame, df: pd.DataFrame):
            # x is the displayed frame; df holds the raw values of all columns.
            sdf = pd.DataFrame("", index=x.index, columns=x.columns)
            columns = df.columns
            if "Trending" in columns: sdf = rank_df(sdf, df, "Trending")
            if "Likes" in columns: sdf = rank_df(sdf, df, "Likes")
            if "AllDLs" in columns: sdf = rank_df(sdf, df, "AllDLs")
            if "DLs" in columns: sdf = rank_df(sdf, df, "DLs")
            if "Status" in columns:
                sdf.loc[df["Status"] == "warm", ["T", "Status"]] = 'color: orange'
                sdf.loc[df["Status"] == "cold", ["T", "Status"]] = 'color: dodgerblue'
            if "Gated" in columns:
                sdf.loc[df["Gated"] == "auto", ["Gated"]] = 'color: dodgerblue'
                sdf.loc[df["Gated"] == "manual", ["Gated"]] = 'color: crimson'
            if "Stage" in columns and "Hardware" in columns:
                sdf.loc[(df["Stage"] == "RUNNING") & (df["Hardware"] != "zero-a10g") & (df["Hardware"] != "cpu-basic") & (df["Hardware"]), ["Hardware", "T"]] = 'color: lime'
                sdf.loc[(df["Stage"] == "RUNNING") & (df["Hardware"] == "zero-a10g"), ["Hardware", "T"]] = 'color: limegreen'
                sdf.loc[(df["T"] == "space") & (df["Stage"] != "RUNNING")] = 'opacity: 0.5'
                sdf.loc[(df["T"] == "space") & (df["Stage"] != "RUNNING"), ["T"]] = 'color: crimson'
                sdf.loc[df["Stage"] == "RUNNING", ["Stage"]] = 'color: lime'
            if "NFAA" in columns: sdf.loc[df["NFAA"] == "True", ["T"]] = 'background-color: hotpink'
            # The assignments above may have added style columns that are not
            # displayed; drop them so the shape matches x.
            show_columns = x.columns
            drop_columns = [c for c in sdf.columns if c not in show_columns]
            sdf = sdf.drop(drop_columns, axis=1)
            return sdf

        def id_to_md(df: pd.DataFrame, verbose=False):
            # df is one row (a Series); its index holds the column names.
            columns = list(df.index)

            def val(name):
                # A column is absent when no result provided that field.
                return df[name] if name in columns else ""

            if df["T"] == "collection": id = f'### [{val("User")}/{val("Name")}]({val("URL")}){val("Emoji")}'
            elif df["T"] == "space": id = f'### [{val("Name")} ({val("ID")})]({val("URL")}){val("Emoji")}'
            elif df["T"] == "paper": id = f'### [{val("Name")} (arxiv:{val("ID")})]({val("URL")}){val("Emoji")}'
            else: id = f'### [{val("ID")}]({val("URL")}){val("Emoji")}'
            if verbose:
                l = []
                if "NFAA" in columns and df["NFAA"] == "True": l.append('🤐')
                if "Likes" in columns and df["Likes"] > 0: l.append(f'💕:{df["Likes"]}')
                if df["T"] in ["model", "space", "dataset"]:
                    if "Trending" in columns and df["Trending"] > 0: l.append(f'trend:{df["Trending"]}')
                if df["T"] in ["model", "dataset"]:
                    if "DLs" in columns and df["DLs"] > 0: l.append(f'DL:{df["DLs"]}')
                    if "Gated" in columns and df["Gated"] in ["manual", "auto"]: l.append(f'🔑:{df["Gated"]}')
                if df["T"] == "model":
                    if "Status" in columns:
                        if df["Status"] == "warm": l.append(f'inference:🔥')
                        elif df["Status"] == "cold": l.append(f'inference:🧊')
                if df["T"] == "space":
                    if "Hardware" in columns and df["Hardware"] in SPACE_HARDWARES and df["Hardware"] != "cpu-basic": l.append(f'{df["Hardware"]}')
                    if "SDK" in columns: l.append(f'{df["SDK"]}')
                    if "Stage" in columns and df["Stage"] in SPACE_STAGES_EMOJI.keys(): l.append(f'{SPACE_STAGES_EMOJI[df["Stage"]]}')
                if len(l) > 0: id += f"\n({' '.join(l)})"
            return id

        def shorten_type(df: pd.DataFrame, shorten=False):
            # Always return a value: without shortening the full type name is
            # kept (previously None was returned, blanking the column).
            if shorten: return TYPES_SHORT.get(df["T"], df["T"])
            return df["T"]

        def to_emoji(df: pd.DataFrame, label: str, key: str, emoji: str):
            if df[label] == key: return f'{df["Emoji"]}{emoji}' if df["Emoji"] else f' {emoji}'
            else: return df["Emoji"]

        def apply_emoji_df(df: pd.DataFrame):
            for label, v in EMOJIS.items():
                if label not in df.columns: continue
                for key, emoji in v.items():
                    df["Emoji"] = df.apply(to_emoji, axis=1, label=label, key=key, emoji=emoji)
            return df

        def format_md_df(df: pd.DataFrame, verbose=False):
            df["ID"] = df.apply(id_to_md, axis=1, verbose=verbose)
            df["T"] = df.apply(shorten_type, axis=1, shorten=verbose)
            return df

        visible = self.get_show_labels()
        hide_labels = [l for l in labels if l in self.hide_labels or l not in visible]
        df = pd.DataFrame(dflist, columns=labels)
        if df.empty:
            # Row-wise apply() on an empty frame does not return a column, so
            # formatting and styling are skipped when every row is filtered out.
            return df.drop(hide_labels, axis=1).style
        df = apply_emoji_df(df)
        # Keep the raw values (full type names) for the styling rules, which
        # compare "T" against names such as "space".
        ref_df = df.copy()
        df = format_md_df(df, verbose=self.phone_mode)
        return df.drop(hide_labels, axis=1).style.apply(highlight_df, axis=None, df=ref_df)

    def set_hide(self, hide_labels: list):
        """Set the columns to hide."""
        self.hide_labels = hide_labels.copy()

    def set_filter(self, filter_item1: str, filter1: str):
        """Set the value filter; it is cleared unless both parts are given."""
        if not filter_item1 or not filter1:
            # A column without a value (or the reverse) is not a usable
            # filter; a None value would also fail the substring test.
            self.filter_items = None
            self.filters = None
        else:
            self.filter_items = [filter_item1]
            self.filters = [filter1]

    def _do_filter(self):
        """Recompute ``item_hide_flags`` from the current value filter.

        A row is hidden when it has the filtered column and that value is
        empty or does not contain the filter text. Rows that lack the column
        are left visible.
        """
        if self.filters is None or self.filter_items is None:
            self.item_hide_flags = [False] * len(self.item_list)
            return
        labels, label_types = self.labels.get()
        types = dict(zip(labels, label_types))
        flags = []
        for item in self.item_list:
            flag = False
            for i, f in zip(self.filter_items, self.filters):
                if i not in item: continue
                t = types[i]
                value = item[i]
                if value is None or value == self.labels.get_null_value(t):
                    flag = True
                    break
                if t in ("str", "markdown") and str(f) not in str(value):
                    flag = True
                    break
            flags.append(flag)
        self.item_hide_flags = flags

    def sort(self, key="Likes"):
        """Sort the results by *key*, largest first.

        *key* may be a sort parameter name or a column label. Results that
        include papers are left unsorted. Download-based keys fall back to
        ``Likes`` when spaces or collections are present, as does any key
        that is not a known column.

        Raises:
            Exception: ``"No item found."`` when there are no results.
        """
        if len(self.item_list) == 0: raise Exception("No item found.")
        key = SORT_PARAM_TO_ITEM.get(key, key)
        types = {item["T"] for item in self.item_list if "T" in item}
        if "paper" in types: return
        if key in ["DLs", "AllDLs"] and ("space" in types or "collection" in types): key = "Likes"
        if key not in self.labels.get()[0]: key = "Likes"
        # Rows that lack the key sort as the null value of its datatype
        # instead of raising KeyError.
        null = self.labels.get_null_value(self.labels.types.get(key, "number"))
        rows = sorted(zip(self.item_list, self.item_hide_flags, self.item_info_list),
                      key=lambda x: x[0].get(key, null), reverse=True)
        # Store lists, not tuples, so that later appends keep working.
        self.item_list = [row[0] for row in rows]
        self.item_hide_flags = [row[1] for row in rows]
        self.item_info_list = [row[2] for row in rows]

    def followed_by(self, user: str):
        """Keep only the results whose owner is followed by *user*.

        The owner is the part of ``ID`` before the first ``"/"``. Rows are
        removed rather than flagged because ``_do_filter`` rebuilds
        ``item_hide_flags`` on every render.
        """
        if not user: return
        api = HfApi()
        usernames = {x.username for x in api.list_user_following(username=user)}
        kept = [(item, hidden, info)
                for item, hidden, info in zip(self.item_list, self.item_hide_flags, self.item_info_list)
                if str(item.get("ID", "")).split("/")[0] in usernames]
        self.item_list = [row[0] for row in kept]
        self.item_hide_flags = [row[1] for row in kept]
        self.item_info_list = [row[2] for row in kept]

    def get_gr_df(self):
        """Return the Gradio update that displays the current table."""
        df, labels, label_types = self.get()
        widths = self.labels.get_widths()
        if self.phone_mode:
            widths["T"] = "10%"
            widths["ID"] = "90%"
        column_widths = [widths[l] for l in labels]
        if self.phone_mode:
            labels = None  # no header row in phone mode
        return gr.update(type="pandas", value=df, headers=labels, datatype=label_types, column_widths=column_widths, wrap=True, show_label=False)

    def get_gr_hide_labels(self):
        """Return the Gradio update offering every known column for hiding."""
        return gr.update(choices=self.labels.get()[0], value=[], visible=True)

    def get_gr_filter_item(self, filter_item: str=""):
        """Return the Gradio update listing the text columns usable as filter."""
        labels, label_types = self.labels.get()
        choices = [s for s, t in zip(labels, label_types) if t in ("str", "markdown")]
        if len(choices) == 0: choices = [""]
        return gr.update(choices=choices, value=filter_item if filter_item else choices[0], visible=True)

    def get_gr_filter(self, filter_item: str=""):
        """Return the Gradio update listing values found in column *filter_item*.

        At most 100 distinct values are offered, ordered by ascending
        frequency, after a leading empty choice.
        """
        labels = self.labels.get()[0]
        if not filter_item or filter_item not in set(labels): return gr.update(choices=[""], value="", visible=True)
        d = {}
        for item in self.item_list:
            if filter_item not in item: continue
            v = item[filter_item]
            d[v] = d.get(v, 0) + 1
        return gr.update(choices=[""] + [t[0] for t in sorted(d.items(), key=lambda x: x[1])][:100], value="", visible=True)
|
|
|
|
def search(repo_types: list, sort: str, sort_method: str, filter_str: str, search_str: str, author: str, tags: str, infer: str,
           gated: str, appr: list[str], size_categories: list, limit: int, hardware: list, stage: list, followed: str,
           fetch_detail: list, show_labels: list, ui_mode: str, r: HFSearchResult):
    """Run a search on *r* and return the Gradio updates for the result.

    Returns:
        ``(table_update, hide_labels_update, r)``.

    Raises:
        gr.Error: wrapping any exception raised while searching or rendering.
    """
    try:
        r.search(
            repo_types, sort, sort_method, filter_str, search_str, author, tags, infer, gated, appr,
            size_categories, limit, hardware, stage, followed, fetch_detail, show_labels, ui_mode,
        )
        table_update = r.get_gr_df()
        hide_update = r.get_gr_hide_labels()
        return table_update, hide_update, r
    except Exception as e:
        raise gr.Error(e)
|
|
|
|
def search_ref_repos(repo_id: str, repo_types: list, sort: str, show_labels: list, limit, ui_mode: str, r: HFSearchResult):
    """Search for items related to *repo_id* and return the Gradio updates.

    Returns:
        ``(table_update, hide_labels_update, r)``.

    Raises:
        gr.Error: when *repo_id* is empty or the search fails.
    """
    try:
        if not repo_id:
            raise gr.Error("Input Repo ID")
        r.search_ref_repos(repo_id, repo_types, sort, show_labels, limit, ui_mode)
        table_update = r.get_gr_df()
        hide_update = r.get_gr_hide_labels()
        return table_update, hide_update, r
    except Exception as e:
        raise gr.Error(e)
|
|
|
|
def search_cols(repo_id: str, sort: str, show_labels: list, repo_limit: int, user_limit: int, ui_mode: str, r: HFSearchResult):
    """Search for collections by repo id or user id and return the Gradio updates.

    Returns:
        ``(table_update, hide_labels_update, r)``.

    Raises:
        gr.Error: when *repo_id* is empty or the search fails.
    """
    try:
        if not repo_id:
            raise gr.Error("Input Repo ID or User ID")
        r.search_collections(repo_id, sort, show_labels, repo_limit, user_limit, ui_mode)
        table_update = r.get_gr_df()
        hide_update = r.get_gr_hide_labels()
        return table_update, hide_update, r
    except Exception as e:
        raise gr.Error(e)
|
|
|
|
def update_df(hide_labels: list, filter_item1: str, filter1: str, r: HFSearchResult):
    """Apply the hidden columns and the value filter to *r* and re-render.

    Returns:
        ``(table_update, r)``.
    """
    r.set_hide(hide_labels)
    r.set_filter(filter_item1, filter1)
    table_update = r.get_gr_df()
    return table_update, r
|
|
|
|
def update_filter(filter_item1: str, r: HFSearchResult):
    """Refresh the filter widgets for the column *filter_item1*.

    Returns:
        ``(filter_item_update, filter_value_update, visibility_update, r)``.
    """
    item_update = r.get_gr_filter_item(filter_item1)
    value_update = r.get_gr_filter(filter_item1)
    return item_update, value_update, gr.update(visible=True), r
|
|
|