# Page header captured with the file (not part of the program):
# arjunguha's picture | Copy from repository | 861c325 unverified | raw |
# history blame | 7.26 kB
"""
This program helps us explore the models' responses to the benchmark. It is a
web app that displays the following:
1. A list of benchmark items loaded from puzzles_cleaned.csv. The list shows
the columns ID, challenge, and answer.
2. When we select a puzzle from the list, we see the transcript, Explanation,
and Editor's Note in textboxes. (Scrollable since they can be long.)
3. The list in (1) also has a column for each model, with checkboxes indicating
whether the model's response is correct or not. We load the model responses
from results.duckdb. That file has a table called completions with
columns 'prompt_id', 'parent_dir', and 'completion'. The prompt_id can be
joined with ID from puzzles_cleaned.csv. The parent_dir is the model name.
The completion is the model response, which we compare with the answer from
puzzles_cleaned.csv using the function check_answer defined below.
4. Finally, when an item is selected from the list, we get a dropdown that lets
us select a model to see the completion from that model.
Note that not every model has a response for every puzzle.
"""
import re
import duckdb
import gradio as gr
import textwrap
def split_into_words(text: str) -> list:
    """Return the lowercased runs of word characters found in *text*, in order."""
    lowered = text.lower()
    return [match.group(0) for match in re.finditer(r'\b\w+\b', lowered)]
def all_words_match(completion: str, answer: str) -> bool:
    """
    Return True when every word of the answer occurs somewhere in the
    lowercased completion. This is a substring test, so order is ignored.
    """
    required_words = split_into_words(answer)
    haystack = completion.lower()
    for required in required_words:
        if required not in haystack:
            return False
    return True
def answer_without_thoughts(completion: str) -> str:
    """
    Strip a leading reasoning section from a completion.

    A completion counts as having a reasoning section when "<think>" occurs
    within its first 200 characters; otherwise it is returned untouched.
    When the section is never closed with "</think>" the answer is taken to
    be empty; otherwise the stripped text after the last "</think>" is
    returned.
    """
    opening = completion[:200]
    if "<think>" in opening:
        _, closing_tag, remainder = completion.rpartition("</think>")
        if not closing_tag:
            # The model never finished thinking, so there is no answer.
            return ""
        return remainder.strip()
    return completion
def check_answer(completion: str, answer: str) -> bool:
    """
    Check if all words in the answer are in the completion (after removing
    any reasoning section), with their first occurrences appearing either in
    the same order as in the answer or in exactly the reverse order.
    """
    candidate_words = split_into_words(answer_without_thoughts(completion))
    # Remember only the first position at which each word shows up.
    first_position = {}
    for position, word in enumerate(candidate_words):
        first_position.setdefault(word, position)

    positions = []
    for expected in split_into_words(answer):
        if expected not in first_position:
            return False
        positions.append(first_position[expected])

    ascending = sorted(positions)
    return positions == ascending or positions == ascending[::-1]
def clip_text(text: str, width: int) -> str:
    """Truncate *text* to *width* characters, appending "..." when it was cut."""
    if len(text) > width:
        return text[:width] + "..."
    return text
def wrap_text(text: str, width: int) -> str:
    """Re-flow *text* into newline-separated lines of at most *width* characters."""
    wrapped_lines = textwrap.wrap(text, width=width)
    return "\n".join(wrapped_lines)
def get_model_response(prompt_id, model_name):
    """
    Look up the completion that one model produced for one puzzle.

    Args:
        prompt_id: Value matched against the prompt_id column of
            results.completions.
        model_name: Value matched against the parent_dir column.

    Returns:
        The completion of the first matching row, or None when the model has
        no row for this puzzle.
    """
    # Bind the values as parameters instead of splicing them into the SQL
    # text, so a model name containing a quote cannot break (or alter) the
    # statement.
    query = """
        SELECT completion FROM results.completions
        WHERE prompt_id = ? AND parent_dir = ?
    """
    row = conn.execute(query, [prompt_id, model_name]).fetchone()
    return row[0] if row else None
