Skip to content

Commit cce9f8b

Browse files
committed
Add leaderelection module
Add leaderelection module, based off of the leaderelection module in kubernetes-client/python. The module has been altered slightly to support asyncio. Fixes #297
1 parent 3ab6408 commit cce9f8b

File tree

8 files changed

+925
-0
lines changed

8 files changed

+925
-0
lines changed

examples/leaderelection.py

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import os
17+
import uuid
18+
19+
from kubernetes_asyncio import config
20+
from kubernetes_asyncio.client import api_client
21+
from kubernetes_asyncio.leaderelection import electionconfig, leaderelection
22+
from kubernetes_asyncio.leaderelection.resourcelock.configmaplock import (
23+
ConfigMapLock,
24+
)
25+
26+
27+
async def main():
    """Run a single round of leader election using a ConfigMap-backed lock."""
    # Authenticate with the kubeconfig file named by $KUBECONFIG
    kubeconfig_path = os.environ.get("KUBECONFIG", "")
    await config.load_kube_config(config_file=kubeconfig_path)

    # --- Values a user is expected to supply ---
    # Unique identifier for this candidate, name of the lock object to be
    # created, and the Kubernetes namespace that holds it.
    candidate_id = uuid.uuid4()
    lock_name, lock_namespace = "examplepython", "default"

    # Coroutine run once this candidate has been elected leader. It can be
    # cancelled when a held leader lock is lost.
    async def announce_leadership():
        try:
            print("I am leader")
        except asyncio.CancelledError:
            print(
                "Start function cancelled - lost leader election after becoming leader"
            )

    # Coroutine run once this candidate stops leading. Supplying it is
    # optional: when no onstopped_leading callback is given, the election
    # config falls back to its own default callback.
    async def announce_step_down():
        print("I am no longer leader")

    async with api_client.ApiClient() as client:
        # Assemble the lock and the election settings
        resource_lock = ConfigMapLock(lock_name, lock_namespace, candidate_id, client)
        settings = electionconfig.Config(
            resource_lock,
            lease_duration=17,
            renew_deadline=15,
            retry_period=5,
            onstarted_leading=announce_leadership,
            onstopped_leading=announce_step_down,
        )

        # Take part in the election; run() returns after leadership is lost
        election = leaderelection.LeaderElection(settings)
        await election.run()

        # At this point the user may start another round or simply exit
        print("Exited leader election")


if __name__ == "__main__":
    asyncio.run(main())

kubernetes_asyncio/leaderelection/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
17+
# Configure logging at INFO level as a side effect of importing this module.
# NOTE(review): library modules usually leave logging configuration to the
# application - confirm this import-time call is intended.
logging.basicConfig(level=logging.INFO)
18+
19+
20+
class Config:
    """Validated settings for one leader-election candidate.

    Attributes set on the instance:
        jitter_factor: multiplier applied to retry_period when validating
            renew_deadline (fixed at 1.2).
        lock: the resource lock object used by the election.
        lease_duration, renew_deadline, retry_period: timing values, stored
            exactly as passed in.
        onstarted_leading: callback supplied by the caller.
        onstopped_leading: callback supplied by the caller, or the default
            ``on_stoppedleading_callback`` when none is given.
    """

    def __init__(
        self,
        lock,
        lease_duration,
        renew_deadline,
        retry_period,
        onstarted_leading,
        onstopped_leading=None,
    ):
        """Validate the arguments and store them.

        Args:
            lock: resource lock object; must not be None.
            lease_duration: must be at least 1 and greater than
                renew_deadline.
            renew_deadline: must be at least 1 and greater than
                retry_period * jitter_factor.
            retry_period: must be at least 1.
            onstarted_leading: callback to run when leadership is acquired;
                must not be None.
            onstopped_leading: optional callback to run when leadership is
                lost. Defaults to ``on_stoppedleading_callback``.

        Raises:
            ValueError: if any of the constraints above is violated.
        """
        self.jitter_factor = 1.2

        if lock is None:
            raise ValueError("lock cannot be None")
        self.lock = lock

        if lease_duration <= renew_deadline:
            raise ValueError("lease_duration must be greater than renew_deadline")

        if renew_deadline <= self.jitter_factor * retry_period:
            raise ValueError(
                "renew_deadline must be greater than retry_period*jitter_factor"
            )

        # The checks below reject values smaller than 1, so a value of exactly
        # 1 is acceptable; the messages say "at least one" to match.
        if lease_duration < 1:
            raise ValueError("lease_duration must be at least one")

        if renew_deadline < 1:
            raise ValueError("renew_deadline must be at least one")

        if retry_period < 1:
            raise ValueError("retry_period must be at least one")

        self.lease_duration = lease_duration
        self.renew_deadline = renew_deadline
        self.retry_period = retry_period

        if onstarted_leading is None:
            raise ValueError("callback onstarted_leading cannot be None")
        self.onstarted_leading = onstarted_leading

        if onstopped_leading is None:
            self.onstopped_leading = self.on_stoppedleading_callback
        else:
            self.onstopped_leading = onstopped_leading

    async def on_stoppedleading_callback(self):
        """Default callback for when the current candidate stops leading.

        Defined as a coroutine so that its result can be awaited in the same
        way as a user-supplied ``async`` onstopped_leading callback.
        """
        logging.info("%s stopped leading", self.lock.identity)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import datetime
