ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

提高模型的训练性能(一)

2021-07-29 14:31:40  阅读:211  来源: 互联网

标签:训练 ops 模型 dataset self accu grad 性能 grads


转载地址:https://bbs.huaweicloud.com/forum/thread-118716-1-1.html

作者:李响

梯度累积引入Mini-batch的概念,首先对每个Mini-batch的数据计算loss和梯度,但不立即更新模型参数,而是先对所得梯度进行累加,然后在指定数量(N)个Mini-batch之后,用累积后的梯度更新网络参数。下次训练前清空过往累积梯度后重新累加,如此往复。最终目的是为了达到跟直接用N*Mini-batch数据训练几乎同样的效果。

在单机模式下,主要通过将训练流程拆分为正向反向训练、参数更新和累积梯度清理三个部分实现梯度累积。

(以下以MNIST数据集为例)

导入需要的库文件:

import argparse

import os

from collections.abc import Iterable

import mindspore.nn as nn

from mindspore import ParameterTuple

from mindspore import context, DatasetHelper, save_checkpoint

from mindspore.nn import Cell

import mindspore.ops as ops

from model_zoo.official.cv.lenet.src.dataset import create_dataset

from model_zoo.official.cv.lenet.src.lenet import LeNet5

定义训练流程:

将训练流程拆分为正向反向训练、参数更新和累积梯度清理三个部分:

TrainForwardBackward计算loss和梯度,利用grad_sum实现梯度累加。

TrainOptim实现参数更新。

TrainClear实现对梯度累加变量grad_sum清零。

_sum_op = ops.MultitypeFuncGraph("grad_sum_op")

_clear_op = ops.MultitypeFuncGraph("clear_op")

@_sum_op.register("Tensor", "Tensor")

def _cumulative_grad(grad_sum, grad):

    """Apply grad sum to cumulative gradient."""

    add = ops.AssignAdd()

    return add(grad_sum, grad)

@_clear_op.register("Tensor", "Tensor")

def _clear_grad_sum(grad_sum, zero):

    """Apply zero to clear grad_sum."""

    success = True

    success = ops.depend(success, ops.assign(grad_sum, zero))

    return success

class TrainForwardBackward(Cell):

    def __init__(self, network, optimizer, grad_sum, sens=1.0):

        super(TrainForwardBackward, self).__init__(auto_prefix=False)

        self.network = network

        self.network.set_grad()

        self.network.add_flags(defer_inline=True)

        self.weights = ParameterTuple(network.trainable_params())

        self.optimizer = optimizer

        self.grad_sum = grad_sum

        self.grad = ops.GradOperation(get_by_list=True, sens_param=True)

        self.sens = sens

        self.hyper_map = ops.HyperMap()

    def construct(self, *inputs):

        weights = self.weights

        loss = self.network(*inputs)

        sens = ops.Fill()(ops.DType()(loss), ops.Shape()(loss), self.sens)

        grads = self.grad(self.network, weights)(*inputs, sens)

        return ops.depend(loss, self.hyper_map(ops.partial(_sum_op), self.grad_sum, grads))

class TrainOptim(Cell):

    def __init__(self, optimizer, grad_sum):

        super(TrainOptim, self).__init__(auto_prefix=False)

        self.optimizer = optimizer

        self.grad_sum = grad_sum

    def construct(self):

        return self.optimizer(self.grad_sum)

class TrainClear(Cell):

    def __init__(self, grad_sum, zeros):

        super(TrainClear, self).__init__(auto_prefix=False)

        self.grad_sum = grad_sum

        self.zeros = zeros

        self.hyper_map = ops.HyperMap()

    def construct(self):

        success = self.hyper_map(ops.partial(_clear_op), self.grad_sum, self.zeros)

        return success

训练并保存模型

调用网络、优化器及损失函数,然后自定义GradientAccumulation的train_process接口,进行模型训练。

if __name__ == "__main__":

    parser = argparse.ArgumentParser(description='MindSpore Grad Cumulative Example')

    parser.add_argument('--device_target', type=str, default="GPU", choices=['GPU'],

                        help='device where the code will be implemented (default: GPU)')

    parser.add_argument('--data_path', type=str, default="./Data",

                        help='path where the dataset is saved')

    args = parser.parse_args()

    context.set_context(mode=context.GRAPH_MODE, device_target=args.device_target)

    ds_train = create_dataset(os.path.join(args.data_path, "train"), 32)

    net = LeNet5(10)

    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")

    net_opt = nn.Momentum(net.trainable_params(), 0.01, 0.9)

    model = GradientAccumulation(net, net_loss, net_opt)

    print("============== Starting Training ==============")

    model.train_process(10, ds_train, mini_steps=4)

