import time
from queue import Queue

import numpy as np
import pyaudio
import sounddevice as sd
import torch
from torch.nn.functional import pad

from .config import settings

CHUNK = settings.CHUNK
FORMAT = pyaudio.paFloat32
CHANNELS = settings.CHANNELS
RATE = settings.RATE
SILENCE_THRESHOLD = settings.SILENCE_THRESHOLD
SPEECH_CHECK_THRESHOLD = settings.SPEECH_CHECK_THRESHOLD
MAX_SILENCE_DURATION = settings.MAX_SILENCE_DURATION


def init_vad_pipeline(hf_token):
    """Initializes the Voice Activity Detection pipeline.

    Args:
        hf_token (str): Hugging Face API token.

    Returns:
        pyannote.audio.pipelines.VoiceActivityDetection: VAD pipeline.
    """
    # Imported lazily so the rest of the module works without pyannote installed.
    from pyannote.audio import Model
    from pyannote.audio.pipelines import VoiceActivityDetection

    model = Model.from_pretrained(settings.VAD_MODEL, use_auth_token=hf_token)
    pipeline = VoiceActivityDetection(segmentation=model)
    HYPER_PARAMETERS = {
        "min_duration_on": settings.VAD_MIN_DURATION_ON,
        "min_duration_off": settings.VAD_MIN_DURATION_OFF,
    }
    pipeline.instantiate(HYPER_PARAMETERS)
    return pipeline


def detect_speech_segments(pipeline, audio_data, sample_rate=None):
    """Detects speech segments in audio using pyannote VAD.

    Args:
        pipeline (pyannote.audio.pipelines.VoiceActivityDetection): VAD pipeline.
        audio_data (np.ndarray or torch.Tensor): Audio data.
        sample_rate (int, optional): Sample rate of the audio. Defaults to settings.RATE.

    Returns:
        torch.Tensor or None: Concatenated speech segments as a torch tensor,
            or None if no speech is detected.
    """
    if sample_rate is None:
        sample_rate = settings.RATE

    # pyannote expects a (channels, samples) float tensor.
    if len(audio_data.shape) == 1:
        audio_data = audio_data.reshape(1, -1)
    if not isinstance(audio_data, torch.Tensor):
        audio_data = torch.from_numpy(audio_data)

    # Pad clips shorter than one second so the segmentation model sees a full window.
    if audio_data.shape[1] < sample_rate:
        padding_size = sample_rate - audio_data.shape[1]
        audio_data = pad(audio_data, (0, padding_size))

    vad = pipeline({"waveform": audio_data, "sample_rate": sample_rate})

    speech_segments = []
    for speech in vad.get_timeline().support():
        start_sample = int(speech.start * sample_rate)
        end_sample = int(speech.end * sample_rate)
        if start_sample < audio_data.shape[1]:
            end_sample = min(end_sample, audio_data.shape[1])
            segment = audio_data[0, start_sample:end_sample]
            speech_segments.append(segment)

    if speech_segments:
        return torch.cat(speech_segments)
    return None


def record_audio(duration=None):
    """Records audio for a specified duration.

    Args:
        duration (int, optional): Recording duration in seconds. Defaults to settings.RECORD_DURATION.

    Returns:
        np.ndarray: Recorded audio data as a numpy array.
    """
    if duration is None:
        duration = settings.RECORD_DURATION

    p = pyaudio.PyAudio()
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )

    print("\nRecording...")
    frames = []
    for _ in range(int(RATE / CHUNK * duration)):
        data = stream.read(CHUNK)
        frames.append(np.frombuffer(data, dtype=np.float32))
    print("Done recording")

    stream.stop_stream()
    stream.close()
    p.terminate()

    audio_data = np.concatenate(frames, axis=0)
    return audio_data
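

# Illustrative sketch (comment only, never executed): one way a caller might
# chain the helpers above. `hf_token` is a placeholder for a real Hugging Face
# token and is not defined in this module.
#
#   pipeline = init_vad_pipeline(hf_token)
#   clip = record_audio(duration=5)
#   speech = detect_speech_segments(pipeline, clip)
#   if speech is not None:
#       print(f"Kept {speech.shape[0] / settings.RATE:.2f}s of speech")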


def record_continuous_audio():
    """Continuously monitors audio and detects speech segments.

    Returns:
        np.ndarray or None: Recorded audio data as a numpy array, or None if no speech is detected.
    """
    p = pyaudio.PyAudio()
    stream = p.open(
        format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK
    )

    print("\nListening... (Press Ctrl+C to stop)")
    frames = []
    buffer_frames = []
    buffer_size = int(RATE * 0.5 / CHUNK)  # rolling ~0.5 s pre-speech buffer
    silence_frames = 0
    max_silence_frames = int(RATE / CHUNK * MAX_SILENCE_DURATION)
    recording = False

    try:
        while True:
            data = stream.read(CHUNK, exception_on_overflow=False)
            audio_chunk = np.frombuffer(data, dtype=np.float32)

            # Keep a short rolling buffer so the onset of speech is not clipped.
            buffer_frames.append(audio_chunk)
            if len(buffer_frames) > buffer_size:
                buffer_frames.pop(0)

            audio_level = np.abs(np.concatenate(buffer_frames)).mean()

            if audio_level > SILENCE_THRESHOLD:
                if not recording:
                    print("\nPotential speech detected...")
                    recording = True
                    frames.extend(buffer_frames)
                frames.append(audio_chunk)
                silence_frames = 0
            elif recording:
                frames.append(audio_chunk)
                silence_frames += 1
                # Stop once the speaker has been silent for long enough.
                if silence_frames >= max_silence_frames:
                    print("Processing speech segment...")
                    break

            time.sleep(0.001)
    except KeyboardInterrupt:
        pass
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

    if frames:
        return np.concatenate(frames)
    return None


def check_for_speech(timeout=0.1):
    """Checks if speech is detected in a non-blocking way.

    Args:
        timeout (float, optional): Duration to check for speech in seconds. Defaults to 0.1.

    Returns:
        tuple: A tuple containing a boolean indicating if speech was detected and the
            audio data as a numpy array, or (False, None) if no speech is detected.
    """
    p = pyaudio.PyAudio()
    frames = []
    is_speech = False

    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK,
    )
    try:
        for _ in range(int(RATE * timeout / CHUNK)):
            data = stream.read(CHUNK, exception_on_overflow=False)
            audio_chunk = np.frombuffer(data, dtype=np.float32)
            frames.append(audio_chunk)
            audio_level = np.abs(audio_chunk).mean()
            if audio_level > SPEECH_CHECK_THRESHOLD:
                is_speech = True
                break
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

    if is_speech and frames:
        return True, np.concatenate(frames)
    return False, None
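

# Illustrative sketch (comment only, never executed): an idle loop that polls
# briefly with check_for_speech() and, on a hit, captures the full utterance
# with record_continuous_audio(). What to do with the lead-in chunk is left to
# the caller.
#
#   while True:
#       detected, lead_in = check_for_speech(timeout=0.1)
#       if detected:
#           utterance = record_continuous_audio()
#           break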
""" interrupt_queue = Queue() def input_callback(indata, frames, time, status): """Callback for monitoring input audio.""" if status: print(f"Input status: {status}") return audio_level = np.abs(indata[:, 0]).mean() if audio_level > settings.INTERRUPTION_THRESHOLD: interrupt_queue.put(True) def output_callback(outdata, frames, time, status): """Callback for output audio.""" if status: print(f"Output status: {status}") return if not interrupt_queue.empty(): raise sd.CallbackStop() remaining = len(audio_data) - output_callback.position if remaining == 0: raise sd.CallbackStop() valid_frames = min(remaining, frames) outdata[:valid_frames, 0] = audio_data[ output_callback.position : output_callback.position + valid_frames ] if valid_frames < frames: outdata[valid_frames:] = 0 output_callback.position += valid_frames output_callback.position = 0 try: with sd.InputStream( channels=1, callback=input_callback, samplerate=settings.RATE ): with sd.OutputStream( channels=1, callback=output_callback, samplerate=sample_rate ): while output_callback.position < len(audio_data): sd.sleep(100) if not interrupt_queue.empty(): return True, None return False, None except sd.CallbackStop: return True, None except Exception as e: print(f"Error during playback: {str(e)}") return False, None def transcribe_audio(processor, model, audio_data, sampling_rate=None): """Transcribes audio using Whisper. Args: processor (transformers.WhisperProcessor): Whisper processor. model (transformers.WhisperForConditionalGeneration): Whisper model. audio_data (np.ndarray or torch.Tensor): Audio data to transcribe. sampling_rate (int, optional): Sample rate of the audio. Defaults to settings.RATE. Returns: str: Transcribed text. """ if sampling_rate is None: sampling_rate = settings.RATE if audio_data is None: return "" if isinstance(audio_data, torch.Tensor): audio_data = audio_data.numpy() input_features = processor( audio_data, sampling_rate=sampling_rate, return_tensors="pt" ).input_features predicted_ids = model.generate(input_features) transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True) return transcription[0]