import time
from queue import Queue

import numpy as np
import pyaudio
import sounddevice as sd
import torch
from torch.nn.functional import pad

from .config import settings

CHUNK = settings.CHUNK
FORMAT = pyaudio.paFloat32
CHANNELS = settings.CHANNELS
RATE = settings.RATE
SILENCE_THRESHOLD = settings.SILENCE_THRESHOLD
SPEECH_CHECK_THRESHOLD = settings.SPEECH_CHECK_THRESHOLD
MAX_SILENCE_DURATION = settings.MAX_SILENCE_DURATION
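
# The `.config.settings` object is not defined in this module. A minimal sketch of
# what it is assumed to provide follows; names mirror the attributes used here, but
# the values and the VAD model id are illustrative, not the project's actual config:
#
#     # config.py (hypothetical)
#     class Settings:
#         CHUNK = 1024
#         CHANNELS = 1
#         RATE = 16000
#         RECORD_DURATION = 5
#         SILENCE_THRESHOLD = 0.01
#         SPEECH_CHECK_THRESHOLD = 0.02
#         INTERRUPTION_THRESHOLD = 0.05
#         MAX_SILENCE_DURATION = 1.0
#         VAD_MODEL = "pyannote/segmentation-3.0"
#         VAD_MIN_DURATION_ON = 0.0
#         VAD_MIN_DURATION_OFF = 0.0
#
#     settings = Settings()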


def init_vad_pipeline(hf_token):
    """Initializes the Voice Activity Detection pipeline.

    Args:
        hf_token (str): Hugging Face API token.

    Returns:
        pyannote.audio.pipelines.VoiceActivityDetection: VAD pipeline.
    """
    from pyannote.audio import Model
    from pyannote.audio.pipelines import VoiceActivityDetection

    model = Model.from_pretrained(settings.VAD_MODEL, use_auth_token=hf_token)
    pipeline = VoiceActivityDetection(segmentation=model)

    HYPER_PARAMETERS = {
        "min_duration_on": settings.VAD_MIN_DURATION_ON,
        "min_duration_off": settings.VAD_MIN_DURATION_OFF,
    }
    pipeline.instantiate(HYPER_PARAMETERS)

    return pipeline
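
# Usage sketch for init_vad_pipeline (illustrative; assumes a Hugging Face token is
# available in an HF_TOKEN environment variable and that access to the model named
# by settings.VAD_MODEL has been granted on the Hub):
#
#     import os
#     vad_pipeline = init_vad_pipeline(os.environ["HF_TOKEN"])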


def detect_speech_segments(pipeline, audio_data, sample_rate=None):
    """Detects speech segments in audio using pyannote VAD.

    Args:
        pipeline (pyannote.audio.pipelines.VoiceActivityDetection): VAD pipeline.
        audio_data (np.ndarray or torch.Tensor): Audio data.
        sample_rate (int, optional): Sample rate of the audio. Defaults to settings.RATE.

    Returns:
        torch.Tensor or None: Concatenated speech segments as a torch tensor, or None if no speech is detected.
    """
    if sample_rate is None:
        sample_rate = settings.RATE

    # pyannote expects a (channels, samples) waveform tensor.
    if len(audio_data.shape) == 1:
        audio_data = audio_data.reshape(1, -1)

    if not isinstance(audio_data, torch.Tensor):
        audio_data = torch.from_numpy(audio_data)

    # Zero-pad clips shorter than one second before running VAD.
    if audio_data.shape[1] < sample_rate:
        padding_size = sample_rate - audio_data.shape[1]
        audio_data = pad(audio_data, (0, padding_size))

    vad = pipeline({"waveform": audio_data, "sample_rate": sample_rate})

    # Convert detected speech regions from seconds to sample indices and slice them out.
    speech_segments = []
    for speech in vad.get_timeline().support():
        start_sample = int(speech.start * sample_rate)
        end_sample = int(speech.end * sample_rate)
        if start_sample < audio_data.shape[1]:
            end_sample = min(end_sample, audio_data.shape[1])
            segment = audio_data[0, start_sample:end_sample]
            speech_segments.append(segment)

    if speech_segments:
        return torch.cat(speech_segments)
    return None
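
# Usage sketch: run VAD over a short recording and keep only the voiced samples
# (illustrative; `vad_pipeline` is assumed to come from init_vad_pipeline above):
#
#     audio = record_audio(duration=3)
#     speech = detect_speech_segments(vad_pipeline, audio)
#     if speech is None:
#         print("No speech detected")
#     else:
#         print(f"Kept {speech.shape[0] / settings.RATE:.2f}s of speech")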


def record_audio(duration=None):
    """Records audio for a specified duration.

    Args:
        duration (int, optional): Recording duration in seconds. Defaults to settings.RECORD_DURATION.

    Returns:
        np.ndarray: Recorded audio data as a numpy array.
    """
    if duration is None:
        duration = settings.RECORD_DURATION

    p = pyaudio.PyAudio()

    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )

    print("\nRecording...")
    frames = []

    for _ in range(int(RATE / CHUNK * duration)):
        data = stream.read(CHUNK)
        frames.append(np.frombuffer(data, dtype=np.float32))

    print("Done recording")

    stream.stop_stream()
    stream.close()
    p.terminate()

    audio_data = np.concatenate(frames, axis=0)
    return audio_data
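
# Usage sketch: record a fixed-length clip and report its duration and peak level
# (illustrative only):
#
#     clip = record_audio(duration=2)
#     print(f"{clip.shape[0] / settings.RATE:.2f}s recorded, peak={np.abs(clip).max():.3f}")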


def record_continuous_audio():
    """Continuously monitors audio and detects speech segments.

    Returns:
        np.ndarray or None: Recorded audio data as a numpy array, or None if no speech is detected.
    """
    p = pyaudio.PyAudio()

    stream = p.open(
        format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK
    )

    print("\nListening... (Press Ctrl+C to stop)")
    frames = []
    buffer_frames = []
    # Rolling 0.5-second pre-buffer so the onset of speech is not clipped.
    buffer_size = int(RATE * 0.5 / CHUNK)
    silence_frames = 0
    # Stop once MAX_SILENCE_DURATION seconds of trailing silence have elapsed.
    max_silence_frames = int(RATE / CHUNK * MAX_SILENCE_DURATION)
    recording = False

    try:
        while True:
            data = stream.read(CHUNK, exception_on_overflow=False)
            audio_chunk = np.frombuffer(data, dtype=np.float32)

            buffer_frames.append(audio_chunk)
            if len(buffer_frames) > buffer_size:
                buffer_frames.pop(0)

            audio_level = np.abs(np.concatenate(buffer_frames)).mean()

            if audio_level > SILENCE_THRESHOLD:
                if not recording:
                    print("\nPotential speech detected...")
                    recording = True
                    frames.extend(buffer_frames)
                frames.append(audio_chunk)
                silence_frames = 0
            elif recording:
                frames.append(audio_chunk)
                silence_frames += 1

                if silence_frames >= max_silence_frames:
                    print("Processing speech segment...")
                    break

            time.sleep(0.001)

    except KeyboardInterrupt:
        pass
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

    if frames:
        return np.concatenate(frames)
    return None
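
# Usage sketch: wait for a speech segment and hand it to the VAD/transcription stages
# (illustrative; Ctrl+C while listening returns whatever was captured so far, or None):
#
#     segment = record_continuous_audio()
#     if segment is not None:
#         print(f"Captured {segment.shape[0] / settings.RATE:.2f}s of audio")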


def check_for_speech(timeout=0.1):
    """Checks if speech is detected in a non-blocking way.

    Args:
        timeout (float, optional): Duration to check for speech in seconds. Defaults to 0.1.

    Returns:
        tuple: A tuple containing a boolean indicating if speech was detected and the audio data as a numpy array, or (False, None) if no speech is detected.
    """
    p = pyaudio.PyAudio()

    frames = []
    is_speech = False

    # Open the stream before the try block so it is guaranteed to exist in `finally`.
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )

    try:
        for _ in range(int(RATE * timeout / CHUNK)):
            data = stream.read(CHUNK, exception_on_overflow=False)
            audio_chunk = np.frombuffer(data, dtype=np.float32)
            frames.append(audio_chunk)

            audio_level = np.abs(audio_chunk).mean()
            if audio_level > SPEECH_CHECK_THRESHOLD:
                is_speech = True
                break

    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

    if is_speech and frames:
        return True, np.concatenate(frames)
    return False, None
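
# Usage sketch: poll for speech between other work, e.g. while an assistant is idle
# (illustrative only):
#
#     heard, prefix = check_for_speech(timeout=0.2)
#     if heard:
#         print(f"Speech detected ({prefix.shape[0]} samples buffered)")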


def play_audio_with_interrupt(audio_data, sample_rate=24000):
    """Plays audio while monitoring for speech interruption.

    Args:
        audio_data (np.ndarray): Audio data to play.
        sample_rate (int, optional): Sample rate for playback. Defaults to 24000.

    Returns:
        tuple: A tuple containing a boolean indicating if playback was interrupted and None, or (False, None) if playback completes without interruption.
    """
    interrupt_queue = Queue()

    def input_callback(indata, frames, time, status):
        """Callback for monitoring input audio."""
        if status:
            print(f"Input status: {status}")
            return

        audio_level = np.abs(indata[:, 0]).mean()
        if audio_level > settings.INTERRUPTION_THRESHOLD:
            interrupt_queue.put(True)

    def output_callback(outdata, frames, time, status):
        """Callback for output audio."""
        if status:
            print(f"Output status: {status}")
            return

        if not interrupt_queue.empty():
            raise sd.CallbackStop()

        remaining = len(audio_data) - output_callback.position
        if remaining == 0:
            raise sd.CallbackStop()

        valid_frames = min(remaining, frames)
        outdata[:valid_frames, 0] = audio_data[
            output_callback.position : output_callback.position + valid_frames
        ]
        if valid_frames < frames:
            outdata[valid_frames:] = 0
        output_callback.position += valid_frames

    # Playback cursor stored as a function attribute so it persists across callback calls.
    output_callback.position = 0

    try:
        with sd.InputStream(
            channels=1, callback=input_callback, samplerate=settings.RATE
        ):
            with sd.OutputStream(
                channels=1, callback=output_callback, samplerate=sample_rate
            ):
                while output_callback.position < len(audio_data):
                    sd.sleep(100)
                    if not interrupt_queue.empty():
                        return True, None
                return False, None
    except sd.CallbackStop:
        return True, None
    except Exception as e:
        print(f"Error during playback: {str(e)}")
        return False, None
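
# Usage sketch: play a one-second 440 Hz test tone and report whether the listener
# barged in (illustrative; any mono float32 array at the given sample rate works):
#
#     sr = 24000
#     t = np.linspace(0, 1.0, sr, endpoint=False)
#     tone = (0.2 * np.sin(2 * np.pi * 440 * t)).astype(np.float32)
#     interrupted, _ = play_audio_with_interrupt(tone, sample_rate=sr)
#     print("Interrupted" if interrupted else "Finished")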


def transcribe_audio(processor, model, audio_data, sampling_rate=None):
    """Transcribes audio using Whisper.

    Args:
        processor (transformers.WhisperProcessor): Whisper processor.
        model (transformers.WhisperForConditionalGeneration): Whisper model.
        audio_data (np.ndarray or torch.Tensor): Audio data to transcribe.
        sampling_rate (int, optional): Sample rate of the audio. Defaults to settings.RATE.

    Returns:
        str: Transcribed text.
    """
    if sampling_rate is None:
        sampling_rate = settings.RATE

    if audio_data is None:
        return ""

    if isinstance(audio_data, torch.Tensor):
        audio_data = audio_data.numpy()

    input_features = processor(
        audio_data, sampling_rate=sampling_rate, return_tensors="pt"
    ).input_features
    predicted_ids = model.generate(input_features)
    transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)
    return transcription[0]
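
# End-to-end sketch: load Whisper from transformers and transcribe a recording
# (illustrative; the model id is an assumption, and Whisper's feature extractor
# expects 16 kHz input, so settings.RATE is assumed to be 16000):
#
#     from transformers import WhisperForConditionalGeneration, WhisperProcessor
#
#     processor = WhisperProcessor.from_pretrained("openai/whisper-base")
#     model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-base")
#     audio = record_audio(duration=5)
#     print(transcribe_audio(processor, model, audio))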