# -*- coding: utf-8 -*-
"""
@author:XuMing(xuming624@qq.com)
@description: refer https://github.com/ThilinaRajapakse/simpletransformers
"""
import math
import os
import random
import warnings
from dataclasses import asdict
from multiprocessing import Pool

import numpy as np
import pandas as pd
import torch
from loguru import logger
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from torch.utils.tensorboard import SummaryWriter
from tqdm.auto import tqdm, trange
from transformers import ByT5Tokenizer
from transformers import MT5Config, MT5ForConditionalGeneration
from transformers import T5Config, T5ForConditionalGeneration, T5Tokenizer, TextStreamer
from transformers.optimization import AdamW, Adafactor
from transformers.optimization import (
    get_constant_schedule,
    get_constant_schedule_with_warmup,
    get_linear_schedule_with_warmup,
    get_cosine_schedule_with_warmup,
    get_cosine_with_hard_restarts_schedule_with_warmup,
    get_polynomial_decay_schedule_with_warmup,
)

from t5.config.model_args import T5Args
from t5.t5_utils import T5Dataset, load_hf_dataset

try:
    import wandb

    wandb_available = True
except ImportError:
    wandb_available = False

has_cuda = torch.cuda.is_available()
os.environ["TOKENIZERS_PARALLELISM"] = "FALSE"
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"


def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i: i + n]


# Maps the `model_type` argument of T5Model to its (config class, model class).
MODEL_CLASSES = {
    "t5": (T5Config, T5ForConditionalGeneration),
    "mt5": (MT5Config, MT5ForConditionalGeneration),
    "byt5": (T5Config, T5ForConditionalGeneration),
}


