问题简介

image

如上图所示,左上角的红色椭圆既是起点也是机器人的初始位置,右下角的绿色方块是出口。
游戏规则为:从起点开始,通过错综复杂的迷宫,到达目标点(出口)。

  • 在任一位置可执行动作包括:向上走 'u'、向右走 'r'、向下走 'd'、向左走 'l'
  • 执行不同的动作后,根据不同的情况会获得不同的奖励,具体而言,有以下几种情况。
    • 撞墙
    • 走到出口
    • 其余情况

我们分别实现了基于基础搜索算法Deep QLearning 算法的机器人,使机器人自动走到迷宫的出口。

设计思想

Maze 类介绍

创建迷宫

通过迷宫类 Maze 可以随机创建一个迷宫。

  1. 使用 Maze(maze_size=size) 来随机生成一个 size * size 大小的迷宫。
  2. 使用 print() 函数可以输出迷宫的 size 以及画出迷宫图
  3. 红色的圆是机器人初始位置
  4. 绿色的方块是迷宫的出口位置
""" 创建迷宫并展示 """
maze = Maze(maze_size=10) # 随机生成迷宫
print(maze)

重要的成员方法

在迷宫中已经初始化一个机器人,你要编写的算法实现在给定条件下控制机器人移动至目标点。

Maze 类中重要的成员方法如下:

  1. sense_robot() :获取机器人在迷宫中目前的位置。

return:机器人在迷宫中目前的位置。

  1. move_robot(direction) :根据输入方向移动默认机器人,若方向不合法则返回错误信息。

direction:移动方向, 如:”u”, 合法值为: [‘u’, ‘r’, ‘d’, ‘l’]

return:执行动作的奖励值

  1. can_move_actions(position):获取当前机器人可以移动的方向

position:迷宫中任一处的坐标点

return:该点可执行的动作,如:[‘u’,’r’,’d’]

  1. is_hit_wall(self, location, direction):判断该移动方向是否撞墙

location, direction:当前位置和要移动的方向,如(0,0) , “u”

return:True(撞墙) / False(不撞墙)

  1. draw_maze():画出当前的迷宫

随机移动机器人,并记录下获得的奖励,展示出机器人最后的位置。

import random

rewards = [] # 记录每走一步的奖励值
actions = [] # 记录每走一步的移动方向

# 循环、随机移动机器人10次,记录下奖励
for i in range(10):
    valid_actions = maze.can_move_actions(maze.sense_robot())
    action = random.choice(valid_actions)
    rewards.append(maze.move_robot(action))
    actions.append(action)

print("the history of rewards:", rewards)
print("the actions", actions)

# 输出机器人最后的位置
print("the end position of robot:", maze.sense_robot())

# 打印迷宫,观察机器人位置
print(maze)

基础搜索算法介绍(广度优先搜索算法)

对于迷宫游戏,常见的三种的搜索算法有广度优先搜索、深度优先搜索和最佳优先搜索(A*)。

在下面的代码示例中,将实现广度优先搜索算法;主要通过建立一颗搜索树并进行层次遍历实现。

  • 每个节点表示为以 Class SearchTree 实例化的对象,类属性有:当前节点位置、到达当前节点的动作、当前节点的父节点、当前节点的子节点
  • valid_actions(): 用以获取机器人可以行走的位置(即不能穿墙);
  • expand(): 对于未拓展的子节点进行拓展;
  • backpropagation(): 回溯搜索路径。

算法具体步骤

首先以机器人起始位置建立根节点,并入队;接下来不断重复以下步骤直到判定条件:

  1. 将队首节点的位置标记已访问;判断队首是否为目标位置(出口), 则终止循环并记录回溯路径
  2. 判断队首节点是否为叶子节点, 则拓展该叶子节点
  3. 如果队首节点有子节点,则将每个子节点插到队尾
  4. 将队首节点出队

实现广度优先搜索算法

import numpy as np

# 机器人移动方向
move_map = {
    'u': (-1, 0), # up
    'r': (0, +1), # right
    'd': (+1, 0), # down
    'l': (0, -1), # left
}


