from functools import reduce import random
def topologic(graph):
    """Return a topological ordering of *graph*'s nodes.

    *graph* maps each node to the list of nodes it feeds into, e.g.::

        {
            x: [linear], k: [linear], b: [linear],
            linear: [sigmoid], sigmoid: [loss], y: [linear],
        }

    Nodes with no incoming edges are emitted first; ties are broken with
    random.choice, so equally valid orderings may differ between calls.
    Sink nodes (which never appear as keys) are appended when the last
    key is processed.

    Raises:
        TypeError: if the graph contains a cycle.

    Note: works on an internal copy, so the caller's *graph* is left
    untouched (the original implementation destructively emptied it).
    """
    # Shallow copy with fresh edge lists so the pops/removes below
    # cannot corrupt the caller's data structure.
    graph = {node: list(succ) for node, succ in graph.items()}
    sorted_node = []
    while graph:
        # Every node that appears in some adjacency list has >= 1 input.
        nodes_with_inputs = {m for succ in graph.values() for m in succ}
        source_nodes = set(graph) - nodes_with_inputs
        if not source_nodes:
            raise TypeError(
                "This Graph has circle, can't adopt topological order")
        node = random.choice(list(source_nodes))
        sorted_node.append(node)
        if len(graph) == 1:
            # Last key: flush its successors (sinks never become keys,
            # so they would otherwise be dropped from the ordering).
            sorted_node += graph[node]
        graph.pop(node)
        for links in graph.values():
            if node in links:
                links.remove(node)
    return sorted_node
class Node:
    """Base node of the computation graph.

    Each node records its upstream *inputs* and registers itself in each
    input's ``outputs`` list, so both edge directions are navigable.

    Attributes:
        inputs: upstream nodes this node reads values from.
        outputs: downstream nodes that read this node's value.
        value: set by forward(); None until the first forward pass.
        gradients: node -> d(loss)/d(node), filled by backward().
        is_trainable: whether an optimizer may update self.value.
    """

    def __init__(self, inputs=None, name=None, is_trainable=False):
        # `inputs=None` instead of the original mutable default `[]`:
        # a shared default list is a classic Python pitfall.
        self.inputs = [] if inputs is None else inputs
        self.outputs = []
        self.name = name
        self.value = None
        self.gradients = dict()
        self.is_trainable = is_trainable
        for node in self.inputs:
            node.outputs.append(self)

    def forward(self):
        """Compute self.value from input values; overridden by subclasses."""
        pass

    def backward(self):
        """Propagate gradients to inputs; overridden by subclasses."""
        pass

    def __repr__(self):
        return 'Node: {}'.format(self.name)
class Placeholder(Node):
    """Leaf node whose value is injected externally from a feed dict."""

    def __init__(self, name=None, is_trainable=False):
        Node.__init__(self, name=name, is_trainable=is_trainable)

    def forward(self):
        # Nothing to compute: the value is assigned by the graph builder.
        pass

    def backward(self):
        # The gradient of a leaf is whatever its (single) consumer
        # computed for it during its own backward pass.
        downstream = self.outputs[0]
        self.gradients[self] = downstream.gradients[self]

    def __repr__(self):
        return 'Node: {}'.format(self.name)
class Linear(Node):
    """Affine node: value = k * x + b (scalar node values)."""

    def __init__(self, x, k, b, name=None):
        Node.__init__(self, inputs=[x, k, b], name=name)

    def forward(self):
        x, k, b = self.inputs
        self.value = k.value * x.value + b.value

    def backward(self):
        x, k, b = self.inputs
        # Chain rule: scale the upstream gradient by each local partial
        # (d/dx = k, d/dk = x, d/db = 1).
        upstream = self.outputs[0].gradients[self]
        self.gradients[x] = upstream * k.value
        self.gradients[k] = upstream * x.value
        self.gradients[b] = upstream * 1

    def __repr__(self):
        return 'Linear: {}'.format(self.name)
import numpy as np
class Sigmoid(Node):
    """Applies the logistic function 1 / (1 + e^-x) to its single input."""

    def __init__(self, x, name=None):
        Node.__init__(self, inputs=[x], name=name)

    def _sigmoid(self, x):
        """Logistic function; numpy handles scalars and arrays alike."""
        return 1 / (1 + np.exp(-x))

    def forward(self):
        x = self.inputs[0]
        self.value = self._sigmoid(x.value)

    def backward(self):
        x = self.inputs[0]
        # d(sigmoid)/dx = s * (1 - s).  Evaluate the sigmoid once
        # (the original recomputed it twice per backward call).
        s = self._sigmoid(x.value)
        self.gradients[x] = self.outputs[0].gradients[self] * s * (1 - s)

    def __repr__(self):
        return 'Sigmoid: {}'.format(self.name)
