Experimental setup
DQN essentially just uses a function approximator to fit the Q-table from Q-learning.
In this post we train an agent to play Frozen Lake: a 4*4 grid of cells represents a sheet of ice with a few randomly placed holes. Once the map is generated the holes never change, and the agent's goal is to walk from the top-left corner of the grid to the bottom-right corner.
The cells are numbered 0-15, so the agent's observation is an integer in 0-15. Actions are the integers 0-3, corresponding to moving left, down, right and up respectively.
For details see: https://www.gymlibrary.dev/environments/toy_text/frozen_lake/
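As a quick sanity check of the spaces described above, a minimal sketch (the output comments assume the gym version used in the code below; on newer gym/gymnasium releases reset() also returns an info dict):

import gym

env = gym.make('FrozenLake-v1', map_name="4x4", is_slippery=False)
print(env.observation_space)  # Discrete(16): states 0..15, row-major from the top-left cell
print(env.action_space)       # Discrete(4): 0=left, 1=down, 2=right, 3=up
print(env.reset())            # 0, the starting (top-left) cell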
QLearning
import gym
import numpy as np
import random
# qTable
qtable = np.zeros(shape=(4 * 4, 4))
# learning rate
alpha = 0.5
# discount factor: how much future rewards are credited to the current action
gamma = 0.8

def choose_act(state):
    # return random.sample(range(3), 1)[0]
    # look up the best action for the current state in the qTable; if several actions tie for best, pick one of them at random
    return random.sample(np.where(qtable[state] == np.max(qtable[state]))[0].tolist(), 1)[0]

def qlearning():
    env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=False, render_mode='human', new_step_api=True)
    state = env.reset()
    # play 1000 episodes
    for i in range(1000):
        # an episode can take any number of steps, until it ends
        while True:
            act = choose_act(state)
            # terminate: whether the episode has ended; truncated: whether it ended abnormally, e.g. because of a time limit or the agent walking off the map
            next_state, reward, terminate, truncated, info = env.step(act)
            # penalty terms: the game itself only rewards success and never punishes, so the agent cannot learn from its mistakes and training is extremely slow
            # if state == next_state or truncated or (terminate and reward == 0):
            #     reward -= 1
            # reward -= 0.1
            # update the qTable according to the Bellman equation
            qtable[state, act] = (1 - alpha) * qtable[state, act] + alpha * (reward + gamma * np.max(qtable[next_state]))
            state = next_state
            env.render()
            if terminate or truncated:
                state = env.reset()
                break

if __name__ == "__main__":
    qlearning()
For a small discrete task like this, Q-learning converges very quickly.
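The qtable update inside the loop is the standard Q-learning rule, with alpha = 0.5 and gamma = 0.8 as set above:

$$Q(s,a) \leftarrow (1-\alpha)\,Q(s,a) + \alpha\left(r + \gamma \max_{a'} Q(s',a')\right)$$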
DQN
The most naive DQN implementation
No tricks at all: the network is trained once after every single step.
import itertools
import gym
import numpy as np
import torch
import torch.nn.functional as F
from gym.envs.toy_text.frozen_lake import generate_random_map
from torch.utils.tensorboard import SummaryWriter
gamma = 0.8
writer = SummaryWriter('./log/raw_dqn')
class QtableModel(torch.nn.Module):
    def __init__(self):
        super(QtableModel, self).__init__()
        self.fn1 = torch.nn.Linear(in_features=1, out_features=64)
        self.fn2 = torch.nn.Linear(in_features=64, out_features=64)
        self.fn3 = torch.nn.Linear(in_features=64, out_features=4)

    def forward(self, state):
        q_value = F.relu(self.fn1(state))
        q_value = F.relu(self.fn2(q_value))
        q_value = self.fn3(q_value)
        return q_value

model = QtableModel()
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(params=model.parameters())

def dqn():
    env = gym.make('FrozenLake-v1', desc=generate_random_map(size=4), map_name="4x4", is_slippery=False, render_mode=None, new_step_api=True)
    success = []
    for game_count in itertools.count():
        state = env.reset()
        while True:
            q_values = model(torch.tensor([state], dtype=torch.float))
            # pure random exploration: the action is sampled uniformly, the network is not consulted
            act = env.action_space.sample()
            next_state, reward, terminate, truncated, info = env.step(act)
            with torch.no_grad():
                expect = reward + gamma * torch.max(model(torch.tensor([next_state], dtype=torch.float))).detach()
            real = q_values.gather(0, torch.tensor(act))
            optimizer.zero_grad()
            loss = loss_fn(real, expect)
            loss.backward()
            optimizer.step()
            state = next_state
            if terminate or truncated:
                break
        # test once after each training episode; the code below has nothing to do with DQN itself, it only makes the training progress easy to watch
        # unless stated otherwise, the later variants also include this block (it is omitted from their listings)
        # --------------------test start-----------------------
        state = env.reset()
        while True:
            q_values = model(torch.tensor([state], dtype=torch.float))
            act = q_values.argmax().item()
            next_state, reward, terminate, truncated, info = env.step(act)
            state = next_state
            if terminate or truncated:
                success.append(reward)
                break
        if len(success) > 100:
            writer.add_scalar('acc/success', sum(success[-100:]), game_count)
            del success[0]
        writer.add_histogram('Qvalue', np.array([model(torch.tensor([i], dtype=torch.float)).detach().numpy() for i in range(16)]), game_count)
        # --------------------test end-----------------------

if __name__ == "__main__":
    dqn()
In fact, a DQN written this way is very hard to get to converge (there is an experimental comparison below). The main reason is that the model is constantly changing: whatever value you fitted a little bit towards in one update is likely to shift again once the next update changes the model's parameters. One remedy is to pin down the predicted Q-value when an episode ends; the code is below (a few other optimizations were added for faster convergence, see the comments).
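Written out, the per-step regression target in the improved version below is

$$y = \begin{cases} r, & \text{if the episode has ended (terminate or truncated)} \\ r + \gamma \max_{a'} Q(s',a'), & \text{otherwise} \end{cases}$$

and the network is trained to minimize $\bigl(Q(s,a) - y\bigr)^2$.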
A slightly improved DQN
See the code comments for details.
import itertools
import random
import gym
import torch
import torch.nn.functional as F
from gym.envs.toy_text.frozen_lake import generate_random_map
from torch.utils.tensorboard import SummaryWriter
import numpy as np
gamma = 0.8
writer = SummaryWriter('./log/reward_change_dqn')
class QtableModel(torch.nn.Module):
    def __init__(self):
        super(QtableModel, self).__init__()
        self.fn1 = torch.nn.Linear(in_features=1, out_features=64)
        self.fn2 = torch.nn.Linear(in_features=64, out_features=64)
        self.fn3 = torch.nn.Linear(in_features=64, out_features=4)

    def forward(self, state):
        q_value = F.relu(self.fn1(state))
        q_value = F.relu(self.fn2(q_value))
        q_value = self.fn3(q_value)
        return q_value

model = QtableModel()
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(params=model.parameters())

def dqn():
    env = gym.make('FrozenLake-v1', desc=generate_random_map(size=4), map_name="4x4", is_slippery=False, render_mode=None, new_step_api=True)
    success = []
    for game_count in itertools.count():
        state = env.reset()
        while True:
            q_values = model(torch.tensor([state], dtype=torch.float))
            # e-greedy: with some probability take the action currently believed to be best, otherwise a random one
            if random.random() < 0.3:
                act = q_values.argmax().item()
            else:
                act = env.action_space.sample()
            next_state, reward, terminate, truncated, info = env.step(act)
            # reshape the reward: standing still, timing out or falling into a hole is punished
            if state == next_state or truncated or (terminate and reward == 0):
                reward = -1.
            with torch.no_grad():
                # for stability, take the q_value at the end of the game to be 0 instead of letting the model produce it
                expect = torch.tensor(reward) if terminate or truncated else reward + gamma * torch.max(model(torch.tensor([next_state], dtype=torch.float))).detach()
            real = q_values.gather(0, torch.tensor(act))
            optimizer.zero_grad()
            loss = loss_fn(real, expect)
            loss.backward()
            optimizer.step()
            state = next_state
            if terminate or truncated:
                break

if __name__ == "__main__":
    dqn()
The monitoring results are shown below. The orange line is the most naive DQN implementation; it never succeeds even once from start to finish. The gray line fixes the terminal Q-value on top of the naive DQN, and the blue line adds the e-greedy policy and the penalty-based reward shaping on top of the gray one. Compared with the gray line, the blue line is clearly more stable.

The two-model structure
The DQN tutorial on the PyTorch website uses a structure with two models, and the vast majority of implementations online do the same.
The results above show that fixing certain Q-values has a strongly positive effect on performance, but so far only the terminal Q-value has been fixed. Is there a way to fix the whole Q-table (the one the model is fitting)? The trick is to make a copy of the model being trained; the copy's parameters are not updated while playing, and its only job is to produce the target Q-values (note that these targets are not the final converged values, just values produced partway through training). This way the target Q-values the copy produces stay constant until its parameters are refreshed.
This requires initializing two models with identical parameters, eval_net and target_net: eval_net keeps updating its parameters as the game is played, while target_net computes the temporary target Q-values and only synchronizes with eval_net's parameters after a certain number of episodes.
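With the target network, the non-terminal target used in the code below becomes

$$y = r + \gamma \max_{a'} Q_{\text{target}}(s',a')$$

while the gradient still flows only through the model being trained; at terminal states the target remains just $r$, as before.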
import itertools
import random
from collections import OrderedDict
import gym
import numpy as np
import numpy.random
import torch
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from gym.envs.toy_text.frozen_lake import generate_random_map
gamma = 0.8
writer = SummaryWriter('./log/double_model_dqn')
torch.random.manual_seed(1)
random.seed(1)
numpy.random.seed(1)
class QtableModel(torch.nn.Module):
    def __init__(self):
        super(QtableModel, self).__init__()
        self.fn1 = torch.nn.Linear(in_features=1, out_features=64)
        self.fn2 = torch.nn.Linear(in_features=64, out_features=64)
        self.fn3 = torch.nn.Linear(in_features=64, out_features=4)

    def forward(self, state):
        q_value = F.relu(self.fn1(state))
        q_value = F.relu(self.fn2(q_value))
        q_value = self.fn3(q_value)
        return q_value

# create two models with identical parameters
model, target_model = QtableModel(), QtableModel()
target_model.load_state_dict(OrderedDict(model.state_dict()))
target_model.eval()
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(params=model.parameters())

def dqn():
    env = gym.make('FrozenLake-v1', desc=["SFFF", "FFFH", "HFFF", "HFFG", ], map_name="4x4", is_slippery=False, render_mode=None, new_step_api=True)
    success = []
    for game_count in itertools.count():
        state = env.reset()
        while True:
            q_values = model(torch.tensor([state], dtype=torch.float))
            if random.random() < 0.3:
                act = q_values.argmax().item()
            else:
                act = env.action_space.sample()
            next_state, reward, terminate, truncated, info = env.step(act)
            if state == next_state or truncated or (terminate and reward == 0):
                reward = -1.
            with torch.no_grad():
                # note: the target q-value is produced by target_model here
                expect = torch.tensor(reward) if terminate or truncated else reward + gamma * torch.max(target_model(torch.tensor([next_state], dtype=torch.float))).detach()
            real = q_values.gather(0, torch.tensor(act))
            optimizer.zero_grad()
            loss = loss_fn(real, expect)
            loss.backward()
            optimizer.step()
            state = next_state
            if terminate or truncated:
                break
        # sync the parameters every 10 games
        if game_count % 10 == 0:
            target_model.load_state_dict(OrderedDict(model.state_dict()))
            target_model.eval()

if __name__ == "__main__":
    dqn()
The experimental results are below; in fact, for a small discrete task like this the two-model structure shows no real advantage.

More comparison experiments
Before training the DQN, I first trained a Q-table with Q-learning. The first line chart below shows how well the Q-values produced during DQN training match the values in that Q-table; 16 (on the 4*4 grid) means the predictions for all states match the Q-table.
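The exact matching criterion used for that chart is not shown here; a minimal sketch of one plausible version (per-state greedy-action agreement, assuming qtable and model are the objects trained earlier) looks like this:

import numpy as np
import torch

# Hypothetical helper: count the states (out of 16) whose greedy action under
# the DQN agrees with the greedy action in the Q-learning qtable.
def qtable_match_count(model, qtable):
    with torch.no_grad():
        preds = np.array([model(torch.tensor([s], dtype=torch.float)).numpy()
                          for s in range(16)])
    return int((preds.argmax(axis=1) == qtable.argmax(axis=1)).sum())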

Experience replay
Both the PyTorch DQN tutorial and the vast majority of tutorials found online combine the two-model structure with experience replay to improve training.
In everything above, the network is trained once for every single action the agent takes, which actually hurts both the model's stability and the training speed. A better approach is to temporarily store each of the agent's actions together with the outcome of that action, and only once enough transitions have accumulated, pack them into a batch and feed them to the model for training. This is what experience replay means.
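For reference, a textbook-style replay buffer keeps a bounded memory and trains on random mini-batches sampled from it. A minimal sketch (illustrative only; the code below instead fills a plain list, trains on all 1000 stored transitions at once, and then clears it):

import random
from collections import deque

class ReplayBuffer:
    def __init__(self, capacity=10000):
        # old transitions are dropped automatically once capacity is exceeded
        self.buffer = deque(maxlen=capacity)

    def push(self, state, act, reward, next_state, done):
        self.buffer.append((state, act, reward, next_state, done))

    def sample(self, batch_size=64):
        # uniformly sample a mini-batch of stored transitions
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)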
import itertools
import random
from collections import OrderedDict
import gym
import numpy as np
import numpy.random
import torch
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from gym.envs.toy_text.frozen_lake import generate_random_map
gamma = 0.8
writer = SummaryWriter('./log/mem_replay_dqn')
torch.random.manual_seed(1)
random.seed(1)
numpy.random.seed(1)
env = gym.make('FrozenLake-v1', desc=["SFFF", "FFFH", "HFFF", "HFFG", ], map_name="4x4", is_slippery=False, render_mode=None, new_step_api=True)
# memory
mem = []
success = []
# number of training updates performed
train_count = 0

class QtableModel(torch.nn.Module):
    def __init__(self):
        super(QtableModel, self).__init__()
        self.fn1 = torch.nn.Linear(in_features=1, out_features=64)
        self.fn2 = torch.nn.Linear(in_features=64, out_features=64)
        self.fn3 = torch.nn.Linear(in_features=64, out_features=4)

    def forward(self, state):
        q_value = F.relu(self.fn1(state))
        q_value = F.relu(self.fn2(q_value))
        q_value = self.fn3(q_value)
        return q_value

model, target_model = QtableModel(), QtableModel()
target_model.load_state_dict(OrderedDict(model.state_dict()))
target_model.eval()
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(params=model.parameters())

# experience replay
# the replay length is hard-coded to 1000 here
def mem_replay() -> None:
    if len(mem) < 1000:
        return
    global train_count
    train_count += 1
    model.train()
    next_states = torch.tensor(mem)[:1000][:, 0].unsqueeze(1)
    rewards = torch.tensor(mem)[:1000][:, 1].unsqueeze(1)
    terminates = torch.tensor(mem)[:1000][:, 2]
    truncateds = torch.tensor(mem)[:1000][:, 3]
    acts = torch.tensor(mem, dtype=torch.int64)[:1000][:, 4].unsqueeze(1)
    states = torch.tensor(mem)[:1000][:, 5].unsqueeze(1)
    with torch.no_grad():
        final_mask = torch.tensor([int(terminates[i] or truncateds[i]) for i in range(len(terminates))], dtype=torch.bool)
        q_vs = torch.max(target_model(next_states), 1)[0].unsqueeze(1)
        q_vs[final_mask] = 0
        expect = (rewards + gamma * q_vs).detach()
    q_values = model(states)
    real = q_values.gather(1, acts)
    optimizer.zero_grad()
    loss = loss_fn(real, expect)
    loss.backward()
    optimizer.step()
    model.eval()
    # test and log after every update (same purpose as the test block in the naive DQN)
    with torch.no_grad():
        writer.add_histogram('Qvalue', np.array([model(torch.tensor([i], dtype=torch.float)).detach().numpy() for i in range(16)]), train_count)
        state = env.reset()
        while True:
            act = model(torch.tensor([state], dtype=torch.float)).detach().argmax().item()
            next_state, reward, terminate, truncated, info = env.step(act)
            state = next_state
            if terminate or truncated:
                success.append(reward)
                break
        if len(success) > 100:
            writer.add_scalar('acc/success', sum(success[-100:]), train_count)
            del success[0]
    # sync the target network every 10 updates
    if train_count % 10 == 0:
        target_model.load_state_dict(OrderedDict(model.state_dict()))
        target_model.eval()
    mem.clear()

def dqn():
    for game_count in itertools.count():
        state = env.reset()
        while True:
            q_values = model(torch.tensor([state], dtype=torch.float))
            if random.random() < 0.3:
                act = q_values.argmax().item()
            else:
                act = env.action_space.sample()
            next_state, reward, terminate, truncated, info = env.step(act)
            if state == next_state or truncated or (terminate and reward == 0):
                reward = -1.
            mem.append((next_state, reward, terminate, truncated, act, state))
            # experience replay
            mem_replay()
            state = next_state
            if terminate or truncated:
                break

if __name__ == "__main__":
    dqn()

In fact, compared with the other approaches above, this one is slower to reach a working policy, but it seems to end up the most stable.
The effect of the discount factor



Other issues
During the experiments I found that as soon as the map grows to 5*5, DQN training becomes extremely difficult.


After many runs I found that whether the model converges depends heavily on the map layout. For example, with the two maps below the result always comes out as shown in the figures no matter how many times I retrain, which really puzzles me.

Convergence can be very hard even on some 4*4 and 5*5 maps, for example (a 6*6, a 5*5 and a 4*4 map):
["SFFFHF","FHFFFF","HFHFFF","FFFFHF","FFFFFF","FFFFHG",],
["SHFHF","FFFFF","HFHFF","FFFHF","FHFFG",],
["SHFF","FFFF","HFHF","FHFG",]
But "hard to converge" does not mean it never converges: in some runs the success rate stays at 0 for a long time, and the model only suddenly starts converging after several hours, or even more than ten hours, of training.



I suspect this is related to the learning rate (the code above basically all uses the Adam optimizer). I ran some experiments on this but apparently never recorded them; as I recall, the learning rate cannot be too large or the loss becomes NaN. I also tried other activation functions, but that did not seem to help much either.
Postscript
In fact, using DQN for this kind of discrete task is a bit unfair. As some of the results above show, even when the Q-values predicted by the DQN are already very close to the real Q-table, the game's success rate can still be low. For this game, even if you pick most of the actions correctly, a single wrong choice at a critical step (one step to the right would have finished the game, but you insist on going left and fall into a hole) throws everything away.
Also, I did not expect this to be so hard to train; for something this simple, the result depends to a surprising extent on luck.
Since a DQN converges by relying on the Bellman-equation update rule, I also tried training a Q-table with Q-learning first and then fitting a model directly to that Q-table. This converges noticeably faster than relying on the Bellman updates alone, but again it only works for small discrete tasks.
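A minimal sketch of that idea, assuming qtable is the 16*4 array produced by the Q-learning code at the top and QtableModel is the same network as above:

import torch

model = QtableModel()
optimizer = torch.optim.Adam(params=model.parameters())
loss_fn = torch.nn.MSELoss()

# every state 0..15 as a (16, 1) float input, the whole qtable as the regression target
states = torch.arange(16, dtype=torch.float).unsqueeze(1)
targets = torch.tensor(qtable, dtype=torch.float)

# plain supervised regression of the network onto the qtable
# (the number of epochs here is arbitrary)
for epoch in range(2000):
    optimizer.zero_grad()
    loss = loss_fn(model(states), targets)
    loss.backward()
    optimizer.step()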
The PyTorch DQN tutorial: https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html