# 迷宫路径搜索树
class SearchTree(object):


    def __init__(self, loc=(), action='', parent=None):
        """
        初始化搜索树节点对象
        :param loc: 新节点的机器人所处位置
        :param action: 新节点的对应的移动方向
        :param parent: 新节点的父辈节点
        """

        self.loc = loc  # 当前节点位置
        self.to_this_action = action  # 到达当前节点的动作
        self.parent = parent  # 当前节点的父节点
        self.children = []  # 当前节点的子节点

    def add_child(self, child):
        """
        添加子节点
        :param child:待添加的子节点
        """
        self.children.append(child)

    def is_leaf(self):
        """
        判断当前节点是否是叶子节点
        """
        return len(self.children) == 0


def expand(maze, is_visit_m, node):
    """
    拓展叶子节点,即为当前的叶子节点添加执行合法动作后到达的子节点
    :param maze: 迷宫对象
    :param is_visit_m: 记录迷宫每个位置是否访问的矩阵
    :param node: 待拓展的叶子节点
    """
    can_move = maze.can_move_actions(node.loc)
    for a in can_move:
        new_loc = tuple(node.loc[i] + move_map[a][i] for i in range(2))
        if not is_visit_m[new_loc]:
            child = SearchTree(loc=new_loc, action=a, parent=node)
            node.add_child(child)


def back_propagation(node):
    """
    回溯并记录节点路径
    :param node: 待回溯节点
    :return: 回溯路径
    """
    path = []
    while node.parent is not None:
        path.insert(0, node.to_this_action)
        node = node.parent
    return path


def breadth_first_search(maze):
    """
    对迷宫进行广度优先搜索
    :param maze: 待搜索的maze对象
    """
    start = maze.sense_robot()
    root = SearchTree(loc=start)
    queue = [root]  # 节点队列,用于层次遍历
    h, w, _ = maze.maze_data.shape
    is_visit_m = np.zeros((h, w), dtype=np.int)  # 标记迷宫的各个位置是否被访问过
    path = []  # 记录路径
    while True:
        current_node = queue[0]
        is_visit_m[current_node.loc] = 1  # 标记当前节点位置已访问

        if current_node.loc == maze.destination:  # 到达目标点
            path = back_propagation(current_node)
            break

        if current_node.is_leaf():
            expand(maze, is_visit_m, current_node)

        # 入队
        for child in current_node.children:
            queue.append(child)

        # 出队
        queue.pop(0)

    return path

测试广度优先搜索算法

maze = Maze(maze_size=10)
height, width, _ = maze.maze_data.shape

path_1 = breadth_first_search(maze)
print("搜索出的路径:", path_1)

for action in path_1:
    maze.move_robot(action)

if maze.sense_robot() == maze.destination:
    print("恭喜你,到达了目标点")

print(maze)

深度优先搜索算法

  • 输入:迷宫

  • 输出:到达目标点的路径

def my_search(maze):
    """
    对迷宫实现深度优先搜索算法
    :param maze: 迷宫对象
    :return :到达目标点的路径 如:["u","u","r",...]
    """

    start = maze.sense_robot()
    root = SearchTree(loc=start)
    h, w, _ = maze.maze_data.shape
    is_visit_m = np.zeros((h, w), dtype=np.int)  # 标记迷宫的各个位置是否被访问过
    path = []  # 记录路径
    path = dfs(maze, is_visit_m, root)

    return path


def dfs(maze, is_visit_m, current_node):
    if current_node.loc == maze.destination:  # 到达目标点
        path = back_propagation(current_node)
        return path

    elif current_node.is_leaf():
        expand(maze, is_visit_m, current_node)
        for node in current_node.children:
            is_visit_m[node.loc] = 1
            path = dfs(maze, is_visit_m, node)
            if not path == None:
                return path

    else:
        return None
测试深度优先算法
maze = Maze(maze_size=10) # 从文件生成迷宫

path_2 = my_search(maze)
print("搜索出的路径:", path_2)

for action in path_2:
    maze.move_robot(action)


if maze.sense_robot() == maze.destination:
    print("恭喜你,到达了目标点")

强化学习算法介绍

强化学习作为机器学习算法的一种,其模式也是让智能体在“训练”中学到“经验”,以实现给定的任务。但不同于监督学习与非监督学习,在强化学习的框架中,我们更侧重通过智能体与环境的交互来学习。通常在监督学习和非监督学习任务中,智能体往往需要通过给定的训练集,辅之以既定的训练目标(如最小化损失函数),通过给定的学习算法来实现这一目标。然而在强化学习中,智能体则是通过其与环境交互得到的奖励进行学习。这个环境可以是虚拟的(如虚拟的迷宫),也可以是真实的(自动驾驶汽车在真实道路上收集数据)。

