import numpy as np
from tqdm import tqdm
import torch
import torch.nn.functional as F
from sklearn.preprocessing import MinMaxScaler
# Random-noise generator used to fill missing entries before imputation.
def sample_z(n_rows, m_cols, feature_range=(-0.01, +0.01)):
    """Draw an (n_rows, m_cols) matrix of i.i.d. uniform noise.

    Args:
        n_rows: number of rows of the noise matrix.
        m_cols: number of columns of the noise matrix.
        feature_range: (low, high) bounds of the uniform distribution.

    Returns:
        np.ndarray of shape (n_rows, m_cols) sampled from U(low, high).
    """
    low, high = feature_range
    return np.random.uniform(low=low, high=high, size=(n_rows, m_cols))
# Mask-matrix sampler.
def sample_M(m, n, p):
    """Sample an (m, n) binary mask matrix as floats.

    Each entry is drawn independently: 1.0 with probability (1 - p),
    0.0 with probability p, where p is the target missing rate.

    Args:
        m: number of rows.
        n: number of columns.
        p: missing rate in [0, 1].

    Returns:
        np.ndarray of shape (m, n) containing 0.0 / 1.0 values
        (roughly a (1 - p) fraction of ones).
    """
    draws = np.random.uniform(0.0, 1.0, size=(m, n))
    return (draws > p).astype(float)
def sample_batch_index(total, batch_size):
    """Sample a mini-batch of distinct row indices.

    Args:
        total: total number of samples to draw from.
        batch_size: number of indices to return.

    Returns:
        np.ndarray of batch_size distinct indices in [0, total).
    """
    # A random permutation guarantees the chosen indices are distinct.
    shuffled = np.random.permutation(total)
    return shuffled[:batch_size]
# Xavier (Glorot) weight initialization.
def xavier_init(size):
    """Sample a weight tensor with Xavier-style Gaussian initialization.

    The standard deviation is sqrt(2 / fan_in), where fan_in is the
    first entry of `size`.

    Args:
        size: sequence giving the shape of the tensor, e.g. [in_dim, out_dim].

    Returns:
        np.ndarray of the requested shape drawn from N(0, sqrt(2/fan_in)).
    """
    fan_in = size[0]
    stddev = np.sqrt(2.0 / fan_in)  # identical to 1/sqrt(fan_in/2)
    return np.random.normal(scale=stddev, size=size)
def rounding(imputed_data, data_x):
    """Round imputed values in columns that look categorical.

    A column is treated as categorical when its observed (non-NaN)
    entries in `data_x` take fewer than 20 distinct values; those
    columns of the imputed matrix are rounded to the nearest integer.

    Args:
        imputed_data: imputed data matrix.
        data_x: original data matrix with missing values (NaN).

    Returns:
        Copy of imputed_data with the categorical columns rounded.
    """
    rounded = imputed_data.copy()
    n_cols = data_x.shape[1]
    for col in range(n_cols):
        column = data_x[:, col]
        observed = column[~np.isnan(column)]
        # Heuristic: few distinct observed levels => categorical variable.
        if np.unique(observed).size < 20:
            rounded[:, col] = np.round(rounded[:, col])
    return rounded
def rmse_loss(ori_data, imputed_data, data_m):
    """Compute the RMSE between original and imputed data on missing entries.

    Both matrices are min-max normalized with the SAME parameters,
    estimated from `ori_data` only. The previous version relied on an
    undefined module-level `scaler` and called `fit_transform` on each
    matrix separately, which fitted different min/max to the two inputs
    and distorted the error metric.

    Args:
        ori_data: original data without missing values, shape (n, d).
        imputed_data: imputed data, shape (n, d).
        data_m: missingness indicator (1 = observed, 0 = missing).

    Returns:
        rmse: Root Mean Squared Error over the missing entries only.
    """
    ori_data = np.asarray(ori_data, dtype=float)
    imputed_data = np.asarray(imputed_data, dtype=float)
    data_m = np.asarray(data_m, dtype=float)
    # Min-max parameters from the original data (same formula as
    # sklearn's MinMaxScaler with default feature_range=(0, 1)).
    col_min = np.min(ori_data, axis=0)
    col_range = np.max(ori_data, axis=0) - col_min
    # Guard constant columns against division by zero.
    col_range = np.where(col_range == 0.0, 1.0, col_range)
    ori_norm = (ori_data - col_min) / col_range
    imp_norm = (imputed_data - col_min) / col_range
    # Only the missing entries (data_m == 0) contribute to the error.
    diff = (1.0 - data_m) * (ori_norm - imp_norm)
    nominator = np.sum(diff ** 2)
    denominator = np.sum(1.0 - data_m)
    return np.sqrt(nominator / float(denominator))
