Keras 的一般流程是:定义模型、配置模型(model.compile)、训练模型(model.fit)、查看模型结构与参数量(model.summary)。其中配置模型、训练模型、查看参数都是相当定式化的过程。如果只是做类别的分类任务还好,但如果要训练 GAN 这类网络,就拉胯了。更为重要的是,Keras 的这些封装弱化了我们对整个网络训练过程的理解:在这些过程中,我们只要按照例子修改传给 model.compile、model.fit、model.summary 的参数就可以直接训练,可以参见我的另一篇博客: 我的博客.



这里的程序最好与上文中的博客对比来看。 首先依旧是相关package的导入。

''' @Auther : gaoxin @Date : 2020.08.26 @Version : 1.0 ''' import tensorflow as tf from PIL import Image import numpy as np from matplotlib import pyplot as plt import os from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Dropout, Flatten, Dense from tensorflow.keras import Model import warnings os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' warnings.filterwarnings("ignore") np.set_printoptions(threshold = np.inf)


# Raw MNIST JPEG folders, their label index files, and the cached .npy copies
# that are written after the first run so later runs can skip image decoding.
train_path = './mnist_image_label/mnist_train_jpg_60000/'
train_txt = './mnist_image_label/mnist_train_jpg_60000.txt'
x_train_savepath = './mnist_image_label/mnist_x_train.npy'
y_train_savepath = './mnist_image_label/mnist_y_train.npy'

test_path = './mnist_image_label/mnist_test_jpg_10000/'
test_txt = './mnist_image_label/mnist_test_jpg_10000.txt'
x_test_savepath = './mnist_image_label/mnist_x_test.npy'
y_test_savepath = './mnist_image_label/mnist_y_test.npy'


def generateds(path, txt):
    """Load a labelled image set from disk into NumPy arrays.

    Args:
        path: Prefix that is string-concatenated with each image file name,
            so it needs its trailing separator.
        txt: Index file. Each non-blank line holds an image file name
            followed by its integer label, separated by whitespace.

    Returns:
        A tuple ``(x, y_)``: ``x`` stacks the images converted to 8-bit
        grayscale and divided by 255, ``y_`` holds the labels as int64.
    """
    # Context manager closes the index file even if reading fails.
    with open(txt, 'r') as f:
        contents = f.readlines()

    x, y_ = [], []
    for content in contents:
        value = content.split()
        if not value:
            # A blank line (e.g. a trailing newline) carries no sample.
            continue
        img_path = path + value[0]
        # Close each image file as soon as its pixels have been copied out.
        with Image.open(img_path) as img:
            x.append(np.array(img.convert('L')) / 255.)
        y_.append(value[1])
        print('loading : ' + content)

    x = np.array(x)
    y_ = np.array(y_).astype(np.int64)
    return x, y_


_cache_files = (x_train_savepath, y_train_savepath,
                x_test_savepath, y_test_savepath)

if all(os.path.exists(cache_file) for cache_file in _cache_files):
    print('-------------Load Datasets-----------------')
    x_train_save = np.load(x_train_savepath)
    y_train = np.load(y_train_savepath)
    x_test_save = np.load(x_test_savepath)
    y_test = np.load(y_test_savepath)
else:
    print('-------------Generate Datasets-----------------')
    x_train, y_train = generateds(train_path, train_txt)
    x_test, y_test = generateds(test_path, test_txt)

    print('-------------Save Datasets-----------------')
    # The cache stores one flattened row per image.
    x_train_save = np.reshape(x_train, (len(x_train), -1))
    x_test_save = np.reshape(x_test, (len(x_test), -1))
    np.save(x_train_savepath, x_train_save)
    np.save(y_train_savepath, y_train)
    np.save(x_test_savepath, x_test_save)
    np.save(y_test_savepath, y_test)

# Both branches must produce the same (N, 28, 28, 1) layout. Previously only
# the cached branch added the channel axis, so the very first run handed
# (N, 28, 28) arrays to a model built for (None, 28, 28, 1).
x_train = np.reshape(x_train_save, (len(x_train_save), 28, 28, 1))
x_test = np.reshape(x_test_save, (len(x_test_save), 28, 28, 1))