在强化学习中有五个核心组成部分,它们分别是:环境(Environment)智能体(Agent)状态(State)动作(Action)奖励(Reward)

在某一时间节点 $t$:

  • 智能体在从环境中感知其所处的状态 $s_t$
  • 智能体根据某些准则选择动作 $a_t$
  • 环境根据智能体选择的动作,向智能体反馈奖励 $r_{t+1}$

通过合理的学习算法,智能体将在这样的问题设置下,成功学到一个在状态 $s_t$ 选择动作 $a_t$ 的策略 $\pi (s_t) = a_t$。

QLearning 算法

Q-Learning 是一个值迭代(Value Iteration)算法。
与策略迭代(Policy Iteration)算法不同,值迭代算法会计算每个”状态“或是”状态-动作“的值(Value)或是效用(Utility),然后在执行动作的时候,会设法最大化这个值。
因此,对每个状态值的准确估计,是值迭代算法的核心。
通常会考虑最大化动作的长期奖励,即不仅考虑当前动作带来的奖励,还会考虑动作长远的奖励。

Q 值的计算与迭代

Q-learning 算法将状态(state)和动作(action)构建成一张 Q_table 表来存储 Q 值,Q 表的行代表状态(state),列代表动作(action):

在 Q-Learning 算法中,将这个长期奖励记为 Q 值,其中会考虑每个 ”状态-动作“ 的 Q 值,具体而言,它的计算公式为:

也就是对于当前的“状态-动作” $(s{t},a)$,考虑执行动作 $a$ 后环境奖励 $R{t+1}$,以及执行动作 $a$ 到达 $s{t+1}$后,执行任意动作能够获得的最大的Q值 $\max_a Q(a,s{t+1})$,$\gamma$ 为折扣因子。

计算得到新的 Q 值之后,一般会使用更为保守地更新 Q 表的方法,即引入松弛变量 $alpha$ ,按如下的公式进行更新,使得 Q 表的迭代变化更为平缓。

机器人动作的选择

在强化学习中,探索-利用 问题是非常重要的问题。
具体来说,根据上面的定义,会尽可能地让机器人在每次选择最优的决策,来最大化长期奖励。
但是这样做有如下的弊端:

  1. 在初步的学习中,Q 值是不准确的,如果在这个时候都按照 Q 值来选择,那么会造成错误。
  2. 学习一段时间后,机器人的路线会相对固定,则机器人无法对环境进行有效的探索。

因此需要一种办法,来解决如上的问题,增加机器人的探索。
通常会使用 epsilon-greedy 算法:

  1. 在机器人选择动作的时候,以一部分的概率随机选择动作,以一部分的概率按照最优的 Q 值选择动作。
  2. 同时,这个选择随机动作的概率应当随着训练的过程逐步减小。


Q-Learning 算法的学习过程

QRobot 类

QRobot 类,其中实现了 Q 表迭代和机器人动作的选择策略,可通过 from QRobot import QRobot 导入使用。

QRobot 类的核心成员方法

  1. sense_state():获取当前机器人所处位置

return:机器人所处的位置坐标,如: (0, 0)

  1. current_state_valid_actions():获取当前机器人可以合法移动的动作

return:由当前合法动作组成的列表,如: [‘u’,’r’]

  1. train_update():以训练状态,根据 QLearning 算法策略执行动作

return:当前选择的动作,以及执行当前动作获得的回报, 如: ‘u’, -1

  1. test_update():以测试状态,根据 QLearning 算法策略执行动作

return:当前选择的动作,以及执行当前动作获得的回报, 如:’u’, -1

  1. reset()

return:重置机器人在迷宫中的位置

from QRobot import QRobot
from Maze import Maze

maze = Maze(maze_size=5) # 随机生成迷宫

robot = QRobot(maze) # 记得将 maze 变量修改为你创建迷宫的变量名

action, reward = robot.train_update() # QLearning 算法一次Q值迭代和动作选择

print("the choosed action: ", action)
print("the returned reward: ", action)

Runner 类

QRobot 类实现了 QLearning 算法的 Q 值迭代和动作选择策略。在机器人自动走迷宫的训练过程中,需要不断的使用 QLearning 算法来迭代更新 Q 值表,以达到一个“最优”的状态,因此封装好了一个类 Runner 用于机器人的训练和可视化。可通过 from Runner import Runner 导入使用。

