Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import torch | |
| import string | |
| import re | |
| import random | |
| import gradio as gr | |
| from tqdm import tqdm | |
| tqdm().pandas() | |
| import nltk | |
| from nltk.tokenize.treebank import TreebankWordDetokenizer | |
| nltk.download('punkt') | |
| nltk.download('punkt_tab') | |
| # BERT imports | |
| from transformers import BertForMaskedLM, BertTokenizer | |
| # GPT2 imports | |
| from transformers import GPT2LMHeadModel, GPT2Tokenizer | |
| # BioBPT | |
| from transformers import BioGptForCausalLM, BioGptTokenizer | |
| # LLAMA | |
| from transformers import LlamaTokenizer, LlamaForCausalLM | |
| # FALCON | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| import mgr_sentences as smgr | |
| import mgr_biases as bmgr | |
| import mgr_requests as rq_mgr | |
| from error_messages import * | |
| import contextlib | |
| autocast = contextlib.nullcontext | |
| import gc | |
| # Great article about handing big models - https://huggingface.co/blog/accelerate-large-models | |
| def _getModelSafe(model_name, device): | |
| model = None | |
| tokenizer = None | |
| try: | |
| model, tokenizer = _getModel(model_name, device) | |
| except Exception as err: | |
| print(f"Loading Model Error: {err}") | |
| print("Cleaning the model...") | |
| model = None | |
| tokenizer = None | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| if model == None or tokenizer == None: | |
| print("Cleaned, trying reloading....") | |
| model, tokenizer = _getModel(model_name, device) | |
| return model, tokenizer | |
| def _getModel(model_name, device): | |
| if "bert" in model_name.lower(): | |
| tokenizer = BertTokenizer.from_pretrained(model_name) | |
| model = BertForMaskedLM.from_pretrained(model_name) | |
| elif "biogpt" in model_name.lower(): | |
| tokenizer = BioGptTokenizer.from_pretrained(model_name) | |
| model = BioGptForCausalLM.from_pretrained(model_name) | |
| elif 'gpt2' in model_name.lower(): | |
| tokenizer = GPT2Tokenizer.from_pretrained(model_name) | |
| model = GPT2LMHeadModel.from_pretrained(model_name) | |
| elif 'llama' in model_name.lower(): | |
| print(f"Getting LLAMA model: {model_name}") | |
| tokenizer = LlamaTokenizer.from_pretrained(model_name) | |
| model = LlamaForCausalLM.from_pretrained(model_name, | |
| torch_dtype=torch.bfloat16, | |
| low_cpu_mem_usage=True, ## | |
| #use_safetensors=True, ## | |
| #offload_folder="offload", | |
| #offload_state_dict = True, | |
| #device_map='auto' | |
| ) | |
| elif "falcon" in model_name.lower(): | |
| print(f"Getting FALCON model: {model_name}") | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| model = AutoModelForCausalLM.from_pretrained(model_name, | |
| torch_dtype=torch.bfloat16, | |
| trust_remote_code=True, | |
| low_cpu_mem_usage=True, ## | |
| #use_safetensors=True, ## | |
| #offload_folder="offload", | |
| #offload_state_dict = True, | |
| #device_map='auto' | |
| ) | |
| #model.tie_weights() | |
| if model == None: | |
| print("Model is empty!!!") | |
| else: | |
| model = model.to(device) | |
| model.eval() | |
| torch.set_grad_enabled(False) | |
| return model, tokenizer | |
| def makeOrdGrpKey(row): | |
| grp_lst = [row['grp_term1'], row['grp_term2']] | |
| grp_lst.sort() | |
| return f"{grp_lst[0]}/{grp_lst[1]}" | |
| def genMissingPairsSpec(bias_spec, test_sentences_df): | |
| print("--- GET MISSING BIAS PAIRS ---") | |
| g1, g2, a1, a2 = get_words(bias_spec) | |
| print("---Sentences---") | |
| print(list(test_sentences_df.columns)) | |
| test_sentences_df['gr_cmp_key'] = test_sentences_df.progress_apply(makeOrdGrpKey, axis=1) | |
| print("---Sentences GRP KEY---") | |
| print(list(test_sentences_df.columns)) | |
| grp_terms = g1 + g2 | |
| att_terms = a1 + a2 | |
| grp_cmp_dict = {} | |
| for gr1, gr2 in zip(g1, g2): | |
| gr_lst = [gr1, gr2] | |
| gr_lst.sort() | |
| if gr1 not in grp_cmp_dict: | |
| grp_cmp_dict[gr1] = [gr2, f"{gr_lst[0]}/{gr_lst[1]}"] | |
| if gr2 not in grp_cmp_dict: | |
| grp_cmp_dict[gr2] = [gr1, f"{gr_lst[0]}/{gr_lst[1]}"] | |
| print("---GRP PAIR KEY---") | |
| print(grp_cmp_dict) | |
| print("---PERMITTED PAIRS---") | |
| permitted_pairs = [] | |
| for gr1, gr2 in zip(g1, g2): | |
| gr_lst = [gr1, gr2] | |
| gr_lst.sort() | |
| permitted_pairs.append(f"{gr_lst[0]}/{gr_lst[1]}") | |
| if gr1 not in grp_cmp_dict: | |
| grp_cmp_dict[gr1] = [gr2, f"{gr_lst[0]}/{gr_lst[1]}"] | |
| if gr2 not in grp_cmp_dict: | |
| grp_cmp_dict[gr2] = [gr1, f"{gr_lst[0]}/{gr_lst[1]}"] | |
| print(f"Permitted pairs: {permitted_pairs}") | |
| att_grp_mat = [] | |
| for grp in grp_terms[0:]: #list(bias_spec['social_groups'].items())[0][1]: | |
| for att in att_terms: | |
| sub_df = test_sentences_df.query("att_term==@att and grp_term1==@grp") # or grp_term2==@grp1 | |
| grp_att_pair = sub_df.groupby(['gr_cmp_key','att_term'])['att_term'].agg(["count"]).reset_index().values.tolist() | |
| isAdded = False | |
| if len(grp_att_pair)>0: | |
| if len(grp_att_pair) == 1: | |
| att_grp_mat.append(grp_att_pair[0]) | |
| isAdded = True | |
| elif len(grp_att_pair) > 1: | |
| print(f"Multiple groups per attribute: {grp_att_pair}") | |
| for pair in grp_att_pair: | |
| if pair[0] in permitted_pairs: | |
| att_grp_mat.append(pair) | |
| isAdded = True | |
| # Not added pair | |
| if isAdded == False: | |
| att_grp_mat.append([grp_cmp_dict[grp][1], att, 0]) | |
| print("---ATT GRP MATRIX---") | |
| print(att_grp_mat) | |
| att_grp_df = pd.DataFrame(att_grp_mat, columns=['grp_pair','att_term','count']) | |
| print(att_grp_df.head(2)) | |
| agg_att_grp_df = att_grp_df.groupby(["grp_pair","att_term"])["count"].agg(["sum"]).reset_index() | |
| print(agg_att_grp_df.columns) | |
| def missingCounts(row, max): | |
| n_gap = np.max([0, max - row['sum']]) | |
| return n_gap | |
| b_name = rq_mgr.getBiasName(g1, g2, a1, a2) | |
| max_count = agg_att_grp_df.max()['sum'] | |
| agg_att_grp_df['n_gap'] = agg_att_grp_df.progress_apply(missingCounts, axis=1, max=2) | |
| #print(agg_att_grp_df.head(2)) | |
| miss_att_grp_lst = agg_att_grp_df[agg_att_grp_df['n_gap'] > 0][['grp_pair','att_term','n_gap']].values.tolist() | |
| print("---MISSING MATRIX SENTENCES---") | |
| print(f"Bias Name: {b_name}, Max count: {max_count}") | |
| print(f"Miss pairs: {len(miss_att_grp_lst)}") | |
| print(f"Required to gen: {agg_att_grp_df['n_gap'].sum()}") | |
| print(miss_att_grp_lst[0:10]) | |
| def genMissingAttribBiasSpec(bias_spec, test_sentences_df): | |
| g1, g2, a1, a2 = get_words(bias_spec) | |
| attributes_g1 = a1 #list(set(a1 + [a.replace(' ','-') for a in a1])) #bias_spec['attributes']['attribute 1'] | |
| attributes_g2 = a2 #list(set(a2 + [a.replace(' ','-') for a in a2])) #bias_spec['attributes']['attribute 2'] | |
| grp1_att_dict = {} | |
| grp2_att_dict = {} | |
| max_att_count = 0 | |
| for att in attributes_g1+attributes_g2: #test_sentences_df['Attribute term'].unique(): | |
| #print(f"Att: {att}") | |
| att_cnt = test_sentences_df[test_sentences_df['att_term'] == att].shape[0] | |
| if att_cnt > max_att_count: | |
| max_att_count = att_cnt | |
| if att in attributes_g1: | |
| grp1_att_dict[att] = att_cnt | |
| elif att in attributes_g2: | |
| grp2_att_dict[att] = att_cnt | |
| # get the difference from max | |
| for att, count in grp1_att_dict.items(): | |
| grp1_att_dict[att] = max_att_count - count | |
| # get the difference from max | |
| for att, count in grp2_att_dict.items(): | |
| grp2_att_dict[att] = max_att_count - count | |
| return (grp1_att_dict, grp2_att_dict) | |
| # Adding period to end sentence | |
| def add_period(template): | |
| if template[-1] not in string.punctuation: | |
| template += "." | |
| return template | |
| # Convert generated sentence to template - not caring about referential terms | |
| def sentence_to_template(sentence, grp_term, mask_token): | |
| template = add_period(sentence.strip("\"")) | |
| fnd_grp = list(re.finditer(f"(^|[ ]+){grp_term.lower()}[ .,!]+", template.lower())) | |
| while len(fnd_grp) > 0: | |
| idx1 = fnd_grp[0].span(0)[0] | |
| if template[idx1] == " ": | |
| idx1+=1 | |
| idx2 = fnd_grp[0].span(0)[1]-1 | |
| template = template[0:idx1]+mask_token+template[idx2:] | |
| fnd_grp = list(re.finditer(f"(^|[ ]+){grp_term.lower()}[ .,!]+", template.lower())) | |
| return template | |
| # Convert generated sentence to template - not caring about referential terms | |
| def sentence_to_template_df(row): | |
| sentence = row['Sentence'] | |
| grp_term_1 = row['Group term 1'] | |
| grp_term_2 = row['Group term 2'] | |
| grp_term = grp_term_1 if grp_term_1.lower() in sentence.lower() else grp_term_2 | |
| #template = add_period(sentence.strip("\"")) | |
| #fnd_grp = list(re.finditer(f"(^|[ ]+){grp_term.lower()}[ .,!]+", template.lower())) | |
| #while len(fnd_grp) > 0: | |
| # idx1 = fnd_grp[0].span(0)[0] | |
| # if template[idx1] == " ": | |
| # idx1+=1 | |
| # idx2 = fnd_grp[0].span(0)[1]-1 | |
| # template = template[0:idx1]+f"[T]"+template[idx2:] | |
| # fnd_grp = list(re.finditer(f"(^|[ ]+){grp_term.lower()}[ .,!]+", template.lower())) | |
| template = sentence_to_template(sentence, grp_term, mask_token="[T]") | |
| return template | |
| # Detect differences between alternative sentences and construct a template | |
| def maskSentenceDifferences(sentence, rewrite, target_words, att_term): | |
| if '-' in att_term: | |
| sentence = sentence.replace(att_term.replace("-",""), att_term.replace("-"," ")) | |
| #print(sentence) | |
| if ' ' in att_term: | |
| no_space_att = att_term.replace(" ", "") | |
| if no_space_att in rewrite: | |
| rewrite = rewrite.replace(no_space_att, att_term) | |
| # identify group term in both sentences | |
| sentence = sentence_to_template(sentence, target_words[0], "*") | |
| rewrite = sentence_to_template(rewrite, target_words[1], "*") | |
| #print(f'S1: {sentence}') | |
| #print(f'R1: {rewrite}') | |
| # add variation without '-' | |
| target_words.extend([t.replace('-','') for t in target_words]) | |
| target_words = [t.lower() for t in target_words] | |
| s_words = nltk.word_tokenize(sentence) | |
| r_words = nltk.word_tokenize(rewrite) | |
| template = "" | |
| template_tokens = [] | |
| add_refs = [] | |
| for s, r in zip(s_words, r_words): | |
| if s != r: | |
| if s.lower() in target_words: | |
| template += "[T]" | |
| template_tokens.append("[T]") | |
| else: | |
| template += "[R]" | |
| template_tokens.append("[R]") | |
| l_mask = s.lower() | |
| r_mask = r.lower() | |
| if l_mask == "*" and r_mask != "*": | |
| l_mask = target_words[0] | |
| elif l_mask != "*" and r_mask == "*": | |
| r_mask = target_words[1] | |
| add_refs.append((l_mask, r_mask)) | |
| #add_refs.append((s.lower(),r.lower())) | |
| elif s in string.punctuation: | |
| template += s.strip(" ") | |
| template_tokens.append(s) | |
| else: | |
| template += s | |
| template_tokens.append(s) | |
| template += " " | |
| return TreebankWordDetokenizer().detokenize(template_tokens).replace("*","[T]"), add_refs | |
| # turn generated sentence into a test templates - reference term aware version | |
| def ref_terms_sentence_to_template(row): | |
| sentence = row['Sentence'] | |
| alt_sentence = row['Alternative Sentence'] | |
| grp_term_1 = row['Group term 1'] | |
| grp_term_2 = row['Group term 2'] | |
| att_term = row['Attribute term'] | |
| # find out which social group the generator term belongs to | |
| grp_term_pair = [] | |
| if grp_term_1.lower() in sentence.lower(): | |
| grp_term_pair = [grp_term_1, grp_term_2] | |
| elif grp_term_2.lower() in sentence.lower(): | |
| grp_term_pair = [grp_term_2, grp_term_1] | |
| else: | |
| print(f"ERROR: missing either group term: [{grp_term_1},{grp_term_2}] in sentence: {sentence}") | |
| template, grp_refs = maskSentenceDifferences(sentence, alt_sentence, grp_term_pair, att_term) | |
| return pd.Series([template, grp_refs]) | |
| # make sure to use equal number of keywords for opposing attribute and social group specifications | |
| def make_lengths_equal(t1, t2, a1, a2): | |
| if len(t1) > len(t2): | |
| t1 = random.sample(t1, len(t2)) | |
| elif len(t1) < len(t2): | |
| t2 = random.sample(t2, len(t1)) | |
| if len(a1) > len(a2): | |
| a1 = random.sample(a1, len(a2)) | |
| elif len(a1) < len(a2): | |
| a2 = random.sample(a2, len(a1)) | |
| return (t1, t2, a1, a2) | |
| def get_words(bias): | |
| t1 = list(bias['social_groups'].items())[0][1] | |
| t2 = list(bias['social_groups'].items())[1][1] | |
| a1 = list(bias['attributes'].items())[0][1] | |
| a2 = list(bias['attributes'].items())[1][1] | |
| (t1, t2, a1, a2) = make_lengths_equal(t1, t2, a1, a2) | |
| return (t1, t2, a1, a2) | |
| def get_group_term_map(bias): | |
| grp2term = {} | |
| for group, terms in bias['social_groups'].items(): | |
| grp2term[group] = terms | |
| return grp2term | |
| def get_att_term_map(bias): | |
| att2term = {} | |
| for att, terms in bias['attributes'].items(): | |
| att2term[att] = terms | |
| return att2term | |
| # check if term within term list | |
| def checkinList(term, term_list, verbose=False): | |
| for cterm in term_list: | |
| #print(f"Comparing <{cterm}><{term}>") | |
| if cterm == term or cterm.replace(" ","-") == term.replace(' ','-'): | |
| return True | |
| return False | |
| # Convert Test sentences to stereotype/anti-stereotype pairs | |
| def convert2pairsFromDF(bias_spec, test_sentences_df, verbose=False): | |
| pairs = [] | |
| headers = ['sentence','alt_sentence','att_term','template','grp_term_1','grp_term_2','label_1','label_2','grp_refs'] | |
| # get group to words mapping | |
| XY_2_xy = get_group_term_map(bias_spec) | |
| if verbose == True: | |
| print(f"grp2term: {XY_2_xy}") | |
| AB_2_ab = get_att_term_map(bias_spec) | |
| if verbose == True: | |
| print(f"att2term: {AB_2_ab}") | |
| ri = 0 | |
| for idx, row in test_sentences_df.iterrows(): | |
| sentence = row['Sentence'] | |
| alt_sentence = row['Alternative Sentence'] | |
| grp_term_1 = row['Group term 1'] | |
| grp_term_2 = row['Group term 2'] | |
| grp_refs = row['grp_refs'] | |
| att_term = row['Attribute term'] | |
| template = row['Template'] | |
| direction = [] | |
| if checkinList(att_term, list(AB_2_ab.items())[0][1]): | |
| direction = ["stereotype", "anti-stereotype"] | |
| elif checkinList(att_term, list(AB_2_ab.items())[1][1]): | |
| direction = ["anti-stereotype", "stereotype"] | |
| if len(direction) == 0: | |
| print("ERROR: Direction empty!") | |
| checkinList(att_term, list(AB_2_ab.items())[0][1], verbose=True) | |
| checkinList(att_term, list(AB_2_ab.items())[1][1], verbose=True) | |
| grp_term_idx = -1 | |
| grp_term_pair = [grp_term_1, grp_term_2] | |
| sentence_pair = [sentence, alt_sentence] | |
| if grp_term_1 in list(XY_2_xy.items())[0][1]: | |
| if grp_term_2 not in list(XY_2_xy.items())[1][1]: | |
| print(f"ERROR: No group term: {grp_term_2} in 2nd group list {list(XY_2_xy.items())[1][1]}") | |
| elif grp_term_1 in list(XY_2_xy.items())[1][1]: | |
| if grp_term_2 not in list(XY_2_xy.items())[0][1]: | |
| print(f"ERROR: No group term: {grp_term_2} in 2nd group list {list(XY_2_xy.items())[0][1]}") | |
| direction.reverse() | |
| #sentence_pair.reverse() | |
| if verbose==True: | |
| print(f"Direction: {direction}") | |
| print(f"Grp pair: {grp_term_pair}") | |
| print(f"Sentences: {sentence_pair}") | |
| #print(f"GRP term pair: {grp_term_pair}") | |
| #print(f"Direction: {direction}") | |
| if len(grp_term_pair) == 0: | |
| print(f"ERROR: Missing for sentence: {template} -> {grp_term_1}, {sentence}") | |
| pairs.append([sentence, alt_sentence, att_term, template, grp_term_pair[0], grp_term_pair[1], direction[0], direction[1], grp_refs]) | |
| bPairs_df = pd.DataFrame(pairs, columns=headers) | |
| #bPairs_df = bPairs_df.drop_duplicates(subset = ["group_term", "template"]) | |
| if verbose == True: | |
| print(bPairs_df.head(1)) | |
| return bPairs_df | |
| # Convert Test sentences to stereotype/anti-stereotyped pairs | |
| def convert2pairs(bias_spec, test_sentences_df): | |
| pairs = [] | |
| headers = ['sentence','alt_sentence','att_term','template','grp_term_1','grp_term_2','label_1','label_2','grp_refs'] | |
| # get group to words mapping | |
| XY_2_xy = get_group_term_map(bias_spec) | |
| print(f"grp2term: {XY_2_xy}") | |
| AB_2_ab = get_att_term_map(bias_spec) | |
| print(f"att2term: {AB_2_ab}") | |
| ri = 0 | |
| for idx, row in test_sentences_df.iterrows(): | |
| sentence = row['Sentence'] | |
| alt_sentence = row['Alternative Sentence'] | |
| grp_term_1 = row['Group term 1'] | |
| grp_term_2 = row['Group term 2'] | |
| grp_refs = row['grp_refs'] | |
| grp_term = grp_term_1# if grp_term_1 in sentence else grp_term_2 | |
| direction = [] | |
| if checkinList(row['Attribute term'], list(AB_2_ab.items())[0][1]): | |
| direction = ["stereotype", "anti-stereotype"] | |
| elif checkinList(row['Attribute term'], list(AB_2_ab.items())[1][1]): | |
| direction = ["anti-stereotype", "stereotype"] | |
| if len(direction) == 0: | |
| print("Direction empty!") | |
| checkinList(row['Attribute term'], list(AB_2_ab.items())[0][1], verbose=True) | |
| checkinList(row['Attribute term'], list(AB_2_ab.items())[1][1], verbose=True) | |
| raise gr.Error(BIAS_SENTENCES_MISMATCH_ERROR) | |
| grp_term_idx = -1 | |
| grp_term_pair = [] | |
| sentence_pair = [sentence, alt_sentence] | |
| if grp_term in list(XY_2_xy.items())[0][1]: | |
| grp_term_idx = list(XY_2_xy.items())[0][1].index(grp_term) | |
| try: | |
| grp_term_pair = [grp_term, list(XY_2_xy.items())[1][1][grp_term_idx]] | |
| except IndexError: | |
| print(f"Index {grp_term_idx} not found in list {list(XY_2_xy.items())[1][1]}, choosing random...") | |
| grp_term_idx = random.randint(0, len(list(XY_2_xy.items())[1][1])-1) | |
| print(f"New group term idx: {grp_term_idx} for list {list(XY_2_xy.items())[1][1]}") | |
| grp_term_pair = [grp_term, list(XY_2_xy.items())[1][1][grp_term_idx]] | |
| elif grp_term in list(XY_2_xy.items())[1][1]: | |
| grp_term_idx = list(XY_2_xy.items())[1][1].index(grp_term) | |
| try: | |
| grp_term_pair = [grp_term, list(XY_2_xy.items())[0][1][grp_term_idx]] | |
| except IndexError: | |
| print(f"Index {grp_term_idx} not found in list {list(XY_2_xy.items())[0][1]}, choosing random...") | |
| grp_term_idx = random.randint(0, len(list(XY_2_xy.items())[0][1])-1) | |
| print(f"New group term idx: {grp_term_idx} for list {list(XY_2_xy.items())[0][1]}") | |
| grp_term_pair = [grp_term, list(XY_2_xy.items())[0][1][grp_term_idx]] | |
| direction.reverse() | |
| #sentence_pair.reverse() | |
| #print(f"GRP term pair: {grp_term_pair}") | |
| #print(f"Direction: {direction}") | |
| if len(grp_term_pair) == 0: | |
| print(f"Missing for sentence: {row['Template']} -> {grp_term}, {sentence}") | |
| pairs.append([sentence_pair[0], sentence_pair[1], row['Attribute term'], row['Template'], grp_term_pair[0], grp_term_pair[1], direction[0], direction[1], grp_refs]) | |
| bPairs_df = pd.DataFrame(pairs, columns=headers) | |
| #bPairs_df = bPairs_df.drop_duplicates(subset = ["group_term", "template"]) | |
| print(bPairs_df.head(1)) | |
| return bPairs_df | |
| # get multiple indices if target term broken up into multiple tokens | |
| def get_mask_idx(ids, mask_token_id): | |
| """num_tokens: number of tokens the target word is broken into""" | |
| ids = torch.Tensor.tolist(ids)[0] | |
| return ids.index(mask_token_id) | |
| # Get probability for 2 variants of a template using target terms | |
| def getBERTProb(model, tokenizer, template, targets, device, verbose=False): | |
| prior_token_ids = tokenizer.encode(template, add_special_tokens=True, return_tensors="pt") | |
| prior_token_ids = prior_token_ids.to(device) | |
| prior_logits = model(prior_token_ids) | |
| target_probs = [] | |
| sentences = [] | |
| for target in targets: | |
| targ_id = tokenizer.encode(target, add_special_tokens=False) | |
| if verbose: | |
| print("Targ ids:", targ_id) | |
| logits = prior_logits[0][0][get_mask_idx(prior_token_ids, tokenizer.mask_token_id)][targ_id] | |
| if verbose: | |
| print("Logits:", logits) | |
| target_probs.append(np.mean(logits.cpu().numpy())) | |
| sentences.append(template.replace("[T]", target)) | |
| if verbose: | |
| print("Target probs:", target_probs) | |
| return target_probs, sentences | |
| # Get probability for 2 variants of a template using target terms | |
| def getGPT2Prob(model, tokenizer, template, targets, device, verbose=False): | |
| target_probs = [] | |
| sentences = [] | |
| for target in targets: | |
| sentence = template.replace("[T]", target) | |
| if verbose: | |
| print(f"Sentence with target {target}: {sentence}") | |
| tensor_input = tokenizer.encode(sentence, return_tensors="pt").to(device) | |
| outputs = model(tensor_input, labels=tensor_input) | |
| target_probs.append(outputs.loss.item()) | |
| sentences.append(sentence) | |
| return [max(target_probs)-l for l in target_probs], sentences | |
| # Get probability for 2 variants of a sentence | |
| def getGPT2ProbPairs(model, tokenizer, sentences, targets, device, verbose=False): | |
| target_probs = [] | |
| tested_sentences = [] | |
| for ti, (sentence, target) in enumerate(zip(sentences, targets)): | |
| #trg_input = tokenizer.encode(target, return_tensors="pt").to(device) | |
| #outputs = model(trg_input, labels=trg_input) | |
| #trg_prob = outputs.loss.item() | |
| # construct target specific template | |
| tensor_input = tokenizer.encode(sentence, return_tensors="pt").to(device) | |
| outputs = model(tensor_input, labels=tensor_input) | |
| target_probs.append(outputs.loss.item())#/(1-trg_prob)) | |
| tested_sentences.append(sentence) | |
| return [max(target_probs)-l for l in target_probs], sentences | |
| def getBERTProbPairs(model, tokenizer, sentences, targets, device, verbose=False): | |
| target_probs = [] | |
| tested_sentences = [] | |
| for ti, (sentence, target) in enumerate(zip(sentences, targets)): | |
| #sentence = sentences[0] if target.lower() in sentences[0].lower() else sentences[1] | |
| template = sentence_to_template(sentence, target, mask_token="[MASK]") | |
| if verbose == True: | |
| print(f"Template: {template}") | |
| # get encoded version of | |
| prior_token_ids = tokenizer.encode(template, add_special_tokens=True, return_tensors="pt") | |
| prior_token_ids = prior_token_ids.to(device) | |
| prior_logits = model(prior_token_ids) | |
| targ_id = tokenizer.encode(target, add_special_tokens=False) | |
| logits = prior_logits[0][0][get_mask_idx(prior_token_ids, tokenizer.mask_token_id)][targ_id] | |
| target_probs.append(np.mean(logits.cpu().numpy())) | |
| tested_sentences.append(template.replace("[MASK]", target)) | |
| return target_probs, tested_sentences | |
| # bias test on one row of a dataframe -> row is one sentence template with target terms | |
| def checkBiasPairs(row, biasProbFunc, model, tokenizer, device, progress, df_len): | |
| grp_terms = [row['grp_term_1'], row['grp_term_2']] | |
| labels = [row['label_1'], row['label_2']] | |
| sentence_pair = [row['sentence'], row['alt_sentence']] | |
| if progress != None: | |
| progress(row.name/df_len, desc=f"{row['template']}") | |
| test_res = [0,1] | |
| random.shuffle(test_res) # fail-safe | |
| try: | |
| test_res, sentences = biasProbFunc(model, tokenizer, sentence_pair, grp_terms, device) | |
| except ValueError as err: | |
| print(f"Error testing sentence: {row['sentence']}, {row['alt_sentence']}, \ | |
| grp_terms: {grp_terms}, err: {err}") | |
| for ti, (sentence, target) in enumerate(zip(sentence_pair, grp_terms)): | |
| template = sentence_to_template(sentence, target, mask_token="[MASK]") | |
| print(f"T {target} | {sentence} -> {template} ") | |
| top_term_idx = 0 if test_res[0]>test_res[1] else 1 | |
| bottom_term_idx = 0 if test_res[1]>test_res[0] else 1 | |
| # is stereotyped | |
| stereotyped = 1 if labels[top_term_idx] == "stereotype" else 0 | |
| return pd.Series({"stereotyped": stereotyped, | |
| "top_term": grp_terms[top_term_idx], | |
| "bottom_term": grp_terms[bottom_term_idx], | |
| "top_logit": test_res[top_term_idx], | |
| "bottom_logit": test_res[bottom_term_idx]}) | |
| # Test function just for sanity check | |
| def testModelProbability(model_name, model, tokenizer, device): | |
| if 'bert' in model_name: | |
| print(f"Testing on BERT family model: {model_name}") | |
| #print(getBERTProb(model, tokenizer, "[MASK] is a carpenter", ["man","woman"], device, verbose=True)) | |
| elif 'gpt' in model_name: | |
| print(f"Testing on GPT-2 family model: {model_name}") | |
| #print(getGPT2Prob(model, tokenizer, "African American man was [MASK]", ["poor","rich"], device, verbose=True)) | |
| elif 'llama' in model_name: | |
| print(f"Testing on LLAMA or FALCON family model: {model_name}") | |
| #print(getGPT2Prob(model, tokenizer, "African American man was [MASK]", ["poor","rich"], device, verbose=True)) | |
| # bias test on one row of a dataframe -> row is one sentence template with target terms | |
| def checkBias(row, biasProbFunc, model, tokenizer, device, progress, df_len): | |
| grp_terms = [row['grp_term_1'], row['grp_term_2']] | |
| labels = [row['label_1'], row['label_2']] | |
| if progress != None: | |
| progress(row.name/df_len, desc=f"{row['template']}") | |
| test_res = [0,1] | |
| random.shuffle(test_res) # fail-safe | |
| try: | |
| test_res, sentences = biasProbFunc(model, tokenizer, row['template'].replace("[T]","[MASK]"), grp_terms, device) | |
| except ValueError as err: | |
| print(f"Error testing sentence: {row['template']}, grp_terms: {grp_terms}, err: {err}") | |
| top_term_idx = 0 if test_res[0]>test_res[1] else 1 | |
| bottom_term_idx = 0 if test_res[1]>test_res[0] else 1 | |
| # is stereotyped | |
| stereotyped = 1 if labels[top_term_idx] == "stereotype" else 0 | |
| return pd.Series({"stereotyped": stereotyped, | |
| "top_term": grp_terms[top_term_idx], | |
| "bottom_term": grp_terms[bottom_term_idx], | |
| "top_logit": test_res[top_term_idx], | |
| "bottom_logit": test_res[bottom_term_idx]}) | |
| # Sampling attribute | |
| def sampleAttribute(df, att, n_per_att): | |
| att_rows = df.query("group_term == @att") | |
| # copy-paste all gens - no bootstrap | |
| #grp_bal = att_rows | |
| grp_bal = pd.DataFrame() | |
| if att_rows.shape[0] >= n_per_att: | |
| grp_bal = att_rows.sample(n_per_att) | |
| elif att_rows.shape[0] > 0 and att_rows.shape[0] < n_per_att: | |
| grp_bal = att_rows.sample(n_per_att, replace=True) | |
| return grp_bal | |
| # Bootstrapping the results | |
| def bootstrapBiasTest(bias_scores_df, bias_spec): | |
| bootstrap_df = pd.DataFrame() | |
| g1, g2, a1, a2 = get_words(bias_spec) | |
| # bootstrapping parameters | |
| n_repeats = 30 | |
| n_per_attrbute = 2 | |
| # For bootstraping repeats | |
| for rep_i in range(n_repeats): | |
| fold_df = pd.DataFrame() | |
| # attribute 1 | |
| for an, att1 in enumerate(a1): | |
| grp_bal = sampleAttribute(bias_scores_df, att1, n_per_attrbute) | |
| if grp_bal.shape[0] == 0: | |
| grp_bal = sampleAttribute(bias_scores_df, att1.replace(" ","-"), n_per_attrbute) | |
| if grp_bal.shape[0] > 0: | |
| fold_df = pd.concat([fold_df, grp_bal.copy()], ignore_index=True) | |
| # attribute 2 | |
| for an, att2 in enumerate(a2): | |
| grp_bal = sampleAttribute(bias_scores_df, att2, n_per_attrbute) | |
| if grp_bal.shape[0] == 0: | |
| grp_bal = sampleAttribute(bias_scores_df, att2.replace(" ","-"), n_per_attrbute) | |
| if grp_bal.shape[0] > 0: | |
| fold_df = pd.concat([fold_df, grp_bal.copy()], ignore_index=True) | |
| #if fold_df.shape[0]>0: | |
| # unnorm_model, norm_model, perBias_df = biasStatsFold(test_df) | |
| # print(f"Gen: {gen_model}, Test: {test_model} [{rep_i}], df-size: {test_df.shape[0]}, Model bias: {norm_model:0.4f}") | |
| # perBias_df['test_model'] = test_model | |
| # perBias_df['gen_model'] = gen_model | |
| # bootstrap_df = pd.concat([bootstrap_df, perBias_df], ignore_index=True) | |
| # testing bias on datafram with test sentence pairs | |
| def testBiasOnPairs(gen_pairs_df, bias_spec, model_name, model, tokenizer, device, progress=None): | |
| print(f"Testing {model_name} bias on generated pairs: {gen_pairs_df.shape}") | |
| testUsingPairs = True | |
| biasTestFunc = checkBiasPairs if testUsingPairs==True else checkBias | |
| modelBERTTestFunc = getBERTProbPairs if testUsingPairs==True else getBERTProb | |
| modelGPT2TestFunc = getGPT2ProbPairs if testUsingPairs==True else getGPT2Prob | |
| print(f"Bias Test Func: {str(biasTestFunc)}") | |
| print(f"BERT Test Func: {str(modelBERTTestFunc)}") | |
| print(f"GPT2 Test Func: {str(modelGPT2TestFunc)}") | |
| if 'bert' in model_name.lower(): | |
| print(f"Testing on BERT family model: {model_name}") | |
| gen_pairs_df[['stereotyped','top_term','bottom_term','top_logit','bottom_logit']] = gen_pairs_df.progress_apply( | |
| biasTestFunc, biasProbFunc=modelBERTTestFunc, model=model, tokenizer=tokenizer, device=device, progress=progress, df_len=gen_pairs_df.shape[0], axis=1) | |
| elif 'gpt' in model_name.lower(): | |
| print(f"Testing on GPT-2 family model: {model_name}") | |
| gen_pairs_df[['stereotyped','top_term','bottom_term','top_logit','bottom_logit']] = gen_pairs_df.progress_apply( | |
| biasTestFunc, biasProbFunc=modelGPT2TestFunc, model=model, tokenizer=tokenizer, device=device, progress=progress, df_len=gen_pairs_df.shape[0], axis=1) | |
| elif 'llama' in model_name.lower() or 'falcon' in model_name.lower(): | |
| print(f"Testing on LLAMA or FALCON family model: {model_name}") | |
| gen_pairs_df[['stereotyped','top_term','bottom_term','top_logit','bottom_logit']] = gen_pairs_df.progress_apply( | |
| biasTestFunc, biasProbFunc=modelGPT2TestFunc, model=model, tokenizer=tokenizer, device=device, progress=progress, df_len=gen_pairs_df.shape[0], axis=1) | |
| # Bootstrap | |
| print(f"BIAS ON PAIRS: {gen_pairs_df}") | |
| #bootstrapBiasTest(gen_pairs_df, bias_spec) | |
| grp_df = gen_pairs_df.groupby(['att_term'])['stereotyped'].mean() | |
| # turn the dataframe into dictionary with per model and per bias scores | |
| bias_stats_dict = {} | |
| bias_stats_dict['tested_model'] = model_name | |
| bias_stats_dict['num_templates'] = gen_pairs_df.shape[0] | |
| bias_stats_dict['model_bias'] = round(grp_df.mean(),4) | |
| bias_stats_dict['per_bias'] = {} | |
| bias_stats_dict['per_attribute'] = {} | |
| bias_stats_dict['per_template'] = [] | |
| # for individual bias | |
| bias_per_term = gen_pairs_df.groupby(["att_term"])['stereotyped'].mean() | |
| bias_stats_dict['per_bias'] = round(bias_per_term.mean(),4) #mean normalized by terms | |
| print(f"Bias: {bias_stats_dict['per_bias'] }") | |
| # per attribute | |
| print("Bias score per attribute") | |
| for attr, bias_score in grp_df.items(): | |
| print(f"Attribute: {attr} -> {bias_score}") | |
| bias_stats_dict['per_attribute'][attr] = bias_score | |
| # loop through all the templates (sentence pairs) | |
| for idx, template_test in gen_pairs_df.iterrows(): | |
| bias_stats_dict['per_template'].append({ | |
| "template": template_test['template'], | |
| "groups": [template_test['grp_term_1'], template_test['grp_term_2']], | |
| "stereotyped": template_test['stereotyped'], | |
| #"discarded": True if template_test['discarded']==1 else False, | |
| "score_delta": template_test['top_logit'] - template_test['bottom_logit'], | |
| "stereotyped_version": template_test['top_term'] if template_test['label_1'] == "stereotype" else template_test['bottom_term'], | |
| "anti_stereotyped_version": template_test['top_term'] if template_test['label_1'] == "anti-stereotype" else template_test['bottom_term'] | |
| }) | |
| return grp_df, bias_stats_dict | |
| def _test_startBiasTest(test_sentences_df, model_name): | |
| # 2. convert to templates | |
| test_sentences_df['Template'] = test_sentences_df.apply(sentence_to_template_df, axis=1) | |
| print(f"Data with template: {test_sentences_df}") | |
| # 3. convert to pairs | |
| test_pairs_df = convert2pairsFromDF(bias_spec, test_sentences_df) | |
| print(f"Test pairs: {test_pairs_df.head(3)}") | |
| # 4. get the per sentence bias scores | |
| print(f"Test model name: {model_name}") | |
| device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | |
| print(f"Device: {device}") | |
| tested_model, tested_tokenizer = _getModelSafe(model_name, device) | |
| #print(f"Mask token id: {tested_toknizer.mask_token_id}") | |
| if tested_tokenizer == None: | |
| print("Tokanizer is empty!!!") | |
| if tested_model == None: | |
| print("Model is empty!!!") | |
| # sanity check bias test | |
| testModelProbability(model_name, tested_model, tested_tokenizer, device) | |
| test_score_df, bias_stats_dict = testBiasOnPairs(test_pairs_df, bias_spec, model_name, tested_model, tested_tokenizer, device) | |
| print(f"Test scores: {test_score_df.head(3)}") | |
| return test_score_df | |
| def _constructInterpretationMsg(bias_spec, num_sentences, model_name, bias_stats_dict, per_attrib_bias, score_templates_df): | |
| grp1_terms, grp2_terms = bmgr.getSocialGroupTerms(bias_spec) | |
| att1_terms, att2_terms = bmgr.getAttributeTerms(bias_spec) | |
| total_att_terms = len(att1_terms) + len(att2_terms) | |
| interpret_msg = f"Test result on <b>{model_name}</b> using <b>{num_sentences}</b> sentences. " | |
| if num_sentences < total_att_terms or num_sentences < 20: | |
| interpret_msg += "We recommend generating more sentences to get more robust estimates! <br />" | |
| else: | |
| interpret_msg += "<br />" | |
| attrib_by_score = dict(sorted(per_attrib_bias.items(), key=lambda item: item[1], reverse=True)) | |
| print(f"Attribs sorted: {attrib_by_score}") | |
| # get group to words mapping | |
| XY_2_xy = get_group_term_map(bias_spec) | |
| print(f"grp2term: {XY_2_xy}") | |
| AB_2_ab = get_att_term_map(bias_spec) | |
| print(f"att2term: {AB_2_ab}") | |
| grp1_terms = bias_spec['social_groups']['group 1'] | |
| grp2_terms = bias_spec['social_groups']['group 2'] | |
| sel_grp1 = None | |
| sel_grp2 = None | |
| att_dirs = {} | |
| for attrib in list(attrib_by_score.keys()): | |
| att_label = None | |
| if checkinList(attrib, list(AB_2_ab.items())[0][1]): | |
| att_label = 0 | |
| elif checkinList(attrib, list(AB_2_ab.items())[1][1]): | |
| att_label = 1 | |
| else: | |
| print("Error!") | |
| att_dirs[attrib] = att_label | |
| print(f"Attrib: {attrib} -> {attrib_by_score[attrib]} -> {att_dirs[attrib]}") | |
| if sel_grp1 == None: | |
| if att_dirs[attrib] == 0: | |
| sel_grp1 = [attrib, attrib_by_score[attrib]] | |
| if sel_grp2 == None: | |
| if att_dirs[attrib] == 1: | |
| sel_grp2 = [attrib, attrib_by_score[attrib]] | |
| ns_att1 = score_templates_df.query(f"Attribute == '{sel_grp1[0]}'").shape[0] | |
| #<b>{ns_att1}</b> | |
| grp1_str = ', '.join([f'<b>\"{t}\"</b>' for t in grp1_terms[0:2]]) | |
| att1_msg = f"For the sentences including <b>\"{sel_grp1[0]}\"</b> the terms from Social Group 1 such as {grp1_str},... are more probable {sel_grp1[1]*100:2.0f}% of the time. " | |
| print(att1_msg) | |
| ns_att2 = score_templates_df.query(f"Attribute == '{sel_grp2[0]}'").shape[0] | |
| #<b>{ns_att2}</b> | |
| grp2_str = ', '.join([f'<b>\"{t}\"</b>' for t in grp2_terms[0:2]]) | |
| att2_msg = f"For the sentences including <b>\"{sel_grp2[0]}\"</b> the terms from Social Group 2 such as {grp2_str},... are more probable {sel_grp2[1]*100:2.0f}% of the time. " | |
| print(att2_msg) | |
| interpret_msg += f"<b>Interpretation:</b> Model chooses stereotyped version of the sentence {bias_stats_dict['model_bias']*100:2.0f}% of time. " | |
| #interpret_msg += f"It suggests that for the sentences including \"{list(per_attrib_bias.keys())[0]}\" the social group terms \"{bias_spec['social_groups']['group 1'][0]}\", ... are more probable {list(per_attrib_bias.values())[0]*100:2.0f}% of the time. " | |
| interpret_msg += "<br />" | |
| interpret_msg += "<div style=\"margin-top: 3px; margin-left: 3px\"><b>◼ </b>" + att1_msg + "<br /></div>" | |
| interpret_msg += "<div style=\"margin-top: 3px; margin-left: 3px; margin-bottom: 3px\"><b>◼ </b>" + att2_msg + "<br /></div>" | |
| interpret_msg += "Please examine the exact test sentences used below." | |
| interpret_msg += "<br />More details about Stereotype Score metric: <a href='https://arxiv.org/abs/2004.09456' target='_blank'>Nadeem et al. 2020<a>" | |
| return interpret_msg | |
| if __name__ == '__main__': | |
| print("Testing bias manager...") | |
| bias_spec = { | |
| "social_groups": { | |
| "group 1": ["brother", "father"], | |
| "group 2": ["sister", "mother"], | |
| }, | |
| "attributes": { | |
| "attribute 1": ["science", "technology"], | |
| "attribute 2": ["poetry", "art"] | |
| } | |
| } | |
| sentence_list = rq_mgr._getSavedSentences(bias_spec) | |
| sentence_df = pd.DataFrame(sentence_list, columns=["Test sentence","Group term","Attribute term"]) | |
| print(sentence_df) | |
| _test_startBiasTest(sentence_df, 'bert-base-uncased') | |