|
import gradio as gr |
|
import os |
|
|
|
|
|
|
|
import json |
|
from huggingface_hub import hf_hub_download, file_exists, HfApi |
|
from random import shuffle |
|
from markdown import markdown |
|
|
|
|
|
# IDs of the questions used in the study. Each new participant gets these in
# a randomised order (see load_user_data).
qIDs = ["mbe_46", "mbe_132", "mbe_287", "mbe_326", "mbe_334", "mbe_389", "mbe_563", "mbe_614", "mbe_642", "mbe_747", "mbe_779", "mbe_826", "mbe_845", "mbe_1042", "mbe_1134"]

# Mode labels; they match the suffixes of the "top10_*" / "generation_*" keys
# found in a question record.
mode_options = ["e5", "colbert"]

# Question bank, looked up by question ID. The encoding is pinned so that
# decoding does not depend on the host's locale default.
with open("question_data.json", "r", encoding="utf-8") as f:
    all_questions = json.load(f)
|
|
|
""" |
|
# State variables which interact with loading and unloading |
|
user_data = {} |
|
current_response = {} |
|
current_question = {} # read-only within gradio blocks |
|
user_id = "no_id" |
|
# Control global variables |
|
step = 0 |
|
mode = 1 |
|
|
|
def load_user_data(id): |
|
global user_data |
|
filename = id.replace('@', '_AT_').replace('.', '_DOT_') |
|
if file_exists(filename = "users/" + filename + ".json", repo_id = "ebrowne/test-data", repo_type = "dataset", token = os.getenv("HF_TOKEN")): |
|
print("File exists, downloading data.") |
|
# If the ID exists, download the file from HuggingFace |
|
path = hf_hub_download(repo_id = "ebrowne/test-data", token = os.getenv("HF_TOKEN"), filename = "users/" + filename + ".json", repo_type = "dataset") |
|
# Add their current status to user_data |
|
with open(path, "r") as f: |
|
user_data = json.load(f) |
|
else: |
|
# If the ID doesn't exist, create a format for the file and upload it to HuggingFace |
|
print("File does not exist, creating user.") |
|
shuffle(qIDs) |
|
modes = [] |
|
for i in range(len(qIDs)): |
|
temp = mode_options[:] |
|
shuffle(temp) |
|
modes.append(temp) |
|
# This is the format for a user's file on HuggingFace |
|
user_data = { |
|
"user_id": id, # original in email format, which was passed here |
|
"order": qIDs, # randomized order for each user |
|
"modes": modes, # randomized order for each user |
|
"current": 0, # user starts on first question |
|
"responses": [] # formatted as a list of current_responses |
|
} |
|
# Run the update method to upload the new JSON file to HuggingFace |
|
update_huggingface(id) |
|
|
|
def update_huggingface(id): |
|
global user_data |
|
print("Updating data...") |
|
filename = id.replace('@', '_AT_').replace('.', '_DOT_') |
|
# Create a local file that will be uploaded to HuggingFace |
|
with open(filename + ".json", "w") as f: |
|
json.dump(user_data, f) |
|
# Upload to hub (overwriting existing files...) |
|
api = HfApi() |
|
api.upload_file( |
|
path_or_fileobj=filename + ".json", |
|
path_in_repo="users/" + filename + ".json", |
|
repo_id="ebrowne/test-data", |
|
repo_type="dataset", |
|
token = os.getenv("HF_TOKEN") |
|
) |
|
|
|
def reset_current_response(qid): |
|
global current_response |
|
current_response = { |
|
"user_id": user_id, |
|
"question_id": qid, |
|
"user_answer": 0, |
|
"e5_scores": [], # list of ten [score, score, score, score] |
|
"e5_set": [], # two values |
|
"e5_generation": [], # two values |
|
"colbert_scores": [], |
|
"colbert_set": [], |
|
"colbert_generation": [], |
|
"gold_set": [], |
|
"gold_generation": [] |
|
} |
|
|
|
with open("question_data.json", "r") as f: |
|
all_questions = json.load(f) |
|
|
|
# Loads the user's current question — this is the first question that the user has not made any progress on. |
|
def load_current_question(): |
|
global current_question |
|
q_index = user_data["current"] |
|
if q_index >= len(all_questions): |
|
print("Done") |
|
gr.Info("You've finished — thank you so much! There are no more questions. :)") |
|
current_question = {"question": "You're done! Thanks so much for your help.", "answers": ["I want to log out now.", "I want to keep answering questions.","I want to keep answering questions.", "I want to keep answering questions."], "correct_answer_index": 0, "top10_e5": ["You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!"], "generation_e5": "I don't know how to exit this code right now, so you're in an endless loop of this question until you quit.", "top10_colbert": ["You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!", "You're done; thank you!"], "generation_colbert": "I don't know how to exit this code right now, so you're in an endless loop of this question until you quit.", "top10_contains_gold_passage": False, "gold_passage": "GOLD PASSAGE: LOG OFF!", "gold_passage_generation": "what do you gain"} |
|
reset_current_response("USER FINISHED") |
|
else: |
|
qid = user_data["order"][q_index] |
|
current_question = all_questions[qid] |
|
reset_current_response(user_data["order"][q_index]) |
|
|
|
""" |
|
|
|
|
|
|
|
# Look and feel for the whole app: the Soft base theme with sky accents,
# slate neutrals and Inter as the preferred font (falling back to system
# sans-serif faces).
_font_stack = [gr.themes.GoogleFont('Inter'), 'ui-sans-serif', 'system-ui', 'sans-serif']
theme = gr.themes.Soft(primary_hue="sky", secondary_hue="sky", neutral_hue="slate", font=_font_stack)
|
|
|
|
|
|
|
with gr.Blocks(theme = theme) as user_eval: |
|
|
|
|
|
|
|
# Per-session state holders. Handlers receive the stored values through
# `inputs` and write new values back through `outputs`.
user_data = gr.State(value={})         # progress record of the logged-in participant
current_response = gr.State(value={})  # scores collected for the question in progress
current_question = gr.State(value={})  # record of the question currently shown
user_id = gr.State(value="no_id")      # placeholder until an email is submitted

