A Code Implementation of Neural-Network Backpropagation via Topological Sorting

# The graph is defined as a dictionary: each key is a node, and its value is the list of nodes it outputs to

from functools import reduce
import random

def topologic(graph):
    '''
    Topologically sort a graph given as {node: [successor1, successor2, ...]},
    e.g.:
    {
        x: [linear],
        k: [linear],
        b: [linear],
        linear: [sigmoid],
        sigmoid: [loss],
        y: [loss],
    }
    '''

    sorted_node = []

    while graph:
        # If the graph is not empty, pick a node from it; when the graph is
        # empty, we are done.
        # e.g. simple_graph = {'a': [1, 2], 'b': [2, 3]}  # a simple example graph
        # reduce(lambda a, b: a + b, list(simple_graph.values()))  # concatenates the value lists via reduce

        all_nodes_have_inputs = reduce(lambda a, b: a + b,
                                       list(graph.values()))

        all_nodes_have_outputs = list(graph.keys())

        # An example to help understand how all_nodes_only_have_outputs_no_inputs is computed:
        # list_a = [1, 2, 3]
        # list_b = [2, 3, 4]
        # Sets support the operations we need, e.g.:
        # set(list_a) | set(list_b)  # union
        # set(list_a) & set(list_b)  # intersection
        # set(list_a) - set(list_b)  # only in a, not in b

        all_nodes_only_have_outputs_no_inputs = set(
            all_nodes_have_outputs) - set(all_nodes_have_inputs)

        # If we found nodes that only emit edges and receive none (in other
        # words, source nodes on the boundary of the graph), randomly pick one
        # and move it from the graph to sorted_node.
        if all_nodes_only_have_outputs_no_inputs:
            node = random.choice(list(all_nodes_only_have_outputs_no_inputs))

            sorted_node.append(node)

            if len(graph) == 1:
                # Popping the last key would lose its successors, so append them first.
                # sorted_node.append(graph[node]) would not work here, because
                # graph[node] is a list, not a single node.
                sorted_node += graph[node]

            graph.pop(node)

            # Traverse the graph and delete the edges that point to the removed node.
            for _, links in graph.items():
                if node in links: links.remove(node)

        else:
            raise TypeError(
                "This graph contains a cycle; topological sort is impossible")

    return sorted_node

# A quick test for topologic:

# x, k, b, linear, sigmoid, y, loss = 'x', 'k', 'b', 'linear', 'sigmoid', 'y', 'loss'

# test_graph = {
#     x: [linear],
#     k: [linear],
#     b: [linear],
#     linear: [sigmoid],
#     sigmoid: [loss],
#     y: [loss],
# }

# print(topologic(test_graph))
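# The cycle branch can be exercised too. A minimal sketch (cyclic_graph is a
# hypothetical example, not part of the network below): every node both gives
# and receives an edge, so no source node exists and TypeError is raised.
# cyclic_graph = {'a': ['b'], 'b': ['a']}
# topologic(cyclic_graph)  # raises TypeError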

#### Abstract each node into a class, storing each node's information

class Node:
    def __init__(self, inputs=None, name=None, is_trainable=False):
        # Use None instead of a mutable default ([]) so one list is not shared
        # across instances.
        self.inputs = inputs if inputs is not None else []
        self.outputs = []
        self.name = name
        self.value = None
        self.gradients = dict()  # stores the partial derivative of the loss w.r.t. each input node
        self.is_trainable = is_trainable

        # In a fixed topology, knowing a node's inputs already determines the
        # outputs of those inputs, so each input registers this node as one of
        # its outputs:
        for node in self.inputs:
            node.outputs.append(self)

    def forward(self):
        # print('I am {}, I was not assigned, but calculated my value myself!!!'.
        #       format(self.name))
        pass

    def backward(self):
        # for n in self.inputs:
        #     print('I get ∂{} / ∂{}'.format(self.name, n.name))
        pass

    # __repr__ defines what is shown when the object is printed
    def __repr__(self):
        return 'Node: {}'.format(self.name)
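# A small illustration of the auto-wiring in __init__ (nodes a and b here are
# hypothetical, purely for demonstration):
# a = Node(name='a')
# b = Node(inputs=[a], name='b')  # b registers itself as an output of a
# print(a.outputs)                # -> [Node: b]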

class Placeholder(Node):
    def __init__(self, name=None, is_trainable=False):
        Node.__init__(self, name=name, is_trainable=is_trainable)

    def forward(self):
        # print('I am {}, I was assigned, value: {}'.format(
        #     self.name, self.value))
        pass

    def backward(self):
        # A placeholder has no inputs; its gradient is whatever its consumer
        # computed for it.
        self.gradients[self] = self.outputs[0].gradients[self]
        # print('I got my own gradient: {}'.format(
        #     self.outputs[0].gradients[self]))

    # __repr__ defines what is shown when the object is printed
    def __repr__(self):
        return 'Node: {}'.format(self.name)

class Linear(Node):
    def __init__(self, x, k, b, name=None):
        Node.__init__(self, inputs=[x, k, b], name=name)

    def forward(self):
        x, k, b = self.inputs[0], self.inputs[1], self.inputs[2]

        self.value = k.value * x.value + b.value

        # print('I am {}, I was not assigned, value: {}, but calculated my value myself!!!'
        #       .format(self.name, self.value))

    def backward(self):
        # Symbolic version, kept for intuition: each gradient is the upstream
        # gradient times the local partial derivative, e.g. for inputs[0]:
        # self.gradients[self.inputs[0]] = ' * '.join([
        #     self.outputs[0].gradients[self],
        #     '∂{} / ∂{}'.format(self.name, self.inputs[0].name)
        # ])
        # ... and likewise for inputs[1] and inputs[2].

        # Numeric version. With linear = k * x + b and upstream gradient
        # ∂loss/∂linear, the chain rule gives:
        #   ∂loss/∂x = ∂loss/∂linear * k
        #   ∂loss/∂k = ∂loss/∂linear * x
        #   ∂loss/∂b = ∂loss/∂linear * 1
        x, k, b = self.inputs[0], self.inputs[1], self.inputs[2]
        self.gradients[x] = self.outputs[0].gradients[self] * k.value
        self.gradients[k] = self.outputs[0].gradients[self] * x.value
        self.gradients[b] = self.outputs[0].gradients[self] * 1

        # print('self.gradients[self.inputs[0]] {}'.format(
        #     self.gradients[self.inputs[0]]))
        # print('self.gradients[self.inputs[1]] {}'.format(
        #     self.gradients[self.inputs[1]]))
        # print('self.gradients[self.inputs[2]] {}'.format(
        #     self.gradients[self.inputs[2]]))

    # __repr__ defines what is shown when the object is printed
    def __repr__(self):
        return 'Linear: {}'.format(self.name)
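# A quick numeric check of these formulas (values chosen arbitrarily):
# with k = 2, x = 3, b = 1 and upstream gradient ∂loss/∂linear = 0.5:
#   ∂loss/∂x = 0.5 * 2 = 1.0
#   ∂loss/∂k = 0.5 * 3 = 1.5
#   ∂loss/∂b = 0.5 * 1 = 0.5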

import numpy as np

class Sigmoid(Node):
    def __init__(self, x, name=None):
        Node.__init__(self, inputs=[x], name=name)

    def _sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def forward(self):
        x = self.inputs[0]
        self.value = self._sigmoid(x.value)
        # print('I am {}, I was not assigned, value: {}, but calculated my value myself!!!'
        #       .format(self.name, self.value))

    def backward(self):
        # Symbolic version, kept for intuition:
        # self.gradients[self.inputs[0]] = ' * '.join([
        #     self.outputs[0].gradients[self],
        #     '∂{} / ∂{}'.format(self.name, self.inputs[0].name)
        # ])

        # Numeric version, using σ'(x) = σ(x) * (1 - σ(x)):
        x = self.inputs[0]
        self.gradients[x] = self.outputs[0].gradients[self] * self._sigmoid(
            x.value) * (1 - self._sigmoid(x.value))

        # print('self.gradients[self.inputs[0]] {}'.format(
        #     self.gradients[self.inputs[0]]))

    def __repr__(self):
        return 'Sigmoid: {}'.format(self.name)
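# A quick numeric check (values chosen arbitrarily): at linear = 0 we have
# σ(0) = 0.5, so σ'(0) = 0.5 * (1 - 0.5) = 0.25; with upstream gradient
# ∂loss/∂sigmoid = 0.6, this yields ∂loss/∂linear = 0.6 * 0.25 = 0.15.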

class MSE_Loss(Node):
    def __init__(self, y, yhat, name=None):
        Node.__init__(self, inputs=[y, yhat], name=name)

    def forward(self):
        y = self.inputs[0]
        yhat = self.inputs[1]
        self.value = np.mean((y.value - yhat.value) ** 2)
        # print('I am {}, I was not assigned, value: {}, but calculated my value myself!!!'
        #       .format(self.name, self.value))

    def backward(self):
        # Symbolic version, kept for intuition:
        # self.gradients[self.inputs[0]] = '∂{} / ∂{}'.format(
        #     self.name, self.inputs[0].name)
        # self.gradients[self.inputs[1]] = '∂{} / ∂{}'.format(
        #     self.name, self.inputs[1].name)

        # Numeric version. For loss = mean((y - yhat)^2) with scalar values,
        # these reduce to 2 * (y - yhat) and -2 * (y - yhat):
        y = self.inputs[0]
        yhat = self.inputs[1]

        self.gradients[self.inputs[0]] = 2 * np.mean(y.value - yhat.value)
        self.gradients[self.inputs[1]] = -2 * np.mean(y.value - yhat.value)

        # print('self.gradients[self.inputs[0]] {} | {}'.format(
        #     self.inputs[0].name, self.gradients[self.inputs[0]]))
        # print('self.gradients[self.inputs[1]] {} | {}'.format(
        #     self.inputs[1].name, self.gradients[self.inputs[1]]))

    # __repr__ defines what is shown when the object is printed
    def __repr__(self):
        return 'Loss: {}'.format(self.name)
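# A quick numeric check of these formulas (values chosen arbitrarily), for
# y = 0.5 and yhat = 0.8:
#   loss        = (0.5 - 0.8) ** 2 = 0.09
#   ∂loss/∂y    =  2 * (0.5 - 0.8) = -0.6
#   ∂loss/∂yhat = -2 * (0.5 - 0.8) =  0.6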

# An earlier hand-wired sketch, superseded by the classes above (note the
# forward references and the explicit outputs argument, which the final Node
# no longer takes):
# node_x = Node(inputs=None, outputs=[node_linear])
# node_y = Node(inputs=None, outputs=[node_loss])
# node_k = Node(inputs=None, outputs=[node_linear])
# node_b = Node(inputs=None, outputs=[node_linear])
# node_linear = Node(inputs=[node_x, node_k, node_b], outputs=[node_sigmoid])
# node_sigmoid = Node(inputs=node_linear, outputs=[node_loss])
# node_loss = Node(inputs=[node_sigmoid, node_y], outputs=Node)

node_x = Placeholder(name='x', is_trainable=False)  # Placeholder: a node whose value must be fed in
node_y = Placeholder(name='y', is_trainable=False)
node_k = Placeholder(name='k', is_trainable=True)
node_b = Placeholder(name='b', is_trainable=True)
node_linear = Linear(node_x, node_k, node_b, name='linear')
node_sigmoid = Sigmoid(x=node_linear, name='sigmoid')
node_loss = MSE_Loss(yhat=node_sigmoid, y=node_y, name='loss')
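
# For orientation, the wiring above produces the following computation graph
# (arrows point from inputs to outputs):
#
#   x, k, b --> linear --> sigmoid --+
#                                    +--> loss
#   y -------------------------------+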

# How do we chain the defined nodes together? By following each node's edges,
# i.e. the direction in which it feeds.
# Step 1: keep a list of all the outermost nodes, the ones that must be given values.

# need_expand = [node_x, node_b, node_y, node_k]

# To wrap this into a function, replace need_expand with a feed dict whose keys
# are exactly the nodes that can be assigned values.

feed_dict = {
    node_x: 3,
    node_y: random.random(),
    node_k: random.random(),
    node_b: 0.38
}

from collections import defaultdict

def convert_feed_dict_to_graph(feed_dict):

    need_expand = [n for n in feed_dict]
    computing_graph = defaultdict(list)

    # Build the connection graph from the fed nodes outward, chaining the
    # nodes together.

    while need_expand:
        n = need_expand.pop(0)

        if n in computing_graph: continue

        # If n is a node that needs to be assigned a value, assign it here.
        if isinstance(n, Placeholder): n.value = feed_dict[n]

        for m in n.outputs:
            computing_graph[n].append(m)
            need_expand.append(m)
    return computing_graph

# e.g. x -> linear -> sigmoid: x maps to linear, and from linear we find sigmoid.

# Even if __repr__() were not defined on the classes, computing_graph would
# still give us a graph, just printed as raw objects:
# print(computing_graph)
# defaultdict(<class 'list'>,
#             {<__main__.Node object at 0x101f52fd0>: [<__main__.Node object at 0x101f51670>],
#              <__main__.Node object at 0x101f51730>: [<__main__.Node object at 0x101f51670>],
#              <__main__.Node object at 0x101f52f70>: [<__main__.Node object at 0x101f515b0>],
#              <__main__.Node object at 0x101f52f10>: [<__main__.Node object at 0x101f51670>],
#              <__main__.Node object at 0x101f51670>: [<__main__.Node object at 0x101f51610>],
#              <__main__.Node object at 0x101f51610>: [<__main__.Node object at 0x101f515b0>]})
# i.e. one entry per node object.
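# With __repr__ defined as in the classes above, the same print becomes
# readable. A sketch of one possible result (key order may vary):
# defaultdict(<class 'list'>,
#             {Node: x: [Linear: linear],
#              Node: y: [Loss: loss],
#              Node: k: [Linear: linear],
#              Node: b: [Linear: linear],
#              Linear: linear: [Sigmoid: sigmoid],
#              Sigmoid: sigmoid: [Loss: loss]})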

# Once the nodes are chained into a graph, the topologic function above can sort them.

sorted_nodes = topologic(convert_feed_dict_to_graph(feed_dict))
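
# sorted_nodes is now one valid topological order, e.g. (the order of the
# placeholders varies from run to run, but loss always comes last):
# [Node: y, Node: k, Node: x, Node: b, Linear: linear, Sigmoid: sigmoid, Loss: loss]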

# The topological order fixes the sequence in which the nodes are evaluated below.

###############
# FeedForward #
###############

# for node in sorted_nodes:
#     node.forward()

# Sample output:
# I am k, I was assigned, value: 0.11644522193335771
# I am b, I was assigned, value: 0.38
# I am x, I was assigned, value: 3
# I am linear, I was not assigned, value: 0.7293356658000731, but calculated my value myself!!!
# I am sigmoid, I was not assigned, value: 0.6746594720886453, but calculated my value myself!!!
# I am y, I was assigned, value: 0.6049903624105541
# I am loss, I was not assigned, value: 0.004853784843337893, but calculated my value myself!!!

###################
# Backpropagation #
###################

# After the forward pass we want the loss to decrease, so backward() is implemented on the Node classes.

# for node in sorted_nodes[::-1]:
#     print('I am: {}'.format(node.name))
#     node.backward()

# Sample output:
# I am k, I was assigned, value: 0.42129768084566066
# I am x, I was assigned, value: 3
# I am b, I was assigned, value: 0.38
# I am y, I was assigned, value: 0.8808783865431419
# I am linear, I was not assigned, value: 1.6438930425369818, but calculated my value myself!!!
# I am sigmoid, I was not assigned, value: 0.8380639685285712, but calculated my value myself!!!
# I am loss, I was not assigned, value: 0.0018330743899263912, but calculated my value myself!!!
# I am: loss
# I get ∂loss / ∂y
# I get ∂loss / ∂sigmoid
# I am: sigmoid
# I get ∂sigmoid / ∂linear
# I am: linear
# I get ∂linear / ∂x
# I get ∂linear / ∂k
# I get ∂linear / ∂b
# I am: y
# I am: b
# I am: x
# I am: k

# The next step is simply to multiply the partial derivatives along each path; the gradients dicts accumulate these products.
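# Written out for k, the chain through this graph is:
#   ∂loss/∂k = ∂loss/∂sigmoid * ∂sigmoid/∂linear * ∂linear/∂k
# and each backward() call above contributes exactly one factor of this product.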

############
# Optimize #
############

# learning_rate = 1e-1

# for node in sorted_nodes:
#     if node.is_trainable:
#         node.value = node.value + (-1) * node.gradients[node] * learning_rate
#         cmp = 'large' if node.gradients[node] > 0 else 'small'
#         print('"{}" is too {}, I need update myself!'.format(node.name, cmp))

# Packaged into functions:

def forward(graph_sorted_nodes):
    for node in graph_sorted_nodes:
        node.forward()
        if isinstance(node, MSE_Loss):
            print('loss value: {}'.format(node.value))

# Backward propagation

def backward(graph_sorted_nodes):
    for node in graph_sorted_nodes[::-1]:
        # print('\nI am: {}'.format(node.name))
        node.backward()

# One epoch = one forward pass followed by one backward pass
def run_one_epoch(graph_sorted_nodes):
    forward(graph_sorted_nodes)
    backward(graph_sorted_nodes)

def optimize(graph_nodes, learning_rate=1e-3):
    # Vanilla gradient descent on the trainable nodes.
    for node in graph_nodes:
        if node.is_trainable:
            node.value -= node.gradients[node] * learning_rate
            cmp = 'large' if node.gradients[node] > 0 else 'small'
            # print('"{}" is too {}, I need update myself to {}!'.format(
            #     node.name, cmp, node.value))
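
# A finite-difference gradient check is a useful sanity test for backward().
# This is a minimal sketch (gradient_check and eps are illustrative names, not
# part of the pipeline above): it compares the analytic gradient stored for
# node_k against a centered-difference estimate of ∂loss/∂k.

def gradient_check(eps=1e-6):
    run_one_epoch(sorted_nodes)   # forward + backward fills node_k.gradients
    analytic = node_k.gradients[node_k]

    node_k.value += eps           # nudge k upward and re-run the forward pass
    forward(sorted_nodes)
    loss_plus = sorted_nodes[-1].value

    node_k.value -= 2 * eps       # nudge k downward
    forward(sorted_nodes)
    loss_minus = sorted_nodes[-1].value

    node_k.value += eps           # restore the original k
    numeric = (loss_plus - loss_minus) / (2 * eps)
    print('analytic: {}, numeric: {}'.format(analytic, numeric))  # should agree closely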

##############################################
# One full evaluate-differentiate-update run #
##############################################

loss_history = []
for _ in range(100):
    # One forward/backward pass, then a gradient-descent update.
    run_one_epoch(sorted_nodes)
    _loss_node = sorted_nodes[-1]
    assert isinstance(_loss_node, MSE_Loss)

    loss_history.append(_loss_node.value)
    optimize(sorted_nodes, 1e-1)
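
# Since sigmoid(k * x + b) can reach any target in (0, 1) and node_y was drawn
# from (0, 1), the loss should shrink toward 0 over the 100 updates.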

########################
# test training result #
########################

import matplotlib.pyplot as plt

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

# Get the indices of k, x, b, y in sorted_nodes
k_pos, x_pos, b_pos, y_pos = sorted_nodes.index(node_k), sorted_nodes.index(
    node_x), sorted_nodes.index(node_b), sorted_nodes.index(node_y)

print('yhat = {}'.format(
    sigmoid(sorted_nodes[k_pos].value * sorted_nodes[x_pos].value +
            sorted_nodes[b_pos].value)))
print('y = {}'.format(sorted_nodes[y_pos].value))

plt.plot(loss_history)
plt.show()