Sources
# PI, VI
import warnings
warnings.filterwarnings('ignore')  # must run before the imports below to silence their warnings

import itertools  # needed by print_action_value_function (itertools.product)
import random
from itertools import cycle
from pprint import pprint

import gym
import gym_walk
import gym_aima
import numpy as np
from tqdm import tqdm_notebook as tqdm

np.set_printoptions(suppress=True)
random.seed(123)
np.random.seed(123)


def _is_terminal_state(P, s):
    """Return True when every transition listed for state ``s`` has done=True."""
    return all(done for action in P[s].values() for _, _, _, done in action)


def print_policy(pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4, title='Policy:'):
    """Print the policy as a grid with ``n_cols`` cells per row.

    Args:
        pi: callable mapping a state index to an action index.
        P: transition table; ``P[s]`` maps each action to a list of
            ``(prob, next_state, reward, done)`` tuples.
        action_symbols: symbol shown for each action index.
        n_cols: number of cells printed before the row is closed.
        title: heading printed before the grid.
    """
    print(title)
    arrs = dict(enumerate(action_symbols))
    for s in range(len(P)):
        a = pi(s)
        print("| ", end="")
        if _is_terminal_state(P, s):
            # States whose transitions are all `done` are left blank.
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), arrs[a].rjust(6), end=" ")
        if (s + 1) % n_cols == 0:
            print("|")


def print_state_value_function(V, P, n_cols=4, prec=3, title='State-value function:'):
    """Print the state values ``V`` as a grid with ``n_cols`` cells per row.

    Args:
        V: sequence indexed by state holding the value of each state.
        P: transition table (same layout as in ``print_policy``).
        n_cols: number of cells printed before the row is closed.
        prec: number of decimals the values are rounded to.
        title: heading printed before the grid.
    """
    print(title)
    for s in range(len(P)):
        v = V[s]
        print("| ", end="")
        if _is_terminal_state(P, s):
            # States whose transitions are all `done` are left blank.
            print("".rjust(9), end=" ")
        else:
            print(str(s).zfill(2), '{}'.format(np.round(v, prec)).rjust(6), end=" ")
        if (s + 1) % n_cols == 0:
            print("|")


def print_action_value_function(Q, optimal_Q=None, action_symbols=('<', '>'), prec=3,
                                title='Action-value function:'):
    """Print ``Q`` as a table, optionally next to ``optimal_Q`` and their difference.

    Args:
        Q: 2-D array of action values, one row per state.
        optimal_Q: optional array of the same shape; when given, its values and
            ``optimal_Q - Q`` are appended as extra column groups.
        action_symbols: symbol used in the header for each action column.
        prec: number of decimals the values are rounded to.
        title: heading printed before the table.
    """
    vf_types = ('',) if optimal_Q is None else ('', '*', 'err')
    headers = ['s'] + [' '.join(i) for i in itertools.product(vf_types, action_symbols)]
    print(title)
    states = np.arange(len(Q))[..., np.newaxis]
    arr = np.hstack((states, np.round(Q, prec)))
    if optimal_Q is not None:
        arr = np.hstack((arr, np.round(optimal_Q, prec), np.round(optimal_Q - Q, prec)))
    # NOTE(review): `tabulate` is not imported anywhere in the visible file; this call
    # raises NameError unless `from tabulate import tabulate` is added -- confirm.
    print(tabulate(arr, headers, tablefmt="fancy_grid"))


def probability_success(env, pi, goal_state, n_episodes=100, max_steps=200):
    """Return the fraction of episodes that end in ``goal_state``.

    The Python, NumPy and environment seeds are reset to 123 on every call so
    that repeated evaluations are comparable.

    Args:
        env: environment exposing ``seed``, ``reset`` and ``step``.
        pi: callable mapping a state to an action.
        goal_state: state that counts as a success.
        n_episodes: number of episodes to run.
        max_steps: per-episode step limit.
    """
    random.seed(123)
    np.random.seed(123)
    env.seed(123)
    results = []
    for _ in range(n_episodes):
        state, done, steps = env.reset(), False, 0
        while not done and steps < max_steps:
            state, _, done, _ = env.step(pi(state))
            steps += 1
        results.append(state == goal_state)
    return np.sum(results)/len(results)