# Flow control.
step = gr.State(value=0)  # advanced by next_p / next_g, reset to 0 by toggle
mode = gr.State(value=1)  # toggle branches on whether this is 0
|
|
|
def load_user_data(id):
    """Return the stored progress record for a participant, or a new one.

    The email-style ``id`` is turned into a file stem by replacing ``@`` with
    ``_AT_`` and ``.`` with ``_DOT_``. If ``users/<stem>.json`` exists in the
    ``ebrowne/test-data`` dataset repo it is downloaded and parsed; otherwise
    a fresh record is built. A fresh record is only returned, not uploaded.

    Args:
        id: The participant's identifier exactly as they typed it.

    Returns:
        A dict with the keys ``"user_id"``, ``"order"`` (question IDs in this
        user's randomised order), ``"modes"`` (one shuffled copy of
        ``mode_options`` per question), ``"current"`` (index of the next
        question) and ``"responses"`` (completed response records).
    """
    stem = id.replace('@', '_AT_').replace('.', '_DOT_')
    remote_path = "users/" + stem + ".json"
    token = os.getenv("HF_TOKEN")

    if file_exists(filename=remote_path, repo_id="ebrowne/test-data", repo_type="dataset", token=token):
        print("File exists, downloading data.")
        path = hf_hub_download(repo_id="ebrowne/test-data", token=token, filename=remote_path, repo_type="dataset")
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    print("File does not exist, creating user.")
    # Shuffle a private copy. Shuffling the module-level qIDs in place would
    # hand every new user the same list object, so a later sign-up would
    # silently reorder the questions of users who are already mid-session.
    order = qIDs[:]
    shuffle(order)

    modes = []
    for _ in order:
        mode_pair = mode_options[:]
        shuffle(mode_pair)
        modes.append(mode_pair)

    return {
        "user_id": id,
        "order": order,
        "modes": modes,
        "current": 0,
        "responses": [],
    }
