14
14
__author__ = 'dougalb'
15
15
16
16
from datetime import datetime
17
- import boto .ec2
18
- import dateutil .parser
19
17
import urllib2
20
- import ConfigParser
21
- import boto .ec2 .autoscale
22
18
import os
23
19
import time
24
20
import sys
25
21
import tempfile
26
22
import logging
23
+ import boto3
24
+ import ConfigParser
25
+ from botocore .config import Config
27
26
28
27
log = logging .getLogger (__name__ )
29
28
@@ -37,12 +36,20 @@ def getConfig(instance_id):
37
36
logging .getLogger ().setLevel (lvl )
38
37
_region = config .get ('nodewatcher' , 'region' )
39
38
_scheduler = config .get ('nodewatcher' , 'scheduler' )
39
+ _proxy = config .get ('nodewatcher' , 'proxy' )
40
+ proxy_config = Config ()
41
+
42
+ if not _proxy == "NONE" :
43
+ proxy_config = Config (proxies = {'https' : _proxy })
44
+
40
45
try :
41
46
_asg = config .get ('nodewatcher' , 'asg' )
42
47
except ConfigParser .NoOptionError :
43
- conn = boto .ec2 .connect_to_region (_region ,proxy = boto .config .get ('Boto' , 'proxy' ),
44
- proxy_port = boto .config .get ('Boto' , 'proxy_port' ))
45
- _asg = conn .get_all_instances (instance_ids = instance_id )[0 ].instances [0 ].tags ['aws:autoscaling:groupName' ]
48
+ ec2 = boto3 .resource ('ec2' , region_name = _region , config = proxy_config )
49
+
50
+ instances = ec2 .instances .filter (InstanceIds = [instance_id ])
51
+ instance = next (iter (instances or []), None )
52
+ _asg = filter (lambda tag : tag .get ('Key' ) == 'aws:autoscaling:groupName' , instance .tags )[0 ].get ('Value' )
46
53
log .debug ("discovered asg: %s" % _asg )
47
54
config .set ('nodewatcher' , 'asg' , _asg )
48
55
@@ -53,13 +60,13 @@ def getConfig(instance_id):
53
60
54
61
os .rename (tup [1 ], 'nodewatcher.cfg' )
55
62
56
- log .debug ("region=%s asg=%s scheduler=%s" % (_region , _asg , _scheduler ))
57
- return _region , _asg , _scheduler
63
+ log .debug ("region=%s asg=%s scheduler=%s prox_config=%s " % (_region , _asg , _scheduler , proxy_config ))
64
+ return _region , _asg , _scheduler , proxy_config
58
65
59
- def getHourPercentile (instance_id , conn ):
60
- _reservations = conn . get_all_instances ( instance_ids = [instance_id ])
61
- _instance = _reservations [ 0 ]. instances [ 0 ]
62
- _launch_time = dateutil . parser . parse ( _instance . launch_time ) .replace (tzinfo = None )
66
+ def getHourPercentile (instance_id , ec2 ):
67
+ instances = ec2 . instances . filter ( InstanceIds = [instance_id ])
68
+ instance = next ( iter ( instances or []), None )
69
+ _launch_time = instance . launch_time .replace (tzinfo = None )
63
70
_current_time = datetime .utcnow ()
64
71
_delta = _current_time - _launch_time
65
72
_delta_in_hours = _delta .seconds / 3600.0
@@ -121,20 +128,17 @@ def lockHost(s,hostname,unlock=False):
121
128
122
129
return _r
123
130
124
- def selfTerminate (region , asg , instance_id ):
125
- _as_conn = boto .ec2 .autoscale .connect_to_region (region ,proxy = boto .config .get ('Boto' , 'proxy' ),
126
- proxy_port = boto .config .get ('Boto' , 'proxy_port' ))
127
- if not maintainSize (region , asg ):
131
+ def selfTerminate (asg_name , asg_conn , instance_id ):
132
+ if not maintainSize (asg_name , asg_conn ):
128
133
log .info ("terminating %s" % instance_id )
129
- _as_conn .terminate_instance (instance_id , decrement_capacity = True )
130
-
131
- def maintainSize (region , asg ):
132
- _as_conn = boto .ec2 .autoscale .connect_to_region (region ,proxy = boto .config .get ('Boto' , 'proxy' ),
133
- proxy_port = boto .config .get ('Boto' , 'proxy_port' ))
134
- _asg = _as_conn .get_all_groups (names = [asg ])[0 ]
135
- _capacity = _asg .desired_capacity
136
- _min_size = _asg .min_size
137
- log .debug ("capacity=%d min_size=%d" % (_capacity , _min_size ))
134
+ asg_conn .terminate_instance_in_auto_scaling_group (InstanceId = instance_id , ShouldDecrementDesiredCapacity = True )
135
+
136
+ def maintainSize (asg_name , asg_conn ):
137
+ asg = asg_conn .describe_auto_scaling_groups (AutoScalingGroupNames = [asg_name ]) \
138
+ .get ('AutoScalingGroups' )[0 ]
139
+ _capacity = asg .get ('DesiredCapacity' )
140
+ _min_size = asg .get ('MinSize' )
141
+ log .info ("capacity=%d min_size=%d" % (_capacity , _min_size ))
138
142
if _capacity > _min_size :
139
143
log .debug ('capacity greater then min size.' )
140
144
return False
@@ -150,24 +154,25 @@ def main():
150
154
log .info ("nodewatcher startup" )
151
155
instance_id = getInstanceId ()
152
156
hostname = getHostname ()
153
- region , asg , scheduler = getConfig (instance_id )
157
+ region , asg_name , scheduler , proxy_config = getConfig (instance_id )
154
158
155
159
s = loadSchedulerModule (scheduler )
156
160
157
161
while True :
158
162
time .sleep (60 )
159
- conn = boto .ec2 .connect_to_region (region )
160
- hour_percentile = getHourPercentile (instance_id ,conn )
163
+ ec2_conn = boto3 .resource ('ec2' , region_name = region , config = proxy_config )
164
+ asg_conn = boto3 .client ('autoscaling' , region_name = region , config = proxy_config )
165
+ hour_percentile = getHourPercentile (instance_id , ec2_conn )
161
166
log .info ('Percent of hour used: %d' % hour_percentile )
162
167
163
168
if hour_percentile < 95 :
164
169
continue
165
-
170
+
166
171
jobs = getJobs (s , hostname )
167
172
if jobs == True :
168
173
log .info ('Instance has active jobs.' )
169
174
else :
170
- if maintainSize (region , asg ):
175
+ if maintainSize (asg_name , asg_conn ):
171
176
continue
172
177
# avoid race condition by locking and verifying
173
178
lockHost (s , hostname )
@@ -177,7 +182,7 @@ def main():
177
182
lockHost (s , hostname , unlock = True )
178
183
continue
179
184
else :
180
- selfTerminate (region , asg , instance_id )
185
+ selfTerminate (asg_name , asg_conn , instance_id )
181
186
182
187
if __name__ == "__main__" :
183
188
main ()
0 commit comments