class Baseline(Model):
    """Small CNN classifier.

    Layer order: Conv2D -> BatchNormalization -> ReLU -> MaxPool2D -> Dropout
    -> Flatten -> Dense(128, relu) -> Dropout -> Dense(10, softmax).
    """

    def __init__(self):
        super(Baseline, self).__init__()
        self.c1 = Conv2D(filters=6, kernel_size=(5, 5), padding='same')  # convolution layer
        self.b1 = BatchNormalization()  # batch-normalisation layer
        self.a1 = Activation('relu')  # activation layer
        self.p1 = MaxPool2D(pool_size=(2, 2), strides=2, padding='same')  # pooling layer
        self.d1 = Dropout(0.2)  # dropout layer

        self.flatten = Flatten()
        self.f1 = Dense(128, activation='relu')
        self.d2 = Dropout(0.2)
        self.f2 = Dense(10, activation='softmax')

    @tf.function
    def call(self, x, training=None):
        """Run the forward pass.

        Args:
            x: Batch of input images.
            training: Forwarded to the BatchNormalization and Dropout layers,
                the only layers here whose behaviour depends on the mode.
                Taking it as an explicit argument also makes the
                ``tf.function`` trace depend on it, instead of on whatever
                Keras context was active during the first trace.

        Returns:
            The softmax output of the final Dense layer (10 units).
        """
        x = self.c1(x)
        x = self.b1(x, training=training)
        x = self.a1(x)
        x = self.p1(x)
        x = self.d1(x, training=training)

        x = self.flatten(x)
        x = self.f1(x)
        x = self.d2(x, training=training)
        y = self.f2(x)
        return y

然后是损失函数的定义。使用均方差作为损失函数。 因为这里的inputs_y是一个batch,batch的每个元素为一个数字标签(0~9),形状为[batch_size,],而prediction的形状为[batch_size,10](每一行是10个类别上的概率分布),因此需要把inputs_y变为[batch_size,10]的形式(即one-hot编码):建立各项为零的数组,再把每一行中标签对应位置的0置为1(这是预测的理想状态)。

def loss(model, inputs_x, inputs_y, training=True):
    """Mean squared error between the model output and one-hot labels.

    Args:
        model: Callable returning a ``[batch_size, num_classes]`` tensor.
        inputs_x: Batch of inputs passed to ``model``.
        inputs_y: Integer class labels, one per sample.
        training: Passed to ``model`` as its ``training`` flag. Defaults to
            True because this function drives the gradient step; pass False
            to score a batch in inference mode.

    Returns:
        Scalar tensor: the mean of the squared differences over every element.
    """
    prediction = model(inputs_x, training=training)
    # One vectorised op replaces the per-sample Python loop. The depth and
    # dtype are taken from the prediction so both operands line up.
    target = tf.one_hot(inputs_y, depth=prediction.shape[-1],
                        dtype=prediction.dtype)
    return tf.reduce_mean(tf.square(target - prediction))


def accuracy(model, x, y):
    """Fraction of samples whose arg-max prediction equals the label.

    Args:
        model: Callable returning a ``[num_samples, num_classes]`` array-like
            (an eager tensor or a NumPy array).
        x: Inputs passed to ``model``.
        y: Integer labels, one per row of the model output.

    Returns:
        A Python float in ``[0, 1]``; ``0.0`` when there are no samples.

    Raises:
        ValueError: If ``y`` does not hold exactly one label per prediction.
    """
    prediction = model(x)
    # Compare whole arrays at once. Indexing a tensor element by element in a
    # Python loop dispatches one op per sample and is very slow on 10k rows.
    predicted = np.argmax(np.asarray(prediction), axis=1)
    labels = np.asarray(y)
    if labels.shape[0] != predicted.shape[0]:
        raise ValueError('expected one label per prediction, got %d labels '
                         'for %d predictions'
                         % (labels.shape[0], predicted.shape[0]))
    if predicted.size == 0:
        # Avoid dividing by zero on an empty evaluation set.
        return 0.0
    return float(np.mean(predicted == labels))


