Spaces:
Running
Running
""" | |
This program helps us explore models' responses to the benchmark. It is a web
app that displays the following:
1. A list of benchmark items loaded from puzzles_cleaned.csv. The list shows
   the columns ID, challenge, and answer.
2. When we select a puzzle from the list, we see the transcript, Explanation,
   and Editor's Note in textboxes. (Scrollable since they can be long.)
3. The list in (1) also has a column for each model, with checkboxes indicating
   whether the model's response is correct or not. We load the model responses
   from results.duckdb. That file has a table called completions with
   columns 'prompt_id', 'parent_dir', and 'completion'. The prompt_id can be
   joined with ID from puzzles_cleaned.csv. The parent_dir is the model name.
   The completion is the model response, which we compare with the answer from
   puzzles_cleaned.csv using the function check_answer defined below.
4. Finally, when an item is selected from the list, we get a dropdown that lets
   us select a model to see the completion from that model.
Note that not every model has a response for every puzzle.
""" | |
import gradio as gr | |
from metrics import load_results | |
def get_model_response(prompt_id, model_name):
    """Fetch one model's completion for one puzzle.

    Both arguments are interpolated directly into the SQL text, so
    ``prompt_id`` must render as a valid SQL literal and ``model_name`` must
    not contain a single quote.

    Returns the ``completion`` value of the first matching row in
    ``results.completions``, or ``None`` when no row matches.
    """
    row = conn.sql(
        f"""
        SELECT completion FROM results.completions
        WHERE prompt_id = {prompt_id} AND parent_dir = '{model_name}'
        """
    ).fetchone()
    if not row:
        return None
    return row[0]
def display_puzzle(puzzle_id):
    """Look up the detail fields of a single puzzle.

    ``puzzle_id`` is interpolated directly into the SQL text and compared
    against the ``ID`` column of ``challenges``.

    Returns the first matching row as
    ``(challenge, answer, transcript, Explanation, Editor's Notes)``, or a
    tuple of five ``None`` values when no row matches.
    """
    sql = f"""
        SELECT challenge, answer, transcript, Explanation, "Editor's Notes"
        FROM challenges
        WHERE ID = {puzzle_id}
    """
    row = conn.sql(sql).fetchone()
    if row:
        return row
    return (None,) * 5
def display_model_response(puzzle_id, model_name):
    """Format one model's answer to one puzzle for display.

    Looks the completion up with ``get_model_response``. If the completion
    contains a ``</think>`` marker, only the text after the last marker is
    kept (whitespace-stripped).

    Returns ``"From <model_name>:\\n<response>"``, or
    ``"No response from this model."`` when the model has no completion for
    the puzzle or nothing is left to show.
    """
    no_response = "No response from this model."

    response = get_model_response(puzzle_id, model_name)
    # get_model_response returns None when the model never answered this
    # puzzle; bail out before calling string methods on it.
    if not response:
        return no_response

    # Keep only what follows the final "</think>" marker, if there is one.
    _, marker, after_thoughts = response.rpartition("</think>")
    if marker:
        response = after_thoughts.strip()

    if not response:
        return no_response
    return "From " + model_name + ":\n" + response
# Connection shared by every query in this module.
conn = load_results()

# Every distinct model name (the parent_dir column), in sorted order.
model_names = sorted(
    row[0]
    for row in conn.sql("SELECT DISTINCT parent_dir FROM results.completions").fetchall()
)

# Display-only variants with the "completions-" text removed.
cleaned_model_names = [name.replace("completions-", "") for name in model_names]
def _ok_to_emoji(value):
    """Map a per-model correctness value to a status emoji."""
    if value == 1:
        return "✅"  # answered correctly
    if value == 0:
        return "❌"  # answered incorrectly
    return "❓"  # no completion from this model


