Using Keras and Deep Q-Network to Play FlappyBird
—— github源码
该项目通过卷积神经网络加Q-learning算法,利用Keras框架共210行代码实现了让程序自己学习如何玩耍FlappyBird。
这篇文章面向对卷积神经网络、增强学习感兴趣的新手。
文末有代码的注释。可以先大致浏览下代码注释再看文章。
如何运行
安装依赖
pip install keras pip install pygame pip install scikit-image pip install h5py
作者使用的是theano
训练的,训练好的模型文件要使用theano
作为Keras
的后端才能调用,在配置文件~/.keras/keras.json
中(没有可创建)确认/修改backend
为theano
(如果没有安装tensorflow
[Keras
的另一可选后端]好像就不用管了),配置文件样式下文中卷积神经网络小节的补充里有。
要使用theano
,我们还需要安装OpenBLAS。直接下载源码并解压,cd
进目录,然后
sudo apt-get install gfortran make FC=gfortran sudo make PREFIX=/usr/local install
下载源码并运行
git clone https://github.com/yanpanlau/Keras-FlappyBird.gitcd Keras-FlappyBird python qlearn.py -m "Run"
我下载时目录中game/wrapped_flappy_bird.py文件的第144行,FPSCLOCK.tick(FPS)
语句处缩进有点问题,删去现有缩进,打8个空格就好了,没问题就不用管了。
vmware虚拟机Ubuntu16.04+python3+只使用CPU+theano运行:
Paste_Image.png
如果想重新训练神经网络,删除
model.h5
文件然后运行命令qlearn.py -m "Train"
。
源码分析
游戏输入及返回图像
import wrapped_flappy_bird as game x_t1_colored, r_t, terminal = game_state.frame_step(a_t)
直接使用flappybird python版本的接口。
输入为a_t((1, 0)
代表不跳,(0,1)
代表跳)。
返回值为下一帧图像x_t1_colored和奖励reward(+0.1
表示存活,+1
表示通过管道,-1
表示死亡),奖励被控制在[-1,+1]
来提高稳定性。terminal 是一个布尔值表示游戏是否结束。
奖励函数在game/wrapped_flappy_bird.py
中的def frame_step(self, input_actions)
方法中修改。
为什么直接将游戏图像输入处理呢?我一开始没转过弯,其实图像中包含了全部的信息(声音信息在多数游戏里只是辅助,不影响游戏),而人在玩游戏时也是接受输入的图像信息,然后决策输出相应的操作指令。这里其实就是在模拟人的反馈过程,将这一过程描述为一个非线性函数,而该非线性函数我们将使用卷积神经网络来表达,大体上卷积实现了对图像特征的提取,神经网络实现了从特征到操作指令的转换。
图像预处理
要素:
将图片转换为灰阶
裁剪图片尺寸到80x80像素
每次堆积4帧,一起馈入神经网络。相当于一次输入一张'四通道'的图像。
(为什么要将4帧堆在一起?这是一种方法,为了让模型能推断出小鸟的速度信息。)
x_t1 = skimage.color.rgb2gray(x_t1_colored) x_t1 = skimage.transform.resize(x_t1,(80,80)) x_t1 = skimage.exposure.rescale_intensity(x_t1, out_range=(0, 255)) # 调整亮度#x_t1 = x_t1.reshape(1, 1, x_t1.shape[0], x_t1.shape[1]) s_t1 = np.append(x_t1, s_t[:, :3, :, :], axis=1)# axis=1 意味着在第二维上添加
x_t1是一个(1x1x80x80) 的单帧,s_t1是4帧的叠加,形状为(1x4x80x80)。输入设计为 (1x4x80x80)而不是(4x80x80)是为了Keras考虑。
补充
rescale_intensity
卷积神经网络
现在,将预处理后的图像输入神经网络。
def buildmodel(): print("开始建模") model = Sequential() model.add(Convolution2D(32, 8, 8, subsample=(4,4),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same', dim_ordering='th', input_shape=(img_channels,img_rows,img_cols))) model.add(Activation('relu')) model.add(Convolution2D(64, 4, 4, subsample=(2,2),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same', dim_ordering='th')) model.add(Activation('relu')) model.add(Convolution2D(64, 3, 3, subsample=(1,1),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same', dim_ordering='th')) model.add(Activation('relu')) model.add(Flatten()) model.add(Dense(512, init=lambda shape, name: normal(shape, scale=0.01, name=name))) model.add(Activation('relu')) model.add(Dense(2,init=lambda shape, name: normal(shape, scale=0.01, name=name))) adam = Adam(lr=1e-6) model.compile(loss='mse',optimizer=adam) print("建模完成") return model
Convolution2D(
nb_filter, # 过滤器个数
nb_row, # 过滤器的行数
nb_col, # 过滤器的列数
init='glorot_uniform', # 层权重weights的初始化函数
activation='linear', # 默认激活函数为线性,即a(x) = x
weights=None,
border_mode='valid',
# 默认'valid'(不补零,一般情况)
# 或者'same'(自动补零,使得输出尺寸在过滤窗口步幅为1的情况下与输入尺寸相同,
# 即输出尺寸=输入尺寸/步幅)
subsample=(1, 1), # 代表向左和向下的过滤窗口移动步幅
dim_ordering='default', # 'default' 或'tf' 或'th'
W_regularizer=None,
b_regularizer=None,
activity_regularizer=None,
W_constraint=None,
b_constraint=None,
bias=True
# 未注释的一般用默认值
)
该函数是二维输入的滤波窗口的卷积函数。 当使用它作为模型的第一层时,需要提供`input_shape`关键字,如输入为128x128 RGB 3通道图像,则`input_shape=(3, 128, 128)`。`dim_ordering`的默认值在`~/.keras/keras.json`文件中,若没有可以创建(一般运行过一次keras就有),格式为
{
"image_dim_ordering": "tf",
"epsilon": 1e-07,
"floatx": "float32",
"backend": "tensorflow"
}
根据`image_dim_ordering`和`backend`选择使用`theano`即`th`或者`tensorflow`即`tf`。 - 确切结构如下: 输入为4x80x80的图像矩阵。 第一层卷积层,有32个卷积核(过滤器),每个卷积核的尺寸是8x8,x轴和y轴的步幅都是4,补零,并使用了一个ReLU激活函数。 第二层卷积层,有64个卷积核(过滤器),每个卷积核的尺寸是4x4,x轴和y轴的步幅都是2,补零,并使用了一个ReLU激活函数。 第三层卷积层,有64个卷积核(过滤器),每个卷积核的尺寸是3x3,x轴和y轴的步幅都是1,补零,并使用了一个ReLU激活函数。 然后将它们展平为一维输入隐藏层。该隐藏层有512个神经单元,全连接到第三层卷积层的输出,并使用ReLU激活函数。 最后的输出层是一个全连接线性层,输出动作对应的Q值列表。一般来说,索引0代表什么也不做;在这个游戏里索引1代表跳一下。比较两者的Q值大小,选择大的作为下一步操作。 > 相关知识点请看: [卷积神经网络CNN基本概念笔记](http://www.jianshu.com/p/606a33ba04ff)[CS231n Convolutional Neural Networks](http://cs231n.github.io/convolutional-networks/)每层输出计算公式:(W-F+2P)/S+1。 W:输入尺寸大小; S:步幅; F:卷积核尺寸; P:补零数。 在这个应用的设置中,输出计算可以简化为W/S。 ![结构图](http://upload-images.jianshu.io/upload_images/2422746-ca7a2f32e0f2e692.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)> 补充: 参看[画出卷积神经网络结构图](http://www.jianshu.com/p/56a05b5e4f20)- 小贴士 keras使得构建卷积神经网络异常简单。但是有一些需要注意的地方。 A. 选择一个良好的初始化方法是重要的,这里选择了σ=0.01的正态分布。(`lambad x,y : f(x,y)`是一个匿名函数)`init=lambda shape, name: normal(shape, scale=0.01, name=name)`B. 维度的顺序很重要。使用Theano 的话就是4x80x80,Tensorflow的话输入就是80x80x4。通过Convolution2D函数的**dim_ordering**参数设置,这里使用的是theano。 C. 此次使用了一个叫亚当的自适应优化算法 。学习速率为**1-e6**。 D. 关于梯度下降优化算法[An overview of gradient descent optimization algorithms](http://sebastianruder.com/optimizing-gradient-descent/)。E. `Convolution2D`函数中的参数`border_mode`的模式也需要注意,这里选择了补零操作,使得图像边缘的像素点也受到过滤操作,转化了所有的图像信息。 - DQN 下面就是运用Q-learning算法来训练神经网络了。 在Q-learning中最重要的就是Q函数了: **Q(s, a)**代表了当我们在状态s执行a动作的最大贴现奖励。 **Q(s, a)**给你一个关于在s状态选择a动作是有多好的考量。 Q函数就像玩游戏的秘籍,在你需要决定在状态s下该选择动作a还是b时,只需要挑高Q值的动作就行了。 **最大贴现奖励**既反映了在s状态下做出动作a得到状态s'的即时反馈奖励(存活+0.1,通过管道+1),也反映了在状态s'下继续游戏可能得到的最佳奖励(不论什么输入)——其实就是s'下所有动作的**最大贴现奖励中**的最大的一个(即`max[ Q(s', a) | 所有可能的动作a]`),但第二个奖励要乘以一个折扣系数,因为它是未来的奖励,要贴现,得打点折扣。 实际上Q函数是一个理论假设存在的函数,从上面的表述中我们可以看出Q函数可以表达为一种递归的形式,我们可以通过迭代来获取它,这与神经网络的训练过程不谋而合。 而我们就是要用卷积神经网络来实现Q函数,实际上是(Q,a) = f(s)函数,由一个状态返回在该状态下的所有可能输入与相应Q值构成的二值对列表,只是输入动作以不同的索引表示。这样,我们省去了对巨多的状态s的分析判断和复杂程序编写,只要以某种方式初始化Q值表,然后根据Q值表训练调整神经网络的权重,再根据训练后的神经网络预测来更新Q值表,如此反复迭代来逼近Q函数。
# 抽取小批量样本进行训练 minibatch = random.sample(D, BATCH) # inputs和targets一起构成了Q值表 inputs = np.zeros((BATCH, s_t.shape[1], s_t.shape[2], s_t.shape[3])) #32, 80, 80, 4 targets = np.zeros((inputs.shape[0], ACTIONS)) #32, 2 # 开始经验回放 for i in range(0, len(minibatch)): # 以下序号对应D的存储顺序将信息全部取出, # D.append((s_t, action_index, r_t, s_t1, terminal)) state_t = minibatch[i][0] # 当前状态 action_t = minibatch[i][1] # 输入动作 reward_t = minibatch[i][2] # 返回奖励 state_t1 = minibatch[i][3] # 返回的下一状态 terminal = minibatch[i][4] # 返回的是否终止的标志 inputs[i:i + 1] = state_t # 保存当前状态,即Q(s,a)中的s # 得到预测的以输入动作x为索引的Q值列表 targets[i] = model.predict(state_t) # 得到下一状态下预测的以输入动作x为索引的Q值列表 Q_sa = model.predict(state_t1) if terminal: # 如果动作执行后游戏终止了,该状态下(s)该动作(a)的Q值就相当于奖励 targets[i, action_t] = reward_t else: # 否则,该状态(s)下该动作(a)的Q值就相当于动作执行后的即时奖励和下一状态下的最佳预期奖励乘以一个折扣率 targets[i, action_t] = reward_t + GAMMA * np.max(Q_sa) # 用生成的Q值表训练神经网络,同时返回当前的误差 loss += model.train_on_batch(inputs, targets)
- 经历重播 在上面的代码中,有一个经验重播(Experience Replay)。实际上,使用像神经网络这样的非线性函数来对Q值近似是非常不稳定的。解决这个问题最重要的诀窍就是经验重播了。在游戏中的所有阶段性情景都被放在回放存储器**D**中。(这里使用了python的deque结构来存储)。当训练神经网络时,从**D**中随机小批量抽取情景,而不是使用最近的,这将大大提高系统的稳定性。 - 勘探与开发 这是增强学习的另一个问题:是继续勘探新的资源,还是专注开发现有的资源。也就是说在 利用开发现有的已知优良的决策 和 继续探索新的可能更好的行为 之间如何选择和分配时间。在现实生活中我们也经常遇到这类问题:选择一家尝试过的合口的饭店还是试试新的——有可能更好,也可能根本吃不下。在增强学习中,为了最大化未来收益,它必须在两者之间取得平衡。一个流行的解决方案叫做ϵ(epsilon)贪心方法。ϵ是一个0到1之间的变量,根据它可以得到哪些时间该去探索一个随机的新动作。
if random.random() <= epsilon: print("----------Random Action----------") action_index = random.randrange(ACTIONS) a_t[action_index] = 1 else: q = model.predict(s_t) #输入四幅图像的组合,预测结果 max_Q = np.argmax(q) action_index = max_Q a_t[max_Q] = 1
### 可以改进的工作1. 现在的DQN基于大量的经验回放,是否有可能替代它或者删去它。2. 如何决定最佳的卷积神经网络。3. 训练很慢,如何加速它或者使它收敛更快。4. 卷积神经网络到底学到了什么,它的学习成果是否是可迁移的。### 其它资源- [Demystifying Deep Reinforcement Learning](https://www.nervanasys.com/demystifying-deep-reinforcement-learning/)- [深度强化学习揭秘(上面英文文章的国人翻译)](http://chuansong.me/n/335655551424)- [Keras plays catch, a single file Reinforcement Learning example](http://edersantana.github.io/articles/keras_rl/)- [Human-level control through deep reinforcement learning](http://www.nature.com/nature/journal/v518/n7540/full/nature14236.html)- [上文(Human-level)的ppt讲稿](http://ir.hit.edu.cn/~jguo/docs/notes/dqn-atari.pdf)- [Configuring Theano For High Performance Deep Learning](http://www.johnwittenauer.net/configuring-theano-for-high-performance-deep-learning/)### 代码注释
!/usr/bin/env python
from future import print_function
import argparse
import skimage as skimage
from skimage import transform, color, exposure
from skimage.transform import rotate
from skimage.viewer import ImageViewer
import sys
sys.path.append("game/")
import wrapped_flappy_bird as game
import random
import numpy as np
from collections import deque
import json
from keras import initializations
from keras.initializations import normal, identity
from keras.models import model_from_json
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation, Flatten
from keras.layers.convolutional import Convolution2D, MaxPooling2D
from keras.optimizers import SGD , Adam
GAME = 'bird' # 游戏名
CONFIG = 'nothreshold'
ACTIONS = 2 # 有效动作数:不动+跳=2个
GAMMA = 0.99 # 折扣系数,未来的奖励转化为现在的要乘的一个系数
OBSERVATION = 3200. # 训练之前观察多少步
EXPLORE = 3000000. # epsilon衰减的总步数
FINAL_EPSILON = 0.0001 # epsilon的最小值
INITIAL_EPSILON = 0.1 # epsilon的初始值,epsilon逐渐减小
REPLAY_MEMORY = 50000 # 记住的情景(状态s到状态s'的所有信息)数
BATCH = 32 # 选取的小批量训练样本数
一帧一个输入动作
FRAME_PER_ACTION = 1
预处理后的图片尺寸
img_rows , img_cols = 80, 80
每次堆叠4帧灰阶图像,相当于4通道
img_channels = 4
构建神经网络模型
def buildmodel():
print("Now we build the model")
# 以下注释见文中
model = Sequential()
model.add(Convolution2D(32, 8, 8, subsample=(4,4),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same',input_shape=(img_channels,img_rows,img_cols)))
model.add(Activation('relu'))
model.add(Convolution2D(64, 4, 4, subsample=(2,2),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same'))
model.add(Activation('relu'))
model.add(Convolution2D(64, 3, 3, subsample=(1,1),init=lambda shape, name: normal(shape, scale=0.01, name=name), border_mode='same'))
model.add(Activation('relu'))
model.add(Flatten())
model.add(Dense(512, init=lambda shape, name: normal(shape, scale=0.01, name=name)))
model.add(Activation('relu'))
model.add(Dense(2,init=lambda shape, name: normal(shape, scale=0.01, name=name)))
adam = Adam(lr=1e-6) model.compile(loss='mse',optimizer=adam) # 使用损失函数为均方误差,优化器为Adam。print("We finish building the model")return model
def trainNetwork(model,args):
# 得到一个游戏模拟器
game_state = game.GameState()
# 保存之前的观察到回放存储器DD = deque()# 什么也不做来得到第一个状态然后预处理图片为80x80x4格式do_nothing = np.zeros(ACTIONS) do_nothing[0] = 1 # do_nothing 为 array([1,0])x_t, r_0, terminal = game_state.frame_step(do_nothing) x_t = skimage.color.rgb2gray(x_t) x_t = skimage.transform.resize(x_t,(80,80)) x_t = skimage.exposure.rescale_intensity(x_t,out_range=(0,255))# 初始化时,堆叠4张图都为初始的同1张s_t = np.stack((x_t, x_t, x_t, x_t), axis=0) # s_t为四张图的堆叠# 为了在Keras中使用,我们需要调整数组形状,在头部增加一个维度s_t = s_t.reshape(1, s_t.shape[0], s_t.shape[1], s_t.shape[2])if args['mode'] == 'Run': OBSERVE = 999999999 # 我们一直观察,而不训练 epsilon = FINAL_EPSILON print ("Now we load weight") model.load_weights("model.h5") adam = Adam(lr=1e-6) model.compile(loss='mse',optimizer=adam) print ("Weight load successfully") else: # 否则我们在观察一段时间之后开始训练 OBSERVE = OBSERVATION epsilon = INITIAL_EPSILON t = 0 # t为总帧数while (True): # 每次循环重新初始化的值 loss = 0 Q_sa = 0 action_index = 0 r_t = 0 a_t = np.zeros([ACTIONS]) # 通过epsilon贪心算法选择行为 if t % FRAME_PER_ACTION == 0: if random.random() <= epsilon: print("----------Random Action----------") action_index = random.randrange(ACTIONS) # 随机选取一个动作 a_t[action_index] = 1 # 生成相应的规范化动作输入参数 else: q = model.predict(s_t) # 输入当前状态得到预测的Q值 max_Q = np.argmax(q) # 返回数组中最大值的索引 # numpy.argmax(a, axis=None, out=None) # Returns the indices of the maximum values along an axis. action_index = max_Q # 索引0代表啥也不做,索引1代表跳一下 a_t[max_Q] = 1 # 生成相应的规范化动作输入参数 # 在开始训练之后并且epsilon小于一定值之前,我们逐步减小epsilon if epsilon > FINAL_EPSILON and t > OBSERVE: epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE # 执行选定的动作,并观察返回的下一状态和奖励 x_t1_colored, r_t, terminal = game_state.frame_step(a_t) # 将图像处理为灰阶,调整尺寸、亮度 x_t1 = skimage.color.rgb2gray(x_t1_colored) x_t1 = skimage.transform.resize(x_t1,(80,80)) x_t1 = skimage.exposure.rescale_intensity(x_t1, out_range=(0, 255)) # 调整图像数组形状,增加头两维到4维 x_t1 = x_t1.reshape(1, 1, x_t1.shape[0], x_t1.shape[1]) # 将s_t的前三帧添加在新帧的后面,新帧的索引为0,形成最后的4帧图像 s_t1 = np.append(x_t1, s_t[:, :3, :, :], axis=1) # 存储状态转移到回放存储器 D.append((s_t, action_index, r_t, s_t1, terminal)) if len(D) > REPLAY_MEMORY: D.popleft() # 如果观察完成,则 if t > OBSERVE: # 抽取小批量样本进行训练 minibatch = random.sample(D, BATCH) # inputs和targets一起构成了Q值表 inputs = np.zeros((BATCH, s_t.shape[1], s_t.shape[2], s_t.shape[3])) #32, 80, 80, 4 targets = np.zeros((inputs.shape[0], ACTIONS)) #32, 2 # 开始经验回放 for i in range(0, len(minibatch)): # 以下序号对应D的存储顺序将信息全部取出, # D.append((s_t, action_index, r_t, s_t1, terminal)) state_t = minibatch[i][0] # 当前状态 action_t = minibatch[i][1] # 输入动作 reward_t = minibatch[i][2] # 返回奖励 state_t1 = minibatch[i][3] # 返回的下一状态 terminal = minibatch[i][4] # 返回的是否终止的标志 inputs[i:i + 1] = state_t # 保存当前状态,即Q(s,a)中的s # 得到预测的以输入动作x为索引的Q值列表 targets[i] = model.predict(state_t) # 得到下一状态下预测的以输入动作x为索引的Q值列表 Q_sa = model.predict(state_t1) if terminal: # 如果动作执行后游戏终止了,该状态下(s)该动作(a)的Q值就相当于奖励 targets[i, action_t] = reward_t else: # 否则,该状态(s)下该动作(a)的Q值就相当于动作执行后的即时奖励和下一状态下的最佳预期奖励乘以一个折扣率 targets[i, action_t] = reward_t + GAMMA * np.max(Q_sa) # 用生成的Q值表训练神经网络,同时返回当前的误差 loss += model.train_on_batch(inputs, targets) s_t = s_t1 # 下一状态变为当前状态 t = t + 1 # 总帧数+1 # 每100次迭代存储下当前的训练模型 if t % 100 == 0: print("Now we save model") model.save_weights("model.h5", overwrite=True) with open("model.json", "w") as outfile: json.dump(model.to_json(), outfile) # 输出信息 state = "" if t <= OBSERVE: state = "observe" elif t > OBSERVE and t <= OBSERVE + EXPLORE: state = "explore" else: state = "train" print("TIMESTEP", t, "/ STATE", state, \ "/ EPSILON", epsilon, "/ ACTION", action_index, "/ REWARD", r_t, \ "/ Q_MAX " , np.max(Q_sa), "/ Loss ", loss) print("Episode finished!") print("************************")
def playGame(args):
model = buildmodel() # 先建立模型
trainNetwork(model,args) # 开始训练
def main():
parser = argparse.ArgumentParser(description='Description of your program')
parser.add_argument('-m','--mode', help='Train / Run', required=True) #接受参数 mode
args = vars(parser.parse_args()) # args是字典,'mode'是键
playGame(args) # 开始游戏
if name == "main":
main() #执行本脚本时以main函数开始
作者:treelake
链接:https://www.jianshu.com/p/3ba69493f020