def display_puzzle(puzzle_id):
    """
    Fetch the display fields of one puzzle from the challenges table.

    Args:
        puzzle_id: Value matched against the ID column.

    Returns:
        A 5-tuple (challenge, answer, transcript, Explanation,
        Editor's Notes) for the first matching row, or five Nones when no
        row has that ID.
    """
    # The ID is bound as a parameter rather than interpolated into the SQL.
    query = """
        SELECT challenge, answer, transcript, Explanation, "Editor's Notes"
        FROM challenges
        WHERE ID = ?
    """
    puzzle = conn.execute(query, [puzzle_id]).fetchone()
    return puzzle if puzzle else (None, None, None, None, None)
def display_model_response(puzzle_id, model_name):
    """
    Build the text shown in the "Model Response" box.

    Args:
        puzzle_id: ID of the selected puzzle.
        model_name: Model whose completion should be shown.

    Returns:
        "From <model_name>:" followed by the completion with everything up
        to the last "</think>" removed, or "No response from this model."
        when the model has no completion for the puzzle or nothing remains
        after removing the reasoning.
    """
    no_response = "No response from this model."
    response = get_model_response(puzzle_id, model_name)
    # Not every model answered every puzzle: the lookup returns None then,
    # and calling a string method on it would raise.
    if not response:
        return no_response

    _, closing_tag, after_thoughts = response.rpartition("</think>")
    if closing_tag:
        response = after_thoughts.strip()
    if not response:
        return no_response
    return "From " + model_name + ":\n" + response
# In-memory DuckDB connection used by every query in this module.
conn = duckdb.connect(":memory:")
# Make the contents of results.duckdb reachable as results.<table>.
conn.execute("ATTACH DATABASE 'results.duckdb' AS results")
# Load the puzzle CSV into a table named challenges.
conn.execute("CREATE TABLE challenges as SELECT * FROM 'puzzles_cleaned.csv'")
# Register the Python helpers under the same names so SQL text can call them.
# NOTE(review): no SQL types are passed here, so DuckDB presumably derives
# them from the functions' type annotations -- confirm before changing those.
conn.create_function("check_answer", check_answer)
conn.create_function("clip_text", clip_text)
conn.create_function("wrap_text", wrap_text)
# Get all unique model names (the distinct parent_dir values of completions).
model_names = [item[0] for item in conn.sql("SELECT DISTINCT parent_dir FROM results.completions").fetchall()]
# Just for display: the same names with every "completions-" removed.
cleaned_model_names = [name.replace("completions-", "") for name in model_names]
# Printed at import time, before any UI is built.
print(cleaned_model_names)
def build_table():
    """
    Join every puzzle with every model's completion into one wide table.

    For each name in model_names two columns are produced, named after the
    model with "-" replaced by "_": <model>_answer holds the completion and
    <model>_ok holds an emoji: "✅" when check_answer accepted it, "❌" when
    it rejected it, and "❓" when there is no verdict (for example the model
    has no completion for that puzzle).

    Returns:
        A pair (joined_df, model_correct_columns): the DataFrame with
        columns ID, challenge, answer (wrapped to 40 columns), the per-model
        columns, and challenge_clipped (challenge cut to 40 characters);
        and the list of <model>_ok column names in model_names order.
    """
    select_items = [
        "c.ID",
        "c.challenge",
        "wrap_text(c.answer, 40) AS answer",
    ]
    model_correct_columns = []
    for model in model_names:
        normalized_model_name = model.replace("-", "_")
        answer_column = normalized_model_name + "_answer"
        ok_column = normalized_model_name + "_ok"
        model_correct_columns.append(ok_column)

        # Model names come from the data, so escape them: doubled single
        # quotes inside the string literal, and double-quoted identifiers so
        # that names containing e.g. "." still form valid column aliases.
        model_literal = "'" + model.replace("'", "''") + "'"
        answer_ident = '"' + answer_column.replace('"', '""') + '"'
        ok_ident = '"' + ok_column.replace('"', '""') + '"'
        select_items.append(
            f"MAX(CASE WHEN r.parent_dir = {model_literal} "
            f"THEN r.completion ELSE NULL END) AS {answer_ident}"
        )
        select_items.append(
            f"MAX(CASE WHEN r.parent_dir = {model_literal} "
            f"THEN check_answer(r.completion, c.answer) ELSE NULL END) AS {ok_ident}"
        )
    select_items.append("clip_text(c.challenge, 40) AS challenge_clipped")

    # Joining the items avoids any dependence on trailing-comma handling.
    select_list = ",\n            ".join(select_items)
    # ORDER BY keeps the row order stable from run to run; GROUP BY alone
    # guarantees no particular order.
    query = f"""
        SELECT
            {select_list}
        FROM challenges c
        LEFT JOIN results.completions r
        ON c.ID = r.prompt_id
        GROUP BY c.ID, c.challenge, c.answer
        ORDER BY c.ID
    """
    joined_df = conn.sql(query).fetchdf()

    def verdict_emoji(value):
        # Anything that is neither true nor false (e.g. a missing value)
        # is shown as unknown.
        return "✅" if value == 1 else ("❌" if value == 0 else "❓")

    # Transform the model_correct columns to use emojis
    for ok_column in model_correct_columns:
        joined_df[ok_column] = joined_df[ok_column].apply(verdict_emoji)
    return joined_df, model_correct_columns