class dataset():
    """Sequential mini-batch sampler over a pair of aligned arrays."""

    def __init__(self, train_x, train_y):
        """Store the data and start reading from the first sample.

        Args:
            train_x: Sliceable collection of inputs.
            train_y: Sliceable collection of labels aligned with ``train_x``.
        """
        self.counter = 0  # index of the first sample of the next batch
        self.train_x = train_x
        self.train_y = train_y

    def sample(self, batch_size):
        """Return the next ``batch_size`` samples in order.

        The final batch of a pass may be shorter than ``batch_size``. Once
        the end of the data has been reached, the next call starts again
        from the first sample.

        Args:
            batch_size: Number of samples to take.

        Returns:
            A tuple ``(batch_x, batch_y)`` of matching slices.
        """
        start = self.counter
        stop = start + batch_size
        batch_x = self.train_x[start:stop]
        batch_y = self.train_y[start:stop]
        # ">=" rather than ">": when the data length is an exact multiple of
        # batch_size, the counter lands exactly on the end, and without the
        # reset the next call would return an empty batch.
        self.counter = 0 if stop >= len(self.train_y) else stop
        return batch_x, batch_y


def training(epochs=1, batch_size=32):
    """Train ``Baseline`` with a hand-written loop and save its weights.

    Restores ``./checkpoint/mnist.ckpt`` when it exists. Logs the loss of
    every step to TensorBoard under ``./tensorboard``, prints the loss and
    test accuracy every 600 batches, exports the recorded trace, and finally
    writes the weights back to the checkpoint path.

    Args:
        epochs: Number of passes over the training data.
        batch_size: Number of samples per optimisation step.
    """
    model = Baseline()
    model.build(input_shape=(None, 28, 28, 1))
    optimizer = tf.keras.optimizers.Adam()  # learning_rate = 0.001
    batch_num = y_train.shape[0] // batch_size

    summary_writer = tf.summary.create_file_writer('./tensorboard')
    # Start tracing so the graph structure and profile info can be recorded.
    tf.summary.trace_on(graph=True, profiler=True)

    checkpoint_save_path = "./checkpoint/mnist.ckpt"
    if os.path.exists(checkpoint_save_path + '.index'):
        print('-------------load the model-----------------')
        model.load_weights(checkpoint_save_path)

    for epoch in range(epochs):
        # A fresh sampler per epoch makes every pass start at sample 0.
        data = dataset(x_train, y_train)
        for num in range(batch_num):
            # Global step, so summaries from later epochs do not overwrite
            # those of earlier ones.
            step = epoch * batch_num + num
            inputs_x, inputs_y = data.sample(batch_size)
            # Only the forward pass needs to be recorded on the tape.
            with tf.GradientTape() as tape:
                y_loss = loss(model, inputs_x, inputs_y)
            grads = tape.gradient(y_loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

            if num % 600 == 0:
                acc = accuracy(model, x_test, y_test)
                print(num, 'y-loss:', float(y_loss), 'accuracy', float(acc))
            with summary_writer.as_default():  # the writer to record to
                tf.summary.scalar("y_loss", y_loss, step=step)
        print('#################################################')

    with summary_writer.as_default():
        tf.summary.trace_export(name="model_trace", step=0,
                                profiler_outdir='./tensorboard')
    model.save_weights(checkpoint_save_path)



# One optimisation step, excerpted from training(): the tape records the
# forward pass inside loss(), then the gradient of the loss with respect to
# every trainable variable is computed and handed to the optimizer.
with tf.GradientTape() as tape:
    y_loss = loss(model , inputs_x , inputs_y)
grads = tape.gradient(y_loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))



# Excerpt from training(): every 600 batches, measure accuracy on the test
# set and print it together with the loss of the current batch.
if num % 600 == 0:
    acc = accuracy(model , x_test , y_test)
    print(num, 'y-loss:',float(y_loss), 'accuracy', float(acc))


# Excerpt from training(): write the current loss as a scalar summary, using
# the batch index as the step.
with summary_writer.as_default():  # the writer to record to
    tf.summary.scalar("y_loss", y_loss, step=num)


tensorboard --logdir=./tensorboard


