import signal
import sys
import os
import time
from typing import Union

import platform
from psutil import virtual_memory, cpu_count
import numpy as np
from torch.utils.data import DataLoader
import torch
from rich.progress import Progress, TextColumn, BarColumn, TimeElapsedColumn, TimeRemainingColumn
from transformers import PreTrainedTokenizerFast
from torch_optimizer import Adafactor
# import accelerate
from accelerate import Accelerator
from accelerate.utils import set_seed

# import custom classes and functions
from model.chat_model import TextToTextModel
from utils.logger import Logger
from model.dataset import MyDataset
from config import TrainConfig, T5ModelConfig
from utils.functions import (
    get_bleu4_score,
    save_model_config,
    get_free_space_of_disk,
    my_average,
    get_path_of_suffix_files,
    get_T5_config,
)


class ChatTrainer:
    def __init__(self, train_config: TrainConfig, model_config: T5ModelConfig) -> None:

        self.train_config = train_config
        self.model_config = model_config

        # file_name=None auto-generates a log file named after the current date
        self.logger = Logger('chat_trainer', std_out=True, save2file=True, file_name=None)

        self.model = None
        self.accelerator = None

        signal.signal(signal.SIGINT, self.process_exit_handler)

        self.is_win_platform = platform.system().lower() == 'windows'

        torch.manual_seed(train_config.seed)
        torch.cuda.manual_seed_all(train_config.seed)

    def process_exit_handler(self, signal_received, frame) -> None:
        '''
        Handler for process exit: optionally save a checkpoint before quitting.
        '''
        if self.accelerator and self.model:
            ask = "You pressed `ctrl+c`, do you want to save a checkpoint? Yes (y) or No (n)"
            self.accelerator.print(ask)
            ins = input()

            if ins.lower() in ('yes', 'y'):
                suffix = 'exit_save_{}'.format(str(time.strftime('%Y%m%d%H%M%S', time.localtime())))

                self.accelerator.wait_for_everyone()
                self.accelerator.save_state(output_dir=self.train_config.train_state_dir)

                self.accelerator.print('model checkpoint has been saved in {}'.format(self.train_config.train_state_dir))

            sys.exit(0)
        else:
            print('process is not training, exit.')
            sys.exit(0)

    def save_model(self, suffix: Union[str, int]) -> None:
        '''Save the model to file.
        Note: save_model must NOT be placed inside an `is_main_process` block,
        because wait_for_everyone() has to be reached by all processes.
        e.g.:
        >>> self.save_model(epoch)  # call it here
        >>> if accelerator.is_main_process:
        >>>     do_something()
        '''
        if self.model and self.accelerator:
            # call wait_for_everyone first, then save
            self.accelerator.wait_for_everyone()

            if self.accelerator.is_main_process:
                unwrap_model = self.accelerator.unwrap_model(self.model)
                model_dict = self.accelerator.get_state_dict(unwrap_model)
                torch.save(model_dict, self.train_config.model_file.format(suffix))

    def delete_early_checkpoint(self, epoch: int, keep_latest_n: int=3) -> None:
        '''
        Delete the earliest model files, keeping only the latest keep_latest_n checkpoints.
        '''
        model_save_path = self.train_config.model_file
        model_save_path = model_save_path.replace('\\', '/')  # normalize Windows paths, replace \ with /
        model_save_path = '/'.join(model_save_path.split('/')[0: -1])  # drop the trailing file name

        model_files = get_path_of_suffix_files(model_save_path, suffix='.bin', with_create_time=True)

        # model files saved on abnormal process exit are excluded from deletion
        train_save_model_files = []
        for item in model_files:
            if 'exit_save' not in item[0]:
                # files with an epoch greater than the current epoch are not deleted
                f_epoch = int(item[0].split('.')[-2])
                if epoch >= f_epoch:
                    print(epoch, f_epoch, item)
                    train_save_model_files.append(item)

        train_save_model_files.sort(key=lambda x: x[1])  # sort by creation time, ascending

        if len(train_save_model_files) <= keep_latest_n:
            return

        to_delete_files = train_save_model_files[0: -keep_latest_n]
        for item in to_delete_files:
            os.remove(item[0])

    def train(self, is_keep_training: bool=False, is_finetune: bool=False) -> None:
        '''
        is_keep_training: whether to resume training from the saved training state
        is_finetune: whether to finetune; finetuning may require freezing part of the parameters
        '''
        log = self.logger
        train_config = self.train_config
        save_steps = self.train_config.save_steps
        logging_steps = self.train_config.logging_steps

        # number of gradient accumulation steps
        accumulation_steps = train_config.gradient_accumulation_steps

        set_seed(train_config.seed)

        accelerator = Accelerator(
            mixed_precision=train_config.mixed_precision,        # mixed precision
            gradient_accumulation_steps=accumulation_steps,      # gradient accumulation
            project_dir=train_config.train_state_dir,
        )

        # decide whether to load the whole dataset into memory based on free RAM
        unuse_mem = virtual_memory().available / (1024 ** 3)  # unit: GB
        unuse_disk = get_free_space_of_disk('./')

        # With >= 48GB of free RAM the dataset is kept in memory: 2 GPUs plus fully loading the
        # 9+ million training samples needs about 43GB of CPU memory.
        # Otherwise an iterator generates the data; this also runs with less than 16GB of CPU
        # memory, but shuffling is not supported.
        # For multi-GPU training keep_in_memory must be True, otherwise distributed training fails.
        keep_in_memory = True if unuse_mem >= 48.0 or torch.cuda.device_count() >= 2 else False

        if accelerator.is_main_process:
            log.info('cpu memory available: {:.2f} GB, disk space available: {:.2f} GB, keep dataset in memory: {}.'\
                     .format(unuse_mem, unuse_disk, keep_in_memory), save_to_file=True)
            log.info('operation: {}, keep training: {}, loading datasets ...'.format('finetune' if is_finetune else 'train', is_keep_training))
        # args for dataloader
        num_workers = 0
        # if not self.is_win_platform:
        #     cpu_cnt = cpu_count(logical=False)
        #     gpu_cnt = torch.cuda.device_count()
        #     if cpu_cnt >= 8 * gpu_cnt:
        #         # num_workers = 4 x number of available GPUs
        #         num_workers = int(4 * gpu_cnt)
        #     else:
        #         num_workers = int(cpu_cnt // 2)

        train_dataset = MyDataset(
            parquet_file=train_config.train_file,
            tokenizer_dir=train_config.tokenizer_dir,
            keep_in_memory=keep_in_memory,
            max_seq_len=train_config.max_seq_len,
        )
        valid_dataset = MyDataset(
            parquet_file=train_config.validation_file,
            tokenizer_dir=train_config.tokenizer_dir,
            keep_in_memory=keep_in_memory,
            max_seq_len=train_config.max_seq_len,
        )

        batch_size = train_config.batch_size_per_gpu

        train_dataloader = DataLoader(
            train_dataset,
            batch_size=batch_size,
            shuffle=True,
            collate_fn=train_dataset.collate_fn,
            pin_memory=False,
            # setting num_workers > 1 makes CPU memory grow slowly until OOM (cause still unknown);
            # num_workers=4 only saves about 30 minutes per epoch anyway
            num_workers=num_workers,
        )
        valid_dataloader = DataLoader(
            valid_dataset,
            batch_size=batch_size,
            shuffle=False,
            collate_fn=valid_dataset.collate_fn,
            pin_memory=False,
            num_workers=num_workers,
        )

        device = accelerator.device
        log.info('using device: {} '.format(str(device)), save_to_file=True)

        # T5: All labels set to `-100` are ignored (masked), the loss is only computed for labels in `[0, ..., config.vocab_size]`
        tokenizer = train_dataset.tokenizer
        decoder_start_token_id = tokenizer.pad_token_id

        # for t5, set decoder_start_token_id = pad_token_id
        t5_config = get_T5_config(T5ModelConfig(), vocab_size=len(tokenizer), decoder_start_token_id=decoder_start_token_id, eos_token_id=tokenizer.eos_token_id)

        model = TextToTextModel(t5_config)

        # for finetuning, load the pretrained model and freeze the embedding and encoder
        if is_finetune:
            model.load_state_dict(torch.load(train_config.finetune_from_ckp_file))
            # print(model)

            layers_to_freeze = [model.shared, model.encoder]
            for layer in layers_to_freeze:
                for param in layer.parameters():
                    param.requires_grad = False

        # save the model config so it can be restored after config changes
        save_model_config(t5_config.to_diff_dict(), train_config.model_config_file)

        # for T5 training, the paper recommends Adafactor
        optimizer = Adafactor(params=model.parameters(), lr=train_config.learn_rate)

        # number of GPUs on the current machine; all of them are used by default
        num_gpus_used = accelerator.state.num_processes

        # single machine, multiple GPUs: total batch_size per step = batch_size_per_gpu * num_gpus_used
        # total_batch_size is initialized to batch_size_per_gpu for the CPU-only case
        total_batch_size = train_config.batch_size_per_gpu
        if num_gpus_used >= 1:
            total_batch_size = num_gpus_used * train_config.batch_size_per_gpu

        steps_per_epoch = int(np.ceil(len(train_dataset) / total_batch_size))
        eval_steps = int(np.ceil(len(valid_dataset) / total_batch_size))

        if accelerator.is_main_process:
            log.info('train dataset size: {}, steps per epoch: {}; validation dataset size: {}, steps per validation: {}; dataloader num_workers: {}.'\
                     .format(len(train_dataset), steps_per_epoch, len(valid_dataset), eval_steps, num_workers), save_to_file=True)

        lr_scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer=optimizer,
            max_lr=train_config.div_factor * train_config.learn_rate,
            epochs=train_config.epochs,
            # gradient accumulation effectively enlarges the batch size
            steps_per_epoch=int(np.ceil(len(train_dataset) / (batch_size * accumulation_steps))),
            div_factor=train_config.div_factor,
            cycle_momentum=False,
        )

        model, optimizer, lr_scheduler, train_dataloader, valid_dataloader = accelerator.prepare(
            model,
            optimizer,
            lr_scheduler,
            train_dataloader,
            valid_dataloader,
        )

        if is_keep_training:
            accelerator.load_state(input_dir=train_config.train_state_dir)
            accelerator.register_for_checkpointing(lr_scheduler)

        self.model = model
        self.accelerator = accelerator

        best_bleu4 = 0.0
        best_epoch = 0
        epoch_loss_list = []

        # progress bars, updated only in the main process
        if accelerator.is_main_process:
            progress = Progress(
                TextColumn("[progress.description]{task.description}"),
                BarColumn(),
                TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
                TimeRemainingColumn(),
                TimeElapsedColumn(),
                TextColumn("[bold blue]{task.fields[show_info]}"),
                refresh_per_second=1,  # refresh once per second, do not refresh too often
            )

            epoch_progress = progress.add_task(description='epoch: ', show_info='', total=train_config.epochs)
            steps_progress = progress.add_task(description='steps: ', show_info='',
                                               total=np.ceil(steps_per_epoch / logging_steps))
            eval_progress = progress.add_task(description='evaluate: ', show_info='', total=eval_steps, visible=False)

            self.progress = progress
            self.eval_progress = eval_progress

            progress.start()
        # end if

        for epoch in range(train_config.epochs):

            if accelerator.is_main_process:
                epoch_show_txt = 'epoch: {}/{}, avg_loss: {:.6f}, best_epoch: {}, best_bleu: {}'.format(
                    epoch, train_config.epochs, my_average(epoch_loss_list), best_epoch, best_bleu4
                )
                progress.update(epoch_progress, show_info=epoch_show_txt)
                progress.reset(steps_progress)

            epoch_loss_list = []
            model.train()
            # torch.cuda.empty_cache()

            for step, batch_data in enumerate(train_dataloader):

                input_ids, input_mask = batch_data['input_ids'], batch_data['input_mask']
                target_ids = batch_data['target_ids']

                # for t5 model, all labels set to `-100` are ignored (masked)
                target_ids[target_ids == decoder_start_token_id] = -100

                outputs = model(
                    input_ids=input_ids,
                    attention_mask=input_mask,
                    labels=target_ids,
                )

                loss = outputs.loss.mean() / accumulation_steps

                # attention here! loss.backward()
                accelerator.backward(loss)

                # gradient accumulation
                if (step + 1) % accumulation_steps == 0:
                    accelerator.clip_grad_norm_(model.parameters(), 1.0)
                    optimizer.step()
                    lr_scheduler.step()
                    optimizer.zero_grad()

                # save the model every save_steps steps and at the last step of the epoch
                if (step + 1) % save_steps == 0 or step == steps_per_epoch - 1:
                    self.save_model('epoch_{}_latest'.format(epoch))
                    accelerator.save_state(output_dir=train_config.train_state_dir)

                # ================================== logging loss below ============================================
                # update only every n steps to avoid frequent cpu-gpu data copies
                # see: https://pytorch.org/tutorials/recipes/recipes/tuning_guide.html#avoid-unnecessary-cpu-gpu-synchronization
                if step % logging_steps == 0 or step == steps_per_epoch - 1:

                    loss_cpu = loss.detach().item() * accumulation_steps
                    epoch_loss_list.append(loss_cpu)

                    info_txt = 'training loss: epoch:{}, step:{}, loss:{}, device:{}'.\
                        format(epoch, step, loss_cpu, str(accelerator.device))
                    log.info(info_txt, std_out=False, save_to_file=True)  # save loss to file

                    # update the progress bar
                    if accelerator.is_main_process:
                        step_show_txt = 'step: {}/{}, loss: {:.6f}'.format(step, steps_per_epoch, loss_cpu)
                        progress.advance(steps_progress, advance=1)
                        progress.update(steps_progress, show_info=step_show_txt)
                # ================================== logging loss above ============================================

                # if step >= 20: break
            # end for batch steps

            model.eval()

            cur_bleu4_score = self.evaluate(
                model=model,
                tokenizer=tokenizer,
                valid_dataloader=valid_dataloader,
                accelerator=accelerator,
                eval_steps=eval_steps,
            )

            # save model
            if cur_bleu4_score >= best_bleu4:
                best_bleu4 = cur_bleu4_score
                best_epoch = epoch

                # keep at most the latest keep_latest_n_ckp model files
                # self.delete_early_checkpoint(epoch=epoch, keep_latest_n=train_config.keep_latest_n_ckp)
                self.save_model('best')
                accelerator.save_state(output_dir=train_config.train_state_dir)

            # log once per epoch
            if accelerator.is_main_process:
                progress.advance(epoch_progress, advance=1)

                info_txt = 'epoch log: epoch:{}, avg_loss:{}, cur_bleu4:{}, best_bleu4:{}, best_epoch:{}'.\
                    format(epoch, my_average(epoch_loss_list), cur_bleu4_score, best_bleu4, best_epoch)
                # log.info(info_txt, std_out=True, save_to_file=True)
                self.print_and_log(info_txt, accelerator)

    def evaluate(self,
                 model: TextToTextModel,
                 tokenizer: PreTrainedTokenizerFast,
                 valid_dataloader: DataLoader,
                 accelerator: Accelerator,
                 eval_steps: int,
                 ) -> float:
        '''
        Evaluate the model and return the average bleu4 score.
        '''
        max_seq_len = self.train_config.max_seq_len
        batch_decode = tokenizer.batch_decode
        bleu4_scores = []

        if accelerator.is_main_process:
            self.progress.reset(self.eval_progress)
            self.progress.update(self.eval_progress, visible=True)

        with torch.no_grad():
            for step, batch_data in enumerate(valid_dataloader):

                if accelerator.is_main_process:
                    self.progress.advance(self.eval_progress, advance=1)
                    self.progress.update(self.eval_progress, show_info='step: {}/{}'.format(step, eval_steps))

                input_ids, input_mask = batch_data['input_ids'], batch_data['input_mask']
                target_ids = batch_data['target_ids']

                outputs = accelerator.unwrap_model(model).my_generate(
                    input_ids=input_ids,
                    attention_mask=input_mask,
                    max_seq_len=max_seq_len,
                )

                # gather data from multi-gpus (used when in ddp mode)
                outputs = accelerator.gather_for_metrics(outputs).detach().cpu().numpy()
                target_ids = accelerator.gather_for_metrics(target_ids).detach().cpu().numpy()

                outputs = batch_decode(outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False)
                target_ids = batch_decode(target_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)

                # print(outputs, target_ids)

                # accumulate the per-sample bleu4 scores of this batch
                batch_bleu4_scores = [get_bleu4_score(reference=target_ids[i], outputs=outputs[i]) for i in range(len(target_ids))]
                bleu4_scores.extend(batch_bleu4_scores)

                # if step >= 5: break

        avg_bleu4_score = my_average(bleu4_scores)
        if accelerator.is_main_process:
            self.progress.update(self.eval_progress, show_info='bleu4 score: {}'.format(avg_bleu4_score))
            self.progress.update(self.eval_progress, visible=False)

        return avg_bleu4_score

    def test(self, best_epoch: int=0) -> float:
        '''
        Evaluate a saved checkpoint and return the average bleu4 score.
        '''
        train_config = self.train_config
        log = self.logger

        # args for dataloader
        num_workers = 0 if self.is_win_platform else 4

        test_dataset = MyDataset(
            parquet_file=train_config.train_file,
            tokenizer_dir=train_config.tokenizer_dir,
            keep_in_memory=False if self.is_win_platform else True,
            max_seq_len=train_config.max_seq_len,
        )
        test_dataloader = DataLoader(
            test_dataset,
            batch_size=train_config.batch_size_per_gpu,
            shuffle=False,
            collate_fn=test_dataset.collate_fn,
            pin_memory=False,
            num_workers=num_workers,
        )

        log.info('test dataset size: {}.'.format(len(test_dataset)), save_to_file=True)

        set_seed(train_config.seed)
        accelerator = Accelerator(mixed_precision=train_config.mixed_precision)
        device = accelerator.device
        log.info('using device: {} '.format(str(device)), save_to_file=True)

        # number of GPUs used by the current run
        num_gpus_used = accelerator.state.num_processes

        # single machine, multiple GPUs: total batch_size per step = batch_size_per_gpu * num_gpus_used
        # total_batch_size is initialized to batch_size_per_gpu for the CPU-only case
        total_batch_size = train_config.batch_size_per_gpu
        if num_gpus_used >= 1:
            total_batch_size = num_gpus_used * train_config.batch_size_per_gpu

        # T5: All labels set to `-100` are ignored (masked), the loss is only computed for labels in `[0, ..., config.vocab_size]`
        tokenizer = test_dataset.tokenizer

        model_file = train_config.model_file.format(best_epoch)
        if os.path.isdir(model_file):
            # a directory is loaded with from_pretrained
            model = TextToTextModel.from_pretrained(model_file)
        else:
            # a single file is loaded with load_state_dict
            t5_config = get_T5_config(T5ModelConfig(), vocab_size=len(tokenizer), decoder_start_token_id=tokenizer.pad_token_id, eos_token_id=tokenizer.eos_token_id)
            model = TextToTextModel(t5_config)
            model.load_state_dict(torch.load(model_file, map_location='cpu'))  # load on cpu to avoid device errors

        model, test_dataloader = accelerator.prepare(
            model,
            test_dataloader,
        )

        steps = int(np.ceil(len(test_dataset) / total_batch_size))

        bleu4 = 0.0
        bleu4_scores = []
        batch_decode = tokenizer.batch_decode
        max_seq_len = self.train_config.max_seq_len
        model.eval()

        if accelerator.is_main_process:
            progress = Progress(
                TextColumn("[progress.description]{task.description}"),
                BarColumn(),
                TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
                TimeRemainingColumn(),
                TimeElapsedColumn(),
                TextColumn("[bold blue]{task.fields[show_info]}"),
                refresh_per_second=1.0,
            )
            steps_progress = progress.add_task(description='steps: ', show_info='', total=steps)
            progress.start()

        with torch.no_grad():
            for step, batch_data in enumerate(test_dataloader):

                if accelerator.is_main_process:
                    progress.advance(steps_progress, advance=1)
                    progress.update(steps_progress, show_info='step: {}/{}'.format(step, steps))

                input_ids, input_mask = batch_data['input_ids'], batch_data['input_mask']
                target_ids = batch_data['target_ids']

                # s = time.time()
                outputs = accelerator.unwrap_model(model).my_generate(
                    input_ids=input_ids,
                    attention_mask=input_mask,
                    max_seq_len=max_seq_len,
                )
                # accelerator.print('generate used: {}'.format(time.time() - s))

                # gather data from multi-gpus (used when in ddp mode)
                outputs = accelerator.gather_for_metrics(outputs).cpu().numpy()
                target_ids = accelerator.gather_for_metrics(target_ids).cpu().numpy()

                outputs = batch_decode(outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False)
                target_ids = batch_decode(target_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)

                # print('outputs: {}'.format(outputs[0:5]))
                # print('target_ids: {}'.format(target_ids[0:5]))
                # print()

                # accumulate the per-sample bleu4 scores of this batch
                batch_bleu4_scores = [get_bleu4_score(reference=target_ids[i], outputs=outputs[i]) for i in range(len(target_ids))]
                bleu4_scores.extend(batch_bleu4_scores)

                # if step >= 10: break

        avg_bleu4_score = my_average(bleu4_scores)
        if accelerator.is_main_process:
            progress.update(steps_progress, show_info='bleu4 score: {}'.format(avg_bleu4_score))

            info_txt = 'test_dataset_size: {}, avg_bleu4_score: {}.'.format(len(test_dataset), avg_bleu4_score)
            log.info(info_txt, save_to_file=True)

        return avg_bleu4_score

    def print_and_log(self, info: str, accelerator: Accelerator=None) -> None:
        '''
        Print via accelerator.print when an accelerator is given; plain print from multiple processes misbehaves.
        '''
        if not accelerator:
            print(info)
        else:
            accelerator.print(info)

        self.logger.info(info, std_out=False, save_to_file=True)


if __name__ == '__main__':
    # trainer = ChatTrainer()
    train_config = TrainConfig()
    model_config = T5ModelConfig()

    chat_trainer = ChatTrainer(train_config=train_config, model_config=model_config)
    chat_trainer.train()
    # chat_trainer.test(best_epoch=0)
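
# Usage sketch (assumes this script is saved as train.py and the `accelerate` CLI is installed;
# the file name is an assumption, not taken from the original). The Accelerator above picks up
# its process count from the launcher, so a typical run looks like:
#
#   accelerate config                                           # one-time interactive setup
#   accelerate launch train.py                                  # run with the saved config
#   accelerate launch --multi_gpu --num_processes 2 train.py    # explicit 2-GPU DDP run
#
# Plain `python train.py` also works but runs a single process, so total_batch_size
# stays at batch_size_per_gpu.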