# Oral Coach — Gradio app for a Hugging Face Space
# app.py
import asyncio
import logging
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

import edge_tts
import gradio as gr
import numpy as np
import soundfile as sf
import spaces
from dotenv import load_dotenv
from huggingface_hub import InferenceClient
from streaming_stt_nemo import Model

import thinkingframes
from config import CLASS_OPTIONS
from database_functions import add_submission
from policy import user_acceptance_policy
from styles import theme
from tab_teachers_dashboard import create_teachers_dashboard_tab
from thinkingframes import generate_prompt, strategy_options
from utils import get_image_html, collect_student_info
# Load the app-wide CSS from its external stylesheet.
with open('styles.css', 'r') as css_file:
    css = css_file.read()

# Tracks the signed-in student's userID for the session.
# NOTE(review): a gr.State created at module level is shared across all
# sessions — confirm a per-session gr.State inside the Blocks isn't intended.
user_state = gr.State(value="")

load_dotenv()

# Hosted LLM endpoint used to generate the oral feedback.
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")

# Speech-to-text engines, keyed by language code.
default_lang = "en"
engines = {default_lang: Model(default_lang)}

# Stimulus picture shown to the student.
image_path = "picturePerformance.jpg"
img_html = get_image_html(image_path)

# Thread pool for running blocking transcription off the event loop.
executor = ThreadPoolExecutor()
# Transcription function using streaming_stt_nemo | |
def transcribe(audio_path):
    """Transcribe the audio file at *audio_path* to English text.

    Uses the preloaded streaming_stt_nemo engine and returns the first
    result from ``stt_file``.
    """
    engine = engines["en"]
    with open(audio_path, "rb") as audio_file:
        return engine.stt_file(audio_file)[0]
# Inference function using Hugging Face InferenceClient | |
def model(conversation):
    """Generate a response from the hosted Mixtral model.

    conversation: list of ``{"role": ..., "content": ...}`` message dicts.
    Returns an OpenAI-style dict ``{"choices": [{"delta": {"content": text}}]}``
    so callers can iterate the single choice uniformly.
    """
    generate_kwargs = dict(
        temperature=0.6,
        max_new_tokens=512,
        top_p=0.95,
        repetition_penalty=1,
        do_sample=True,
        seed=29,
    )
    # Flatten the chat into a plain prompt, e.g. "SYSTEM: ...\nUSER: ...".
    formatted_prompt = "\n".join(
        f"{msg['role'].upper()}: {msg['content']}" for msg in conversation
    )
    stream = client.text_generation(
        formatted_prompt, **generate_kwargs, stream=True, details=True,
        return_full_text=False)
    output = ""
    for response in stream:
        # Skip the end-of-sequence sentinel token.
        if response.token.text != "</s>":
            output += response.token.text
    return {"choices": [{"delta": {"content": output}}]}
# Text-to-Speech function using edge_tts | |
async def generate_audio_feedback(feedback_text):
    """Convert *feedback_text* to speech with edge_tts.

    Returns the path of a temporary .wav file holding the audio, or None
    if TTS generation timed out twice.
    """
    communicate = edge_tts.Communicate(feedback_text)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
        tmp_path = tmp_file.name
    # One initial attempt plus a single retry on timeout.
    for attempt in (1, 2):
        try:
            await communicate.save(tmp_path)
            return tmp_path
        except asyncio.TimeoutError:
            if attempt == 1:
                logging.error("Timeout occurred during TTS generation. Retrying...")
            else:
                logging.error("Retry failed. Unable to generate TTS.")
    return None
