Skip to content

Commit bb0c3f4

Browse files
committed
add support Lease-based leader election (rather than ConfigMaps) kubernetes-client#1877
Signed-off-by: Lane Richard <[email protected]>
1 parent 98a5be8 commit bb0c3f4

File tree

4 files changed

+139
-7
lines changed

4 files changed

+139
-7
lines changed

kubernetes/base/leaderelection/example.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from kubernetes import client, config
1717
from kubernetes.leaderelection import leaderelection
1818
from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock
19+
from kubernetes.leaderelection.resourcelock.leaselock import LeaseLock
1920
from kubernetes.leaderelection import electionconfig
2021

2122

@@ -42,10 +43,13 @@ def example_func():
4243
# A user can choose not to provide any callbacks for what to do when a candidate fails to lead - onStoppedLeading()
4344
# In that case, a default callback function will be used
4445

46+
# Choose the type of lock mechanism to use
47+
lock_object = LeaseLock(lock_name, lock_namespace, candidate_id)
48+
#lock_object = ConfigMapLock(lock_name, lock_namespace, candidate_id)
49+
4550
# Create config
46-
config = electionconfig.Config(ConfigMapLock(lock_name, lock_namespace, candidate_id), lease_duration=17,
47-
renew_deadline=15, retry_period=5, onstarted_leading=example_func,
48-
onstopped_leading=None)
51+
config = electionconfig.Config(lock_object, lease_duration=17, renew_deadline=15, retry_period=5,
52+
onstarted_leading=example_func, onstopped_leading=None)
4953

5054
# Enter leader election
5155
leaderelection.LeaderElection(config).run()

kubernetes/base/leaderelection/leaderelectionrecord.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
class LeaderElectionRecord:
17-
# Annotation used in the lock object
17+
# Lease configuration from in the lock object.
1818
def __init__(self, holder_identity, lease_duration, acquire_time, renew_time):
1919
self.holder_identity = holder_identity
2020
self.lease_duration = lease_duration

kubernetes/base/leaderelection/resourcelock/configmaplock.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@
1313
# limitations under the License.
1414

1515
from kubernetes.client.rest import ApiException
16-
from kubernetes import client, config
17-
from kubernetes.client.api_client import ApiClient
16+
from kubernetes import client
1817
from ..leaderelectionrecord import LeaderElectionRecord
1918
import json
2019
import logging
@@ -126,4 +125,4 @@ def get_lock_dict(self, leader_election_record):
126125
self.lock_record['acquireTime'] = leader_election_record.acquire_time
127126
self.lock_record['renewTime'] = leader_election_record.renew_time
128127

129-
return self.lock_record
128+
return self.lock_record
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from kubernetes.client.rest import ApiException
16+
from kubernetes import client
17+
from ..leaderelectionrecord import LeaderElectionRecord
18+
from datetime import datetime
19+
import logging
20+
logging.basicConfig(level=logging.INFO)
21+
22+
23+
class LeaseLock:
24+
def __init__(self, name, namespace, identity):
25+
"""
26+
:param name: name of the lock
27+
:param namespace: namespace
28+
:param identity: A unique identifier that the candidate is using
29+
"""
30+
self.api_instance = client.CoordinationV1Api()
31+
32+
# lease resource identity and reference
33+
self.name = name
34+
self.namespace = namespace
35+
self.lease_reference = None
36+
37+
# identity of this candidate
38+
self.identity = str(identity)
39+
40+
# get returns the election record from a Lease Annotation
41+
def get(self, name, namespace):
42+
"""
43+
:param name: Name of the lease object information to get
44+
:param namespace: Namespace in which the lease object is to be searched
45+
:return: 'True, election record' if object found else 'False, exception response'
46+
"""
47+
try:
48+
lease = self.api_instance.read_namespaced_lease(name, namespace)
49+
50+
except ApiException as e:
51+
return False, e
52+
else:
53+
self.lease_reference = lease
54+
return True, self.election_record(lease)
55+
56+
def create(self, name, namespace, election_record):
57+
"""
58+
:param electionRecord: Annotation string
59+
:param name: Name of the lease object to be created
60+
:param namespace: Namespace in which the lease object is to be created
61+
:return: 'True' if object is created else 'False' if failed
62+
"""
63+
body = client.V1Lease(metadata={"name": name},
64+
spec=self.update_lease(election_record))
65+
66+
try:
67+
_ = self.api_instance.create_namespaced_lease(namespace, body, pretty=True)
68+
return True
69+
except ApiException as e:
70+
logging.info("Failed to create lock as {}".format(e))
71+
return False
72+
73+
def update(self, name, namespace, updated_record):
74+
"""
75+
:param name: name of the lock to be updated
76+
:param namespace: namespace the lock is in
77+
:param updated_record: the updated election record
78+
:return: True if update is successful False if it fails
79+
"""
80+
try:
81+
# update the Lease from the updated record
82+
self.lease_reference.spec = self.update_lease(updated_record,
83+
self.lease_reference.spec)
84+
85+
_ = self.api_instance.replace_namespaced_lease(name=name, namespace=namespace,
86+
body=self.lease_reference)
87+
return True
88+
except ApiException as e:
89+
logging.info("Failed to update lock as {}".format(e))
90+
return False
91+
92+
def update_lease(self, leader_election_record, current_spec=None):
93+
# existing or new lease?
94+
spec = current_spec if current_spec else client.V1LeaseSpec()
95+
96+
# lease configuration
97+
spec.holder_identity = leader_election_record.holder_identity
98+
spec.lease_duration_seconds = int(leader_election_record.lease_duration)
99+
spec.acquire_time = self.time_str_to_iso(leader_election_record.acquire_time)
100+
spec.renew_time = self.time_str_to_iso(leader_election_record.renew_time)
101+
102+
return spec
103+
104+
def election_record(self, lease):
105+
"""
106+
Get leader election record from Lease spec.
107+
"""
108+
leader_election_record = LeaderElectionRecord(None, None, None, None)
109+
110+
if lease.spec and lease.spec.holder_identity:
111+
leader_election_record.holder_identity = lease.spec.holder_identity
112+
if lease.spec and lease.spec.lease_duration_seconds:
113+
leader_election_record.lease_duration = str(lease.spec.lease_duration_seconds)
114+
if lease.spec and lease.spec.acquire_time:
115+
leader_election_record.acquire_time = str(datetime.replace(lease.spec.acquire_time, tzinfo=None))
116+
if lease.spec and lease.spec.renew_time:
117+
leader_election_record.renew_time = str(datetime.replace(lease.spec.renew_time, tzinfo=None))
118+
119+
return leader_election_record
120+
121+
# conversion between kubernetes ISO formatted time and elector record time
122+
def time_str_to_iso(self, str_time):
123+
formats = ["%Y-%m-%d %H:%M:%S.%f%z", "%Y-%m-%d %H:%M:%S.%f"]
124+
for fmt in formats:
125+
try:
126+
return datetime.strptime(str_time, fmt).isoformat()+'Z'
127+
except ValueError:
128+
pass
129+
logging.error("Failed to parse time string: {}".format(str_time))

0 commit comments

Comments
 (0)