Runner 类的核心成员方法:

  1. run_training(training_epoch, training_per_epoch=150): 训练机器人,不断更新 Q 表,并讲训练结果保存在成员变量 train_robot_record 中

training_epoch, training_per_epoch: 总共的训练次数、每次训练机器人最多移动的步数

  1. run_testing():测试机器人能否走出迷宫

  2. generate_gif(filename):将训练结果输出到指定的 gif 图片中

filename:合法的文件路径,文件名需以 .gif 为后缀

  1. plot_results():以图表展示训练过程中的指标:Success Times、Accumulated Rewards、Runing Times per Epoch

设定训练参数、训练、查看结果

from QRobot import QRobot
from Maze import Maze
from Runner import Runner

"""  Qlearning 算法相关参数: """

epoch = 10  # 训练轮数
epsilon0 = 0.5  # 初始探索概率
alpha = 0.5  # 公式中的 ⍺
gamma = 0.9  # 公式中的 γ
maze_size = 5  # 迷宫size

""" 使用 QLearning 算法训练过程 """

g = Maze(maze_size=maze_size)
r = QRobot(g, alpha=alpha, epsilon0=epsilon0, gamma=gamma)

runner = Runner(r)
runner.run_training(epoch, training_per_epoch=int(maze_size * maze_size * 1.5))

# 生成训练过程的gif图, 建议下载到本地查看;也可以注释该行代码,加快运行速度。
runner.generate_gif(filename="results/size5.gif")

runner.plot_results() # 输出训练结果,可根据该结果对您的机器人进行分析。

实现 Deep QLearning 算法

DQN 算法介绍

强化学习是一个反复迭代的过程,每一次迭代要解决两个问题:给定一个策略求值函数,和根据值函数来更新策略。而 DQN 算法使用神经网络来近似值函数。(DQN 论文地址)

  • DQN 算法流程

  • DQN 算法框架图

完成 DQN 算法

ReplayDataSet 类的核心成员方法

  • add(self, state, action_index, reward, next_state, is_terminal) 添加一条训练数据

state: 当前机器人位置

action_index: 选择执行动作的索引

reward: 执行动作获得的回报

next_state:执行动作后机器人的位置

is_terminal:机器人是否到达了终止节点(到达终点或者撞墙)

  • random_sample(self, batch_size):从数据集中随机抽取固定batch_size的数据

batch_size: 整数,不允许超过数据集中数据的个数

  • build_full_view(self, maze):开启金手指,获取全图视野

maze: 以 Maze 类实例化的对象

"""ReplayDataSet 类的使用"""

from ReplayDataSet import ReplayDataSet

test_memory = ReplayDataSet(max_size=1e3) # 初始化并设定最大容量
actions = ['u', 'r', 'd', 'l']  
test_memory.add((0,1), actions.index("r"), -10, (0,1), 1)  # 添加一条数据(state, action_index, reward, next_state)
print(test_memory.random_sample(1)) # 从中随机抽取一条(因为只有一条数据)

实现DQNRobot

  • 输入: 由 Maze 类实例化的对象 maze
from QRobot import QRobot

class Robot(QRobot):
valid_action = ['u', 'r', 'd', 'l']

''' QLearning parameters'''
epsilon0 = 0.5  # 初始贪心算法探索概率
gamma = 0.94  # 公式中的 γ

EveryUpdate = 1  # the interval of target model's updating

"""some parameters of neural network"""
target_model = None
eval_model = None
batch_size = 32
learning_rate = 1e-2
TAU = 1e-3
step = 1  # 记录训练的步数

"""setting the device to train network"""
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

def __init__(self, maze):
    """
    初始化 Robot 类
    :param maze:迷宫对象
    """
    super(Robot, self).__init__(maze)
    maze.set_reward(reward={
        "hit_wall": 10.,
        "destination": -50.,
        "default": 1.,
    })
    self.maze = maze
    self.maze_size = maze.maze_size

    """build network"""
    self.target_model = None
    self.eval_model = None
    self._build_network()

    """create the memory to store data"""
    max_size = max(self.maze_size ** 2 * 3, 1e4)
    self.memory = ReplayDataSet(max_size=max_size)

def _build_network(self):
    seed = 0
    random.seed(seed)

    """build target model"""
    self.target_model = QNetwork(state_size=2, action_size=4, seed=seed).to(self.device)

    """build eval model"""
    self.eval_model = QNetwork(state_size=2, action_size=4, seed=seed).to(self.device)

    """build the optimizer"""
    self.optimizer = optim.Adam(self.eval_model.parameters(), lr=self.learning_rate)

