5
5
"""
6
6
7
7
from abc import ABC , abstractmethod
8
+ from simmod .utils .typings_ import *
8
9
9
10
import numpy as np
10
11
11
12
12
- class AdaptiveParamNoiseSpec ( object ):
13
- """Adaptive parameter noise """
13
+ class Distribution ( ABC ):
14
+ """Abstract base class for distributions. """
14
15
15
- def __init__ (self , initial_stddev = 0.1 , desired_action_stddev = 0.1 , adoption_coefficient = 1.01 ):
16
+ def __init__ (self ):
17
+ super (Distribution , self ).__init__ ()
18
+
19
+ @abstractmethod
20
+ def proba_distribution_net (self , * args , ** kwargs ) -> Union [NDarray , Tuple [NDarray , NDarray ]]:
21
+ """Create the layers and parameters that represent the distribution.
22
+
23
+ Subclasses must define this, but the arguments and return type vary between concrete classes.
16
24
"""
17
25
26
+ @abstractmethod
27
+ def proba_distribution (self , * args , ** kwargs ) -> "Distribution" :
28
+ """Set parameters of the distribution.
29
+
18
30
Args:
19
- initial_stddev: (float) the initial value for the standard deviation of the noise
20
- desired_action_stddev: (float) the desired value for the standard deviation of the noise
21
- adoption_coefficient: (float) the update coefficient for the standard deviation of the noise
31
+ *args:
32
+ **kwargs:
33
+
34
+ Returns:
35
+ self
22
36
"""
23
- self .initial_stddev = initial_stddev
24
- self .desired_action_stddev = desired_action_stddev
25
- self .adoption_coefficient = adoption_coefficient
26
37
27
- self .current_stddev = initial_stddev
38
+ @abstractmethod
39
+ def log_prob (self , x : NDarray ) -> NDarray :
40
+ """Returns the log likelihood
28
41
29
- def adapt (self , distance ):
30
- """Update the standard deviation for the parameter noise
42
+ Args:
43
+ x: The taken action
44
+
45
+ Returns:
46
+ The log likelihood of the distribution
47
+ """
48
+
49
+ @abstractmethod
50
+ def entropy (self ) -> Optional [NDarray ]:
51
+ """Shannon's entropy of the probability distribution
52
+
53
+ Returns:
54
+ the entropy, or None if no analytical form is known
55
+ """
56
+
57
+ @abstractmethod
58
+ def sample (self ) -> NDarray :
59
+ """Returns a sample from the probability distribution
60
+
61
+ Returns:
62
+ the stochastic action
63
+ """
64
+
65
+ @abstractmethod
66
+ def mode (self ) -> NDarray :
67
+ """Returns the most likely action (deterministic output) from the probability distribution
68
+
69
+ Returns:
70
+ the deterministic (most likely) action
71
+ """
72
+
73
+ def get_actions (self , deterministic : bool = False ) -> NDarray :
74
+ """Return actions according to the probability distribution.
31
75
32
76
Args:
33
- distance: (float) the noise distance applied to the parameters
77
+ deterministic: if True, return the mode of the distribution instead of a sample
78
+
79
+ Returns:
80
+ the mode if deterministic is True, otherwise a sample
34
81
"""
35
- if distance > self .desired_action_stddev :
36
- # Decrease stddev.
37
- self .current_stddev /= self .adoption_coefficient
38
- else :
39
- # Increase stddev.
40
- self .current_stddev *= self .adoption_coefficient
82
+ if deterministic :
83
+ return self .mode ()
84
+ return self .sample ()
41
85
42
- def get_stats (self ):
43
- """Return the standard deviation for the parameter noise
86
+ @abstractmethod
87
+ def actions_from_params (self , * args , ** kwargs ) -> NDarray :
88
+ """Returns samples from the probability distribution given its parameters.
89
+
90
+ Args:
91
+ *args:
92
+ **kwargs:
44
93
45
94
Returns:
46
- (dict) the stats of the noise
95
+
47
96
"""
48
- return {'param_noise_stddev' : self .current_stddev }
49
97
50
- def __repr__ (self ):
51
- fmt = 'AdaptiveParamNoiseSpec(initial_stddev={}, desired_action_stddev={}, adoption_coefficient={})'
52
- return fmt .format (self .initial_stddev , self .desired_action_stddev , self .adoption_coefficient )
98
+ @abstractmethod
99
+ def log_prob_from_params (self , * args , ** kwargs ) -> Tuple [NDarray , NDarray ]:
100
+ """Returns samples and the associated log probabilities from the probability distribution given its parameters.
101
+
102
+ Args:
103
+ *args:
104
+ **kwargs:
105
+
106
+ Returns:
107
+ actions and log prob
108
+ """
109
+
110
+
111
+ class UniformDistribution (Distribution ):
112
+ pass
113
+
114
+
115
+
116
+ class NormalDistribution (Distribution ):
117
+ pass
118
+
119
+
120
+ class BernoulliDistribution (Distribution ):
121
+ pass
122
+
123
+
53
124
54
125
55
126
class Noise (ABC ):
@@ -91,12 +162,13 @@ def __repr__(self) -> str:
91
162
92
163
93
164
class OrnsteinUhlenbeckNoise (Noise ):
94
- """A Ornstein Uhlenbeck action noise, this is designed to approximate brownian motion with friction.
165
+ """Ornstein-Uhlenbeck noise. Designed to approximate Brownian motion with friction.
95
166
96
167
Based on http://math.stackexchange.com/questions/1287634/implementing-ornstein-uhlenbeck-in-matlab
97
168
"""
98
169
99
- def __init__ (self , mean , sigma , theta = .15 , dt = 1e-2 , initial_noise = None ):
170
+ def __init__ (self , mean : float , sigma : float , theta : float = .15 , dt : float = 1e-2 ,
171
+ initial_noise : Optional [float ] = None ):
100
172
"""
101
173
102
174
Args:
@@ -126,4 +198,48 @@ def reset(self) -> None:
126
198
self .noise_prev = self .initial_noise if self .initial_noise is not None else np .zeros_like (self ._mu )
127
199
128
200
def __repr__ (self ) -> str :
129
- return 'OrnsteinUhlenbeckActionNoise(mu={}, sigma={})' .format (self ._mu , self ._sigma )
201
+ return 'OrnsteinUhlenbeckActionNoise(mu={}, sigma={})' .format (self ._mu , self ._sigma )
202
+
203
+
204
+ class AdaptiveNoise (Noise ):
205
+ """Adaptive parameter noise"""
206
+
207
+ def __init__ (self , initial_stddev = 0.1 , desired_action_stddev = 0.1 , adoption_coefficient = 1.01 ):
208
+ """
209
+
210
+ Args:
211
+ initial_stddev: (float) the initial value for the standard deviation of the noise
212
+ desired_action_stddev: (float) the desired value for the standard deviation of the noise
213
+ adoption_coefficient: (float) the update coefficient for the standard deviation of the noise
214
+ """
215
+ super ().__init__ ()
216
+ self .initial_stddev = initial_stddev
217
+ self .desired_action_stddev = desired_action_stddev
218
+ self .adoption_coefficient = adoption_coefficient
219
+
220
+ self .current_stddev = initial_stddev
221
+
222
+ def adapt (self , distance ):
223
+ """Update the standard deviation for the parameter noise
224
+
225
+ Args:
226
+ distance: (float) the noise distance applied to the parameters
227
+ """
228
+ if distance > self .desired_action_stddev :
229
+ # Decrease stddev.
230
+ self .current_stddev /= self .adoption_coefficient
231
+ else :
232
+ # Increase stddev.
233
+ self .current_stddev *= self .adoption_coefficient
234
+
235
+ def get_stats (self ):
236
+ """Return the standard deviation for the parameter noise
237
+
238
+ Returns:
239
+ (dict) the stats of the noise
240
+ """
241
+ return {'param_noise_stddev' : self .current_stddev }
242
+
243
+ def __repr__ (self ):
244
+ fmt = 'AdaptiveParamNoiseSpec(initial_stddev={}, desired_action_stddev={}, adoption_coefficient={})'
245
+ return fmt .format (self .initial_stddev , self .desired_action_stddev , self .adoption_coefficient )
0 commit comments