Source code for simple_rl.planning.BeliefSparseSamplingClass

from math import log
import numpy as np
import copy
from collections import defaultdict
import random

from simple_rl.pomdp.BeliefMDPClass import BeliefMDP


class BeliefSparseSampling(object):
    '''
    A Sparse Sampling Algorithm for Near-Optimal Planning in Large Markov Decision Processes (Kearns et al.)

    Assuming that you don't have access to the underlying transition dynamics, but do have access to a naive
    generative model of the underlying MDP, this algorithm performs on-line, near-optimal planning with a
    per-state running time that has no dependence on the number of states in the MDP.
    '''

    def __init__(self, gen_model, gamma, tol, max_reward, state, name="bss"):
        '''
        Args:
            gen_model (BeliefMDP): Generative model of the underlying MDP -- given a state and an action,
                it returns a sampled next state
            gamma (float): MDP discount factor
            tol (float): Maximum allowed difference between the optimal and the computed value function
            max_reward (float): Upper bound on the reward obtainable for any (state, action) pair
            state (State): The current state, for which the planner outputs the action to take
        '''
        self.tol = tol
        self.gamma = gamma
        self.max_reward = max_reward
        self.gen_model = gen_model
        self.current_state = state

        self.horizon = self._horizon
        self.width = self._width

        print('BSS Horizon = {} \t Width = {}'.format(self.horizon, self.width))

        self.name = name
        self.root_level_qvals = defaultdict()
        self.nodes_by_horizon = defaultdict(lambda: defaultdict(float))

    @property
    def _horizon(self):
        '''
        Returns:
            _horizon (int): The planning horizon; depth of the recursive tree created to determine the
                near-optimal action to take from a given state
        '''
        return int(log((self._lam / self._vmax), self.gamma))

    @property
    def _width(self):
        '''
        The number of times we ask the generative model for a next_state sample for each (state, action) pair.

        Returns:
            _width (int)
        '''
        part1 = (self._vmax ** 2) / (self._lam ** 2)
        part2 = 2 * self._horizon * log(self._horizon * (self._vmax ** 2) / (self._lam ** 2))
        part3 = log(self.max_reward / self._lam)
        return int(part1 * (part2 + part3))

    @property
    def _lam(self):
        return (self.tol * (1.0 - self.gamma) ** 2) / 4.0

    @property
    def _vmax(self):
        return float(self.max_reward) / (1 - self.gamma)

    def _get_width_at_height(self, height):
        '''
        The branching factor of the tree is decayed according to this formula, as suggested by the BSS paper.
        Args:
            height (int): the current depth in the recursive planning tree, measured from the top

        Returns:
            width (int): the decayed branching factor for a (state, action) pair
        '''
        c = int(self.width * (self.gamma ** (2 * height)))
        return c if c > 1 else 1

    def _estimate_qs(self, state, horizon):
        '''
        Args:
            state (State): state from which the Q-values are estimated
            horizon (int): remaining planning depth; Q-values are 0 once the horizon is exhausted

        Returns:
            qvalues (np.ndarray): one sampled Q-value estimate per action
        '''
        qvalues = np.zeros(len(self.gen_model.actions))
        for action_idx, action in enumerate(self.gen_model.actions):
            if horizon <= 0:
                qvalues[action_idx] = 0.0
            else:
                qvalues[action_idx] = self._sampled_q_estimate(state, action, horizon)
        return qvalues

    def _sampled_q_estimate(self, state, action, horizon):
        '''
        Args:
            state (State): current state in the MDP
            action (str): action to take from `state`
            horizon (int): planning horizon / depth of the recursive tree

        Returns:
            average_reward (float): sampled estimate of how good (state, action) is, averaged over
                `width` next-state samples
        '''
        total = 0.0
        width = self._get_width_at_height(self.horizon - horizon)
        for _ in range(width):
            next_state = self.gen_model.transition_func(state, action)
            total += self.gen_model.reward_func(state, action) + (self.gamma * self._estimate_v(next_state, horizon - 1))
        return total / float(width)

    def _estimate_v(self, state, horizon):
        '''
        Args:
            state (State): current state
            horizon (int): number of future time steps used to estimate V*

        Returns:
            V(s) (float)
        '''
        # Return the cached value if this (state, horizon) node has already been evaluated
        if state in self.nodes_by_horizon:
            if horizon in self.nodes_by_horizon[state]:
                return self.nodes_by_horizon[state][horizon]

        # If the generative model is currently in a goal state, use the immediate reward as the value;
        # otherwise recurse by taking the max over the sampled Q-value estimates
        if self.gen_model.is_in_goal_state():
            self.nodes_by_horizon[state][horizon] = self.gen_model.reward_func(state, random.choice(self.gen_model.actions))
        else:
            self.nodes_by_horizon[state][horizon] = np.max(self._estimate_qs(state, horizon))

        return self.nodes_by_horizon[state][horizon]
    def plan_from_state(self, state):
        '''
        Args:
            state (State): the current state in the MDP

        Returns:
            action (str): near-optimal action to perform from state
        '''
        if state in self.root_level_qvals:
            qvalues = self.root_level_qvals[state]
        else:
            init_horizon = self.horizon
            qvalues = self._estimate_qs(state, init_horizon)

        action_idx = np.argmax(qvalues)
        self.root_level_qvals[state] = qvalues
        return self.gen_model.actions[action_idx]
    def run(self, verbose=True):
        '''
        Args:
            verbose (bool): if True, print each transition and the running discounted return

        Returns:
            discounted_sum_rewards (float): discounted return accumulated while executing the plan
            policy (defaultdict): mapping from each visited state to the action taken there
        '''
        discounted_sum_rewards = 0.0
        num_iter = 0
        self.gen_model.reset()
        state = self.gen_model.init_state
        policy = defaultdict()
        while not self.gen_model.is_in_goal_state():
            action = self.plan_from_state(state)
            reward, next_state = self.gen_model.execute_agent_action(action)
            policy[state] = action
            discounted_sum_rewards += ((self.gamma ** num_iter) * reward)
            if verbose:
                print('({}, {}, {}) -> {} | {}'.format(state, action, next_state, reward, discounted_sum_rewards))
            state = copy.deepcopy(next_state)
            num_iter += 1
        return discounted_sum_rewards, policy
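
The planner only interacts with its generative model through the interface used above (actions, init_state, transition_func, reward_func, execute_agent_action, is_in_goal_state, reset). A minimal usage sketch, assuming make_belief_mdp() is a hypothetical helper that builds a BeliefMDP exposing that interface; the hyperparameter values below are illustrative only:

# Usage sketch -- `make_belief_mdp` is a hypothetical stand-in for however you construct
# a BeliefMDP (e.g. by wrapping a POMDP); it is not part of this module.
from simple_rl.planning.BeliefSparseSamplingClass import BeliefSparseSampling

belief_mdp = make_belief_mdp()  # any object exposing the generative-model interface above

bss = BeliefSparseSampling(gen_model=belief_mdp,
                           gamma=0.6,
                           tol=1.0,
                           max_reward=1.0,
                           state=belief_mdp.init_state)

discounted_return, policy = bss.run(verbose=True)
print('Discounted return = {}'.format(discounted_return))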