OpenSUNO / app.py
ginipick's picture
Update app.py
6df0adf verified
raw
history blame
23.7 kB
import gradio as gr
import subprocess
import os
import shutil
import tempfile
import torch
import logging
import numpy as np
import re
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
# λ‘œκΉ… μ„€μ •
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('yue_generation.log'),
logging.StreamHandler()
]
)
def optimize_gpu_settings():
if torch.cuda.is_available():
# GPU λ©”λͺ¨λ¦¬ 관리 μ΅œμ ν™”
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.enabled = True
torch.backends.cudnn.deterministic = False
# L40S에 μ΅œμ ν™”λœ λ©”λͺ¨λ¦¬ μ„€μ •
torch.cuda.empty_cache()
torch.cuda.set_device(0)
# CUDA 슀트림 μ΅œμ ν™”
torch.cuda.Stream(0)
# λ©”λͺ¨λ¦¬ ν• λ‹Ή μ΅œμ ν™”
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:512'
logging.info(f"Using GPU: {torch.cuda.get_device_name(0)}")
logging.info(f"Available GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")
# L40S νŠΉν™” μ„€μ •
if 'L40S' in torch.cuda.get_device_name(0):
torch.cuda.set_per_process_memory_fraction(0.95)
def analyze_lyrics(lyrics, repeat_chorus=2):
lines = [line.strip() for line in lyrics.split('\n') if line.strip()]
sections = {
'verse': 0,
'chorus': 0,
'bridge': 0,
'total_lines': len(lines)
}
current_section = None
section_lines = {
'verse': [],
'chorus': [],
'bridge': []
}
last_section = None
# λ§ˆμ§€λ§‰ μ„Ήμ…˜ νƒœκ·Έ μ°ΎκΈ°
for i, line in enumerate(lines):
if '[verse]' in line.lower() or '[chorus]' in line.lower() or '[bridge]' in line.lower():
last_section = i
for i, line in enumerate(lines):
lower_line = line.lower()
# μ„Ήμ…˜ νƒœκ·Έ 처리
if '[verse]' in lower_line:
if current_section: # 이전 μ„Ήμ…˜μ˜ 라인듀 μ €μž₯
section_lines[current_section].extend(lines[last_section_start:i])
current_section = 'verse'
sections['verse'] += 1
last_section_start = i + 1
continue
elif '[chorus]' in lower_line:
if current_section:
section_lines[current_section].extend(lines[last_section_start:i])
current_section = 'chorus'
sections['chorus'] += 1
last_section_start = i + 1
continue
elif '[bridge]' in lower_line:
if current_section:
section_lines[current_section].extend(lines[last_section_start:i])
current_section = 'bridge'
sections['bridge'] += 1
last_section_start = i + 1
continue
# λ§ˆμ§€λ§‰ μ„Ήμ…˜μ˜ 라인듀 μΆ”κ°€
if current_section and last_section_start < len(lines):
section_lines[current_section].extend(lines[last_section_start:])
# μ½”λŸ¬μŠ€ 반볡 처리
if sections['chorus'] > 0 and repeat_chorus > 1:
original_chorus = section_lines['chorus'][:]
for _ in range(repeat_chorus - 1):
section_lines['chorus'].extend(original_chorus)
# μ„Ήμ…˜λ³„ 라인 수 확인 λ‘œκΉ…
logging.info(f"Section line counts - Verse: {len(section_lines['verse'])}, "
f"Chorus: {len(section_lines['chorus'])}, "
f"Bridge: {len(section_lines['bridge'])}")
return sections, (sections['verse'] + sections['chorus'] + sections['bridge']), len(lines), section_lines
def calculate_generation_params(lyrics):
sections, total_sections, total_lines, section_lines = analyze_lyrics(lyrics)
# κΈ°λ³Έ μ‹œκ°„ 계산 (초 λ‹¨μœ„)
time_per_line = {
'verse': 4, # verseλŠ” ν•œ 쀄당 4초
'chorus': 6, # chorusλŠ” ν•œ 쀄당 6초
'bridge': 5 # bridgeλŠ” ν•œ 쀄당 5초
}
# 각 μ„Ήμ…˜λ³„ μ˜ˆμƒ μ‹œκ°„ 계산 (λ§ˆμ§€λ§‰ μ„Ήμ…˜ 포함)
section_durations = {}
for section_type in ['verse', 'chorus', 'bridge']:
lines_count = len(section_lines[section_type])
section_durations[section_type] = lines_count * time_per_line[section_type]
# 전체 μ‹œκ°„ 계산 (μ—¬μœ  μ‹œκ°„ μΆ”κ°€)
total_duration = sum(duration for duration in section_durations.values())
total_duration = max(60, int(total_duration * 1.2)) # 20% μ—¬μœ  μ‹œκ°„ μΆ”κ°€
# 토큰 계산 (λ§ˆμ§€λ§‰ μ„Ήμ…˜μ„ μœ„ν•œ μΆ”κ°€ 토큰)
base_tokens = 3000
tokens_per_line = 200
extra_tokens = 1000 # λ§ˆμ§€λ§‰ μ„Ήμ…˜μ„ μœ„ν•œ μΆ”κ°€ 토큰
total_tokens = base_tokens + (total_lines * tokens_per_line) + extra_tokens
# μ„Έκ·Έλ¨ΌνŠΈ 수 계산 (λ§ˆμ§€λ§‰ μ„Ήμ…˜μ„ μœ„ν•œ μΆ”κ°€ μ„Έκ·Έλ¨ΌνŠΈ)
if sections['chorus'] > 0:
num_segments = 4 # μ½”λŸ¬μŠ€κ°€ μžˆλŠ” 경우 4개 μ„Έκ·Έλ¨ΌνŠΈ
else:
num_segments = 3 # μ½”λŸ¬μŠ€κ°€ μ—†λŠ” 경우 3개 μ„Έκ·Έλ¨ΌνŠΈ
# 토큰 수 μ œν•œ (더 큰 μ œν•œ)
max_tokens = min(12000, total_tokens) # μ΅œλŒ€ 토큰 수 증가
return {
'max_tokens': max_tokens,
'num_segments': num_segments,
'sections': sections,
'section_lines': section_lines,
'estimated_duration': total_duration,
'section_durations': section_durations,
'has_chorus': sections['chorus'] > 0
}
def detect_and_select_model(text):
if re.search(r'[\u3131-\u318E\uAC00-\uD7A3]', text):
return "m-a-p/YuE-s1-7B-anneal-jp-kr-cot"
elif re.search(r'[\u4e00-\u9fff]', text):
return "m-a-p/YuE-s1-7B-anneal-zh-cot"
elif re.search(r'[\u3040-\u309F\u30A0-\u30FF]', text):
return "m-a-p/YuE-s1-7B-anneal-jp-kr-cot"
else:
return "m-a-p/YuE-s1-7B-anneal-en-cot"
def install_flash_attn():
try:
if not torch.cuda.is_available():
logging.warning("GPU not available, skipping flash-attn installation")
return False
cuda_version = torch.version.cuda
if cuda_version is None:
logging.warning("CUDA not available, skipping flash-attn installation")
return False
logging.info(f"Detected CUDA version: {cuda_version}")
try:
import flash_attn
logging.info("flash-attn already installed")
return True
except ImportError:
logging.info("Installing flash-attn...")
subprocess.run(
["pip", "install", "flash-attn", "--no-build-isolation"],
check=True,
capture_output=True
)
logging.info("flash-attn installed successfully!")
return True
except Exception as e:
logging.warning(f"Failed to install flash-attn: {e}")
return False
def initialize_system():
optimize_gpu_settings()
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
futures.append(executor.submit(install_flash_attn))
from huggingface_hub import snapshot_download
folder_path = './inference/xcodec_mini_infer'
os.makedirs(folder_path, exist_ok=True)
logging.info(f"Created folder at: {folder_path}")
futures.append(executor.submit(
snapshot_download,
repo_id="m-a-p/xcodec_mini_infer",
local_dir="./inference/xcodec_mini_infer",
resume_download=True
))
for future in futures:
future.result()
try:
os.chdir("./inference")
logging.info(f"Working directory changed to: {os.getcwd()}")
except FileNotFoundError as e:
logging.error(f"Directory error: {e}")
raise
@lru_cache(maxsize=100)
def get_cached_file_path(content_hash, prefix):
return create_temp_file(content_hash, prefix)
def empty_output_folder(output_dir):
try:
shutil.rmtree(output_dir)
os.makedirs(output_dir)
logging.info(f"Output folder cleaned: {output_dir}")
except Exception as e:
logging.error(f"Error cleaning output folder: {e}")
raise
def create_temp_file(content, prefix, suffix=".txt"):
temp_file = tempfile.NamedTemporaryFile(delete=False, mode="w", prefix=prefix, suffix=suffix)
content = content.strip() + "\n\n"
content = content.replace("\r\n", "\n").replace("\r", "\n")
temp_file.write(content)
temp_file.close()
logging.debug(f"Temporary file created: {temp_file.name}")
return temp_file.name
def get_last_mp3_file(output_dir):
mp3_files = [f for f in os.listdir(output_dir) if f.endswith('.mp3')]
if not mp3_files:
logging.warning("No MP3 files found")
return None
mp3_files_with_path = [os.path.join(output_dir, f) for f in mp3_files]
mp3_files_with_path.sort(key=os.path.getmtime, reverse=True)
return mp3_files_with_path[0]
def get_audio_duration(file_path):
try:
import librosa
duration = librosa.get_duration(path=file_path)
return duration
except Exception as e:
logging.error(f"Failed to get audio duration: {e}")
return None
def infer(genre_txt_content, lyrics_txt_content, num_segments, max_new_tokens):
genre_txt_path = None
lyrics_txt_path = None
try:
model_path, config, params = optimize_model_selection(lyrics_txt_content, genre_txt_content)
logging.info(f"Selected model: {model_path}")
logging.info(f"Lyrics analysis: {params}")
has_chorus = params['sections']['chorus'] > 0
estimated_duration = params.get('estimated_duration', 90)
# μ„Έκ·Έλ¨ΌνŠΈ 및 토큰 수 μ„€μ •
if has_chorus:
actual_max_tokens = min(12000, int(config['max_tokens'] * 1.3)) # 30% 더 λ§Žμ€ 토큰
actual_num_segments = min(5, params['num_segments'] + 2) # μΆ”κ°€ μ„Έκ·Έλ¨ΌνŠΈ
else:
actual_max_tokens = min(10000, int(config['max_tokens'] * 1.2))
actual_num_segments = min(4, params['num_segments'] + 1)
logging.info(f"Estimated duration: {estimated_duration} seconds")
logging.info(f"Has chorus sections: {has_chorus}")
logging.info(f"Using segments: {actual_num_segments}, tokens: {actual_max_tokens}")
genre_txt_path = create_temp_file(genre_txt_content, prefix="genre_")
lyrics_txt_path = create_temp_file(lyrics_txt_content, prefix="lyrics_")
output_dir = "./output"
os.makedirs(output_dir, exist_ok=True)
empty_output_folder(output_dir)
# μˆ˜μ •λœ command - μ§€μ›λ˜μ§€ μ•ŠλŠ” 인수 제거
command = [
"python", "infer.py",
"--stage1_model", model_path,
"--stage2_model", "m-a-p/YuE-s2-1B-general",
"--genre_txt", genre_txt_path,
"--lyrics_txt", lyrics_txt_path,
"--run_n_segments", str(actual_num_segments),
"--stage2_batch_size", "16",
"--output_dir", output_dir,
"--cuda_idx", "0",
"--max_new_tokens", str(actual_max_tokens),
"--disable_offload_model" # GPU λ©”λͺ¨λ¦¬ μ΅œμ ν™”λ₯Ό μœ„ν•΄ μΆ”κ°€
]
env = os.environ.copy()
if torch.cuda.is_available():
env.update({
"CUDA_VISIBLE_DEVICES": "0",
"CUDA_HOME": "/usr/local/cuda",
"PATH": f"/usr/local/cuda/bin:{env.get('PATH', '')}",
"LD_LIBRARY_PATH": f"/usr/local/cuda/lib64:{env.get('LD_LIBRARY_PATH', '')}",
"PYTORCH_CUDA_ALLOC_CONF": "max_split_size_mb:512",
"CUDA_LAUNCH_BLOCKING": "0"
})
# transformers μΊμ‹œ λ§ˆμ΄κ·Έλ ˆμ΄μ…˜ 처리
try:
from transformers.utils import move_cache
move_cache()
except Exception as e:
logging.warning(f"Cache migration warning (non-critical): {e}")
process = subprocess.run(
command,
env=env,
check=False,
capture_output=True,
text=True
)
logging.info(f"Command output: {process.stdout}")
if process.stderr:
logging.error(f"Command error: {process.stderr}")
if process.returncode != 0:
logging.error(f"Command failed with return code: {process.returncode}")
logging.error(f"Command: {' '.join(command)}")
raise RuntimeError(f"Inference failed: {process.stderr}")
last_mp3 = get_last_mp3_file(output_dir)
if last_mp3:
try:
duration = get_audio_duration(last_mp3)
logging.info(f"Generated audio file: {last_mp3}")
if duration:
logging.info(f"Audio duration: {duration:.2f} seconds")
logging.info(f"Expected duration: {estimated_duration} seconds")
if duration < estimated_duration * 0.8:
logging.warning(f"Generated audio is shorter than expected: {duration:.2f}s < {estimated_duration:.2f}s")
except Exception as e:
logging.warning(f"Failed to get audio duration: {e}")
return last_mp3
else:
logging.warning("No output audio file generated")
return None
except Exception as e:
logging.error(f"Inference error: {e}")
raise
finally:
for path in [genre_txt_path, lyrics_txt_path]:
if path and os.path.exists(path):
try:
os.remove(path)
logging.debug(f"Removed temporary file: {path}")
except Exception as e:
logging.warning(f"Failed to remove temporary file {path}: {e}")
def optimize_model_selection(lyrics, genre):
model_path = detect_and_select_model(lyrics)
params = calculate_generation_params(lyrics)
has_chorus = params['sections']['chorus'] > 0
tokens_per_segment = params['max_tokens'] // params['num_segments']
model_config = {
"m-a-p/YuE-s1-7B-anneal-en-cot": {
"max_tokens": params['max_tokens'],
"temperature": 0.8,
"batch_size": 16,
"num_segments": params['num_segments'],
"estimated_duration": params['estimated_duration']
},
"m-a-p/YuE-s1-7B-anneal-jp-kr-cot": {
"max_tokens": params['max_tokens'],
"temperature": 0.7,
"batch_size": 16,
"num_segments": params['num_segments'],
"estimated_duration": params['estimated_duration']
},
"m-a-p/YuE-s1-7B-anneal-zh-cot": {
"max_tokens": params['max_tokens'],
"temperature": 0.7,
"batch_size": 16,
"num_segments": params['num_segments'],
"estimated_duration": params['estimated_duration']
}
}
if has_chorus:
for config in model_config.values():
config['max_tokens'] = int(config['max_tokens'] * 1.5)
return model_path, model_config[model_path], params
css = """
#main-container {
max-width: 1200px;
margin: auto;
padding: 20px;
}
#header {
text-align: center;
margin-bottom: 30px;
}
#genre-input, #lyrics-input {
border-radius: 8px;
}
#generate-btn {
margin-top: 20px;
min-height: 45px;
}
.label {
font-weight: bold;
}
.example-container {
background: #f8f9fa;
padding: 15px;
border-radius: 8px;
margin: 10px 0;
}
"""
def main():
with gr.Blocks(theme=gr.themes.Soft(
primary_hue="indigo",
secondary_hue="purple",
neutral_hue="slate",
font=["Arial", "sans-serif"]
), css=css) as demo:
)) as demo:
with gr.Column(elem_id="main-container"):
# 헀더 μ„Ήμ…˜
with gr.Row(elem_id="header"):
gr.Markdown(
"""
# 🎡 Open SUNO: AI Music Generator
### Create complete songs from your lyrics in multiple languages
""",
elem_id="title"
)
# 메인 컨텐츠λ₯Ό νƒ­μœΌλ‘œ ꡬ성
with gr.Tabs() as tabs:
# 생성 νƒ­
with gr.TabItem("✨ Create Music", id="create"):
with gr.Row():
# μž…λ ₯ μ„Ήμ…˜
with gr.Column(scale=1):
genre_txt = gr.Textbox(
label="🎸 Music Genre & Style",
placeholder="e.g., K-pop bright energetic synth dance electronic...",
elem_id="genre-input"
)
lyrics_txt = gr.Textbox(
label="πŸ“ Lyrics",
placeholder="Enter lyrics with section tags: [verse], [chorus], [bridge]...",
lines=10,
elem_id="lyrics-input"
)
# 정보 ν‘œμ‹œ μ„Ήμ…˜
with gr.Row():
with gr.Column(scale=1):
duration_info = gr.Label(
label="⏱️ Estimated Duration",
elem_id="duration-info"
)
with gr.Column(scale=1):
sections_info = gr.Label(
label="πŸ“Š Section Analysis",
elem_id="sections-info"
)
# 생성 λ²„νŠΌ
submit_btn = gr.Button(
"🎼 Generate Music",
variant="primary",
elem_id="generate-btn"
)
# 좜λ ₯ μ„Ήμ…˜
with gr.Column(scale=1):
music_out = gr.Audio(
label="🎡 Generated Music",
elem_id="music-output"
)
# 진행 μƒνƒœ ν‘œμ‹œ
progress = gr.Textbox(
label="Generation Status",
interactive=False,
elem_id="progress-status"
)
# νžˆμŠ€ν† λ¦¬ νƒ­
with gr.TabItem("πŸ“š History", id="history"):
history_list = gr.Dataset(
components=[gr.Audio, gr.Textbox, gr.Textbox],
headers=["Generated Music", "Genre", "Lyrics"],
samples=[],
elem_id="history-list"
)
gr.Markdown("*Click on any entry to play the music*")
# 예제 μ„Ήμ…˜
with gr.Accordion("πŸ“– Examples", open=False):
gr.Examples(
examples=[
[
"female blues airy vocal bright vocal piano sad romantic guitar jazz",
"""[verse]
In the quiet of the evening, shadows start to fall
Whispers of the night wind echo through the hall
Lost within the silence, I hear your gentle voice
Guiding me back homeward, making my heart rejoice
[chorus]
Don't let this moment fade, hold me close tonight
"""
],
[
"K-pop bright energetic synth dance electronic",
"""
[verse]
μ–Έμ  κ°€ λ§ˆμ£Όν•œ λˆˆλΉ› μ†μ—μ„œ
μ–΄λ‘μš΄ 밀을 지날 λ•Œλ§ˆλ‹€
[chorus]
λ‹€μ‹œ ν•œ 번 λ‚΄κ²Œ λ§ν•΄μ€˜
"""
]
],
inputs=[genre_txt, lyrics_txt]
)
# 도움말 및 μ„€λͺ… μ„Ήμ…˜
with gr.Accordion("ℹ️ Help & Information", open=False):
gr.Markdown(
"""
### 🎡 How to Use
1. **Enter Genre & Style**: Describe the musical style you want (e.g., "K-pop", "Jazz", "Rock")
2. **Input Lyrics**: Write your lyrics using section tags:
- Use `[verse]` for verses
- Use `[chorus]` for choruses
- Use `[bridge]` for bridges
3. **Generate**: Click the Generate button and wait for your music!
### 🌏 Supported Languages
- English
- Korean (ν•œκ΅­μ–΄)
- Japanese (ζ—₯本θͺž)
- Chinese (δΈ­ζ–‡)
### ⚑ Tips
- Be specific with your genre descriptions
- Include emotion and instrument preferences
- Make sure to properly tag your lyrics sections
- For best results, include both verse and chorus sections
"""
)
# μ‹œμŠ€ν…œ μ΄ˆκΈ°ν™”
initialize_system()
def update_info(lyrics):
if not lyrics:
return "No lyrics entered", "No sections detected"
params = calculate_generation_params(lyrics)
duration = params['estimated_duration']
sections = params['sections']
return (
f"⏱️ Estimated: {duration:.1f} seconds",
f"πŸ“Š Verses: {sections['verse']}, Chorus: {sections['chorus']}"
)
def update_history(audio, genre, lyrics):
return history_list.update(samples=[[audio, genre, lyrics]] + history_list.samples)
# 이벀트 ν•Έλ“€λŸ¬
lyrics_txt.change(
fn=update_info,
inputs=[lyrics_txt],
outputs=[duration_info, sections_info]
)
def generate_with_progress(genre, lyrics, num_segments, max_tokens):
progress.update(value="🎡 Starting generation...")
try:
result = infer(genre, lyrics, num_segments, max_tokens)
if result:
progress.update(value="βœ… Generation complete!")
update_history(result, genre, lyrics)
return result
else:
progress.update(value="❌ Generation failed")
return None
except Exception as e:
progress.update(value=f"❌ Error: {str(e)}")
return None
submit_btn.click(
fn=generate_with_progress,
inputs=[genre_txt, lyrics_txt, num_segments, max_new_tokens],
outputs=[music_out]
)
return demo
if __name__ == "__main__":
demo = main()
demo.queue(max_size=20).launch(
server_name="0.0.0.0",
server_port=7860,
share=True,
show_api=True,
show_error=True,
max_threads=8
)