def mean_return(env, pi, n_episodes=100, max_steps=200):
    """Return the average undiscounted sum of rewards over ``n_episodes`` episodes.

    The Python, NumPy and environment seeds are reset to 123 on every call.

    Args:
        env: environment exposing ``seed``, ``reset`` and ``step``.
        pi: callable mapping a state to an action.
        n_episodes: number of episodes to run.
        max_steps: per-episode step limit.
    """
    random.seed(123)
    np.random.seed(123)
    env.seed(123)
    results = []
    for _ in range(n_episodes):
        state, done, steps = env.reset(), False, 0
        results.append(0.0)
        while not done and steps < max_steps:
            state, reward, done, _ = env.step(pi(state))
            results[-1] += reward
            steps += 1
    return np.mean(results)


# Slippery Walk Five MDP and sample policy
env = gym.make('SlipperyWalkFive-v0')
P = env.env.P
init_state = env.reset()
goal_state = 6

LEFT, RIGHT = range(2)
pi = lambda s: {
    0:LEFT, 1:LEFT, 2:LEFT, 3:LEFT, 4:LEFT, 5:LEFT, 6:LEFT
}[s]
print_policy(pi, P, action_symbols=('<', '>'), n_cols=7)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
    probability_success(env, pi, goal_state=goal_state)*100,
    mean_return(env, pi)))


# Policy Evaluation
def policy_evaluation(pi, P, gamma=1.0, theta=1e-10):
    """Iteratively compute the state-value function of policy ``pi``.

    Each sweep rebuilds ``V`` from the previous estimate; the loop stops once
    the largest absolute change between two sweeps is below ``theta``.

    Args:
        pi: callable mapping a state index to an action.
        P: transition table; ``P[s][a]`` is a list of
            ``(prob, next_state, reward, done)`` tuples.
        gamma: discount factor.
        theta: convergence threshold.

    Returns:
        NumPy array with one value per state.
    """
    prev_V = np.zeros(len(P), dtype=np.float64)
    while True:
        V = np.zeros(len(P), dtype=np.float64)
        for s in range(len(P)):
            for prob, next_state, reward, done in P[s][pi(s)]:
                # `(not done)` removes the bootstrapped value on `done` transitions.
                V[s] += prob * (reward + gamma * prev_V[next_state] * (not done))
        if np.max(np.abs(prev_V - V)) < theta:
            break
        prev_V = V.copy()
    return V


V = policy_evaluation(pi, P)
print_state_value_function(V, P, n_cols=7, prec=5)


# Policy Improvement
def policy_improvement(V, P, gamma=1.0):
    """Return the policy that is greedy with respect to ``V``.

    Args:
        V: array of state values indexed by state.
        P: transition table; ``P[s][a]`` is a list of
            ``(prob, next_state, reward, done)`` tuples.
        gamma: discount factor.

    Returns:
        Callable mapping a state index to the action with the highest
        one-step look-ahead value.
    """
    Q = np.zeros((len(P), len(P[0])), dtype=np.float64)
    for s in range(len(P)):
        for a in range(len(P[s])):
            for prob, next_state, reward, done in P[s][a]:
                Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))
    # Build the lookup once instead of on every policy call.
    greedy_actions = dict(enumerate(np.argmax(Q, axis=1)))
    new_pi = lambda s: greedy_actions[s]
    return new_pi


improved_pi = policy_improvement(V, P)
print_policy(improved_pi, P, action_symbols=('<', '>'), n_cols=7)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
    probability_success(env, improved_pi, goal_state=goal_state)*100,
    mean_return(env, improved_pi)))

# how about we evaluate the improved policy?
def _report_performance(env, pi, goal_state):
    """Print the success rate and the average undiscounted return of ``pi`` on ``env``."""
    print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
        probability_success(env, pi, goal_state=goal_state)*100,
        mean_return(env, pi)))


improved_V = policy_evaluation(improved_pi, P)
print_state_value_function(improved_V, P, n_cols=7, prec=5)

# can we improve the improved policy?
improved_improved_pi = policy_improvement(improved_V, P)
print_policy(improved_improved_pi, P, action_symbols=('<', '>'), n_cols=7)
_report_performance(env, improved_improved_pi, goal_state)

# it is the same policy
# if we evaluate again, we can see there is nothing to improve
# that also means we reached the optimal policy
improved_improved_V = policy_evaluation(improved_improved_pi, P)
print_state_value_function(improved_improved_V, P, n_cols=7, prec=5)

