# -*- coding: utf-8 -*-
"""
Created on Sun Nov 4 14:48:40 2018
@author: 12890
"""
import numpy as np
import layer
from matplotlib import pylab as plt
# Define the neural network class
class NN(object):
    """A simple fully-connected neural network trained with Adam."""
    def __init__(self, num_layers, layer_size, data, labels, learning_rate, p, batch_size, iterations, k, beta1, beta2):
        """
        Initialize the network.

        num_layers    : number of layers in the network
        layer_size    : per-layer (input_dim, output_dim) size pairs
        data          : input data the network processes
        labels        : labels of the input data
        learning_rate : base learning rate
        p             : dropout probability
        batch_size    : minibatch size used during training
        iterations    : number of training iterations
        k             : learning-rate decay coefficient
        beta1         : Adam momentum-term coefficient
        beta2         : Adam gradient-term coefficient
        """
        self.num_layers = num_layers  # (fixed: was assigned twice in the original)
        self.layer_size = layer_size
        self.data = data
        self.labels = labels
        self.learning_rate = learning_rate
        self.p = p
        self.batch_size = batch_size
        self.iterations = iterations
        self.k = k
        self.beta1 = beta1
        self.beta2 = beta2
        self.layers = []
        # NOTE: 'socre' (sic) holds the softmax score matrix; the misspelling
        # is kept because every other method refers to it by this name.
        self.socre = []
        self.loss = 0
        self.gradient = []
def add_layer(self):
    """Construct every layer of the network.

    The first layer is fed self.data; each subsequent layer is sized by
    running a forward pass through the previous one (dropout disabled)
    and using its output as the new layer's input.
    """
    for idx in range(self.num_layers):
        if idx == 0:
            inputs = self.data
        else:
            # Produce the previous layer's output so the new layer
            # can be built on top of it.
            self.layers[idx - 1].forward(False, self.p)
            inputs = self.layers[idx - 1].output
        self.layers.append(
            layer.LAYER(self.layer_size[idx][0], self.layer_size[idx][1], inputs)
        )
"""构建神经网络的softmax层"""
def add_softmax_layer(self):
self.socre=self.layers[-1].output
self.socre=np.exp(self.socre)
self.socre=self.socre/np.tile(np.sum(self.socre,axis=1), (self.layers[-1].output_dim,1)).T
"""构建用于训练的batch"""
def sample_training_data(self, data, labels):
batch_index= np.random.randint(0, data.shape[0], self.batch_size)
batch=data[batch_index]
batch_labels=labels[batch_index]
return batch, batch_labels
def forward(self, training_mode):
    """Run a full forward pass through every layer, then softmax.

    training_mode : when True, dropout is active in all layers except
                    the last one (which never uses dropout).
    """
    prev_output = self.data
    last = self.num_layers - 1
    for idx in range(self.num_layers):
        current = self.layers[idx]
        # Each layer consumes a copy of the previous layer's output
        # (the first layer consumes a copy of self.data).
        current.input = prev_output.copy()
        # The final layer never applies dropout.
        current.forward(training_mode if idx != last else False, self.p)
        prev_output = current.output
    self.add_softmax_layer()