class T5Model:
    def __init__(
            self,
            model_type,
            model_name,
            args=None,
            tokenizer=None,
            use_cuda=has_cuda,
            cuda_device=-1,
            evaluate=False,
            **kwargs,
    ):
        """
        Initializes a T5Model model.

        Args:
            model_type: The type of model (t5, mt5, byt5)
            model_name: The exact architecture and trained weights to use. This may be a Hugging Face Transformers compatible pre-trained model, a community model, or the path to a directory containing model files.
            args (optional): Default args will be used if this parameter is not provided. If provided, it should be a dict containing the args that should be changed in the default args.
            tokenizer (optional): A T5Tokenizer instance to use instead of loading one from `model_name`.
            use_cuda (optional): Use GPU if available. Setting to False will force model to use CPU only.
            cuda_device (optional): Specific GPU that should be used. Will use the first available GPU by default.
            evaluate (optional): If True, the extra vocabulary files under ./data are NOT read and no tokens are added from them.
            **kwargs (optional): For providing proxies, force_download, resume_download, cache_dir and other options specific to the 'from_pretrained' implementation where this will be supplied.

        Raises:
            ValueError: If `use_cuda` is True but CUDA is unavailable.
        """  # noqa: ignore flake8"
        self.args = self._load_model_args(model_name)

        if isinstance(args, dict):
            self.args.update_from_dict(args)
        elif isinstance(args, T5Args):
            self.args = args

        self.is_sweeping = False

        if self.args.manual_seed:
            random.seed(self.args.manual_seed)
            np.random.seed(self.args.manual_seed)
            torch.manual_seed(self.args.manual_seed)
            if self.args.n_gpu > 0:
                torch.cuda.manual_seed_all(self.args.manual_seed)

        if use_cuda:
            if torch.cuda.is_available():
                if cuda_device == -1:
                    self.device = torch.device("cuda")
                else:
                    self.device = torch.device(f"cuda:{cuda_device}")
            else:
                raise ValueError(
                    "'use_cuda' set to True when cuda is unavailable."
                    "Make sure CUDA is available or set `use_cuda=False`."
                )
        else:
            if torch.backends.mps.is_available():
                self.device = torch.device("mps")
            else:
                self.device = "cpu"
        logger.debug(f"Device: {self.device}")

        self.results = {}

        config_class, model_class = MODEL_CLASSES[model_type]

        if model_name is None:
            self.config = self.args.config
            self.model = model_class(config=self.config)
        else:
            self.config = config_class.from_pretrained(model_name, **self.args.config)
            self.model = model_class.from_pretrained(model_name, config=self.config)

        if isinstance(tokenizer, T5Tokenizer):
            self.tokenizer = tokenizer
            self.model.resize_token_embeddings(len(self.tokenizer))
        elif model_type == "byt5":
            self.tokenizer = ByT5Tokenizer.from_pretrained(model_name, truncate=True)
        else:
            self.tokenizer = T5Tokenizer.from_pretrained(model_name, truncate=True)
        print(len(self.tokenizer))
        if not evaluate:
            # Extend the vocabulary with the project-specific token lists.
            print(self._add_extra_tokens())
        # NOTE(review): the original file's indentation was lost; the streamer
        # creation and the resize below are assumed to run regardless of
        # `evaluate` - confirm against the upstream file.
        self.streamer = TextStreamer(self.tokenizer)
        print(len(self.tokenizer))
        self.model.resize_token_embeddings(len(self.tokenizer))

        if self.args.dynamic_quantize:
            self.model = torch.quantization.quantize_dynamic(
                self.model, {torch.nn.Linear}, dtype=torch.qint8
            )

        if not use_cuda:
            self.args.fp16 = False

        if self.args.special_tokens_list:
            self.tokenizer.add_tokens(
                self.args.special_tokens_list, special_tokens=True
            )
            self.model.resize_token_embeddings(len(self.tokenizer))

        self.args.model_type = model_type
        if model_name is None:
            self.args.model_name = "T5_from_scratch"
        else:
            self.args.model_name = model_name

        if self.args.wandb_project and not wandb_available:
            warnings.warn(
                "wandb_project specified but wandb is not available. Wandb disabled."
            )
            self.args.wandb_project = None

    def _add_extra_tokens(self):
        """
        Adds the tokens listed in the vocabulary files under ./data to the tokenizer.

        The files are processed in a fixed order and every line yields exactly one
        `add_tokens` call, so token ids are assigned in file order.

        Returns:
            The number of lines processed (not the number of tokens actually added;
            the tokenizer decides whether a token is new).
        """

        def read_lines(path):
            with open(path, "r", encoding="utf-8") as handle:
                return handle.readlines()

        # Full-width forms U+FF01..U+FF5E (ASCII 33..126 shifted by 65248).
        full_width_chars = [chr(c + 65248) for c in range(33, 127)]

        n = 0
        for line in read_lines('./data/字音混淆集_s13.txt') + full_width_chars:
            n += 1
            self.tokenizer.add_tokens([line.split(' ')[0]])
        for line in read_lines('./data/字音混淆集.txt'):
            n += 1
            self.tokenizer.add_tokens([line.split(' ')[0]])
        for line in read_lines('./data/wordtest4.txt'):
            n += 1
            self.tokenizer.add_tokens([line.split(',')[0]])
        for line in read_lines('./data/vocab.txt'):
            n += 1
            self.tokenizer.add_tokens([line.replace('\n', '')])
        return n

    def train_model(
            self,
            train_data,
            output_dir=None,
            show_running_loss=True,
            args=None,
            eval_data=None,
            verbose=True,
            **kwargs,
    ):
        """
        Trains the model using 'train_data'

        Args:
            train_data: Pandas DataFrame containing the 3 columns - `prefix`, `input_text`, `target_text`.
                        - `prefix`: A string indicating the task to perform. (E.g. `"question"`, `"stsb"`)
                        - `input_text`: The input text sequence. `prefix` is automatically prepended to form the full input.
                        - `target_text`: The target sequence
            output_dir: The directory where model files will be saved. If not given, self.args.output_dir will be used.
            show_running_loss (optional): Set to False to prevent running loss from being printed to console. Defaults to True.
            args (optional): Optional changes to the args dict of the model. Any changes made will persist for the model.
            eval_data (optional): A DataFrame against which evaluation will be performed when evaluate_during_training is enabled. Is required if evaluate_during_training is enabled.
            verbose (optional): If True, progress and completion messages are logged.
            **kwargs: Additional metrics that should be used. Pass in the metrics as keyword arguments (name of metric: function to use).
                        A metric function should take in two parameters. The first parameter will be the true labels, and the second parameter will be the predictions. Both inputs
                        will be lists of strings. Note that this will slow down training significantly as the predicted sequences need to be generated.

        Returns:
            global_step: Number of global steps trained
            training_details: Average training loss if evaluate_during_training is False or full training progress scores if evaluate_during_training is True

        Raises:
            ValueError: If evaluate_during_training is enabled without eval_data, or if the
                output directory is non-empty and overwrite_output_dir is not set.
        """  # noqa: ignore flake8"
        if args:
            self.args.update_from_dict(args)

        if self.args.evaluate_during_training and eval_data is None:
            raise ValueError(
                "evaluate_during_training is enabled but eval_data is not specified."
                " Pass eval_data to model.train_model() if using evaluate_during_training."
            )

        if not output_dir:
            output_dir = self.args.output_dir

        if (
                os.path.exists(output_dir)
                and os.listdir(output_dir)
                and not self.args.overwrite_output_dir
        ):
            raise ValueError(
                "Output directory ({}) already exists and is not empty."
                " Set args.overwrite_output_dir = True to overcome.".format(output_dir)
            )

        self._move_model_to_device()

        train_dataset = self.load_and_cache_examples(train_data, verbose=verbose)

        os.makedirs(output_dir, exist_ok=True)

        global_step, training_details = self.train(
            train_dataset,
            output_dir,
            show_running_loss=show_running_loss,
            eval_data=eval_data,
            verbose=verbose,
            **kwargs,
        )

        # Save into the directory that was actually requested (and that the
        # message below reports), not unconditionally into args.output_dir.
        self.save_model(output_dir, model=self.model)

        if verbose:
            logger.info(
                " Training of {} model complete. Saved to {}.".format(
                    self.args.model_name, output_dir
                )
            )

        return global_step, training_details

    def train(
            self,
            train_dataset,
            output_dir,
            show_running_loss=True,
            eval_data=None,
            verbose=True,
            **kwargs,
    ):
        """
        Trains the model on train_dataset.

        Utility function to be used by the train_model() method. Not intended to be used directly.

        Args:
            train_dataset: Dataset fed to a DataLoader with a RandomSampler.
            output_dir: Directory under which step/epoch checkpoints are written.
            show_running_loss (optional): Show the running loss in the batch progress bar.
            eval_data (optional): Data passed to eval_model() when evaluate_during_training is enabled.
            verbose (optional): If True, early-stopping progress is logged.
            **kwargs: Extra metric functions, forwarded to eval_model().

        Returns:
            (global_step, training_details) where training_details is the average training loss,
            or the training progress scores when evaluate_during_training is enabled.

        Raises:
            ValueError: If args.optimizer or args.scheduler is not a supported name.
        """
        model = self.model
        args = self.args

        tb_writer = SummaryWriter(log_dir=args.tensorboard_dir)
        train_sampler = RandomSampler(train_dataset)
        train_dataloader = DataLoader(
            train_dataset,
            sampler=train_sampler,
            batch_size=args.train_batch_size,
            num_workers=self.args.dataloader_num_workers,
        )

        if args.max_steps > 0:
            t_total = args.max_steps
            args.num_train_epochs = (
                    args.max_steps
                    // (len(train_dataloader) // args.gradient_accumulation_steps)
                    + 1
            )
        else:
            t_total = (
                    len(train_dataloader)
                    // args.gradient_accumulation_steps
                    * args.num_train_epochs
            )

        no_decay = ["bias", "LayerNorm.weight"]

        optimizer_grouped_parameters = []
        custom_parameter_names = set()
        for group in self.args.custom_parameter_groups:
            # Work on a copy: popping from the dict stored on args would break
            # any later call to train() with the same args.
            param_group = {**group}
            params = param_group.pop("params")
            custom_parameter_names.update(params)
            param_group["params"] = [
                p for n, p in model.named_parameters() if n in params
            ]
            optimizer_grouped_parameters.append(param_group)

        for group in self.args.custom_layer_parameters:
            group_options = {**group}  # copy, see above
            layer_number = group_options.pop("layer")
            layer = f"layer.{layer_number}."
            group_d = {**group_options}
            group_nd = {**group_options}
            group_nd["weight_decay"] = 0.0
            params_d = []
            params_nd = []
            for n, p in model.named_parameters():
                if n not in custom_parameter_names and layer in n:
                    if any(nd in n for nd in no_decay):
                        params_nd.append(p)
                    else:
                        params_d.append(p)
                    custom_parameter_names.add(n)
            group_d["params"] = params_d
            group_nd["params"] = params_nd

            optimizer_grouped_parameters.append(group_d)
            optimizer_grouped_parameters.append(group_nd)

        if not self.args.train_custom_parameters_only:
            optimizer_grouped_parameters.extend(
                [
                    {
                        "params": [
                            p
                            for n, p in model.named_parameters()
                            if n not in custom_parameter_names
                               and not any(nd in n for nd in no_decay)
                        ],
                        "weight_decay": args.weight_decay,
                    },
                    {
                        "params": [
                            p
                            for n, p in model.named_parameters()
                            if n not in custom_parameter_names
                               and any(nd in n for nd in no_decay)
                        ],
                        "weight_decay": 0.0,
                    },
                ]
            )

        warmup_steps = math.ceil(t_total * args.warmup_ratio)
        args.warmup_steps = (
            warmup_steps if args.warmup_steps == 0 else args.warmup_steps
        )

        if args.optimizer == "AdamW":
            optimizer = AdamW(
                optimizer_grouped_parameters,
                lr=args.learning_rate,
                eps=args.adam_epsilon,
            )
        elif args.optimizer == "Adafactor":
            optimizer = Adafactor(
                optimizer_grouped_parameters,
                lr=args.learning_rate,
                eps=args.adafactor_eps,
                clip_threshold=args.adafactor_clip_threshold,
                decay_rate=args.adafactor_decay_rate,
                beta1=args.adafactor_beta1,
                weight_decay=args.weight_decay,
                scale_parameter=args.adafactor_scale_parameter,
                relative_step=args.adafactor_relative_step,
                warmup_init=args.adafactor_warmup_init,
            )
        else:
            raise ValueError(
                "{} is not a valid optimizer class. Please use one of ('AdamW', 'Adafactor') instead.".format(
                    args.optimizer
                )
            )

        if args.scheduler == "constant_schedule":
            scheduler = get_constant_schedule(optimizer)
        elif args.scheduler == "constant_schedule_with_warmup":
            scheduler = get_constant_schedule_with_warmup(
                optimizer, num_warmup_steps=args.warmup_steps
            )
        elif args.scheduler == "linear_schedule_with_warmup":
            scheduler = get_linear_schedule_with_warmup(
                optimizer,
                num_warmup_steps=args.warmup_steps,
                num_training_steps=t_total,
            )
        elif args.scheduler == "cosine_schedule_with_warmup":
            scheduler = get_cosine_schedule_with_warmup(
                optimizer,
                num_warmup_steps=args.warmup_steps,
                num_training_steps=t_total,
                num_cycles=args.cosine_schedule_num_cycles,
            )
        elif args.scheduler == "cosine_with_hard_restarts_schedule_with_warmup":
            scheduler = get_cosine_with_hard_restarts_schedule_with_warmup(
                optimizer,
                num_warmup_steps=args.warmup_steps,
                num_training_steps=t_total,
                num_cycles=args.cosine_schedule_num_cycles,
            )
        elif args.scheduler == "polynomial_decay_schedule_with_warmup":
            scheduler = get_polynomial_decay_schedule_with_warmup(
                optimizer,
                num_warmup_steps=args.warmup_steps,
                num_training_steps=t_total,
                lr_end=args.polynomial_decay_schedule_lr_end,
                power=args.polynomial_decay_schedule_power,
            )
        else:
            raise ValueError("{} is not a valid scheduler.".format(args.scheduler))

        if (
                args.model_name
                and os.path.isfile(os.path.join(args.model_name, "optimizer.pt"))
                and os.path.isfile(os.path.join(args.model_name, "scheduler.pt"))
        ):
            # Load in optimizer and scheduler states
            optimizer.load_state_dict(
                torch.load(os.path.join(args.model_name, "optimizer.pt"))
            )
            scheduler.load_state_dict(
                torch.load(os.path.join(args.model_name, "scheduler.pt"))
            )

        if args.n_gpu > 1:
            model = torch.nn.DataParallel(model)

        logger.info(" Training started")

        global_step = 0
        training_progress_scores = None
        tr_loss, logging_loss = 0.0, 0.0
        model.zero_grad()
        train_iterator = trange(
            int(args.num_train_epochs), desc="Epoch", disable=args.silent, mininterval=0
        )
        epoch_number = 0
        best_eval_metric = None
        early_stopping_counter = 0
        steps_trained_in_current_epoch = 0
        epochs_trained = 0

        if args.model_name and os.path.exists(args.model_name):
            try:
                # set global_step to global_step of last saved checkpoint from model path
                checkpoint_suffix = args.model_name.split("/")[-1].split("-")
                if len(checkpoint_suffix) > 2:
                    checkpoint_suffix = checkpoint_suffix[1]
                else:
                    checkpoint_suffix = checkpoint_suffix[-1]
                global_step = int(checkpoint_suffix)
                epochs_trained = global_step // (
                        len(train_dataloader) // args.gradient_accumulation_steps
                )
                steps_trained_in_current_epoch = global_step % (
                        len(train_dataloader) // args.gradient_accumulation_steps
                )

                logger.info(
                    " Continuing training from checkpoint, will skip to saved global_step"
                )
                # loguru formats with str.format, not %-style, so the values
                # are interpolated here instead of being passed as arguments.
                logger.info(f" Continuing training from epoch {epochs_trained}")
                logger.info(f" Continuing training from global step {global_step}")
                logger.info(
                    f" Will skip the first {steps_trained_in_current_epoch} steps in the current epoch"
                )
            except ValueError:
                # The model path does not end in a step number: not a checkpoint.
                logger.info(" Starting fine-tuning.")

        if args.evaluate_during_training:
            training_progress_scores = self._create_training_progress_scores(**kwargs)

        if args.wandb_project:
            wandb.init(
                project=args.wandb_project,
                config={**asdict(args)},
                **args.wandb_kwargs,
            )
            wandb.run._label(repo="textgen")
            wandb.watch(self.model)
            self.wandb_run_id = wandb.run.id

        if args.fp16:
            from torch.cuda import amp

            scaler = amp.GradScaler()

        for _ in train_iterator:
            model.train()
            if epochs_trained > 0:
                # Skip epochs already completed by the checkpoint being resumed.
                epochs_trained -= 1
                continue
            train_iterator.set_description(
                f"Epoch {epoch_number + 1} of {args.num_train_epochs}"
            )
            batch_iterator = tqdm(
                train_dataloader,
                desc=f"Running Epoch {epoch_number} of {args.num_train_epochs}",
                disable=args.silent,
                mininterval=0,
            )
            for step, batch in enumerate(batch_iterator):
                if steps_trained_in_current_epoch > 0:
                    steps_trained_in_current_epoch -= 1
                    continue

                inputs = self._get_inputs_dict(batch)
                if args.fp16:
                    with amp.autocast():
                        outputs = model(**inputs)
                        # model outputs are always tuple in pytorch-transformers (see doc)
                        loss = outputs[0]
                else:
                    outputs = model(**inputs)
                    # model outputs are always tuple in pytorch-transformers (see doc)
                    loss = outputs[0]

                if args.n_gpu > 1:
                    loss = (
                        loss.mean()
                    )  # mean() to average on multi-gpu parallel training

                current_loss = loss.item()

                if show_running_loss:
                    batch_iterator.set_description(
                        f"Epochs {epoch_number}/{args.num_train_epochs}. Running Loss: {current_loss:9.4f}"
                    )

                if args.gradient_accumulation_steps > 1:
                    loss = loss / args.gradient_accumulation_steps

                if args.fp16:
                    scaler.scale(loss).backward()
                else:
                    loss.backward()

                tr_loss += loss.item()
                if (step + 1) % args.gradient_accumulation_steps == 0:
                    if args.fp16:
                        scaler.unscale_(optimizer)
                    if args.optimizer == "AdamW":
                        torch.nn.utils.clip_grad_norm_(
                            model.parameters(), args.max_grad_norm
                        )

                    if args.fp16:
                        scaler.step(optimizer)
                        scaler.update()
                    else:
                        optimizer.step()
                    scheduler.step()  # Update learning rate schedule
                    model.zero_grad()
                    global_step += 1

                    if args.logging_steps > 0 and global_step % args.logging_steps == 0:
                        # Log metrics
                        tb_writer.add_scalar(
                            "lr", scheduler.get_last_lr()[0], global_step
                        )
                        tb_writer.add_scalar(
                            "loss",
                            (tr_loss - logging_loss) / args.logging_steps,
                            global_step,
                        )
                        logging_loss = tr_loss
                        if args.wandb_project or self.is_sweeping:
                            wandb.log(
                                {
                                    "Training loss": current_loss,
                                    "lr": scheduler.get_last_lr()[0],
                                    "global_step": global_step,
                                }
                            )

                    if args.save_steps > 0 and global_step % args.save_steps == 0:
                        # Save model checkpoint
                        output_dir_current = os.path.join(
                            output_dir, "checkpoint-{}".format(global_step)
                        )

                        self.save_model(
                            output_dir_current, optimizer, scheduler, model=model
                        )

                    if args.evaluate_during_training and (
                            args.evaluate_during_training_steps > 0
                            and global_step % args.evaluate_during_training_steps == 0
                    ):
                        # Only evaluate when single GPU otherwise metrics may not average well
                        results = self.eval_model(
                            eval_data,
                            verbose=verbose and args.evaluate_during_training_verbose,
                            silent=args.evaluate_during_training_silent,
                            **kwargs,
                        )
                        for key, value in results.items():
                            try:
                                tb_writer.add_scalar(
                                    "eval_{}".format(key), value, global_step
                                )
                            except (NotImplementedError, AssertionError):
                                # Deliberate best effort: values tensorboard
                                # cannot log as scalars are skipped.
                                pass

                        output_dir_current = os.path.join(
                            output_dir, "checkpoint-{}".format(global_step)
                        )

                        if args.save_eval_checkpoints:
                            self.save_model(
                                output_dir_current,
                                optimizer,
                                scheduler,
                                model=model,
                                results=results,
                            )

                        training_progress_scores["global_step"].append(global_step)
                        training_progress_scores["train_loss"].append(current_loss)
                        for key in results:
                            training_progress_scores[key].append(results[key])
                        report = pd.DataFrame(training_progress_scores)
                        report.to_csv(
                            os.path.join(
                                args.output_dir, "training_progress_scores.csv"
                            ),
                            index=False,
                        )

                        if args.wandb_project or self.is_sweeping:
                            wandb.log(self._get_last_metrics(training_progress_scores))

                        (
                            best_eval_metric,
                            early_stopping_counter,
                            should_stop,
                        ) = self._update_best_model(
                            results,
                            best_eval_metric,
                            early_stopping_counter,
                            optimizer,
                            scheduler,
                            model,
                            verbose=verbose,
                            allow_early_stop=args.use_early_stopping,
                        )
                        if should_stop:
                            train_iterator.close()
                            return (
                                global_step,
                                tr_loss / global_step
                                if not self.args.evaluate_during_training
                                else training_progress_scores,
                            )
                        # eval_model() switched the model to eval mode.
                        model.train()

            epoch_number += 1
            output_dir_current = os.path.join(
                output_dir, "checkpoint-{}-epoch-{}".format(global_step, epoch_number)
            )

            if args.save_model_every_epoch:
                self.save_model(output_dir_current, optimizer, scheduler, model=model)

            if args.evaluate_during_training and args.evaluate_each_epoch:
                results = self.eval_model(
                    eval_data,
                    verbose=verbose and args.evaluate_during_training_verbose,
                    silent=args.evaluate_during_training_silent,
                    **kwargs,
                )

                if args.save_eval_checkpoints:
                    # Pass the model so the checkpoint contains weights, as the
                    # step-level evaluation checkpoint does.
                    self.save_model(
                        output_dir_current,
                        optimizer,
                        scheduler,
                        model=model,
                        results=results,
                    )

                training_progress_scores["global_step"].append(global_step)
                training_progress_scores["train_loss"].append(current_loss)
                for key in results:
                    training_progress_scores[key].append(results[key])
                report = pd.DataFrame(training_progress_scores)
                report.to_csv(
                    os.path.join(args.output_dir, "training_progress_scores.csv"),
                    index=False,
                )

                if args.wandb_project or self.is_sweeping:
                    wandb.log(self._get_last_metrics(training_progress_scores))

                (
                    best_eval_metric,
                    early_stopping_counter,
                    should_stop,
                ) = self._update_best_model(
                    results,
                    best_eval_metric,
                    early_stopping_counter,
                    optimizer,
                    scheduler,
                    model,
                    verbose=verbose,
                    allow_early_stop=(
                            args.use_early_stopping
                            and args.early_stopping_consider_epochs
                    ),
                )
                if should_stop:
                    train_iterator.close()
                    return (
                        global_step,
                        tr_loss / global_step
                        if not self.args.evaluate_during_training
                        else training_progress_scores,
                    )

        return (
            global_step,
            tr_loss / global_step
            if not self.args.evaluate_during_training
            else training_progress_scores,
        )

    def _update_best_model(
            self,
            results,
            best_eval_metric,
            early_stopping_counter,
            optimizer,
            scheduler,
            model,
            verbose=True,
            allow_early_stop=False,
    ):
        """
        Tracks the best evaluation metric, saves the best model and applies early stopping.

        Args:
            results: Evaluation results; must contain args.early_stopping_metric.
            best_eval_metric: Best metric value seen so far, or None before the first evaluation.
            early_stopping_counter: Number of consecutive evaluations without improvement.
            optimizer, scheduler, model: Forwarded to save_model() when a new best is found.
            verbose: If True, early-stopping progress is logged.
            allow_early_stop: Whether a lack of improvement counts towards early stopping.

        Returns:
            (best_eval_metric, early_stopping_counter, should_stop)
        """
        args = self.args
        current_metric = results[args.early_stopping_metric]

        # `is None` rather than truthiness: 0.0 is a valid metric value.
        if best_eval_metric is None:
            best_eval_metric = current_metric
            self.save_model(
                args.best_model_dir,
                optimizer,
                scheduler,
                model=model,
                results=results,
            )

        change = current_metric - best_eval_metric
        if args.early_stopping_metric_minimize:
            improved = change < args.early_stopping_delta
        else:
            improved = change > args.early_stopping_delta

        if improved:
            self.save_model(
                args.best_model_dir,
                optimizer,
                scheduler,
                model=model,
                results=results,
            )
            return current_metric, 0, False

        if not allow_early_stop:
            return best_eval_metric, early_stopping_counter, False

        if early_stopping_counter < args.early_stopping_patience:
            early_stopping_counter += 1
            if verbose:
                logger.info(f" No improvement in {args.early_stopping_metric}")
                logger.info(f" Current step: {early_stopping_counter}")
                logger.info(
                    f" Early stopping patience: {args.early_stopping_patience}"
                )
            return best_eval_metric, early_stopping_counter, False

        if verbose:
            logger.info(
                f" Patience of {args.early_stopping_patience} steps reached"
            )
            logger.info(" Training terminated.")
        return best_eval_metric, early_stopping_counter, True

    def eval_model(
            self, eval_data, output_dir=None, verbose=True, silent=False, **kwargs
    ):
        """
        Evaluates the model on eval_data. Saves results to output_dir.

        Args:
            eval_data: Pandas DataFrame containing the 3 columns - `prefix`, `input_text`, `target_text`.
                        - `prefix`: A string indicating the task to perform. (E.g. `"question"`, `"stsb"`)
                        - `input_text`: The input text sequence. `prefix` is automatically prepended to form the full input.
                        - `target_text`: The target sequence
            output_dir: The directory where model files will be saved. If not given, self.args.output_dir will be used.
            verbose: If verbose, results will be printed to the console on completion of evaluation.
            silent: If silent, tqdm progress bars will be hidden.
            **kwargs: Additional metrics that should be used. Pass in the metrics as keyword arguments (name of metric: function to use).
                        A metric function should take in two parameters. The first parameter will be the true labels, and the second parameter will be the predictions. Both inputs
                        will be lists of strings. Note that this will slow down evaluation significantly as the predicted sequences need to be generated.

        Returns:
            results: Dictionary containing evaluation results. This is self.results, which accumulates across calls.
        """  # noqa: ignore flake8"
        if not output_dir:
            output_dir = self.args.output_dir

        self._move_model_to_device()

        eval_dataset = self.load_and_cache_examples(
            eval_data, evaluate=True, verbose=verbose, silent=silent
        )
        os.makedirs(output_dir, exist_ok=True)

        result = self.evaluate(
            eval_dataset, output_dir, verbose=verbose, silent=silent, **kwargs
        )
        self.results.update(result)

        if self.args.evaluate_generated_text:
            if self.args.preprocess_inputs:
                to_predict = [
                    input_text
                    for prefix, input_text in zip(
                        eval_data["prefix"], eval_data["input_text"]
                    )
                ]
            else:
                to_predict = [
                    prefix + input_text
                    for prefix, input_text in zip(
                        eval_data["prefix"], eval_data["input_text"]
                    )
                ]

            # Generated-text metrics are computed on the first three eval
            # batches only.
            sample_size = self.args.eval_batch_size * 3
            preds = self.predict(to_predict[:sample_size])

            result = self.compute_metrics(
                eval_data["target_text"].tolist()[:sample_size], preds, **kwargs
            )
            self.results.update(result)

        if verbose:
            logger.info(self.results)

        return self.results

    def evaluate(self, eval_dataset, output_dir, verbose=True, silent=False, **kwargs):
        """
        Evaluates the model on eval_dataset.

        Utility function to be used by the eval_model() method. Not intended to be used directly.

        Args:
            eval_dataset: Dataset iterated sequentially in batches of args.eval_batch_size.
            output_dir: Directory in which eval_results.txt is written.
            verbose: Unused here; accepted for interface compatibility.
            silent: If True, the progress bar is hidden.

        Returns:
            Dictionary with the mean per-batch loss under "eval_loss".
        """
        model = self.model
        args = self.args
        eval_output_dir = output_dir

        results = {}

        eval_sampler = SequentialSampler(eval_dataset)
        eval_dataloader = DataLoader(
            eval_dataset, sampler=eval_sampler, batch_size=args.eval_batch_size
        )

        if args.n_gpu > 1:
            model = torch.nn.DataParallel(model)

        eval_loss = 0.0
        nb_eval_steps = 0
        model.eval()

        if self.args.fp16:
            from torch.cuda import amp

        for batch in tqdm(
                eval_dataloader, disable=args.silent or silent, desc="Running Evaluation"
        ):
            inputs = self._get_inputs_dict(batch)
            with torch.no_grad():
                if self.args.fp16:
                    with amp.autocast():
                        outputs = model(**inputs)
                        loss = outputs[0]
                else:
                    outputs = model(**inputs)
                    loss = outputs[0]
                if self.args.n_gpu > 1:
                    loss = loss.mean()
                eval_loss += loss.item()
            nb_eval_steps += 1

        eval_loss = eval_loss / nb_eval_steps

        results["eval_loss"] = eval_loss

        output_eval_file = os.path.join(eval_output_dir, "eval_results.txt")
        with open(output_eval_file, "w") as writer:
            for key in sorted(results.keys()):
                writer.write("{} = {}\n".format(key, str(results[key])))

        return results

    def predict(self, to_predict, split_on_space=False):
        """
        Performs predictions on a list of text.

        Args:
            to_predict: A python list of text (str) to be sent to the model for prediction. Note that the prefix should be prepended to the text.
            split_on_space (optional): If True, input is english string, if False, input is chinese string.
                        When False, all spaces are removed from the generated text.

        Returns:
            preds: A python list of the generated sequences. When args.num_return_sequences > 1,
                   a list of lists with num_return_sequences entries per input.
        """  # noqa: ignore flake8"
        self._move_model_to_device()

        all_outputs = []
        # Batching
        for batch in tqdm(
                [
                    to_predict[i: i + self.args.eval_batch_size]
                    for i in range(0, len(to_predict), self.args.eval_batch_size)
                ],
                desc="Generating outputs",
                disable=self.args.silent,
        ):
            input_batch = self.tokenizer.prepare_seq2seq_batch(
                src_texts=batch,
                max_length=self.args.max_seq_length,
                padding="max_length",
                return_tensors="pt",
                truncation=True,
            )
            input_ids = input_batch["input_ids"]
            attention_mask = input_batch["attention_mask"]

            input_ids = input_ids.to(self.device)
            attention_mask = attention_mask.to(self.device)

            outputs = self.model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                num_beams=self.args.num_beams,
                max_length=self.args.max_length,
                length_penalty=self.args.length_penalty,
                early_stopping=self.args.early_stopping,
                repetition_penalty=self.args.repetition_penalty,
                do_sample=self.args.do_sample,
                top_k=self.args.top_k,
                top_p=self.args.top_p,
                num_return_sequences=self.args.num_return_sequences,
                # streamer=self.streamer,
            )
            all_outputs.extend(outputs.cpu().numpy())

        if self.args.use_multiprocessed_decoding:
            # Model is moved to the CPU while the worker pool decodes, then back.
            self.model.to("cpu")
            with Pool(self.args.process_count) as p:
                if self.args.multiprocessing_chunksize == -1:
                    chunksize = max(
                        len(all_outputs) // (self.args.process_count * 2), 500
                    )
                else:
                    chunksize = self.args.multiprocessing_chunksize
                outputs = list(
                    tqdm(
                        p.imap(self._decode, all_outputs, chunksize=chunksize),
                        total=len(all_outputs),
                        desc="Decoding outputs",
                        disable=self.args.silent,
                    )
                )
            self._move_model_to_device()
        else:
            outputs = [
                self.tokenizer.decode(
                    output_id,
                    skip_special_tokens=self.args.skip_special_tokens,
                    clean_up_tokenization_spaces=True,
                )
                for output_id in all_outputs
            ]
        if not split_on_space:
            outputs = [''.join(gen_text.split(' ')) for gen_text in outputs]
        if self.args.num_return_sequences > 1:
            return [
                outputs[i: i + self.args.num_return_sequences]
                for i in range(0, len(outputs), self.args.num_return_sequences)
            ]
        else:
            return outputs

    def _decode(self, output_id):
        """Decodes one generated id sequence to text (used by the multiprocessing pool)."""
        return self.tokenizer.decode(
            output_id,
            skip_special_tokens=self.args.skip_special_tokens,
            clean_up_tokenization_spaces=True,
        )

    def compute_metrics(self, labels, preds, **kwargs):
        """
        Computes the evaluation metrics for the model predictions.

        Args:
            labels: List of target sequences
            preds: List of model generated outputs
            **kwargs: Custom metrics that should be used. Pass in the metrics as keyword arguments (name of metric: function to use).
                        A metric function should take in two parameters. The first parameter will be the true labels, and the second parameter will be the predictions. Both inputs
                        will be lists of strings. Note that this will slow down evaluation significantly as the predicted sequences need to be generated.

        Returns:
            result: Dictionary containing evaluation results.
        """  # noqa: ignore flake8"
        assert len(labels) == len(preds)

        results = {}
        for metric, func in kwargs.items():
            results[metric] = func(labels, preds)

        return results

    def _move_model_to_device(self):
        """Moves self.model to self.device."""
        self.model.to(self.device)

    def _get_inputs_dict(self, batch):
        """
        Builds the keyword arguments for a model forward pass from a batch.

        With args.use_hf_datasets the batch is a mapping and `input_ids` is reused as
        `labels`. Otherwise the batch is a sequence of (input_ids, attention_mask, labels);
        pad positions in `labels` are overwritten in place with -100.
        """
        if self.args.use_hf_datasets:
            inputs = {**batch, "labels": batch["input_ids"]}

            return {key: value.to(self.device) for key, value in inputs.items()}
        else:
            batch = tuple(t.to(self.device) for t in batch)

            input_ids = batch[0]
            attention_mask = batch[1]
            labels = batch[2]
            labels[labels == self.tokenizer.pad_token_id] = -100

            inputs = {
                "input_ids": input_ids,
                "attention_mask": attention_mask,
                "labels": labels,
            }

            return inputs

    def load_and_cache_examples(
            self, data, evaluate=False, no_cache=False, verbose=True, silent=False
    ):
        """
        Creates a T5Dataset from data.

        Utility function for train() and eval() methods. Not intended to be used directly.

        Args:
            data: Source data handed to the dataset class.
            evaluate: Selects the "dev" mode instead of "train".
            no_cache: If False, args.no_cache decides; when caching is on, args.cache_dir is created.
            verbose, silent: Unused here; accepted for interface compatibility.

        Returns:
            A dataset from load_hf_dataset, args.dataset_class, or T5Dataset (in that priority).
        """
        tokenizer = self.tokenizer
        args = self.args

        if not no_cache:
            no_cache = args.no_cache

        if not no_cache:
            os.makedirs(self.args.cache_dir, exist_ok=True)

        mode = "dev" if evaluate else "train"

        if self.args.use_hf_datasets:
            dataset = load_hf_dataset(data, tokenizer, self.args)
            return dataset
        elif args.dataset_class:
            CustomDataset = args.dataset_class
            return CustomDataset(tokenizer, args, data, mode)
        else:
            return T5Dataset(
                tokenizer,
                self.args,
                data,
                mode,
            )

    def _create_training_progress_scores(self, **kwargs):
        """Returns empty per-metric lists, one per built-in score and per extra metric name."""
        extra_metrics = {key: [] for key in kwargs}
        training_progress_scores = {
            "global_step": [],
            "eval_loss": [],
            "train_loss": [],
            **extra_metrics,
        }

        return training_progress_scores

    def _get_last_metrics(self, metric_values):
        """Returns the most recent value of every metric list."""
        return {metric: values[-1] for metric, values in metric_values.items()}

    def save_model(
            self, output_dir=None, optimizer=None, scheduler=None, model=None, results=None
    ):
        """
        Saves model, tokenizer, args and (optionally) optimizer/scheduler state and results.

        Args:
            output_dir: Target directory; defaults to args.output_dir.
            optimizer, scheduler: Saved only when both are given and args.save_optimizer_and_scheduler is set.
            model: Model to save. Nothing but `results` is written when this is None or args.no_save is set.
            results: If given, written to eval_results.txt as "key = value" lines.
        """
        if not output_dir:
            output_dir = self.args.output_dir
        os.makedirs(output_dir, exist_ok=True)

        if model and not self.args.no_save:
            # Take care of distributed/parallel training
            model_to_save = model.module if hasattr(model, "module") else model
            model_to_save.save_pretrained(output_dir)
            self.tokenizer.save_pretrained(output_dir)
            torch.save(self.args, os.path.join(output_dir, "training_args.bin"))
            if optimizer and scheduler and self.args.save_optimizer_and_scheduler:
                torch.save(
                    optimizer.state_dict(), os.path.join(output_dir, "optimizer.pt")
                )
                torch.save(
                    scheduler.state_dict(), os.path.join(output_dir, "scheduler.pt")
                )
            self.save_model_args(output_dir)

        if results:
            output_eval_file = os.path.join(output_dir, "eval_results.txt")
            with open(output_eval_file, "w") as writer:
                for key in sorted(results.keys()):
                    writer.write("{} = {}\n".format(key, str(results[key])))

    def save_model_args(self, output_dir):
        """Creates output_dir if needed and saves self.args into it."""
        os.makedirs(output_dir, exist_ok=True)
        self.args.save(output_dir)

    def _load_model_args(self, input_dir):
        """Returns a fresh T5Args after calling its load() on input_dir."""
        args = T5Args()
        args.load(input_dir)
        return args

    def get_named_parameters(self):
        """Returns the names of all model parameters."""
        return [n for n, p in self.model.named_parameters()]