def SGAIN(data):
## GAIN architecture
# Discriminator variables
if use_gpu is True:
D_W1 = torch.tensor(xavier_init([m_dim, m_dim]), requires_grad=True, device="cuda") # Data + Hint as inputs
D_b1 = torch.tensor(np.zeros(shape=[m_dim]), requires_grad=True, device="cuda")
D_W2 = torch.tensor(xavier_init([m_dim, m_dim]), requires_grad=True, device="cuda")
D_b2 = torch.tensor(np.zeros(shape=[m_dim]), requires_grad=True, device="cuda") # Output is multi-variate
else:
D_W1 = torch.tensor(xavier_init([m_dim, m_dim]), requires_grad=True) # Data + Hint as inputs
D_b1 = torch.tensor(np.zeros(shape=[m_dim]), requires_grad=True)
D_W2 = torch.tensor(xavier_init([m_dim, m_dim]), requires_grad=True)
D_b2 = torch.tensor(np.zeros(shape=[m_dim]), requires_grad=True) # Output is multi-variate
theta_D = [D_W1, D_W2, D_b1, D_b2]
# %% 2. Generator
if use_gpu is True:
G_W1 = torch.tensor(xavier_init([m_dim * 2, m_dim]), requires_grad=True,device="cuda") # Data + Mask as inputs (Random Noises are in Missing Components)
G_b1 = torch.tensor(np.zeros(shape=[m_dim]), requires_grad=True, device="cuda")
G_W2 = torch.tensor(xavier_init([m_dim, m_dim]), requires_grad=True, device="cuda")
G_b2 = torch.tensor(np.zeros(shape=[m_dim]), requires_grad=True, device="cuda")
else:
G_W1 = torch.tensor(xavier_init([m_dim * 2, m_dim]),
requires_grad=True) # Data + Mask as inputs (Random Noises are in Missing Components)
G_b1 = torch.tensor(np.zeros(shape=[m_dim]), requires_grad=True)
G_W2 = torch.tensor(xavier_init([m_dim, m_dim]), requires_grad=True)
G_b2 = torch.tensor(np.zeros(shape=[m_dim]), requires_grad=True)
theta_G = [G_W1, G_W2, G_b1, G_b2]
## GAIN functions
# Generator
def generator(z, m):
# Concatenate Data and Mask
inputs = torch.cat(dim=1, tensors=[z, m])
G_h1 = F.relu(torch.matmul(inputs, G_W1) + G_b1)
G_prob = torch.tanh(torch.matmul(G_h1, G_W2) + G_b2)
return G_prob
# Discriminator
def discriminator(x):
# Concatenate Data and Hint
inputs = x
D_h1 = F.relu(torch.matmul(inputs, D_W1) + D_b1)
D_prob = torch.tanh(torch.matmul(D_h1, D_W2) + D_b2)
return D_prob
# GAIN Loss
def discriminator_loss(X, M, Z):
# Generator
G_sample = generator(Z, M)
# Discriminator
D_real = discriminator(X)
D_fake = discriminator(G_sample)
# %% Loss
D_loss = torch.mean(M * D_real) - torch.mean((1 - M) * D_fake)
return D_loss
def generator_loss(X, M, Z):
# %% Structure
# Generator
G_sample = generator(Z, M)
# Discriminator
# D_real = discriminator(X)
D_fake = discriminator(G_sample)
# %% Loss
G_loss1 = -torch.mean((1 - M) * D_fake)
MSE_loss = torch.mean((M * X - M * G_sample) ** 2) / torch.mean(M)
G_loss = G_loss1 + alpha * MSE_loss
return G_loss, MSE_loss
data=data.copy()
# 数据归一化
data_miss = scaler.fit_transform(data)
data_mask = 1. - np.isnan(data) # 定义Mask矩阵(缺失数据为0,非缺失数据为1)
data_miss = np.nan_to_num(data_miss, nan=0.00)
# optimizer
optimizer_D = torch.optim.Adam(params=theta_D,lr=lr,betas=(beta_1,beta_2),eps=epsilon)
optimizer_G = torch.optim.Adam(params=theta_G,lr=lr,betas=(beta_1,beta_2),eps=epsilon)
# Start Iterations
for it in tqdm(range(n_iterations+1)):
# Sample batch
batch_idx = sample_batch_index(total=no, batch_size=batch_size)
X_mb=data_miss[batch_idx, :] # 获取第batch_idx行的X元素(128,16)
M_mb = data_mask[batch_idx, :] # 获取第batch_idx行的M元素(128,16)
Z_mb = M_mb * X_mb + (1 - M_mb) * sample_z(batch_size,m_dim)
if use_gpu is True:
X_mb = torch.tensor(X_mb, device="cuda")
M_mb = torch.tensor(M_mb, device="cuda")
Z_mb = torch.tensor(Z_mb, device="cuda")
else:
X_mb = torch.tensor(X_mb)
M_mb = torch.tensor(M_mb)
Z_mb = torch.tensor(Z_mb)
optimizer_D.zero_grad()
D_loss_curr = discriminator_loss(X=M_mb, M=X_mb,Z=Z_mb)
D_loss_curr.backward()
optimizer_D.step()
optimizer_G.zero_grad()
G_loss_curr, MSE_loss_curr = generator_loss(X=X_mb, M=M_mb, Z=Z_mb)
G_loss_curr.backward()
optimizer_G.step()
if it % 1000 == 0:
tqdm.write(f"Iteration: {it}; "
f"MSE_loss: {MSE_loss_curr:.4}")
# tqdm.write(f"Iteration: {it}; "
# f"D loss: {D_loss_curr:.4}; G_loss: {G_loss_curr:.4}; MSE_loss: {MSE_loss_curr:.4}")
# impute data
Z_all = data_mask*data_miss + (1 - data_mask)*sample_z(no,m_dim)
# 转为tensor
if use_gpu is True:
Z_all = torch.tensor(Z_all, device='cuda')
data_mask = torch.tensor(data_mask, device='cuda')
else:
Z_all = torch.tensor(Z_all)
data_mask = torch.tensor(data_mask)
imputed_data = generator(z=Z_all, m=data_mask)
if use_gpu is True:
imputed_data = imputed_data.cpu().detach().numpy()