from queue import Queue import threading import time from typing import Optional, Tuple, List import numpy as np from pathlib import Path import logging from datetime import datetime from .audio_io import save_audio_file logging.getLogger("phonemizer").setLevel(logging.ERROR) logging.getLogger("speechbrain.utils.quirks").setLevel(logging.ERROR) logging.basicConfig(format="%(message)s", level=logging.INFO) class AudioGenerationQueue: """ A queue system for managing asynchronous audio generation from text input. This class implements a threaded queue system that handles text-to-audio generation in a background thread. It provides functionality for adding sentences to be processed, retrieving generated audio, and monitoring the generation process. Attributes: generator: Audio generator instance used for text-to-speech conversion speed (float): Speed multiplier for audio generation output_dir (Path): Directory where generated audio files are saved sentences_processed (int): Count of processed sentences audio_generated (int): Count of successfully generated audio files failed_sentences (list): List of tuples containing failed sentences and error messages """ def __init__( self, generator, speed: float = 1.0, output_dir: Optional[Path] = None ): """ Initialize the audio generation queue system. Args: generator: Audio generator instance for text-to-speech conversion speed: Speed multiplier for audio generation (default: 1.0) output_dir: Directory path for saving generated audio files (default: "generated_audio") """ self.generator = generator self.speed = speed self.lock = threading.Lock() self.output_dir = output_dir or Path("generated_audio") self.output_dir.mkdir(exist_ok=True) self.sentence_queue = Queue() self.audio_queue = Queue() self.is_running = False self.generation_thread = None self.sentences_processed = 0 self.audio_generated = 0 self.failed_sentences = [] def start(self): """ Start the audio generation thread if not already running. The thread will process sentences from the queue until stopped. """ if not self.is_running: self.is_running = True self.generation_thread = threading.Thread(target=self._generation_worker) self.generation_thread.daemon = True self.generation_thread.start() def stop(self): """ Stop the audio generation thread gracefully. Waits for the current queue to be processed before stopping. Outputs final processing statistics. """ if self.generation_thread: while not self.sentence_queue.empty(): time.sleep(0.1) time.sleep(0.5) self.is_running = False self.generation_thread.join() self.generation_thread = None logging.info( f"\nAudio Generation Complete - Processed: {self.sentences_processed}, Generated: {self.audio_generated}, Failed: {len(self.failed_sentences)}" ) def add_sentences(self, sentences: List[str]): """ Add a list of sentences to the generation queue. Args: sentences: List of text strings to be converted to audio """ added_count = 0 for sentence in sentences: sentence = sentence.strip() if sentence: self.sentence_queue.put(sentence) added_count += 1 if not self.is_running: self.start() def get_next_audio(self) -> Tuple[Optional[np.ndarray], Optional[Path]]: """ Retrieve the next generated audio segment from the queue. Returns: Tuple containing: - numpy array of audio data (or None if queue is empty) - Path object for the saved audio file (or None if queue is empty) """ try: audio_data, output_path = self.audio_queue.get_nowait() return audio_data, output_path except: return None, None def clear_queues(self): """ Clear both sentence and audio queues, removing all pending items. Returns immediately without waiting for queue processing. """ sentences_cleared = 0 audio_cleared = 0 while not self.sentence_queue.empty(): try: self.sentence_queue.get_nowait() sentences_cleared += 1 except: pass while not self.audio_queue.empty(): try: self.audio_queue.get_nowait() audio_cleared += 1 except: pass def _generation_worker(self): """ Internal worker method that runs in a separate thread. Continuously processes sentences from the queue, generating audio and handling any errors that occur during generation. """ while self.is_running or not self.sentence_queue.empty(): try: try: sentence = self.sentence_queue.get_nowait() self.sentences_processed += 1 except: if not self.is_running and self.sentence_queue.empty(): break time.sleep(0.01) continue try: audio_data, phonemes = self.generator.generate( sentence, speed=self.speed ) if audio_data is None or len(audio_data) == 0: raise ValueError("Generated audio data is empty") output_path = save_audio_file(audio_data, self.output_dir) self.audio_generated += 1 self.audio_queue.put((audio_data, output_path)) except Exception as e: error_msg = str(e) self.failed_sentences.append((sentence, error_msg)) continue except Exception as e: if not self.is_running and self.sentence_queue.empty(): break time.sleep(0.1)