def build_table():
    """Build the puzzle table with one answer and one status column per model.

    For every name in ``model_names`` the query adds two columns, named after
    the model with ``-`` replaced by ``_``: ``<name>_answer`` (the model's
    completion) and ``<name>_ok`` (the result of the SQL function
    ``check_answer`` applied to the completion and the reference answer).
    After fetching, each ``<name>_ok`` column is replaced by a status emoji.

    ``wrap_text``, ``clip_text`` and ``check_answer`` are SQL functions that
    are presumably registered on ``conn`` by ``load_results`` -- they are not
    defined in this part of the file.

    Returns:
        A ``(joined_df, model_correct_columns)`` pair: the resulting
        DataFrame and the list of ``<name>_ok`` column names, in
        ``model_names`` order.
    """
    normalized_names = [model.replace("-", "_") for model in model_names]
    model_correct_columns = [name + "_ok" for name in normalized_names]

    # Assemble the SELECT list as items and join them, so no trailing-comma
    # clean-up is needed regardless of how many models there are.
    select_items = ["c.ID", "c.challenge", "wrap_text(c.answer, 40) AS answer"]
    for model, name in zip(model_names, normalized_names):
        select_items.append(
            f"MAX(CASE WHEN r.parent_dir = '{model}' THEN r.completion ELSE NULL END) AS {name}_answer"
        )
        select_items.append(
            f"MAX(CASE WHEN r.parent_dir = '{model}' THEN check_answer(r.completion, c.answer) ELSE NULL END) AS {name}_ok"
        )
    select_items.append("clip_text(c.challenge, 40) AS challenge_clipped")
    select_list = ",\n            ".join(select_items)

    query = f"""
        SELECT
            {select_list}
        FROM challenges c
        LEFT JOIN results.completions r
        ON c.ID = r.prompt_id
        GROUP BY c.ID, c.challenge, c.answer
    """
    joined_df = conn.sql(query).fetchdf()

    # Replace the raw correctness values with emojis for display.
    for column in model_correct_columns:
        joined_df[column] = joined_df[column].apply(_ok_to_emoji)

    return joined_df, model_correct_columns
joined_df, model_correct_columns = build_table()

# Header shown for each column of the grid; every "<model>_ok" column is
# labelled with the model name minus the "completions-" text.
display_headers = {
    "challenge_clipped": "Challenge",
    "answer": "Answer",
}
for model in model_names:
    display_headers[model.replace("-", "_") + "_ok"] = model.replace("completions-", "")

relabelled_df = (
    joined_df[["ID", "challenge_clipped", "answer", *model_correct_columns]]
    .rename(columns=display_headers)
    .sort_values(by="ID")
)

# Grid column position -> model name. Model columns start at position 3,
# after ID, Challenge and Answer.
model_columns = dict(enumerate(model_names, start=3))
valid_model_indices = list(model_columns)
# Model shown when the selected cell is not in a model column.
default_model = model_columns[valid_model_indices[0]]
def create_interface():
    """Build and launch the Gradio app.

    The page shows the puzzle table followed by read-only textboxes. Selecting
    a cell fills the textboxes with the details of the puzzle on that row and
    with the response of the model whose column was selected; a selection
    outside the model columns falls back to ``default_model``.
    """
    # Puzzle ID of each displayed row, in display order. The select event
    # reports a row *position*, which equals the puzzle ID only if the IDs
    # happen to be exactly 0..n-1, so translate position -> ID explicitly.
    puzzle_ids = relabelled_df["ID"].tolist()

    with gr.Blocks() as demo:
        # Using "markdown" as the datatype makes Gradio interpret newlines.
        puzzle_list = gr.DataFrame(
            value=relabelled_df,
            datatype=["number", "str", "markdown", *["str"] * len(model_correct_columns)],
        )
        model_response = gr.Textbox(label="Model Response", interactive=False)
        challenge = gr.Textbox(label="Challenge", interactive=False)
        answer = gr.Textbox(label="Answer", interactive=False)
        explanation = gr.Textbox(label="Explanation", interactive=False)
        editors_note = gr.Textbox(label="Editor's Note", interactive=False)
        transcript = gr.Textbox(label="Transcript", interactive=False)

        def update_puzzle(evt: gr.SelectData):
            """Return the textbox values for the selected (row, column) cell."""
            row, column = evt.index[0], evt.index[1]
            puzzle_id = puzzle_ids[row]
            model_name = model_columns.get(column, default_model)
            return (
                *display_puzzle(puzzle_id),
                display_model_response(puzzle_id, model_name),
            )

        # The output order matches the tuple returned by update_puzzle:
        # display_puzzle's five fields, then the model response.
        puzzle_list.select(
            fn=update_puzzle,
            inputs=[],
            outputs=[challenge, answer, transcript, explanation, editors_note, model_response],
        )

    demo.launch()
# Build and launch the web app only when this file is run as a script.
if __name__ == "__main__":
    create_interface()