上面阐述的是单机模式,如果在并行模式下,则需要改变策略

在SEMI_AUTO_PARALLEL和AUTO_PARALLEL模式下使用梯度累积,主要是将累积迭代和更新迭代作为两张图下发并且交替执行。在累积迭代图上,只执行正反向运算及梯度累加。在更新迭代图上,执行正反向运算和参数更新。

定义并行训练流程

通常情况下,定义了正向网络后会使用TrainOneStepCell将网络正反向及优化器关联到一起。但是梯度累积时存在累积和更新两种情况,所以我们要基于原有类定义做一些改造。样例代码如下:

import numpy as np

import mindspore.common.dtype as mstype

from mindspore import ops, context, Tensor, Parameter

from mindspore.nn import TrainOneStepCell

from mindspore.common.initializer import initializer

zeroslike = ops.ZerosLike()

reset_accu_grads = ops.MultitypeFuncGraph("reset_accu_grads")

@reset_accu_grads.register("Tensor")

def _reset_accu_grads(accu_grad):

    succ = True

    return ops.depend(succ, ops.assign(accu_grad, zeroslike(accu_grad)))

cast = ops.Cast()

update_accu_grads = ops.MultitypeFuncGraph("update_accu_grads")

@update_accu_grads.register("Tensor", "Tensor")

def _update_accu_grads(accu_grad, grad):

    succ = True

    return ops.depend(succ, ops.assign_add(accu_grad, cast(grad, mstype.float32)))

class TrainAccuStepsCell(TrainOneStepCell):

    def __init__(self, network, optimizer, sens=1.0):

        super(TrainAccuStepsCell, self).__init__(network, optimizer, sens)

        self.accumulation = False

        self.accumulation_steps = context.get_auto_parallel_context("grad_accumulation_step")

        self.accu_grads = self.weights.clone(prefix="accu_grads", init='zeros')

        self.hyper_map = ops.HyperMap()

    def construct(self, *inputs):

        """Defines the computation performed."""

        weights = self.weights

        loss = self.network(*inputs)

        sens = ops.Fill()(ops.DType()(loss), ops.Shape()(loss), self.sens)

        grads = self.grad(self.network, weights)(*inputs, sens)

        if self.accumulation and self.accumulation_steps > 1:

            accu_succ = self.hyper_map(update_accu_grads, self.accu_grads, grads)

            loss = ops.depend(loss, accu_succ)

        if self.accumulation:

            succ = False

        else:

            grads = self.grad_reducer(grads)

            accu_grads = ops.depend(self.accu_grads, grads)

            accu_succ = self.hyper_map(reset_accu_grads, accu_grads)

            loss = ops.depend(loss, accu_succ)

            succ = self.optimizer(grads)

        return ops.depend(loss, succ)

在TrainOneStepCell的基础上,增加累积标记accumulation和累积梯度参数accu_grads的定义,分别用于区分训练流程和保存累积梯度值。在累积迭代图上,accumulation为True,只执行正反向运算并将梯度累加到参数accu_grads。在更新迭代图上,accumulation为False,执行正反向运算和参数更新。在动态loss scale场景下,除了梯度需要累积外,溢出标志位也需要累积判断,可以基于TrainOneStepWithLossScaleCell改造,实现代码如下:

import numpy as np

import mindspore.common.dtype as mstype

from mindspore import ops, context, Tensor, Parameter

from mindspore.nn import TrainOneStepWithLossScaleCell

from mindspore.nn.wrap.loss_scale import _grad_scale

from mindspore.common.initializer import initializer

zeroslike = ops.ZerosLike()

reset_accu_grads = ops.MultitypeFuncGraph("reset_accu_grads")

@reset_accu_grads.register("Tensor")

def _reset_accu_grads(accu_grad):

    succ = True

    return ops.depend(succ, ops.assign(accu_grad, zeroslike(accu_grad)))

cast = ops.Cast()

update_accu_grads = ops.MultitypeFuncGraph("update_accu_grads")

@update_accu_grads.register("Tensor", "Tensor")

def _update_accu_grads(accu_grad, grad):

    succ = True

    return ops.depend(succ, ops.assign_add(accu_grad, cast(grad, mstype.float32)))

class TrainAccuStepsWithLossScaleCell(TrainOneStepWithLossScaleCell):

    def __init__(self, network, optimizer, scale_sense):

        super(TrainAccuStepsWithLossScaleCell, self).__init__(network, optimizer, scale_sense)

        self.accumulation = False

        self.accumulation_steps = context.get_auto_parallel_context("grad_accumulation_step")

        self.one = Tensor(np.array([1]).astype(np.int32))

        self.zero = Tensor(np.array([0]).astype(np.int32))

        self.accu_grads = self.weights.clone(prefix="accu_grads", init='zeros')

        self.accu_overflow = Parameter(initializer(0, [1], mstype.int32))

        self.accu_loss = Parameter(initializer(0, [1], mstype.float32))

        self.cast = ops.Cast()

        self.logical_or = ops.LogicalOr()

        self.not_equal = ops.NotEqual()

        self.select = ops.Select()

        self.reshape = ops.Reshape()

    def construct(self, *inputs):

        """Defines the computation performed."""

        weights = self.weights

        loss = self.network(*inputs)

        scaling_sens = self.scale_sense

        status, scaling_sens = self.start_overflow_check(loss, scaling_sens)

        scaling_sens_filled = ops.ones_like(loss) * ops.cast(scaling_sens, ops.dtype(loss))

        grads = self.grad(self.network, weights)(*inputs, scaling_sens_filled)

        # accumulate gradients

        if self.accumulation and self.accumulation_steps > 1:

            accu_succ = self.hyper_map(update_accu_grads, self.accu_grads, grads)

            loss = ops.depend(loss, accu_succ)

        overflow = self.get_overflow_status(status, grads)

        overflow = self.logical_or(self.not_equal(self.accu_overflow, self.zero), overflow)

        accu_overflow = self.select(overflow, self.one, self.zero)

        if self.accumulation:

            succ = False

            self.accu_overflow = accu_overflow

        else:

            self.accu_overflow = self.zero

            # apply grad reducer on grads

            grads = self.grad_reducer(grads)

            grads = self.hyper_map(ops.partial(_grad_scale, scaling_sens), grads)

            accu_overflow = self.allreduce(accu_overflow)

            overflow = self.less_equal(self.base, accu_overflow)

            accu_grads = ops.depend(self.accu_grads, grads)

            accu_succ = self.hyper_map(reset_accu_grads, accu_grads)

            overflow = ops.depend(overflow, accu_succ)

            overflow = self.reshape(overflow, (()))

            overflow = self.process_loss_scale(overflow)

            if overflow:

                succ = False

            else:

                succ = self.optimizer(grads)

        ret = (loss, overflow, scaling_sens)

        return ops.depend(ret, succ)

定义并行训练模型

经过cell_wrapper封装的网络已经包含了正反向和优化器实现,我们还需要将数据集对接到网络并实现两张图交替执行。这里基于框架中的Model接口实现上述功能。

import math

from mindspore.train.callback import RunContext

from mindspore import context

from mindspore.context import ParallelMode

from mindspore import Model, connect_network_with_dataset

from mindspore.common.dtype import pytype_to_dtype

from mindspore._c_expression import init_exec_dataset

from mindspore.train.train_thor.dataset_helper import DatasetHelper

def _convert_type(types):

    """

    Convert from numpy type to tensor type.

    Args:

        types (list): Numpy type list of element in dataset.

    Returns:

        list, list of element in dataset.

    """

    ms_types = []

    for np_type in types:

        ms_type = pytype_to_dtype(np_type)

        ms_types.append(ms_type)

    return ms_types

def _get_types_and_shapes(dataset):

    """Get dataset types and shapes."""

    dataset_types = _convert_type(dataset.output_types())

    dataset_shapes = dataset.output_shapes()

    return dataset_types, dataset_shapes

def _exec_datagraph(exec_dataset, dataset_size, phase='dataset'):

    """Initialize and execute the dataset graph."""

    batch_size = exec_dataset.get_batch_size()

    input_indexs = exec_dataset.input_indexs

    # transform data format

    dataset_types, dataset_shapes = _get_types_and_shapes(exec_dataset)

    init_exec_dataset(exec_dataset.__transfer_dataset__.queue_name,

                      dataset_size,

                      batch_size,

                      dataset_types,

                      dataset_shapes,

                      input_indexs,

                      phase=phase,

                      need_run=False)