#generate feedback | |
async def generate_feedback(user_id, question_choice, strategy_choice, message, feedback_level):
    """Stream AI feedback chunks for a student's oral response.

    Yields feedback text chunks as they are produced, then records the
    full (possibly partial, on error) feedback in the database.

    user_id: numeric student identifier.
    question_choice: exact question text (must appear in thinkingframes.questions).
    strategy_choice: key into thinkingframes.strategy_options.
    message: the student's transcribed response.
    feedback_level: verbosity selector forwarded to thinkingframes.generate_prompt.
    """
    current_question_index = thinkingframes.questions.index(question_choice)
    strategy, explanation = thinkingframes.strategy_options[strategy_choice]
    system_instructions = {
        "role": "system",
        "content": (
            f"You are an expert Primary 6 English Language Teacher in a Singapore Primary school, "
            f"directly guiding a Primary 6 student in Singapore in their oral responses. "
            f"Format the feedback in Markdown so that it can be easily read. "
            f"Address the student directly in the second person in your feedback. "
            f"The student is answering the question: '{thinkingframes.questions[current_question_index]}'. "
            f"For Question 1, consider the picture description: '{thinkingframes.description}'. "
            f"For Questions 2 and 3, the picture is not relevant, so the student should not refer to it in their response. "
            f"Analyze the student's response using the following step-by-step approach: "
            f"1. Evaluate the response against the {strategy} thinking frame. "
            f"2. Assess how well the student's response addresses each criteria of the {strategy} thinking frame: "
            f"   - Assign emoticon scores based on how well the student comprehensively covered each criteria: "
            f"     - 😊😊😊 (three smiling faces) for a good coverage "
            f"     - 😊😊 (two smiling faces) for an average coverage "
            f"     - 😊 (one smiling face) for a poor coverage "
            f"   - Provide a clear, direct, and concise explanation of how well the answer addresses each criteria. "
            f"   - Identify specific areas for improvement in students responses, and provide targeted suggestions for improvement. "
            f"3. Identify overall strengths and areas for improvement in the student's response using the {strategy} to format and provide targeted areas for improvement. "
            f"4. Provide specific feedback on grammar, vocabulary, and sentence structure. "
            f"   Suggest age-appropriate enhancements that are one level higher than the student's current response. "
            f"5. Conclude with follow-up questions for reflection. "
            f"If the student's response deviates from the question, provide clear and concise feedback to help them refocus and try again. "
            f"Ensure that the vocabulary and sentence structure recommendations are achievable for Primary 6 students in Singapore. "
            f"Example Feedback Structure for Each Criteria: "
            f"Criteria: [Criteria Name] "
            f"Score: [Smiling emoticons] "
            f"Explanation: [Clear, direct, and concise explanation of how well the answer addresses the criteria. Identify specific areas for improvement, and provide targeted suggestions for improvement.] "
            f"{thinkingframes.generate_prompt(feedback_level)}"
        )
    }
    conversation = [
        system_instructions,
        {"role": "user", "content": message}
    ]
    response = model(conversation)
    full_feedback = ""  # Accumulates the entire feedback for the DB record.
    try:
        for chunk in response["choices"]:
            if chunk["delta"] and chunk["delta"]["content"]:
                feedback_chunk = chunk["delta"]["content"]
                full_feedback += feedback_chunk
                yield feedback_chunk  # Stream each chunk to the caller.
                await asyncio.sleep(0)  # Give control back to the event loop.
    except Exception as e:
        logging.error(f"An error occurred during feedback generation: {str(e)}")
    # Persist whatever feedback was produced, even if generation errored.
    questionNo = current_question_index + 1
    add_submission(user_id, message, full_feedback, 0, "", questionNo)
# Function to predict and handle the entire workflow | |
async def predict(question_choice, strategy_choice, feedback_level, audio):
    """End-to-end workflow: validate audio, transcribe, stream feedback, speak it.

    Yields (chat_history, audio_output) pairs for the Gradio Chatbot and
    audio player as each stage completes.

    audio: Gradio numpy audio tuple (sample_rate, np.ndarray) or None.
    """
    coach = "Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡"
    current_audio_output = None  # TTS audio is only available after feedback completes.
    if audio is None:
        yield [(coach, "No audio data received. Please try again.")], current_audio_output
        return
    sample_rate, audio_data = audio
    if audio_data is None or len(audio_data) == 0:
        yield [(coach, "No audio data received. Please try again.")], current_audio_output
        return
    if not isinstance(audio_data, np.ndarray):
        raise ValueError("audio_data must be a numpy array")
    # Write to a unique temp file so concurrent sessions don't clobber each
    # other (previously a shared hard-coded "audio.wav" was used).
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
        audio_path = tmp.name
    sf.write(audio_path, audio_data, sample_rate)
    chat_history = [(coach, "Transcribing your audio, please listen to your oral response while waiting ...")]
    yield chat_history, current_audio_output
    try:
        # Run blocking STT in the thread pool to keep the event loop responsive.
        transcription_future = executor.submit(transcribe, audio_path)
        student_response = await asyncio.wrap_future(transcription_future)
        if not student_response.strip():
            yield [(coach, "Transcription failed. Please try again or seek assistance.")], current_audio_output
            return
        chat_history.append(("Student", student_response))  # Show the transcript.
        yield chat_history, current_audio_output
        chat_history.append((coach, "Transcription complete. Generating feedback. Please continue listening to your oral response while waiting ..."))
        yield chat_history, current_audio_output
        accumulated_feedback = ""
        async for feedback_chunk in generate_feedback(int(user_state.value), question_choice, strategy_choice, student_response, feedback_level):
            accumulated_feedback += feedback_chunk
            # Update the coach's last message in place while streaming.
            if chat_history and chat_history[-1][0] == coach:
                chat_history[-1] = (coach, accumulated_feedback)
            else:
                chat_history.append((coach, accumulated_feedback))
            yield chat_history, current_audio_output
        # Speak the completed feedback.
        audio_task = asyncio.create_task(generate_audio_feedback(accumulated_feedback))
        current_audio_output = await audio_task
        yield chat_history, current_audio_output
    except Exception as e:
        logging.error(f"An error occurred: {str(e)}", exc_info=True)
        yield [(coach, "An error occurred. Please try again or seek assistance.")], current_audio_output