17+
import json
18+
import logging
19+
import sys
20+
import time
21+
from http import HTTPStatus
22+
23+
from .leaderelectionrecord import LeaderElectionRecord
24+
25+
# Configure logging at INFO level as a side effect of importing this module.
# NOTE(review): library modules usually leave logging configuration to the
# application - confirm this import-time call is intended.
logging.basicConfig(level=logging.INFO)
26+
27+
"""
28+
This package implements leader election using an annotation in a Kubernetes
29+
object. The onstarted_leading coroutine is run as a task, which is cancelled if
30+
the leader lock is obtained and then lost.
31+
32+
At first all candidates are considered followers. The one to create a lock or
33+
update an existing lock first becomes the leader and remains so as long as it keeps
34+
renewing its lease.
35+
"""
36+
37+
38+
class LeaderElection:
    """Drive leader election for one candidate against a resource lock.

    The candidate repeatedly tries to create or update the lock described by
    ``election_config.lock``. Once it succeeds, ``onstarted_leading`` is run
    as a task and the lease is renewed in a loop; when renewal fails the task
    is cancelled and ``onstopped_leading`` is invoked.

    Attributes:
        observed_record: latest election record observed in (or written to)
            the lock object, or None if nothing has been observed yet.
        election_config: the configuration passed to the constructor.
        observed_time_milliseconds: ``time.time()`` in milliseconds at the
            moment ``observed_record`` was last changed.
    """

    def __init__(self, election_config):
        """Store the configuration and reset the observed state.

        Args:
            election_config: configuration object providing ``lock``,
                ``lease_duration``, ``renew_deadline``, ``retry_period``,
                ``onstarted_leading`` and ``onstopped_leading``.

        Raises:
            SystemExit: if ``election_config`` is None.
        """
        if election_config is None:
            # NOTE(review): exiting the interpreter from a constructor is
            # harsh for library code; kept unchanged so callers see the same
            # behavior as before.
            sys.exit("argument config not passed")

        # Latest record observed in the created lock object
        self.observed_record = None

        # The configuration set for this candidate
        self.election_config = election_config

        # Latest update time of the lock
        self.observed_time_milliseconds = 0

    async def run(self):
        """Point of entry to leader election.

        Waits until the lock is acquired, starts ``onstarted_leading`` as a
        task, renews the lease until renewal fails, then cancels the task,
        waits for it to finish and finally calls ``onstopped_leading``.

        ``onstopped_leading`` may be a coroutine function or a plain
        function; its result is awaited only when it is awaitable.

        Note that an ``onstarted_leading`` coroutine that swallows
        cancellation and keeps running will keep this method waiting.
        """
        # Local import: keeps this class usable without touching the
        # module-level import block.
        import inspect

        # Try to create/acquire a lock
        if not await self.acquire():
            return

        logging.info(
            "%s successfully acquired lease", self.election_config.lock.identity
        )

        task = asyncio.create_task(self.election_config.onstarted_leading())
        try:
            await self.renew_loop()
        finally:
            # Leader lock lost (or renew_loop failed) - cancel the
            # onstarted_leading coroutine if it's still running. This permits
            # onstarted_leading to clean up state that might not be
            # accessible to onstopped_leading.
            await self._cancel_leading_task(task)

        # Failed to update lease, run onstopped_leading callback. This is
        # preserved in order to continue to provide an interface similar to
        # the one provided by `kubernetes-client/python`.
        stopped = self.election_config.onstopped_leading()
        if inspect.isawaitable(stopped):
            await stopped

    async def _cancel_leading_task(self, task):
        """Cancel the onstarted_leading task and wait until it has finished."""
        task.cancel()
        # return_exceptions=True hands back the task's outcome instead of
        # raising it here, so a failing callback cannot mask the step-down.
        (outcome,) = await asyncio.gather(task, return_exceptions=True)
        if isinstance(outcome, Exception) and not isinstance(
            outcome, asyncio.CancelledError
        ):
            logging.error("onstarted_leading raised %r", outcome)

    async def acquire(self):
        """Retry ``try_acquire_or_renew`` until it succeeds.

        Sleeps ``retry_period`` seconds between attempts.

        Returns:
            True once the lock has been acquired.
        """
        # Follower
        logging.info("%s is a follower", self.election_config.lock.identity)
        retry_period = self.election_config.retry_period

        while not await self.try_acquire_or_renew():
            await asyncio.sleep(retry_period)

        return True

    async def renew_loop(self):
        """Keep renewing the lease; return when a renewal round fails.

        Each round retries ``try_acquire_or_renew`` every ``retry_period``
        seconds until it succeeds or ``renew_deadline`` seconds have elapsed.
        A successful round is followed by a ``retry_period`` sleep and a new
        round; an unsuccessful round ends the loop.
        """
        # Leader
        logging.info(
            "Leader has entered renew loop and will try to update lease continuously"
        )

        retry_period = self.election_config.retry_period
        renew_deadline = self.election_config.renew_deadline

        while True:
            # A monotonic clock is used for the deadline so that wall-clock
            # adjustments can neither shorten nor extend a renewal round.
            deadline = time.monotonic() + renew_deadline
            succeeded = False

            while time.monotonic() < deadline:
                succeeded = await self.try_acquire_or_renew()
                if succeeded:
                    break
                await asyncio.sleep(retry_period)

            if not succeeded:
                # failed to renew, return
                return

            await asyncio.sleep(retry_period)

    @staticmethod
    def _is_not_found(error):
        """Return True if ``error.body`` is JSON whose "code" is 404.

        A missing, empty or malformed body counts as "not a 404" instead of
        raising, so one bad response does not abort the election loop.
        """
        try:
            return json.loads(error.body)["code"] == HTTPStatus.NOT_FOUND
        except (AttributeError, TypeError, ValueError, KeyError):
            return False

    async def _create_lock(self, leader_election_record):
        """Try to create the lock object holding ``leader_election_record``.

        Returns:
            True if the lock was created (the record then becomes the
            observed record), False otherwise.
        """
        lock = self.election_config.lock

        logging.info(
            "%s is trying to create a lock", leader_election_record.holder_identity
        )
        create_status = await lock.create(
            name=lock.name,
            namespace=lock.namespace,
            election_record=leader_election_record,
        )

        if not create_status:
            logging.info(
                "%s Failed to create lock", leader_election_record.holder_identity
            )
            return False

        self.observed_record = leader_election_record
        self.observed_time_milliseconds = int(time.time() * 1000)
        return True

    async def try_acquire_or_renew(self):
        """Make one attempt to create, take over or renew the lock.

        Returns:
            True if this candidate created or updated the lock, False if the
            lock could not be read, created or updated, or if it is held by
            another candidate whose lease has not yet expired.
        """
        lock = self.election_config.lock
        now_timestamp = time.time()
        now = datetime.datetime.fromtimestamp(now_timestamp)

        # Check if lock is created
        lock_status, old_election_record = await lock.get(lock.name, lock.namespace)

        # Any failure other than "not found" is a retrieval error
        if not lock_status and not self._is_not_found(old_election_record):
            logging.info(
                "Error retrieving resource lock %s as %s",
                lock.name,
                getattr(old_election_record, "reason", None),
            )
            return False

        # create a default Election record for this candidate
        leader_election_record = LeaderElectionRecord(
            lock.identity,
            str(self.election_config.lease_duration),
            str(now),
            str(now),
        )

        # A lock is not created with that name, try to create one
        if not lock_status:
            return await self._create_lock(leader_election_record)

        # A lock exists with that name
        # Validate old_election_record
        if old_election_record is None or (
            old_election_record.holder_identity is None
            or old_election_record.lease_duration is None
            or old_election_record.acquire_time is None
            or old_election_record.renew_time is None
        ):
            # try to update lock with proper annotation and election record
            return await self.update_lock(leader_election_record)

        # Report transitions
        if (
            self.observed_record
            and self.observed_record.holder_identity
            != old_election_record.holder_identity
        ):
            logging.info(
                "Leader has switched to %s", old_election_record.holder_identity
            )

        # Remember the record, and when it was seen, whenever it changes
        if (
            self.observed_record is None
            or old_election_record.__dict__ != self.observed_record.__dict__
        ):
            self.observed_record = old_election_record
            self.observed_time_milliseconds = int(time.time() * 1000)

        is_leader = lock.identity == self.observed_record.holder_identity
        lease_expiry_milliseconds = (
            self.observed_time_milliseconds
            + self.election_config.lease_duration * 1000
        )

        # If this candidate is not the leader and lease duration is yet to finish
        if not is_leader and lease_expiry_milliseconds > int(now_timestamp * 1000):
            logging.info(
                "yet to finish lease_duration, lease held by %s and has not expired",
                old_election_record.holder_identity,
            )
            return False

        # If this candidate is the Leader
        if is_leader:
            # Leader updates renewTime, but keeps acquire_time unchanged
            leader_election_record.acquire_time = self.observed_record.acquire_time

        return await self.update_lock(leader_election_record)

    async def update_lock(self, leader_election_record):
        """Write ``leader_election_record`` to the existing lock object.

        Returns:
            True if the update succeeded (the record then becomes the
            observed record), False otherwise.
        """
        lock = self.election_config.lock

        # Update object with latest election record
        update_status = await lock.update(
            lock.name,
            lock.namespace,
            leader_election_record,
        )

        if not update_status:
            logging.info(
                "%s failed to acquire lease", leader_election_record.holder_identity
            )
            return False

        self.observed_record = leader_election_record
        self.observed_time_milliseconds = int(time.time() * 1000)
        logging.info(
            "leader %s has successfully acquired lease",
            leader_election_record.holder_identity,
        )
        return True

0 commit comments

Comments
 (0)