附上, 训练时的一个截图: 最后附上我的完整代码,emmmm,因为我图了个简便,便于和前一篇对比,所以这里代码的整体结构挺烂的,不喜勿喷哈。然后数据集也是前文的数据集。

'''
@Author  : gaoxin
@Date    : 2020.08.26
@Version : 1.0
'''
import tensorflow as tf
from PIL import Image
import numpy as np
from matplotlib import pyplot as plt
import os
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Dropout, Flatten, Dense
from tensorflow.keras import Model
import warnings

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.filterwarnings("ignore")
np.set_printoptions(threshold=np.inf)

# Raw MNIST JPEG folders, their label index files, and the cached .npy copies
# that are written after the first run so later runs can skip image decoding.
train_path = './mnist_image_label/mnist_train_jpg_60000/'
train_txt = './mnist_image_label/mnist_train_jpg_60000.txt'
x_train_savepath = './mnist_image_label/mnist_x_train.npy'
y_train_savepath = './mnist_image_label/mnist_y_train.npy'

test_path = './mnist_image_label/mnist_test_jpg_10000/'
test_txt = './mnist_image_label/mnist_test_jpg_10000.txt'
x_test_savepath = './mnist_image_label/mnist_x_test.npy'
y_test_savepath = './mnist_image_label/mnist_y_test.npy'


def generateds(path, txt):
    """Load a labelled image set from disk into NumPy arrays.

    Args:
        path: Prefix that is string-concatenated with each image file name,
            so it needs its trailing separator.
        txt: Index file. Each non-blank line holds an image file name
            followed by its integer label, separated by whitespace.

    Returns:
        A tuple ``(x, y_)``: ``x`` stacks the images converted to 8-bit
        grayscale and divided by 255, ``y_`` holds the labels as int64.
    """
    # Context manager closes the index file even if reading fails.
    with open(txt, 'r') as f:
        contents = f.readlines()

    x, y_ = [], []
    for content in contents:
        value = content.split()
        if not value:
            # A blank line (e.g. a trailing newline) carries no sample.
            continue
        img_path = path + value[0]
        # Close each image file as soon as its pixels have been copied out.
        with Image.open(img_path) as img:
            x.append(np.array(img.convert('L')) / 255.)
        y_.append(value[1])
        print('loading : ' + content)

    x = np.array(x)
    y_ = np.array(y_).astype(np.int64)
    return x, y_


_cache_files = (x_train_savepath, y_train_savepath,
                x_test_savepath, y_test_savepath)

if all(os.path.exists(cache_file) for cache_file in _cache_files):
    print('-------------Load Datasets-----------------')
    x_train_save = np.load(x_train_savepath)
    y_train = np.load(y_train_savepath)
    x_test_save = np.load(x_test_savepath)
    y_test = np.load(y_test_savepath)
else:
    print('-------------Generate Datasets-----------------')
    x_train, y_train = generateds(train_path, train_txt)
    x_test, y_test = generateds(test_path, test_txt)

    print('-------------Save Datasets-----------------')
    # The cache stores one flattened row per image.
    x_train_save = np.reshape(x_train, (len(x_train), -1))
    x_test_save = np.reshape(x_test, (len(x_test), -1))
    np.save(x_train_savepath, x_train_save)
    np.save(y_train_savepath, y_train)
    np.save(x_test_savepath, x_test_save)
    np.save(y_test_savepath, y_test)

# Both branches must produce the same (N, 28, 28, 1) layout. Previously only
# the cached branch added the channel axis, so the very first run handed
# (N, 28, 28) arrays to a model built for (None, 28, 28, 1).
x_train = np.reshape(x_train_save, (len(x_train_save), 28, 28, 1))
x_test = np.reshape(x_test_save, (len(x_test_save), 28, 28, 1))


