# The graph is defined as a dictionary: each key is a node and its value is the list of nodes it feeds into
from functools import reduce
import random
def topologic(graph):
'''
Topologically sort a graph given as a dict of the form
{
node: [node1, node2, ...],  # each key maps to the list of nodes it feeds into
x: [linear],
k: [linear],
b: [linear],
linear: [sigmoid],
sigmoid: [loss],
y: [loss],
}
'''
sorted_node = []
while graph:
# While the graph is not empty, pick a node from it; once the graph is empty, we are done
# e.g. simple_graph = {'a': [1, 2], 'b': [2, 3]} # a simple example of a graph
# reduce(lambda a, b: a + b, list(simple_graph.values())) # concatenate the value lists with reduce
all_nodes_have_inputs = reduce(lambda a, b: a + b,
list(graph.values()))
all_nodes_have_outputs = list(graph.keys())
# An example to help understand how #all_nodes_only_have_outputs_no_inputs# is obtained:
# list_a = [1, 2, 3]
# list_b = [2, 3, 4]
# Sets make this easy; the relevant operations are, e.g.
# set(list_a) | set(list_b) # union
# set(list_a) & set(list_b) # intersection
# set(list_a) - set(list_b) # only in a, not in b
all_nodes_only_have_outputs_no_inputs = set(
all_nodes_have_outputs) - set(all_nodes_have_inputs)
# If there are nodes that have outputs but no inputs (source nodes at the edge of the graph), randomly pick one, append it to sorted_node and pop it from the graph
if all_nodes_only_have_outputs_no_inputs:
node = random.choice(list(all_nodes_only_have_outputs_no_inputs))
sorted_node.append(node)
if len(graph) == 1:
sorted_node += graph[node]
# sorted_node.append(graph[node]) would not work here, because graph[node] is a list; extend with += instead
# Grab the last node's outputs before popping it, so they are not lost together with the key
graph.pop(node)
# Traverse the graph and delete the edges that point to the removed node
for _, links in graph.items():
if node in links: links.remove(node)
else:
raise TypeError(
"This graph contains a cycle, so it cannot be topologically sorted")
return sorted_node
# For testing the topologic function
# x, k, b, linear, sigmoid, y, loss = 'x', 'k', 'b', 'linear', 'sigmoid', 'y', 'loss'
# test_graph = {
# x: [linear],
# k: [linear],
# b: [linear],
# linear: [sigmoid],
# sigmoid: [loss],
# y: [loss],
# }
# print(topologic(test_graph))
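# One possible result (the exact order varies, because random.choice picks among the
# currently available source nodes):
# ['x', 'k', 'b', 'y', 'linear', 'sigmoid', 'loss']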
#### Abstract each node into a class and store each node's information
class Node:
def __init__(self, inputs=None, name=None, is_trainable=False):
# avoid a mutable default argument: default to a fresh empty list
inputs = inputs if inputs is not None else []
self.inputs = inputs
self.outputs = []
self.name = name
self.value = None
self.gradients = dict()  # stores the partial derivative of the loss w.r.t. each node
self.is_trainable = is_trainable
# In a fixed topological ordering, knowing a node's inputs also tells each input node that this node is one of its outputs, so the reverse edge is registered here
for node in inputs:
node.outputs.append(self)
def forward(self):
# print('I am {}, I was not assigned, but calculated my value myself!!!'.
# format(self.name))
pass
def backward(self):
# for n in self.inputs:
# print('I get ∂{} / ∂{}'.format(self.name, n.name))
pass
# Defining __repr__ controls what is shown when the node is printed
def __repr__(self):
return 'Node: {}'.format(self.name)
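# A minimal sketch of the reverse-edge registration above (hypothetical names a, b):
# a = Node(name='a')
# b = Node(inputs=[a], name='b')
# print(a.outputs)  # -> [Node: b]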
class Placeholder(Node):
def __init__(self, name=None, is_trainable=False):
Node.__init__(self, name=name, is_trainable=is_trainable)
def forward(self):
# print('I am {}, I was assigned, value: {}'.format(
# self.name, self.value))
pass
def backward(self):
self.gradients[self] = self.outputs[0].gradients[self]
# print('I got myself gradients: {}'.format(
# self.outputs[0].gradients[self]))
# Defining __repr__ controls what is shown when the node is printed
def __repr__(self):
return 'Node: {}'.format(self.name)
class Linear(Node):
def __init__(self, x, k, b, name=None):
Node.__init__(self, inputs=[x, k, b], name=name)
def forward(self):
x, k, b = self.inputs[0], self.inputs[1], self.inputs[2]
self.value = k.value * x.value + b.value
# print(
# 'I am {}, I was not assigned, value: {}, but calculated it myself!!!'
# .format(self.name, self.value))
def backward(self):
# self.gradients[self.inputs[0]] = ' * '.join([
# self.outputs[0].gradients[self],
# '∂{} / ∂{}'.format(self.name, self.inputs[0].name)
# ])
# self.gradients[self.inputs[1]] = ' * '.join([
# self.outputs[0].gradients[self],
# '∂{} / ∂{}'.format(self.name, self.inputs[1].name)
# ])
# self.gradients[self.inputs[2]] = ' * '.join([
# self.outputs[0].gradients[self],
# '∂{} / ∂{}'.format(self.name, self.inputs[2].name)
# ])
# numeric version: chain rule with the upstream gradient ∂loss/∂linear
x, k, b = self.inputs[0], self.inputs[1], self.inputs[2]
self.gradients[x] = self.outputs[0].gradients[self] * k.value
self.gradients[k] = self.outputs[0].gradients[self] * x.value
self.gradients[b] = self.outputs[0].gradients[self] * 1
# print('self.gradients[self.inputs[0]] {}'.format(
# self.gradients[self.inputs[0]]))
# print('self.gradients[self.inputs[1]] {}'.format(
# self.gradients[self.inputs[1]]))
# print('self.gradients[self.inputs[2]] {}'.format(
# self.gradients[self.inputs[2]]))
# Defining __repr__ controls what is shown when the node is printed
def __repr__(self):
return 'Linear: {}'.format(self.name)
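# A quick worked example of the gradients above (made-up numbers): with x = 3, k = 2, b = 1
# and an upstream gradient ∂loss/∂linear = 1, we get
# ∂loss/∂x = 1 * k = 2, ∂loss/∂k = 1 * x = 3, ∂loss/∂b = 1 * 1 = 1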
import numpy as np
class Sigmoid(Node):
def __init__(self, x, name=None):
Node.__init__(self, inputs=[x], name=name)
def _sigmoid(self, x):
return 1 / (1 + np.exp(-x))
def forward(self):
x = self.inputs[0]
self.value = self._sigmoid(x.value)
# print(
# 'I am {}, I was not assigned, value: {}, but calculated it myself!!!'
# .format(self.name, self.value))
def backward(self):
# self.gradients[self.inputs[0]] = ' * '.join([
# self.outputs[0].gradients[self],
# '∂{} / ∂{}'.format(self.name, self.inputs[0].name)
# ])
# numeric version, using σ'(x) = σ(x) * (1 - σ(x))
x = self.inputs[0]
self.gradients[x] = self.outputs[0].gradients[self] * self._sigmoid(
x.value) * (1 - self._sigmoid(x.value))
# print('self.gradients[self.inputs[0]] {}'.format(
# self.gradients[self.inputs[0]]))
def __repr__(self):
return 'Sigmoid: {}'.format(self.name)
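# Quick sanity check of the derivative used above (a sketch with made-up numbers):
# s = 1 / (1 + np.exp(-0.5))   # σ(0.5) ≈ 0.6225
# s * (1 - s)                  # analytic σ'(0.5) ≈ 0.2350
# (1 / (1 + np.exp(-0.500001)) - 1 / (1 + np.exp(-0.499999))) / 2e-6  # numeric ≈ 0.2350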
class MSE_Loss(Node):
def __init__(self, y, yhat, name=None):
Node.__init__(self, inputs=[y, yhat], name=name)
def forward(self):
y = self.inputs[0]
yhat = self.inputs[1]
self.value = np.mean((y.value - yhat.value) ** 2)
# print(
# 'I am {}, I was not assigned, value: {}, but calculated it myself!!!'
# .format(self.name, self.value))
def backward(self):
# word description
# self.gradients[self.inputs[0]] = '∂{} / ∂{}'.format(
# self.name, self.inputs[0].name)
# self.gradients[self.inputs[1]] = '∂{} / ∂{}'.format(
# self.name, self.inputs[1].name)
# numeric version
y = self.inputs[0]
yhat = self.inputs[1]
self.gradients[self.inputs[0]] = 2 * np.mean(y.value - yhat.value)
self.gradients[self.inputs[1]] = -2 * np.mean(y.value - yhat.value)
# print('self.gradients[self.inputs[0]] {} | {}'.format(
# self.inputs[0].name, self.gradients[self.inputs[0]]))
# print('self.gradients[self.inputs[1]] {} | {}'.format(
# self.inputs[1].name, self.gradients[self.inputs[1]]))
# Defining __repr__ controls what is shown when the node is printed
def __repr__(self):
return 'Loss: {}'.format(self.name)
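# For the scalar values used in this example, loss = (y - yhat)^2, so
# ∂loss/∂y = 2 * (y - yhat) and ∂loss/∂yhat = -2 * (y - yhat),
# which is exactly what the two gradient lines above compute (np.mean of a scalar is the scalar itself)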
# A conceptual sketch of the graph we want to build (note: Node does not actually take an
# `outputs` argument; output links are registered automatically in Node.__init__):
# node_x = Node(inputs=None, outputs=[node_linear])
# node_y = Node(inputs=None, outputs=[node_loss])
# node_k = Node(inputs=None, outputs=[node_linear])
# node_b = Node(inputs=None, outputs=[node_linear])
# node_linear = Node(inputs=[node_x, node_k, node_b], outputs=[node_sigmoid])
# node_sigmoid = Node(inputs=[node_linear], outputs=[node_loss])
# node_loss = Node(inputs=[node_sigmoid, node_y], outputs=None)
node_x = Placeholder(name='x', is_trainable=False)  # Placeholder: a node whose value must be fed in
node_y = Placeholder(name='y', is_trainable=False)
node_k = Placeholder(name='k', is_trainable=True)
node_b = Placeholder(name='b', is_trainable=True)
node_linear = Linear(node_x, node_k, node_b, name='linear')
node_sigmoid = Sigmoid(x=node_linear, name='sigmoid')
node_loss = MSE_Loss(yhat=node_sigmoid, y=node_y, name='loss')
# How do we chain the defined nodes into a graph? By following each node's edges, i.e. where it points
# Step 1: collect in a list all the outermost nodes that need to be given values
# need_expand = [node_x, node_b, node_y, node_k]
# To wrap this in a function, turn need_expand into a feed_dict that also carries the values to assign
feed_dict = {
node_x: 3,
node_y: random.random(),
node_k: random.random(),
node_b: 0.38
}
from collections import defaultdict
def convert_feed_dict_to_graph(feed_dict):
need_expand = [n for n in feed_dict]
computing_graph = defaultdict(list)
# Build the connection graph by expanding outward from the fed nodes, chaining the nodes together
while need_expand:
n = need_expand.pop(0)
if n in computing_graph: continue
# If n is a node that needs a value, assign it here
if isinstance(n, Placeholder): n.value = feed_dict[n]
for m in n.outputs:
computing_graph[n].append(m)
need_expand.append(m)
return computing_graph
# e.g. x -> linear -> sigmoid: from x we reach linear, and from linear we reach sigmoid
# Even without defining __repr__() on the classes, computing_graph already gives us a graph;
# print(computing_graph)
# then shows raw objects like this:
# defaultdict(<class 'list'>,
# {
# <__main__.Node object at 0x101f52fd0>: [<__main__.Node object at 0x101f51670>],
# <__main__.Node object at 0x101f51730>: [<__main__.Node object at 0x101f51670>],
# <__main__.Node object at 0x101f52f70>: [<__main__.Node object at 0x101f515b0>],
# <__main__.Node object at 0x101f52f10>: [<__main__.Node object at 0x101f51670>],
# <__main__.Node object at 0x101f51670>: [<__main__.Node object at 0x101f51610>],
# <__main__.Node object at 0x101f51610>: [<__main__.Node object at 0x101f515b0>]
# })
# i.e. one entry per node object
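# With __repr__ defined on each class, the same graph prints with readable names; one
# possible rendering (key order follows the insertion order of feed_dict):
# defaultdict(<class 'list'>,
#             {Node: x: [Linear: linear],
#              Node: y: [Loss: loss],
#              Node: k: [Linear: linear],
#              Node: b: [Linear: linear],
#              Linear: linear: [Sigmoid: sigmoid],
#              Sigmoid: sigmoid: [Loss: loss]})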
# Once the nodes are chained into a graph, the topologic function above can sort them topologically
sorted_nodes = topologic(convert_feed_dict_to_graph(feed_dict))
# With the topological order in hand, each node is computed only after everything it depends on
###############
# FeedForward #
###############
# for node in sorted_nodes:
# node.forward()
# Final output:
# I am k, I was assigned, value: 0.11644522193335771
# I am b, I was assigned, value: 0.38
# I am x, I was assigned, value: 3
# I am linear, I was not assigned, value: 0.7293356658000731, but calculated it myself!!!
# I am sigmoid, I was not assigned, value: 0.6746594720886453, but calculated it myself!!!
# I am y, I was assigned, value: 0.6049903624105541
# I am loss, I was not assigned, value: 0.004853784843337893, but calculated it myself!!!
################
# FeedBackward #
################
# After a forward pass we want the loss value to decrease, so add backward() to the Node classes
# for node in sorted_nodes[::-1]:
# print('I am: {}'.format(node.name))
# node.backward()
# Output:
# I am k, I was assigned, value: 0.42129768084566066
# I am x, I was assigned, value: 3
# I am b, I was assigned, value: 0.38
# I am y, I was assigned, value: 0.8808783865431419
# I am linear, I was not assigned, value: 1.6438930425369818, but calculated it myself!!!
# I am sigmoid, I was not assigned, value: 0.8380639685285712, but calculated it myself!!!
# I am loss, I was not assigned, value: 0.0018330743899263912, but calculated it myself!!!
# I am: loss
# I get ∂loss / ∂y
# I get ∂loss / ∂sigmoid
# I am: sigmoid
# I get ∂sigmoid / ∂linear
# I am: linear
# I get ∂linear / ∂x
# I get ∂linear / ∂k
# I get ∂linear / ∂b
# I am: y
# I am: b
# I am: x
# I am: k
# The next step is simply to multiply the per-step partial derivatives together (the chain rule), which is what the gradients dict accumulates
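# Written out for this graph, e.g. for the trainable parameter k:
# ∂loss/∂k = ∂loss/∂sigmoid * ∂sigmoid/∂linear * ∂linear/∂k
# Each factor is computed locally in the corresponding node's backward(), and the running
# product is what ends up in each node's gradients dict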
############
# Optimize #
############
# learning_rate = 1e-1
# for node in sorted_nodes:
# if node.is_trainable:
# node.value = node.value + (-1) * node.gradients[node] * learning_rate
# cmp = 'large' if node.gradients[node] > 0 else 'small'
# print('"{}" is too {}, I need update myself!'.format(node.name, cmp))
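# The update sketched above is plain gradient descent on the trainable placeholders k and b:
# value_new = value_old - learning_rate * ∂loss/∂value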
# package into function
def forward(graph_sorted_nodes):
for node in graph_sorted_nodes:
node.forward()
# if node is node_y:
if isinstance(node, MSE_Loss):
print('loss value: {}'.format(node.value))
# Backward propagation
def backward(graph_sorted_nodes):
for node in graph_sorted_nodes[::-1]:
# print('\nI am: {}'.format(node.name))
node.backward()
# epoch() = forward() + backward()
def run_one_epoch(graph_sorted_nodes):
forward(graph_sorted_nodes)
backward(graph_sorted_nodes)
def optimize(graph_nodes, learning_rate=1e-3):
for node in graph_nodes:
if node.is_trainable:
node.value = node.value + (
-1) * node.gradients[node] * learning_rate
cmp = 'large' if node.gradients[node] > 0 else 'small'
# print('"{}" is too {}, I need update myself to {}!'.format(
# node.name, cmp, node.value))
############################################
# One full pass: forward, backward, update #
############################################
loss_history = []
for _ in range(100):
run_one_epoch(sorted_nodes)
_loss_node = sorted_nodes[-1]
assert isinstance(_loss_node, MSE_Loss)
loss_history.append(_loss_node.value)
optimize(sorted_nodes, 1e-1)
########################
# test training result #
########################
import matplotlib.pyplot as plt
def sigmoid(x):
return 1 / (1 + np.exp(-x))
# Get the indices of x, k, b and y in sorted_nodes
k_pos, x_pos, b_pos, y_pos = sorted_nodes.index(node_k), sorted_nodes.index(
node_x), sorted_nodes.index(node_b), sorted_nodes.index(node_y)
print('yhat = {}'.format(
sigmoid(sorted_nodes[k_pos].value * sorted_nodes[x_pos].value +
sorted_nodes[b_pos].value)))
print('y = {}'.format(sorted_nodes[y_pos].value))
plt.plot(loss_history)
plt.show()