OralCoachZeroGPU / database_functions.py
simonraj's picture
Upload 17 files
cf815ef verified
# database_functions.py
import json
import os
import random
import secrets
import string
from datetime import datetime
from urllib.parse import unquote, urlparse

import psycopg2
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()

# Get the database URL from the environment variables
url = os.getenv("DATABASE_URL")
if not url:
    raise ValueError("DATABASE_URL is not set in the environment variables")

parsed_url = urlparse(url)

# Extract connection parameters.
# urlparse() leaves the components percent-encoded, so credentials that
# contain reserved characters (written as e.g. "%40" for "@" in the URL)
# must be decoded before they are passed to psycopg2 as keyword arguments.
# Absent components stay None so psycopg2 falls back to its own defaults.
db_config = {
    'user': unquote(parsed_url.username) if parsed_url.username is not None else None,
    'password': unquote(parsed_url.password) if parsed_url.password is not None else None,
    'host': parsed_url.hostname,
    'port': parsed_url.port,
    'database': unquote(parsed_url.path.lstrip('/')),
}
# The oc_students schema has a password column, so a random password is generated for each new student.
def generate_password(length=8):
    """Return a random password of printable, non-space ASCII characters.

    Characters are drawn from ASCII letters, digits and punctuation using
    the ``secrets`` module, which is intended for security-sensitive values
    (the ``random`` module is not).

    Args:
        length: Number of characters to generate. Defaults to 8.

    Returns:
        The generated password string.
    """
    characters = string.ascii_letters + string.digits + string.punctuation
    return ''.join(secrets.choice(characters) for _ in range(length))
# Add a student identified only by class and index number (no personal details are stored, for privacy).
def add_user_privacy(class_name, index_no):
    """Look up a student by class and index number, creating the row if needed.

    Args:
        class_name: Value matched against / stored in ``oc_students.class``.
        index_no: Value matched against / stored in ``oc_students.index_no``.

    Returns:
        On success, a tuple ``(user_id, message)`` where ``message`` is
        ``"User already exists"`` or ``"User Created"``.
        If a ``psycopg2.Error`` occurs while querying or inserting, a single
        string starting with ``"Error adding user:"`` is returned instead,
        so callers must handle both shapes.

    Raises:
        psycopg2.Error: If the connection or cursor cannot be created.
    """
    connection = psycopg2.connect(**db_config)
    try:
        cursor = connection.cursor()
        try:
            # Check if a student with the same class and index number already exists
            cursor.execute(
                "SELECT id FROM oc_students WHERE class = %s and index_no = %s",
                (class_name, index_no),
            )
            existing_user = cursor.fetchone()
            if existing_user:
                return existing_user[0], "User already exists"

            # Only generate a password when a new row is actually inserted.
            # NOTE(review): the value is written to the hashPassword column
            # as generated, without hashing - confirm this is intended.
            password = generate_password()
            cursor.execute(
                "INSERT INTO oc_students (index_no, class, hashPassword) VALUES (%s, %s, %s) RETURNING id",
                (index_no, class_name, password),
            )
            user_id = cursor.fetchone()[0]  # ID of the newly inserted student
            connection.commit()  # without this, the insert is not persisted
            return user_id, "User Created"
        except psycopg2.Error as e:
            return "Error adding user:" + str(e)
        finally:
            cursor.close()
    finally:
        # Always release the connection; the original leaked it on every path.
        connection.close()
def add_submission(userid, transcribed_text, ai_responses, scores, feedback, questionNo):
    """Insert one row into ``oc_submissions`` stamped with the current local time.

    Database errors are printed and swallowed; nothing is returned. The
    cursor and connection are closed whether or not the insert succeeded.

    Args:
        userid: Value for the ``userid`` column.
        transcribed_text: Value for the ``Transcribed_text`` column.
        ai_responses: Value for the ``AI_conversation_responses`` column.
        scores: Value for the ``Scores`` column.
        feedback: Value for the ``Feedback`` column.
        questionNo: Value for the ``questionNo`` column.
    """
    connection = psycopg2.connect(**db_config)
    cursor = connection.cursor()
    insert_sql = (
        "INSERT INTO oc_submissions (userid, datetime, Transcribed_text, AI_conversation_responses, Scores, Feedback, questionNo) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s)"
    )
    try:
        submitted_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        row_values = (
            userid,
            submitted_at,
            transcribed_text,
            ai_responses,
            scores,
            feedback,
            questionNo,
        )
        cursor.execute(insert_sql, row_values)
        connection.commit()
    except psycopg2.Error as e:
        print("Error adding submission:", e)
    finally:
        if connection:
            cursor.close()
            connection.close()
            print("PostgreSQL connection is closed")
