nes.py
import numpy as np

from rl import get_space_info, get_policy


class NESOptimizer(object):
    """ Natural Evolution Strategies (NES) optimizer for a Gym-style environment. """

    def __init__(self, env, alpha, sigma):
        self.alpha = alpha  # learning rate
        self.sigma = sigma  # standard deviation of the parameter noise
        # Space descriptors: dicts providing the flattened dimensionality 'n' and a 'discrete' flag
        self.obs_space = get_space_info(env.observation_space)
        self.action_space = get_space_info(env.action_space)
        # Policy callable mapping a weight matrix and an observation to an action
        self.policy = get_policy(self.obs_space, self.action_space)

    def optimize(self, env, n_batches, n_episodes_in_batch,
                 verbose=False, render=False):
        """ Runs NES optimization on the given environment and returns the final
        weight vector together with the per-batch mean reward history. """
if verbose:
self._print_start()
        # Initialize the flat weight vector to zeros
        # (a random init such as np.random.rand(self.obs_space['n'] * self.action_space['n']) also works)
        w = np.zeros(self.obs_space['n'] * self.action_space['n'])
        # Mean reward of each batch, stored to track overall performance
reward_history = []
rewards = np.zeros(n_episodes_in_batch)
for j in range(n_batches):
            # Sample Gaussian noise for every parameter of each candidate
            N = np.random.normal(scale=self.sigma, size=(n_episodes_in_batch - 1, w.shape[0]))
            # Append a zero-noise row so the current parameters themselves are
            # evaluated as the last candidate of the batch, guarding against
            # purely negative changes
            N = np.vstack((N, np.zeros(w.shape[0])))
            # Evaluate every candidate parameter set
for i in range(n_episodes_in_batch):
w_try = w + N[i]
reward = self._run_episode(env, w_try, render)
rewards[i] = reward
reward_history.append(np.mean(rewards))
w = self._update_w(rewards, N, w, n_episodes_in_batch)
if verbose:
print("Batch {}/{}, reward mean {}, reward standard deviation {}".format(j + 1, n_batches, np.mean(rewards), np.std(rewards)))
        return w, reward_history

    def _print_start(self):
        print("Started optimization for environment with {} {} observation dimensions "
              "and {} {} action dimensions"
              .format(self.obs_space['n'], "discrete" if self.obs_space['discrete'] else "continuous",
                      self.action_space['n'], "discrete" if self.action_space['discrete'] else "continuous"))

def _update_w(self, rewards, N, w, n_episodes_in_batch):
std = np.std(rewards)
m = np.mean(rewards)
        # Identical rewards give no gradient signal, so keep w unchanged
        if std == 0:
            return w
        # Standardize the rewards to zero mean and unit variance
        A = (rewards - m) / std
        # Move the weights along the reward-weighted sum of the noise vectors,
        # scaled by the NES step size alpha / (n_episodes_in_batch * sigma)
        w += self.alpha / (n_episodes_in_batch * self.sigma) * np.dot(N.T, A)
return w
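
    # For reference, the update in _update_w is the standard NES gradient estimate:
    #     grad ~ (1 / (n * sigma)) * sum_i A_i * N_i
    # where A_i are the standardized episode returns and N_i the sampled noise
    # vectors, giving the step w <- w + alpha * grad.
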
def _run_episode(self, env, w, render=False):
""" Evaluates single episode for given environment and given parameters w """
done = False
observation = env.reset()
ep_reward = 0
        # Reshape the flat parameter vector into an (obs_dim, action_dim) weight matrix
        w = w.reshape(self.obs_space['n'], self.action_space['n'])
while not done:
action = self.policy(w, observation)
if render:
env.render()
observation, reward, done, _ = env.step(action)
ep_reward += reward
return ep_reward
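

# A minimal usage sketch (an assumption, not part of the original module): it
# presumes a classic Gym environment such as CartPole-v1 and the old Gym API
# already used above, where env.reset() returns an observation and env.step()
# returns a 4-tuple. The hyperparameters alpha=0.03 and sigma=0.1 and the batch
# sizes are illustrative values, not tuned settings.
if __name__ == "__main__":
    import gym

    env = gym.make("CartPole-v1")
    optimizer = NESOptimizer(env, alpha=0.03, sigma=0.1)
    # Run 50 batches of 20 episodes each and print per-batch statistics
    weights, reward_history = optimizer.optimize(env, n_batches=50,
                                                 n_episodes_in_batch=20,
                                                 verbose=True)
    print("Mean reward of the final batch:", reward_history[-1])
    env.close()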