|
|
|
|
|
def update_huggingface(id, data):
    """Upload a participant's progress record to the dataset repo.

    The record is serialised to JSON and stored at ``users/<stem>.json`` in
    ``ebrowne/test-data``, where ``<stem>`` is ``id`` with ``@`` replaced by
    ``_AT_`` and ``.`` by ``_DOT_``. An existing file at that path is
    overwritten.

    Args:
        id: The participant's identifier (email format).
        data: JSON-serialisable progress record to store.
    """
    print("Updating data...")
    stem = id.replace('@', '_AT_').replace('.', '_DOT_')

    # Hand the serialised bytes straight to the Hub client instead of going
    # through a scratch file in the working directory: nothing named after the
    # user's email is left on disk, and concurrent sessions cannot trample
    # each other's temporary file.
    payload = json.dumps(data).encode("utf-8")

    api = HfApi()
    api.upload_file(
        path_or_fileobj=payload,
        path_in_repo="users/" + stem + ".json",
        repo_id="ebrowne/test-data",
        repo_type="dataset",
        token=os.getenv("HF_TOKEN"),
    )
|
|
|
def reset_current_response(qid, user_id):
    """Build a blank response record for one question.

    Args:
        qid: ID of the question the record belongs to.
        user_id: Identifier of the participant answering.

    Returns:
        A one-entry dict whose key is the ``current_response`` state component
        and whose value is the blank record: ``"user_answer"`` is 0 and every
        score field is an empty list. This is the ``{component: value}`` form
        that an event handler listing ``current_response`` among its outputs
        can return or merge into its own result.
    """
    score_fields = (
        "e5_scores",
        "e5_set",
        "e5_generation",
        "colbert_scores",
        "colbert_set",
        "colbert_generation",
        "gold_set",
        "gold_generation",
    )
    blank = {"user_id": user_id, "question_id": qid, "user_answer": 0}
    for field in score_fields:
        blank[field] = []  # a distinct list per field
    return {current_response: blank}
|
|
|
|
|
def load_current_question(user_data, user_id):
    """Return the question record for the participant's current position.

    Args:
        user_data: Progress record holding an ``"order"`` list of question IDs
            and a ``"current"`` index into that list.
        user_id: Not used by this function; kept so existing call sites keep
            working.

    Returns:
        ``all_questions[qid]`` for the question ID at ``"current"``, or, once
        ``"current"`` has moved past the end of ``"order"``, a placeholder
        "you're done" question built here (a ``gr.Info`` notice is raised in
        that case as well).

    This function only selects a question; it does not create or modify any
    response record.
    """
    order = user_data["order"]
    q_index = user_data["current"]

    # Guard against the list that is actually indexed below. Comparing with the
    # size of the question bank instead lets q_index run off the end of
    # `order` whenever the bank holds more questions than the user was given.
    if q_index < len(order):
        return all_questions[order[q_index]]

    print("Done")
    gr.Info("You've finished — thank you so much! There are no more questions. :)")

    filler_passage = "You're done; thank you!"
    filler_generation = "I don't know how to exit this code right now, so you're in an endless loop of this question until you quit."
    return {
        "question": "You're done! Thanks so much for your help.",
        "answers": [
            "I want to log out now.",
            "I want to keep answering questions.",
            "I want to keep answering questions.",
            "I want to keep answering questions.",
        ],
        "correct_answer_index": 0,
        "top10_e5": [filler_passage] * 10,
        "generation_e5": filler_generation,
        "top10_colbert": [filler_passage] * 10,
        "generation_colbert": filler_generation,
        "top10_contains_gold_passage": False,
        "gold_passage": "GOLD PASSAGE: LOG OFF!",
        "gold_passage_generation": "what do you gain",
    }
