tensorflow2的一般训练方法(MNIST为例)

tech2024-07-07  73

tensorflow2的一般训练方法(MNIST为例)

简介我的程序官网例子

简介

在tensorflow中,有一类高度定式化的方法,就是使用Keras进行相关训练,我不否认Keras的便利性,但是,如果需要定义一些更为复杂的训练过程,Keras的一些功能个人感觉多多少少有点不适合。

Keras一般的过程就是,定义模型,模型配置(model.compile),训练模型(model.fit),参数提取(model.summary)。其中模型配置,训练模型,参数提取就是相当定式化的过程,如果是进行类别的分类任务还好,但是,如果进行比如GAN网络的训练,就拉跨了。更为重要的是Keras的这些部分,弱化了我们对整个网络训练过程的理解,因为我们在这些过程中,只要按照例子改变传给model.compile,model.fit,model.summary的变量就可以直接训练,可以见我的另一篇博客: 我的博客.

因此,我们需要一种更为普遍性的代码来进行训练,这个就是这篇文章的任务。在这篇文章中,我们仍旧以MNIST训练为例子,进行解析。

我的程序

这里的程序最好与上文中的博客对比来看。 首先依旧是相关package的导入。

''' @Auther : gaoxin @Date : 2020.08.26 @Version : 1.0 ''' import tensorflow as tf from PIL import Image import numpy as np from matplotlib import pyplot as plt import os from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Dropout, Flatten, Dense from tensorflow.keras import Model import warnings os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' warnings.filterwarnings("ignore") np.set_printoptions(threshold = np.inf)

原来一样的预处理过程,不改变。

train_path = './mnist_image_label/mnist_train_jpg_60000/' train_txt = './mnist_image_label/mnist_train_jpg_60000.txt' x_train_savepath = './mnist_image_label/mnist_x_train.npy' y_train_savepath = './mnist_image_label/mnist_y_train.npy' test_path = './mnist_image_label/mnist_test_jpg_10000/' test_txt = './mnist_image_label/mnist_test_jpg_10000.txt' x_test_savepath = './mnist_image_label/mnist_x_test.npy' y_test_savepath = './mnist_image_label/mnist_y_test.npy' def generateds(path, txt): f = open(txt, 'r') contents = f.readlines() f.close() x, y_ = [], [] for content in contents: value = content.split() img_path = path + value[0] img = Image.open(img_path) img = np.array(img.convert('L')) img = img / 255. x.append(img) y_.append(value[1]) print('loading : ' + content) x = np.array(x) y_ = np.array(y_) y_ = y_.astype(np.int64) return x, y_ if os.path.exists(x_train_savepath) and os.path.exists(y_train_savepath) and os.path.exists( x_test_savepath) and os.path.exists(y_test_savepath): print('-------------Load Datasets-----------------') x_train_save = np.load(x_train_savepath) y_train = np.load(y_train_savepath) x_test_save = np.load(x_test_savepath) y_test = np.load(y_test_savepath) x_train = np.reshape(x_train_save, (len(x_train_save), 28, 28, 1)) x_test = np.reshape(x_test_save, (len(x_test_save), 28, 28, 1)) else: print('-------------Generate Datasets-----------------') x_train, y_train = generateds(train_path, train_txt) x_test, y_test = generateds(test_path, test_txt) print('-------------Save Datasets-----------------') x_train_save = np.reshape(x_train, (len(x_train), -1)) x_test_save = np.reshape(x_test, (len(x_test), -1)) np.save(x_train_savepath, x_train_save) np.save(y_train_savepath, y_train) np.save(x_test_savepath, x_test_save) np.save(y_test_savepath, y_test)

接着进行模型定义,也与之前的相同。