class Baseline(Model):
    """Small CNN classifier.

    Layer order: Conv2D -> BatchNormalization -> ReLU -> MaxPool2D -> Dropout
    -> Flatten -> Dense(128, relu) -> Dropout -> Dense(10, softmax).
    """

    def __init__(self):
        super(Baseline, self).__init__()
        self.c1 = Conv2D(filters=6, kernel_size=(5, 5), padding='same')  # convolution layer
        self.b1 = BatchNormalization()  # batch-normalisation layer
        self.a1 = Activation('relu')  # activation layer
        self.p1 = MaxPool2D(pool_size=(2, 2), strides=2, padding='same')  # pooling layer
        self.d1 = Dropout(0.2)  # dropout layer

        self.flatten = Flatten()
        self.f1 = Dense(128, activation='relu')
        self.d2 = Dropout(0.2)
        self.f2 = Dense(10, activation='softmax')

    @tf.function
    def call(self, x, training=None):
        """Run the forward pass.

        Args:
            x: Batch of input images.
            training: Forwarded to the BatchNormalization and Dropout layers,
                the only layers here whose behaviour depends on the mode.
                Taking it as an explicit argument also makes the
                ``tf.function`` trace depend on it.

        Returns:
            The softmax output of the final Dense layer (10 units).
        """
        x = self.c1(x)
        x = self.b1(x, training=training)
        x = self.a1(x)
        x = self.p1(x)
        x = self.d1(x, training=training)

        x = self.flatten(x)
        x = self.f1(x)
        x = self.d2(x, training=training)
        y = self.f2(x)
        return y


def loss(model, inputs_x, inputs_y, training=True):
    """Mean squared error between the model output and one-hot labels.

    Args:
        model: Callable returning a ``[batch_size, num_classes]`` tensor.
        inputs_x: Batch of inputs passed to ``model``.
        inputs_y: Integer class labels, one per sample.
        training: Passed to ``model`` as its ``training`` flag. Defaults to
            True because this function drives the gradient step.

    Returns:
        Scalar tensor: the mean of the squared differences over every element.
    """
    prediction = model(inputs_x, training=training)
    # One vectorised op replaces the per-sample Python loop.
    target = tf.one_hot(inputs_y, depth=prediction.shape[-1],
                        dtype=prediction.dtype)
    return tf.reduce_mean(tf.square(target - prediction))


def accuracy(model, x, y):
    """Fraction of samples whose arg-max prediction equals the label.

    Args:
        model: Callable returning a ``[num_samples, num_classes]`` array-like.
        x: Inputs passed to ``model``.
        y: Integer labels, one per row of the model output.

    Returns:
        A Python float in ``[0, 1]``; ``0.0`` when there are no samples.

    Raises:
        ValueError: If ``y`` does not hold exactly one label per prediction.
    """
    prediction = model(x)
    # Compare whole arrays at once instead of indexing the tensor per sample.
    predicted = np.argmax(np.asarray(prediction), axis=1)
    labels = np.asarray(y)
    if labels.shape[0] != predicted.shape[0]:
        raise ValueError('expected one label per prediction, got %d labels '
                         'for %d predictions'
                         % (labels.shape[0], predicted.shape[0]))
    if predicted.size == 0:
        # Avoid dividing by zero on an empty evaluation set.
        return 0.0
    return float(np.mean(predicted == labels))


class dataset():
    """Sequential mini-batch sampler over a pair of aligned arrays."""

    def __init__(self, train_x, train_y):
        self.counter = 0  # index of the first sample of the next batch
        self.train_x = train_x
        self.train_y = train_y

    def sample(self, batch_size):
        """Return the next ``batch_size`` samples, wrapping at the end.

        The final batch of a pass may be shorter than ``batch_size``.
        """
        start = self.counter
        stop = start + batch_size
        batch_x = self.train_x[start:stop]
        batch_y = self.train_y[start:stop]
        # ">=" rather than ">": landing exactly on the end must also reset,
        # otherwise the next call would return an empty batch.
        self.counter = 0 if stop >= len(self.train_y) else stop
        return batch_x, batch_y


