|
|
|
""" |
|
@author:XuMing([email protected]) |
|
@description: adapted from https://github.com/ThilinaRajapakse/simpletransformers
|
""" |
|
|
|
import math |
|
import os |
|
import random |
|
import warnings |
|
from dataclasses import asdict |
|
from multiprocessing import Pool |
|
|
|
import numpy as np |
|
import pandas as pd |
|
import torch |
|
from loguru import logger |
|
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler |
|
from torch.utils.tensorboard import SummaryWriter |
|
from tqdm.auto import tqdm, trange |
|
from transformers import ByT5Tokenizer |
|
from transformers import MT5Config, MT5ForConditionalGeneration |
|
from transformers import T5Config, T5ForConditionalGeneration, T5Tokenizer, TextStreamer |
|
from transformers.optimization import AdamW, Adafactor |
|
from transformers.optimization import ( |
|
get_constant_schedule, |
|
get_constant_schedule_with_warmup, |
|
get_linear_schedule_with_warmup, |
|
get_cosine_schedule_with_warmup, |
|
get_cosine_with_hard_restarts_schedule_with_warmup, |
|
get_polynomial_decay_schedule_with_warmup, |
|
) |
|
|
|
from t5.config.model_args import T5Args |
|
from t5.t5_utils import T5Dataset, load_hf_dataset |
|
|
|
try: |
|
import wandb |
|
|
|
wandb_available = True |
|
except ImportError: |
|
wandb_available = False |
|
|
|
has_cuda = torch.cuda.is_available() |
|
os.environ["TOKENIZERS_PARALLELISM"] = "FALSE" |
|
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" |
|
|
|
|
|
def chunks(lst, n): |
|
"""Yield successive n-sized chunks from lst.""" |
|
for i in range(0, len(lst), n): |
|
yield lst[i: i + n] |
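    # Example (illustrative): list(chunks([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]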
|
|
|
|
|
# Maps model_type to its (config class, model class) pair; "byt5" reuses the
# T5 architecture with a byte-level tokenizer.
MODEL_CLASSES = {
|
"t5": (T5Config, T5ForConditionalGeneration), |
|
"mt5": (MT5Config, MT5ForConditionalGeneration), |
|
"byt5": (T5Config, T5ForConditionalGeneration), |
|
} |
|
|
|
|
|
class T5Model: |
|
def __init__( |
|
self, |
|
model_type, |
|
model_name, |
|
args=None, |
|
tokenizer=None, |
|
use_cuda=has_cuda, |
|
cuda_device=-1, |
|
evaluate=False, |
|
**kwargs, |
|
): |
|
|
|
""" |
|
Initializes a T5Model model. |
|
|
|
Args: |
|
model_type: The type of model (t5, mt5, byt5) |
|
model_name: The exact architecture and trained weights to use. This may be a Hugging Face Transformers compatible pre-trained model, a community model, or the path to a directory containing model files. |
|
            args (optional): Default args will be used if this parameter is not provided. If provided, it should be a dict containing the args that should be changed in the default args.
            tokenizer (optional): A pre-initialized tokenizer to use instead of loading a new one from model_name.
            use_cuda (optional): Use GPU if available. Setting to False will force model to use CPU only.
            cuda_device (optional): Specific GPU that should be used. Will use the first available GPU by default.
            evaluate (optional): Set to True when loading a trained model for evaluation or prediction; skips re-adding the confusion-set tokens to the tokenizer.
            **kwargs (optional): For providing proxies, force_download, resume_download, cache_dir and other options specific to the 'from_pretrained' implementation where this will be supplied.
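
        Example (a minimal sketch; "t5-small" is an illustrative checkpoint name):

            model = T5Model("t5", "t5-small", args={"num_train_epochs": 1})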
|
""" |
|
|
|
self.args = self._load_model_args(model_name) |
|
|
|
if isinstance(args, dict): |
|
self.args.update_from_dict(args) |
|
elif isinstance(args, T5Args): |
|
self.args = args |
|
|
|
self.is_sweeping = False |
|
|
|
if self.args.manual_seed: |
|
random.seed(self.args.manual_seed) |
|
np.random.seed(self.args.manual_seed) |
|
torch.manual_seed(self.args.manual_seed) |
|
if self.args.n_gpu > 0: |
|
torch.cuda.manual_seed_all(self.args.manual_seed) |
|
|
|
if use_cuda: |
|
if torch.cuda.is_available(): |
|
if cuda_device == -1: |
|
self.device = torch.device("cuda") |
|
else: |
|
self.device = torch.device(f"cuda:{cuda_device}") |
|
else: |
|
                raise ValueError(
                    "'use_cuda' set to True when cuda is unavailable. "
                    "Make sure CUDA is available or set `use_cuda=False`."
                )
|
else: |
|
if torch.backends.mps.is_available(): |
|
self.device = torch.device("mps") |
|
else: |
|
self.device = "cpu" |
|
logger.debug(f"Device: {self.device}") |
|
|
|
self.results = {} |
|
|
|
config_class, model_class = MODEL_CLASSES[model_type] |
|
|
|
if model_name is None: |
|
self.config = self.args.config |
|
self.model = model_class(config=self.config) |
|
else: |
|
self.config = config_class.from_pretrained(model_name, **self.args.config) |
|
self.model = model_class.from_pretrained(model_name, config=self.config) |
|
|
|
if isinstance(tokenizer, T5Tokenizer): |
|
self.tokenizer = tokenizer |
|
self.model.resize_token_embeddings(len(self.tokenizer)) |
|
elif model_type == "byt5": |
|
self.tokenizer = ByT5Tokenizer.from_pretrained(model_name, truncate=True) |
|
else: |
|
self.tokenizer = T5Tokenizer.from_pretrained(model_name, truncate=True) |
|
        logger.debug(f"Tokenizer size before adding tokens: {len(self.tokenizer)}")
        if not evaluate:
            # Extend the tokenizer with task-specific tokens: phonetic confusion
            # sets, full-width ASCII characters, a word list, and an extra vocab file.
            n = 0
            with open('./data/字音混淆集_s13.txt', 'r', encoding='utf-8') as confusion:
                # Also add the full-width ASCII range (U+FF01 to U+FF5E) common in Chinese text.
                for line in confusion.readlines() + [chr(c + 65248) for c in range(33, 127)]:
                    token = line.split(' ')[0]
                    n += 1
                    self.tokenizer.add_tokens([token])
            with open('./data/字音混淆集.txt', 'r', encoding='utf-8') as confusion:
                for line in confusion.readlines():
                    token = line.split(' ')[0]
                    n += 1
                    self.tokenizer.add_tokens([token])
            with open('./data/wordtest4.txt', 'r', encoding='utf-8') as confusion:
                for line in confusion.readlines():
                    token = line.split(',')[0]
                    n += 1
                    self.tokenizer.add_tokens([token])
            with open('./data/vocab.txt', 'r', encoding='utf-8') as vocab_file:
                for line in vocab_file.readlines():
                    n += 1
                    self.tokenizer.add_tokens([line.replace('\n', '')])
            logger.debug(f"Added {n} tokens from confusion sets and vocab files")
        self.streamer = TextStreamer(self.tokenizer)
        logger.debug(f"Tokenizer size after adding tokens: {len(self.tokenizer)}")
        self.model.resize_token_embeddings(len(self.tokenizer))
|
|
|
if self.args.dynamic_quantize: |
|
self.model = torch.quantization.quantize_dynamic( |
|
self.model, {torch.nn.Linear}, dtype=torch.qint8 |
|
) |
|
|
|
if not use_cuda: |
|
self.args.fp16 = False |
|
|
|
if self.args.special_tokens_list: |
|
self.tokenizer.add_tokens( |
|
self.args.special_tokens_list, special_tokens=True |
|
) |
|
self.model.resize_token_embeddings(len(self.tokenizer)) |
|
|
|
self.args.model_type = model_type |
|
if model_name is None: |
|
self.args.model_name = "T5_from_scratch" |
|
else: |
|
self.args.model_name = model_name |
|
|
|
if self.args.wandb_project and not wandb_available: |
|
warnings.warn( |
|
"wandb_project specified but wandb is not available. Wandb disabled." |
|
) |
|
self.args.wandb_project = None |
|
|
|
def train_model( |
|
self, |
|
train_data, |
|
output_dir=None, |
|
show_running_loss=True, |
|
args=None, |
|
eval_data=None, |
|
verbose=True, |
|
**kwargs, |
|
): |
|
""" |
|
Trains the model using 'train_data' |
|
|
|
Args: |
|
train_data: Pandas DataFrame containing the 3 columns - `prefix`, `input_text`, `target_text`. |
|
- `prefix`: A string indicating the task to perform. (E.g. `"question"`, `"stsb"`) |
|
- `input_text`: The input text sequence. `prefix` is automatically prepended to form the full input. (<prefix>: <input_text>) |
|
- `target_text`: The target sequence |
|
output_dir: The directory where model files will be saved. If not given, self.args.output_dir will be used. |
|
show_running_loss (optional): Set to False to prevent running loss from being printed to console. Defaults to True. |
|
args (optional): Optional changes to the args dict of the model. Any changes made will persist for the model. |
|
eval_data (optional): A DataFrame against which evaluation will be performed when evaluate_during_training is enabled. Is required if evaluate_during_training is enabled. |
|
**kwargs: Additional metrics that should be used. Pass in the metrics as keyword arguments (name of metric: function to use). |
|
A metric function should take in two parameters. The first parameter will be the true labels, and the second parameter will be the predictions. Both inputs |
|
will be lists of strings. Note that this will slow down training significantly as the predicted sequences need to be generated. |
|
|
|
Returns: |
|
global_step: Number of global steps trained |
|
training_details: Average training loss if evaluate_during_training is False or full training progress scores if evaluate_during_training is True |
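
        Example (a minimal sketch; the prefix/text values are illustrative):

            train_df = pd.DataFrame(
                [["st", "input text", "target text"]],
                columns=["prefix", "input_text", "target_text"],
            )
            model.train_model(train_df)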
|
""" |
|
|
|
if args: |
|
self.args.update_from_dict(args) |
|
if self.args.evaluate_during_training and eval_data is None: |
|
raise ValueError( |
|
"evaluate_during_training is enabled but eval_data is not specified." |
|
" Pass eval_data to model.train_model() if using evaluate_during_training." |
|
) |
|
|
|
if not output_dir: |
|
output_dir = self.args.output_dir |
|
|
|
if ( |
|
os.path.exists(output_dir) |
|
and os.listdir(output_dir) |
|
and not self.args.overwrite_output_dir |
|
): |
|
raise ValueError( |
|
"Output directory ({}) already exists and is not empty." |
|
" Set args.overwrite_output_dir = True to overcome.".format(output_dir) |
|
) |
|
|
|
self._move_model_to_device() |
|
|
|
train_dataset = self.load_and_cache_examples(train_data, verbose=verbose) |
|
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
global_step, training_details = self.train( |
|
train_dataset, |
|
output_dir, |
|
show_running_loss=show_running_loss, |
|
eval_data=eval_data, |
|
verbose=verbose, |
|
**kwargs, |
|
) |
|
|
|
self.save_model(model=self.model) |
|
|
|
if verbose: |
|
logger.info( |
|
" Training of {} model complete. Saved to {}.".format( |
|
self.args.model_name, output_dir |
|
) |
|
) |
|
|
|
return global_step, training_details |
|
|
|
def train( |
|
self, |
|
train_dataset, |
|
output_dir, |
|
show_running_loss=True, |
|
eval_data=None, |
|
verbose=True, |
|
**kwargs, |
|
): |
|
""" |
|
Trains the model on train_dataset. |
|
|
|
Utility function to be used by the train_model() method. Not intended to be used directly. |
|
""" |
|
|
|
model = self.model |
|
args = self.args |
|
device = self.device |
|
|
|
tb_writer = SummaryWriter(log_dir=args.tensorboard_dir) |
|
train_sampler = RandomSampler(train_dataset) |
|
train_dataloader = DataLoader( |
|
train_dataset, |
|
sampler=train_sampler, |
|
batch_size=args.train_batch_size, |
|
num_workers=self.args.dataloader_num_workers, |
|
) |
|
|
|
if args.max_steps > 0: |
|
t_total = args.max_steps |
|
args.num_train_epochs = ( |
|
args.max_steps |
|
// (len(train_dataloader) // args.gradient_accumulation_steps) |
|
+ 1 |
|
) |
|
else: |
|
t_total = ( |
|
len(train_dataloader) |
|
// args.gradient_accumulation_steps |
|
* args.num_train_epochs |
|
) |
|
|
|
no_decay = ["bias", "LayerNorm.weight"] |
|
|
|
optimizer_grouped_parameters = [] |
|
custom_parameter_names = set() |
|
for group in self.args.custom_parameter_groups: |
|
params = group.pop("params") |
|
custom_parameter_names.update(params) |
|
param_group = {**group} |
|
param_group["params"] = [ |
|
p for n, p in model.named_parameters() if n in params |
|
] |
|
optimizer_grouped_parameters.append(param_group) |
|
|
|
for group in self.args.custom_layer_parameters: |
|
layer_number = group.pop("layer") |
|
layer = f"layer.{layer_number}." |
|
group_d = {**group} |
|
group_nd = {**group} |
|
group_nd["weight_decay"] = 0.0 |
|
params_d = [] |
|
params_nd = [] |
|
for n, p in model.named_parameters(): |
|
if n not in custom_parameter_names and layer in n: |
|
if any(nd in n for nd in no_decay): |
|
params_nd.append(p) |
|
else: |
|
params_d.append(p) |
|
custom_parameter_names.add(n) |
|
group_d["params"] = params_d |
|
group_nd["params"] = params_nd |
|
|
|
optimizer_grouped_parameters.append(group_d) |
|
optimizer_grouped_parameters.append(group_nd) |
|
|
|
if not self.args.train_custom_parameters_only: |
|
optimizer_grouped_parameters.extend( |
|
[ |
|
{ |
|
"params": [ |
|
p |
|
for n, p in model.named_parameters() |
|
if n not in custom_parameter_names |
|
and not any(nd in n for nd in no_decay) |
|
], |
|
"weight_decay": args.weight_decay, |
|
}, |
|
{ |
|
"params": [ |
|
p |
|
for n, p in model.named_parameters() |
|
if n not in custom_parameter_names |
|
and any(nd in n for nd in no_decay) |
|
], |
|
"weight_decay": 0.0, |
|
}, |
|
] |
|
) |
|
|
|
warmup_steps = math.ceil(t_total * args.warmup_ratio) |
|
args.warmup_steps = ( |
|
warmup_steps if args.warmup_steps == 0 else args.warmup_steps |
|
) |
|
|
|
if args.optimizer == "AdamW": |
|
optimizer = AdamW( |
|
optimizer_grouped_parameters, |
|
lr=args.learning_rate, |
|
eps=args.adam_epsilon, |
|
) |
|
elif args.optimizer == "Adafactor": |
|
optimizer = Adafactor( |
|
optimizer_grouped_parameters, |
|
lr=args.learning_rate, |
|
eps=args.adafactor_eps, |
|
clip_threshold=args.adafactor_clip_threshold, |
|
decay_rate=args.adafactor_decay_rate, |
|
beta1=args.adafactor_beta1, |
|
weight_decay=args.weight_decay, |
|
scale_parameter=args.adafactor_scale_parameter, |
|
relative_step=args.adafactor_relative_step, |
|
warmup_init=args.adafactor_warmup_init, |
|
) |
|
|
|
else: |
|
raise ValueError( |
|
"{} is not a valid optimizer class. Please use one of ('AdamW', 'Adafactor') instead.".format( |
|
args.optimizer |
|
) |
|
) |
|
|
|
if args.scheduler == "constant_schedule": |
|
scheduler = get_constant_schedule(optimizer) |
|
|
|
elif args.scheduler == "constant_schedule_with_warmup": |
|
scheduler = get_constant_schedule_with_warmup( |
|
optimizer, num_warmup_steps=args.warmup_steps |
|
) |
|
|
|
elif args.scheduler == "linear_schedule_with_warmup": |
|
scheduler = get_linear_schedule_with_warmup( |
|
optimizer, |
|
num_warmup_steps=args.warmup_steps, |
|
num_training_steps=t_total, |
|
) |
|
|
|
elif args.scheduler == "cosine_schedule_with_warmup": |
|
scheduler = get_cosine_schedule_with_warmup( |
|
optimizer, |
|
num_warmup_steps=args.warmup_steps, |
|
num_training_steps=t_total, |
|
num_cycles=args.cosine_schedule_num_cycles, |
|
) |
|
|
|
elif args.scheduler == "cosine_with_hard_restarts_schedule_with_warmup": |
|
scheduler = get_cosine_with_hard_restarts_schedule_with_warmup( |
|
optimizer, |
|
num_warmup_steps=args.warmup_steps, |
|
num_training_steps=t_total, |
|
num_cycles=args.cosine_schedule_num_cycles, |
|
) |
|
|
|
elif args.scheduler == "polynomial_decay_schedule_with_warmup": |
|
scheduler = get_polynomial_decay_schedule_with_warmup( |
|
optimizer, |
|
num_warmup_steps=args.warmup_steps, |
|
num_training_steps=t_total, |
|
lr_end=args.polynomial_decay_schedule_lr_end, |
|
power=args.polynomial_decay_schedule_power, |
|
) |
|
|
|
else: |
|
raise ValueError("{} is not a valid scheduler.".format(args.scheduler)) |
|
|
|
if ( |
|
args.model_name |
|
and os.path.isfile(os.path.join(args.model_name, "optimizer.pt")) |
|
and os.path.isfile(os.path.join(args.model_name, "scheduler.pt")) |
|
): |
|
|
|
optimizer.load_state_dict( |
|
torch.load(os.path.join(args.model_name, "optimizer.pt")) |
|
) |
|
scheduler.load_state_dict( |
|
torch.load(os.path.join(args.model_name, "scheduler.pt")) |
|
) |
|
|
|
if args.n_gpu > 1: |
|
model = torch.nn.DataParallel(model) |
|
|
|
logger.info(" Training started") |
|
|
|
global_step = 0 |
|
training_progress_scores = None |
|
tr_loss, logging_loss = 0.0, 0.0 |
|
model.zero_grad() |
|
train_iterator = trange( |
|
int(args.num_train_epochs), desc="Epoch", disable=args.silent, mininterval=0 |
|
) |
|
epoch_number = 0 |
|
best_eval_metric = None |
|
early_stopping_counter = 0 |
|
steps_trained_in_current_epoch = 0 |
|
epochs_trained = 0 |
|
|
|
if args.model_name and os.path.exists(args.model_name): |
|
try: |
|
|
|
checkpoint_suffix = args.model_name.split("/")[-1].split("-") |
|
if len(checkpoint_suffix) > 2: |
|
checkpoint_suffix = checkpoint_suffix[1] |
|
else: |
|
checkpoint_suffix = checkpoint_suffix[-1] |
|
global_step = int(checkpoint_suffix) |
|
epochs_trained = global_step // ( |
|
len(train_dataloader) // args.gradient_accumulation_steps |
|
) |
|
steps_trained_in_current_epoch = global_step % ( |
|
len(train_dataloader) // args.gradient_accumulation_steps |
|
) |
|
|
|
logger.info( |
|
" Continuing training from checkpoint, will skip to saved global_step" |
|
) |
|
logger.info(" Continuing training from epoch %d", epochs_trained) |
|
logger.info(" Continuing training from global step %d", global_step) |
|
logger.info( |
|
" Will skip the first %d steps in the current epoch", |
|
steps_trained_in_current_epoch, |
|
) |
|
except ValueError: |
|
logger.info(" Starting fine-tuning.") |
|
|
|
if args.evaluate_during_training: |
|
training_progress_scores = self._create_training_progress_scores(**kwargs) |
|
|
|
if args.wandb_project: |
|
wandb.init( |
|
project=args.wandb_project, |
|
config={**asdict(args)}, |
|
**args.wandb_kwargs, |
|
) |
|
wandb.run._label(repo="textgen") |
|
wandb.watch(self.model) |
|
self.wandb_run_id = wandb.run.id |
|
|
|
if args.fp16: |
|
from torch.cuda import amp |
|
|
|
scaler = amp.GradScaler() |
|
|
|
for current_epoch in train_iterator: |
|
model.train() |
|
if epochs_trained > 0: |
|
epochs_trained -= 1 |
|
continue |
|
train_iterator.set_description( |
|
f"Epoch {epoch_number + 1} of {args.num_train_epochs}" |
|
) |
|
batch_iterator = tqdm( |
|
train_dataloader, |
|
desc=f"Running Epoch {epoch_number} of {args.num_train_epochs}", |
|
disable=args.silent, |
|
mininterval=0, |
|
) |
|
for step, batch in enumerate(batch_iterator): |
|
if steps_trained_in_current_epoch > 0: |
|
steps_trained_in_current_epoch -= 1 |
|
continue |
|
|
|
inputs = self._get_inputs_dict(batch) |
|
                if args.fp16:
                    with amp.autocast():
                        outputs = model(**inputs)
                        # model outputs are tuples; the loss is the first element
                        loss = outputs[0]
                else:
                    outputs = model(**inputs)
                    # model outputs are tuples; the loss is the first element
                    loss = outputs[0]
|
|
|
                if args.n_gpu > 1:
                    loss = loss.mean()  # average across GPUs under DataParallel
|
|
|
current_loss = loss.item() |
|
|
|
if show_running_loss: |
|
batch_iterator.set_description( |
|
f"Epochs {epoch_number}/{args.num_train_epochs}. Running Loss: {current_loss:9.4f}" |
|
) |
|
|
|
if args.gradient_accumulation_steps > 1: |
|
loss = loss / args.gradient_accumulation_steps |
|
|
|
if args.fp16: |
|
scaler.scale(loss).backward() |
|
else: |
|
loss.backward() |
|
|
|
tr_loss += loss.item() |
|
if (step + 1) % args.gradient_accumulation_steps == 0: |
|
if args.fp16: |
|
scaler.unscale_(optimizer) |
|
if args.optimizer == "AdamW": |
|
torch.nn.utils.clip_grad_norm_( |
|
model.parameters(), args.max_grad_norm |
|
) |
|
|
|
if args.fp16: |
|
scaler.step(optimizer) |
|
scaler.update() |
|
else: |
|
optimizer.step() |
|
scheduler.step() |
|
model.zero_grad() |
|
global_step += 1 |
|
|
|
if args.logging_steps > 0 and global_step % args.logging_steps == 0: |
|
|
|
tb_writer.add_scalar( |
|
"lr", scheduler.get_last_lr()[0], global_step |
|
) |
|
tb_writer.add_scalar( |
|
"loss", |
|
(tr_loss - logging_loss) / args.logging_steps, |
|
global_step, |
|
) |
|
logging_loss = tr_loss |
|
if args.wandb_project or self.is_sweeping: |
|
wandb.log( |
|
{ |
|
"Training loss": current_loss, |
|
"lr": scheduler.get_last_lr()[0], |
|
"global_step": global_step, |
|
} |
|
) |
|
|
|
if args.save_steps > 0 and global_step % args.save_steps == 0: |
|
|
|
output_dir_current = os.path.join( |
|
output_dir, "checkpoint-{}".format(global_step) |
|
) |
|
|
|
self.save_model( |
|
output_dir_current, optimizer, scheduler, model=model |
|
) |
|
|
|
if args.evaluate_during_training and ( |
|
args.evaluate_during_training_steps > 0 |
|
and global_step % args.evaluate_during_training_steps == 0 |
|
): |
|
|
|
results = self.eval_model( |
|
eval_data, |
|
verbose=verbose and args.evaluate_during_training_verbose, |
|
silent=args.evaluate_during_training_silent, |
|
**kwargs, |
|
) |
|
for key, value in results.items(): |
|
try: |
|
tb_writer.add_scalar( |
|
"eval_{}".format(key), value, global_step |
|
) |
|
except (NotImplementedError, AssertionError): |
|
pass |
|
|
|
output_dir_current = os.path.join( |
|
output_dir, "checkpoint-{}".format(global_step) |
|
) |
|
|
|
if args.save_eval_checkpoints: |
|
self.save_model( |
|
output_dir_current, |
|
optimizer, |
|
scheduler, |
|
model=model, |
|
results=results, |
|
) |
|
|
|
training_progress_scores["global_step"].append(global_step) |
|
training_progress_scores["train_loss"].append(current_loss) |
|
for key in results: |
|
training_progress_scores[key].append(results[key]) |
|
report = pd.DataFrame(training_progress_scores) |
|
report.to_csv( |
|
os.path.join( |
|
args.output_dir, "training_progress_scores.csv" |
|
), |
|
index=False, |
|
) |
|
|
|
if args.wandb_project or self.is_sweeping: |
|
wandb.log(self._get_last_metrics(training_progress_scores)) |
|
|
|
if not best_eval_metric: |
|
best_eval_metric = results[args.early_stopping_metric] |
|
self.save_model( |
|
args.best_model_dir, |
|
optimizer, |
|
scheduler, |
|
model=model, |
|
results=results, |
|
) |
|
if best_eval_metric and args.early_stopping_metric_minimize: |
|
if ( |
|
results[args.early_stopping_metric] - best_eval_metric |
|
< args.early_stopping_delta |
|
): |
|
best_eval_metric = results[args.early_stopping_metric] |
|
self.save_model( |
|
args.best_model_dir, |
|
optimizer, |
|
scheduler, |
|
model=model, |
|
results=results, |
|
) |
|
early_stopping_counter = 0 |
|
else: |
|
if args.use_early_stopping: |
|
if ( |
|
early_stopping_counter |
|
< args.early_stopping_patience |
|
): |
|
early_stopping_counter += 1 |
|
if verbose: |
|
logger.info( |
|
f" No improvement in {args.early_stopping_metric}" |
|
) |
|
logger.info( |
|
f" Current step: {early_stopping_counter}" |
|
) |
|
logger.info( |
|
f" Early stopping patience: {args.early_stopping_patience}" |
|
) |
|
else: |
|
if verbose: |
|
logger.info( |
|
f" Patience of {args.early_stopping_patience} steps reached" |
|
) |
|
logger.info(" Training terminated.") |
|
train_iterator.close() |
|
return ( |
|
global_step, |
|
tr_loss / global_step |
|
if not self.args.evaluate_during_training |
|
else training_progress_scores, |
|
) |
|
else: |
|
if ( |
|
results[args.early_stopping_metric] - best_eval_metric |
|
> args.early_stopping_delta |
|
): |
|
best_eval_metric = results[args.early_stopping_metric] |
|
self.save_model( |
|
args.best_model_dir, |
|
optimizer, |
|
scheduler, |
|
model=model, |
|
results=results, |
|
) |
|
early_stopping_counter = 0 |
|
else: |
|
if args.use_early_stopping: |
|
if ( |
|
early_stopping_counter |
|
< args.early_stopping_patience |
|
): |
|
early_stopping_counter += 1 |
|
if verbose: |
|
logger.info( |
|
f" No improvement in {args.early_stopping_metric}" |
|
) |
|
logger.info( |
|
f" Current step: {early_stopping_counter}" |
|
) |
|
logger.info( |
|
f" Early stopping patience: {args.early_stopping_patience}" |
|
) |
|
else: |
|
if verbose: |
|
logger.info( |
|
f" Patience of {args.early_stopping_patience} steps reached" |
|
) |
|
logger.info(" Training terminated.") |
|
train_iterator.close() |
|
return ( |
|
global_step, |
|
tr_loss / global_step |
|
if not self.args.evaluate_during_training |
|
else training_progress_scores, |
|
) |
|
model.train() |
|
|
|
epoch_number += 1 |
|
output_dir_current = os.path.join( |
|
output_dir, "checkpoint-{}-epoch-{}".format(global_step, epoch_number) |
|
) |
|
|
|
if args.save_model_every_epoch: |
|
self.save_model(output_dir_current, optimizer, scheduler, model=model) |
|
|
|
if args.evaluate_during_training and args.evaluate_each_epoch: |
|
results = self.eval_model( |
|
eval_data, |
|
verbose=verbose and args.evaluate_during_training_verbose, |
|
silent=args.evaluate_during_training_silent, |
|
**kwargs, |
|
) |
|
|
|
if args.save_eval_checkpoints: |
|
self.save_model( |
|
output_dir_current, optimizer, scheduler, results=results |
|
) |
|
|
|
training_progress_scores["global_step"].append(global_step) |
|
training_progress_scores["train_loss"].append(current_loss) |
|
for key in results: |
|
training_progress_scores[key].append(results[key]) |
|
report = pd.DataFrame(training_progress_scores) |
|
report.to_csv( |
|
os.path.join(args.output_dir, "training_progress_scores.csv"), |
|
index=False, |
|
) |
|
|
|
if args.wandb_project or self.is_sweeping: |
|
wandb.log(self._get_last_metrics(training_progress_scores)) |
|
|
|
if not best_eval_metric: |
|
best_eval_metric = results[args.early_stopping_metric] |
|
self.save_model( |
|
args.best_model_dir, |
|
optimizer, |
|
scheduler, |
|
model=model, |
|
results=results, |
|
) |
|
if best_eval_metric and args.early_stopping_metric_minimize: |
|
if ( |
|
results[args.early_stopping_metric] - best_eval_metric |
|
< args.early_stopping_delta |
|
): |
|
best_eval_metric = results[args.early_stopping_metric] |
|
self.save_model( |
|
args.best_model_dir, |
|
optimizer, |
|
scheduler, |
|
model=model, |
|
results=results, |
|
) |
|
early_stopping_counter = 0 |
|
else: |
|
if ( |
|
args.use_early_stopping |
|
and args.early_stopping_consider_epochs |
|
): |
|
if early_stopping_counter < args.early_stopping_patience: |
|
early_stopping_counter += 1 |
|
if verbose: |
|
logger.info( |
|
f" No improvement in {args.early_stopping_metric}" |
|
) |
|
logger.info( |
|
f" Current step: {early_stopping_counter}" |
|
) |
|
logger.info( |
|
f" Early stopping patience: {args.early_stopping_patience}" |
|
) |
|
else: |
|
if verbose: |
|
logger.info( |
|
f" Patience of {args.early_stopping_patience} steps reached" |
|
) |
|
logger.info(" Training terminated.") |
|
train_iterator.close() |
|
return ( |
|
global_step, |
|
tr_loss / global_step |
|
if not self.args.evaluate_during_training |
|
else training_progress_scores, |
|
) |
|
else: |
|
if ( |
|
results[args.early_stopping_metric] - best_eval_metric |
|
> args.early_stopping_delta |
|
): |
|
best_eval_metric = results[args.early_stopping_metric] |
|
self.save_model( |
|
args.best_model_dir, |
|
optimizer, |
|
scheduler, |
|
model=model, |
|
results=results, |
|
) |
|
early_stopping_counter = 0 |
|
else: |
|
if ( |
|
args.use_early_stopping |
|
and args.early_stopping_consider_epochs |
|
): |
|
if early_stopping_counter < args.early_stopping_patience: |
|
early_stopping_counter += 1 |
|
if verbose: |
|
logger.info( |
|
f" No improvement in {args.early_stopping_metric}" |
|
) |
|
logger.info( |
|
f" Current step: {early_stopping_counter}" |
|
) |
|
logger.info( |
|
f" Early stopping patience: {args.early_stopping_patience}" |
|
) |
|
else: |
|
if verbose: |
|
logger.info( |
|
f" Patience of {args.early_stopping_patience} steps reached" |
|
) |
|
logger.info(" Training terminated.") |
|
train_iterator.close() |
|
return ( |
|
global_step, |
|
tr_loss / global_step |
|
if not self.args.evaluate_during_training |
|
else training_progress_scores, |
|
) |
|
|
|
return ( |
|
global_step, |
|
tr_loss / global_step |
|
if not self.args.evaluate_during_training |
|
else training_progress_scores, |
|
) |
|
|
|
def eval_model( |
|
self, eval_data, output_dir=None, verbose=True, silent=False, **kwargs |
|
): |
|
""" |
|
Evaluates the model on eval_data. Saves results to output_dir. |
|
|
|
Args: |
|
eval_data: Pandas DataFrame containing the 3 columns - `prefix`, `input_text`, `target_text`. |
|
- `prefix`: A string indicating the task to perform. (E.g. `"question"`, `"stsb"`) |
|
- `input_text`: The input text sequence. `prefix` is automatically prepended to form the full input. (<prefix>: <input_text>) |
|
- `target_text`: The target sequence |
|
output_dir: The directory where model files will be saved. If not given, self.args.output_dir will be used. |
|
verbose: If verbose, results will be printed to the console on completion of evaluation. |
|
silent: If silent, tqdm progress bars will be hidden. |
|
**kwargs: Additional metrics that should be used. Pass in the metrics as keyword arguments (name of metric: function to use). |
|
A metric function should take in two parameters. The first parameter will be the true labels, and the second parameter will be the predictions. Both inputs |
|
will be lists of strings. Note that this will slow down evaluation significantly as the predicted sequences need to be generated. |
|
Returns: |
|
results: Dictionary containing evaluation results. |
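
        Example (a minimal sketch; columns mirror the training data format):

            eval_df = pd.DataFrame(
                [["st", "input text", "target text"]],
                columns=["prefix", "input_text", "target_text"],
            )
            results = model.eval_model(eval_df)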
|
""" |
|
|
|
if not output_dir: |
|
output_dir = self.args.output_dir |
|
|
|
self._move_model_to_device() |
|
|
|
eval_dataset = self.load_and_cache_examples( |
|
eval_data, evaluate=True, verbose=verbose, silent=silent |
|
) |
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
result = self.evaluate( |
|
eval_dataset, output_dir, verbose=verbose, silent=silent, **kwargs |
|
) |
|
self.results.update(result) |
|
|
|
if self.args.evaluate_generated_text: |
|
if self.args.preprocess_inputs: |
|
to_predict = [ |
|
input_text |
|
for prefix, input_text in zip( |
|
eval_data["prefix"], eval_data["input_text"] |
|
) |
|
] |
|
else: |
|
to_predict = [ |
|
prefix + input_text |
|
for prefix, input_text in zip( |
|
eval_data["prefix"], eval_data["input_text"] |
|
) |
|
] |
|
            # Evaluate generated text on a small subset (the first
            # eval_batch_size * 3 examples) to keep evaluation time manageable.
            preds = self.predict(to_predict[: self.args.eval_batch_size * 3])
|
|
|
result = self.compute_metrics( |
|
eval_data["target_text"].tolist()[:self.args.eval_batch_size*3], preds, **kwargs |
|
) |
|
self.results.update(result) |
|
|
|
if verbose: |
|
logger.info(self.results) |
|
|
|
return self.results |
|
|
|
def evaluate(self, eval_dataset, output_dir, verbose=True, silent=False, **kwargs): |
|
""" |
|
Evaluates the model on eval_dataset. |
|
|
|
Utility function to be used by the eval_model() method. Not intended to be used directly. |
|
""" |
|
|
|
model = self.model |
|
args = self.args |
|
eval_output_dir = output_dir |
|
device = self.device |
|
|
|
results = {} |
|
|
|
eval_sampler = SequentialSampler(eval_dataset) |
|
eval_dataloader = DataLoader( |
|
eval_dataset, sampler=eval_sampler, batch_size=args.eval_batch_size |
|
) |
|
|
|
if args.n_gpu > 1: |
|
model = torch.nn.DataParallel(model) |
|
|
|
eval_loss = 0.0 |
|
nb_eval_steps = 0 |
|
model.eval() |
|
|
|
if self.args.fp16: |
|
from torch.cuda import amp |
|
|
|
for batch in tqdm( |
|
eval_dataloader, disable=args.silent or silent, desc="Running Evaluation" |
|
): |
|
inputs = self._get_inputs_dict(batch) |
|
with torch.no_grad(): |
|
if self.args.fp16: |
|
with amp.autocast(): |
|
outputs = model(**inputs) |
|
loss = outputs[0] |
|
else: |
|
outputs = model(**inputs) |
|
loss = outputs[0] |
|
if self.args.n_gpu > 1: |
|
loss = loss.mean() |
|
eval_loss += loss.item() |
|
nb_eval_steps += 1 |
|
|
|
eval_loss = eval_loss / nb_eval_steps |
|
|
|
results["eval_loss"] = eval_loss |
|
|
|
output_eval_file = os.path.join(eval_output_dir, "eval_results.txt") |
|
with open(output_eval_file, "w") as writer: |
|
for key in sorted(results.keys()): |
|
writer.write("{} = {}\n".format(key, str(results[key]))) |
|
|
|
return results |
|
|
|
def predict(self, to_predict, split_on_space=False): |
|
""" |
|
Performs predictions on a list of text. |
|
|
|
Args: |
|
to_predict: A python list of text (str) to be sent to the model for prediction. Note that the prefix should be prepended to the text. |
|
            split_on_space (optional): Set to True for space-delimited output (e.g. English); set to False to join the generated tokens without spaces (e.g. Chinese).
|
|
|
Returns: |
|
preds: A python list of the generated sequences. |
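
        Example (illustrative; the "st" prefix and input text are assumptions):

            preds = model.predict(["st: 今天新情很好"], split_on_space=False)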
|
""" |
|
|
|
self._move_model_to_device() |
|
|
|
all_outputs = [] |
|
|
|
for batch in tqdm( |
|
[ |
|
to_predict[i: i + self.args.eval_batch_size] |
|
for i in range(0, len(to_predict), self.args.eval_batch_size) |
|
], |
|
desc="Generating outputs", |
|
disable=self.args.silent, |
|
): |
|
            # prepare_seq2seq_batch is deprecated in recent transformers releases;
            # calling the tokenizer directly produces the same encoded batch.
            input_batch = self.tokenizer(
                batch,
                max_length=self.args.max_seq_length,
                padding="max_length",
                return_tensors="pt",
                truncation=True,
            )
|
input_ids = input_batch["input_ids"] |
|
attention_mask = input_batch["attention_mask"] |
|
|
|
input_ids = input_ids.to(self.device) |
|
attention_mask = attention_mask.to(self.device) |
|
|
|
outputs = self.model.generate( |
|
input_ids=input_ids, |
|
attention_mask=attention_mask, |
|
num_beams=self.args.num_beams, |
|
max_length=self.args.max_length, |
|
length_penalty=self.args.length_penalty, |
|
early_stopping=self.args.early_stopping, |
|
repetition_penalty=self.args.repetition_penalty, |
|
do_sample=self.args.do_sample, |
|
top_k=self.args.top_k, |
|
top_p=self.args.top_p, |
|
                num_return_sequences=self.args.num_return_sequences,
            )
|
all_outputs.extend(outputs.cpu().numpy()) |
|
|
|
if self.args.use_multiprocessed_decoding: |
|
self.model.to("cpu") |
|
with Pool(self.args.process_count) as p: |
|
if self.args.multiprocessing_chunksize == -1: |
|
chunksize = max( |
|
len(all_outputs) // (self.args.process_count * 2), 500 |
|
) |
|
else: |
|
chunksize = self.args.multiprocessing_chunksize |
|
outputs = list( |
|
tqdm( |
|
p.imap(self._decode, all_outputs, chunksize=chunksize), |
|
total=len(all_outputs), |
|
desc="Decoding outputs", |
|
disable=self.args.silent, |
|
) |
|
) |
|
self._move_model_to_device() |
|
else: |
|
outputs = [ |
|
self.tokenizer.decode( |
|
output_id, |
|
skip_special_tokens=self.args.skip_special_tokens, |
|
clean_up_tokenization_spaces=True, |
|
) |
|
for output_id in all_outputs |
|
] |
|
if not split_on_space: |
|
outputs = [''.join(gen_text.split(' ')) for gen_text in outputs] |
|
if self.args.num_return_sequences > 1: |
|
return [ |
|
outputs[i: i + self.args.num_return_sequences] |
|
for i in range(0, len(outputs), self.args.num_return_sequences) |
|
] |
|
else: |
|
return outputs |
|
|
|
def _decode(self, output_id): |
|
return self.tokenizer.decode( |
|
output_id, |
|
skip_special_tokens=self.args.skip_special_tokens, |
|
clean_up_tokenization_spaces=True, |
|
) |
|
|
|
def compute_metrics(self, labels, preds, **kwargs): |
|
""" |
|
Computes the evaluation metrics for the model predictions. |
|
|
|
Args: |
|
labels: List of target sequences |
|
preds: List of model generated outputs |
|
**kwargs: Custom metrics that should be used. Pass in the metrics as keyword arguments (name of metric: function to use). |
|
A metric function should take in two parameters. The first parameter will be the true labels, and the second parameter will be the predictions. Both inputs |
|
will be lists of strings. Note that this will slow down evaluation significantly as the predicted sequences need to be generated. |
|
|
|
Returns: |
|
result: Dictionary containing evaluation results. |
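
        Example (sketch; `accuracy` is a user-defined metric function):

            def accuracy(labels, preds):
                return sum(l == p for l, p in zip(labels, preds)) / len(labels)

            results = model.compute_metrics(labels, preds, accuracy=accuracy)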
|
""" |
|
assert len(labels) == len(preds) |
|
|
|
results = {} |
|
for metric, func in kwargs.items(): |
|
results[metric] = func(labels, preds) |
|
|
|
return results |
|
|
|
def _move_model_to_device(self): |
|
self.model.to(self.device) |
|
|
|
def _get_inputs_dict(self, batch): |
|
if self.args.use_hf_datasets: |
|
inputs = {**batch, "labels": batch["input_ids"]} |
|
|
|
return {key: value.to(self.device) for key, value in inputs.items()} |
|
else: |
|
batch = tuple(t.to(self.device) for t in batch) |
|
|
|
input_ids = batch[0] |
|
attention_mask = batch[1] |
|
labels = batch[2] |
|
            labels[labels == self.tokenizer.pad_token_id] = -100  # ignore padding positions in the loss
|
|
|
inputs = { |
|
"input_ids": input_ids, |
|
"attention_mask": attention_mask, |
|
"labels": labels, |
|
} |
|
|
|
return inputs |
|
|
|
def load_and_cache_examples( |
|
self, data, evaluate=False, no_cache=False, verbose=True, silent=False |
|
): |
|
""" |
|
Creates a T5Dataset from data. |
|
|
|
Utility function for train() and eval() methods. Not intended to be used directly. |
|
""" |
|
|
|
tokenizer = self.tokenizer |
|
args = self.args |
|
|
|
if not no_cache: |
|
no_cache = args.no_cache |
|
|
|
if not no_cache: |
|
os.makedirs(self.args.cache_dir, exist_ok=True) |
|
|
|
mode = "dev" if evaluate else "train" |
|
|
|
if self.args.use_hf_datasets: |
|
dataset = load_hf_dataset(data, tokenizer, self.args) |
|
return dataset |
|
elif args.dataset_class: |
|
CustomDataset = args.dataset_class |
|
return CustomDataset(tokenizer, args, data, mode) |
|
else: |
|
return T5Dataset( |
|
tokenizer, |
|
self.args, |
|
data, |
|
mode, |
|
) |
|
|
|
def _create_training_progress_scores(self, **kwargs): |
|
extra_metrics = {key: [] for key in kwargs} |
|
training_progress_scores = { |
|
"global_step": [], |
|
"eval_loss": [], |
|
"train_loss": [], |
|
**extra_metrics, |
|
} |
|
|
|
return training_progress_scores |
|
|
|
def _get_last_metrics(self, metric_values): |
|
return {metric: values[-1] for metric, values in metric_values.items()} |
|
|
|
def save_model( |
|
self, output_dir=None, optimizer=None, scheduler=None, model=None, results=None |
|
): |
|
if not output_dir: |
|
output_dir = self.args.output_dir |
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
if model and not self.args.no_save: |
|
|
|
model_to_save = model.module if hasattr(model, "module") else model |
|
model_to_save.save_pretrained(output_dir) |
|
self.tokenizer.save_pretrained(output_dir) |
|
torch.save(self.args, os.path.join(output_dir, "training_args.bin")) |
|
if optimizer and scheduler and self.args.save_optimizer_and_scheduler: |
|
torch.save( |
|
optimizer.state_dict(), os.path.join(output_dir, "optimizer.pt") |
|
) |
|
torch.save( |
|
scheduler.state_dict(), os.path.join(output_dir, "scheduler.pt") |
|
) |
|
self.save_model_args(output_dir) |
|
|
|
if results: |
|
output_eval_file = os.path.join(output_dir, "eval_results.txt") |
|
with open(output_eval_file, "w") as writer: |
|
for key in sorted(results.keys()): |
|
writer.write("{} = {}\n".format(key, str(results[key]))) |
|
|
|
def save_model_args(self, output_dir): |
|
os.makedirs(output_dir, exist_ok=True) |
|
self.args.save(output_dir) |
|
|
|
def _load_model_args(self, input_dir): |
|
args = T5Args() |
|
args.load(input_dir) |
|
return args |
|
|
|
def get_named_parameters(self): |
|
return [n for n, p in self.model.named_parameters()] |
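

if __name__ == "__main__":
    # A minimal end-to-end sketch (illustrative only): assumes the ./data
    # confusion-set files exist locally and "t5-small" is a reachable checkpoint.
    demo_df = pd.DataFrame(
        [["st", "今天新情很好", "今天心情很好"]],
        columns=["prefix", "input_text", "target_text"],
    )
    demo_model = T5Model(
        "t5", "t5-small", args={"num_train_epochs": 1, "overwrite_output_dir": True}
    )
    demo_model.train_model(demo_df)
    print(demo_model.predict(["st: 今天新情很好"], split_on_space=False))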
|
|