A First Look at Deep Learning

Source:

MIT-Introduction to Deep Learning

The code comes from the MIT course labs and can be run in Colab. Link:
MIT-Codes GitHub repository

Information:

# Copyright 2022 MIT 6.S191 Introduction to Deep Learning. All Rights Reserved.
#
# Licensed under the MIT License. You may not use this file except in compliance
# with the License. Use and/or modification of this code outside of 6.S191 must
# reference:
#
# © MIT 6.S191: Introduction to Deep Learning
# http://introtodeeplearning.com

Lab 1: Intro to TensorFlow and Music Generation with RNNs

Part 1: Intro to TensorFlow

Creating TensorFlow values

  • x = tf.constant('String', tf.string):
    creates a constant, which may also be a multi-dimensional list (tensor).
    tf.string is the string dtype; tf.float64 is a floating-point dtype.

  • x = tf.Variable(value): creates a variable.

  • tf.zeros([a, b, c]): creates an all-zero tensor whose shape is given by the list.

Inspecting tensor attributes

  • tf.rank(value).numpy(): returns the number of dimensions (rank) of value
  • tf.shape(value).numpy(): returns the shape of value

TensorFlow math functions

  • tf.add(a, b): a + b
  • tf.subtract(a, b): a - b
  • tf.multiply(a, b): a * b (a combined example follows below)
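
A small sketch combining the calls above (the printed values assume these exact inputs):

import tensorflow as tf

sport = tf.constant("Tennis", tf.string)                # scalar string constant
matrix = tf.constant([[1., 2.], [3., 4.]], tf.float64)  # 2-D float constant
zeros = tf.zeros([2, 3, 4])                             # all-zero tensor of shape (2, 3, 4)

print(tf.rank(matrix).numpy())          # 2  -> number of dimensions
print(tf.shape(zeros).numpy())          # [2 3 4]
print(tf.add(1, 2).numpy())             # 3
print(tf.multiply(matrix, 2.).numpy())  # element-wise product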

Building a simple neural network

Output formula used: y = sigmoid(xW + b)

### Defining a network Layer ###

# n_output_nodes: number of output nodes
# input_shape: shape of the input
# x: input to the layer

class OurDenseLayer(tf.keras.layers.Layer):  # inherits from Layer
  def __init__(self, n_output_nodes):
    super(OurDenseLayer, self).__init__()
    self.n_output_nodes = n_output_nodes

  def build(self, input_shape):
    d = int(input_shape[-1])
    # Define and initialize parameters: a weight matrix W and a bias b.
    # Note that parameter initialization is random!
    self.W = self.add_weight("weight", shape=[d, self.n_output_nodes])  # d rows, n_output_nodes columns
    self.b = self.add_weight("bias", shape=[1, self.n_output_nodes])    # 1 row, n_output_nodes columns

  def call(self, x):
    '''TODO: define the operation for z (hint: use tf.matmul)'''
    z = tf.add(tf.matmul(x, self.W), self.b)  # TODO

    '''TODO: define the operation for out (hint: use tf.sigmoid)'''
    y = tf.sigmoid(z)  # TODO
    return y

# Since layer parameters are initialized randomly, we set a random seed for reproducibility.
tf.random.set_seed(1)
layer = OurDenseLayer(3)
layer.build((1, 2))
x_input = tf.constant([[1, 2.]], shape=(1, 2))
y = layer.call(x_input)

# test the output!
print(y.numpy())

A neural network using the Sequential API

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense

n_output_nodes = 3

# First define the model
model = Sequential()

dense_layer = Dense(units=n_output_nodes) # TODO
# Answer: dense_layer = Dense(n_output_nodes, activation='sigmoid')
# Add the dense layer to the model
model.add(dense_layer)

Test the output:

x_input = tf.constant([[1,2.]], shape=(1,2))
model_output = model(x_input)  # TODO: feed x through the model
# Answer: model_output = model(x_input).numpy()
print(model_output)

Defining SubclassModel as a subclass of the Model class:

from tensorflow.keras import Model
from tensorflow.keras.layers import Dense

class SubclassModel(tf.keras.Model):
  def __init__(self, n_output_nodes):
    super(SubclassModel, self).__init__()
    '''TODO: Our model consists of a single Dense layer. Define this layer.'''
    self.dense_layer = Dense(n_output_nodes, activation='sigmoid')  # TODO: Dense layer
    # sigmoid is used here; with relu the outputs were 0, and with tanh the outputs
    # at positions 0 and 2 were negative.

  def call(self, inputs):
    return self.dense_layer(inputs)

# Test:
n_output_nodes = 3
model = SubclassModel(n_output_nodes)
x_input = tf.constant([[1, 2.]], shape=(1, 2))
print(model.call(x_input))

Adding isidentity=False to the call function:
isidentity reflects that "sometimes we want the network to simply output the original input".
When it is True the input is returned unchanged; otherwise the layer is applied as usual. A minimal sketch follows below.
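
A minimal sketch of the modified call, assuming the single self.dense_layer defined above (the parameter name isidentity follows the note):

def call(self, inputs, isidentity=False):
  # identity mode: skip the layer and return the input unchanged
  if isidentity:
    return inputs
  return self.dense_layer(inputs)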

Gradient computation

Gradients are computed with the GradientTape() API.

x = tf.Variable(3.0)
with tf.GradientTape() as tape:
  y = x * x
dy_dx = tape.gradient(y, x)  # dy/dx = 2x = 6.0

Optimization with SGD (stochastic gradient descent):
the loss is L = (x - x_f)^2, and each step applies the update x <- x - learning_rate * dL/dx.

import matplotlib.pyplot as plt

x = tf.Variable([tf.random.normal([1])])  # initialize x randomly
learning_rate = 1e-2  # learning rate for SGD
history = []
x_f = 4  # target value

for i in range(500):
  with tf.GradientTape() as tape:
    loss = (x - x_f)**2
  grad = tape.gradient(loss, x)       # compute the gradient of the loss w.r.t. x
  new_x = x - learning_rate * grad    # SGD update
  x.assign(new_x)                     # update x, i.e. replace x with new_x
  history.append(x.numpy()[0])        # index [0] because x holds a single-element list

# Plot the evolution of x as we optimize towards x_f!
plt.plot(history)
plt.plot([0, 500], [x_f, x_f])
plt.legend(('Predicted', 'True'))
plt.xlabel('Iteration')
plt.ylabel('x value')

Lab 2: Computer Vision

Part 1: MNIST Digit Classification

Building the initial fully connected model

def build_fc_model():
  fc_model = tf.keras.Sequential([
      # First define a Flatten layer
      tf.keras.layers.Flatten(),

      # '''TODO: Define the activation function for the first fully connected (Dense) layer.'''
      tf.keras.layers.Dense(128, activation=tf.nn.relu),

      # '''TODO: Define the second Dense layer to output the classification probabilities'''
      tf.keras.layers.Dense(10, activation=tf.nn.softmax)
  ])
  return fc_model

model = build_fc_model()

The first Dense layer uses the relu activation; the second Dense layer outputs the predicted class probabilities and uses softmax.

The activation is written here as tf.nn.relu; using the string 'relu' reportedly raised an error.

Model compilation

Compile the model before training: set the optimizer, the loss function, and the metric used for training and evaluation (accuracy here).

model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=1e-1),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

Model training

Loss and accuracy are displayed during training.

  • epochs: the number of passes over the training data
    BATCH_SIZE = 64
    EPOCHS = 5
    model.fit(train_images, train_labels, batch_size=BATCH_SIZE, epochs=EPOCHS)

Evaluating model accuracy on the test set

The evaluate method is used to measure accuracy.

test_loss, test_acc = model.evaluate(x=test_images, y=test_labels)

print('Test accuracy:', test_acc)

Build a CNN model, then compile, train, and test it.

Build the model:

def build_cnn_model():
  cnn_model = tf.keras.Sequential([
      # First convolutional layer
      tf.keras.layers.Conv2D(filters=24, kernel_size=(3, 3), activation='relu'),
      # First max-pooling layer
      tf.keras.layers.MaxPool2D(pool_size=(2, 2)),
      # Second convolutional layer
      tf.keras.layers.Conv2D(filters=36, kernel_size=(3, 3), activation='relu'),
      # Second max-pooling layer
      tf.keras.layers.MaxPool2D(pool_size=(2, 2)),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation=tf.nn.relu),
      # Output layer: class probabilities, so 10 units, one per digit.
      tf.keras.layers.Dense(10, activation=tf.nn.softmax),
  ])
  return cnn_model

cnn_model = build_cnn_model()
cnn_model.predict(train_images[[0]])  # pass one sample through to initialize the model
print(cnn_model.summary())            # print a summary of the model

Compile:

cnn_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3), loss='sparse_categorical_crossentropy', metrics=['accuracy']) 

Train:

cnn_model.fit(x=train_images, y=train_labels, batch_size=BATCH_SIZE, epochs=EPOCHS)

Test:

test_loss, test_acc = cnn_model.evaluate(test_images, test_labels)
print('Test accuracy:', test_acc)

The model is now trained; feed in images to make predictions:

predictions = cnn_model.predict(test_images)
predictions[0]  # list of class probabilities for the first test image

prediction = np.argmax(predictions[0])  # the predicted digit
print(prediction)

print("Label of this digit is:", test_labels[0])
plt.imshow(test_images[0, :, :, 0], cmap=plt.cm.binary)  # show the corresponding image

Output:

array([1.1896365e-08, 2.9328827e-08, 1.0020367e-07, 1.9016676e-07,
       8.6228073e-11, 5.7627930e-11, 1.5786619e-13, 9.9999928e-01,
       3.2206058e-09, 3.1025860e-07], dtype=float32)
7
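
As a small extension beyond the lab code, the predicted labels for the whole test set can also be computed at once (a sketch; it assumes predictions and the NumPy label array test_labels from above):

import numpy as np

predicted_labels = np.argmax(predictions, axis=1)      # one predicted digit per test image
manual_acc = np.mean(predicted_labels == test_labels)  # should agree with model.evaluate
print("Accuracy from argmax over all predictions:", manual_acc)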

Training the CNN with stochastic gradient descent (a manual training loop)

# Rebuild the CNN
cnn_model = build_cnn_model()

batch_size = 12
loss_history = mdl.util.LossHistory(smoothing_factor=0.95)  # record the loss over training
plotter = mdl.util.PeriodicPlotter(sec=2, xlabel='Iterations', ylabel='Loss', scale='semilogy')
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-2)     # define the optimizer

if hasattr(tqdm, '_instances'): tqdm._instances.clear()  # clear existing progress bars if any

for idx in tqdm(range(0, train_images.shape[0], batch_size)):
  # First grab a batch of data and convert the input images to tensors.
  (images, labels) = (train_images[idx:idx+batch_size], train_labels[idx:idx+batch_size])
  images = tf.convert_to_tensor(images, dtype=tf.float32)

  # Record the forward pass on a GradientTape
  with tf.GradientTape() as tape:
    # Feed the images through the model to get predictions
    logits = cnn_model(images)

    # '''TODO: compute the categorical cross entropy loss'''
    loss_value = tf.keras.backend.sparse_categorical_crossentropy(labels, logits)  # (targets, outputs)

  loss_history.append(loss_value.numpy().mean())  # append the loss to the history
  plotter.plot(loss_history.get())

  # Backpropagation:
  # trainable_variables gives the model's trainable parameters;
  # tape.gradient computes d(loss)/d(parameters), and apply_gradients applies them
  # to the variables; zip() pairs each gradient with its variable.
  grads = tape.gradient(loss_value, cnn_model.trainable_variables)
  optimizer.apply_gradients(zip(grads, cnn_model.trainable_variables))

Part 2: Debiasing Facial Detection Systems

Dependencies:

import tensorflow as tf
import IPython
import functools
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
import mitdeeplearning as mdl

Getting the dataset

The data is downloaded from:

# Get the data (drawn from CelebA and ImageNet)
path_to_training_data = tf.keras.utils.get_file('train_face.h5', 'https://www.dropbox.com/s/hlz8atheyozp1yx/train_face.h5?dl=1')
# Instantiate a training dataset loader using the downloaded dataset
loader = mdl.lab2.TrainingDatasetLoader(path_to_training_data)

Defining and training the CNN model

Four convolutional layers are defined, followed by dense output layers.

n_filters = 12  # base number of convolutional filters

# Function to define a standard CNN model
def make_standard_classifier(n_outputs=1):
  # partials with the defaults changed to padding='same' and activation='relu'
  Conv2D = functools.partial(tf.keras.layers.Conv2D, padding='same', activation='relu')
  BatchNormalization = tf.keras.layers.BatchNormalization
  Flatten = tf.keras.layers.Flatten
  Dense = functools.partial(tf.keras.layers.Dense, activation='relu')

  model = tf.keras.Sequential([
    Conv2D(filters=1*n_filters, kernel_size=5, strides=2),
    BatchNormalization(),

    Conv2D(filters=2*n_filters, kernel_size=5, strides=2),
    BatchNormalization(),

    Conv2D(filters=4*n_filters, kernel_size=3, strides=2),
    BatchNormalization(),

    Conv2D(filters=6*n_filters, kernel_size=3, strides=2),
    BatchNormalization(),

    Flatten(),
    Dense(512),
    Dense(n_outputs, activation=None),
  ])
  return model

standard_classifier = make_standard_classifier()

Train the model:

# Training hyperparameters
batch_size = 32
num_epochs = 2  # keep small to run faster
learning_rate = 5e-4

optimizer = tf.keras.optimizers.Adam(learning_rate)  # define our optimizer
loss_history = mdl.util.LossHistory(smoothing_factor=0.99)  # to record loss evolution
plotter = mdl.util.PeriodicPlotter(sec=2, scale='semilogy')
if hasattr(tqdm, '_instances'): tqdm._instances.clear()  # clear if it exists

@tf.function
def standard_train_step(x, y):
  with tf.GradientTape() as tape:
    # feed the images into the model
    logits = standard_classifier(x)
    # compute the loss
    loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits)  # (targets, predictions)

  # backpropagation and optimization step
  grads = tape.gradient(loss, standard_classifier.trainable_variables)
  optimizer.apply_gradients(zip(grads, standard_classifier.trainable_variables))
  return loss

# training loop
for epoch in range(num_epochs):
  for idx in tqdm(range(loader.get_train_size()//batch_size)):
    # grab a batch of training data and propagate it through the network
    x, y = loader.get_batch(batch_size)
    loss = standard_train_step(x, y)

    # record the loss and plot its evolution over training
    loss_history.append(loss.numpy().mean())
    plotter.plot(loss_history.get())

Evaluating performance:

  • This part evaluates on the CelebA dataset.
    # Standard CNN model
    # Evaluate on a subset of the CelebA + ImageNet training data
    (batch_x, batch_y) = loader.get_batch(5000)
    y_pred_standard = tf.round(tf.nn.sigmoid(standard_classifier.predict(batch_x)))
    acc_standard = tf.reduce_mean(tf.cast(tf.equal(batch_y, y_pred_standard), tf.float32))
    print("Standard CNN accuracy on (potentially biased) training set: {:.4f}".format(acc_standard.numpy()))
  • Evaluate on unseen test data
### Evaluate the CNN on the test data
standard_classifier_logits = [standard_classifier(np.array(x, dtype=np.float32)) for x in test_faces]
standard_classifier_probs = tf.squeeze(tf.sigmoid(standard_classifier_logits))  # squeeze out size-1 dimensions

# Plot the prediction accuracies per demographic
xx = range(len(keys))
yy = standard_classifier_probs.numpy().mean(1)
plt.bar(xx, yy)
plt.xticks(xx, keys)
plt.ylim(max(0, yy.min() - yy.ptp()/2.), yy.max() + yy.ptp()/2.)
plt.title("Standard classifier predictions");

Variational autoencoders (VAEs) for learning latent structure

Features that are absent or rare in the training set, such as darker skin tones or people wearing hats,
can still be captured by training a VAE in an unsupervised way. Its loss is written out below.
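
Written out as equations (matching the vae_loss_function below, where exp(logsigma) plays the role of the latent variance and kl_weight is the constant c):

L_{KL}    = \tfrac{1}{2} \sum_j \left( e^{\log\sigma_j} + \mu_j^2 - 1 - \log\sigma_j \right)
L_{recon} = \lVert x - x_{recon} \rVert_1
L_{VAE}   = c \cdot L_{KL} + L_{recon}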

# Define the VAE loss function
''' Function to calculate the VAE loss given:
      the input x,
      the reconstructed output x_recon,
      the encoded means mu,
      the encoded log of the standard deviation logsigma,
      and the weight parameter for the latent loss kl_weight
'''
def vae_loss_function(x, x_recon, mu, logsigma, kl_weight=0.0005):
  # Latent (KL) loss: 0.5 * sum(exp(logsigma) + mu^2 - 1 - logsigma)
  latent_loss = 0.5 * tf.reduce_sum(tf.exp(logsigma) + tf.square(mu) - 1 - logsigma, axis=1)

  # https://www.tensorflow.org/api_docs/python/tf/math/reduce_mean
  # Reconstruction loss using an L1 norm: ||x - x_recon||_1, averaged over the image
  # dimensions, so the result is one value per sample rather than a full tensor.
  reconstruction_loss = tf.reduce_mean(tf.abs(x - x_recon), axis=(1, 2, 3))

  # Total VAE loss: L_vae = kl_weight * latent_loss + reconstruction_loss
  vae_loss = kl_weight * latent_loss + reconstruction_loss
  return vae_loss

VAE reparameterization:

# Inputs: the mean and the log-sigma of the latent distribution
# Output: the sampled latent vector z
def sampling(z_mean, z_logsigma):
  # By default, random.normal is "standard" (i.e. mean=0 and std=1.0)
  batch, latent_dim = z_mean.shape
  epsilon = tf.random.normal(shape=(batch, latent_dim))

  # Reparameterization: z = mu + exp(0.5 * logsigma) * epsilon
  z = z_mean + tf.math.exp(z_logsigma / 2) * epsilon  # tf.exp is an alias of tf.math.exp, so either form works
  return z

DB-VAE: the debiasing variational autoencoder

Samples whose features appear with low frequency are sampled more often, and high-frequency features are sampled less often, so that sampling becomes more balanced; see get_training_sample_probabilities below.
The pipeline is shown in the lab's flow diagram:
[Figure: DB-VAE training pipeline diagram]
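
Roughly, the resampling weight implemented further below is (with \hat{Q}_i the smoothed histogram density of latent dimension i, and the result normalized so the probabilities sum to 1):

p(x) \;\propto\; \max_i \frac{1}{\hat{Q}_i\left(z_i(x)\right)}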

The DB-VAE loss function:

# Inputs: the input x, the reconstruction x_pred, the true label y, the predicted logit y_logit,
#         and the mean mu and log standard deviation logsigma of the latent distribution
def debiasing_loss_function(x, x_pred, y, y_logit, mu, logsigma):

  # VAE loss, computed with the VAE loss function above
  vae_loss = vae_loss_function(x, x_pred, mu, logsigma)  # TODO

  # Classification loss via sigmoid_cross_entropy_with_logits
  # https://www.tensorflow.org/api_docs/python/tf/nn/sigmoid_cross_entropy_with_logits
  classification_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=y_logit)

  # Indicator built from the training labels: 1 if the sample is a face, 0 otherwise
  face_indicator = tf.cast(tf.equal(y, 1), tf.float32)

  # Total loss
  total_loss = tf.reduce_mean(classification_loss + face_indicator * vae_loss)

  return total_loss, classification_loss
# Outputs: the total DB-VAE loss and the classification loss

Defining the decoder part of the DB-VAE:

# Decoder portion
n_filters = 12    # base number of convolutional filters, same as the CNN
latent_dim = 100  # number of latent variables

def make_face_decoder_network():
  # Define the layer constructors
  Conv2DTranspose = functools.partial(tf.keras.layers.Conv2DTranspose, padding='same', activation='relu')
  BatchNormalization = tf.keras.layers.BatchNormalization
  Flatten = tf.keras.layers.Flatten
  Dense = functools.partial(tf.keras.layers.Dense, activation='relu')
  Reshape = tf.keras.layers.Reshape

  # Build the decoder with Sequential
  decoder = tf.keras.Sequential([
    # Transform to pre-convolutional generation
    Dense(units=4*4*6*n_filters),  # 4x4 feature maps (with 6N occurrences)
    Reshape(target_shape=(4, 4, 6*n_filters)),

    # Upscaling convolutions (inverse of encoder)
    Conv2DTranspose(filters=4*n_filters, kernel_size=3, strides=2),
    Conv2DTranspose(filters=2*n_filters, kernel_size=3, strides=2),
    Conv2DTranspose(filters=1*n_filters, kernel_size=5, strides=2),
    Conv2DTranspose(filters=3, kernel_size=5, strides=2),
  ])
  return decoder

Defining and creating the DB-VAE network:

class DB_VAE(tf.keras.Model):
  def __init__(self, latent_dim):
    super(DB_VAE, self).__init__()
    self.latent_dim = latent_dim

    # Define the number of outputs for the encoder. Recall that we have
    # `latent_dim` latent variables, as well as a supervised output for the
    # classification.
    num_encoder_dims = 2*self.latent_dim + 1

    self.encoder = make_standard_classifier(num_encoder_dims)
    self.decoder = make_face_decoder_network()

  # Function to feed images into the encoder, encode the latent space, and
  # output the classification logit plus mu and logsigma.
  def encode(self, x):
    encoder_output = self.encoder(x)  # encoder output
    # classification prediction
    y_logit = tf.expand_dims(encoder_output[:, 0], -1)
    # latent variable distribution parameters
    z_mean = encoder_output[:, 1:self.latent_dim+1]
    z_logsigma = encoder_output[:, self.latent_dim+1:]
    return y_logit, z_mean, z_logsigma

  # VAE reparameterization: given a mean and logsigma, sample latent variables
  def reparameterize(self, z_mean, z_logsigma):
    z = sampling(z_mean, z_logsigma)
    return z

  # Decode the latent space and output a reconstruction of the input
  def decode(self, z):
    reconstruction = self.decoder(z)
    return reconstruction

  # The call function will be used to pass inputs x through the core VAE
  def call(self, x):
    # Encode input to a prediction and latent space
    y_logit, z_mean, z_logsigma = self.encode(x)
    # Reparameterization
    z = self.reparameterize(z_mean, z_logsigma)
    # Reconstruction via decode
    recon = self.decode(z)
    return y_logit, z_mean, z_logsigma, recon

  # Predict whether an input is a face (classification logit only)
  def predict(self, x):
    y_logit, z_mean, z_logsigma = self.encode(x)
    return y_logit

dbvae = DB_VAE(latent_dim)

Implementing the DB-VAE

First define a helper function that returns the latent-variable means:

# Helper: encode all images in batches and return the latent means mu
def get_latent_mu(images, dbvae, batch_size=1024):
  N = images.shape[0]
  mu = np.zeros((N, latent_dim))
  for start_ind in range(0, N, batch_size):
    end_ind = min(start_ind + batch_size, N + 1)
    batch = (images[start_ind:end_ind]).astype(np.float32) / 255.
    _, batch_mu, _ = dbvae.encode(batch)
    mu[start_ind:end_ind] = batch_mu
  return mu

Define the resampling algorithm:

# Function to recompute the sampling probability for each image in a batch,
# based on how its latent encoding is distributed across the training data.
def get_training_sample_probabilities(images, dbvae, bins=10, smoothing_fac=0.001):
  print("Recomputing the sampling probabilities")
  mu = get_latent_mu(images, dbvae)          # latent-variable means
  training_sample_p = np.zeros(mu.shape[0])  # per-image sampling probabilities

  # consider the distribution of each latent variable
  for i in range(latent_dim):

    latent_distribution = mu[:, i]
    # generate a histogram of the latent distribution
    hist_density, bin_edges = np.histogram(latent_distribution, density=True, bins=bins)

    # find which latent bin every data sample falls in
    bin_edges[0] = -float('inf')
    bin_edges[-1] = float('inf')

    # call the digitize function to find which bin of the latent distribution every sample falls into
    # https://docs.scipy.org/doc/numpy-1.13.0/reference/generated/numpy.digitize.html
    bin_idx = np.digitize(latent_distribution, bin_edges)

    # smooth the density function
    hist_smoothed_density = hist_density + smoothing_fac
    hist_smoothed_density = hist_smoothed_density / np.sum(hist_smoothed_density)

    p = 1.0 / (hist_smoothed_density[bin_idx-1])           # invert the density function
    p = p / np.sum(p)                                       # normalize the probabilities
    training_sample_p = np.maximum(p, training_sample_p)    # keep the larger p as the sampling probability

  training_sample_p /= np.sum(training_sample_p)  # final normalization
  return training_sample_p

Training the DB-VAE:

# Hyperparameters
batch_size = 32
learning_rate = 5e-4
latent_dim = 100

# Train the DB-VAE for more epochs, since it is more complex.
num_epochs = 6

# Instantiate a DB-VAE model and an Adam optimizer
dbvae = DB_VAE(100)
optimizer = tf.keras.optimizers.Adam(learning_rate)

# Wrap the training step in tf.function
@tf.function
def debiasing_train_step(x, y):

  with tf.GradientTape() as tape:
    # Feed x through the DB-VAE
    y_logit, z_mean, z_logsigma, x_recon = dbvae(x)

    # Compute the loss
    loss, class_loss = debiasing_loss_function(x=x, x_pred=x_recon, y=y, y_logit=y_logit, mu=z_mean, logsigma=z_logsigma)  # TODO

  # Compute gradients with GradientTape.gradient
  grads = tape.gradient(loss, dbvae.trainable_variables)

  # Apply the gradients to the variables via the optimizer
  optimizer.apply_gradients(zip(grads, dbvae.trainable_variables))
  return loss

# Load the dataset
all_faces = loader.get_all_train_faces()

if hasattr(tqdm, '_instances'): tqdm._instances.clear()  # clear if it exists

# Training loop
for i in range(num_epochs):
  IPython.display.clear_output(wait=True)
  print("Starting epoch {}/{}".format(i+1, num_epochs))

  # Recompute the data sampling probabilities
  p_faces = get_training_sample_probabilities(images=all_faces, dbvae=dbvae)

  # Get batches of training data and run the training step
  for j in tqdm(range(loader.get_train_size() // batch_size)):
    # load a batch of data
    (x, y) = loader.get_batch(batch_size, p_pos=p_faces)
    # loss optimization
    loss = debiasing_train_step(x, y)

    # plot sample reconstructions every 500 steps
    if j % 500 == 0:
      mdl.util.plot_sample(x, y, dbvae)

Accuracy evaluation:

dbvae_logits = [dbvae.predict(np.array(x, dtype=np.float32)) for x in test_faces]
dbvae_probs = tf.squeeze(tf.sigmoid(dbvae_logits))

xx = np.arange(len(keys))
plt.bar(xx, standard_classifier_probs.numpy().mean(1), width=0.2, label="Standard CNN")
plt.bar(xx+0.2, dbvae_probs.numpy().mean(1), width=0.2, label="DB-VAE")
plt.xticks(xx, keys);
plt.title("Network predictions on test dataset")
plt.ylabel("Probability"); plt.legend(bbox_to_anchor=(1.04,1), loc="upper left");