class Baseline(Model) : def __init__(self): super(Baseline, self).__init__() self.c1 = Conv2D(filters=6, kernel_size=(5, 5), padding='same') # 卷积层 self.b1 = BatchNormalization() # BN层 self.a1 = Activation('relu') # 激活层 self.p1 = MaxPool2D(pool_size=(2, 2), strides=2, padding='same') # 池化层 self.d1 = Dropout(0.2) # dropout层 self.flatten = Flatten() self.f1 = Dense(128, activation='relu') self.d2 = Dropout(0.2) self.f2 = Dense(10, activation='softmax') @tf.function def call(self, x): x = self.c1(x) x = self.b1(x) x = self.a1(x) x = self.p1(x) x = self.d1(x) x = self.flatten(x) x = self.f1(x) x = self.d2(x) y = self.f2(x) return y

然后是损失函数的定义。使用均方差作为损失函数。 因为这里的inputs_y是一个batch,batch的每个元素为一个数字(1~10),batch大小为[batch_size,],而prediction大小为[batch_size,10](一行的10个中进行概率分布),因此需要把inputs_y变为[batch_size,10]大小的形式,建立各项为零的数组,对应数字位置的0置为1(这是预测的理想状态)。

def loss(model , inputs_x , inputs_y) : prediction = model(inputs_x) y = np.zeros(prediction.shape) for i in range(len(inputs_y)) : y[i,inputs_y[i]] = 1 #print(y) loss = tf.reduce_mean(tf.square(y-prediction)) return loss

接着是一个test数据集的检测,检测准确率。

def accuracy(model ,x , y) : acc = 0 prediction = model(x) y_ = tf.argmax(prediction,1) num = len(y_) for i in range(num) : if y[i] == y_[i] : acc = acc+1 return acc/num

因为这里的训练是按照batch给模型喂数据的,所以还要有一个函数依次按照batch给模型投喂数据进行训练,为了方便,这里写成了一个class。

class dataset() : def __init__(self,train_x,train_y) : self.counter = 0 self.train_x = train_x self.train_y = train_y def sample(self, batch_size) : batch_x = self.train_x[self.counter:self.counter+batch_size] batch_y = self.train_y[self.counter:self.counter+batch_size] self.counter = self.counter + batch_size if self.counter > self.train_y.shape[0] : self.counter = 0 return batch_x , batch_y

然后是本次的主程序啦,定义一个训练的函数。

def training() : model = Baseline() model.build(input_shape = (None , 28 , 28 , 1)) optimizer = tf.keras.optimizers.Adam()#learning_rate = 0.001 batch_size = 32 batch_num = y_train.shape[0] // batch_size data = dataset(x_train,y_train) summary_writer = tf.summary.create_file_writer('./tensorboard') tf.summary.trace_on(graph=True, profiler=True) # 开启Trace,可以记录图结构和profile信息 checkpoint_save_path = "./checkpoint/mnist.ckpt" if os.path.exists(checkpoint_save_path + '.index'): print('-------------load the model-----------------') model.load_weights(checkpoint_save_path) for epoch in range(1) : for num in range(batch_num) : inputs_x, inputs_y = data.sample(batch_size) with tf.GradientTape() as tape: y_loss = loss(model , inputs_x , inputs_y) grads = tape.gradient(y_loss, model.trainable_variables) optimizer.apply_gradients(zip(grads, model.trainable_variables)) if num % 600 == 0: acc = accuracy(model , x_test , y_test) print(num, 'y-loss:',float(y_loss), 'accuracy', float(acc)) with summary_writer.as_default(): # 希望使用的记录器 tf.summary.scalar("y_loss", y_loss, step=num) print('#################################################') with summary_writer.as_default(): tf.summary.trace_export(name="model_trace", step=0, profiler_outdir='./tensorboard') model.save_weights(checkpoint_save_path)

这里,一个epoch就是数据集遍历训练一边的一个轮回,我只来了一遍;一个batch就是一次喂给模型的多个最小单位的集合,比如这里的一个batch是32个。

这个程序段中间的这一段:

with tf.GradientTape() as tape: y_loss = loss(model , inputs_x , inputs_y) grads = tape.gradient(y_loss, model.trainable_variables) optimizer.apply_gradients(zip(grads, model.trainable_variables))

利用了tensorflow的自动求导机制,运用前面定义好的Adam优化器进行优化求解。

下面的这一段是为了查看训练的进度与主要参数情况。

