Chris4K committed on
Commit
917a0e7
·
verified ·
1 Parent(s): 1e952bf

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +29 -19
app.py CHANGED
@@ -1,6 +1,8 @@
1
 
2
  from langfuse import Langfuse
3
  from langfuse.decorators import observe, langfuse_context
 
 
4
 
5
  from config.config import settings
6
  from services.llama_generator import LlamaGenerator
@@ -68,15 +70,17 @@ async def detect_wakeword(audio_chunk: bytes) -> bool:
68
  # You might want to use libraries like Porcupine or build your own wake word detector
69
  return True
70
 
 
71
  @app.websocket("/ws")
72
  async def websocket_endpoint(websocket: WebSocket):
 
 
 
 
73
  await websocket.accept()
74
  try:
75
- # Use a queue to manage audio chunks
76
- audio_queue = asyncio.Queue()
77
-
78
  # Create a task to process the audio stream
79
- stream_task = asyncio.create_task(process_audio_stream(audio_queue))
80
 
81
  # Main receive loop
82
  while True:
@@ -104,20 +108,26 @@ async def websocket_endpoint(websocket: WebSocket):
104
  print(f"WebSocket endpoint error: {e}")
105
 
106
  finally:
107
- # Cancel the stream processing task
108
- stream_task.cancel()
 
 
 
 
 
 
109
  try:
110
  await websocket.close(code=1000)
111
  except Exception as close_error:
112
  print(f"Error closing WebSocket: {close_error}")
113
 
114
- async def process_audio_stream(audio_queue: asyncio.Queue) -> AsyncGenerator[str, None]:
115
  buffer = []
116
  is_speaking = False
117
  silence_frames = 0
118
 
119
- while True:
120
- try:
121
  # Get audio data from queue with timeout
122
  try:
123
  audio_data = await asyncio.wait_for(audio_queue.get(), timeout=5.0)
@@ -172,30 +182,30 @@ async def process_audio_stream(audio_queue: asyncio.Queue) -> AsyncGenerator[str
172
  bot_response_de = from_en_translation(response, desired_language)
173
 
174
  # Stream the response
175
- yield json.dumps({
176
  "user_text": user_speech_text,
177
  "response_de": bot_response_de,
178
  "response_en": response
179
  })
 
180
 
181
  # Generate and stream audio response
182
  bot_voice = tts(bot_response_de, desired_language)
183
  bot_voice_bytes = tts_to_bytesio(bot_voice)
184
- yield json.dumps({
185
  "audio": bot_voice_bytes.decode('latin1')
186
  })
 
187
 
188
  except Exception as processing_error:
189
  print(f"Error processing speech utterance: {processing_error}")
 
 
 
 
 
 
190
 
191
- except asyncio.CancelledError:
192
- # Handle task cancellation
193
- break
194
- except Exception as e:
195
- print(f"Unexpected error in audio stream processing: {e}")
196
- # Prevent tight error loop
197
- await asyncio.sleep(1)
198
-
199
  @app.get("/", response_class=HTMLResponse)
200
  async def get_index():
201
  with open("static/index.html") as f:
 
1
 
2
  from langfuse import Langfuse
3
  from langfuse.decorators import observe, langfuse_context
4
+ from fastapi import WebSocketDisconnect
5
+ import asyncio
6
 
7
  from config.config import settings
8
  from services.llama_generator import LlamaGenerator
 
70
  # You might want to use libraries like Porcupine or build your own wake word detector
71
  return True
72
 
73
+
74
  @app.websocket("/ws")
75
  async def websocket_endpoint(websocket: WebSocket):
76
+ # Create the queue outside the try block
77
+ audio_queue = asyncio.Queue()
78
+ stream_task = None
79
+
80
  await websocket.accept()
81
  try:
 
 
 
82
  # Create a task to process the audio stream
83
+ stream_task = asyncio.create_task(process_audio_stream(audio_queue, websocket))
84
 
85
  # Main receive loop
86
  while True:
 
108
  print(f"WebSocket endpoint error: {e}")
109
 
110
  finally:
111
+ # Cancel the stream processing task if it exists
112
+ if stream_task:
113
+ stream_task.cancel()
114
+ try:
115
+ await stream_task # Wait for the task to be fully cancelled
116
+ except asyncio.CancelledError:
117
+ pass
118
+
119
  try:
120
  await websocket.close(code=1000)
121
  except Exception as close_error:
122
  print(f"Error closing WebSocket: {close_error}")
123
 
124
+ async def process_audio_stream(audio_queue: asyncio.Queue, websocket: WebSocket) -> AsyncGenerator[str, None]:
125
  buffer = []
126
  is_speaking = False
127
  silence_frames = 0
128
 
129
+ try:
130
+ while True:
131
  # Get audio data from queue with timeout
132
  try:
133
  audio_data = await asyncio.wait_for(audio_queue.get(), timeout=5.0)
 
182
  bot_response_de = from_en_translation(response, desired_language)
183
 
184
  # Stream the response
185
+ response_data = json.dumps({
186
  "user_text": user_speech_text,
187
  "response_de": bot_response_de,
188
  "response_en": response
189
  })
190
+ await websocket.send_text(response_data)
191
 
192
  # Generate and stream audio response
193
  bot_voice = tts(bot_response_de, desired_language)
194
  bot_voice_bytes = tts_to_bytesio(bot_voice)
195
+ audio_data = json.dumps({
196
  "audio": bot_voice_bytes.decode('latin1')
197
  })
198
+ await websocket.send_text(audio_data)
199
 
200
  except Exception as processing_error:
201
  print(f"Error processing speech utterance: {processing_error}")
202
+
203
+ except asyncio.CancelledError:
204
+ # Handle task cancellation
205
+ print("Audio stream processing task cancelled")
206
+ except Exception as e:
207
+ print(f"Unexpected error in audio stream processing: {e}")
208
 
 
 
 
 
 
 
 
 
209
  @app.get("/", response_class=HTMLResponse)
210
  async def get_index():
211
  with open("static/index.html") as f: