
NCP: Neural Circuit Policies for Autonomous Driving Control




Introduction

  • Autonomous driving is an important area in the current development of artificial intelligence.

  • This post introduces an interesting piece of work that proposes a new type of neural network model.

  • Inspired by the brains of small animals such as the nematode C. elegans, the approach can control an autonomous car with only a few dozen neurons, whereas conventional deep networks (such as Inception, ResNet, or VGG) require millions of neurons.

  • The new network uses only 75,000 parameters and 19 neurons, tens of thousands of times fewer than before.

  • An animated demo of the autonomous driving behaviour is shown below.

  • However, since the autonomous driving dataset used in the paper is large and hard to obtain, this post only covers the NCP model itself: porting the NCP project to the Paddle framework and fitting a simple sequence with the model.

References

  • Paper: Neural circuit policies enabling auditable autonomy

  • Official project: mlech26l/keras-ncp

  • Paddle implementation: jm12138/keras-ncp

  • Citation:

    @article{lechner2020neural,
      title={Neural circuit policies enabling auditable autonomy},
      author={Lechner, Mathias and Hasani, Ramin and Amini, Alexander and Henzinger, Thomas A and Rus, Daniela and Grosu, Radu},
      journal={Nature Machine Intelligence},
      volume={2},
      number={10},
      pages={642--652},
      year={2020},
      publisher={Nature Publishing Group}
    }
    

NCP

  • Neural Circuit Policies (NCP) are sparse recurrent neural networks built from LTC (liquid time-constant) neuron and synapse models.

  • The LTC neuron dynamics are given below; the detailed derivation is in the paper (I honestly can't follow all of it, so I won't try to explain it). The membrane potential v_i of neuron i evolves as

    C_{m,i} \frac{dv_i}{dt} = g_{leak,i}\,(v_{leak,i} - v_i) + \sum_j w_{ji}\,\sigma_{ji}(v_j)\,(E_{ji} - v_i), \qquad \sigma_{ji}(v_j) = \frac{1}{1 + e^{-\gamma_{ji}(v_j - \mu_{ji})}}

    where the sum runs over both recurrent and sensory synapses (in the code below, sigma is the slope \gamma and mu is the midpoint \mu). The _ode_solver method fuses several semi-implicit Euler steps into one RNN step; each step computes

    v_i^{k+1} = \frac{(C_{m,i}/\delta)\,v_i^{k} + g_{leak,i}\,v_{leak,i} + \sum_j w_{ji}\,\sigma_{ji}(v_j^{k})\,E_{ji}}{(C_{m,i}/\delta) + g_{leak,i} + \sum_j w_{ji}\,\sigma_{ji}(v_j^{k})}, \qquad \delta = \frac{\text{elapsed time}}{\text{ode unfolds}}

  • The concrete implementation is as follows:

import paddle
import paddle.nn as nn
import numpy as np


class LTCCell(nn.Layer):
    def __init__(
        self,
        wiring,
        in_features=None,
        input_mapping="affine",
        output_mapping="affine",
        ode_unfolds=6,
        epsilon=1e-8,
    ):
        super(LTCCell, self).__init__()
        if in_features is not None:
            wiring.build((None, in_features))
        if not wiring.is_built():
            raise ValueError(
                "Wiring error! Unknown number of input features. Please pass the parameter 'in_features' or call the 'wiring.build()'."
            )
        self._init_ranges = {
            "gleak": (0.001, 1.0),
            "vleak": (-0.2, 0.2),
            "cm": (0.4, 0.6),
            "w": (0.001, 1.0),
            "sigma": (3, 8),
            "mu": (0.3, 0.8),
            "sensory_w": (0.001, 1.0),
            "sensory_sigma": (3, 8),
            "sensory_mu": (0.3, 0.8),
        }
        self._wiring = wiring
        self._input_mapping = input_mapping
        self._output_mapping = output_mapping
        self._ode_unfolds = ode_unfolds
        self._epsilon = epsilon
        self._allocate_parameters()

    @property
    def state_size(self):
        return self._wiring.units

    @property
    def sensory_size(self):
        return self._wiring.input_dim

    @property
    def motor_size(self):
        return self._wiring.output_dim

    @property
    def output_size(self):
        return self.motor_size

    @property
    def synapse_count(self):
        return np.sum(np.abs(self._wiring.adjacency_matrix))

    @property
    def sensory_synapse_count(self):
        return np.sum(np.abs(self._wiring.sensory_adjacency_matrix))

    def add_weight(self, name, init_value):
        param = self.create_parameter(
            init_value.shape, attr=nn.initializer.Assign(init_value))
        self.add_parameter(name, param)
        return param

    def _get_init_value(self, shape, param_name):
        minval, maxval = self._init_ranges[param_name]
        if minval == maxval:
            return paddle.ones(shape) * minval
        else:
            return paddle.rand(shape) * (maxval - minval) + minval

    def _allocate_parameters(self):
        print("alloc!")
        self._params = {}
        self._params["gleak"] = self.add_weight(
            name="gleak", init_value=self._get_init_value((self.state_size,), "gleak")
        )
        self._params["vleak"] = self.add_weight(
            name="vleak", init_value=self._get_init_value((self.state_size,), "vleak")
        )
        self._params["cm"] = self.add_weight(
            name="cm", init_value=self._get_init_value((self.state_size,), "cm")
        )
        self._params["sigma"] = self.add_weight(
            name="sigma",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "sigma"
            ),
        )
        self._params["mu"] = self.add_weight(
            name="mu",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "mu"),
        )
        self._params["w"] = self.add_weight(
            name="w",
            init_value=self._get_init_value(
                (self.state_size, self.state_size), "w"),
        )
        self._params["erev"] = self.add_weight(
            name="erev",
            init_value=paddle.to_tensor(self._wiring.erev_initializer()),
        )
        self._params["sensory_sigma"] = self.add_weight(
            name="sensory_sigma",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_sigma"
            ),
        )
        self._params["sensory_mu"] = self.add_weight(
            name="sensory_mu",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_mu"
            ),
        )
        self._params["sensory_w"] = self.add_weight(
            name="sensory_w",
            init_value=self._get_init_value(
                (self.sensory_size, self.state_size), "sensory_w"
            ),
        )
        self._params["sensory_erev"] = self.add_weight(
            name="sensory_erev",
            init_value=paddle.to_tensor(
                self._wiring.sensory_erev_initializer()),
        )

        self._params["sparsity_mask"] = paddle.to_tensor(
            np.abs(self._wiring.adjacency_matrix)
        )
        self._params["sensory_sparsity_mask"] = paddle.to_tensor(
            np.abs(self._wiring.sensory_adjacency_matrix)
        )

        if self._input_mapping in ["affine", "linear"]:
            self._params["input_w"] = self.add_weight(
                name="input_w",
                init_value=paddle.ones((self.sensory_size,)),
            )
        if self._input_mapping == "affine":
            self._params["input_b"] = self.add_weight(
                name="input_b",
                init_value=paddle.zeros((self.sensory_size,)),
            )

        if self._output_mapping in ["affine", "linear"]:
            self._params["output_w"] = self.add_weight(
                name="output_w",
                init_value=paddle.ones((self.motor_size,)),
            )
        if self._output_mapping == "affine":
            self._params["output_b"] = self.add_weight(
                name="output_b",
                init_value=paddle.zeros((self.motor_size,)),
            )

    def _sigmoid(self, v_pre, mu, sigma):
        v_pre = paddle.unsqueeze(v_pre, -1)  # For broadcasting
        mues = v_pre - mu
        x = sigma * mues
        return nn.functional.sigmoid(x)

    def _ode_solver(self, inputs, state, elapsed_time):
        v_pre = state

        # We can pre-compute the effects of the sensory neurons here
        sensory_w_activation = self._params["sensory_w"] * self._sigmoid(
            inputs, self._params["sensory_mu"], self._params["sensory_sigma"]
        )
        sensory_w_activation *= self._params["sensory_sparsity_mask"]

        sensory_rev_activation = sensory_w_activation * \
            self._params["sensory_erev"]

        # Reduce over dimension 1 (=source sensory neurons)
        w_numerator_sensory = paddle.sum(sensory_rev_activation, axis=1)
        w_denominator_sensory = paddle.sum(sensory_w_activation, axis=1)

        # cm/t is loop invariant
        cm_t = self._params["cm"] / (elapsed_time / self._ode_unfolds)

        # Unfold the ODE multiple times into one RNN step
        for t in range(self._ode_unfolds):
            w_activation = self._params["w"] * self._sigmoid(
                v_pre, self._params["mu"], self._params["sigma"]
            )

            w_activation *= self._params["sparsity_mask"]

            rev_activation = w_activation * self._params["erev"]

            # Reduce over dimension 1 (=source neurons)
            w_numerator = paddle.sum(
                rev_activation, axis=1) + w_numerator_sensory
            w_denominator = paddle.sum(
                w_activation, axis=1) + w_denominator_sensory

            numerator = (
                cm_t * v_pre
                + self._params["gleak"] * self._params["vleak"]
                + w_numerator
            )
            denominator = cm_t + self._params["gleak"] + w_denominator

            # Avoid dividing by 0
            v_pre = numerator / (denominator + self._epsilon)

        return v_pre

    def _map_inputs(self, inputs):
        if self._input_mapping in ["affine", "linear"]:
            inputs = inputs * self._params["input_w"]
        if self._input_mapping == "affine":
            inputs = inputs + self._params["input_b"]
        return inputs

    def _map_outputs(self, state):
        output = state
        if self.motor_size < self.state_size:
            output = output[:, 0: self.motor_size]  # slice

        if self._output_mapping in ["affine", "linear"]:
            output = output * self._params["output_w"]
        if self._output_mapping == "affine":
            output = output + self._params["output_b"]
        return output

    def _clip(self, w):
        return nn.functional.relu(w)

    def apply_weight_constraints(self):
        self._params["w"].set_value(self._clip(self._params["w"].detach()))
        self._params["sensory_w"].set_value(self._clip(self._params["sensory_w"].detach()))
        self._params["cm"].set_value(self._clip(self._params["cm"].detach()))
        self._params["gleak"].set_value(self._clip(self._params["gleak"].detach()))

    def forward(self, inputs, states):
        # Regularly sampled mode (elapsed time = 1 second)
        elapsed_time = 1.0
        inputs = self._map_inputs(inputs)

        next_state = self._ode_solver(inputs, states, elapsed_time)

        outputs = self._map_outputs(next_state)

        return outputs, next_state
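
  • As a quick sanity check (this snippet is not from the original post, just an illustrative sketch under the same environment the later example runs in), the cell can be driven by hand for a single time step: build a wiring, wrap it in the LTCCell defined above, and pass an input batch together with a zero hidden state. The wiring type and sizes here are arbitrary.

import kerasncp as kncp

wiring = kncp.wirings.FullyConnected(8, 1)   # 8 neurons in total, 1 of them is the motor (output) neuron
cell = LTCCell(wiring, in_features=2)        # 2 sensory input features

x = paddle.randn([4, 2])                     # a batch of 4 samples, one time step
state = paddle.zeros([4, cell.state_size])   # initial hidden state, one value per neuron
output, state = cell(x, state)               # one RNN step: motor output and next hidden state
print(output.shape, state.shape)             # [4, 1] [4, 8]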

Basic usage

  • Although the underlying theory looks (and indeed is) quite deep, an LTC neuron is used in much the same way as an ordinary RNN cell.

  • The following simple sequence-fitting task, which is also one of the example tasks provided in the official project, shows how to use the NCP model.

Clone the project code

!git clone https://github.com/jm12138/keras-ncp

Switch to the project directory

%cd ~/keras-ncp
%matplotlib inline
/home/aistudio/keras-ncp

Import basic modules

import paddle
import numpy as np
import paddle.nn as nn
import kerasncp as kncp
import matplotlib.pyplot as plt
from paddle.optimizer import Adam
from kerasncp.paddle import LTCCell
from paddle.io import DataLoader, TensorDataset

Build a simple RNN sequence model

class RNNSequence(nn.Layer):
    def __init__(
        self,
        rnn_cell,
    ):
        super(RNNSequence, self).__init__()
        self.rnn_cell = rnn_cell

    def forward(self, x):
        batch_size, seq_len = x.shape[:2]
        hidden_state = paddle.zeros((batch_size, self.rnn_cell.state_size))
        outputs = []
        for t in range(seq_len):
            inputs = x[:, t]
            new_output, hidden_state = self.rnn_cell.forward(
                inputs, hidden_state)
            outputs.append(new_output)
        outputs = paddle.stack(outputs, axis=1)  # return entire sequence
        return outputs
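
  • To make the interface concrete (again an illustrative sketch, not from the original post): RNNSequence takes a batch of sequences shaped (batch, time, features) and returns the motor output at every time step.

cell = LTCCell(kncp.wirings.FullyConnected(8, 1), in_features=2)
seq_model = RNNSequence(cell)

x = paddle.randn([4, 16, 2])   # 4 sequences, 16 time steps, 2 input features
y = seq_model(x)               # unrolled step by step by the loop above
print(y.shape)                 # [4, 16, 1]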

Build a simple model learner

  • The learner is a lightly modified version of paddle.Model.

  • It handles training, evaluation, and inference just fine, although the implementation is not particularly elegant.

class SequenceLearner(paddle.Model):
    def train_batch(self, inputs, labels=None, update=True):
        x, y = inputs[0], labels[0]
        y_hat = self.network.forward(x)
        y_hat = y_hat.reshape(y.shape)
        loss = self._loss(y_hat, y)
        loss.backward()
        if update:
            self._optimizer.step()
            self._optimizer.clear_grad()
            self.network.rnn_cell.apply_weight_constraints()
        return [loss.numpy()]

    def eval_batch(self, inputs, labels=None):
        x, y = inputs[0], labels[0]
        y_hat = self.network.forward(x)
        y_hat = y_hat.reshape(y.shape)
        loss = self._loss(y_hat, y)
        return [loss.numpy()]

    def predict_batch(self, inputs):
        x = inputs[0]
        y_hat = self.network.forward(x)
        return [x.numpy(), y_hat.numpy()]

Build the data

  • Generate a sin and a cos sequence as the input features; the target to fit is another sin function at twice the frequency.
  • The sequence length is 128.
# Data parameters
in_features = 2
out_features = 1
N = 128

# Generate the data
data_x = np.stack(
    [np.sin(np.linspace(0, 3 * np.pi, N)), 
    np.cos(np.linspace(0, 3 * np.pi, N))], 
    axis=1
)
data_x = np.expand_dims(data_x, axis=0).astype(np.float32)
data_y = np.sin(np.linspace(0, 6 * np.pi, N)).reshape([1, N, 1]).astype(np.float32)

# Preview the data
print("data_x.shape: ", str(data_x.shape))
print("data_y.shape: ", str(data_y.shape))
for i in range(in_features):
    plt.plot(range(N), data_x[0, :, i], color='black')
for i in range(out_features):
    plt.plot(range(N), data_y[0, :, i], color='red')

# Convert to tensors
data_x = paddle.to_tensor(data_x)
data_y = paddle.to_tensor(data_y)

# Build the dataset and data loaders
train_dataloader = DataLoader(TensorDataset(
    [data_x, data_y]), batch_size=1, shuffle=True, num_workers=0)
val_dataloader = DataLoader(TensorDataset(
    [data_x, data_y]), batch_size=1, shuffle=False, num_workers=0)
data_x.shape:  (1, 128, 2)
data_y.shape:  (1, 128, 1)



[Figure output_16_2.png: input features (black) and target sequence (red)]

Build an RNN sequence model with NCP

wiring = kncp.wirings.FullyConnected(8, out_features)
ltc_cell = LTCCell(wiring, in_features)
ltc_sequence = RNNSequence(ltc_cell)
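
  • The FullyConnected wiring above is the simplest option. To get the sparse, layered wiring described in the paper (sensory → inter → command → motor neurons), the kncp.wirings.NCP wiring from the official project can be dropped in instead; the neuron counts and fan-in/fan-out values below follow the official example and are not tuned for this task. To train it, pass ncp_sequence to SequenceLearner in the same way as ltc_sequence below.

ncp_wiring = kncp.wirings.NCP(
    inter_neurons=12,               # number of inter neurons
    command_neurons=8,              # number of command neurons
    motor_neurons=out_features,     # number of motor (output) neurons
    sensory_fanout=4,               # outgoing synapses per sensory neuron
    inter_fanout=4,                 # outgoing synapses per inter neuron
    recurrent_command_synapses=4,   # recurrent synapses within the command layer
    motor_fanin=6,                  # incoming synapses per motor neuron
)
ncp_cell = LTCCell(ncp_wiring, in_features)
ncp_sequence = RNNSequence(ncp_cell)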

Configure the trainer

loss = nn.MSELoss()
opt = Adam(learning_rate=0.01, parameters=ltc_sequence.parameters())

learn = SequenceLearner(ltc_sequence)
learn.prepare(opt, loss)

Model training and validation

  • The model's fitting progress during training is shown in the figure below.

  • As you can see, for such a simple task NCP fits the target quite well.

learn.fit(train_dataloader, epochs=400, verbose=2)
learn.evaluate(val_dataloader)

Model testing

x, y = learn.predict(val_dataloader)
x = x[0]
y = y[0]


for i in range(in_features):
    plt.plot(range(N), data_x[0, :, i], color='black')
for i in range(out_features):
    plt.plot(range(N), data_y[0, :, i], color='red')
for i in range(out_features):
    plt.plot(range(N), y[0, :, i], color='green')
Predict begin...
step 1/1 [==============================] - 834ms/step
Predict samples: 1

[Figure output_24_1.png: inputs (black), target (red), and model prediction (green)]

Summary

  • This post ported the NCP model to the Paddle framework and used it for a simple sequence-fitting task.

  • As for the autonomous driving application described in the paper, I'd like to try it if I can ever get hold of the dataset; it looks quite interesting.

Source: https://blog.csdn.net/m0_63642362/article/details/122005760
