import numpy as np
import os
import re
import scipy.io as scio
import pandas as pd
import scipy.signal
from keras.models import Sequential,Model,load_model
from keras.optimizers import Adam
from sklearn.preprocessing import LabelBinarizer
from sklearn import preprocessing
from keras.layers import *
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import itertools
from sklearn.metrics import confusion_matrix
from keras.callbacks import ReduceLROnPlateau
from keras import backend as k
from keras.callbacks import ModelCheckpoint
from tensorflow.keras.utils import plot_model
raw_num = 960  # number of samples per fault class
col_num = 250  # length (number of points) of each sample
class Data(object):
    def __init__(self):
        self.data = self.get_data()
        self.label = self.get_label()

    def file_list(self):  # list the files in the data folder
        return os.listdir('data/')

    def get_data(self):  # load and reshape the vibration data
        file_list = self.file_list()  # file names, 10 sub-files expected
        for i in range(len(file_list)):
            file = scio.loadmat('data/{}'.format(file_list[i]))  # load the i-th .mat file
            for k in file.keys():
                file_matched = re.match(r'X\d{3}_DE_time', k)  # match drive-end keys such as X111_DE_time
                if file_matched:
                    key = file_matched.group()  # e.g. X111_DE_time
                    if i == 0:
                        data = np.array(file[key][0:240000].reshape(raw_num, col_num))
                    else:
                        data = np.vstack((data, file[key][0:240000].reshape((raw_num, col_num))))
        return data

    def get_label(self):
        file_list = self.file_list()  # file names
        # title = ['111' '124' '137' '176' '191' '203' '215' '228' '240' '99']
        title = np.array([i.replace('.mat', '') for i in file_list])  # (10,)
        label = title[:, np.newaxis]  # (10, 1)
        label_copy = np.copy(label)  # (10, 1)
        for _ in range(raw_num - 1):  # repeat 959 times
            label = np.hstack((label, label_copy))  # horizontal concatenation -> (10, 960)
        return label.flatten()
# =============================================================================
# Data = Data()
#
#
# data = Data.data
# print(data)
# =============================================================================
load_mat = scio.loadmat('1.mat')
# scio.loadmat returns a dict, <class 'dict'>
print(type(load_mat))
data = load_mat['Up']
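# Sanity check (a sketch, not in the original script): the code below assumes
# 'Up' holds 1600 samples of length 250 (4 classes x 400 samples), matching the
# labels built next and the Input(shape=(250,)) of the model.
print(data.shape)  # expected: (1600, 250)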
# =============================================================================
# label = Data.label
# =============================================================================
label1 = []
for i in range(1600):  # 4 classes, 400 samples each
    if i < 400:
        label1.append(0)
    elif i < 800:
        label1.append(1)
    elif i < 1200:
        label1.append(2)
    elif i < 1600:
        label1.append(3)
label = np.array(label1)
lb = LabelBinarizer()  # binarize the labels
y = lb.fit_transform(label)  # convert to one-hot encoding, e.g. 2 -> [0, 0, 1, 0]; shape (1600, 4)
# Wiener filtering (denoising)
data_wiener = scipy.signal.wiener(data, mysize=3, noise=None)
# Downsampling (disabled)
#index = np.arange(0, 2000, 8)  # index every 8th point
#data_samp = data_wiener[:, index]  # would reduce [2400, 2000] to [2400, 250]
#print(data_samp.shape)  # should print (2400, 250) if correct
X_train, X_test, y_train, y_test = train_test_split(data_wiener, y, test_size=0.2, stratify=y)
def built_model():
    input_seq = Input(shape=(250,))  # input shape (250,)
    X = Reshape((250, 1))(input_seq)  # reshape (250,) to (250, 1)
    # encoder1
    ec1_layer1 = Conv1D(filters=50, kernel_size=20, strides=2,
                        padding='valid', activation='relu',
                        data_format='channels_last')(X)
    ec1_layer2 = Conv1D(filters=30, kernel_size=10, strides=2,
                        padding='valid', activation='relu',
                        data_format='channels_last')(ec1_layer1)
    ec1_outputs = MaxPooling1D(pool_size=2, strides=None, padding='valid',
                               data_format='channels_last')(ec1_layer2)
    # encoder2
    ec2_layer1 = Conv1D(filters=50, kernel_size=6, strides=1,
                        padding='valid', activation='relu',
                        data_format='channels_last')(X)
    ec2_layer2 = Conv1D(filters=40, kernel_size=6, strides=1,
                        padding='valid', activation='relu',
                        data_format='channels_last')(ec2_layer1)
    ec2_layer3 = MaxPooling1D(pool_size=2, strides=None, padding='valid',
                              data_format='channels_last')(ec2_layer2)
    ec2_layer4 = Conv1D(filters=30, kernel_size=6, strides=1,
                        padding='valid', activation='relu',
                        data_format='channels_last')(ec2_layer3)
    ec2_layer5 = Conv1D(filters=30, kernel_size=6, strides=2,
                        padding='valid', activation='relu',
                        data_format='channels_last')(ec2_layer4)
    ec2_outputs = MaxPooling1D(pool_size=2, strides=None, padding='valid',
                               data_format='channels_last')(ec2_layer5)
    # element-wise product of the two encoder outputs
    encoder = multiply([ec1_outputs, ec2_outputs])  # (27, 30) * (27, 30) -> (27, 30)
    # classifier
    dc_layer1 = LSTM(60, return_sequences=True)(encoder)
    dc_layer2 = LSTM(60)(dc_layer1)
    dc_layer3 = Dropout(0.5)(dc_layer2)
    dc_layer4 = Dense(4, activation='softmax')(dc_layer3)
    model = Model(input_seq, dc_layer4)
    return model
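# Shape check (a sketch, not part of the original script): with 'valid' padding a
# Conv1D/MaxPooling1D output length is floor((L - kernel_size) / strides) + 1, so
# encoder1 goes 250 -> 116 -> 54 -> 27 and encoder2 goes
# 250 -> 245 -> 240 -> 120 -> 115 -> 55 -> 27, which is why both branches end at
# (None, 27, 30) before the element-wise multiply. Uncomment to verify:
# print(built_model().output_shape)  # expected: (None, 4)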
def plot_confusion_matrix(cm, classes, title='Confusion matrix', cmap=plt.cm.Blues, normalize=False):
    if normalize:  # optionally show per-class rates instead of raw counts
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    plt.imshow(cm, cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_mark = np.arange(len(classes))  # with a single argument, arange starts at 0 with step 1
    '''
    tick_mark: positions of the axis ticks
    classes:   labels shown at each tick
    rotation:  rotation angle of the x tick labels
    cm = confusion_mat
    classes = range(4)
    '''
    plt.xticks(tick_mark, classes, rotation=40)
    plt.yticks(tick_mark, classes)
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j], horizontalalignment='center', color='black')
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
import time
begin_time = time.time()
model = built_model()
opt = Adam(learning_rate=0.0006)
# Note: mean squared error is kept from the original script; categorical_crossentropy
# is the more conventional loss for one-hot softmax classification.
model.compile(optimizer=opt, loss='mean_squared_error', metrics=['accuracy'])
model.summary()
history = model.fit(x=X_train, y=y_train, batch_size=64, epochs=50,
                    verbose=2, validation_data=(X_test, y_test),
                    shuffle=True, initial_epoch=0)
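# Training-curve sketch (not in the original script): `history` is captured above
# but never used; this plots the accuracy curves. Key names assume a recent Keras
# ('accuracy'/'val_accuracy'); older versions use 'acc'/'val_acc'.
plt.figure()
plt.plot(history.history.get('accuracy', history.history.get('acc')), label='train')
plt.plot(history.history.get('val_accuracy', history.history.get('val_acc')), label='validation')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()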
plt.figure(figsize=(12,9))
y_pre = model.predict(X_test)  # (batch, 4)
label_pre = np.argmax(y_pre, axis=1)  # (batch,)
print(label_pre.shape)
label_true = np.argmax(y_test, axis=1)  # (batch,)
confusion_mat = confusion_matrix(label_true, label_pre)  # confusion matrix to evaluate classification accuracy
print(confusion_mat)
plot_confusion_matrix(confusion_mat, classes=range(4))
#plot_model(model, to_file='简单测试模型_001.png', show_shapes=True)
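# Wrap-up sketch (not in the original script): report the elapsed time measured
# from begin_time above and render the figures when running as a plain script.
print('Elapsed time: {:.1f} s'.format(time.time() - begin_time))
plt.show()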