|
|
|
|
|
|
|
# Invisible textbox used as an event relay: handlers write a new value into it
# and its change event is what triggers `toggle`.
forward_btn = gr.Textbox(value="unchanged", visible=False, elem_id="togglebutton")

# Page header with a link to the detailed instructions.
_header_html = (
    "\n"
    "<h1> Legal Retriever Evaluation Study </h1>\n"
    "<p> Score the passages based on the question and provided answer choices. "
    "Detailed instructions are found "
    "<a href=\"https://docs.google.com/document/d/1ReODJ0hlXz_M3kE2UG1cwSRVoyDLQo88OvG71Gt8lUQ/edit?usp=sharing\" target=\"_blank\">here</a>. </p>\n"
)
gr.HTML(_header_html)
gr.Markdown("---")
|
|
|
|
|
# Evaluation screen: the material being judged on the left, scoring widgets on
# the right. Hidden until the participant has answered the question.
with gr.Row(equal_height=False, visible=False) as evals:

    # The passage / passage set / generated answer under evaluation, followed
    # by a reminder of the question and its answer choices.
    with gr.Column(scale=2) as passages:
        selection = gr.HTML()
        line = gr.Markdown("---")
        passage_display = gr.Markdown()

    # Scores for a single retrieved passage.
    with gr.Column(scale=1) as scores_p:
        desc_0 = gr.Markdown("Does the passage describe **a legal rule or principle?**")
        eval_0 = gr.Radio(["Yes", "No"], label="Legal Rule?")
        desc_1 = gr.Markdown("How **relevant** is this passage to the question?")
        eval_1 = gr.Slider(1, 5, step=0.5, label="Relevance", value=3)
        desc_2 = gr.Markdown("How would you rate the passage's **quality** in terms of detail, clarity, and focus?")
        eval_2 = gr.Slider(1, 5, step=0.5, label="Quality", value=3)
        desc_3 = gr.Markdown("How effectively does the passage **lead you to the correct answer?**")
        eval_3 = gr.Slider(-2, 2, step=0.5, label="Helpfulness", value=0)
        btn_p = gr.Button("Next", interactive=False)

    # Scores for a whole passage set or a generated answer; shown only after
    # every individual passage has been scored.
    with gr.Column(scale=1, visible=False) as scores_g:
        helps = gr.Markdown("Does this information **help answer** the question?")
        eval_helps = gr.Slider(-2, 2, step=0.5, label="Helpfulness", value=0)
        satisfied = gr.Markdown("How **satisfied** are you by this answer?")
        eval_satisfied = gr.Slider(1, 5, step=0.5, label="User Satisfaction", value=3)
        btn_g = gr.Button("Next")


def sanitize_score(rad):
    """Enable the passage "Next" button only while the Yes/No radio has a value."""
    return {btn_p: gr.Button(interactive=rad is not None)}


eval_0.change(fn=sanitize_score, inputs=[eval_0], outputs=[btn_p])


def _passage_html(text):
    """Return the HTML shown for one retrieved passage."""
    return "\n<h2> Retrieved Passage </h2>\n<p> " + text + "</p>"