if num % 600 == 0: acc = accuracy(model , x_test , y_test) print(num, 'y-loss:',float(y_loss), 'accuracy', float(acc))

下面这一段则是进行记录,在tensorboard可以查看训练的收敛过程。

with summary_writer.as_default(): # 希望使用的记录器 tf.summary.scalar("y_loss", y_loss, step=num)

为了观察训练收敛过程,在命令行输入:

tensorboard --logdir=./tensorboard

然后把输入命令后的网址复制到浏览器即可。

附上, 训练时的一个截图: 最后附上我的完整代码,emmmm,因为我图了个简便,便于和前一篇对比,所以这里代码的整体结构挺烂的,不喜勿喷哈。然后数据集也是前文的数据集。

''' @Auther : gaoxin @Date : 2020.08.26 @Version : 1.0 ''' import tensorflow as tf from PIL import Image import numpy as np from matplotlib import pyplot as plt import os from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Dropout, Flatten, Dense from tensorflow.keras import Model import warnings os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' warnings.filterwarnings("ignore") np.set_printoptions(threshold = np.inf) train_path = './mnist_image_label/mnist_train_jpg_60000/' train_txt = './mnist_image_label/mnist_train_jpg_60000.txt' x_train_savepath = './mnist_image_label/mnist_x_train.npy' y_train_savepath = './mnist_image_label/mnist_y_train.npy' test_path = './mnist_image_label/mnist_test_jpg_10000/' test_txt = './mnist_image_label/mnist_test_jpg_10000.txt' x_test_savepath = './mnist_image_label/mnist_x_test.npy' y_test_savepath = './mnist_image_label/mnist_y_test.npy' def generateds(path, txt): f = open(txt, 'r') contents = f.readlines() f.close() x, y_ = [], [] for content in contents: value = content.split() img_path = path + value[0] img = Image.open(img_path) img = np.array(img.convert('L')) img = img / 255. x.append(img) y_.append(value[1]) print('loading : ' + content) x = np.array(x) y_ = np.array(y_) y_ = y_.astype(np.int64) return x, y_ if os.path.exists(x_train_savepath) and os.path.exists(y_train_savepath) and os.path.exists( x_test_savepath) and os.path.exists(y_test_savepath): print('-------------Load Datasets-----------------') x_train_save = np.load(x_train_savepath) y_train = np.load(y_train_savepath) x_test_save = np.load(x_test_savepath) y_test = np.load(y_test_savepath) x_train = np.reshape(x_train_save, (len(x_train_save), 28, 28, 1)) x_test = np.reshape(x_test_save, (len(x_test_save), 28, 28, 1)) else: print('-------------Generate Datasets-----------------') x_train, y_train = generateds(train_path, train_txt) x_test, y_test = generateds(test_path, test_txt) print('-------------Save Datasets-----------------') x_train_save = np.reshape(x_train, (len(x_train), -1)) x_test_save = np.reshape(x_test, (len(x_test), -1)) np.save(x_train_savepath, x_train_save) np.save(y_train_savepath, y_train) np.save(x_test_savepath, x_test_save) np.save(y_test_savepath, y_test) class Baseline(Model) : def __init__(self): super(Baseline, self).__init__() self.c1 = Conv2D(filters=6, kernel_size=(5, 5), padding='same') # 卷积层 self.b1 = BatchNormalization() # BN层 self.a1 = Activation('relu') # 激活层 self.p1 = MaxPool2D(pool_size=(2, 2), strides=2, padding='same') # 池化层 self.d1 = Dropout(0.2) # dropout层 self.flatten = Flatten() self.f1 = Dense(128, activation='relu') self.d2 = Dropout(0.2) self.f2 = Dense(10, activation='softmax') @tf.function def call(self, x): x = self.c1(x) x = self.b1(x) x = self.a1(x) x = self.p1(x) x = self.d1(x) x = self.flatten(x) x = self.f1(x) x = self.d2(x) y = self.f2(x) return y def loss(model , inputs_x , inputs_y) : prediction = model(inputs_x) y = np.zeros(prediction.shape) for i in range(len(inputs_y)) : y[i,inputs_y[i]] = 1 #print(y) loss = tf.reduce_mean(tf.square(y-prediction)) return loss def accuracy(model ,x , y) : acc = 0 prediction = model(x) y_ = tf.argmax(prediction,1) num = len(y_) for i in range(num) : if y[i] == y_[i] : acc = acc+1 return acc/num class dataset() : def __init__(self,train_x,train_y) : self.counter = 0 self.train_x = train_x self.train_y = train_y def sample(self, batch_size) : batch_x = self.train_x[self.counter:self.counter+batch_size] batch_y = self.train_y[self.counter:self.counter+batch_size] self.counter = self.counter + batch_size if self.counter > self.train_y.shape[0] : self.counter = 0 return batch_x , batch_y def training() : model = Baseline() model.build(input_shape = (None , 28 , 28 , 1)) optimizer = tf.keras.optimizers.Adam()#learning_rate = 0.001 batch_size = 32 batch_num = y_train.shape[0] // batch_size data = dataset(x_train,y_train) summary_writer = tf.summary.create_file_writer('./tensorboard') tf.summary.trace_on(graph=True, profiler=True) # 开启Trace,可以记录图结构和profile信息 checkpoint_save_path = "./checkpoint/mnist.ckpt" if os.path.exists(checkpoint_save_path + '.index'): print('-------------load the model-----------------') model.load_weights(checkpoint_save_path) for epoch in range(1) : for num in range(batch_num) : inputs_x, inputs_y = data.sample(batch_size) with tf.GradientTape() as tape: y_loss = loss(model , inputs_x , inputs_y) grads = tape.gradient(y_loss, model.trainable_variables) optimizer.apply_gradients(zip(grads, model.trainable_variables)) if num % 600 == 0: acc = accuracy(model , x_test , y_test) print(num, 'y-loss:',float(y_loss), 'accuracy', float(acc)) with summary_writer.as_default(): # 希望使用的记录器 tf.summary.scalar("y_loss", y_loss, step=num) print('#################################################') with summary_writer.as_default(): tf.summary.trace_export(name="model_trace", step=0, profiler_outdir='./tensorboard') model.save_weights(checkpoint_save_path) if __name__ == '__main__' : training()