class MSE_Loss(Node):
    """Mean squared error between target *y* and prediction *yhat*."""

    def __init__(self, y, yhat, name=None):
        Node.__init__(self, inputs=[y, yhat], name=name)

    def forward(self):
        y, yhat = self.inputs
        self.value = np.mean((y.value - yhat.value) ** 2)

    def backward(self):
        y, yhat = self.inputs
        # For the scalar case: d/dy mean((y - yhat)^2) = 2 * (y - yhat),
        # and the gradient w.r.t. yhat is its negation.
        grad = 2 * np.mean(y.value - yhat.value)
        self.gradients[y] = grad
        self.gradients[yhat] = -grad

    def __repr__(self):
        return 'Loss: {}'.format(self.name)
# Build the demo computation graph:  loss(sigmoid(k*x + b), y).
# x and y are data placeholders; k and b are the trainable parameters.
node_x = Placeholder(name='x', is_trainable=False)
node_y = Placeholder(name='y', is_trainable=False)
node_k = Placeholder(name='k', is_trainable=True)
node_b = Placeholder(name='b', is_trainable=True)
node_linear = Linear(node_x, node_k, node_b, name='linear')
node_sigmoid = Sigmoid(x=node_linear, name='sigmoid')
node_loss = MSE_Loss(yhat=node_sigmoid, y=node_y, name='loss')
# Concrete starting values for every placeholder: fixed input x,
# random target y, random initial slope k, fixed initial intercept b.
feed_dict = {
    node_x: 3,
    node_y: random.random(),
    node_k: random.random(),
    node_b: 0.38
}
from collections import defaultdict, deque
def convert_feed_dict_to_graph(feed_dict):
    """Build {node: [downstream nodes]} by BFS from the placeholders.

    Also injects the concrete feed value into each Placeholder reached.
    Sink nodes (e.g. the loss, which has no outputs) never become keys;
    they only appear inside other nodes' adjacency lists.

    Args:
        feed_dict: {Placeholder: value} seeding both the traversal and
            the placeholder values.

    Returns:
        defaultdict(list) mapping each non-sink node to its outputs.
    """
    computing_graph = defaultdict(list)
    visited = set()
    # deque gives O(1) popleft; the original used list.pop(0), O(n).
    need_expand = deque(feed_dict)
    while need_expand:
        n = need_expand.popleft()
        # Track every processed node explicitly: the original tested
        # membership in computing_graph, so sink nodes (which never gain
        # a key) were re-processed once per incoming edge.
        if n in visited:
            continue
        visited.add(n)
        if isinstance(n, Placeholder):
            n.value = feed_dict[n]
        for m in n.outputs:
            computing_graph[n].append(m)
            need_expand.append(m)
    return computing_graph
# Linearise the graph once up front; training just replays this order.
sorted_nodes = topologic(convert_feed_dict_to_graph(feed_dict))
def forward(graph_sorted_nodes):
    """Run every node's forward pass in topological order, printing the loss."""
    for current in graph_sorted_nodes:
        current.forward()
        if not isinstance(current, MSE_Loss):
            continue
        print('loss value: {}'.format(current.value))
def backward(graph_sorted_nodes):
    """Run every node's backward pass, from the outputs back to the inputs."""
    # reversed() iterates the same order as [::-1] without copying the list.
    for node in reversed(graph_sorted_nodes):
        node.backward()
def run_one_epoch(graph_sorted_nodes):
    """One full training step: a forward pass followed by a backward pass."""
    forward(graph_sorted_nodes)
    backward(graph_sorted_nodes)
def optimize(graph_nodes, learning_rate=1e-3):
    """One gradient-descent step over all trainable nodes.

    Args:
        graph_nodes: iterable of graph nodes; only those with
            ``is_trainable`` set are updated.
        learning_rate: step size applied to each gradient.
    """
    for node in graph_nodes:
        if node.is_trainable:
            # value <- value - lr * dL/d(value).
            # (The original also computed an unused 'cmp' label here.)
            node.value -= node.gradients[node] * learning_rate
# Training loop: 100 epochs of forward/backward plus a gradient step.
loss_history = []
for _ in range(100):
    run_one_epoch(sorted_nodes)
    _loss_node = sorted_nodes[-1]  # topologic appends the sink (loss) last
    assert isinstance(_loss_node, MSE_Loss)
    loss_history.append(_loss_node.value)
    optimize(sorted_nodes, 1e-1)
import matplotlib.pyplot as plt
def sigmoid(x):
    """Logistic function: maps any real input into (0, 1)."""
    decay = np.exp(-x)
    return 1 / (1 + decay)
# Locate the parameter/data nodes inside the sorted order, then compare
# the trained model's prediction sigmoid(k*x + b) against the target y.
k_pos, x_pos, b_pos, y_pos = sorted_nodes.index(node_k), sorted_nodes.index(
    node_x), sorted_nodes.index(node_b), sorted_nodes.index(node_y)
print('yhat = {}'.format(
    sigmoid(sorted_nodes[k_pos].value * sorted_nodes[x_pos].value +
            sorted_nodes[b_pos].value)))
print('y = {}'.format(sorted_nodes[y_pos].value))
# Plot the loss curve over the 100 epochs.
plt.plot(loss_history)
plt.show()
|