# --- UI layout and wiring ---
with gr.Blocks(title="Oral Coach powered by ZeroGPU⚡ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡ and Meta AI 🦙 (LLama3)", theme=theme, css="footer {visibility: hidden}textbox{resize:none}") as demo:
    with gr.Tab("Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡"):
        # Student sign-in section.
        gr.Markdown("## Student Information")
        class_name = gr.Dropdown(label="Class", choices=CLASS_OPTIONS)
        index_no = gr.Dropdown(label="Index No", choices=[f"{i:02}" for i in range(1, 46)])
        policy_text = gr.Markdown(user_acceptance_policy)
        policy_checkbox = gr.Checkbox(label="I have read and agree to the Things to Note When using the Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡", value=False)
        submit_info_btn = gr.Button("Submit Info")
        info_output = gr.Text()

        # Hidden until the student submits valid info and accepts the policy.
        with gr.Column(visible=False) as oral_coach_content:
            gr.Markdown("## English Language Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡")
            gr.Markdown(img_html)  # Display the stimulus picture.
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Step 1: Choose a Question")
                    question_choice = gr.Radio(thinkingframes.questions, label="Questions", value=thinkingframes.questions[0])
                    gr.Markdown("### Step 2: Choose a Thinking Frame")
                    strategy_choice = gr.Dropdown(list(strategy_options.keys()), label="Thinking Frame", value=list(strategy_options.keys())[0])
                    gr.Markdown("### Step 3: Choose Feedback Level")
                    # Default passed to the constructor so the UI renders it
                    # selected (assigning .value after creation does not
                    # update the rendered component).
                    feedback_level = gr.Radio(["Brief Feedback", "Moderate Feedback", "Comprehensive Feedback"], label="Feedback Level", value="Brief Feedback")
                with gr.Column(scale=1):
                    gr.Markdown("### Step 4: Record Your Answer")
                    audio_input = gr.Audio(type="numpy", sources=["microphone"], label="Record")
                    submit_answer_btn = gr.Button("Submit Oral Response")
                    gr.Markdown("### Step 5: Review your personalised feedback")
                    feedback_output = gr.Chatbot(
                        label="Feedback",
                        scale=4,
                        height=700,
                        show_label=True
                    )
                    audio_output = gr.Audio(type="numpy", label="Audio Playback", format="wav", autoplay=True)
                    submit_answer_btn.click(
                        predict,
                        inputs=[question_choice, strategy_choice, feedback_level, audio_input],
                        outputs=[feedback_output, audio_output],
                        api_name="predict"
                    )

        def toggle_oral_coach_visibility(class_name, index_no, policy_checked):
            """Validate sign-in info and reveal the coach UI on success."""
            if not policy_checked:
                return "Please agree to the Things to Note When using the Oral Coach ⚡ ϞϞ(๑⚈ ․̫ ⚈๑)∩ ⚡ before submitting.", gr.update(visible=False)
            validation_passed, message, userid = collect_student_info(class_name, index_no)
            if not validation_passed:
                return message, gr.update(visible=False)
            user_state.value = userid
            return message, gr.update(visible=True)

        submit_info_btn.click(
            toggle_oral_coach_visibility,
            inputs=[class_name, index_no, policy_checkbox],
            outputs=[info_output, oral_coach_content]
        )

    # Define other tabs like Teacher's Dashboard.
    create_teachers_dashboard_tab()

demo.queue(max_size=20)
demo.launch(share=False)