# state-value function didn't improve, then we reach the optimal policy
assert np.all(improved_V == improved_improved_V)


# Policy Iteration
def policy_iteration(P, gamma=1.0, theta=1e-10):
    """Alternate policy evaluation and greedy improvement until the policy is stable.

    Starts from a policy drawn uniformly at random over the actions of state 0.

    Args:
        P: transition table; ``P[s][a]`` is a list of
            ``(prob, next_state, reward, done)`` tuples.
        gamma: discount factor.
        theta: convergence threshold forwarded to ``policy_evaluation``.

    Returns:
        Tuple ``(V, pi)``: the last evaluated state-value array and the final
        policy as a callable from state index to action.
    """
    random_actions = np.random.choice(tuple(P[0].keys()), len(P))
    # Build the lookup once instead of on every policy call.
    initial_actions = dict(enumerate(random_actions))
    pi = lambda s: initial_actions[s]
    while True:
        old_pi = {s: pi(s) for s in range(len(P))}
        V = policy_evaluation(pi, P, gamma, theta)
        pi = policy_improvement(V, P, gamma)
        # Stop when improvement leaves every state's action unchanged.
        if old_pi == {s: pi(s) for s in range(len(P))}:
            break
    return V, pi


optimal_V, optimal_pi = policy_iteration(P)
print('Optimal policy and state-value function (PI):')
print_policy(optimal_pi, P, action_symbols=('<', '>'), n_cols=7)
_report_performance(env, optimal_pi, goal_state)
print()
print_state_value_function(optimal_V, P, n_cols=7, prec=5)

assert np.all(improved_V == optimal_V)

# Frozen Lake MDP and sample policies
env = gym.make('FrozenLake-v0')
P = env.env.P
init_state = env.reset()
goal_state = 15

LEFT, DOWN, RIGHT, UP = range(4)
random_pi = lambda s: {
    0:RIGHT, 1:LEFT, 2:DOWN, 3:UP,
    4:LEFT, 5:LEFT, 6:RIGHT, 7:LEFT,
    8:UP, 9:DOWN, 10:UP, 11:LEFT,
    12:LEFT, 13:RIGHT, 14:DOWN, 15:LEFT
}[s]
print_policy(random_pi, P)
_report_performance(env, random_pi, goal_state)

go_get_pi = lambda s: {
    0:RIGHT, 1:RIGHT, 2:DOWN, 3:LEFT,
    4:DOWN, 5:LEFT, 6:DOWN, 7:LEFT,
    8:RIGHT, 9:RIGHT, 10:DOWN, 11:LEFT,
    12:LEFT, 13:RIGHT, 14:RIGHT, 15:LEFT
}[s]
print_policy(go_get_pi, P)
_report_performance(env, go_get_pi, goal_state)

careful_pi = lambda s: {
    0:LEFT, 1:UP, 2:UP, 3:UP,
    4:LEFT, 5:LEFT, 6:UP, 7:LEFT,
    8:UP, 9:DOWN, 10:LEFT, 11:LEFT,
    12:LEFT, 13:RIGHT, 14:RIGHT, 15:LEFT
}[s]
print_policy(careful_pi, P)
_report_performance(env, careful_pi, goal_state)

# Policy Evaluation
V = policy_evaluation(careful_pi, P, gamma=0.99)
print_state_value_function(V, P, prec=4)

# Policy Improvement
careful_plus_pi = policy_improvement(V, P, gamma=0.99)
print_policy(careful_plus_pi, P)
_report_performance(env, careful_plus_pi, goal_state)

new_V = policy_evaluation(careful_plus_pi, P, gamma=0.99)
print_state_value_function(new_V, P, prec=4)

print_state_value_function(new_V - V, P, prec=4)

# Alternating between evaluation and improvement
adversarial_pi = lambda s: {
    0:UP, 1:UP, 2:UP, 3:UP,
    4:UP, 5:LEFT, 6:UP, 7:LEFT,
    8:LEFT, 9:LEFT, 10:LEFT, 11:LEFT,
    12:LEFT, 13:LEFT, 14:LEFT, 15:LEFT
}[s]
print_policy(adversarial_pi, P)
_report_performance(env, adversarial_pi, goal_state)