class Model_ACCU(Model):

    def __init__(self, network, loss_fn=None, optimizer=None, metrics=None, eval_network=None,

                 eval_indexes=None, amp_level="O0", **kwargs):

        super(Model_ACCU, self).__init__(network, loss_fn, optimizer, metrics, eval_network,

                                         eval_indexes, amp_level, **kwargs)

        self._frequency = context.get_auto_parallel_context("grad_accumulation_step")

        self._train_network = self._build_train_network()

    def _exec_preprocess(self, network, is_train, phase, dataset, dataset_sink_mode, sink_size=-1,

                         epoch_num=1, iter_first_order=1):

        """Initializes dataset."""

        if dataset_sink_mode and not is_train:

            dataset.__loop_size__ = 1

        dataset_helper = DatasetHelper(dataset, dataset_sink_mode, sink_size, epoch_num, iter_first_order)

        if dataset_sink_mode and context.get_context("device_target") != "GPU":

            network = connect_network_with_dataset(network, dataset_helper)

        network.set_train(is_train)

        network.phase = phase

        if self._parallel_mode in (ParallelMode.SEMI_AUTO_PARALLEL, ParallelMode.AUTO_PARALLEL):

            network.set_auto_parallel()

        return dataset_helper, network

    def _train_dataset_sink_process(self, epoch, train_dataset, list_callback=None, cb_params=None, sink_size=-1):

        """

        Training process. The data would be passed to network through dataset channel.

        Args:

            epoch (int): Total number of iterations on the data.

            train_dataset (Dataset): A training dataset iterator. If there is no

                                     loss_fn, a tuple with multiple data (data1, data2, data3, ...) should be

                                     returned and passed to the network. Otherwise, a tuple (data, label) should

                                     be returned. The data and label would be passed to the network and loss

                                     function respectively.

            list_callback (Callback): Executor of callback list. Default: None.

            cb_params (_InternalCallbackParam): Callback parameters. Default: None.

            sink_size (int): Control the amount of data in each sink. Default: -1.

        """

        if sink_size == -1:

            epoch_num = epoch

        else:

            epoch_num = math.ceil(epoch * sink_size / train_dataset.get_dataset_size())

        iter_first_order = 1

        iter_second_order = self._frequency - 1

        train_dataset.__loop_size__ = iter_second_order

        dataset_helper, train_network = self._exec_preprocess(self._train_network,

                                                              is_train=True,

                                                              phase='train',

                                                              dataset=train_dataset,

                                                              dataset_sink_mode=True,

                                                              sink_size=sink_size,

                                                              epoch_num=epoch_num,

                                                              iter_first_order=iter_first_order)

        self._train_network = train_network

        cb_params.train_network = self._train_network

        cb_params.cur_step_num = 0

        run_context = RunContext(cb_params)

        list_callback.begin(run_context)

        # used to stop training for early stop, such as stopAtTIme or stopATStep

        should_stop = False

        switch_branch_one = True

        index_first_order = 0

        train_network_init_flag = True

        has_do_dataset_init = False

        for i in range(epoch):

            cb_params.cur_epoch_num = i + 1

            list_callback.epoch_begin(run_context)

            # for data sink dataset_helper only iter once, other wise iter epoch_size times.

            for inputs in dataset_helper:

                list_callback.step_begin(run_context)

                if switch_branch_one:

                    cb_params.cur_step_num += iter_second_order

                    if train_network_init_flag:

                        self._train_network.add_flags_recursive(accumulation=True)

                    self._train_network.phase = 'train0'

                else:

                    cb_params.cur_step_num += iter_first_order

                    if train_network_init_flag:

                        self._train_network.add_flags_recursive(accumulation=False)

                        train_network_init_flag = False

                    self._train_network.phase = 'train1'

                    if not has_do_dataset_init:

                        _exec_datagraph(train_dataset, iter_first_order, phase='train1_dataset')

                        has_do_dataset_init = True

                switch_branch_one = not switch_branch_one

                outputs = self._train_network(*inputs)

                cb_params.net_outputs = outputs

                list_callback.step_end(run_context)

            list_callback.epoch_end(run_context)

            should_stop = should_stop or run_context.get_stop_requested()

            if should_stop:

                break

        dataset_helper.stop_send()

        list_callback.end(run_context)

训练模型

完成上述定义后,即可利用训练接口完成模型训练。首先需要在context.set_auto_parallel_context配置grad_accumulation_step参数,使能梯度累积。其次利用改造的cell_warapper封装网络结构,传入Model_ACCU中初始化模型。

context.set_auto_parallel_context(parallel_mode=ParallelMode.AUTO_PARALLEL, gradients_mean=True, grad_accumulation_step=6)

loss_cb = LossMonitor()

data_path = os.getenv('DATA_PATH')

batch_size = 32

dataset = create_dataset(data_path, batch_size=batch_size)

num_classes = 10

net = resnet50(batch_size, num_classes)

loss = SoftmaxCrossEntropyExpand(sparse=True)

opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), 0.01, 0.9)

net_with_loss = nn.WithLossCell(net, loss)

net_with_loss = VirtualDatasetCell(net_with_loss)

wrap_net = TrainAccuStepsCell(net_with_loss, opt)

model = Model_ACCU(wrap_net)

model.train(epoch_size, dataset, callbacks=[loss_cb], dataset_sink_mode=True)

标签:训练,ops,模型,dataset,self,accu,grad,性能,grads
来源: https://blog.csdn.net/Kenji_Shinji/article/details/119181701

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有