"""计算损失"""
def compute_loss(self):
loss=np.zeros(self.data.shape[0])
for i in range(self.data.shape[0]):
loss[i] = -np.log(self.socre[i, self.labels[i]])
self.loss = np.mean(loss)
"""Backward pass (backpropagation)."""
def bp(self):
    # Walk layers from the last down to the first. Per-layer attributes
    # written here (consumed by Adam and by the next step of this loop):
    #   dsdwh : gradient of the loss w.r.t. the layer's pre-activation
    #   dw/db : batch-averaged gradients w.r.t. weights / bias
    for i in range(self.num_layers, 0, -1):
        if i == self.num_layers:
            # Combined softmax + cross-entropy gradient w.r.t. the last
            # layer's scores: softmax(scores) - one_hot(labels).
            dsdo = self.socre.copy()
            for j in range(self.data.shape[0]):
                dsdo[j, self.labels[j]]-=1
        else:
            # Gradient flowing back through layer i's weight matrix.
            dsdo = np.dot(self.layers[i].dsdwh, self.layers[i].w.T)
        if i != self.num_layers:
            # Re-apply the dropout mask from the forward pass.
            # NOTE(review): u1 presumably stores the (inverted) dropout
            # mask built in layer.LAYER.forward — confirm in layer.py.
            dsdo *= self.layers[i-1].u1
        # ReLU derivative of the pre-activation: 1 where positive, else 0.
        # NOTE(review): the pre-activation is recomputed as input·w WITHOUT
        # the bias term b; if layer.LAYER.forward activates input·w + b,
        # this mask can differ near zero — verify against layer.py.
        dodwh = np.dot(self.layers[i-1].input, self.layers[i-1].w)
        dodwh[np.where(dodwh<=0)] = 0
        dodwh[np.where(dodwh>0) ] = 1
        self.layers[i-1].dsdwh = dsdo * dodwh
        # Bias gradient: mean of the pre-activation gradient over the batch.
        self.layers[i-1].db = np.mean(self.layers[i-1].dsdwh, axis = 0)
        # Weight gradient, averaged over the batch size.
        self.layers[i-1].dw = np.dot(self.layers[i-1].input.T, self.layers[i-1].dsdwh)/self.data.shape[0]
"""使用Adam进行网络权重更新"""
def Adam(self, t):
for i in range(self.num_layers):
#更新w
self.layers[i].dw_m = self.beta1*self.layers[i].dw_m +(1-self.beta1)*self.layers[i].dw
mt = self.layers[i].dw_m/(1-self.beta1**(t+1))
self.layers[i].dw_v = self.beta2*self.layers[i].dw_v +(1-self.beta2)*(self.layers[i].dw**2)
vt = self.layers[i].dw_v / (1-self.beta2**(t+1))
alpha = self.learning_rate*np.exp(-(i+1)*self.k)
self.layers[i].w += - alpha * mt/(np.sqrt(vt)+1e-8)
#更新b
self.layers[i].db_m = self.beta1*self.layers[i].db_m +(1-self.beta1)*self.layers[i].db
mt = self.layers[i].db_m/(1-self.beta1**(t+1))
self.layers[i].db_v = self.beta2*self.layers[i].db_v +(1-self.beta2)*(self.layers[i].db**2)
vt = self.layers[i].db_v / (1-self.beta2**(t+1))
self.layers[i].b += - alpha * mt/(np.sqrt(vt)+1e-8)
def training(self, training_data, training_labels):
    """Train the network with minibatch gradient descent + Adam.

    Each iteration samples a fresh batch, runs the forward and backward
    passes, and applies an Adam update. The loss is recorded every 10
    steps, printed every 1000 steps, and the loss curve is saved to the
    file 'Loss' when training finishes.
    """
    total_loss = []
    for step in range(self.iterations):
        self.data, self.labels = self.sample_training_data(training_data, training_labels)
        self.forward(True)
        self.compute_loss()
        self.bp()
        self.Adam(step)
        if step % 10 == 0:
            total_loss.append(self.loss)
        if step % 1000 == 0:
            print('Steps ', step, ' finished. Loss is ', self.loss, ' \n')
    plt.figure(0)
    plt.plot(range(0, self.iterations, 10), total_loss)
    plt.xlabel('iteration')
    plt.ylabel('loss')
    plt.savefig('Loss')
def testing(self, testing_data, testing_labels):
    """Evaluate classification accuracy on a held-out set.

    Runs a forward pass with dropout disabled, takes the argmax class
    per sample, and returns the fraction of correct predictions.
    """
    self.data = testing_data
    self.labels = testing_labels
    self.forward(False)
    predictions = np.argmax(self.socre, axis=1)
    correct_num = int(np.sum(predictions == testing_labels))
    return correct_num / testing_data.shape[0]