# NOTE(review): removed stray paste artifacts ("Spaces:" / "Runtime error")
# that preceded the module — they were not valid Python and broke the file.
# database_functions.py | |
import json
import os
import random
import secrets
import string
from datetime import datetime
from urllib.parse import urlparse

import psycopg2
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Get the database URL from the environment variables.
# Expected shape: postgres://user:password@host:port/dbname (parsed below).
url = os.getenv("DATABASE_URL")
if not url:
    # Fail fast at import time: every function in this module needs the DB.
    raise ValueError("DATABASE_URL is not set in the environment variables")
parsed_url = urlparse(url)
# Extract connection parameters
# Shared psycopg2 connection settings; each function opens its own connection
# from this dict via psycopg2.connect(**db_config).
db_config = {
    'user': parsed_url.username,
    'password': parsed_url.password,
    'host': parsed_url.hostname,
    'port': parsed_url.port,
    'database': parsed_url.path.lstrip('/')  # strip leading '/' to get the db name
}
# Since the schema defines the password column, we only generate a value here.
def generate_password():
    """Return a random 8-character password.

    Draws from letters, digits and punctuation. Uses the `secrets` module
    (a CSPRNG) instead of `random`, whose Mersenne Twister output is
    predictable and unsuitable for credentials.

    Returns:
        str: an 8-character random password.
    """
    characters = string.ascii_letters + string.digits + string.punctuation
    return ''.join(secrets.choice(characters) for _ in range(8))
# add student method (privacy with only class & indexNo)
def add_user_privacy(class_name, index_no):
    """Create a student identified only by (class, index_no) — no PII stored.

    A random password is generated for new students. If a student with the
    same class and index number already exists, no row is inserted.

    Args:
        class_name: the student's class (matches oc_students.class).
        index_no: the student's index number within that class.

    Returns:
        tuple: (user_id, message) where message is "User Created" or
        "User already exists". On a database error returns
        (None, "Error adding user:<details>").
        (Bug fix: the original returned a bare string on error, which broke
        callers unpacking the usual two-element tuple.)
    """
    connection = psycopg2.connect(**db_config)
    cursor = connection.cursor()
    password = generate_password()
    try:
        # A (class, index_no) pair uniquely identifies a student.
        cursor.execute(
            "SELECT id FROM oc_students WHERE class = %s and index_no = %s",
            (class_name, index_no))
        existing_user = cursor.fetchone()
        if existing_user:
            return existing_user[0], "User already exists"
        # NOTE(review): the column is named hashPassword but the value stored
        # here is plain text — confirm whether hashing happens in the schema
        # (e.g. a trigger) or should be done before this INSERT.
        cursor.execute(
            "INSERT INTO oc_students (index_no, class, hashPassword) VALUES (%s, %s, %s) RETURNING id",
            (index_no, class_name, password))
        user_id = cursor.fetchone()[0]  # id of the newly inserted row
        connection.commit()  # without this, the INSERT does not persist
        return user_id, "User Created"
    except psycopg2.Error as e:
        connection.rollback()  # discard the failed transaction
        return None, "Error adding user:" + str(e)
    finally:
        # Bug fix: the original leaked the cursor and connection on every call.
        cursor.close()
        connection.close()
def add_submission(userid, transcribed_text, ai_responses, scores, feedback, questionNo):
    """Persist one student submission (best effort: errors are logged, not raised).

    Args:
        userid: oc_students.id of the submitting student.
        transcribed_text: transcription of the student's spoken answer.
        ai_responses: AI conversation responses for this attempt.
        scores: scores awarded for the attempt.
        feedback: feedback text for the attempt.
        questionNo: which question this submission answers.

    Returns:
        None. A failed insert is printed and swallowed so the caller's flow
        is never interrupted by a database problem.
    """
    connection = psycopg2.connect(**db_config)
    cursor = connection.cursor()
    try:
        # Formatted to 'YYYY-MM-DD HH:MM:SS' so the reporting query's
        # TO_DATE(datetime::text, 'YYYY-MM-DD') parsing keeps working.
        current_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cursor.execute(
            "INSERT INTO oc_submissions (userid, datetime, Transcribed_text, AI_conversation_responses, Scores, Feedback, questionNo) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s)",
            (userid, current_datetime, transcribed_text, ai_responses, scores, feedback, questionNo))
        connection.commit()  # required: data does not persist without it
    except psycopg2.Error as e:
        # Deliberate best-effort: log and continue rather than crash the app.
        print("Error adding submission:", e)
    finally:
        if connection:
            cursor.close()
            connection.close()
            print("PostgreSQL connection is closed")
