|
from .config import settings |
|
|
|
|
|
class TextChunker:
    """Split incoming text into chunks for voice generation.

    Chunk boundaries are chosen at punctuation marks and at a small set of
    connective words, each weighted by a priority. The first chunk uses
    ``settings.FIRST_SENTENCE_SIZE`` as its target word count; every later
    chunk uses ``settings.TARGET_SIZE``.
    """

    def __init__(self):
        """Initialize the TextChunker with break points and priorities."""
        # Not read by any method of this class; kept as part of the instance
        # state in case external code relies on it.
        self.current_text = []
        # Becomes True once the first chunk has been queued; switches the
        # target size from FIRST_SENTENCE_SIZE to TARGET_SIZE.
        self.found_first_sentence = False
        # Connective words (matched case-insensitively against a whole,
        # punctuation-free token). Higher value = stronger break candidate.
        self.semantic_breaks = {
            "however": 4,
            "therefore": 4,
            "furthermore": 4,
            "moreover": 4,
            "nevertheless": 4,
            "while": 3,
            "although": 3,
            "unless": 3,
            "since": 3,
            "and": 2,
            "but": 2,
            "because": 2,
            "then": 2,
        }
        # Trailing punctuation of a token. Higher value = stronger break
        # candidate; sentence terminators outrank every connective word.
        self.punctuation_priorities = {
            ".": 5,
            "!": 5,
            "?": 5,
            ";": 4,
            ":": 4,
            ",": 3,
            "-": 2,
        }

    def _target_size(self) -> int:
        """Return the target word count for the next chunk."""
        if self.found_first_sentence:
            return settings.TARGET_SIZE
        return settings.FIRST_SENTENCE_SIZE

    def should_process(self, text: str) -> bool:
        """Determines if text should be processed based on length or punctuation.

        Args:
            text (str): The text to check.

        Returns:
            bool: True if the text ends with a known punctuation mark
                (ignoring trailing whitespace) or contains at least the
                current target number of words, False otherwise.
        """
        # Trailing whitespace is ignored so that "Done.\n" counts as ending
        # in punctuation, consistent with process(), which normalises
        # whitespace through split().
        if text.rstrip().endswith(tuple(self.punctuation_priorities)):
            return True

        return len(text.split()) >= self._target_size()

    def find_break_point(self, words: list, target_size: int) -> int:
        """Finds optimal break point in text.

        Candidates are looked for among the first ``target_size + 3`` words.
        The candidate with the highest priority wins; among equal priorities
        the one whose resulting chunk length is closest to ``target_size``
        wins, and the earliest one wins a remaining tie.

        Args:
            words (list): The list of words to find a break point in.
            target_size (int): The target size of the chunk, in words.

        Returns:
            int: The number of leading words that form the chunk, i.e. the
                index at which to split ``words``. Equals ``len(words)`` when
                the input is not longer than ``target_size``.
        """
        if len(words) <= target_size:
            return len(words)

        # A non-positive target would make the fallback below return 0, which
        # process() treats as "nothing to emit" and drops the text. There is
        # at least one word here, so always take at least one.
        target_size = max(target_size, 1)

        punctuation = self.punctuation_priorities.items()
        candidates = []
        for index, word in enumerate(words[: target_size + 3]):
            word_priority = self.semantic_breaks.get(word.lower(), 0)
            punct_priority = max(
                (rank for mark, rank in punctuation if word.endswith(mark)),
                default=0,
            )
            priority = max(word_priority, punct_priority)
            if priority > 0:
                # Splitting after this word yields index + 1 words, so the
                # distance to the target is measured on that length.
                chunk_len = index + 1
                candidates.append(
                    (priority, -abs(chunk_len - target_size), chunk_len)
                )

        if not candidates:
            # No natural break in the window: hard split at the target.
            return target_size

        # max() returns the first maximal item, so earlier candidates win ties.
        best = max(candidates, key=lambda cand: (cand[0], cand[1]))
        return best[2]

    def process(self, text: str, audio_queue) -> str:
        """Process text chunk and return remaining text.

        Splits off one chunk from the front of ``text`` and, if it contains
        at least one alphanumeric character, strips trailing commas from it
        and passes it to ``audio_queue.add_sentences`` as a one-element list.

        Args:
            text (str): The text to process.
            audio_queue: The audio queue to add sentences to. Must provide an
                ``add_sentences(list)`` method.

        Returns:
            str: The remaining text after processing, with whitespace runs
                collapsed to single spaces; empty when nothing remains.
        """
        if not text:
            return ""

        words = text.split()
        if not words:
            return ""

        split_point = self.find_break_point(words, self._target_size())
        if not split_point:
            return ""

        chunk = " ".join(words[:split_point])
        # Chunks made only of punctuation/symbols are skipped (and do not
        # count as the first sentence), but are still consumed.
        if any(char.isalnum() for char in chunk):
            audio_queue.add_sentences([chunk.rstrip(",")])
            self.found_first_sentence = True

        return " ".join(words[split_point:])
|
|