A minimal example

Q-learning is a variant of model-free reinforcement learning. In Q-learning, we want the agent to estimate how good each (state, action) pair is, so that it can choose good actions in every state. This is done by approximately fitting an action-value function (Q) that satisfies the equation below:

$$Q(s, a) = R + \gamma \max_{a'} Q(s', a')$$

where s and a are the state and action at the current time step, R is the immediate reward, γ is the discount factor, and s' is the observed next state.
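
To make the equation concrete, here is a minimal sketch of a single tabular Q-learning update; the environment size, learning rate, and transition values below are made up purely for illustration:

import numpy as np

# hypothetical tiny environment: 5 states, 2 actions
n_states, n_actions = 5, 2
Q = np.zeros((n_states, n_actions))
gamma = 0.97  # discount factor
alpha = 0.1   # learning rate for the tabular update

# one observed transition (s, a, R, s'), values chosen arbitrarily
s, a, R, s_next = 0, 1, 1.0, 2
target = R + gamma * np.max(Q[s_next])   # R + gamma * max_a' Q(s', a')
Q[s, a] += alpha * (target - Q[s, a])    # move Q(s, a) toward the target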

As the agent interacts with the environment, it sees the state it is in, performs an action, receives a reward, and observes the new state it has moved to. This loop continues until the agent reaches a terminal state. Since Q-learning is an off-policy method, we can save every (state, action, reward, next_state) as an experience in a replay buffer. These experiences are sampled at every training iteration and used to improve our estimate of Q. Here is how:

  1. The Q value of the next step is computed from next_state by assuming the agent picks the action in that state greedily, hence the np.max(next_state_value) in the code below.
  2. The Q value of the next step is discounted and added to the immediate reward the agent observed in the transition (state, action, reward, next_state).
  3. If the state-action pair ends the episode, we use Q = reward instead of steps 1 and 2 above (episodic learning). So we also need to add a termination flag to every experience added to the buffer: (state, action, reward, next_state, termination).
  4. At this point we have one Q value computed from reward and next_state, and we have another Q value that is the output of the q network function approximator. By changing the parameters of the q network with gradient descent to minimize the difference between these two action values, the Q function approximator converges toward the true action values. (A minimal numpy sketch of this target computation follows this list.)
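
As referenced in step 4 above, here is a minimal numpy sketch of how the target values of steps 1-3 are assembled for a batch; the batch values are made up, and next_state_value stands in for the q network's output on next_state:

import numpy as np

# made-up batch of three transitions
rewards = np.array([[1.0], [1.0], [-1.0]])        # immediate rewards R
terminals = np.array([[False], [False], [True]])  # termination flags
next_state_value = np.array([[0.5, 1.5],          # Q(s', a') for every action
                             [2.0, 0.3],
                             [0.7, 0.9]])
discount = 0.97

# steps 1 and 2: reward + discounted greedy Q value of the next state
target = rewards + discount * np.max(next_state_value, axis=1, keepdims=True)
# step 3: for terminal transitions the target is just the reward
target[terminals] = rewards[terminals]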

Here is the implementation of the deep Q network.

import tensorflow as tf
import gym
import numpy as np

def fullyConnected(name, input_layer, output_dim, activation=None):
    """
    Adds a fully connected layer after the `input_layer`. `output_dim` is
    the size of next layer. `activation` is the optional activation 
    function for the next layer.
    """
    initializer = tf.random_uniform_initializer(minval=-.003, maxval=.003)

    input_dims = input_layer.get_shape().as_list()[1:]
    weight = tf.get_variable(name + "_w", shape=[*input_dims, output_dim],
                             dtype=tf.float32, initializer=initializer)
    bias = tf.get_variable(name + "_b", shape=output_dim, dtype=tf.float32,
                           initializer=initializer)
    next_layer = tf.matmul(input_layer, weight) + bias

    if activation:
        next_layer = activation(next_layer, name=name + "_activated")

    return next_layer

class Memory(object):
    """
    Saves experiences as (state, action, reward, next_state,
    termination). It only supports discrete action spaces.
    """

    def __init__(self, size, state_dims):
        self.length = size

        self.states = np.empty([size, state_dims], dtype=float)
        self.actions = np.empty(size, dtype=int)
        self.rewards = np.empty((size, 1), dtype=float)
        self.states_next = np.empty([size, state_dims], dtype=float)
        self.terminations = np.zeros((size, 1), dtype=bool)

        self.memory = [self.states, self.actions,
                       self.rewards, self.states_next, self.terminations]

        self.pointer = 0
        self.count = 0

    def add(self, state, action, reward, next_state, termination):
        self.states[self.pointer] = state
        self.actions[self.pointer] = action
        self.rewards[self.pointer] = reward
        self.states_next[self.pointer] = next_state
        self.terminations[self.pointer] = termination
        self.pointer = (self.pointer + 1) % self.length
        self.count += 1

    def sample(self, batch_size):
        index = np.random.randint(
            min(self.count, self.length), size=(batch_size))
        return (self.states[index], self.actions[index],
            self.rewards[index], self.states_next[index],
            self.terminations[index])

class DQN(object):
    """
    Deep Q network agent. 
    """

    def __init__(self, state_dim, action_dim, memory_size, layer_dims,
                 optimizer):

        self.action_dim = action_dim
        self.state = tf.placeholder(
            tf.float32, [None, state_dim], "states")
        self.action_ph = tf.placeholder(tf.int32, [None], "actions")
        self.action_value_ph = tf.placeholder(
            tf.float32, [None], "action_values")
        self.memory = Memory(memory_size, state_dim)

        def _make():
            flow = self.state
            for i, size in enumerate(layer_dims):
                flow = fullyConnected(
                    "layer%i" % i, flow, size, tf.nn.relu)

            return fullyConnected(
                "output_layer", flow, self.action_dim)

        # generate the learner network
        with tf.variable_scope('learner'):
            self.action_value = _make()
        # generate the target network
        with tf.variable_scope('target'):
            self.target_action_value = _make()

        # get parameters for learner and target networks
        from_list = tf.get_collection(
            tf.GraphKeys.TRAINABLE_VARIABLES, scope='learner')
        target_list = tf.get_collection(
            tf.GraphKeys.TRAINABLE_VARIABLES, scope='target')

        # create a copy operation from parameters of learner 
        # to parameters of target network
        from_list = sorted(from_list, key=lambda v: v.name)
        target_list = sorted(target_list, key=lambda v: v.name)
        self.update_target_network = []
        for i in range(len(from_list)):
            self.update_target_network.append(target_list[i].assign(from_list[i]))

        # gather the action-values of the performed actions
        row = tf.range(0, tf.shape(self.action_value)[0])
        indexes = tf.stack([row, self.action_ph], axis=1)
        action_value = tf.gather_nd(self.action_value, indexes)

        # calculate loss of Q network
        self.single_loss = tf.square(action_value - self.action_value_ph)
        self._loss = tf.reduce_mean(self.single_loss)

        self.train_op = optimizer.minimize(self._loss)

    def train(self, session, batch=None, discount=.97):
        # sample a batch of experiences from the replay buffer
        states, actions, rewards, next_states, terminals =\
            self.memory.sample(batch)
        # Q values of the next states, estimated by the target network
        next_state_value = session.run(
            self.target_action_value, {self.state: next_states})
        # target: reward + discounted greedy Q value of the next state
        observed_value = rewards + discount * \
            np.max(next_state_value, 1, keepdims=True)
        # for terminal transitions the target is just the reward
        observed_value[terminals] = rewards[terminals]

        # one gradient step to move the learner's estimate toward the target
        _, batch_loss = session.run([self.train_op, self._loss], {
            self.state: states, self.action_ph: actions,
            self.action_value_ph: observed_value[:, 0]})
        return batch_loss

    def policy(self, session, state):
        return session.run(self.action_value, {self.state: [state]})[0]

    def memorize(self, state, action, reward, next_state, terminal):
        self.memory.add(state, action, reward, next_state, terminal)

    def update(self, session):
        session.run(self.update_target_network)

There are a few mechanisms in the deep Q network that improve the agent's convergence. One is the emphasis on randomly sampling experiences from the replay buffer, which prevents any temporal correlation between the sampled experiences. Another mechanism is using a target network to evaluate the Q values of next_state. The target network is similar to the learner network, but its parameters are modified much less frequently. Moreover, the target network is not updated with gradient descent; instead, it copies its parameters from the learner network at regular intervals.
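
The code above copies the learner's parameters into the target network wholesale. A common variant, not used in the code here, is a soft (Polyak-averaged) update that mixes the two parameter sets a little at every step. The sketch below assumes it is built inside DQN.__init__ next to update_target_network, and tau is a hypothetical mixing rate:

# soft (Polyak) target update, a variant of the hard copy above
tau = 0.01
soft_update = [
    t.assign(tau * f + (1. - tau) * t)
    for f, t in zip(from_list, target_list)
]
# running `session.run(soft_update)` after every training step lets the
# target network slowly track the learner instead of jumping to it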

The code below is an example of this agent learning to act in the cartpole environment.

ENVIRONMENT = 'CartPole-v1'  # environment name from `OpenAI`.
MEMORY_SIZE = 50000  # how many recent time steps to keep in the agent's replay memory
LEARNING_RATE = .01  # learning rate for Adam optimizer
BATCH_SIZE = 8  # number of experiences to sample in each training step
EPSILON = .1  # how often an action should be chosen randomly. This encourages exploration
EPSILON_DECAY = .99  # the rate at which `EPSILON` decays
NETWORK_ARCHITECTURE = [100]  # hidden layers of the q network. Each element is the size of one layer
TOTAL_EPISODES = 500  # number of total episodes
MAX_STEPS = 200  # maximum number of steps in each episode
REPORT_STEP = 10  # how many episodes to run before printing a summary

env = gym.make(ENVIRONMENT)  # initialize environment
state_dim = env.observation_space.shape[0]  # dimensions of observation space
action_dim = env.action_space.n

optimizer = tf.train.AdamOptimizer(LEARNING_RATE)
agent = DQN(state_dim, action_dim, MEMORY_SIZE,
            NETWORK_ARCHITECTURE, optimizer)

eps = [EPSILON]  # wrapped in a list so runEpisode can decay it in place

def runEpisode(env, session):
    state = env.reset()
    total_l = 0.
    total_reward = 0.
    for i in range(MAX_STEPS):
        # epsilon-greedy action selection
        if np.random.uniform() < eps[0]:
            action = np.random.randint(action_dim)
        else:
            action_values = agent.policy(session, state)
            action = np.argmax(action_values)

        next_state, reward, terminated, _ = env.step(action)

        # penalize the transition that ends the episode
        if terminated:
            reward = -1

        total_reward += reward

        # store the experience and take one training step
        agent.memorize(state, action, reward, next_state, terminated)
        state = next_state
        total_l += agent.train(session, BATCH_SIZE)

        if terminated:
            break

    eps[0] *= EPSILON_DECAY
    i += 1  # number of steps taken in this episode

    return i, total_reward / i, total_l / i

session = tf.InteractiveSession()
session.run(tf.global_variables_initializer())

for i in range(1, TOTAL_EPISODES + 1):
    leng, reward, loss = runEpisode(env, session)
    agent.update(session)
    if i % REPORT_STEP == 0:
        print(("Episode: %4i " +
               "| Episod Length: %3i " +
               "| Avg Reward: %+.3f " +
               "| Avg Loss: %6.3f " +
               "| Epsilon: %.3f") %
              (i, leng, reward, loss, eps[0]))