2023年11月

最近看到一个写的挺好的多任务框架(https://github.com/SwinTransformer/AiT,参考了detectron2),分享一下~

多任务训练一般分为两种:data mixing 和 batch mixing。简单来说,对于前者,一个batch中的样本可以来自不同任务,而后者一个batch中任务都是一样的。两者相比,后者实现更加容易,效率更高,并且做数据增强也更方便一点(由Pix2Seq提出,但其实数据增强我认为并不是两者的主要差异)。

先给出我写的一个batch mixing的例子(来自JJJYmmm/Pix2SeqV2-Pytorch):

def get_multi_task_loaders(tokenizer,tasks):

    assert set(tasks) <= set(['detection', 'keypoint', 'segmentation', 'captioning'])

    train_loaders = {}
    valid_loaders = {}

    if 'detection' in tasks:
        detection_train_loader, detection_valid_loader = detection_loaders(
        CFG.dir_root, tokenizer, CFG.img_size, CFG.batch_size, CFG.max_len, tokenizer.PAD_code)
        train_loaders['detection'] = detection_train_loader
        valid_loaders['detection'] = detection_valid_loader
   
    if 'keypoint' in tasks: 
        keypoint_train_loader, keypoint_valid_loader = keypoint_loaders(
        CFG.dir_root, tokenizer,person_kps_info, CFG.img_size, CFG.batch_size, CFG.max_len, tokenizer.PAD_code)
        train_loaders['keypoint'] = keypoint_train_loader
        valid_loaders['keypoint'] = keypoint_valid_loader

    if 'segmentation' in tasks: 
        segmentation_train_loader, segmentation_valid_loader = segmentation_loaders(
        CFG.dir_root, tokenizer,person_kps_info, CFG.img_size, CFG.batch_size, CFG.max_len, tokenizer.PAD_code)
        train_loaders['segmentation'] = segmentation_train_loader
        valid_loaders['segmentation'] = segmentation_valid_loader

    if 'captioning' in tasks:
        img_caption_train_loader, img_caption_valid_loader = img_caption_loaders(
        CFG.dir_root, tokenizer,vocab, CFG.img_size, CFG.batch_size, CFG.max_len, tokenizer.PAD_code)
        train_loaders['captioning'] = img_caption_train_loader
        valid_loaders['captioning'] = img_caption_valid_loader
    
    return train_loaders, valid_loaders

以上代码首先维护一个多任务的dataloader字典,在这里之前(即创建dataset)就可以针对不同任务做对应的数据增强。

# get longest dataloader
epoch_size = 0
longest_loader = None
for name, loader in train_loaders.items():
    if len(loader) > epoch_size:
        epoch_size = len(loader)
        longest_loader = name

# create iter for dataloaders
loader_iters = dict()
for k, v in train_loaders.items():
    if k != longest_loader:
        loader_iters[k] = iter(v)

# iter longest dataloader
tqdm_object = tqdm(train_loaders[longest_loader], total=len(train_loaders[longest_loader]))
for iteration,(x, y, init_lens) in enumerate(tqdm_object):

    optimizer.zero_grad()
        
    total_loss = torch.zeros(1, requires_grad=False, device=CFG.device)
    total_batch = x.size(0)
    # loss_1
    loss = cal_loss_multi_task(model, criterion, x, y, init_lens, task_id = task_ids[longest_loader])
    total_loss = total_loss + loss.item() * task_weights[longest_loader]
    loss *= task_weights[longest_loader] # mul weight
    loss.backward()
    # calculate other tasks' loss
    for k, v in loader_iters.items():
        try:
            (x, y, init_lens) = next(v)
        except StopIteration: # recover other tasks' iter
            loader_iters[k] = iter(train_loaders[k])
            (x, y, init_lens) = next(loader_iters[k])
        total_batch += x.size(0)
        # loss_i
        loss = cal_loss_multi_task(model, criterion, x, y, init_lens, task_id=task_ids[k])
        total_loss = total_loss + loss.item() * task_weights[k]
        loss *= task_weights[k]
        loss.backward()

    # total_loss.backward()
    optimizer.step()

训练时,首先确定batch数最多的任务(数据集A),把它作为训练的最外层,这一部分和单任务训练一致,对于其他任务,则分别创建一个迭代器iterator负责取数据。之后在循环数据集A的时候,每次算出loss_A后,会从其他迭代器中取出对应的数据并计算loss_B/loss_C...,之后根据任务权重对loss进行加权平均,并进行反向传播。

在上述代码中,为了节省显存,每次计算完loss,我都直接乘上权重反向传播了,这个好处在于,每个loss计算完后对应的计算图会被自动释放,如果显式显出加权平均,那么所有任务的计算图都会被保留~

可以看出,batch mixing其实只是在训练过程中加入多个数据集的batch,然后分别算出loss并反向传播罢了。(data mixing也差不多,只是粒度更细一点)

关于data mixing,其实就比batch mixing多了一步操作,就是把所有的数据集拼起来,然后对于每个样本都添加一个字段表示任务。取数据的时候直接从大数据集里面取就可以了。

class ConcatDataset(Dataset[T_co]):
    r"""Dataset as a concatenation of multiple datasets.

    This class is useful to assemble different existing datasets.

    Args:
        datasets (sequence): List of datasets to be concatenated
    """
    datasets: List[Dataset[T_co]]
    cumulative_sizes: List[int]

    @staticmethod
    def cumsum(sequence):
        r, s = [], 0
        for e in sequence:
            l = len(e)
            r.append(l + s)
            s += l
        return r

    def __init__(self, datasets: Iterable[Dataset]) -> None:
        super(ConcatDataset, self).__init__()
        self.datasets = list(datasets)
        assert len(self.datasets) > 0, 'datasets should not be an empty iterable'  # type: ignore[arg-type]
        for d in self.datasets:
            assert not isinstance(d, IterableDataset), "ConcatDataset does not support IterableDataset"
        self.cumulative_sizes = self.cumsum(self.datasets)

    def __len__(self):
        return self.cumulative_sizes[-1]

    def __getitem__(self, idx):
        if idx < 0:
            if -idx > len(self):
                raise ValueError("absolute value of index should not exceed dataset length")
            idx = len(self) + idx
        dataset_idx = bisect.bisect_right(self.cumulative_sizes, idx)
        if dataset_idx == 0:
            sample_idx = idx
        else:
            sample_idx = idx - self.cumulative_sizes[dataset_idx - 1]
        return self.datasets[dataset_idx][sample_idx]

    @property
    def cummulative_sizes(self):
        warnings.warn("cummulative_sizes attribute is renamed to "
                      "cumulative_sizes", DeprecationWarning, stacklevel=2)
        return self.cumulative_sizes

到这里也就知道,data mixing的数据增强也很方便,在拼接数据集之前,各个数据集定义自己的增强方式即可。但是data mixing的最大问题在于:训练过程中,需要循环batch里的每个样本,根据任务的不同分配给不同的Heads处理loss,并行度其实很差。当然,对于大模型训练来说,有时候单张GPU就放一个样本,那这个劣势就相当于没有了~

最后总结就是,data mixing相比于batch mixing,任务粒度更细,并且对于多任务的支持更好(不像batch mixing每加一个任务就要改train代码,data mixing只需要写好数据集和对应处理的Head即可),但是并行度相比于batch mixing较差。

复现多卡训练项目经常会遇到一些匪夷所思的问题,包括但不限于numpy<1.2.0 && numpy>=1.2.0OSError: [Errno 12] 。以下给出尽可能"少犯错"的几个步骤:

  • 拿到一个项目,首先检查包依赖(如requirements.txt),如果使用conda管理虚拟环境,建议每个项目都创建一个env,conda一般会存有各个包的cache,所以不用担心重新搭建一个环境的时间成本;如果作者提供了docker,优先使用docker
  • 有些包会存在历史依赖,最经典的是numpy.float/numpy.intnumpy>=1.2.0后弃用,而有些包如boundary-iou还是沿用了旧版本的写法,所以遇到这种情况直接对着报错改包就行(如np.float->np.float64),当然也可以尝试版本回退
  • 对于深度学习项目,如果允许先拿一张卡跑,因为多卡多进程debug信息很难处理
  • 对于dataloader,统一先设置num_workers=0,有时候线程过多也会导致错误(例如共享内存shm不足导致的bus error),另外windows下跑项目我记得这个值必须为0
  • batch_size_per_GPU=1有时候很有用,特别是对网络大小没什么概念的时候(当然这个也是最好debug的了,谁会不懂OOM呢[哭])
  • 多卡训练debug比较麻烦,网上有很多Vscode/Pycharm方案,这里给一个最原始的pdb注入方式,一般可以处理除了IPC以外的大部分错误了,在想要打断点的地方插入这两行代码即可,调试方式跟gdb差不多

    import pdb
    pdb.set_trace()

剩下的以后想到再补充~

多卡训练的时候发现一个问题:cuda() 方法会无视环境变量CUDA_VISIBLE_DEVICES。示例如下:

CUDA_VISIBLE_DEVICES="3,4,5,6" python -m torch.distributed.launch --nproc_per_node=${N_GPUS} train.py --config_option <your_config_here>

这边指定了可见显卡为3-6号,然而如果在代码里出现cuda(),模型/数据还是会load到0卡(default)。

# load to GPU_0
model.cuda()
model = torch.nn.parallel.DistributedDataParallel(
    model, device_ids=[local_rank], broadcast_buffers=False, find_unused_parameters=False)
sample.cuda()

解决方案1,初始化ddp的时候设置默认cuda设备,然而这个方法对我没用。

torch.cuda.set_device(args.local_rank)

解决方案2(final),显式指定device/GPU_id,以0号进程为例,代码如下:

# load to GPU_3
device = torch.device("cuda:0")
model.to(device)
model = torch.nn.parallel.DistributedDataParallel(
    model, device_ids=[local_rank], broadcast_buffers=False, find_unused_parameters=False)
sample.to(model.device)

这里的cuda:0其实是CUDA_VISIBLE_DEVICES的第一张卡,即3号卡。当然,上述只是举例,旨在说明CUDA_VISIBLE_DEVICES的作用。

正确写法是model.to(local_rank),不同进程直接load模型到对应的GPU上。(之前解法1+cuda()也是ok的,不知道为什么在新的节点上不行~)

ALBEF paper : http://arxiv.org/abs/2107.07651

BLIP paper : http://arxiv.org/abs/2201.12086

BLIP code : https://github.com/salesforce/BLIP

ALBEF

网络结构如下,这篇算得上BLIP/BLIP2的前身了,其三个Loss一直延续至今(当然MLM变成了LM)。具体三个Loss的介绍可以看BLIP2 - JJJYmmm Blog,有以下几个特别点:

  • ITC Loss采用了MOCO的形式,即通过一个momentum encoder来扩大负样本的数量,这也是多了一个momentum model的原因。论文还从模型蒸馏的角度对momentum model做了进一步改进,例如在计算ITC和MLM时引入了伪标签
  • 这里对于文本理解任务,采用的是MLM而不是LM,可能是因为MLM任务相比于LM任务更简单,因为模型只需要预测被mask的单词即可
  • 在计算ITC时得到的Image-Text Similarity可以挑选出hard negatives,专门去做ITM;这个方法在BLIP和BLIP2都用到
  • 在这里三个Encoder都不共享参数,算是典型的双流模型

BLIP

image-20231104162902822

网络结构如上,可以看到不同Encoder之间共享参数。loss则与ALBEF没什么区别。其他值得注意的点有:

  • 对于单模态Encoder,无论是文本还是图片,都是采用self attention提取特征;对于Image-grounded Text Encoder,主要添加了一个cross attention层,KVs是Image Embedding;对于Image-grounded Text Decoder,替换了self attention层(其实就是mask改成casual mask吧~) ; 三个text相关的encoder/decoder都共用FFN;关于参数共享的细节,可以看消融实验
  • 其实一直都有一个问题,为什么Image-grounded要让Image Embeddings作为cross attention的KVs呢?这样text作为query,cross attn的结果不就是通过text加权得到的image features?这样得到的特征应该更多与image有关而不是text有关吧(我能想到的一个原因是auto-regressive限制了decoder的input必须是text)
  • BLIP的主要亮点是对数据集的处理,这里引入了半监督bootstrap的做法,具体看下面这张图就懂了~这里论文同样从模型蒸馏的角度来说明bootstrap的有效性,在后续的实验中也表明,每次对于清洗/扩充后的数据集,都应该从头对模型进行pre-train,这符合模型蒸馏中的学生模型不应该继承教师模型参数的常识。

image-20231104164756650

Repository:https://github.com/salesforce/LAVIS/tree/main/projects/blip2

预训练结构

第一阶段

网络结构如下图。

  • 对于图像特征,采用DETR类似的思路,使用Learned Queries作为输入,Image Features作为cross attention的KVs,希望通过可学习的参数来抽取与文本更相关的视觉特征
  • 对于文本特征,采用传统的Bert Encoder思路
  • 对于多模态特征的融合,与双流模型不同,这里两个Encoder的self attention层是共享参数的,当然与单流模型也不同,因为FFN不共享参数,且视觉特征提取时还会走cross attention层

image-20231104154128875

训练采用的三个Loss函数(ITG/ITM/ITC)主要参考之前的ALBEF工作。

  • Image-Text Contrastive : 这部分的目的主要是对齐图像特征和文本特征的单模态特征,计算方法类似CLIP。与ALBEF不同的是,这里的negative pairs直接采用in-batc方式得到,并没有像ALBEF那样借鉴MOCO得到一个较大的Dictionary
  • Image-Text Matching : 这部分的目的是学习图像特征与文本特征的细粒度对齐,通过外接二分类器计算Loss。这一阶段Query和Text可以互相关注
  • Image-Grounded Text Generation : 这一部分主要是训练Queries捕获有关文本所有信息的视觉特征,因为在这一步Query是无法看到Text信息的,而Text可以通过self attention层看到Query并输出结果,所以Query只能从Image Feature中尽可能提取与文本相关的视觉特征,才能生成一个质量比较高的Text(一个直观理解的explanation)

第二阶段

第二阶段的网络结构如下,通过一个FC对齐Qformer与LLM的维度,并对Qformer进一步微调。Qformer的输出主要作为LLM的一个soft visual prompts,提示LLM的输出。

image-20231104155834896

源码

BLIP2的model文件在lavis/models/blip2_models下,之后默认以此为根目录

阅读顺序(主要类与函数)如下:

  • ../base_model.py : BaseModel(nn.Module)
  • blip2.py : Blip2Basecompute_sim_matrix
  • Qformer.py : BertEmbeddings, BertLayer, BertEncoder, BertPooler, BertModel, BertOnlyMLMHead, BertLMHeadModel
  • blip2_qformer.py : Blip2Qformer,

关注点主要是Qformer的实现,其实就是一个魔改的Bert

  • Learned Queries和Text会一起进入Encoder做attention,两者之间的交互由self-attention层的mask控制,具体mask信息参照论文Figure 2
  • 魔改的Bert会每隔1个Encoder Layer()就在该层self-attention层后添加一个cross-attention层,其KVs是Image Encoder的输出即视觉特征
  • 只有Query部分激活cross-attention层进行计算,会通过input[:, :query_length, :]进行截取;同理,如果输入只有Text(Unimodal)则根本不会进行cross-attn计算
  • past_key_values或者说KV cache是在自回归decoder推理时加速的trick,具体来说就是decoder上一次运算各层attention的结果KVs(即当前所有token的embedding信息)会被保存,下次运算时,只需要输入新的token(seq_len=1),进行attention计算时加入之前保存的KVs即可。这样做的可行性主要来自自回归模式的无后效性,token做attention时只会和自己以及之前的token交互
  • 在Qformer第一阶段训练中,past_key_values只使用一次:单模态阶段分别encode图像(Query)特征和文本特征,在encode图像(Query)特征时保存各个attn层的KVs;在计算ITG损失时,由于Query和Text都可以看到所有Query,于是这里就使用了当时单模态计算Query特征时的KVs,模型就只需要跑Text部分即可(使用past_key_values后,Query不需要也不能拼接在Text前面作为输入)
  • 一般来说计算ITM损失时也可以利用计算单模态Query时的产生的KVs,代码里之所以没这么做,是因为计算ITM时额外使用了hard negatives mining,输入已经发生变化