""" THis is the main file for the gradio web demo. It uses the CogVideoX-5B model to generate videos gradio web demo. set environment variable OPENAI_API_KEY to use the OpenAI API to enhance the prompt. Usage: OpenAI_API_KEY=your_openai_api_key OPENAI_BASE_URL=https://api.openai.com/v1 python inference/gradio_web_demo.py """ import math import os import random import threading import time import spaces import cv2 import tempfile import imageio_ffmpeg import gradio as gr import torch from PIL import Image # from diffusers import ( # CogVideoXPipeline, # CogVideoXDPMScheduler, # CogVideoXVideoToVideoPipeline, # CogVideoXImageToVideoPipeline, # CogVideoXTransformer3DModel, # ) from typing import Union, List from CogVideoX.pipeline_rgba import CogVideoXPipeline from CogVideoX.rgba_utils import * from diffusers import CogVideoXDPMScheduler from diffusers.utils import load_video, load_image, export_to_video from datetime import datetime, timedelta from diffusers.image_processor import VaeImageProcessor import moviepy.editor as mp import numpy as np from huggingface_hub import hf_hub_download, snapshot_download import gc device = "cuda" if torch.cuda.is_available() else "cpu" hf_hub_download(repo_id="wileewang/TransPixar", filename="cogvideox_rgba_lora.safetensors", local_dir="model_cogvideox_rgba_lora") pipe = CogVideoXPipeline.from_pretrained("THUDM/CogVideoX-5B", torch_dtype=torch.bfloat16) # pipe.enable_sequential_cpu_offload() pipe.vae.enable_slicing() pipe.vae.enable_tiling() pipe.scheduler = CogVideoXDPMScheduler.from_config(pipe.scheduler.config, timestep_spacing="trailing") seq_length = 2 * ( (480 // pipe.vae_scale_factor_spatial // 2) * (720 // pipe.vae_scale_factor_spatial // 2) * ((13 - 1) // pipe.vae_scale_factor_temporal + 1) ) prepare_for_rgba_inference( pipe.transformer, rgba_weights_path="model_cogvideox_rgba_lora/cogvideox_rgba_lora.safetensors", device=device, dtype=torch.bfloat16, text_length=226, seq_length=seq_length, # this is for the creation of attention mask. ) # pipe.transformer.to(memory_format=torch.channels_last) # pipe.transformer = torch.compile(pipe.transformer, mode="max-autotune", fullgraph=True) # pipe_image.transformer.to(memory_format=torch.channels_last) # pipe_image.transformer = torch.compile(pipe_image.transformer, mode="max-autotune", fullgraph=True) os.makedirs("./output", exist_ok=True) os.makedirs("./gradio_tmp", exist_ok=True) # upscale_model = utils.load_sd_upscale("model_real_esran/RealESRGAN_x4.pth", device) # frame_interpolation_model = load_rife_model("model_rife") def save_video(tensor: Union[List[np.ndarray], List[Image.Image]], fps: int = 8, prefix='rgb'): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") video_path = f"./output/{prefix}_{timestamp}.mp4" os.makedirs(os.path.dirname(video_path), exist_ok=True) export_to_video(tensor, video_path, fps=fps) return video_path def resize_if_unfit(input_video, progress=gr.Progress(track_tqdm=True)): width, height = get_video_dimensions(input_video) if width == 720 and height == 480: processed_video = input_video else: processed_video = center_crop_resize(input_video) return processed_video def get_video_dimensions(input_video_path): reader = imageio_ffmpeg.read_frames(input_video_path) metadata = next(reader) return metadata["size"] def center_crop_resize(input_video_path, target_width=720, target_height=480): cap = cv2.VideoCapture(input_video_path) orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) orig_fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) width_factor = target_width / orig_width height_factor = target_height / orig_height resize_factor = max(width_factor, height_factor) inter_width = int(orig_width * resize_factor) inter_height = int(orig_height * resize_factor) target_fps = 8 ideal_skip = max(0, math.ceil(orig_fps / target_fps) - 1) skip = min(5, ideal_skip) # Cap at 5 while (total_frames / (skip + 1)) < 49 and skip > 0: skip -= 1 processed_frames = [] frame_count = 0 total_read = 0 while frame_count < 49 and total_read < total_frames: ret, frame = cap.read() if not ret: break if total_read % (skip + 1) == 0: resized = cv2.resize(frame, (inter_width, inter_height), interpolation=cv2.INTER_AREA) start_x = (inter_width - target_width) // 2 start_y = (inter_height - target_height) // 2 cropped = resized[start_y : start_y + target_height, start_x : start_x + target_width] processed_frames.append(cropped) frame_count += 1 total_read += 1 cap.release() with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file: temp_video_path = temp_file.name fourcc = cv2.VideoWriter_fourcc(*"mp4v") out = cv2.VideoWriter(temp_video_path, fourcc, target_fps, (target_width, target_height)) for frame in processed_frames: out.write(frame) out.release() return temp_video_path @spaces.GPU(duration=300) def infer( prompt: str, num_inference_steps: int, guidance_scale: float, seed: int = -1, progress=gr.Progress(track_tqdm=True), ): if seed == -1: seed = random.randint(0, 2**8 - 1) device = "cuda" if torch.cuda.is_available() else "cpu" pipe.to(device) video_pt = pipe( prompt=prompt + ", isolated background", num_videos_per_prompt=1, num_inference_steps=num_inference_steps, num_frames=13, use_dynamic_cfg=True, output_type="latent", guidance_scale=guidance_scale, generator=torch.Generator(device=device).manual_seed(int(seed)), ).frames # pipe.to("cpu") gc.collect() return (video_pt, seed) def convert_to_gif(video_path): clip = mp.VideoFileClip(video_path) clip = clip.set_fps(8) clip = clip.resize(height=240) gif_path = video_path.replace(".mp4", ".gif") clip.write_gif(gif_path, fps=8) return gif_path def delete_old_files(): while True: now = datetime.now() cutoff = now - timedelta(minutes=10) directories = ["./output", "./gradio_tmp"] for directory in directories: for filename in os.listdir(directory): file_path = os.path.join(directory, filename) if os.path.isfile(file_path): file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path)) if file_mtime < cutoff: os.remove(file_path) time.sleep(600) threading.Thread(target=delete_old_files, daemon=True).start() # examples_videos = [["example_videos/horse.mp4"], ["example_videos/kitten.mp4"], ["example_videos/train_running.mp4"]] # examples_images = [["example_images/beach.png"], ["example_images/street.png"], ["example_images/camping.png"]] with gr.Blocks() as demo: gr.HTML("""