def next_p(e0, e1, e2, e3, cur_step, cur_mode, response, q_data):
    """Record the scores for one passage and show the next thing to judge.

    Parameter names deliberately differ from the module-level components
    (``step``, ``mode``, ``current_response``, ``current_question``): the
    returned dict must be keyed by those components, so they must not be
    shadowed by the plain values Gradio passes in.

    Args:
        e0, e1, e2, e3: Values of the four per-passage widgets.
        cur_step: Current value of the ``step`` state.
        cur_mode: Current value of the ``mode`` state (not used; the state is
            always set to 1 on return).
        response: Response record being filled in for this question.
        q_data: Record of the question being evaluated.

    Returns:
        A ``{component: update}`` dict. While passages remain, the next one is
        displayed; after the last one, the full set is displayed and the
        set/generation scoring column replaces the per-passage column.
    """
    new_step = cur_step + 1
    # setdefault keeps this handler usable even if the stored record has not
    # been initialised with an "e5_scores" list.
    response.setdefault("e5_scores", []).append([e0, e1, e2, e3])

    updates = {
        # Clearing the radio also fires sanitize_score, which disables "Next"
        # again until the participant makes a choice for the next passage.
        eval_0: gr.Radio(value=None),
        eval_1: gr.Slider(value=3),
        eval_2: gr.Slider(value=3),
        eval_3: gr.Slider(value=0),
        step: new_step,
        mode: 1,
        current_response: response,
    }

    passage_list = q_data["top10_e5"]
    if new_step < len(passage_list):
        updates[selection] = gr.HTML(_passage_html(passage_list[new_step]))
        return updates

    # Every passage has been scored: show them all together for the set rating.
    set_html = "<h2> Set of Passages </h2>\n" + "".join(
        "\n<strong>Passage " + str(number) + "</strong>\n<p> " + text + " </p>\n"
        for number, text in enumerate(passage_list, start=1)
    )
    updates[selection] = gr.HTML(set_html)
    updates[scores_p] = gr.Column(visible=False)
    updates[scores_g] = gr.Column(visible=True)
    return updates


def next_g(e_h, e_s, cur_step, cur_mode, data, response, q_data, uid):
    """Record a set/generation rating and advance to the next stage.

    After the individual passages, four ratings are collected in order: the
    e5 passage set, the e5 generated answer, the gold passage, and the gold
    passage's generated answer. The stage is derived from the step counter
    relative to the number of passages, so it does not depend on there being
    exactly ten of them.

    Args:
        e_h, e_s: Values of the helpfulness and satisfaction sliders.
        cur_step: Current value of the ``step`` state.
        cur_mode: Current value of the ``mode`` state (passed through).
        data: The participant's progress record.
        response: Response record being filled in for this question.
        q_data: Record of the question being evaluated.
        uid: The participant's identifier.

    Returns:
        A ``{component: update}`` dict. After the fourth rating the finished
        response is appended to the progress record, the record is uploaded,
        the next question and a blank response are stored in state, and the
        hidden relay textbox is changed so that ``toggle`` runs.
    """
    new_step = cur_step + 1
    stage = new_step - len(q_data["top10_e5"])
    rating = [e_h, e_s]

    updates = {
        eval_helps: gr.Slider(value=0),
        eval_satisfied: gr.Slider(value=3),
        step: new_step,
        mode: cur_mode,
        user_data: data,
        current_response: response,
    }
    # forward_btn is left out of `updates` in the first three stages, so it
    # keeps its value and `toggle` is not triggered.

    if stage == 1:
        response["e5_set"] = rating
        updates[selection] = gr.HTML(
            "\n<h2> Autogenerated Response </h2>\n<p>" + markdown(q_data["generation_e5"]) + "</p>"
        )
    elif stage == 2:
        response["e5_generation"] = rating
        updates[selection] = gr.HTML(_passage_html(q_data["gold_passage"]))
    elif stage == 3:
        response["gold_set"] = rating
        updates[selection] = gr.HTML(
            "\n<h2> Autogenerated Response </h2>\n<p> " + markdown(q_data["gold_passage_generation"]) + "</p>"
        )
    else:
        # Last rating for this question: persist it and move on.
        response["gold_generation"] = rating
        data["current"] += 1
        data["responses"].append(response)
        update_huggingface(uid, data)

        upcoming = load_current_question(data, uid)
        position = data["current"]
        next_qid = data["order"][position] if position < len(data["order"]) else "USER FINISHED"

        # reset_current_response returns {current_response: <blank record>};
        # merging it replaces the finished record held in state.
        updates.update(reset_current_response(next_qid, uid))
        updates[current_question] = upcoming
        updates[selection] = gr.HTML("Advancing to the next question...")
        # A value that differs on every completion guarantees a change event.
        updates[forward_btn] = gr.Textbox("changed" + str(position))
    return updates


