DQN Learning and Experiments

Experiment Environment

In essence, DQN simply uses a function to approximate the Q-table from Q-learning.

This post trains an agent to play the Frozen Lake game: a 4*4 grid represents a sheet of ice with a few randomly placed holes. Once the map is generated, the holes never change, and the agent's goal is to walk from the top-left corner of the grid to the bottom-right corner.

Each cell is indexed 0-15, so the agent's observation is an integer from 0 to 15. The agent's actions are 0-3, corresponding to moving left, down, right, and up.

For details, see: https://www.gymlibrary.dev/environments/toy_text/frozen_lake/
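
A quick way to confirm the observation and action spaces (a minimal check; it assumes the same gym version used by the code below):

import gym

env = gym.make('FrozenLake-v1', map_name="4x4", is_slippery=False)
print(env.observation_space)  # Discrete(16): cell indices 0-15
print(env.action_space)       # Discrete(4): 0=left, 1=down, 2=right, 3=up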

QLearning

import gym
import numpy as np
import random

# qTable
qtable = np.zeros(shape=(4 * 4, 4))
# learning rate
alpha = 0.5
# discount factor: how much weight future rewards carry in the update
gamma = 0.8

def choose_act(state):
    # return random.sample(range(4), 1)[0]
    # pick the best action for the current state from the Q-table; if several actions tie for the maximum, choose one of them at random
    return random.sample(np.where(qtable[state] == np.max(qtable[state]))[0].tolist(), 1)[0]

def qlearning():
    env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=False, render_mode='human', new_step_api=True)
    state = env.reset()

    # play 1000 episodes
    for i in range(1000):
        # each episode can take any number of steps, until the episode ends
        while True:

            act = choose_act(state)

            # terminate indicates whether the episode has ended; truncated indicates an abnormal ending, e.g. hitting the time limit or the agent leaving the map
            next_state, reward, terminate, truncated, info = env.step(act)

            # penalty terms: the original game only rewards success and has no penalties, so the agent cannot learn from its mistakes and training is extremely slow
            # if state == next_state or truncated or (terminate and reward == 0):
            #     reward -= 1
            # reward -= 0.1

            # update the Q-table according to the Bellman equation
            qtable[state, act] = (1 - alpha) * qtable[state, act] + alpha * (reward + gamma * np.max(qtable[next_state]))

            state = next_state
            env.render()

            if terminate or truncated:
                state = env.reset()
                break

if __name__ == "__main__":
    qlearning()
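
For reference, the Q-table update in the code above is the standard Q-learning (Bellman) update:

$$Q(s,a) \leftarrow (1-\alpha)\,Q(s,a) + \alpha\left(r + \gamma \max_{a'} Q(s',a')\right)$$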

Q-learning converges very quickly on a small discrete task like this.

DQN

The most basic DQN implementation

No tricks are used; the network is trained once after every single step.

import itertools

import gym
import numpy as np
import torch
import torch.nn.functional as F
from gym.envs.toy_text.frozen_lake import generate_random_map
from torch.utils.tensorboard import SummaryWriter

gamma = 0.8

writer = SummaryWriter('./log/raw_dqn')


class QtableModel(torch.nn.Module):
    def __init__(self):
        super(QtableModel, self).__init__()
        self.fn1 = torch.nn.Linear(in_features=1, out_features=64)
        self.fn2 = torch.nn.Linear(in_features=64, out_features=64)
        self.fn3 = torch.nn.Linear(in_features=64, out_features=4)

    def forward(self, state):
        q_value = F.relu(self.fn1(state))
        q_value = F.relu(self.fn2(q_value))
        q_value = self.fn3(q_value)
        return q_value


model = QtableModel()

loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(params=model.parameters())


def dqn():
    env = gym.make('FrozenLake-v1', desc=generate_random_map(size=4), map_name="4x4", is_slippery=False, render_mode=None, new_step_api=True)

    success = []
    for game_count in itertools.count():
        state = env.reset()
        while True:

            q_values = model(torch.tensor([state], dtype=torch.float))
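            # note: in this basic version the action is chosen purely at random;
            # q_values is only used for the gradient update below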
            act = env.action_space.sample()

            next_state, reward, terminate, truncated, info = env.step(act)

            with torch.no_grad():
                expect = reward + gamma * torch.max(model(torch.tensor([next_state], dtype=torch.float))).detach()

            real = q_values.gather(0, torch.tensor(act))
            optimizer.zero_grad()
            loss = loss_fn(real, expect)
            loss.backward()
            optimizer.step()

            state = next_state

            if terminate or truncated:
                break

        # after each episode of training, run one quick test; the code below has nothing to do with DQN itself, it only makes the training progress easy to watch

        # unless otherwise noted, the later code snippets also include this block (omitted there for brevity)
        # --------------------test start-----------------------
        state = env.reset()
        while True:
            q_values = model(torch.tensor([state], dtype=torch.float))
            act = q_values.argmax().item()
            next_state, reward, terminate, truncated, info = env.step(act)
            state = next_state
            if terminate or truncated:
                success.append(reward)
                break
        if len(success) > 100:
            writer.add_scalar('acc/success', sum(success[-100:]), game_count)
            del success[0]
        writer.add_histogram('Qvalue', np.array([model(torch.tensor([i], dtype=torch.float)).detach().numpy() for i in range(16)]), game_count)
        # --------------------test end-----------------------


if __name__ == "__main__":
    dqn()

In practice, a DQN written this way is very hard to get to converge (an experiment comparison is shown below). The main reason is that the model keeps changing: whatever value the network fit in the previous update can drift again once the parameters change in the next one. One remedy is to fix the target Q value whenever an episode ends, as in the code below (a few other optimizations for faster convergence were also made, see the comments).

A slightly improved DQN

See the code comments for details.

import itertools

import random
import gym
import torch
import torch.nn.functional as F
from gym.envs.toy_text.frozen_lake import generate_random_map
from torch.utils.tensorboard import SummaryWriter
import numpy as np

gamma = 0.8

writer = SummaryWriter('./log/reward_change_dqn')


class QtableModel(torch.nn.Module):
    def __init__(self):
        super(QtableModel, self).__init__()
        self.fn1 = torch.nn.Linear(in_features=1, out_features=64)
        self.fn2 = torch.nn.Linear(in_features=64, out_features=64)
        self.fn3 = torch.nn.Linear(in_features=64, out_features=4)

    def forward(self, state):
        q_value = F.relu(self.fn1(state))
        q_value = F.relu(self.fn2(q_value))
        q_value = self.fn3(q_value)
        return q_value


model = QtableModel()

loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(params=model.parameters())


def dqn():
    env = gym.make('FrozenLake-v1', desc=generate_random_map(size=4), map_name="4x4", is_slippery=False, render_mode=None, new_step_api=True)

    success = []
    for game_count in itertools.count():
        state = env.reset()
        while True:

            q_values = model(torch.tensor([state], dtype=torch.float))
            # epsilon-greedy: with probability 0.3 take the action the model currently considers best, otherwise take a random action
            if random.random() < 0.3:
                act = q_values.argmax().item()
            else:
                act = env.action_space.sample()

            next_state, reward, terminate, truncated, info = env.step(act)

            # reward shaping: standing still, timing out, or falling into a hole all incur a penalty
            if state == next_state or truncated or (terminate and reward == 0):
                reward = -1.

            with torch.no_grad():
                # for stability, the Q value at episode end is taken to be 0 (the target is just the reward) instead of being produced by the model
                expect = torch.tensor(reward) if terminate or truncated else reward + gamma * torch.max(model(torch.tensor([next_state], dtype=torch.float))).detach()

            real = q_values.gather(0, torch.tensor(act))
            optimizer.zero_grad()
            loss = loss_fn(real, expect)
            loss.backward()
            optimizer.step()

            state = next_state

            if terminate or truncated:
                break


if __name__ == "__main__":
    dqn()

The monitoring results are below. The orange curve is the most basic DQN implementation; it never succeeds even once. The grey curve fixes the terminal Q value on top of the basic DQN, and the blue curve adds the epsilon-greedy policy and the learning from mistakes (penalties) on top of the grey one. Compared with the grey curve, the blue one is noticeably more stable.

Two-model architecture

The DQN tutorial on the PyTorch website uses a two-model architecture, and so do the vast majority of tutorials online.

The results above show that fixing certain Q values has a strong positive effect on the model, but so far only the Q value at the end of an episode was fixed. Is there a way to fix the whole Q-table (the one the model approximates)? The approach is to make a copy of the model being trained, never update the copy's parameters during play, and use it only to produce the target Q values (note that these targets are not the final converged values, just intermediate values during training). That way, the target Q values it produces stay constant until its parameters are refreshed.

This requires initializing two models with identical parameters, eval_net and target_net (called model and target_model in the code below). eval_net keeps updating its parameters while the game is played, whereas target_net is responsible for computing the temporary target Q values and only syncs with eval_net's parameters after a fixed number of episodes.

import itertools
import random
from collections import OrderedDict

import gym
import numpy as np
import numpy.random
import torch
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from gym.envs.toy_text.frozen_lake import generate_random_map

gamma = 0.8

writer = SummaryWriter('./log/double_model_dqn')
torch.random.manual_seed(1)
random.seed(1)
numpy.random.seed(1)


class QtableModel(torch.nn.Module):
    def __init__(self):
        super(QtableModel, self).__init__()
        self.fn1 = torch.nn.Linear(in_features=1, out_features=64)
        self.fn2 = torch.nn.Linear(in_features=64, out_features=64)
        self.fn3 = torch.nn.Linear(in_features=64, out_features=4)

    def forward(self, state):
        q_value = F.relu(self.fn1(state))
        q_value = F.relu(self.fn2(q_value))
        q_value = self.fn3(q_value)
        return q_value


# create two models with identical parameters
model, target_model = QtableModel(), QtableModel()
target_model.load_state_dict(OrderedDict(model.state_dict()))
target_model.eval()

loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(params=model.parameters())


def dqn():
    env = gym.make('FrozenLake-v1', desc=["SFFF", "FFFH", "HFFF", "HFFG", ], map_name="4x4", is_slippery=False, render_mode=None, new_step_api=True)

    success = []
    for game_count in itertools.count():
        state = env.reset()
        while True:

            q_values = model(torch.tensor([state], dtype=torch.float))
            if random.random() < 0.3:
                act = q_values.argmax().item()
            else:
                act = env.action_space.sample()

            next_state, reward, terminate, truncated, info = env.step(act)

            if state == next_state or truncated or (terminate and reward == 0):
                reward = -1.

            with torch.no_grad():
                # note: the target (expected) Q value is produced by target_model here
                expect = torch.tensor(reward) if terminate or truncated else reward + gamma * torch.max(target_model(torch.tensor([next_state], dtype=torch.float))).detach()

            real = q_values.gather(0, torch.tensor(act))
            optimizer.zero_grad()
            loss = loss_fn(real, expect)
            loss.backward()
            optimizer.step()

            state = next_state

            if terminate or truncated:
                break

        # sync the parameters every 10 episodes
        if game_count % 10 == 0:
            target_model.load_state_dict(OrderedDict(model.state_dict()))
            target_model.eval()

if __name__ == "__main__":
    dqn()

The experiment results are below. In fact, for a small discrete task like this, the two-model setup shows no obvious advantage.

More comparison experiments

Before training the DQN, I first trained a Q-table with Q-learning. The first line chart below shows how well the Q values produced during DQN training match the Q-table; a value of 16 (on the 4*4 grid) means the prediction for every state matches the Q-table.
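
The logging code behind that chart is not shown here; a minimal sketch of one way such a match count could be computed (my assumption, reusing qtable from the Q-learning section and the trained model):

import numpy as np
import torch

def qtable_match_count(model, qtable) -> int:
    # count, over the 16 states, how often the greedy action of the DQN model
    # agrees with the greedy action of the Q-learning Q-table
    matches = 0
    with torch.no_grad():
        for state in range(16):
            pred_act = model(torch.tensor([state], dtype=torch.float)).argmax().item()
            table_act = int(np.argmax(qtable[state]))
            matches += int(pred_act == table_act)
    return matches  # 16 means the model agrees with the Q-table on every state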

Experience Replay

Both the PyTorch DQN tutorial and virtually every tutorial found online use the two-model setup together with experience replay to improve training.

All of the approaches above train the network once after every single action the agent takes. This actually hurts both the model's stability and the training speed. A better approach is to temporarily store every action the agent takes, together with the outcome it produces, and only once enough transitions have accumulated, pack them into a batch and feed them to the model for training. This process is called experience replay.

import itertools
import random
from collections import OrderedDict

import gym
import numpy as np
import numpy.random
import torch
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter
from gym.envs.toy_text.frozen_lake import generate_random_map

gamma = 0.8

writer = SummaryWriter('./log/mem_replay_dqn')
torch.random.manual_seed(1)
random.seed(1)
numpy.random.seed(1)

env = gym.make('FrozenLake-v1', desc=["SFFF", "FFFH", "HFFF", "HFFG", ], map_name="4x4", is_slippery=False, render_mode=None, new_step_api=True)
# replay memory: a list of (next_state, reward, terminate, truncated, act, state) transitions
mem = []
success = []
# number of training steps performed so far
train_count = 0


class QtableModel(torch.nn.Module):
    def __init__(self):
        super(QtableModel, self).__init__()
        self.fn1 = torch.nn.Linear(in_features=1, out_features=64)
        self.fn2 = torch.nn.Linear(in_features=64, out_features=64)
        self.fn3 = torch.nn.Linear(in_features=64, out_features=4)

    def forward(self, state):
        q_value = F.relu(self.fn1(state))
        q_value = F.relu(self.fn2(q_value))
        q_value = self.fn3(q_value)
        return q_value


model, target_model = QtableModel(), QtableModel()
target_model.load_state_dict(OrderedDict(model.state_dict()))
target_model.eval()

loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(params=model.parameters())


# experience replay
# the replay length is hard-coded to 1000 transitions here
def mem_replay() -> None:
    if len(mem) < 1000:
        return

    global train_count
    train_count += 1
    model.train()

    # take the first 1000 stored transitions and split them into columns
    batch = torch.tensor(mem[:1000])
    next_states = batch[:, 0].unsqueeze(1)
    rewards = batch[:, 1].unsqueeze(1)
    terminates = batch[:, 2]
    truncateds = batch[:, 3]
    acts = batch[:, 4].to(torch.int64).unsqueeze(1)
    states = batch[:, 5].unsqueeze(1)

    with torch.no_grad():
        final_mask = torch.tensor([int(terminates[i] or truncateds[i]) for i in range(len(terminates))], dtype=torch.bool)
        q_vs = torch.max(target_model(next_states), 1)[0].unsqueeze(1)
        q_vs[final_mask] = 0

        expect = (rewards + gamma * q_vs).detach()

    q_values = model(states)

    real = q_values.gather(1, acts)

    optimizer.zero_grad()
    loss = loss_fn(real, expect)
    loss.backward()
    optimizer.step()

    model.eval()
    with torch.no_grad():
        writer.add_histogram('Qvalue', np.array([model(torch.tensor([i], dtype=torch.float)).detach().numpy() for i in range(16)]), train_count)

        state = env.reset()
        while True:
            act = model(torch.tensor([state], dtype=torch.float)).detach().argmax().item()
            next_state, reward, terminate, truncated, info = env.step(act)
            state = next_state
            if terminate or truncated:
                success.append(reward)
                break

        if len(success) > 100:
            writer.add_scalar('acc/success', sum(success[-100:]), train_count)
            del success[0]

    if train_count % 10 == 0:
        target_model.load_state_dict(OrderedDict(model.state_dict()))
        target_model.eval()

    mem.clear()


def dqn():
    for game_count in itertools.count():
        state = env.reset()
        while True:

            q_values = model(torch.tensor([state], dtype=torch.float))
            if random.random() < 0.3:
                act = q_values.argmax().item()
            else:
                act = env.action_space.sample()

            next_state, reward, terminate, truncated, info = env.step(act)

            if state == next_state or truncated or (terminate and reward == 0):
                reward = -1.

            mem.append((next_state, reward, terminate, truncated, act, state))

            # experience replay
            mem_replay()

            state = next_state

            if terminate or truncated:
                break


if __name__ == "__main__":
    dqn()

In fact, compared with the other approaches above, this one is slower to produce results, but its final stability seems to be the best.

Effect of the discount factor

Other issues

During the experiments I found that as soon as the map reaches 5*5, training the DQN becomes extremely difficult.

After many runs I found that whether the model converges depends heavily on the map layout. For example, with the two maps below, repeated runs always ended up as shown in the figure, which left me puzzled.

Even some 4*4, 5*5, and 6*6 maps are very hard to converge on, for example:

["SFFFHF","FHFFFF","HFHFFF","FFFFHF","FFFFFF","FFFFHG",],
["SHFHF","FFFFF","HFHFF","FFFHF","FHFFG",],
["SHFF","FFFF","HFHF","FHFG",]

But "hard to converge" does not mean it never converges. In some cases the success rate stays at 0 early on, and only after several hours, or even ten-plus hours, of training does it suddenly start to converge.

I suspect this is related to the learning rate (the code above mostly uses the Adam optimizer). I ran some experiments on this but apparently never recorded them. As I recall, the learning rate cannot be too large, otherwise the loss turns into NaN. I also tried other activation functions, but they did not seem to help much.
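
For reference, here is a minimal sketch of the two knobs mentioned above (the concrete learning rate and the tanh swap are only illustrative assumptions, not the exact settings used in the experiments):

import torch

# a variant of QtableModel with tanh activations instead of ReLU (hypothetical,
# just to illustrate the kind of swap described above)
class TanhQtableModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fn1 = torch.nn.Linear(in_features=1, out_features=64)
        self.fn2 = torch.nn.Linear(in_features=64, out_features=64)
        self.fn3 = torch.nn.Linear(in_features=64, out_features=4)

    def forward(self, state):
        q_value = torch.tanh(self.fn1(state))
        q_value = torch.tanh(self.fn2(q_value))
        return self.fn3(q_value)

model = TanhQtableModel()
# an explicit, smaller learning rate than Adam's default 1e-3; too large a
# learning rate tended to produce NaN losses
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4)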

Postscript

In fact, using DQN for this kind of discrete task is somewhat unfair to it. As some of the results above show, even when the Q values predicted by the DQN model are already very close to the real Q-table, the game's success rate can still be low. For this game, even if the agent chooses most actions correctly, one wrong choice at a key step (the last step only required moving right to succeed, but it moves left and falls into a hole) throws all the earlier work away.

Also, I did not expect this to be so hard to train; for something this simple, the outcome depends to a large extent on luck.

Since the DQN model converges by relying on the Bellman-equation update rule, I tried first training a Q-table with Q-learning and then fitting a model directly to that Q-table. This is clearly faster than waiting for the Bellman updates to converge, but it is likewise only suitable for small discrete tasks.
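
A minimal sketch of that idea, assuming QtableModel as defined above and a qtable trained as in the Q-learning section (the exact wiring is my assumption):

import torch

def fit_model_to_qtable(model, qtable, epochs=2000):
    # supervised regression: treat the Q-table rows as regression targets and fit
    # the network to them directly, bypassing the Bellman bootstrapping entirely
    states = torch.arange(16, dtype=torch.float).unsqueeze(1)  # shape (16, 1): one input per cell
    targets = torch.tensor(qtable, dtype=torch.float)          # shape (16, 4): Q values from Q-learning
    loss_fn = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(params=model.parameters())
    for _ in range(epochs):
        optimizer.zero_grad()
        loss = loss_fn(model(states), targets)
        loss.backward()
        optimizer.step()
    return model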

PyTorch DQN tutorial: https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
