|
|
|
from langfuse import Langfuse |
|
from langfuse.decorators import observe, langfuse_context |
|
from fastapi import WebSocketDisconnect |
|
import asyncio |
|
|
|
from config.config import settings |
|
from services.llama_generator import LlamaGenerator |
|
import os |
|
|
|
|
|
# SECURITY(review): Langfuse credentials are hard-coded and committed to
# source. They should be injected via the deployment environment (e.g.
# Space secrets) and these keys rotated; left in place here to avoid
# breaking the current deployment.
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-04d2302a-aa5c-4870-9703-58ab64c3bcae"
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-d34ea200-feec-428e-a621-784fce93a5af"
os.environ["LANGFUSE_HOST"] = "https://chris4k-langfuse-template-space.hf.space"

# Tracing is best-effort: if the Langfuse host is unreachable the app keeps
# running without observability. `langfuse` is pinned to None on failure so
# later code can test for it (previously the except path left the name
# undefined), and the failure reason is logged instead of discarded.
try:
    langfuse = Langfuse()
except Exception as e:
    langfuse = None
    print(f"Langfuse Offline: {e}")
|
|
|
|
|
from fastapi import FastAPI, WebSocket |
|
from fastapi.staticfiles import StaticFiles |
|
from fastapi.responses import StreamingResponse, HTMLResponse |
|
import asyncio |
|
import json |
|
import webrtcvad |
|
import numpy as np |
|
import wave |
|
import io |
|
from typing import AsyncGenerator |
|
|
|
from utils import (
    from_en_translation,
    stt,
    to_en_translation,
    tts,
    tts_to_bytesio,
)
|
|
|
from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, VisitWebpageTool |
|
|
|
# FastAPI application instance. Static assets (the client page and its
# scripts) are served under /static from the local `static/` directory.
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
|
# smolagents setup: a code-writing agent backed by the default Hugging Face
# Inference API model, equipped with web-search and page-visit tools. The
# `agent` instance is shared by the audio-processing pipeline below.
model = HfApiModel()
search_tool = DuckDuckGoSearchTool()
visit_webpage_tool = VisitWebpageTool()
agent = CodeAgent(
    tools=[search_tool, visit_webpage_tool],
    model=model,
    # Extra modules the agent's generated code is allowed to import.
    additional_authorized_imports=['requests', 'bs4', 'pandas', 'concurrent.futures', 'csv', 'json']
)
|
|
|
|
|
# Audio / VAD configuration.
SAMPLE_RATE = 16000  # Hz; one of the rates webrtcvad supports (8/16/32/48 kHz)
CHANNELS = 1  # mono PCM expected from the client
# NOTE(review): 480 reads as a sample count (30 ms at 16 kHz), but below it is
# compared against len() of a bytes chunk (960 bytes for 16-bit samples) —
# confirm the client's frame sizing.
CHUNK_SIZE = 480
VAD_MODE = 3  # webrtcvad aggressiveness 0-3; 3 filters non-speech hardest
desired_language = "de"  # language of user speech and synthesized replies
max_answer_length = 100  # NOTE(review): unused in this chunk — possibly read elsewhere

# Voice-activity detector shared by the streaming pipeline.
vad = webrtcvad.Vad(VAD_MODE)
|
|
|
async def detect_wakeword(audio_chunk: bytes) -> bool:
    """Placeholder wake-word detector.

    Always reports a detection so every captured utterance is processed;
    swap in a real keyword-spotting model here to gate the pipeline on
    `audio_chunk`.
    """
    return True
