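"""Audio I/O utilities: microphone recording (PyAudio/sounddevice), voice
activity detection with pyannote, interruptible playback, and Whisper
transcription."""
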
import pyaudio
import numpy as np
import torch
from torch.nn.functional import pad
import time
from queue import Queue
import sounddevice as sd
from .config import settings

CHUNK = settings.CHUNK
FORMAT = pyaudio.paFloat32
CHANNELS = settings.CHANNELS
RATE = settings.RATE
SILENCE_THRESHOLD = settings.SILENCE_THRESHOLD
SPEECH_CHECK_THRESHOLD = settings.SPEECH_CHECK_THRESHOLD
MAX_SILENCE_DURATION = settings.MAX_SILENCE_DURATION


def init_vad_pipeline(hf_token):
"""Initializes the Voice Activity Detection pipeline.
Args:
hf_token (str): Hugging Face API token.
Returns:
pyannote.audio.pipelines.VoiceActivityDetection: VAD pipeline.
"""
from pyannote.audio import Model
from pyannote.audio.pipelines import VoiceActivityDetection
model = Model.from_pretrained(settings.VAD_MODEL, use_auth_token=hf_token)
pipeline = VoiceActivityDetection(segmentation=model)
HYPER_PARAMETERS = {
"min_duration_on": settings.VAD_MIN_DURATION_ON,
"min_duration_off": settings.VAD_MIN_DURATION_OFF,
}
pipeline.instantiate(HYPER_PARAMETERS)
return pipeline
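

# A minimal usage sketch (hedged): the HF_TOKEN environment variable below is
# an illustrative assumption, not something this module or its settings define.
#
#     import os
#     vad_pipeline = init_vad_pipeline(os.environ["HF_TOKEN"])
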
def detect_speech_segments(pipeline, audio_data, sample_rate=None):
"""Detects speech segments in audio using pyannote VAD.
Args:
pipeline (pyannote.audio.pipelines.VoiceActivityDetection): VAD pipeline.
audio_data (np.ndarray or torch.Tensor): Audio data.
sample_rate (int, optional): Sample rate of the audio. Defaults to settings.RATE.
Returns:
torch.Tensor or None: Concatenated speech segments as a torch tensor, or None if no speech is detected.
"""
if sample_rate is None:
sample_rate = settings.RATE
if len(audio_data.shape) == 1:
audio_data = audio_data.reshape(1, -1)
if not isinstance(audio_data, torch.Tensor):
audio_data = torch.from_numpy(audio_data)
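    # Pad clips shorter than one second with trailing zeros so the pyannote
    # segmentation model has enough context to run.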
if audio_data.shape[1] < sample_rate:
padding_size = sample_rate - audio_data.shape[1]
audio_data = pad(audio_data, (0, padding_size))
vad = pipeline({"waveform": audio_data, "sample_rate": sample_rate})
speech_segments = []
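    # Walk the merged (support) speech timeline and slice each region out of
    # the waveform.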
for speech in vad.get_timeline().support():
start_sample = int(speech.start * sample_rate)
end_sample = int(speech.end * sample_rate)
if start_sample < audio_data.shape[1]:
end_sample = min(end_sample, audio_data.shape[1])
segment = audio_data[0, start_sample:end_sample]
speech_segments.append(segment)
if speech_segments:
return torch.cat(speech_segments)
return None
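

# Hedged sketch: run VAD over one fixed-length recording. `vad_pipeline` is
# assumed to come from init_vad_pipeline() above; record_audio() is defined
# later in this module.
#
#     audio = record_audio(duration=3)
#     speech = detect_speech_segments(vad_pipeline, audio)
#     if speech is None:
#         print("No speech detected in the clip")
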
def record_audio(duration=None):
"""Records audio for a specified duration.
Args:
duration (int, optional): Recording duration in seconds. Defaults to settings.RECORD_DURATION.
Returns:
np.ndarray: Recorded audio data as a numpy array.
"""
if duration is None:
duration = settings.RECORD_DURATION
p = pyaudio.PyAudio()
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )
    print("\nRecording...")
    frames = []
    for _ in range(int(RATE / CHUNK * duration)):
        data = stream.read(CHUNK)
        frames.append(np.frombuffer(data, dtype=np.float32))
print("Done recording")
stream.stop_stream()
stream.close()
p.terminate()
audio_data = np.concatenate(frames, axis=0)
return audio_data
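

# Example: capture a fixed-length clip; with no argument the duration falls
# back to settings.RECORD_DURATION.
#
#     clip = record_audio(duration=5)  # 5 s of float32 samples at settings.RATE
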
def record_continuous_audio():
"""Continuously monitors audio and detects speech segments.
Returns:
np.ndarray or None: Recorded audio data as a numpy array, or None if no speech is detected.
"""
p = pyaudio.PyAudio()
stream = p.open(
format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK
)
print("\nListening... (Press Ctrl+C to stop)")
    frames = []
    buffer_frames = []
    # Rolling pre-speech buffer: keep the last 0.5 s of audio so the start of
    # an utterance is not clipped when the level threshold finally trips.
    buffer_size = int(RATE * 0.5 / CHUNK)
    silence_frames = 0
    # Stop recording once the configured silence duration has elapsed.
    max_silence_frames = int(RATE / CHUNK * MAX_SILENCE_DURATION)
recording = False
try:
while True:
data = stream.read(CHUNK, exception_on_overflow=False)
audio_chunk = np.frombuffer(data, dtype=np.float32)
buffer_frames.append(audio_chunk)
if len(buffer_frames) > buffer_size:
buffer_frames.pop(0)
audio_level = np.abs(np.concatenate(buffer_frames)).mean()
            if audio_level > SILENCE_THRESHOLD:
                if not recording:
                    print("\nPotential speech detected...")
                    recording = True
                    # Seed with the buffered pre-speech audio; the current
                    # chunk is already the last entry of buffer_frames.
                    frames.extend(buffer_frames)
                else:
                    frames.append(audio_chunk)
                silence_frames = 0
elif recording:
frames.append(audio_chunk)
silence_frames += 1
if silence_frames >= max_silence_frames:
print("Processing speech segment...")
break
time.sleep(0.001)
except KeyboardInterrupt:
pass
finally:
stream.stop_stream()
stream.close()
p.terminate()
if frames:
return np.concatenate(frames)
return None
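

# Example: block until an utterance followed by the configured silence window
# has been captured, then isolate the speech with VAD (vad_pipeline as above).
#
#     raw = record_continuous_audio()
#     if raw is not None:
#         speech = detect_speech_segments(vad_pipeline, raw)
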
def check_for_speech(timeout=0.1):
"""Checks if speech is detected in a non-blocking way.
Args:
timeout (float, optional): Duration to check for speech in seconds. Defaults to 0.1.
Returns:
        tuple: (True, audio_data) with the captured samples as a numpy
            array if speech was detected, otherwise (False, None).
    """
p = pyaudio.PyAudio()
frames = []
is_speech = False
    # Open the stream outside the try block so the finally clause cannot hit
    # an unbound `stream` if p.open() itself raises.
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )
    try:
for _ in range(int(RATE * timeout / CHUNK)):
data = stream.read(CHUNK, exception_on_overflow=False)
audio_chunk = np.frombuffer(data, dtype=np.float32)
frames.append(audio_chunk)
audio_level = np.abs(audio_chunk).mean()
if audio_level > SPEECH_CHECK_THRESHOLD:
is_speech = True
break
finally:
stream.stop_stream()
stream.close()
p.terminate()
if is_speech and frames:
return True, np.concatenate(frames)
return False, None
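

# Example: cheap polling before committing to the heavier continuous recorder.
#
#     detected, first_chunk = check_for_speech(timeout=0.2)
#     if detected:
#         raw = record_continuous_audio()
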
def play_audio_with_interrupt(audio_data, sample_rate=24000):
"""Plays audio while monitoring for speech interruption.
Args:
audio_data (np.ndarray): Audio data to play.
sample_rate (int, optional): Sample rate for playback. Defaults to 24000.
Returns:
        tuple: (True, None) if playback was interrupted by speech,
            otherwise (False, None).
    """
interrupt_queue = Queue()
def input_callback(indata, frames, time, status):
"""Callback for monitoring input audio."""
if status:
print(f"Input status: {status}")
return
audio_level = np.abs(indata[:, 0]).mean()
if audio_level > settings.INTERRUPTION_THRESHOLD:
interrupt_queue.put(True)
def output_callback(outdata, frames, time, status):
"""Callback for output audio."""
if status:
print(f"Output status: {status}")
return
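        # Raising CallbackStop inside a callback tells sounddevice to finish
        # the stream; the monitoring loop below then reports the interruption.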
if not interrupt_queue.empty():
raise sd.CallbackStop()
remaining = len(audio_data) - output_callback.position
if remaining == 0:
raise sd.CallbackStop()
valid_frames = min(remaining, frames)
outdata[:valid_frames, 0] = audio_data[
output_callback.position : output_callback.position + valid_frames
]
if valid_frames < frames:
outdata[valid_frames:] = 0
output_callback.position += valid_frames
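    # Playback progress lives as an attribute on the callback so that both the
    # callback and the monitoring loop can read it.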
output_callback.position = 0
try:
with sd.InputStream(
channels=1, callback=input_callback, samplerate=settings.RATE
):
with sd.OutputStream(
channels=1, callback=output_callback, samplerate=sample_rate
):
while output_callback.position < len(audio_data):
sd.sleep(100)
if not interrupt_queue.empty():
return True, None
return False, None
except sd.CallbackStop:
return True, None
except Exception as e:
print(f"Error during playback: {str(e)}")
return False, None
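

# Example: play a synthesized reply but stop as soon as the user barges in.
# `tts_audio` is a hypothetical float32 numpy array from a TTS engine.
#
#     interrupted, _ = play_audio_with_interrupt(tts_audio, sample_rate=24000)
#     if interrupted:
#         print("Playback interrupted by speech")
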
def transcribe_audio(processor, model, audio_data, sampling_rate=None):
"""Transcribes audio using Whisper.
Args:
processor (transformers.WhisperProcessor): Whisper processor.
model (transformers.WhisperForConditionalGeneration): Whisper model.
audio_data (np.ndarray or torch.Tensor): Audio data to transcribe.
sampling_rate (int, optional): Sample rate of the audio. Defaults to settings.RATE.
Returns:
str: Transcribed text.
"""
if sampling_rate is None:
sampling_rate = settings.RATE
if audio_data is None:
return ""
if isinstance(audio_data, torch.Tensor):
audio_data = audio_data.numpy()
input_features = processor(
audio_data, sampling_rate=sampling_rate, return_tensors="pt"
).input_features
predicted_ids = model.generate(input_features)
transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)
return transcription[0]
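

if __name__ == "__main__":
    # End-to-end sketch: listen, isolate speech, transcribe. The HF_TOKEN
    # environment variable and the openai/whisper-base checkpoint are
    # illustrative assumptions, not requirements of this module. Because of
    # the relative `.config` import, run this as a module, e.g.
    # `python -m <package>.audio`.
    import os

    from transformers import WhisperForConditionalGeneration, WhisperProcessor

    vad_pipeline = init_vad_pipeline(os.environ["HF_TOKEN"])
    whisper_processor = WhisperProcessor.from_pretrained("openai/whisper-base")
    whisper_model = WhisperForConditionalGeneration.from_pretrained(
        "openai/whisper-base"
    )

    raw = record_continuous_audio()
    speech = detect_speech_segments(vad_pipeline, raw) if raw is not None else None
    print("Transcript:", transcribe_audio(whisper_processor, whisper_model, speech))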