def get_submissions_by_date_and_class(from_date, to_date, class_name, display_ai_feedback):
    """Fetch a class's submissions within a date range and build a report.

    Args:
        from_date: Start of the range; passed to SQL ``TO_DATE(..., 'YYYY-MM-DD')``.
        to_date: End of the range, same format. The two bounds are swapped
            if ``from_date`` compares greater than ``to_date``.
        class_name: Value matched against ``oc_students.class``.
        display_ai_feedback: When false, the AI responses column is
            returned as NULL by the query and hidden in the report.

    Returns:
        The JSON string produced by ``generate_report_as_json`` when rows are
        found. Otherwise a one-element list holding a placeholder dict whose
        ``"Email"`` field carries a "no data" or "error" message.
    """

    def placeholder(message):
        # Single-row stand-in returned when there is nothing to report.
        return [{"Email": message, "Name": "", "Class": "", "Datetime": "", "Transcribed Text": "", "AI Conversation Responses": ""}]

    # Connect to the database
    conn = psycopg2.connect(**db_config)
    cursor = conn.cursor()
    try:
        print(f"From Date: {from_date}")
        print(f"To Date: {to_date}")
        print(f"Class Name: {class_name}")

        # Tolerate the bounds being supplied in reverse order
        if from_date > to_date:
            from_date, to_date = to_date, from_date

        query = """
            SELECT s.index_no, s.class, sub.datetime, sub.questionNo, sub.transcribed_text,
            CASE WHEN %s THEN sub.ai_conversation_responses ELSE NULL END AS ai_conversation_responses,
            sub.userid
            FROM oc_students AS s
            JOIN oc_submissions AS sub ON s.id = sub.userid
            WHERE TO_DATE(sub.datetime::text, 'YYYY-MM-DD') BETWEEN TO_DATE(%s, 'YYYY-MM-DD') AND TO_DATE(%s, 'YYYY-MM-DD')
            AND s.class = %s
            ORDER BY sub.userid, sub.questionNo, sub.datetime DESC
        """
        params = (display_ai_feedback, from_date, to_date, class_name)
        cursor.execute(query, params)
        rows = cursor.fetchall()

        if not rows:
            return placeholder("No data found for the selected date range and class")
        return generate_report_as_json(rows, display_ai_feedback)
    except Exception as e:
        print(f"An error occurred: {e}")
        return placeholder("Error occurred while fetching data")
    finally:
        cursor.close()
        conn.close()
def generate_report_as_json(results, display_ai_feedback, required_questions=(1, 2, 3)):
    """Group submission rows per student and serialise them as a JSON report.

    Args:
        results: Iterable of rows laid out as ``(index_no, class, datetime,
            question_no, transcribed_text, ai_feedback, user_id)``. The
            datetime item must provide ``strftime`` or be falsy.
        display_ai_feedback: When false, every "AI Feedback" field is
            reported as ``"Not displayed"``.
        required_questions: Distinct question numbers every student is
            expected to have answered; any that are missing get an "NA"
            placeholder entry. Defaults to ``(1, 2, 3)``.

    Returns:
        A JSON string (indent 4) holding a list with one object per student,
        in first-seen order, each with "Index No", "Class" and a "Questions"
        list sorted by question number. Empty ``results`` yields ``"[]"``.
    """
    # An insertion-ordered dict serves as an ordered set of students, giving
    # O(1) de-duplication instead of scanning a list for every row.
    students = {}
    answers_by_user = {}

    for row in results or ():
        index_no, class_name, submitted_at, question_no, transcript, ai_feedback, user_id = row[:7]
        students.setdefault((user_id, index_no, class_name), None)
        answers_by_user.setdefault(user_id, []).append({
            "Question": question_no,
            "Datetime": submitted_at.strftime("%Y-%m-%d %H:%M:%S") if submitted_at else "",
            "Student Response": transcript,
            "AI Feedback": ai_feedback if display_ai_feedback else "Not displayed",
        })

    placeholder_feedback = "NA" if display_ai_feedback else "Not displayed"
    report_data = []

    for user_id, index_no, class_name in students:
        # Copy the list so the placeholders below do not leak into the map.
        questions = list(answers_by_user[user_id])
        answered = {entry["Question"] for entry in questions}

        # Add NA entries for required questions the student has not answered
        questions.extend(
            {
                "Question": missing_question,
                "Datetime": "NA",
                "Student Response": "NA",
                "AI Feedback": placeholder_feedback,
            }
            for missing_question in required_questions
            if missing_question not in answered
        )

        # The sort is stable, so repeated attempts at one question keep the
        # order in which their rows were supplied.
        questions.sort(key=lambda entry: entry["Question"])

        report_data.append({
            "Index No": index_no,
            "Class": class_name,
            "Questions": questions,
        })

    return json.dumps(report_data, indent=4)
def getUniqueSubmitDate():
    """Return the most recent distinct submission dates, newest first.

    At most 14 dates are returned, each converted with ``str()``.

    Returns:
        A list of date strings on success. If any exception occurs, a
        one-element list ``[{"Error": "Error occurred while fetching data"}]``.
    """
    # Connect to the database
    conn = psycopg2.connect(**db_config)
    cursor = conn.cursor()
    try:
        # Distinct calendar dates that have at least one submission
        cursor.execute("""
            SELECT DISTINCT DATE(datetime) AS unique_date
            FROM public.oc_submissions
            ORDER BY unique_date desc
            LIMIT 14;
        """)
        dates = []
        for record in cursor.fetchall():
            dates.append(str(record[0]))
        return dates
    except Exception as e:
        print(f"An error occurred: {e}")
        return [{"Error": "Error occurred while fetching data"}]
    finally:
        cursor.close()
        conn.close()
def getUniqueClass():
    """Return the distinct classes that have at least one submission, sorted.

    Returns:
        A list of class names (converted with ``str()``) on success. If any
        exception occurs, a one-element list
        ``[{"Error": "Error occurred while fetching data"}]``.
    """
    # Connect to the database
    conn = psycopg2.connect(**db_config)
    cursor = conn.cursor()
    try:
        # The join restricts the result to classes with submissions
        cursor.execute("""
            SELECT DISTINCT s.class
            FROM oc_students AS s
            JOIN oc_submissions AS sub ON s.id = sub.userid
            ORDER BY s.class
        """)
        class_names = []
        for record in cursor.fetchall():
            class_names.append(str(record[0]))
        return class_names
    except Exception as e:
        print(f"An error occurred: {e}")
        return [{"Error": "Error occurred while fetching data"}]
    finally:
        cursor.close()
        conn.close()