Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import os | |
from constants import EVAL_REQUESTS_PATH | |
from pathlib import Path | |
from huggingface_hub import HfApi, Repository | |
TOKEN_HUB = os.environ.get("TOKEN_HUB", None) | |
QUEUE_REPO = os.environ.get("QUEUE_REPO", None) | |
QUEUE_PATH = os.environ.get("QUEUE_PATH", None) | |
hf_api = HfApi( | |
endpoint="https://huggingface.co.", | |
token=TOKEN_HUB, | |
) | |
def load_all_info_from_dataset_hub(): | |
eval_queue_repo = None | |
requested_models = None | |
passed = True | |
if TOKEN_HUB is None: | |
passed = False | |
else: | |
print("Pulling evaluation requests and results.") | |
eval_queue_repo = Repository( | |
local_dir=QUEUE_PATH, | |
clone_from=QUEUE_REPO, | |
use_auth_token=TOKEN_HUB, | |
repo_type="dataset", | |
) | |
eval_queue_repo.git_pull() | |
# Local directory where dataset repo is cloned + folder with eval requests | |
directory = QUEUE_PATH / EVAL_REQUESTS_PATH | |
requested_models = get_all_requested_models(directory) | |
requested_models = [p.stem for p in requested_models] | |
# Local directory where dataset repo is cloned | |
csv_results = get_csv_with_results(QUEUE_PATH) | |
if csv_results is None: | |
passed = False | |
if not passed: | |
raise ValueError("No Hugging Face token provided. Skipping evaluation requests and results.") | |
return eval_queue_repo, requested_models, csv_results | |
def upload_file(requested_model_name, path_or_fileobj): | |
dest_repo_file = Path(EVAL_REQUESTS_PATH) / path_or_fileobj.name | |
dest_repo_file = str(dest_repo_file) | |
hf_api.upload_file( | |
path_or_fileobj=path_or_fileobj, | |
path_in_repo=str(dest_repo_file), | |
repo_id=QUEUE_REPO, | |
token=TOKEN_HUB, | |
repo_type="dataset", | |
commit_message=f"Add {requested_model_name} to eval queue") | |
def get_all_requested_models(directory): | |
directory = Path(directory) | |
all_requested_models = list(directory.glob("*.txt")) | |
return all_requested_models | |
def get_csv_with_results(directory): | |
directory = Path(directory) | |
all_csv_files = list(directory.glob("*.csv")) | |
latest = [f for f in all_csv_files if f.stem.endswith("latest")] | |
if len(latest) != 1: | |
return None | |
return latest[0] | |
def is_model_on_hub(model_name, revision="main") -> bool: | |
try: | |
model_name = model_name.replace(" ","") | |
author = model_name.split("/")[0] | |
model_id = model_name.split("/")[1] | |
if len(author) == 0 or len(model_id) == 0: | |
return False, "is not a valid model name. Please use the format `author/model_name`." | |
except Exception as e: | |
return False, "is not a valid model name. Please use the format `author/model_name`." | |
try: | |
models = list(hf_api.list_models(author=author, search=model_id)) | |
matched = [model_name for m in models if m.modelId == model_name] | |
if len(matched) != 1: | |
return False, "was not found on the hub!" | |
else: | |
return True, None | |
except Exception as e: | |
print(f"Could not get the model from the hub.: {e}") | |
return False, "was not found on hub!" |