For an overview of how DQN works, refer to the following figure:
Following 莫烦 (Morvan)'s machine learning tutorial, I summarize below the steps and process of the DQN I built.
Parameter initialization.

    import tensorflow as tf
    from tensorflow.keras import layers
    import numpy as np


    class DeepQNetwork:
        def __init__(
                self,
                n_actions,
                n_features,
                learning_rate=0.01,
                reward_decay=0.9,
                replace_target_iter=300,
                e_greedy=0.9,
                e_greedy_increment=0.009,
                memory_size=500,
                batch_size=32
        ):
            self.n_actions = n_actions
            self.n_features = n_features
            self.lr = learning_rate
            self.gamma = reward_decay
            self.epsilon_max = e_greedy
            self.replace_target_iter = replace_target_iter
            self.epsilon_increment = e_greedy_increment
            self.memory_size = memory_size
            self.batch_size = batch_size
            # Start exploring heavily and anneal epsilon towards epsilon_max
            self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max

            self._built_model()

            self.memory_counter = 0
            # Counts learning steps (used to decide when to refresh the target_net parameters)
            self.learn_step_counter = 0
            # Records every cost so the learning curve can be plotted at the end
            self.cost_his = []
            # Each row holds one transition: two states plus reward and action
            self.memory = np.zeros((self.memory_size, n_features * 2 + 2))

Building the networks

        def _built_model(self):
            # Evaluation network: outputs the current Q value of every action
            EvalModel = tf.keras.Sequential([
                layers.Dense(self.n_features, activation='relu', input_shape=(self.n_features,)),
                layers.Dense(self.n_actions, activation=None)
            ])
            # Fitting Q values is a regression problem, so mean squared error is the loss
            EvalModel.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=self.lr),
                              loss='mse')
            self.EvalModel = EvalModel

            # Target network: same architecture, its weights are only refreshed periodically
            TargetModel = tf.keras.Sequential([
                layers.Dense(self.n_features, activation='relu', input_shape=(self.n_features,)),
                layers.Dense(self.n_actions, activation=None)
            ])
            TargetModel.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=self.lr),
                                loss='mse')
            self.TargetModel = TargetModel

DQN needs two sets of networks (an evaluation network and a target network). Here both are created with Sequential, so the construction pattern is fairly fixed.
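As a quick sanity check, the two Sequential networks simply map a batch of state vectors to one Q value per action. This is a minimal sketch; the dimensions n_actions=2 and n_features=4 are only illustrative (CartPole-like) and are not fixed by the class itself:

    dqn = DeepQNetwork(n_actions=2, n_features=4)       # hypothetical dimensions
    dummy_state = np.zeros((1, 4), dtype=np.float32)    # a batch containing one observation
    print(dqn.EvalModel.predict(dummy_state).shape)     # -> (1, 2): one Q value per action
    print(dqn.TargetModel.predict(dummy_state).shape)   # -> (1, 2): same shape as the eval net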
Updating the replay memory

        def store_transition(self, s, a, r, s_):
            if not hasattr(self, 'memory_counter'):
                self.memory_counter = 0
            # Stack (s, a, r, s_) horizontally into a single transition record
            transition = np.hstack((s, [a, r], s_))
            # Ring-buffer trick: once the buffer is full, overwrite the oldest record
            index = self.memory_counter % self.memory_size
            self.memory[index, :] = transition
            # Advance the write counter
            self.memory_counter += 1

Action selection

        def choose_action(self, observation):
            # Add a batch dimension so the observation can be fed to the prediction model
            observation = observation[np.newaxis, :]
            if np.random.uniform() < self.epsilon:
                # Predict the Q value of every action for the next step
                action_value = self.EvalModel.predict(observation)
                # Pick the action with the highest predicted value
                action = np.argmax(action_value)
            else:
                # Otherwise explore with a random action
                action = np.random.randint(0, self.n_actions)
            return action

The learn method

        def learn(self):
            # Periodically copy the evaluation network's weights into the target network
            if self.learn_step_counter % self.replace_target_iter == 0:
                self.TargetModel.set_weights(self.EvalModel.get_weights())
                print('Replace the target\n')

            # Randomly sample batch_size transitions from the memory
            if self.memory_counter > self.memory_size:
                sample_index = np.random.choice(self.memory_size, self.batch_size)
            else:
                sample_index = np.random.choice(self.memory_counter, self.batch_size)
            batch_memory = self.memory[sample_index, :]

            # q_next is produced by target_net, q_eval by eval_net
            q_next = self.TargetModel.predict(batch_memory[:, -self.n_features:])
            q_eval = self.EvalModel.predict(batch_memory[:, :self.n_features])

            q_target = q_eval.copy()
            batch_index = np.arange(self.batch_size, dtype=np.int32)
            eval_act_index = batch_memory[:, self.n_features].astype(int)
            reward = batch_memory[:, self.n_features + 1]
            # Bellman update: only the entry of the action actually taken is changed
            q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)

            # Train the evaluation network towards the full q_target matrix
            self.cost = self.EvalModel.train_on_batch(batch_memory[:, :self.n_features], q_target)
            self.cost_his.append(self.cost)

            # Gradually raise epsilon so the agent exploits more as training progresses
            self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
            self.learn_step_counter += 1
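To show how these pieces fit together, here is a minimal training-loop sketch. It assumes an OpenAI Gym style environment ('CartPole-v0' with the classic reset/step API); the episode count, the warm-up threshold of 200 stored transitions, and the learn-every-5-steps schedule are illustrative choices, not part of the original code:

    import gym

    env = gym.make('CartPole-v0')                      # hypothetical environment
    RL = DeepQNetwork(n_actions=env.action_space.n,
                      n_features=env.observation_space.shape[0])

    total_steps = 0
    for episode in range(100):                         # illustrative episode count
        observation = env.reset()
        done = False
        while not done:
            action = RL.choose_action(observation)
            observation_, reward, done, info = env.step(action)
            RL.store_transition(observation, action, reward, observation_)
            # Start learning once enough transitions are stored, then learn every 5 steps
            if total_steps > 200 and total_steps % 5 == 0:
                RL.learn()
            observation = observation_
            total_steps += 1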