import base64 import gc import io import os import tempfile import zipfile from datetime import datetime from threading import Thread from huggingface_hub import Repository import subprocess import random import requests import streamlit as st import torch from huggingface_hub import HfApi from huggingface_hub.utils._errors import RepositoryNotFoundError from huggingface_hub.utils._validators import HFValidationError from loguru import logger from PIL.PngImagePlugin import PngInfo from st_clickable_images import clickable_images no_safety_checker = None CODE_OF_CONDUCT = """ ## Code of conduct The app should not be used to intentionally create or disseminate images that create hostile or alienating environments for people. This includes generating images that people would foreseeably find disturbing, distressing, or offensive; or content that propagates historical or current stereotypes. Using the app to generate content that is cruel to individuals is a misuse of this app. One shall not use this app to generate content that is intended to be cruel to individuals, or to generate content that is intended to be cruel to individuals in a way that is not obvious to the viewer. This includes, but is not limited to: - Generating demeaning, dehumanizing, or otherwise harmful representations of people or their environments, cultures, religions, etc. - Intentionally promoting or propagating discriminatory content or harmful stereotypes. - Impersonating individuals without their consent. - Sexual content without consent of the people who might see it. - Mis- and disinformation - Representations of egregious violence and gore - Sharing of copyrighted or licensed material in violation of its terms of use. - Sharing content that is an alteration of copyrighted or licensed material in violation of its terms of use. By using this app, you agree to the above code of conduct. """ def use_auth_token(): token_path = os.path.join(os.path.expanduser("~"), ".huggingface", "token") if os.path.exists(token_path): return True if "HF_TOKEN" in os.environ: return os.environ["HF_TOKEN"] return False def download_file(file_url): r = requests.get(file_url, stream=True) with tempfile.NamedTemporaryFile(delete=False) as tmp: for chunk in r.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks tmp.write(chunk) return tmp.name def cache_folder(): _cache_folder = os.path.join(os.path.expanduser("~"), ".ffusion") os.makedirs(_cache_folder, exist_ok=True) return _cache_folder def clear_memory(preserve): torch.cuda.empty_cache() gc.collect() to_clear = ["inpainting", "text2img", "img2text"] for key in to_clear: if key not in preserve and key in st.session_state: del st.session_state[key] import subprocess from huggingface_hub import Repository def save_to_hub(image, current_datetime, metadata, output_path): """Saves an image to Hugging Face Hub""" try: # Convert image to byte array byte_arr = io.BytesIO() # Check if the image has metadata if image.info: # Save as PNG image.save(byte_arr, format='PNG') else: # Save as JPG image.save(byte_arr, format='JPEG') byte_arr = byte_arr.getvalue() # Create a repository object token = os.getenv("HF_TOKEN") api = HfApi() username = "FFusion" repo_name = "FF" try: repo = Repository(f"{username}/{repo_name}", clone_from=f"{username}/{repo_name}", use_auth_token=token, repo_type="dataset") except RepositoryNotFoundError: repo = Repository(f"{username}/{repo_name}", clone_from=f"{username}/{repo_name}", use_auth_token=token, repo_type="dataset") # Pull the latest changes from the remote repository repo.git_pull() # Generate a random 10-digit number random_number = random.randint(1000000000, 9999999999) # Replace "0.png" in output_path with the random number output_path = output_path.replace("0.png", f"{random_number}.png") # Create the directory if it does not exist os.makedirs(os.path.dirname(f"{repo.local_dir}/{output_path}"), exist_ok=True) # Write image to repository with open(f"{repo.local_dir}/{output_path}", "wb") as f: f.write(byte_arr) # Set Git username and email subprocess.run(["git", "config", "user.name", "idle stoev"], check=True, cwd=repo.local_dir) subprocess.run(["git", "config", "user.email", "di@ffusion.ai"], check=True, cwd=repo.local_dir) # Commit and push changes repo.git_add(pattern=".") repo.git_commit(f"Add image at {current_datetime}") print(f"Pushing changes to {username}/{repo_name}...") repo.git_push() print(f"Image saved to {username}/{repo_name}/{output_path}") except Exception as e: print(f"Failed to save image to Hugging Face Hub: {e}") def save_to_local(images, module, current_datetime, metadata, output_path): _metadata = PngInfo() _metadata.add_text("text2img", metadata) os.makedirs(output_path, exist_ok=True) os.makedirs(f"{output_path}/{module}", exist_ok=True) os.makedirs(f"{output_path}/{module}/{current_datetime}", exist_ok=True) for i, img in enumerate(images): img.save( f"{output_path}/{module}/{current_datetime}/{i}.png", pnginfo=_metadata, ) # save metadata as text file with open(f"{output_path}/{module}/{current_datetime}/metadata.txt", "w") as f: f.write(metadata) logger.info(f"Saved images to {output_path}/{module}/{current_datetime}") def save_images(images, module, metadata, output_path): if output_path is None: logger.warning("No output path specified, skipping saving images") return api = HfApi() dset_info = None try: dset_info = api.dataset_info(output_path) except (HFValidationError, RepositoryNotFoundError): logger.warning("No valid hugging face repo. Saving locally...") current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") if not dset_info: save_to_local(images, module, current_datetime, metadata, output_path) else: Thread(target=save_to_hub, args=(api, images, module, current_datetime, metadata, output_path)).start() def display_and_download_images(output_images, metadata, download_col=None): # st.image(output_images, width=128, output_format="PNG") with st.spinner("Preparing images for download..."): # save images to a temporary directory with tempfile.TemporaryDirectory() as tmpdir: gallery_images = [] for i, image in enumerate(output_images): image.save(os.path.join(tmpdir, f"{i + 1}.png"), pnginfo=metadata) with open(os.path.join(tmpdir, f"{i + 1}.png"), "rb") as img: encoded = base64.b64encode(img.read()).decode() gallery_images.append(f"data:image/jpeg;base64,{encoded}") # zip the images zip_path = os.path.join(tmpdir, "images.zip") with zipfile.ZipFile(zip_path, "w") as zip: for filename in os.listdir(tmpdir): if filename.endswith(".png"): zip.write(os.path.join(tmpdir, filename), filename) # convert zip to base64 with open(zip_path, "rb") as f: encoded = base64.b64encode(f.read()).decode() _ = clickable_images( gallery_images, titles=[f"Image #{str(i)}" for i in range(len(gallery_images))], div_style={"display": "flex", "justify-content": "center", "flex-wrap": "wrap"}, img_style={"margin": "5px", "height": "200px"}, ) # add download link st.markdown( f""" Download Images """, unsafe_allow_html=True, )