|
|
|
|
|
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Main client socket.

    Receives raw audio chunks from the browser and hands them to a
    background consumer task (`process_audio_stream`), which performs VAD,
    transcription, agent querying, and sends replies back over this same
    socket. This coroutine only pumps bytes into the queue and manages
    task/socket teardown.
    """
    audio_queue = asyncio.Queue()
    stream_task = None

    await websocket.accept()
    try:
        # Consumer runs concurrently and owns all outbound reply traffic.
        stream_task = asyncio.create_task(process_audio_stream(audio_queue, websocket))

        while True:
            try:
                # Timeout keeps the receive loop responsive even when the
                # client goes quiet; a timeout is not an error.
                audio_data = await asyncio.wait_for(websocket.receive_bytes(), timeout=5.0)
                await audio_queue.put(audio_data)
            except asyncio.TimeoutError:
                continue
            except WebSocketDisconnect:
                print("WebSocket disconnected")
                break
            except Exception as e:
                print(f"WebSocket receive error: {e}")
                break
    except Exception as e:
        print(f"WebSocket endpoint error: {e}")
    finally:
        # Cancel and await the consumer before closing the socket so it
        # cannot write to a closed connection.
        if stream_task:
            stream_task.cancel()
            try:
                await stream_task
            except asyncio.CancelledError:
                pass
        # Best-effort close; the peer may already be gone.
        try:
            await websocket.close(code=1000)
        except Exception as close_error:
            print(f"Error closing WebSocket: {close_error}")
|
|
|
async def process_audio_stream(audio_queue: asyncio.Queue, websocket: WebSocket) -> None:
    """Consume raw audio chunks, segment speech with VAD, and answer.

    Frames are accumulated while webrtcvad reports speech; after >30 silent
    frames the buffered utterance is wrapped in a WAV container, transcribed
    via `stt`, and — when the transcript contains the trigger word
    "computer" — routed through the web-search agent. The reply is sent back
    over `websocket` both as a JSON text message and as TTS audio.

    Runs until cancelled by the owning websocket handler. (Return annotation
    fixed: this coroutine never yields, so it is not an AsyncGenerator.)
    """
    buffer = []
    is_speaking = False
    silence_frames = 0

    try:
        while True:
            try:
                audio_data = await asyncio.wait_for(audio_queue.get(), timeout=5.0)
            except asyncio.TimeoutError:
                # Long gap with no audio: drop any partial utterance.
                buffer = []
                is_speaking = False
                silence_frames = 0
                continue

            # NOTE(review): CHUNK_SIZE is a sample count (480), but
            # len(audio_data) is in bytes (960 for a 30 ms 16-bit frame);
            # confirm the client's frame sizing before tightening this.
            if not audio_data or len(audio_data) < CHUNK_SIZE:
                continue

            try:
                is_speech = vad.is_speech(audio_data, SAMPLE_RATE)
            except Exception as vad_error:
                # webrtcvad raises on malformed frames; skip them.
                print(f"VAD processing error: {vad_error}")
                continue

            if is_speech:
                silence_frames = 0
                buffer.append(audio_data)
                is_speaking = True
            elif is_speaking:
                silence_frames += 1
                if silence_frames > 30:  # ~0.9 s of trailing silence ends the utterance
                    try:
                        audio_bytes = b''.join(buffer)

                        # Wrap the raw 16-bit mono PCM in a WAV container
                        # so the STT backend can read it.
                        wav_buffer = io.BytesIO()
                        with wave.open(wav_buffer, 'wb') as wav_file:
                            wav_file.setnchannels(CHANNELS)
                            wav_file.setsampwidth(2)  # 16-bit samples
                            wav_file.setframerate(SAMPLE_RATE)
                            wav_file.writeframes(audio_bytes)
                        # Rewind before reading: wave.open leaves the stream
                        # position at end-of-data, so without this the STT
                        # reader would see an empty stream.
                        wav_buffer.seek(0)

                        # Reset segmentation state before the (slow) agent
                        # pipeline so new audio keeps being collected.
                        buffer = []
                        is_speaking = False
                        silence_frames = 0

                        if await detect_wakeword(audio_bytes):
                            user_speech_text = stt(wav_buffer, desired_language)
                            if "computer" in user_speech_text.lower():
                                # The agent works in English; translate the
                                # query in and the answer back out.
                                translated_text = to_en_translation(user_speech_text, desired_language)
                                response = await agent.arun(translated_text)
                                bot_response_de = from_en_translation(response, desired_language)

                                response_data = json.dumps({
                                    "user_text": user_speech_text,
                                    "response_de": bot_response_de,
                                    "response_en": response
                                })
                                await websocket.send_text(response_data)

                                bot_voice = tts(bot_response_de, desired_language)
                                bot_voice_bytes = tts_to_bytesio(bot_voice)
                                # NOTE(review): if tts_to_bytesio returns a
                                # BytesIO (as the name suggests), .decode will
                                # fail — confirm it yields raw bytes here.
                                audio_data = json.dumps({
                                    "audio": bot_voice_bytes.decode('latin1')
                                })
                                await websocket.send_text(audio_data)
                    except Exception as processing_error:
                        # Keep the stream alive even if one utterance fails.
                        print(f"Error processing speech utterance: {processing_error}")
    except asyncio.CancelledError:
        # Normal shutdown path: the websocket handler cancels this task.
        print("Audio stream processing task cancelled")
    except Exception as e:
        print(f"Unexpected error in audio stream processing: {e}")
|
|
|
@app.get("/", response_class=HTMLResponse)
async def get_index():
    """Serve the single-page client UI from static/index.html."""
    # Explicit encoding: the platform default may not be UTF-8, and the page
    # may contain non-ASCII characters (the UI language is German).
    with open("static/index.html", encoding="utf-8") as f:
        return f.read()
|
|
|
|
|
if __name__ == "__main__":
    # Development entry point; in production run via `uvicorn module:app`.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)