V = policy_evaluation(adversarial_pi, P, gamma=0.99)
print_state_value_function(V, P, prec=2)

i_pi = policy_improvement(V, P, gamma=0.99)
print_policy(i_pi, P)
_report_performance(env, i_pi, goal_state)

i_V = policy_evaluation(i_pi, P, gamma=0.99)
print_state_value_function(i_V, P, prec=2)

ii_pi = policy_improvement(i_V, P, gamma=0.99)
print_policy(ii_pi, P)
_report_performance(env, ii_pi, goal_state)

ii_V = policy_evaluation(ii_pi, P, gamma=0.99)
print_state_value_function(ii_V, P, prec=2)

iii_pi = policy_improvement(ii_V, P, gamma=0.99)
print_policy(iii_pi, P)
_report_performance(env, iii_pi, goal_state)

iii_V = policy_evaluation(iii_pi, P, gamma=0.99)
print_state_value_function(iii_V, P, prec=2)

iiii_pi = policy_improvement(iii_V, P, gamma=0.99)
print_policy(iiii_pi, P)
_report_performance(env, iiii_pi, goal_state)

iiii_V = policy_evaluation(iiii_pi, P, gamma=0.99)
print_state_value_function(iiii_V, P, prec=2)

iiiii_pi = policy_improvement(iiii_V, P, gamma=0.99)
print_policy(iiiii_pi, P)
_report_performance(env, iiiii_pi, goal_state)

iiiii_V = policy_evaluation(iiiii_pi, P, gamma=0.99)
print_state_value_function(iiiii_V, P, prec=2)

iiiiii_pi = policy_improvement(iiiii_V, P, gamma=0.99)
print_policy(iiiiii_pi, P)
_report_performance(env, iiiiii_pi, goal_state)

iiiiii_V = policy_evaluation(iiiiii_pi, P, gamma=0.99)
print_state_value_function(iiiiii_V, P, prec=2)

iiiiiii_pi = policy_improvement(iiiiii_V, P, gamma=0.99)
print_policy(iiiiiii_pi, P)
_report_performance(env, iiiiiii_pi, goal_state)

# Policy Iteration
V_best_p, pi_best_p = policy_iteration(P, gamma=0.99)
print_state_value_function(V_best_p, P, prec=4)
print()
print('Optimal policy and state-value function (PI):')
print_policy(pi_best_p, P)
_report_performance(env, pi_best_p, goal_state)

# Slippery Walk Five
env = gym.make('SlipperyWalkFive-v0')
init_state = env.reset()
goal_state = 6
P = env.env.P


# Value Iteration
def value_iteration(P, gamma=1.0, theta=1e-10):
    """Compute a state-value function and greedy policy by repeated max-backups.

    Each sweep builds the action-value table ``Q`` from the current ``V``; the
    loop stops when ``max_a Q`` differs from ``V`` by less than ``theta`` in
    every state.

    Args:
        P: transition table; ``P[s][a]`` is a list of
            ``(prob, next_state, reward, done)`` tuples.
        gamma: discount factor.
        theta: convergence threshold.

    Returns:
        Tuple ``(V, pi)``: the state-value array held when the loop stopped and
        a callable mapping a state index to the greedy action of the last ``Q``.
    """
    V = np.zeros(len(P), dtype=np.float64)
    while True:
        Q = np.zeros((len(P), len(P[0])), dtype=np.float64)
        for s in range(len(P)):
            for a in range(len(P[s])):
                for prob, next_state, reward, done in P[s][a]:
                    Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))
        if np.max(np.abs(V - np.max(Q, axis=1))) < theta:
            break
        V = np.max(Q, axis=1)
    # Build the lookup once instead of on every policy call.
    greedy_actions = dict(enumerate(np.argmax(Q, axis=1)))
    pi = lambda s: greedy_actions[s]
    return V, pi


optimal_V, optimal_pi = value_iteration(P)
# Title fixed: these results come from value iteration, not policy iteration.
print('Optimal policy and state-value function (VI):')
print_policy(optimal_pi, P, action_symbols=('<', '>'), n_cols=7)
_report_performance(env, optimal_pi, goal_state)
print()
print_state_value_function(optimal_V, P, n_cols=7, prec=5)

# |           | 01  0.668 | 02  0.890 | 03  0.964 | 04  0.989 | 05  0.997 |           |

