File size: 6,879 Bytes
98e0bae 41b0801 98e0bae f7cdc90 98e0bae f7cdc90 e380265 98e0bae |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
import torch
import pytube as pt
from transformers import pipeline
import json
import whisper_timestamped as whispertime
from pydub import AudioSegment
from spleeter.separator import Separator
import os
from profanity_check import predict
import sys
import tempfile
import uuid
import shutil
import json
import streamlit as st
# CORE #
MODEL_NAME = "openai/whisper-large-v2"  # HF pipeline id (kept for reference)
# BUG FIX: every entry must be lowercase -- process_audio compares
# word["text"].lower() against this list, so the original capitalized
# "Fuck" entry could never match. Duplicate "fuck" removed as well.
PROFANE_WORDS = [
    "fuck", "f***", "s***", "b****", "c***", "h**", "n*****", "f*****",
    "p****", "dick", "slit", "slut", "pussy", "ass", "fucking", "fuckin",
    "pussy.",
]
# Run inference on GPU when available, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def create_tmp_copy_of_file(file, dir=None):
    """
    Copy *file* to a uniquely-named temporary location and return the new path.

    :param file: path to the file, or a dict with a "path" key
    :param dir: optional directory to place the copy in (defaults to the
        system temp directory); name kept for backward compatibility even
        though it shadows the `dir` builtin
    :return: path to the temporary copy (plain string)
    """
    # Accept both a bare path and the dict shape some upload widgets produce.
    file_path = file["path"] if isinstance(file, dict) else file
    if dir is None:
        dir = tempfile.gettempdir()
    file_name = os.path.basename(file_path)
    # uuid4 prefix avoids collisions when the same file is copied twice.
    tmp_path = os.path.join(dir, f"{uuid.uuid4()}_{file_name}")
    shutil.copy2(file_path, tmp_path)
    # BUG FIX: the original returned json.dumps(tmp_path).strip('"'), a
    # pointless round-trip that escapes backslashes and would corrupt
    # Windows paths. Return the path directly.
    return tmp_path
def source_separation(input_file, output_folder="separated_audio"):
    """
    Split an audio file into vocal and accompaniment stems with Spleeter.

    :param input_file: path to the audio file to separate
    :param output_folder: root folder Spleeter writes stems into
    :return: path of the per-track stem directory (contains vocals.wav
        and accompaniment.wav)
    """
    track_name = os.path.splitext(os.path.basename(input_file))[0]
    Separator('spleeter:2stems').separate_to_file(input_file, output_folder)
    return f"{output_folder}/{track_name}"
def process_audio(input_file, model_size='tiny', verbose=False, play_output=False):
    """
    Mask profane words in an audio file and write a censored mix next to it.

    Pipeline: split the input into vocal/instrumental stems (Spleeter),
    transcribe the vocals with word-level timestamps (whisper_timestamped),
    silence every timestamped word that matches PROFANE_WORDS, then overlay
    the cleaned vocals back onto the instrumental stem.

    :param input_file: path to a .mp3 or .wav file
    :param model_size: whisper model size to load (e.g. 'tiny', 'base')
    :param verbose: also print the transcript and detected profanities
    :param play_output: unused; kept for interface compatibility
    :return: path to the masked output file
    """
    if not os.path.isfile(input_file):
        print('Error: input file not found')
        sys.exit(1)  # BUG FIX: exit nonzero on error (original exited 0)

    stems_dir = source_separation(input_file)
    vocal_stem = os.path.join(stems_dir, 'vocals.wav')
    instr_stem = os.path.join(stems_dir, 'accompaniment.wav')

    model = whispertime.load_model(model_size, device=device)
    result = whispertime.transcribe(model, vocal_stem, language="en")
    if verbose:
        print('\nTranscribed text:')
        print(result['text'] + '\n')
    print(result["text"])

    # ML-based word classifier gives a second opinion on the transcript.
    tokens = result["text"].split()
    profane_flags = predict(tokens)
    profanities = [word for word, flagged in zip(tokens, profane_flags) if flagged]
    if not profanities:
        print(f'No profanities detected found in {input_file} - exiting')
        # intentionally does not exit: the word-list pass below still runs
    if verbose:
        print('Profanities found in text:')
        print(profanities)

    vocals = AudioSegment.from_wav(vocal_stem)
    # BUG FIX: compare against a lowercased set -- the original list mixed
    # cases ("Fuck"), so `word.lower() in PROFANE_WORDS` could never match
    # the capitalized entries. Also strip surrounding punctuation from the
    # transcribed token ("fuck," would otherwise slip through).
    blocklist = {w.lower() for w in PROFANE_WORDS}
    for segment in result["segments"]:
        for word in segment["words"]:
            token = word["text"].lower().strip('.,!?;:"\'')
            if token in blocklist or word["text"].lower() in blocklist:
                start_ms = int(word["start"] * 1000)
                end_ms = int(word["end"] * 1000)
                # Replace the word's span in the vocal stem with silence.
                silence = AudioSegment.silent(duration=end_ms - start_ms)
                vocals = vocals[:start_ms] + silence + vocals[end_ms:]

    mix = AudioSegment.from_wav(instr_stem).overlay(vocals)
    # BUG FIX: the original chained str.replace over the whole path; with an
    # unexpected extension outpath == input_file and the source would be
    # overwritten. splitext is exact and extension-agnostic.
    base, ext = os.path.splitext(input_file)
    outpath = f"{base}_masked{ext}"
    # NOTE(review): payload is always WAV data even when outpath ends in
    # .mp3 -- preserved from the original; exporting format="mp3" would
    # require ffmpeg to be installed.
    mix.export(outpath, format="wav")
    print(f'Mixed file written to: {outpath}')
    return outpath