def get_submissions_by_date_and_class(from_date, to_date, class_name, display_ai_feedback):
    """Fetch submissions for a class within a date range and build a report.

    Args:
        from_date: inclusive start date, 'YYYY-MM-DD' string (swapped with
            to_date if given out of order).
        to_date: inclusive end date, 'YYYY-MM-DD' string.
        class_name: class to filter on (oc_students.class).
        display_ai_feedback: passed into the SQL CASE so AI responses are
            NULLed out at query time when falsy.

    Returns:
        A JSON string from generate_report_as_json when rows are found;
        otherwise a list with a single placeholder/error dict.
        NOTE(review): the success and failure return types differ (str vs
        list) — confirm callers handle both before unifying.
    """
    # Connect to the database
    conn = psycopg2.connect(**db_config)
    cursor = conn.cursor()
    try:
        # Debug tracing of the requested filter.
        print(f"From Date: {from_date}")
        print(f"To Date: {to_date}")
        print(f"Class Name: {class_name}")
        # Swap from_date and to_date if from_date is later than to_date
        if from_date > to_date:
            from_date, to_date = to_date, from_date
        # %s placeholders bind, in order: display_ai_feedback, from_date,
        # to_date, class_name (see execute() below).
        query = """
            SELECT s.index_no, s.class, sub.datetime, sub.questionNo, sub.transcribed_text,
                   CASE WHEN %s THEN sub.ai_conversation_responses ELSE NULL END AS ai_conversation_responses,
                   sub.userid
            FROM oc_students AS s
            JOIN oc_submissions AS sub ON s.id = sub.userid
            WHERE TO_DATE(sub.datetime::text, 'YYYY-MM-DD') BETWEEN TO_DATE(%s, 'YYYY-MM-DD') AND TO_DATE(%s, 'YYYY-MM-DD')
              AND s.class = %s
            ORDER BY sub.userid, sub.questionNo, sub.datetime DESC
        """
        cursor.execute(query, (display_ai_feedback, from_date, to_date, class_name))
        results = cursor.fetchall()
        if results:
            # Row layout consumed by generate_report_as_json:
            # (index_no, class, datetime, questionNo, transcribed_text,
            #  ai_conversation_responses, userid)
            return generate_report_as_json(results, display_ai_feedback)
        else:
            return [{"Email": "No data found for the selected date range and class", "Name": "", "Class": "", "Datetime": "", "Transcribed Text": "", "AI Conversation Responses": ""}]
    except Exception as e:
        print(f"An error occurred: {e}")
        return [{"Email": "Error occurred while fetching data", "Name": "", "Class": "", "Datetime": "", "Transcribed Text": "", "AI Conversation Responses": ""}]
    finally:
        cursor.close()
        conn.close()
def generate_report_as_json(results, display_ai_feedback):
    """Build a per-student JSON report from joined submission rows.

    Args:
        results: iterable of rows shaped (index_no, class, datetime,
            questionNo, transcribed_text, ai_conversation_responses, userid)
            as produced by get_submissions_by_date_and_class.
        display_ai_feedback: when falsy, every AI feedback field is replaced
            with "Not displayed".

    Returns:
        str: JSON (indent=4) — a list of per-student dicts with "Index No",
        "Class" and a "Questions" list. Required questions 1–3 that a
        student never answered appear as "NA" placeholder entries; each
        student's questions are sorted by question number.
    """
    REQUIRED_QUESTIONS = (1, 2, 3)  # every student is expected to answer these

    # Dict keys give first-seen-order de-duplication in O(1) per row, instead
    # of the original O(n^2) `tuple not in list` scan.
    students = {}          # (userid, index_no, class) -> None (ordered set)
    answers_by_user = {}   # userid -> list of answer detail dicts

    for index_no, class_name, submitted_at, question_no, response, ai_feedback, user_id in results or []:
        students.setdefault((user_id, index_no, class_name), None)
        answers_by_user.setdefault(user_id, []).append({
            "Datetime": submitted_at.strftime("%Y-%m-%d %H:%M:%S") if submitted_at else "",
            "Question": question_no,
            "Student Response": response,
            "AI Feedback": ai_feedback if display_ai_feedback else "Not displayed",
        })

    report_data = []
    for user_id, index_no, class_name in students:
        answered = answers_by_user.get(user_id, [])
        questions = list(answered)
        answered_numbers = {entry["Question"] for entry in answered}
        # Pad out any required question the student never answered.
        for missing in REQUIRED_QUESTIONS:
            if missing not in answered_numbers:
                questions.append({
                    "Question": missing,
                    "Datetime": "NA",
                    "Student Response": "NA",
                    "AI Feedback": "NA" if display_ai_feedback else "Not displayed",
                })
        # Sort the student's questions by question number before reporting.
        questions.sort(key=lambda entry: entry["Question"])
        report_data.append({
            "Index No": index_no,  # original code labels index_no as "Index No"
            "Class": class_name,
            "Questions": questions,
        })
    return json.dumps(report_data, indent=4)
def getUniqueSubmitDate():
    """Return up to the 14 most recent distinct submission dates as strings."""
    connection = psycopg2.connect(**db_config)
    cur = connection.cursor()
    try:
        # Distinct calendar dates, newest first, capped at two weeks' worth.
        cur.execute("""
            SELECT DISTINCT DATE(datetime) AS unique_date
            FROM public.oc_submissions
            ORDER BY unique_date desc
            LIMIT 14;
        """)
        rows = cur.fetchall()
        return [str(date_value) for (date_value,) in rows]
    except Exception as e:
        print(f"An error occurred: {e}")
        return [{"Error": "Error occurred while fetching data"}]
    finally:
        cur.close()
        connection.close()
def getUniqueClass():
    """Return the sorted distinct classes that have at least one submission."""
    connection = psycopg2.connect(**db_config)
    cur = connection.cursor()
    try:
        # Only classes joined to oc_submissions appear, i.e. classes with data.
        cur.execute("""
            SELECT DISTINCT s.class
            FROM oc_students AS s
            JOIN oc_submissions AS sub ON s.id = sub.userid
            ORDER BY s.class
        """)
        rows = cur.fetchall()
        return [str(class_value) for (class_value,) in rows]
    except Exception as e:
        print(f"An error occurred: {e}")
        return [{"Error": "Error occurred while fetching data"}]
    finally:
        cur.close()
        connection.close()