btn_p.click(
    fn=next_p,
    inputs=[eval_0, eval_1, eval_2, eval_3, step, mode, current_response, current_question],
    outputs=[selection, scores_p, scores_g, eval_0, eval_1, eval_2, eval_3, step, mode, current_response],
)
btn_g.click(
    fn=next_g,
    inputs=[eval_helps, eval_satisfied, step, mode, user_data, current_response, current_question, user_id],
    outputs=[selection, forward_btn, eval_helps, eval_satisfied, step, mode, user_data, current_response, current_question],
)
|
|
|
|
|
# Question screen: the question text and one button per answer choice.
with gr.Row(equal_height=False, visible=False) as question:
    with gr.Column():
        gr.Markdown("**Question**")
        q_text = gr.Markdown("Question")
        a = gr.Button("A")
        b = gr.Button("B")
        c = gr.Button("C")
        d = gr.Button("D")


def _record_answer(response, choice_index):
    """Store the chosen answer index and switch to the evaluation screen.

    The response record arrives as a handler input and is handed back as an
    output: ``current_response`` is a ``gr.State`` component, which cannot be
    modified through a ``global`` statement and item assignment.
    """
    response["user_answer"] = choice_index
    return {
        question: gr.Row(visible=False),
        evals: gr.Row(visible=True),
        current_response: response,
    }


def answer_a(response):
    """Handler for button A (answer index 0)."""
    return _record_answer(response, 0)


def answer_b(response):
    """Handler for button B (answer index 1)."""
    return _record_answer(response, 1)


def answer_c(response):
    """Handler for button C (answer index 2)."""
    return _record_answer(response, 2)


def answer_d(response):
    """Handler for button D (answer index 3)."""
    return _record_answer(response, 3)


a.click(fn=answer_a, inputs=[current_response], outputs=[question, evals, current_response])
b.click(fn=answer_b, inputs=[current_response], outputs=[question, evals, current_response])
c.click(fn=answer_c, inputs=[current_response], outputs=[question, evals, current_response])
d.click(fn=answer_d, inputs=[current_response], outputs=[question, evals, current_response])
|
|
|
def toggle(cur_step, cur_mode, q_data):
    """Reset the evaluation widgets when the hidden relay textbox changes.

    Parameter names deliberately differ from the ``step``, ``mode`` and
    ``current_question`` components so that those components stay available
    as keys of the returned dict.

    Args:
        cur_step: Current value of the ``step`` state (ignored; the state is
            always reset to 0).
        cur_mode: Current value of the ``mode`` state.
        q_data: Record of the question that should now be shown.

    Returns:
        A ``{component: update}`` dict. When ``cur_mode`` is 0 the evaluation
        screen stays up and ``mode`` becomes 1. Otherwise the question screen
        is shown, filled in from ``q_data``, and the first e5 passage is
        preloaded into the evaluation screen.
    """
    updates = {
        scores_p: gr.Column(visible=True),
        scores_g: gr.Column(visible=False),
        step: 0,
    }

    if cur_mode == 0:
        print("Next set of passages for same question")
        updates[evals] = gr.Row(visible=True)
        updates[question] = gr.Row(visible=False)
        updates[mode] = 1
        return updates

    print("New question")
    answers = q_data["answers"]
    correct = q_data["correct_answer_index"]
    # Copy before marking so the stored question record is left untouched.
    marked = answers.copy()
    marked[correct] = "**" + answers[correct] + "** ✅"

    reminder = (
        "\n## Question and Answer\n*" + q_data["question"]
        + "* \n\n+ " + marked[0]
        + " \n\n+ " + marked[1]
        + " \n\n+ " + marked[2]
        + " \n\n+ " + marked[3]
    )
    first_passage = "\n<h2> Retrieved Passage </h2>\n<p> " + q_data["top10_e5"][0] + "</p>"

    updates.update({
        evals: gr.Row(visible=False),
        question: gr.Row(visible=True),
        q_text: gr.Markdown(q_data["question"]),
        a: gr.Button(answers[0]),
        b: gr.Button(answers[1]),
        c: gr.Button(answers[2]),
        d: gr.Button(answers[3]),
        passage_display: gr.Markdown(reminder),
        selection: gr.HTML(first_passage),
        mode: cur_mode,
    })
    return updates


