h2ogpt-chatbot2 / src /stopping.py
pseudotensor's picture
Update with h2oGPT hash 964011d09a2f696fca4cc476e53152d266d29351
a728cd0
import torch
from transformers import StoppingCriteria, StoppingCriteriaList
from enums import PromptType, t5_type
class StoppingCriteriaSub(StoppingCriteria):
def __init__(self, stops=[], stop_words=[], encounters=[], device="cuda", model_max_length=None, tokenizer=None):
super().__init__()
assert len(stops) % len(encounters) == 0, "Number of stops and encounters must match"
self.encounters = encounters
self.stops = [stop.to(device) for stop in stops]
self.stop_words = stop_words
self.num_stops = [0] * len(stops)
self.model_max_length = model_max_length
self.tokenizer = tokenizer
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
#if self.tokenizer:
# print('stop: %s' % self.tokenizer.decode(input_ids[0]), flush=True)
for stopi, (stop, stop_word) in enumerate(zip(self.stops, self.stop_words)):
current_block = input_ids[0][-len(stop):]
stop_text = self.tokenizer.decode(current_block)
len_new_tokens = current_block.shape[0]
#if len(stop) <= len_new_tokens and torch.all((stop == input_ids[0][-len(stop):])).item():
if len(stop) <= len_new_tokens and stop_word in stop_text:
self.num_stops[stopi] += 1
if self.num_stops[stopi] >= self.encounters[stopi % len(self.encounters)]:
# print("Stopped", flush=True)
return True
if self.model_max_length is not None and input_ids[0].shape[0] >= self.model_max_length:
# critical limit
return True
# print("Tokens: %s" % input_ids[0].cpu().numpy(), flush=True)
# print("Stop Tokens: %s" % [x.cpu().numpy() for x in self.stops], flush=True)
return False
def get_stopping(prompt_type, prompt_dict, tokenizer, device, base_model,
human='<human>:', bot="<bot>:", model_max_length=None,
prompter=None,
stop=None):
stop_words = []
encounters = []
# FIXME: prompt_dict unused currently
user_human_assistant_types = [PromptType.instruct_vicuna.value, str(PromptType.instruct_vicuna.value),
PromptType.instruct_vicuna.name] + \
[PromptType.guanaco.value, str(PromptType.guanaco.value),
PromptType.guanaco.name] + \
[PromptType.one_shot.value, str(PromptType.one_shot.value),
PromptType.one_shot.name] + \
[PromptType.instruct_vicuna2.value, str(PromptType.instruct_vicuna2.value),
PromptType.instruct_vicuna2.name] + \
[PromptType.instruct_vicuna3.value, str(PromptType.instruct_vicuna3.value),
PromptType.instruct_vicuna3.name] + \
[PromptType.instruct_with_end.value, str(PromptType.instruct_with_end.value),
PromptType.instruct_with_end.name]
human_bot_types = [PromptType.human_bot.value, str(PromptType.human_bot.value),
PromptType.human_bot.name] + \
[PromptType.human_bot_orig.value, str(PromptType.human_bot_orig.value),
PromptType.human_bot_orig.name]
all_types = user_human_assistant_types + human_bot_types
if prompt_type in all_types:
if prompt_type in human_bot_types:
# encounters = [prompt.count(human) + 1, prompt.count(bot) + 1]
# stopping only starts once output is beyond prompt
# 1 human is enough to trigger, but need 2 bots, because very first view back will be bot we added
stop_words = [human, bot, '\n' + human, '\n' + bot]
encounters = [1, 2]
elif prompt_type in user_human_assistant_types:
# even below is not enough, generic strings and many ways to encode
stop_words = [
'### Human:',
"""
### Human:""",
"""
### Human:
""",
"""### Human: """,
"""### Human:""",
'### Assistant:',
"""
### Assistant:""",
"""
### Assistant:
""",
"""### Assistant: """,
"""### Assistant:"""
]
if prompt_type in [PromptType.instruct_vicuna2.value, str(PromptType.instruct_vicuna2.value),
PromptType.instruct_vicuna2.name]:
stop_words = [x.upper() for x in stop_words]
if prompt_type in [PromptType.instruct_vicuna3.value, str(PromptType.instruct_vicuna3.value),
PromptType.instruct_vicuna3.name]:
stop_words = [x.replace('Human', 'User') for x in stop_words]
encounters = [1, 2]
else:
# some instruct prompts have this as end, doesn't hurt to stop on it since not common otherwise
stop_words = ['### End']
encounters = [1]
elif prompter and prompter.terminate_response:
stop_words = prompter.terminate_response
encounters = [1] * len(stop_words)
handle_newlines = [True] * len(stop_words)
# add other stop words too if passed, e.g. for LangChain agents
if stop:
stop_words += stop
encounters += [1] * len(stop)
handle_newlines += [False] * len(stop)
# get stop tokens
stop_words_ids = [
tokenizer(stop_word, return_tensors='pt')['input_ids'].squeeze() for stop_word in stop_words]
# handle single token case
stop_words_ids = [x if len(x.shape) > 0 else torch.tensor([x]) for x in stop_words_ids]
stop_words_ids = [x for x in stop_words_ids if x.shape[0] > 0]
# avoid padding in front of tokens
if tokenizer._pad_token: # use hidden variable to avoid annoying properly logger bug
stop_words_ids = [x[1:] if x[0] == tokenizer.pad_token_id and len(x) > 1 else x for x in stop_words_ids]
if tokenizer._unk_token: # use hidden variable to avoid annoying properly logger bug
stop_words_ids = [x[1:] if x[0] == tokenizer.unk_token_id and len(x) > 1 else x for x in stop_words_ids]
stop_words_ids = [x[:-1] if x[-1] == tokenizer.unk_token_id and len(x) > 1 else x for x in stop_words_ids]
if tokenizer._eos_token: # use hidden variable to avoid annoying properly logger bug
stop_words_ids = [x[:-1] if x[-1] == tokenizer.eos_token_id and len(x) > 1 else x for x in stop_words_ids]
if tokenizer._bos_token: # use hidden variable to avoid annoying properly logger bug
stop_words_ids = [x[1:] if x[0] == tokenizer.bos_token_id and len(x) > 1 else x for x in stop_words_ids]
stop_words_ids = [x[:-1] if x[-1] == tokenizer.bos_token_id and len(x) > 1 else x for x in stop_words_ids]
if base_model and t5_type(base_model):
# T5 encoder converts internal double space to space+new line, so fix
for stopi, stop_word_id in enumerate(stop_words_ids):
start = stop_word_id[0:1]
mlist = stop_word_id[1:-1]
end = stop_word_id[-1:]
mlist = [tokenizer.vocab[' '] if x == tokenizer.vocab['\n'] else x for x in mlist]
stop_words_ids[stopi] = torch.tensor(list(start) + list(mlist) + list(end), device=stop_word_id.device)
# handle fake \n added
stop_words_ids = [x[1:] if y[0] == '\n' and handle_newline else x for x, y, handle_newline in
zip(stop_words_ids, stop_words, handle_newlines)]
if stop_words_ids:
# build stopper
stopping_criteria = StoppingCriteriaList(
[StoppingCriteriaSub(stops=stop_words_ids,
stop_words=stop_words,
encounters=encounters, device=device,
model_max_length=model_max_length, tokenizer=tokenizer)])
else:
# nothing to stop on
stopping_criteria = StoppingCriteriaList()
return stopping_criteria