# Build the wide puzzle/model table once, at import time.
joined_df, model_correct_columns = build_table()

# The table shown in the UI: only the ID, the clipped challenge, the answer
# and the per-model verdict columns, under human-readable headers.
relabelled_df = joined_df[
    ['ID', 'challenge_clipped', 'answer'] + list(model_correct_columns)
].rename(
    columns=dict(
        [
            ('ID', 'Puzzle ID'),
            ('challenge_clipped', 'Challenge'),
            ('answer', 'Answer'),
        ]
        + [
            (name.replace("-", "_") + '_ok', name.replace("completions-", ""))
            for name in model_names
        ]
    )
)

# Displayed column position -> model name; the first three columns are the
# puzzle ID, challenge and answer, so model columns start at position 3.
model_columns = dict(enumerate(model_names, start=3))
valid_model_indices = list(model_columns)
# Model shown when the selected cell is not in a model column.
default_model = model_columns[valid_model_indices[0]]
def create_interface():
    """
    Build the Gradio page and launch it.

    The page shows relabelled_df as a table plus read-only text boxes for
    the model response, challenge, answer, explanation, editor's note and
    transcript. Selecting a cell fills the boxes for that row's puzzle; when
    the cell lies in a model column that model's response is shown,
    otherwise default_model's.
    """
    with gr.Blocks() as demo:
        # Using "markdown" as the datatype makes Gradio interpret newlines.
        puzzle_list = gr.DataFrame(
            value=relabelled_df,
            datatype=["number", "str", "markdown", *["str"] * len(model_correct_columns)],
        )
        model_response = gr.Textbox(label="Model Response", interactive=False)
        challenge = gr.Textbox(label="Challenge", interactive=False)
        answer = gr.Textbox(label="Answer", interactive=False)
        explanation = gr.Textbox(label="Explanation", interactive=False)
        editors_note = gr.Textbox(label="Editor's Note", interactive=False)
        transcript = gr.Textbox(label="Transcript", interactive=False)

        def update_puzzle(evt: gr.SelectData):
            row, column = evt.index[0], evt.index[1]
            # The event reports the position of the clicked row, which is
            # not necessarily the puzzle's ID, so read the ID from the table
            # that is being displayed.
            puzzle_id = relabelled_df["Puzzle ID"].iloc[row]
            if hasattr(puzzle_id, "item"):
                # Unwrap a NumPy scalar into the plain Python value.
                puzzle_id = puzzle_id.item()
            model_name = model_columns.get(column, default_model)
            return (*display_puzzle(puzzle_id), display_model_response(puzzle_id, model_name))

        puzzle_list.select(
            fn=update_puzzle,
            inputs=[],
            outputs=[challenge, answer, transcript, explanation, editors_note, model_response]
        )
    demo.launch()
# Build and launch the web app only when this file is run as a script.
if __name__ == "__main__":
    create_interface()