# Frozen Lake MDP
env = gym.make('FrozenLake-v0')
init_state = env.reset()
goal_state = 15
P = env.env.P

V_best_v, pi_best_v = value_iteration(P, gamma=0.99)
print('Optimal policy and state-value function (VI):')
print_policy(pi_best_v, P)
_report_performance(env, pi_best_v, goal_state)
print()
print_state_value_function(V_best_v, P, prec=4)

print('For comparison, optimal policy and state-value function (PI):')
print_policy(pi_best_p, P)
_report_performance(env, pi_best_p, goal_state)
print()
print_state_value_function(V_best_p, P)

# Changing the Frozen Lake environment MDP
env = gym.make('FrozenLake-v0')
P = env.env.P

# change reward function
reward_goal, reward_holes, reward_others = 1, -1, -0.01
goal, hole = 15, [5, 7, 11, 12]
for s in range(len(P)):
    for a in range(len(P[s])):
        for t in range(len(P[s][a])):
            values = list(P[s][a][t])
            if values[1] == goal:
                values[2] = reward_goal
                values[3] = False
            elif values[1] in hole:
                values[2] = reward_holes
                values[3] = False
            else:
                values[2] = reward_others
                values[3] = False
            # Transitions that start in the goal or in a hole carry no reward
            # and are the only ones flagged as done.
            if s in hole or s == goal:
                values[2] = 0
                values[3] = True
            P[s][a][t] = tuple(values)

# change transition function
prob_action, prob_drift_one, prob_drift_two = 0.8, 0.1, 0.1
for s in range(len(P)):
    for a in range(len(P[s])):
        for t in range(len(P[s][a])):
            # Entries with probability 1.0 are left untouched.
            if P[s][a][t][0] == 1.0:
                continue
            values = list(P[s][a][t])
            if t == 0:
                values[0] = prob_drift_one
            elif t == 1:
                values[0] = prob_action
            elif t == 2:
                values[0] = prob_drift_two
            P[s][a][t] = tuple(values)

env.env.P = P

V_best, pi_best = policy_iteration(env.env.P, gamma=0.99)
print('Optimal policy and state-value function (PI):')
print_policy(pi_best, P)
_report_performance(env, pi_best, goal_state)
print()
print_state_value_function(V_best, P)

V_best, pi_best = value_iteration(env.env.P, gamma=0.99)
# Title fixed: these results come from value iteration, not policy iteration.
print('Optimal policy and state-value function (VI):')
print_policy(pi_best, P)
_report_performance(env, pi_best, goal_state)
print()
print_state_value_function(V_best, P)

# Russell & Norvig's Gridworld
env = gym.make('RussellNorvigGridworld-v0')
init_state = env.reset()
goal_state = 3
P = env.env.P

V_best_p, pi_best = policy_iteration(P)
print('Optimal policy and state-value function (PI):')
print_policy(pi_best, P)
_report_performance(env, pi_best, goal_state)
print()
print_state_value_function(V_best_p, P)

V_best_v, pi_best = value_iteration(P)
# Title fixed: these results come from value iteration, not policy iteration.
print('Optimal policy and state-value function (VI):')
print_policy(pi_best, P)
_report_performance(env, pi_best, goal_state)
print()
print_state_value_function(V_best_v, P)

LEFT, DOWN, RIGHT, UP = range(4)
pi = lambda s: {
    0:RIGHT, 1:RIGHT, 2:RIGHT, 3:LEFT,
    4:UP, 5:LEFT, 6:UP, 7:LEFT,
    8:UP, 9:LEFT, 10:LEFT, 11:LEFT
}[s]
print('Re-construct optimal policy:')
print_policy(pi, P)
_report_performance(env, pi, goal_state)

V = policy_evaluation(pi, P)
print('Evaluate optimal policy:')
print_state_value_function(V, P)

pi = policy_improvement(V, P)
print('Improve optimal policy (nothing to improve -- it is the same policy, because it is optimal):')
print_policy(pi, P)

print('There are no differences, nothing to improve on the optimal policy and state-value function:')
print(np.abs(V_best_p - V))
print(np.abs(V_best_v - V))

# NOTE: the original paste ended with stray non-code text (a request to explain
# the code and answer in Chinese); it is not part of the program.
Podcast Editor
Podcast.json
Preview
Audio