官网例子

在最后加上官网的一个mnist训练的例子。 给一个官网教程连接:https://tensorflow.google.cn/tutorials/quickstart/advanced

import tensorflow as tf from tensorflow.keras.layers import Dense, Flatten, Conv2D from tensorflow.keras import Model mnist = tf.keras.datasets.mnist (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] train_ds = tf.data.Dataset.from_tensor_slices( (x_train, y_train)).shuffle(10000).batch(32) test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(32) class MyModel(Model): def __init__(self): super(MyModel, self).__init__() self.conv1 = Conv2D(32, 3, activation='relu') self.flatten = Flatten() self.d1 = Dense(128, activation='relu') self.d2 = Dense(10, activation='softmax') def call(self, x): x = self.conv1(x) x = self.flatten(x) x = self.d1(x) return self.d2(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy() optimizer = tf.keras.optimizers.Adam() train_loss = tf.keras.metrics.Mean(name='train_loss') train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy') test_loss = tf.keras.metrics.Mean(name='test_loss') test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy') @tf.function def train_step(images, labels): with tf.GradientTape() as tape: predictions = model(images) loss = loss_object(labels, predictions) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) train_loss(loss) train_accuracy(labels, predictions) @tf.function def test_step(images, labels): predictions = model(images) t_loss = loss_object(labels, predictions) test_loss(t_loss) test_accuracy(labels, predictions) EPOCHS = 5 for epoch in range(EPOCHS): # 在下一个epoch开始时,重置评估指标 train_loss.reset_states() train_accuracy.reset_states() test_loss.reset_states() test_accuracy.reset_states() for images, labels in train_ds: train_step(images, labels) for test_images, test_labels in test_ds: test_step(test_images, test_labels) template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}' print (template.format(epoch+1, train_loss.result(), train_accuracy.result()*100, test_loss.result(), test_accuracy.result()*100))
最新回复(0)