forward_btn.change(
    fn=toggle,
    inputs=[step, mode, current_question],
    outputs=[scores_p, scores_g, evals, question, q_text, a, b, c, d, passage_display, selection, step, mode],
)
|
|
|
# Login screen: the participant identifies themselves by email.
with gr.Row() as login:
    with gr.Column():
        gr.Markdown("# Enter email to start")
        gr.Markdown("Thank you so much for your participation in our study! We're using emails to keep track of which questions you've answered and which you haven't seen. Use the same email every time to keep your progress saved. :)")
        email = gr.Textbox(label="Email", placeholder="name@example.com")
        s = gr.Button("Start!", interactive=False)


def sanitize_login(text):
    """Enable "Start!" only when the email box holds something other than whitespace."""
    has_text = bool(text) and text.strip() != ""
    return {s: gr.Button(interactive=has_text)}


email.change(fn=sanitize_login, inputs=[email], outputs=[s])


def submit_email(email):
    """Log a participant in and show their current question.

    Loads (or creates) the participant's progress record, selects the current
    question, and stores the identifier, the record, the question and a blank
    response record in session state. No local variable reuses the name of a
    state component, because those components are the keys of the returned
    dict.

    Args:
        email: Text entered in the email box; used verbatim as the user ID.

    Returns:
        A ``{component: update}`` dict that hides the login screen, shows the
        question screen filled in from the loaded question, and preloads the
        first e5 passage and the question reminder for the evaluation screen.
    """
    loaded_data = load_user_data(email)
    new_q = load_current_question(loaded_data, email)

    position = loaded_data["current"]
    order = loaded_data["order"]
    qid = order[position] if position < len(order) else "USER FINISHED"

    answers = new_q["answers"]
    correct = new_q["correct_answer_index"]
    # Copy before marking so the question record itself is left untouched.
    marked = answers.copy()
    marked[correct] = "**" + answers[correct] + "** ✅"

    reminder = (
        "\n## Question and Answer\n*" + new_q["question"]
        + "* \n\n+ " + marked[0]
        + " \n\n+ " + marked[1]
        + " \n\n+ " + marked[2]
        + " \n\n+ " + marked[3]
    )
    first_passage = "\n<h2> Retrieved Passage </h2>\n<p> " + new_q["top10_e5"][0] + "</p>"

    updates = {
        question: gr.Row(visible=True),
        login: gr.Row(visible=False),
        selection: gr.HTML(first_passage),
        passage_display: gr.Markdown(reminder),
        q_text: gr.Markdown(new_q["question"]),
        a: gr.Button(answers[0]),
        b: gr.Button(answers[1]),
        c: gr.Button(answers[2]),
        d: gr.Button(answers[3]),
        user_id: email,
        user_data: loaded_data,
        current_question: new_q,
    }
    # reset_current_response returns {current_response: <blank record>};
    # merging it stores the blank record in session state.
    updates.update(reset_current_response(qid, email))
    return updates


s.click(
    fn=submit_email,
    inputs=[email],
    outputs=[question, login, selection, passage_display, q_text, a, b, c, d, user_id, user_data, current_question, current_response],
)
|
|
|
|
|
user_eval.launch() |
|
|
|
|