import os import subprocess import signal os.environ["GRADIO_ANALYTICS_ENABLED"] = "False" import gradio as gr import tempfile import torch import requests from huggingface_hub import HfApi, ModelCard, whoami from gradio_huggingfacehub_search import HuggingfaceHubSearch from pathlib import Path from textwrap import dedent ########### import subprocess import threading from queue import Queue, Empty def stream_output(pipe, queue): """Read output from pipe and put it in the queue.""" for line in iter(pipe.readline, b''): queue.put(line.decode('utf-8').rstrip()) pipe.close() def run_command(command, env_vars): # Create process with pipes for stdout and stderr process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, #bufsize=1, universal_newlines=False, env=env_vars, ) # Create queues to store output stdout_queue = Queue() stderr_queue = Queue() # Create and start threads to read output stdout_thread = threading.Thread(target=stream_output, args=(process.stdout, stdout_queue)) stderr_thread = threading.Thread(target=stream_output, args=(process.stderr, stderr_queue)) stdout_thread.daemon = True stderr_thread.daemon = True stdout_thread.start() stderr_thread.start() output_stdout = "" output_stderr = "" # Monitor output in real-time while process.poll() is None: # Check stdout try: stdout_line = stdout_queue.get_nowait() print(f"STDOUT: {stdout_line}") output_stdout += stdout_line + "\n" except Empty: pass # Check stderr try: stderr_line = stderr_queue.get_nowait() print(f"STDERR: {stderr_line}") output_stderr += stderr_line + "\n" except Empty: pass # Get remaining lines stdout_thread.join() stderr_thread.join() return (process.returncode, output_stdout, output_stderr) ########### def guess_base_model(ft_model_id): res = requests.get(f"https://huggingface.co./api/models/{ft_model_id}") res = res.json() for tag in res["tags"]: if tag.startswith("base_model:"): return tag.split(":")[-1] raise Exception("Cannot guess the base model, please enter it manually") def process_model(ft_model_id: str, base_model_id: str, rank: str, private_repo, oauth_token: gr.OAuthToken | None): # validate the oauth token try: whoami(oauth_token.token) except Exception as e: raise gr.Error("You must be logged in") model_name = ft_model_id.split('/')[-1] # validate the oauth token whoami(oauth_token.token) if not os.path.exists("outputs"): os.makedirs("outputs") try: api = HfApi(token=oauth_token.token) if not base_model_id: base_model_id = guess_base_model(ft_model_id) print("guess_base_model", base_model_id) with tempfile.TemporaryDirectory(dir="outputs") as outputdir: device = "cuda" if torch.cuda.is_available() else "cpu" cmd = [ "mergekit-extract-lora", ft_model_id, base_model_id, outputdir, f"--rank={rank}", f"--device={device}" ] print("cmd", cmd) env_vars = dict(os.environ, HF_TOKEN=oauth_token.token) returncode, output_stdout, output_stderr = run_command(cmd, env_vars) print("returncode", returncode) print("output_stdout", output_stdout) print("output_stderr", output_stderr) if returncode != 0: raise Exception(f"Error converting to LoRA PEFT {output_stderr}") print("Model converted to LoRA PEFT successfully!") print(f"Converted model path: {outputdir}") # Check output dir if not os.listdir(outputdir): raise Exception("Output directory is empty!") # Create repo username = whoami(oauth_token.token)["name"] new_repo_url = api.create_repo(repo_id=f"{username}/LoRA-{model_name}", exist_ok=True, private=private_repo) new_repo_id = new_repo_url.repo_id print("Repo created successfully!", new_repo_url) # Upload files api.upload_folder( folder_path=outputdir, path_in_repo="", repo_id=new_repo_id, ) print("Uploaded", outputdir) return ( f'

✅ DONE



Find your repo here: {new_repo_id}' ) except Exception as e: return (f"

❌ ERROR



{e}") css="""/* Custom CSS to allow scrolling */ .gradio-container {overflow-y: auto;} """ # Create Gradio interface with gr.Blocks(css=css) as demo: gr.Markdown("You must be logged in.") gr.LoginButton(min_width=250) ft_model_id = HuggingfaceHubSearch( label="Fine tuned model repository", placeholder="Fine tuned model", search_type="model", ) base_model_id = HuggingfaceHubSearch( label="Base model repository (optional)", placeholder="If empty, it will be guessed from repo tags", search_type="model", ) rank = gr.Dropdown( ["16", "32", "64", "128"], label="LoRA rank", info="Higher the rank, better the result, but heavier the adapter", value="32", filterable=False, visible=True ) private_repo = gr.Checkbox( value=False, label="Private Repo", info="Create a private repo under your username." ) iface = gr.Interface( fn=process_model, inputs=[ ft_model_id, base_model_id, rank, private_repo, ], outputs=[ gr.Markdown(label="output"), ], title="Convert fine tuned model into LoRA with mergekit-extract-lora", description="The space takes a fine tuned model, a base model, then make a PEFT-compatible LoRA adapter based on the difference between 2 models.

NOTE: Each conversion takes about 5 to 20 minutes, depending on how big the model is.", api_name=False ) # Launch the interface demo.queue(default_concurrency_limit=1, max_size=5).launch(debug=True, show_api=False)