Source code for simple_rl.agents.QLearningAgentClass

''' QLearningAgentClass.py: Class for a basic QLearningAgent '''

# Python imports.
import random
import numpy
import time
from collections import defaultdict

# Other imports.
from simple_rl.agents.AgentClass import Agent

class QLearningAgent(Agent):
    ''' Implementation for a Q Learning Agent '''

    def __init__(self, actions, name="Q-learning", alpha=0.1, gamma=0.99, epsilon=0.1, explore="uniform", anneal=False):
        '''
        Args:
            actions (list): Contains strings denoting the actions.
            name (str): Denotes the name of the agent.
            alpha (float): Learning rate.
            gamma (float): Discount factor.
            epsilon (float): Exploration term.
            explore (str): One of {softmax, uniform}. Denotes the exploration policy.
            anneal (bool): If True, anneal alpha and epsilon over time.
        '''
        name_ext = "-" + explore if explore != "uniform" else ""
        Agent.__init__(self, name=name + name_ext, actions=actions, gamma=gamma)

        # Set/initialize parameters and other relevant classwide data.
        self.alpha, self.alpha_init = alpha, alpha
        self.epsilon, self.epsilon_init = epsilon, epsilon
        self.step_number = 0
        self.anneal = anneal
        self.default_q = 0  # 1 / (1 - self.gamma)
        self.explore = explore

        # Q Function:
        #   Key: state
        #   Val: dict
        #       Key: action
        #       Val: q-value
        self.q_func = defaultdict(lambda: defaultdict(lambda: self.default_q))
    def get_parameters(self):
        '''
        Returns:
            (dict) key=param_name (str) --> val=param_val (object).
        '''
        param_dict = defaultdict(int)

        param_dict["alpha"] = self.alpha
        param_dict["gamma"] = self.gamma
        param_dict["epsilon"] = self.epsilon_init
        param_dict["anneal"] = self.anneal
        param_dict["explore"] = self.explore

        return param_dict
    # --------------------------------
    # ---- CENTRAL ACTION METHODS ----
    # --------------------------------
    def act(self, state, reward, learning=True):
        '''
        Args:
            state (State)
            reward (float)

        Returns:
            (str)

        Summary:
            The central method called during each time step.
            Retrieves the action according to the current policy
            and performs updates given (s=self.prev_state,
            a=self.prev_action, r=reward, s'=state).
        '''
        if learning:
            self.update(self.prev_state, self.prev_action, reward, state)

        if self.explore == "softmax":
            # Softmax exploration.
            action = self.soft_max_policy(state)
        else:
            # Uniform exploration.
            action = self.epsilon_greedy_q_policy(state)

        self.prev_state = state
        self.prev_action = action
        self.step_number += 1

        # Anneal params.
        if learning and self.anneal:
            self._anneal()

        return action
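    # Per-step calling pattern (illustrative sketch, not part of the original
    # module): the reward passed to act() belongs to the *previous* transition,
    # so a driver loop typically looks like the following, where `env` is a
    # hypothetical environment exposing a step() method:
    #
    #   action = agent.act(state, reward)   # learns from (prev_state, prev_action, reward, state)
    #   state, reward = env.step(action)    # hypothetical env API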
    def epsilon_greedy_q_policy(self, state):
        '''
        Args:
            state (State)

        Returns:
            (str): action.
        '''
        # Policy: Epsilon of the time explore, otherwise act greedily w.r.t. Q.
        if numpy.random.random() > self.epsilon:
            # Exploit.
            action = self.get_max_q_action(state)
        else:
            # Explore.
            action = numpy.random.choice(self.actions)

        return action
    def soft_max_policy(self, state):
        '''
        Args:
            state (State): Contains relevant state information.

        Returns:
            (str): action.
        '''
        return numpy.random.choice(self.actions, 1, p=self.get_action_distr(state))[0]
    # ---------------------------------
    # ---- Q VALUES AND PARAMETERS ----
    # ---------------------------------
    def update(self, state, action, reward, next_state):
        '''
        Args:
            state (State)
            action (str)
            reward (float)
            next_state (State)

        Summary:
            Updates the internal Q Function according to the Bellman Equation
            (classic Q-learning update).
        '''
        # If this is the first state, just return.
        if state is None:
            self.prev_state = next_state
            return

        # Update the Q Function.
        max_q_next_state = self.get_max_q_value(next_state)
        prev_q_val = self.get_q_value(state, action)
        self.q_func[state][action] = (1 - self.alpha) * prev_q_val + self.alpha * (reward + self.gamma * max_q_next_state)
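    # The assignment above is the standard tabular Q-learning rule
    # (explanatory comment, not in the original source):
    #
    #   Q(s, a) <- (1 - alpha) * Q(s, a) + alpha * (r + gamma * max_a' Q(s', a'))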
    def _anneal(self):
        # Taken from "Note on Learning Rate Schedules for Stochastic Optimization" by Darken and Moody (Yale).
        self.alpha = self.alpha_init / (1.0 + (self.step_number / 200.0) * (self.episode_number + 1) / 2000.0)
        self.epsilon = self.epsilon_init / (1.0 + (self.step_number / 200.0) * (self.episode_number + 1) / 2000.0)

    def _compute_max_qval_action_pair(self, state):
        '''
        Args:
            state (State)

        Returns:
            (tuple) --> (float, str): where the float is the Q value and the str is the action.
        '''
        # Grab a random initial action in case all are equal.
        best_action = random.choice(self.actions)
        max_q_val = float("-inf")
        shuffled_action_list = self.actions[:]
        random.shuffle(shuffled_action_list)

        # Find the best action (action w/ current max predicted Q value).
        for action in shuffled_action_list:
            q_s_a = self.get_q_value(state, action)
            if q_s_a > max_q_val:
                max_q_val = q_s_a
                best_action = action

        return max_q_val, best_action
    def get_max_q_action(self, state):
        '''
        Args:
            state (State)

        Returns:
            (str): denoting the action with the max q value in the given @state.
        '''
        return self._compute_max_qval_action_pair(state)[1]
    def get_max_q_value(self, state):
        '''
        Args:
            state (State)

        Returns:
            (float): denoting the max q value in the given @state.
        '''
        return self._compute_max_qval_action_pair(state)[0]
    def get_value(self, state):
        '''
        Args:
            state (State)

        Returns:
            (float)
        '''
        return self.get_max_q_value(state)
    def get_q_value(self, state, action):
        '''
        Args:
            state (State)
            action (str)

        Returns:
            (float): denoting the q value of the (@state, @action) pair.
        '''
        return self.q_func[state][action]
    def get_action_distr(self, state, beta=0.2):
        '''
        Args:
            state (State)
            beta (float): Softmax temperature parameter.

        Returns:
            (list of floats): The i-th float corresponds to the probability
            mass associated with the i-th action (indexing into self.actions).
        '''
        all_q_vals = []
        for i, action in enumerate(self.actions):
            all_q_vals.append(self.get_q_value(state, action))

        # Softmax distribution.
        total = sum([numpy.exp(beta * qv) for qv in all_q_vals])
        softmax = [numpy.exp(beta * qv) / total for qv in all_q_vals]

        return softmax
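    # The distribution returned above is a Boltzmann/softmax over Q values
    # (explanatory comment, not in the original source):
    #
    #   p(a_i | s) = exp(beta * Q(s, a_i)) / sum_j exp(beta * Q(s, a_j))
    #
    # Larger beta concentrates probability on higher-valued actions, while
    # beta -> 0 approaches a uniform distribution over self.actions.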
    def reset(self):
        self.step_number = 0
        self.episode_number = 0
        self.q_func = defaultdict(lambda: defaultdict(lambda: self.default_q))
        Agent.reset(self)
    def end_of_episode(self):
        '''
        Summary:
            Resets the agent's prior pointers.
        '''
        if self.anneal:
            self._anneal()
        Agent.end_of_episode(self)
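# ----------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module): drives
# this agent with simple_rl's experiment helper. GridWorldMDP and
# run_agents_on_mdp are assumed to be importable as elsewhere in the library;
# the exact arguments below are an example, not a canonical configuration.
if __name__ == "__main__":
    from simple_rl.tasks import GridWorldMDP
    from simple_rl.run_experiments import run_agents_on_mdp

    # Small grid world with a single goal; the agent uses the default uniform
    # (epsilon-greedy) exploration policy.
    mdp = GridWorldMDP(width=4, height=3, goal_locs=[(4, 3)])
    ql_agent = QLearningAgent(actions=mdp.get_actions(), alpha=0.1, epsilon=0.1)

    # Run a few independent instances of 100 episodes, 50 steps each.
    run_agents_on_mdp([ql_agent], mdp, instances=5, episodes=100, steps=50)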