def training(epochs=1, batch_size=32):
    """Train ``Baseline`` with a hand-written loop and save its weights.

    Restores ``./checkpoint/mnist.ckpt`` when it exists. Logs the loss of
    every step to TensorBoard under ``./tensorboard``, prints the loss and
    test accuracy every 600 batches, exports the recorded trace, and finally
    writes the weights back to the checkpoint path.

    Args:
        epochs: Number of passes over the training data.
        batch_size: Number of samples per optimisation step.
    """
    model = Baseline()
    model.build(input_shape=(None, 28, 28, 1))
    optimizer = tf.keras.optimizers.Adam()  # learning_rate = 0.001
    batch_num = y_train.shape[0] // batch_size

    summary_writer = tf.summary.create_file_writer('./tensorboard')
    # Start tracing so the graph structure and profile info can be recorded.
    tf.summary.trace_on(graph=True, profiler=True)

    checkpoint_save_path = "./checkpoint/mnist.ckpt"
    if os.path.exists(checkpoint_save_path + '.index'):
        print('-------------load the model-----------------')
        model.load_weights(checkpoint_save_path)

    for epoch in range(epochs):
        # A fresh sampler per epoch makes every pass start at sample 0.
        data = dataset(x_train, y_train)
        for num in range(batch_num):
            # Global step, so summaries from later epochs do not overwrite
            # those of earlier ones.
            step = epoch * batch_num + num
            inputs_x, inputs_y = data.sample(batch_size)
            # Only the forward pass needs to be recorded on the tape.
            with tf.GradientTape() as tape:
                y_loss = loss(model, inputs_x, inputs_y)
            grads = tape.gradient(y_loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

            if num % 600 == 0:
                acc = accuracy(model, x_test, y_test)
                print(num, 'y-loss:', float(y_loss), 'accuracy', float(acc))
            with summary_writer.as_default():  # the writer to record to
                tf.summary.scalar("y_loss", y_loss, step=step)
        print('#################################################')

    with summary_writer.as_default():
        tf.summary.trace_export(name="model_trace", step=0,
                                profiler_outdir='./tensorboard')
    model.save_weights(checkpoint_save_path)


if __name__ == '__main__':
    training()


在最后加上官网的一个mnist训练的例子。 给一个官网教程链接:https://tensorflow.google.cn/tutorials/quickstart/advanced

import tensorflow as tf

from tensorflow.keras.layers import Dense, Flatten, Conv2D
from tensorflow.keras import Model

mnist = tf.keras.datasets.mnist

(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

# Add a channels dimension
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

# Batches of 32; only the training set is shuffled.
train_ds = tf.data.Dataset.from_tensor_slices(
    (x_train, y_train)).shuffle(10000).batch(32)
test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(32)


class MyModel(Model):
    """Conv2D -> Flatten -> Dense(128, relu) -> Dense(10, softmax)."""

    def __init__(self):
        super(MyModel, self).__init__()
        self.conv1 = Conv2D(32, 3, activation='relu')
        self.flatten = Flatten()
        self.d1 = Dense(128, activation='relu')
        self.d2 = Dense(10, activation='softmax')

    def call(self, x):
        """Apply the four layers in order and return the softmax output."""
        x = self.conv1(x)
        x = self.flatten(x)
        x = self.d1(x)
        return self.d2(x)


model = MyModel()

loss_object = tf.keras.losses.SparseCategoricalCrossentropy()

optimizer = tf.keras.optimizers.Adam()

# Running metrics, accumulated over the batches of one epoch.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')


@tf.function
def train_step(images, labels):
    """Run one optimisation step on a batch and update the training metrics."""
    with tf.GradientTape() as tape:
        predictions = model(images)
        loss = loss_object(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_loss(loss)
    train_accuracy(labels, predictions)


@tf.function
def test_step(images, labels):
    """Evaluate one batch and update the test metrics; no weights change."""
    predictions = model(images)
    t_loss = loss_object(labels, predictions)

    test_loss(t_loss)
    test_accuracy(labels, predictions)


EPOCHS = 5

for epoch in range(EPOCHS):
    # Reset the evaluation metrics at the start of the next epoch
    train_loss.reset_states()
    train_accuracy.reset_states()
    test_loss.reset_states()
    test_accuracy.reset_states()

    for images, labels in train_ds:
        train_step(images, labels)

    for test_images, test_labels in test_ds:
        test_step(test_images, test_labels)

    template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'
    print (template.format(epoch+1,
                           train_loss.result(),
                           train_accuracy.result()*100,
                           test_loss.result(),
                           test_accuracy.result()*100))