def target_replace_op(self):
    """
        Soft update the target model parameters.
        θ_target = τ*θ_local + (1 - τ)*θ_target
    """

    # for target_param, eval_param in zip(self.target_model.parameters(), self.eval_model.parameters()):
    #     target_param.data.copy_(self.TAU * eval_param.data + (1.0 - self.TAU) * target_param.data)

    """ replace the whole parameters"""
    self.target_model.load_state_dict(self.eval_model.state_dict())

def _choose_action(self, state):
    state = np.array(state)
    state = torch.from_numpy(state).float().to(self.device)
    if random.random() < self.epsilon:
        action = random.choice(self.valid_action)
    else:
        self.eval_model.eval()
        with torch.no_grad():
            q_next = self.eval_model(state).cpu().data.numpy()  # use target model choose action
        self.eval_model.train()

        action = self.valid_action[np.argmin(q_next).item()]
    return action

def _learn(self, batch: int = 16):
    if len(self.memory) < batch:
        print("the memory data is not enough")
        return
    state, action_index, reward, next_state, is_terminal = self.memory.random_sample(batch)

    """ convert the data to tensor type"""
    state = torch.from_numpy(state).float().to(self.device)
    action_index = torch.from_numpy(action_index).long().to(self.device)
    reward = torch.from_numpy(reward).float().to(self.device)
    next_state = torch.from_numpy(next_state).float().to(self.device)
    is_terminal = torch.from_numpy(is_terminal).int().to(self.device)

    self.eval_model.train()
    self.target_model.eval()

    """Get max predicted Q values (for next states) from target model"""
    Q_targets_next = self.target_model(next_state).detach().min(1)[0].unsqueeze(1)

    """Compute Q targets for current states"""
    Q_targets = reward + self.gamma * Q_targets_next * (torch.ones_like(is_terminal) - is_terminal)

    """Get expected Q values from local model"""
    self.optimizer.zero_grad()
    Q_expected = self.eval_model(state).gather(dim=1, index=action_index)

    """Compute loss"""
    loss = F.mse_loss(Q_expected, Q_targets)
    loss_item = loss.item()

    """ Minimize the loss"""
    loss.backward()
    self.optimizer.step()

    """copy the weights of eval_model to the target_model"""
    self.target_replace_op()
    return loss_item

def train_update(self):
    """
    以训练状态选择动作并更新Deep Q network的相关参数
    :return :action, reward 如:"u", -1
    """
    state = self.sense_state()
    action = self._choose_action(state)
    reward = self.maze.move_robot(action)
    next_state = self.sense_state()
    is_terminal = 1 if next_state == self.maze.destination or next_state == state else 0

    self.memory.add(state, self.valid_action.index(action), reward, next_state, is_terminal)

    """--间隔一段时间更新target network权重--"""
    if self.step % self.EveryUpdate == 0:
        self._learn(batch=32)

    """---update the step and epsilon---"""
    self.step += 1
    self.epsilon = max(0.01, self.epsilon * 0.995)

    return action, reward

def test_update(self):
    """
    以测试状态选择动作并更新Deep Q network的相关参数
    :return : action, reward 如:"u", -1
    """
    state = np.array(self.sense_state(), dtype=np.int16)
    state = torch.from_numpy(state).float().to(self.device)

    self.eval_model.eval()
    with torch.no_grad():
        q_value = self.eval_model(state).cpu().data.numpy()

    action = self.valid_action[np.argmin(q_value).item()]
    reward = self.maze.move_robot(action)
    return action, reward

测试 DQN 算法

from QRobot import QRobot
from Maze import Maze
from Runner import Runner

"""  Deep Qlearning 算法相关参数: """

epoch = 10  # 训练轮数
maze_size = 5  # 迷宫size
training_per_epoch=int(maze_size * maze_size * 1.5)

""" 使用 DQN 算法训练 """

g = Maze(maze_size=maze_size)
r = Robot(g)
runner = Runner(r)
runner.run_training(epoch, training_per_epoch)

# 生成训练过程的gif图, 建议下载到本地查看;也可以注释该行代码,加快运行速度。
runner.generate_gif(filename="results/dqn_size10.gif")

实验结果

基础搜索算法

深度强化学习算法