Ququ's Secret Academic Base

Purify desire; uphold seriousness.

Welcome! I am 曲泽慧 (@zququ), currently an assistant researcher at ICBI, BCBDI, SIAT in Shenzhen.


My background is in virology, immunology, and structural biology; my published papers can be found on ResearchGate (RG).


A Code Implementation of Neural-Network Backpropagation via Topological Sorting

# Define the graph as a dictionary, classifying nodes into inputs (keys) and outputs (values)

from functools import reduce
import random


def topologic(graph):
    '''
    Expects a graph of the form {node: [node1, node2, ..., nodeN]}, e.g.

    {
        x: [linear],
        k: [linear],
        b: [linear],
        linear: [sigmoid],
        sigmoid: [loss],
        y: [loss],
    }
    '''

    sorted_node = []

    while graph:
        # n = graph.pop(n)  # if the graph is not empty, pick a node from it
        # pass              # the graph is empty: done
        # e.g. simple_graph = {'a': [1, 2], 'b': [2, 3]}  # a simple example of a graph
        # reduce(lambda a, b: a + b, list(simple_graph.values()))  # concatenate the value lists with reduce

        all_nodes_have_inputs = reduce(lambda a, b: a + b,
                                       list(graph.values()))

        all_nodes_have_outputs = list(graph.keys())

        # An example to help understand how #all_nodes_only_have_outputs_no_inputs# is computed:
        # list_a = [1, 2, 3]
        # list_b = [2, 3, 4]
        # # Sets make this easy; the usual set operations are:
        # set(list_a) | set(list_b)  # union
        # set(list_a) & set(list_b)  # intersection
        # set(list_a) - set(list_b)  # only in a, not in b

        all_nodes_only_have_outputs_no_inputs = set(
            all_nodes_have_outputs) - set(all_nodes_have_inputs)

        # If there are nodes that only appear as outputs, never as inputs (i.e. source
        # nodes at the edge of the graph), randomly pick one and move it from the
        # graph into sorted_node
        if all_nodes_only_have_outputs_no_inputs:
            node = random.choice(list(all_nodes_only_have_outputs_no_inputs))

            sorted_node.append(node)

            if len(graph) == 1:
                sorted_node += graph[node]
                # sorted_node.append(graph[node]) would not work, since graph[node] is a list
                # This keeps the terminal node (which only ever appears as a value,
                # never as a key) from being lost when the last key is popped

            graph.pop(node)

            # Traverse the graph and delete every link that points to the sorted node
            for _, links in graph.items():
                if node in links: links.remove(node)

        else:
            raise TypeError(
                "This graph contains a cycle; no topological order exists")

    return sorted_node


# For testing topologic:

# x, k, b, linear, sigmoid, y, loss = 'x', 'k', 'b', 'linear', 'sigmoid', 'y', 'loss'

# test_graph = {
#     x: [linear],
#     k: [linear],
#     b: [linear],
#     linear: [sigmoid],
#     sigmoid: [loss],
#     y: [loss],
# }

# print(topologic(test_graph))
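
# Note that topologic() consumes its argument (graph.pop mutates it), so a graph
# can only be sorted once. A minimal sketch of a non-destructive call; the
# demo_graph name is my own and not part of the original test:

demo_graph = {
    'x': ['linear'], 'k': ['linear'], 'b': ['linear'],
    'linear': ['sigmoid'], 'sigmoid': ['loss'], 'y': ['loss'],
}
# Copy the value lists too, since topologic() also mutates them via links.remove()
print(topologic({node: links[:] for node, links in demo_graph.items()}))
# e.g. ['b', 'y', 'x', 'k', 'linear', 'sigmoid', 'loss'] (source order is random)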

#### Abstract each node into a class that stores its own information


class Node:
    def __init__(self, inputs=None, name=None, is_trainable=False):
        # Use None instead of a mutable default list, so instances never share state
        self.inputs = inputs if inputs is not None else []
        self.outputs = []
        self.name = name
        self.value = None
        self.gradients = dict()  # stores the partial derivative of the loss w.r.t. each input
        self.is_trainable = is_trainable

        # Under a fixed topological ordering, once a node's inputs are known, the
        # outputs of those input nodes are determined as well, so the reverse
        # links can be registered right here:
        for node in self.inputs:
            node.outputs.append(self)

    def forward(self):
        # print('I am {}, I was not assigned, but calculated my value myself!'.format(self.name))
        pass

    def backward(self):
        # for n in self.inputs:
        #     print('I get ∂{} / ∂{}'.format(self.name, n.name))
        pass

    # Defining __repr__ controls what is shown when the object is printed
    def __repr__(self):
        return 'Node: {}'.format(self.name)


class Placeholder(Node):
    def __init__(self, name=None, is_trainable=False):
        Node.__init__(self, name=name, is_trainable=is_trainable)

    def forward(self):
        # print('I am {}, I was assigned, value: {}'.format(self.name, self.value))
        pass

    def backward(self):
        # A placeholder has no inputs: its gradient is whatever its output node computed for it
        self.gradients[self] = self.outputs[0].gradients[self]
        # print('I got my own gradient: {}'.format(self.outputs[0].gradients[self]))

    # Defining __repr__ controls what is shown when the object is printed
    def __repr__(self):
        return 'Node: {}'.format(self.name)


class Linear(Node):
    def __init__(self, x, k, b, name=None):
        Node.__init__(self, inputs=[x, k, b], name=name)

    def forward(self):
        x, k, b = self.inputs[0], self.inputs[1], self.inputs[2]

        self.value = k.value * x.value + b.value

        # print('I am {}, I was not assigned, value: {}, but calculated it myself!'
        #       .format(self.name, self.value))

    def backward(self):
        # Symbolic version, kept for illustration: each gradient is the upstream
        # gradient chained with the local partial, e.g. for x:
        # self.gradients[self.inputs[0]] = ' * '.join([
        #     self.outputs[0].gradients[self],
        #     '∂{} / ∂{}'.format(self.name, self.inputs[0].name)])
        # ... and likewise for k (inputs[1]) and b (inputs[2]).

        # Numeric version: linear = k*x + b, so ∂linear/∂x = k, ∂linear/∂k = x, ∂linear/∂b = 1
        x, k, b = self.inputs[0], self.inputs[1], self.inputs[2]
        self.gradients[x] = self.outputs[0].gradients[self] * k.value
        self.gradients[k] = self.outputs[0].gradients[self] * x.value
        self.gradients[b] = self.outputs[0].gradients[self] * 1

        # print('self.gradients[x] {}'.format(self.gradients[x]))
        # print('self.gradients[k] {}'.format(self.gradients[k]))
        # print('self.gradients[b] {}'.format(self.gradients[b]))

    # Defining __repr__ controls what is shown when the object is printed
    def __repr__(self):
        return 'Linear: {}'.format(self.name)


import numpy as np


class Sigmoid(Node):
    def __init__(self, x, name=None):
        Node.__init__(self, inputs=[x], name=name)

    def _sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def forward(self):
        x = self.inputs[0]
        self.value = self._sigmoid(x.value)
        # print('I am {}, I was not assigned, value: {}, but calculated it myself!'
        #       .format(self.name, self.value))

    def backward(self):
        # Symbolic version, kept for illustration:
        # self.gradients[self.inputs[0]] = ' * '.join([
        #     self.outputs[0].gradients[self],
        #     '∂{} / ∂{}'.format(self.name, self.inputs[0].name)])

        # Numeric version, using σ'(x) = σ(x) * (1 - σ(x))
        x = self.inputs[0]
        self.gradients[x] = self.outputs[0].gradients[self] * self._sigmoid(
            x.value) * (1 - self._sigmoid(x.value))

        # print('self.gradients[x] {}'.format(self.gradients[x]))

    def __repr__(self):
        return 'Sigmoid: {}'.format(self.name)
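
# For reference (my note), the derivative used in Sigmoid.backward() follows
# directly from the definition σ(x) = 1 / (1 + e^(-x)):
#     σ'(x) = σ(x) * (1 - σ(x))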


class MSE_Loss(Node):
    def __init__(self, y, yhat, name=None):
        Node.__init__(self, inputs=[y, yhat], name=name)

    def forward(self):
        y = self.inputs[0]
        yhat = self.inputs[1]
        self.value = np.mean((y.value - yhat.value) ** 2)
        # print('I am {}, I was not assigned, value: {}, but calculated it myself!'
        #       .format(self.name, self.value))

    def backward(self):
        # Symbolic version, kept for illustration:
        # self.gradients[self.inputs[0]] = '∂{} / ∂{}'.format(self.name, self.inputs[0].name)
        # self.gradients[self.inputs[1]] = '∂{} / ∂{}'.format(self.name, self.inputs[1].name)

        # Numeric version: loss = (y - yhat)^2
        y = self.inputs[0]
        yhat = self.inputs[1]

        self.gradients[y] = 2 * np.mean(y.value - yhat.value)
        self.gradients[yhat] = -2 * np.mean(y.value - yhat.value)

        # print('self.gradients[y] {}'.format(self.gradients[y]))
        # print('self.gradients[yhat] {}'.format(self.gradients[yhat]))

    # Defining __repr__ controls what is shown when the object is printed
    def __repr__(self):
        return 'Loss: {}'.format(self.name)
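
# The two gradients in MSE_Loss.backward() come from differentiating the squared
# error (my note; scalar case, where np.mean over a scalar is a no-op):
#     loss = (y - ŷ)²,  ∂loss/∂y = 2(y - ŷ),  ∂loss/∂ŷ = -2(y - ŷ)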


# An early hand-wiring sketch, kept for reference (superseded by the classes above):
# node_x = Node(inputs=None, outputs=[node_linear])
# node_y = Node(inputs=None, outputs=[node_loss])
# node_k = Node(inputs=None, outputs=[node_linear])
# node_b = Node(inputs=None, outputs=[node_linear])
# node_linear = Node(inputs=[node_x, node_k, node_b], outputs=[node_sigmoid])
# node_sigmoid = Node(inputs=[node_linear], outputs=[node_loss])
# node_loss = Node(inputs=[node_sigmoid, node_y], outputs=None)

node_x = Placeholder(name='x', is_trainable=False)  # Placeholder: a node that must be assigned a value
node_y = Placeholder(name='y', is_trainable=False)
node_k = Placeholder(name='k', is_trainable=True)
node_b = Placeholder(name='b', is_trainable=True)
node_linear = Linear(node_x, node_k, node_b, name='linear')
node_sigmoid = Sigmoid(x=node_linear, name='sigmoid')
node_loss = MSE_Loss(yhat=node_sigmoid, y=node_y, name='loss')
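
# Because Node.__init__ registers each new node in its inputs' outputs lists,
# the wiring can be checked immediately; a quick sanity check (these assertions
# are my addition, not part of the original post):
assert node_linear in node_x.outputs and node_linear in node_k.outputs
assert node_sigmoid in node_linear.outputs
assert node_loss in node_sigmoid.outputs and node_loss in node_y.outputs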

# How do we chain the defined nodes together? By looking at each node's edges,
# i.e. the direction it points.
# Step one: collect a list of all the outermost "edge" nodes that need values:

# need_expand = [node_x, node_b, node_y, node_k]

# To wrap this into a function, replace need_expand with a mapping that also
# carries the values to assign:

feed_dict = {
    node_x: 3,
    node_y: random.random(),
    node_k: random.random(),
    node_b: 0.38
}

from collections import defaultdict


def convert_feed_dict_to_graph(feed_dict):

    need_expand = [n for n in feed_dict]
    computing_graph = defaultdict(list)

    # Build the connection graph from this external mapping, chaining the nodes together

    while need_expand:
        n = need_expand.pop(0)

        if n in computing_graph: continue

        # If n is a quantity that needs to be assigned, give it its value here
        if isinstance(n, Placeholder): n.value = feed_dict[n]

        for m in n.outputs:
            computing_graph[n].append(m)
            need_expand.append(m)
    return computing_graph

    # e.g. x -> linear -> sigmoid: x maps to linear, and linear in turn leads to sigmoid


# Had __repr__ not been defined on the classes, printing computing_graph would
# yield a graph of raw objects, e.g.:
# defaultdict(<class 'list'>,
# {
#     <__main__.Node object at 0x101f52fd0>: [<__main__.Node object at 0x101f51670>],
#     <__main__.Node object at 0x101f51730>: [<__main__.Node object at 0x101f51670>],
#     <__main__.Node object at 0x101f52f70>: [<__main__.Node object at 0x101f515b0>],
#     <__main__.Node object at 0x101f52f10>: [<__main__.Node object at 0x101f51670>],
#     <__main__.Node object at 0x101f51670>: [<__main__.Node object at 0x101f51610>],
#     <__main__.Node object at 0x101f51610>: [<__main__.Node object at 0x101f515b0>]
# })
# i.e. one entry per node object

# With the nodes chained into a graph, the topologic function above can now sort them

sorted_nodes = topologic(convert_feed_dict_to_graph(feed_dict))
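
# Since every class defines __repr__, printing the sorted nodes gives a readable
# order. The four Placeholders may appear in any order (random.choice), but
# linear must precede sigmoid, which must precede loss:
print(sorted_nodes)
# e.g. [Node: y, Node: k, Node: x, Node: b, Linear: linear, Sigmoid: sigmoid, Loss: loss]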

# The topological order fixes the sequence in which the nodes are computed below

###############
# FeedForward #
###############

# for node in sorted_nodes:
#     node.forward()

# Sample output:
# I am k, I was assigned, value: 0.11644522193335771
# I am b, I was assigned, value: 0.38
# I am x, I was assigned, value: 3
# I am linear, I was not assigned, value: 0.7293356658000731, but calculated it myself!
# I am sigmoid, I was not assigned, value: 0.6746594720886453, but calculated it myself!
# I am y, I was assigned, value: 0.6049903624105541
# I am loss, I was not assigned, value: 0.004853784843337893, but calculated it myself!

###################
# Backpropagation #
###################

# After the forward pass we want the loss to decrease, so backward() is added to the Node classes

# for node in sorted_nodes[::-1]:
#     print('I am: {}'.format(node.name))
#     node.backward()

# Sample output:
# I am k, I was assigned, value: 0.42129768084566066
# I am x, I was assigned, value: 3
# I am b, I was assigned, value: 0.38
# I am y, I was assigned, value: 0.8808783865431419
# I am linear, I was not assigned, value: 1.6438930425369818, but calculated it myself!
# I am sigmoid, I was not assigned, value: 0.8380639685285712, but calculated it myself!
# I am loss, I was not assigned, value: 0.0018330743899263912, but calculated it myself!
# I am: loss
# I get ∂loss / ∂y
# I get ∂loss / ∂sigmoid
# I am: sigmoid
# I get ∂sigmoid / ∂linear
# I am: linear
# I get ∂linear / ∂x
# I get ∂linear / ∂k
# I get ∂linear / ∂b
# I am: y
# I am: b
# I am: x
# I am: k

# The remaining step is simply to multiply the stage-wise partial derivatives
# together; the gradients dicts introduced above store them
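
# For the trainable k, for example, the chain rule multiplies the three
# stage-wise partials computed by the backward() methods (my note):
#     ∂loss/∂k = (∂loss/∂sigmoid) * (∂sigmoid/∂linear) * (∂linear/∂k)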

############
# Optimize #
############

# learning_rate = 1e-1

# for node in sorted_nodes:
#     if node.is_trainable:
#         node.value = node.value + (-1) * node.gradients[node] * learning_rate
#         cmp = 'large' if node.gradients[node] > 0 else 'small'
#         print('"{}" is too {}, I need to update myself!'.format(node.name, cmp))

# Packaged into functions:


def forward(graph_sorted_nodes):
    for node in graph_sorted_nodes:
        node.forward()
        # if node is node_y:
        if isinstance(node, MSE_Loss):
            print('loss value: {}'.format(node.value))


# Backward propagation


def backward(graph_sorted_nodes):
    for node in graph_sorted_nodes[::-1]:
        # print('\nI am: {}'.format(node.name))
        node.backward()


# epoch() = forward() + backward()
def run_one_epoch(graph_sorted_nodes):
    forward(graph_sorted_nodes)
    backward(graph_sorted_nodes)


def optimize(graph_nodes, learning_rate=1e-3):
    for node in graph_nodes:
        if node.is_trainable:
            # Gradient-descent step: move against the gradient
            node.value = node.value + (
                -1) * node.gradients[node] * learning_rate
            cmp = 'large' if node.gradients[node] > 0 else 'small'
            # print('"{}" is too {}, I need to update myself to {}!'.format(
            #     node.name, cmp, node.value))


###############################################
# One full evaluate-differentiate-update pass #
###############################################

loss_history = []
for _ in range(100):
    run_one_epoch(sorted_nodes)
    _loss_node = sorted_nodes[-1]
    assert isinstance(_loss_node, MSE_Loss)

    loss_history.append(_loss_node.value)
    optimize(sorted_nodes, 1e-1)
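
# A quick check (my addition) that the hundred updates actually reduced the
# loss; with the random initial k and y above this should hold in virtually
# every run:
print('loss: {} -> {}'.format(loss_history[0], loss_history[-1]))
assert loss_history[-1] < loss_history[0]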

########################
# test training result #
########################

import matplotlib.pyplot as plt


def sigmoid(x):
    return 1 / (1 + np.exp(-x))


# Get the indices of x, k, b, y within sorted_nodes
k_pos, x_pos, b_pos, y_pos = sorted_nodes.index(node_k), sorted_nodes.index(
    node_x), sorted_nodes.index(node_b), sorted_nodes.index(node_y)

print('yhat = {}'.format(
    sigmoid(sorted_nodes[k_pos].value * sorted_nodes[x_pos].value +
            sorted_nodes[b_pos].value)))
print('y = {}'.format(sorted_nodes[y_pos].value))

plt.plot(loss_history)
plt.show()