# VideoModelStudio / import_service.py
# jbilcke-hf's picture
# jbilcke-hf HF staff
# initial commit log 🪵🦫
# 91fb4ef
import logging
import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import List, Dict, Optional, Tuple

import gradio as gr
from pytubefix import YouTube

from utils import is_image_file, is_video_file, add_prefix_to_caption
from image_preprocessing import normalize_image
from config import (
    NORMALIZE_IMAGES_TO,
    TRAINING_VIDEOS_PATH,
    VIDEOS_TO_SPLIT_PATH,
    STAGING_PATH,
    TRAINING_PATH,
    DEFAULT_PROMPT_PREFIX,
)
# Module-level logger; records are emitted under this module's name.
logger = logging.getLogger(__name__)
class ImportService:
    """Import user-supplied media into the project's working folders.

    Videos (direct uploads, ZIP members, YouTube downloads) are stored in
    ``VIDEOS_TO_SPLIT_PATH``; images are normalized with ``normalize_image``
    and stored in ``STAGING_PATH``.
    """

    @staticmethod
    def _unique_target(directory: Path, stem: str, suffix: str) -> Path:
        """Return a path in ``directory`` that does not exist yet.

        Args:
            directory: Destination folder
            stem: File name without extension
            suffix: Extension including the leading dot (may be empty)

        Returns:
            ``directory/stem+suffix`` if free, otherwise the first free
            ``directory/stem___N+suffix`` with N counting up from 1
        """
        candidate = directory / f"{stem}{suffix}"
        counter = 1
        while candidate.exists():
            candidate = directory / f"{stem}___{counter}{suffix}"
            counter += 1
        return candidate

    def process_uploaded_files(self, file_paths: List[str]) -> str:
        """Process every uploaded file (ZIP, MP4/WebM, or image)

        Args:
            file_paths: File paths to the uploaded files from Gradio

        Returns:
            Status message string, one line per processed file

        Raises:
            gr.Error: If a file has an unsupported type or cannot be processed
        """
        if not file_paths:
            return "No files provided"

        messages = []
        for raw_path in file_paths:
            file_path = Path(raw_path)
            original_name = file_path.name
            logger.info(f"Processing uploaded file: {original_name}")

            # Determine file type from name
            file_ext = file_path.suffix.lower()
            try:
                if file_ext == '.zip':
                    messages.append(self.process_zip_file(file_path))
                elif file_ext in ('.mp4', '.webm'):
                    messages.append(self.process_mp4_file(file_path, original_name))
                elif is_image_file(file_path):
                    messages.append(self.process_image_file(file_path, original_name))
                else:
                    raise gr.Error(f"Unsupported file type: {file_ext}")
            except gr.Error:
                # Already a user-facing error; do not stack another prefix on it
                raise
            except Exception as e:
                raise gr.Error(f"Error processing file: {str(e)}") from e

        return "\n".join(messages)

    def process_image_file(self, file_path: Path, original_name: str) -> str:
        """Process a single image file

        Args:
            file_path: Path to the image
            original_name: Original filename

        Returns:
            Status message string

        Raises:
            gr.Error: If the image cannot be normalized or stored
        """
        try:
            # Create a unique filename with configured extension
            stem = Path(original_name).stem
            target_path = self._unique_target(STAGING_PATH, stem, f".{NORMALIZE_IMAGES_TO}")

            # Convert to normalized format and remove black bars
            success = normalize_image(file_path, target_path)
            if not success:
                raise gr.Error(f"Failed to process image: {original_name}")

            # Handle caption: a sibling .txt file next to the source image
            src_caption_path = file_path.with_suffix('.txt')
            if src_caption_path.exists():
                caption = src_caption_path.read_text()
                caption = add_prefix_to_caption(caption, DEFAULT_PROMPT_PREFIX)
                target_path.with_suffix('.txt').write_text(caption)

            logger.info(f"Successfully stored image: {target_path.name}")
            gr.Info(f"Successfully stored image: {target_path.name}")
            return f"Successfully stored image: {target_path.name}"
        except gr.Error:
            # Keep the specific message instead of re-wrapping it
            raise
        except Exception as e:
            raise gr.Error(f"Error processing image file: {str(e)}") from e

    def process_zip_file(self, file_path: Path) -> str:
        """Process uploaded ZIP file containing media files

        Videos are copied to ``VIDEOS_TO_SPLIT_PATH``; images are normalized
        into ``STAGING_PATH``. A ``.txt`` file sharing a media file's stem is
        copied next to the stored media as its caption. Failures on individual
        members are logged and skipped.

        Args:
            file_path: Path to the uploaded ZIP file

        Returns:
            Status message string

        Raises:
            gr.Error: If the archive itself cannot be read or extracted
        """
        try:
            video_count = 0
            image_count = 0

            # Create temporary directory
            with tempfile.TemporaryDirectory() as temp_dir:
                # Extract ZIP
                extract_dir = Path(temp_dir) / "extracted"
                extract_dir.mkdir()
                with zipfile.ZipFile(file_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_dir)

                # Process each file
                for root, _, files in os.walk(extract_dir):
                    for file in files:
                        if file.startswith('._'):  # Skip Mac metadata
                            continue

                        # Distinct name so the ZIP path parameter is not shadowed
                        member_path = Path(root) / file
                        try:
                            if is_video_file(member_path):
                                # Copy video to videos_to_split
                                target_path = self._unique_target(
                                    VIDEOS_TO_SPLIT_PATH, member_path.stem, member_path.suffix
                                )
                                shutil.copy2(member_path, target_path)
                                video_count += 1
                            elif is_image_file(member_path):
                                # Convert image and save to staging
                                target_path = self._unique_target(
                                    STAGING_PATH, member_path.stem, f".{NORMALIZE_IMAGES_TO}"
                                )
                                if not normalize_image(member_path, target_path):
                                    # No image was stored, so do not leave an orphan caption
                                    logger.warning(f"Failed to process image: {member_path.name}")
                                    continue
                                image_count += 1
                            else:
                                continue

                            # Copy associated caption file if it exists
                            txt_path = member_path.with_suffix('.txt')
                            if txt_path.exists():
                                shutil.copy2(txt_path, target_path.with_suffix('.txt'))
                        except Exception as e:
                            logger.error(f"Error processing {member_path.name}: {str(e)}")
                            continue

            # Generate status message
            parts = []
            if video_count > 0:
                parts.append(f"{video_count} videos")
            if image_count > 0:
                parts.append(f"{image_count} images")

            if not parts:
                return "No supported media files found in ZIP"

            status = f"Successfully stored {' and '.join(parts)}"
            gr.Info(status)
            return status
        except Exception as e:
            raise gr.Error(f"Error processing ZIP: {str(e)}") from e

    def process_mp4_file(self, file_path: Path, original_name: str) -> str:
        """Process a single video file

        Args:
            file_path: Path to the file
            original_name: Original filename

        Returns:
            Status message string

        Raises:
            gr.Error: If the file cannot be copied
        """
        try:
            # Create a unique filename; keep the real extension (.mp4 or .webm)
            # when a numeric suffix has to be added
            original = Path(original_name)
            target_path = self._unique_target(VIDEOS_TO_SPLIT_PATH, original.stem, original.suffix)

            # Copy the file to the target location
            shutil.copy2(file_path, target_path)

            gr.Info(f"Successfully stored video: {target_path.name}")
            return f"Successfully stored video: {target_path.name}"
        except Exception as e:
            raise gr.Error(f"Error processing video file: {str(e)}") from e

    def download_youtube_video(self, url: str, progress=None) -> str:
        """Download a video from YouTube

        Failures are reported through the returned string (and a Gradio
        warning) rather than by raising.

        Args:
            url: YouTube video URL
            progress: Optional Gradio progress indicator

        Returns:
            Status message string describing success or the error
        """
        try:
            def on_progress(stream, chunk, bytes_remaining):
                # Skip when there is no indicator or the size is unknown/zero
                if progress and stream.filesize:
                    progress((1 - bytes_remaining / stream.filesize), desc="Downloading...")

            # Extract video ID and create YouTube object
            yt = YouTube(url, on_progress_callback=on_progress)
            video_id = yt.video_id

            # Download highest quality progressive MP4
            if progress:
                logger.info("Getting video streams...")
                progress(0, desc="Getting video streams...")
            video = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()

            if not video:
                message = "Could not find a compatible video format"
                logger.warning(message)
                # gr.Error only has an effect when raised; gr.Warning shows the
                # message while keeping the return-a-string contract
                gr.Warning(message)
                return message

            # Download the video
            if progress:
                logger.info("Starting YouTube video download...")
                progress(0, desc="Starting download...")

            video.download(output_path=str(VIDEOS_TO_SPLIT_PATH), filename=f"{video_id}.mp4")

            # Update UI
            if progress:
                logger.info("YouTube video download complete!")
                gr.Info("YouTube video download complete!")
                progress(1, desc="Download complete!")
            return f"Successfully downloaded video: {yt.title}"
        except Exception as e:
            message = f"Error downloading video: {str(e)}"
            logger.exception(message)
            gr.Warning(message)
            return message