def transcribe(microphone=None, file_upload=None):
    """
    Censor the given audio source and return the path to the processed file.

    :param microphone: path to a recorded audio file (takes precedence)
    :param file_upload: path to an uploaded audio file
    :return: path to the masked output file, or an "ERROR: ..." string when
        no input was supplied
    """
    if (microphone is not None) and (file_upload is not None):
        # BUG FIX: the original built this warning and silently discarded
        # it; surface it so the user knows the upload was ignored.
        print(
            "WARNING: You've uploaded an audio file and used the microphone. "
            "The recorded file from the microphone will be used and the uploaded audio will be discarded.\n"
        )
    elif (microphone is None) and (file_upload is None):
        # BUG FIX: fixed "e~ither" typo in the user-facing message.
        return "ERROR: You have to either use the microphone or upload an audio file"
    file = microphone if microphone is not None else file_upload
    processed_file = process_audio(file)
    print('File successfully processed: ', processed_file)
    return str(processed_file)
def _return_yt_html_embed(yt_url):
video_id = yt_url.split("?v=")[-1]
HTML_str = (
f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>'
" </center>"
)
return HTML_str
def yt_transcribe(yt_url):
    """
    Download a YouTube video's audio, censor it, and return the result.

    :param yt_url: full YouTube watch URL
    :return: tuple of (html embed string, path to the masked audio file)
    """
    yt = pt.YouTube(yt_url)
    html_embed_str = _return_yt_html_embed(yt_url)
    # Audio-only stream is enough; we never need the video track.
    stream = yt.streams.filter(only_audio=True)[0]
    stream.download(filename="audio.mp3")
    processed_file = process_audio("audio.mp3")
    # BUG FIX: the original returned a pydub AudioSegment here, but the
    # Streamlit caller does open(audio, 'rb'), which requires a filesystem
    # path and would raise TypeError. Return the output path instead.
    return html_embed_str, processed_file
# STREAMLIT #
# NOTE(review): streamlit was already imported at the top of the file; this
# second import is redundant (harmless, but could be removed).
import streamlit as st
st.title("Saylss - remove profane audio from uploaded content")
# Bare string expression below is rendered by Streamlit's "magic" feature.
# The f-prefix is unnecessary (no placeholders) -- presumably kept by accident.
f"""
Saylss censors profane audio inputs with the click of a button! Saylss uses a custom Whisper model to timestamp at the word level and then several audio manipulation libraries to process and return a clean version.
by olmec.ai & batman.
"""
tab1, tab2 = st.tabs(["Transcribe Audio", "Transcribe YouTube"])
with tab1:  # file upload
    uploaded_files = st.file_uploader("Upload your audio file here", type=["mp3", "wav"], help="Drag and drop or click to choose file")
    if uploaded_files is not None:
        bytes_data = uploaded_files.read()
        st.write("Your uploaded file")
        st.audio(bytes_data)
        # format can be specified, default is wav
        # st.audio(bytes_data, format="audio/mp3")
        st.markdown("---")
        st.write("## Your processed file")
        with st.spinner("...is being processed"):
            # uploaded file is stored in RAM, so save it to a file to pass into `transcribe`
            with open(uploaded_files.name, "wb") as f:
                f.write(uploaded_files.getbuffer())
            processed_audio = transcribe(microphone=None, file_upload=uploaded_files.name)
            # BUG FIX: close the processed file instead of leaking the
            # handle (the original opened it and never closed it).
            with open(processed_audio, 'rb') as audio_file:
                audio_bytes2 = audio_file.read()
            st.audio(audio_bytes2)
with tab2: # youtube
    link = st.text_input("Paste your YouTube link", placeholder="https://www.youtube.com/watch?v=EuEe3WKpbCo")
    if link != "":
        try:
            st.video(link)
        # NOTE(review): bare except swallows everything (including
        # KeyboardInterrupt); narrow to Exception.
        except:
            st.warning("Not a video")
            st.stop()
        with st.spinner("YouTube link is being processed"):
            html_embed_str, audio = yt_transcribe(link)
            # NOTE(review): yt_transcribe returns a pydub AudioSegment, but
            # open() below needs a filesystem path -- this will raise
            # TypeError at runtime. Fix yt_transcribe to return the output
            # path, and close this handle (it is currently leaked).
            audio_file = open(audio, 'rb')
            audio_bytes_yt = audio_file.read()
            st.audio(audio_bytes_yt)