Skip to content
This repository was archived by the owner on May 30, 2022. It is now read-only.

Commit 4ad9d11

Browse files
committedMay 26, 2021
Format and check codebase by black
Signed-off-by: Chenxiong Qi <[email protected]>
1 parent 518310c commit 4ad9d11

File tree

3 files changed

+431
-311
lines changed

3 files changed

+431
-311
lines changed
 

‎krbcontext/context.py

+79-56
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@
2727

2828
from threading import Lock
2929

30-
__all__ = ('krbContext',)
30+
__all__ = ("krbContext",)
3131

3232

33-
DEFAULT_CCACHE = 'DEFAULT_CCACHE'
34-
DEFAULT_KEYTAB = 'DEFAULT_KEYTAB'
35-
ENV_KRB5CCNAME = 'KRB5CCNAME'
33+
DEFAULT_CCACHE = "DEFAULT_CCACHE"
34+
DEFAULT_KEYTAB = "DEFAULT_KEYTAB"
35+
ENV_KRB5CCNAME = "KRB5CCNAME"
3636

3737

3838
def get_login():
@@ -60,8 +60,14 @@ class krbContext(object):
6060
krbContext can work with ``with`` statement to simplify your code.
6161
"""
6262

63-
def __init__(self, using_keytab=False, principal=None, keytab_file=None,
64-
ccache_file=None, password=None):
63+
def __init__(
64+
self,
65+
using_keytab=False,
66+
principal=None,
67+
keytab_file=None,
68+
ccache_file=None,
69+
password=None,
70+
):
6571
"""Initialize context
6672
6773
:param bool using_keytab: indicate whether to initialize credential
@@ -82,20 +88,26 @@ def __init__(self, using_keytab=False, principal=None, keytab_file=None,
8288
omitted, program will be blocked and prompts to enter a password
8389
from command line, which requires program runs in a terminal.
8490
"""
85-
self._cleaned_options = self.clean_options(using_keytab=using_keytab,
86-
principal=principal,
87-
keytab_file=keytab_file,
88-
ccache_file=ccache_file,
89-
password=password)
91+
self._cleaned_options = self.clean_options(
92+
using_keytab=using_keytab,
93+
principal=principal,
94+
keytab_file=keytab_file,
95+
ccache_file=ccache_file,
96+
password=password,
97+
)
9098
self._original_krb5ccname = None
9199
self._inited = False
92100

93101
self._init_lock = Lock()
94102

95-
def clean_options(self,
96-
using_keytab=False, principal=None,
97-
keytab_file=None, ccache_file=None,
98-
password=None):
103+
def clean_options(
104+
self,
105+
using_keytab=False,
106+
principal=None,
107+
keytab_file=None,
108+
ccache_file=None,
109+
password=None,
110+
):
99111
"""Clean argument to related object
100112
101113
:param bool using_keytab: refer to ``krbContext.__init__``.
@@ -114,63 +126,68 @@ def clean_options(self,
114126

115127
if using_keytab:
116128
if principal is None:
117-
raise ValueError('Principal is required when using key table.')
129+
raise ValueError("Principal is required when using key table.")
118130
princ_name = gssapi.Name(
119-
principal, gssapi.NameType.kerberos_principal)
131+
principal, gssapi.NameType.kerberos_principal
132+
)
120133

121134
if keytab_file is None:
122-
cleaned['keytab'] = DEFAULT_KEYTAB
135+
cleaned["keytab"] = DEFAULT_KEYTAB
123136
elif not os.path.exists(keytab_file):
124-
raise ValueError(f'Keytab file {keytab_file} does not exist.')
137+
raise ValueError(f"Keytab file {keytab_file} does not exist.")
125138
else:
126-
cleaned['keytab'] = keytab_file
139+
cleaned["keytab"] = keytab_file
127140
else:
128141
if principal is None:
129142
principal = get_login()
130143
princ_name = gssapi.Name(principal, gssapi.NameType.user)
131144

132-
cleaned['using_keytab'] = using_keytab
133-
cleaned['principal'] = princ_name
134-
cleaned['ccache'] = ccache_file or DEFAULT_CCACHE
135-
cleaned['password'] = password
145+
cleaned["using_keytab"] = using_keytab
146+
cleaned["principal"] = princ_name
147+
cleaned["ccache"] = ccache_file or DEFAULT_CCACHE
148+
cleaned["password"] = password
136149

137150
return cleaned
138151

139152
def init_with_keytab(self):
140153
"""Initialize credential cache with keytab"""
141154
creds_opts = {
142-
'usage': 'initiate',
143-
'name': self._cleaned_options['principal'],
155+
"usage": "initiate",
156+
"name": self._cleaned_options["principal"],
144157
}
145158

146159
store = {}
147-
if self._cleaned_options['keytab'] != DEFAULT_KEYTAB:
148-
store['client_keytab'] = self._cleaned_options['keytab']
149-
if self._cleaned_options['ccache'] != DEFAULT_CCACHE:
150-
store['ccache'] = self._cleaned_options['ccache']
160+
if self._cleaned_options["keytab"] != DEFAULT_KEYTAB:
161+
store["client_keytab"] = self._cleaned_options["keytab"]
162+
if self._cleaned_options["ccache"] != DEFAULT_CCACHE:
163+
store["ccache"] = self._cleaned_options["ccache"]
151164
if store:
152-
creds_opts['store'] = store
165+
creds_opts["store"] = store
153166

154167
creds = gssapi.Credentials(**creds_opts)
155168
try:
156169
creds.lifetime
157170
except gssapi.exceptions.ExpiredCredentialsError:
158171
new_creds_opts = copy.deepcopy(creds_opts)
159172
# Get new credential and put it into a temporary ccache
160-
temp_directory = tempfile.mkdtemp('-krbcontext')
161-
temp_ccache = os.path.join(temp_directory, 'ccache')
173+
temp_directory = tempfile.mkdtemp("-krbcontext")
174+
temp_ccache = os.path.join(temp_directory, "ccache")
162175
try:
163-
new_creds_opts.setdefault('store', {})['ccache'] = temp_ccache
176+
new_creds_opts.setdefault("store", {})["ccache"] = temp_ccache
164177
creds = gssapi.Credentials(**new_creds_opts)
165178
# Then, store new credential back to original specified ccache,
166179
# whatever a given ccache file or the default one.
167180
_store = None
168181
# If default ccache is used, no need to specify ccache in
169182
# store parameter passed to ``creds.store``.
170-
if self._cleaned_options['ccache'] != DEFAULT_CCACHE:
171-
_store = {'ccache': store['ccache']}
172-
creds.store(usage='initiate', store=_store, set_default=True,
173-
overwrite=True)
183+
if self._cleaned_options["ccache"] != DEFAULT_CCACHE:
184+
_store = {"ccache": store["ccache"]}
185+
creds.store(
186+
usage="initiate",
187+
store=_store,
188+
set_default=True,
189+
overwrite=True,
190+
)
174191
finally:
175192
shutil.rmtree(temp_directory, ignore_errors=True)
176193

@@ -188,24 +205,25 @@ def init_with_password(self):
188205
line but no attry is available.
189206
"""
190207
creds_opts = {
191-
'usage': 'initiate',
192-
'name': self._cleaned_options['principal'],
208+
"usage": "initiate",
209+
"name": self._cleaned_options["principal"],
193210
}
194-
if self._cleaned_options['ccache'] != DEFAULT_CCACHE:
195-
creds_opts['store'] = {'ccache': self._cleaned_options['ccache']}
211+
if self._cleaned_options["ccache"] != DEFAULT_CCACHE:
212+
creds_opts["store"] = {"ccache": self._cleaned_options["ccache"]}
196213

197214
cred = gssapi.Credentials(**creds_opts)
198215
try:
199216
cred.lifetime
200217
except gssapi.exceptions.ExpiredCredentialsError:
201-
password = self._cleaned_options['password']
218+
password = self._cleaned_options["password"]
202219

203220
if not password:
204221
if not sys.stdin.isatty():
205222
raise IOError(
206-
'krbContext is not running from a terminal. So, you '
207-
'need to run kinit with your principal manually before'
208-
' anything goes.')
223+
"krbContext is not running from a terminal. So, you "
224+
"need to run kinit with your principal manually before"
225+
" anything goes."
226+
)
209227

210228
# If there is no password specified via API call, prompt to
211229
# enter one in order to continue to get credential. BUT, in
@@ -218,19 +236,24 @@ def init_with_password(self):
218236
password = getpass.getpass()
219237

220238
cred = gssapi.raw.acquire_cred_with_password(
221-
self._cleaned_options['principal'], password.encode('utf-8'))
239+
self._cleaned_options["principal"], password.encode("utf-8")
240+
)
222241

223-
ccache = self._cleaned_options['ccache']
242+
ccache = self._cleaned_options["ccache"]
224243
if ccache == DEFAULT_CCACHE:
225244
gssapi.raw.store_cred(
226245
cred.creds,
227-
usage='initiate', overwrite=True, set_default=True,
246+
usage="initiate",
247+
overwrite=True,
248+
set_default=True,
228249
)
229250
else:
230-
gssapi.raw.store_cred_into({'ccache': ccache},
231-
cred.creds,
232-
usage='initiate',
233-
overwrite=True)
251+
gssapi.raw.store_cred_into(
252+
{"ccache": ccache},
253+
cred.creds,
254+
usage="initiate",
255+
overwrite=True,
256+
)
234257

235258
def _prepare_context(self):
236259
"""Prepare context
@@ -242,7 +265,7 @@ def _prepare_context(self):
242265
243266
Internal use only.
244267
"""
245-
ccache = self._cleaned_options['ccache']
268+
ccache = self._cleaned_options["ccache"]
246269

247270
# Whatever there is KRB5CCNAME was set in current process,
248271
# original_krb5ccname will contain current value even if None if
@@ -260,7 +283,7 @@ def _prepare_context(self):
260283
# us point to the given ccache by KRB5CCNAME.
261284
os.environ[ENV_KRB5CCNAME] = ccache
262285

263-
if self._cleaned_options['using_keytab']:
286+
if self._cleaned_options["using_keytab"]:
264287
self.init_with_keytab()
265288
else:
266289
self.init_with_password()
@@ -281,7 +304,7 @@ def __exit__(self, exc_type, exc_value, traceback):
281304
restored correctly, if there was. And, lock gets released as well.
282305
"""
283306
try:
284-
if self._cleaned_options['ccache'] == DEFAULT_CCACHE:
307+
if self._cleaned_options["ccache"] == DEFAULT_CCACHE:
285308
if self._original_krb5ccname:
286309
os.environ[ENV_KRB5CCNAME] = self._original_krb5ccname
287310
else:

‎test/test_krbcontext.py

+347-254
Original file line numberDiff line numberDiff line change
@@ -19,294 +19,365 @@ def test_missing_principal(self):
1919
self.assertRaises(ValueError, krbContext, using_keytab=True)
2020

2121
def test_all_defaults(self):
22-
context = krbContext(using_keytab=True,
23-
principal='HTTP/hostname@EXAMPLE.COM')
22+
context = krbContext(
23+
using_keytab=True, principal="HTTP/hostname@EXAMPLE.COM"
24+
)
2425

25-
self.assertTrue(context._cleaned_options['using_keytab'])
26+
self.assertTrue(context._cleaned_options["using_keytab"])
2627
expected_princ = gssapi.Name(
27-
'HTTP/hostname@EXAMPLE.COM',
28-
gssapi.NameType.kerberos_principal)
29-
self.assertEqual(expected_princ, context._cleaned_options['principal'])
30-
self.assertEqual(kctx.DEFAULT_CCACHE,
31-
context._cleaned_options['ccache'])
32-
self.assertEqual(kctx.DEFAULT_KEYTAB,
33-
context._cleaned_options['keytab'])
34-
35-
@patch('os.path.exists')
28+
"HTTP/hostname@EXAMPLE.COM", gssapi.NameType.kerberos_principal
29+
)
30+
self.assertEqual(expected_princ, context._cleaned_options["principal"])
31+
self.assertEqual(
32+
kctx.DEFAULT_CCACHE, context._cleaned_options["ccache"]
33+
)
34+
self.assertEqual(
35+
kctx.DEFAULT_KEYTAB, context._cleaned_options["keytab"]
36+
)
37+
38+
@patch("os.path.exists")
3639
def test_specify_existing_keytab(self, exists):
3740
exists.return_value = True
3841

39-
context = krbContext(using_keytab=True,
40-
principal='HTTP/hostname@EXAMPLE.COM',
41-
keytab_file='/etc/app/app.keytab')
42-
self.assertEqual('/etc/app/app.keytab',
43-
context._cleaned_options['keytab'])
42+
context = krbContext(
43+
using_keytab=True,
44+
principal="HTTP/hostname@EXAMPLE.COM",
45+
keytab_file="/etc/app/app.keytab",
46+
)
47+
self.assertEqual(
48+
"/etc/app/app.keytab", context._cleaned_options["keytab"]
49+
)
4450

45-
@patch('os.path.exists')
51+
@patch("os.path.exists")
4652
def test_specify_nonexisting_keytab(self, exists):
4753
exists.return_value = False
4854

49-
self.assertRaises(ValueError,
50-
krbContext,
51-
using_keytab=True,
52-
principal='HTTP/hostname@EXAMPLE.COM',
53-
keytab_file='/etc/app/app.keytab')
55+
self.assertRaises(
56+
ValueError,
57+
krbContext,
58+
using_keytab=True,
59+
principal="HTTP/hostname@EXAMPLE.COM",
60+
keytab_file="/etc/app/app.keytab",
61+
)
5462

5563
def test_specify_ccache(self):
56-
context = krbContext(using_keytab=True,
57-
principal='HTTP/hostname@EXAMPLE.COM',
58-
ccache_file='/var/app/krb5_ccache')
59-
self.assertEqual('/var/app/krb5_ccache',
60-
context._cleaned_options['ccache'])
64+
context = krbContext(
65+
using_keytab=True,
66+
principal="HTTP/hostname@EXAMPLE.COM",
67+
ccache_file="/var/app/krb5_ccache",
68+
)
69+
self.assertEqual(
70+
"/var/app/krb5_ccache", context._cleaned_options["ccache"]
71+
)
6172

6273

6374
class CleanArgumentsAsRegularUserTest(unittest.TestCase):
6475
"""Test clean_context_options for not using keytab"""
6576

66-
@patch('krbcontext.context.get_login')
77+
@patch("krbcontext.context.get_login")
6778
def test_all_defaults(self, get_login):
68-
get_login.return_value = 'cqi'
79+
get_login.return_value = "cqi"
6980

7081
context = krbContext()
7182

72-
expected_princ = gssapi.Name(get_login.return_value,
73-
gssapi.NameType.user)
74-
self.assertEqual(expected_princ,
75-
context._cleaned_options['principal'])
76-
self.assertEqual(kctx.DEFAULT_CCACHE,
77-
context._cleaned_options['ccache'])
78-
self.assertFalse(context._cleaned_options['using_keytab'])
83+
expected_princ = gssapi.Name(
84+
get_login.return_value, gssapi.NameType.user
85+
)
86+
self.assertEqual(expected_princ, context._cleaned_options["principal"])
87+
self.assertEqual(
88+
kctx.DEFAULT_CCACHE, context._cleaned_options["ccache"]
89+
)
90+
self.assertFalse(context._cleaned_options["using_keytab"])
7991

8092
def test_specify_ccache(self):
81-
context = krbContext(principal='cqi',
82-
ccache_file='/var/app/krb5_ccache')
83-
self.assertEqual('/var/app/krb5_ccache',
84-
context._cleaned_options['ccache'])
93+
context = krbContext(
94+
principal="cqi", ccache_file="/var/app/krb5_ccache"
95+
)
96+
self.assertEqual(
97+
"/var/app/krb5_ccache", context._cleaned_options["ccache"]
98+
)
8599

86100
def test_specify_principal(self):
87-
context = krbContext(principal='cqi')
88-
expected_princ = gssapi.Name('cqi', gssapi.names.NameType.user)
89-
self.assertEqual(expected_princ,
90-
context._cleaned_options['principal'])
101+
context = krbContext(principal="cqi")
102+
expected_princ = gssapi.Name("cqi", gssapi.names.NameType.user)
103+
self.assertEqual(expected_princ, context._cleaned_options["principal"])
91104

92105

93106
class TestInitWithKeytab(unittest.TestCase):
94107
"""Test krbContext.init_with_keytab"""
95108

96109
def setUp(self):
97-
self.service_principal = 'HTTP/hostname@EXAMPLE.COM'
110+
self.service_principal = "HTTP/hostname@EXAMPLE.COM"
98111
self.princ_name = gssapi.Name(
99-
self.service_principal,
100-
gssapi.NameType.kerberos_principal)
112+
self.service_principal, gssapi.NameType.kerberos_principal
113+
)
101114

102-
self.Lock = patch('krbcontext.context.Lock')
115+
self.Lock = patch("krbcontext.context.Lock")
103116
self.Lock.start()
104117

105118
# No need to create a real temp file for tests
106-
self.tmp_dir = '/tmp/test-krbcontext'
107-
self.tmp_ccache = os.path.join(self.tmp_dir, 'ccache')
108-
self.mkdtemp = patch('tempfile.mkdtemp',
109-
return_value=self.tmp_dir)
119+
self.tmp_dir = "/tmp/test-krbcontext"
120+
self.tmp_ccache = os.path.join(self.tmp_dir, "ccache")
121+
self.mkdtemp = patch("tempfile.mkdtemp", return_value=self.tmp_dir)
110122
self.mkdtemp.start()
111123

112-
self.os_close = patch('os.close')
124+
self.os_close = patch("os.close")
113125
self.os_close.start()
114126

115127
def tearDown(self):
116128
self.os_close.stop()
117129
self.mkdtemp.stop()
118130
self.Lock.stop()
119131

120-
@patch('gssapi.Credentials')
132+
@patch("gssapi.Credentials")
121133
def test_cred_not_expired(self, Credentials):
122-
context = krbContext(using_keytab=True,
123-
principal=self.service_principal)
134+
context = krbContext(
135+
using_keytab=True, principal=self.service_principal
136+
)
124137
context.init_with_keytab()
125138

126139
self.assertEqual(1, Credentials.call_count)
127140
Credentials.return_value.store.assert_not_called()
128141

129-
@patch('gssapi.Credentials')
142+
@patch("gssapi.Credentials")
130143
def test_init_in_default_ccache_with_default_keytab(self, Credentials):
131144
type(Credentials.return_value).lifetime = PropertyMock(
132-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
145+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
146+
)
133147

134-
context = krbContext(using_keytab=True,
135-
principal=self.service_principal)
148+
context = krbContext(
149+
using_keytab=True, principal=self.service_principal
150+
)
136151
context.init_with_keytab()
137152

138-
Credentials.assert_has_calls([
139-
call(usage='initiate', name=self.princ_name),
140-
call(usage='initiate', name=self.princ_name,
141-
store={'ccache': self.tmp_ccache}),
142-
])
153+
Credentials.assert_has_calls(
154+
[
155+
call(usage="initiate", name=self.princ_name),
156+
call(
157+
usage="initiate",
158+
name=self.princ_name,
159+
store={"ccache": self.tmp_ccache},
160+
),
161+
]
162+
)
143163
Credentials.return_value.store.assert_called_once_with(
144-
store=None,
145-
usage='initiate',
146-
set_default=True,
147-
overwrite=True)
164+
store=None, usage="initiate", set_default=True, overwrite=True
165+
)
148166

149-
@patch('gssapi.Credentials')
150-
@patch('os.path.exists', return_value=True)
151-
def test_init_in_default_ccache_with_given_keytab(self,
152-
exists,
153-
Credentials):
167+
@patch("gssapi.Credentials")
168+
@patch("os.path.exists", return_value=True)
169+
def test_init_in_default_ccache_with_given_keytab(
170+
self, exists, Credentials
171+
):
154172
type(Credentials.return_value).lifetime = PropertyMock(
155-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
173+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
174+
)
156175

157-
keytab = '/etc/app/app.keytab'
158-
context = krbContext(using_keytab=True,
159-
principal=self.service_principal,
160-
keytab_file=keytab)
176+
keytab = "/etc/app/app.keytab"
177+
context = krbContext(
178+
using_keytab=True,
179+
principal=self.service_principal,
180+
keytab_file=keytab,
181+
)
161182
context.init_with_keytab()
162183

163-
Credentials.assert_has_calls([
164-
call(usage='initiate', name=self.princ_name,
165-
store={'client_keytab': keytab}),
166-
call(usage='initiate', name=self.princ_name,
167-
store={'ccache': self.tmp_ccache, 'client_keytab': keytab}),
168-
call().store(usage='initiate', store=None,
169-
set_default=True, overwrite=True),
170-
])
184+
Credentials.assert_has_calls(
185+
[
186+
call(
187+
usage="initiate",
188+
name=self.princ_name,
189+
store={"client_keytab": keytab},
190+
),
191+
call(
192+
usage="initiate",
193+
name=self.princ_name,
194+
store={"ccache": self.tmp_ccache, "client_keytab": keytab},
195+
),
196+
call().store(
197+
usage="initiate",
198+
store=None,
199+
set_default=True,
200+
overwrite=True,
201+
),
202+
]
203+
)
171204
Credentials.return_value.store.assert_called_once_with(
172-
store=None,
173-
usage='initiate',
174-
set_default=True,
175-
overwrite=True)
205+
store=None, usage="initiate", set_default=True, overwrite=True
206+
)
176207

177-
@patch('gssapi.Credentials')
208+
@patch("gssapi.Credentials")
178209
def test_init_in_given_ccache_with_default_keytab(self, Credentials):
179210
type(Credentials.return_value).lifetime = PropertyMock(
180-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
211+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
212+
)
181213

182-
ccache = '/tmp/mycc'
183-
context = krbContext(using_keytab=True,
184-
principal=self.service_principal,
185-
ccache_file=ccache)
214+
ccache = "/tmp/mycc"
215+
context = krbContext(
216+
using_keytab=True,
217+
principal=self.service_principal,
218+
ccache_file=ccache,
219+
)
186220
context.init_with_keytab()
187221

188-
Credentials.assert_has_calls([
189-
call(usage='initiate', name=self.princ_name,
190-
store={'ccache': ccache}),
191-
call(usage='initiate', name=self.princ_name,
192-
store={'ccache': self.tmp_ccache}),
193-
])
222+
Credentials.assert_has_calls(
223+
[
224+
call(
225+
usage="initiate",
226+
name=self.princ_name,
227+
store={"ccache": ccache},
228+
),
229+
call(
230+
usage="initiate",
231+
name=self.princ_name,
232+
store={"ccache": self.tmp_ccache},
233+
),
234+
]
235+
)
194236
Credentials.return_value.store.assert_called_once_with(
195-
store={'ccache': ccache},
196-
usage='initiate',
237+
store={"ccache": ccache},
238+
usage="initiate",
197239
set_default=True,
198-
overwrite=True)
240+
overwrite=True,
241+
)
199242

200-
@patch('gssapi.Credentials')
201-
@patch('os.path.exists', return_value=True)
243+
@patch("gssapi.Credentials")
244+
@patch("os.path.exists", return_value=True)
202245
def test_init_with_given_keytab_and_ccache(self, exists, Credentials):
203246
type(Credentials.return_value).lifetime = PropertyMock(
204-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
205-
206-
keytab = '/etc/app/app.keytab'
207-
ccache = '/tmp/mycc'
208-
context = krbContext(using_keytab=True,
209-
principal=self.service_principal,
210-
keytab_file=keytab,
211-
ccache_file=ccache)
247+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
248+
)
249+
250+
keytab = "/etc/app/app.keytab"
251+
ccache = "/tmp/mycc"
252+
context = krbContext(
253+
using_keytab=True,
254+
principal=self.service_principal,
255+
keytab_file=keytab,
256+
ccache_file=ccache,
257+
)
212258
context.init_with_keytab()
213259

214-
Credentials.assert_has_calls([
215-
call(usage='initiate', name=self.princ_name,
216-
store={'client_keytab': keytab, 'ccache': ccache}),
217-
call(usage='initiate', name=self.princ_name,
218-
store={'client_keytab': keytab, 'ccache': self.tmp_ccache}),
219-
])
260+
Credentials.assert_has_calls(
261+
[
262+
call(
263+
usage="initiate",
264+
name=self.princ_name,
265+
store={"client_keytab": keytab, "ccache": ccache},
266+
),
267+
call(
268+
usage="initiate",
269+
name=self.princ_name,
270+
store={"client_keytab": keytab, "ccache": self.tmp_ccache},
271+
),
272+
]
273+
)
220274
Credentials.return_value.store.assert_called_once_with(
221-
store={'ccache': ccache},
222-
usage='initiate',
275+
store={"ccache": ccache},
276+
usage="initiate",
223277
set_default=True,
224-
overwrite=True)
278+
overwrite=True,
279+
)
225280

226281

227282
class TestInitWithPassword(unittest.TestCase):
228283
"""Test krbContext.init_with_password"""
229284

230285
def setUp(self):
231-
self.principal = 'cqi'
286+
self.principal = "cqi"
232287
self.princ_name = gssapi.Name(self.principal, gssapi.NameType.user)
233288

234-
@patch('gssapi.Credentials')
235-
@patch('gssapi.raw.acquire_cred_with_password')
236-
@patch('gssapi.raw.store_cred_into')
289+
@patch("gssapi.Credentials")
290+
@patch("gssapi.raw.acquire_cred_with_password")
291+
@patch("gssapi.raw.store_cred_into")
237292
def test_no_need_init_if_not_expired(
238-
self, store_cred_into, acquire_cred_with_password, Credentials):
239-
context = krbContext(using_keytab=False,
240-
principal=self.principal,
241-
password='security')
293+
self, store_cred_into, acquire_cred_with_password, Credentials
294+
):
295+
context = krbContext(
296+
using_keytab=False, principal=self.principal, password="security"
297+
)
242298
context.init_with_password()
243299

244300
self.assertEqual(1, Credentials.call_count)
245301
store_cred_into.assert_not_called()
246302
acquire_cred_with_password.assert_not_called()
247303

248-
@patch('gssapi.Credentials')
249-
@patch('gssapi.raw.acquire_cred_with_password')
250-
@patch('gssapi.raw.store_cred')
304+
@patch("gssapi.Credentials")
305+
@patch("gssapi.raw.acquire_cred_with_password")
306+
@patch("gssapi.raw.store_cred")
251307
def test_init_in_default_ccache(
252-
self, store_cred, acquire_cred_with_password, Credentials):
308+
self, store_cred, acquire_cred_with_password, Credentials
309+
):
253310
type(Credentials.return_value).lifetime = PropertyMock(
254-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
311+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
312+
)
255313

256-
context = krbContext(using_keytab=False,
257-
principal=self.principal,
258-
password='security')
314+
context = krbContext(
315+
using_keytab=False, principal=self.principal, password="security"
316+
)
259317
context.init_with_password()
260318

261319
acquire_cred_with_password.assert_called_once_with(
262-
self.princ_name, b'security')
320+
self.princ_name, b"security"
321+
)
263322

264323
store_cred.assert_called_once_with(
265324
acquire_cred_with_password.return_value.creds,
266-
usage='initiate', overwrite=True, set_default=True
325+
usage="initiate",
326+
overwrite=True,
327+
set_default=True,
267328
)
268329

269-
@patch('gssapi.Credentials')
270-
@patch('gssapi.raw.acquire_cred_with_password')
271-
@patch('gssapi.raw.store_cred_into')
330+
@patch("gssapi.Credentials")
331+
@patch("gssapi.raw.acquire_cred_with_password")
332+
@patch("gssapi.raw.store_cred_into")
272333
def test_init_in_given_ccache(
273-
self, store_cred_into, acquire_cred_with_password, Credentials):
334+
self, store_cred_into, acquire_cred_with_password, Credentials
335+
):
274336
type(Credentials.return_value).lifetime = PropertyMock(
275-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
337+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
338+
)
276339

277-
ccache = '/tmp/mycc'
278-
context = krbContext(using_keytab=False,
279-
principal=self.principal,
280-
ccache_file=ccache,
281-
password='security')
340+
ccache = "/tmp/mycc"
341+
context = krbContext(
342+
using_keytab=False,
343+
principal=self.principal,
344+
ccache_file=ccache,
345+
password="security",
346+
)
282347
context.init_with_password()
283348

284349
Credentials.assert_called_once_with(
285-
usage='initiate',
286-
name=self.princ_name,
287-
store={'ccache': ccache})
350+
usage="initiate", name=self.princ_name, store={"ccache": ccache}
351+
)
288352

289353
acquire_cred_with_password.assert_called_once_with(
290-
self.princ_name, b'security')
354+
self.princ_name, b"security"
355+
)
291356

292357
store_cred_into.assert_called_once_with(
293-
{'ccache': '/tmp/mycc'},
358+
{"ccache": "/tmp/mycc"},
294359
acquire_cred_with_password.return_value.creds,
295-
usage='initiate',
296-
overwrite=True)
297-
298-
@patch('gssapi.Credentials')
299-
@patch('sys.stdin.isatty', return_value=True)
300-
@patch('getpass.getpass')
301-
@patch('gssapi.raw.acquire_cred_with_password')
302-
@patch('gssapi.raw.store_cred')
303-
def test_init_cred_with_need_enter_password(self, store_cred,
304-
acquire_cred_with_password,
305-
getpass, isatty,
306-
Credentials):
360+
usage="initiate",
361+
overwrite=True,
362+
)
363+
364+
@patch("gssapi.Credentials")
365+
@patch("sys.stdin.isatty", return_value=True)
366+
@patch("getpass.getpass")
367+
@patch("gssapi.raw.acquire_cred_with_password")
368+
@patch("gssapi.raw.store_cred")
369+
def test_init_cred_with_need_enter_password(
370+
self,
371+
store_cred,
372+
acquire_cred_with_password,
373+
getpass,
374+
isatty,
375+
Credentials,
376+
):
307377
type(Credentials.return_value).lifetime = PropertyMock(
308-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
309-
getpass.return_value = 'mypassword'
378+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
379+
)
380+
getpass.return_value = "mypassword"
310381

311382
context = krbContext(using_keytab=False, principal=self.principal)
312383
context.init_with_password()
@@ -315,31 +386,36 @@ def test_init_cred_with_need_enter_password(self, store_cred,
315386
# Ensure this must be called.
316387
getpass.assert_called_once()
317388

318-
Credentials.assert_called_once_with(usage='initiate',
319-
name=self.princ_name)
389+
Credentials.assert_called_once_with(
390+
usage="initiate", name=self.princ_name
391+
)
320392

321393
acquire_cred_with_password.assert_called_once_with(
322-
self.princ_name, b'mypassword')
394+
self.princ_name, b"mypassword"
395+
)
323396

324397
store_cred.assert_called_once_with(
325398
acquire_cred_with_password.return_value.creds,
326-
usage='initiate', overwrite=True, set_default=True
399+
usage="initiate",
400+
overwrite=True,
401+
set_default=True,
327402
)
328403

329-
@patch('gssapi.Credentials')
330-
@patch('sys.stdin.isatty', return_value=False)
331-
def test_init_with_entering_password_but_not_in_atty(self,
332-
isatty,
333-
Credentials):
404+
@patch("gssapi.Credentials")
405+
@patch("sys.stdin.isatty", return_value=False)
406+
def test_init_with_entering_password_but_not_in_atty(
407+
self, isatty, Credentials
408+
):
334409
type(Credentials.return_value).lifetime = PropertyMock(
335-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
410+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
411+
)
336412

337413
context = krbContext(using_keytab=False, principal=self.principal)
338414
self.assertRaises(IOError, context.init_with_password)
339415

340-
context = krbContext(using_keytab=False,
341-
principal=self.principal,
342-
password='')
416+
context = krbContext(
417+
using_keytab=False, principal=self.principal, password=""
418+
)
343419
self.assertRaises(IOError, context.init_with_password)
344420

345421

@@ -348,121 +424,138 @@ class TestKrbContextManager(unittest.TestCase):
348424

349425
def setUp(self):
350426
# Do not actually operate threading lock
351-
self.init_Lock = patch('krbcontext.context.Lock')
427+
self.init_Lock = patch("krbcontext.context.Lock")
352428
self.init_Lock.start()
353429

354430
def tearDown(self):
355431
self.init_Lock.stop()
356432

357-
@patch('gssapi.Credentials')
358-
@patch.dict('os.environ', {}, clear=True)
433+
@patch("gssapi.Credentials")
434+
@patch.dict("os.environ", {}, clear=True)
359435
def test_init_with_default_keytab(self, Credentials):
360436
type(Credentials.return_value).lifetime = PropertyMock(
361-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
362-
363-
with krbContext(using_keytab=True,
364-
principal='app/hostname@EXAMPLE.COM',
365-
ccache_file='/tmp/my_cc'):
366-
self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME'])
437+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
438+
)
367439

368-
@patch('gssapi.Credentials')
369-
@patch('gssapi.raw.acquire_cred_with_password')
370-
@patch('gssapi.raw.store_cred')
371-
@patch.dict('os.environ', {}, clear=True)
440+
with krbContext(
441+
using_keytab=True,
442+
principal="app/hostname@EXAMPLE.COM",
443+
ccache_file="/tmp/my_cc",
444+
):
445+
self.assertEqual("/tmp/my_cc", os.environ["KRB5CCNAME"])
446+
447+
@patch("gssapi.Credentials")
448+
@patch("gssapi.raw.acquire_cred_with_password")
449+
@patch("gssapi.raw.store_cred")
450+
@patch.dict("os.environ", {}, clear=True)
372451
def test_init_in_default_ccache_with_password(
373-
self, store_cred, acquire_cred_with_password, Credentials):
452+
self, store_cred, acquire_cred_with_password, Credentials
453+
):
374454
type(Credentials.return_value).lifetime = PropertyMock(
375-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
455+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
456+
)
376457

377-
with krbContext(using_keytab=False,
378-
principal='cqi',
379-
password='security'):
380-
self.assertNotIn('KRB5CCNAME', os.environ)
458+
with krbContext(
459+
using_keytab=False, principal="cqi", password="security"
460+
):
461+
self.assertNotIn("KRB5CCNAME", os.environ)
381462

382-
self.assertNotIn('KRB5CCNAME', os.environ)
463+
self.assertNotIn("KRB5CCNAME", os.environ)
383464

384-
@patch('gssapi.Credentials')
385-
@patch('gssapi.raw.acquire_cred_with_password')
386-
@patch('gssapi.raw.store_cred')
387-
@patch.dict('os.environ', {'KRB5CCNAME': '/tmp/my_cc'}, clear=True)
465+
@patch("gssapi.Credentials")
466+
@patch("gssapi.raw.acquire_cred_with_password")
467+
@patch("gssapi.raw.store_cred")
468+
@patch.dict("os.environ", {"KRB5CCNAME": "/tmp/my_cc"}, clear=True)
388469
def test_after_init_in_default_ccache_original_ccache_should_be_restored(
389-
self, store_cred, acquire_cred_with_password, Credentials):
470+
self, store_cred, acquire_cred_with_password, Credentials
471+
):
390472
type(Credentials.return_value).lifetime = PropertyMock(
391-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
473+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
474+
)
392475

393-
with krbContext(using_keytab=False,
394-
principal='cqi',
395-
password='security'):
396-
self.assertNotIn('KRB5CCNAME', os.environ)
476+
with krbContext(
477+
using_keytab=False, principal="cqi", password="security"
478+
):
479+
self.assertNotIn("KRB5CCNAME", os.environ)
397480

398-
self.assertIn('KRB5CCNAME', os.environ)
399-
self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME'])
481+
self.assertIn("KRB5CCNAME", os.environ)
482+
self.assertEqual("/tmp/my_cc", os.environ["KRB5CCNAME"])
400483

401-
@patch('gssapi.Credentials')
402-
@patch.dict('os.environ', {'KRB5CCNAME': '/tmp/my_cc'}, clear=True)
484+
@patch("gssapi.Credentials")
485+
@patch.dict("os.environ", {"KRB5CCNAME": "/tmp/my_cc"}, clear=True)
403486
def test_original_ccache_should_be_restored(self, Credentials):
404487
type(Credentials.return_value).lifetime = PropertyMock(
405-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
488+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
489+
)
406490

407-
with krbContext(using_keytab=True,
408-
principal='app/hostname@EXAMPLE.COM',
409-
ccache_file='/tmp/app_pid_cc'):
491+
with krbContext(
492+
using_keytab=True,
493+
principal="app/hostname@EXAMPLE.COM",
494+
ccache_file="/tmp/app_pid_cc",
495+
):
410496
# Inside context, given ccache should be used.
411-
self.assertEqual('/tmp/app_pid_cc', os.environ['KRB5CCNAME'])
497+
self.assertEqual("/tmp/app_pid_cc", os.environ["KRB5CCNAME"])
412498

413-
self.assertIn('KRB5CCNAME', os.environ)
414-
self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME'])
499+
self.assertIn("KRB5CCNAME", os.environ)
500+
self.assertEqual("/tmp/my_cc", os.environ["KRB5CCNAME"])
415501

416-
@patch('gssapi.Credentials')
417-
@patch.dict('os.environ', {}, clear=True)
502+
@patch("gssapi.Credentials")
503+
@patch.dict("os.environ", {}, clear=True)
418504
def test_init_in_default_ccache_without_original_krb5ccname_is_set(
419-
self, Credentials):
505+
self, Credentials
506+
):
420507
type(Credentials.return_value).lifetime = PropertyMock(
421-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
508+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
509+
)
422510

423-
with krbContext(using_keytab=True,
424-
principal='app/hostname@EXAMPLE.COM'):
425-
self.assertNotIn('KRB5CCNAME', os.environ)
511+
with krbContext(
512+
using_keytab=True, principal="app/hostname@EXAMPLE.COM"
513+
):
514+
self.assertNotIn("KRB5CCNAME", os.environ)
426515

427516
# Originally, no KRB5CCNAME is set, it should be cleaned after exit.
428-
self.assertNotIn('KRB5CCNAME', os.environ)
517+
self.assertNotIn("KRB5CCNAME", os.environ)
429518

430-
@patch('gssapi.Credentials')
431-
@patch.dict('os.environ', {'KRB5CCNAME': '/tmp/my_cc'}, clear=True)
519+
@patch("gssapi.Credentials")
520+
@patch.dict("os.environ", {"KRB5CCNAME": "/tmp/my_cc"}, clear=True)
432521
def test_init_in_default_ccache_and_original_krb5ccname_is_set(
433-
self, Credentials):
522+
self, Credentials
523+
):
434524
type(Credentials.return_value).lifetime = PropertyMock(
435-
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
525+
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)
526+
)
436527

437-
with krbContext(using_keytab=True,
438-
principal='app/hostname@EXAMPLE.COM'):
439-
self.assertNotIn('KRB5CCNAME', os.environ)
528+
with krbContext(
529+
using_keytab=True, principal="app/hostname@EXAMPLE.COM"
530+
):
531+
self.assertNotIn("KRB5CCNAME", os.environ)
440532

441-
self.assertIn('KRB5CCNAME', os.environ)
442-
self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME'])
533+
self.assertIn("KRB5CCNAME", os.environ)
534+
self.assertEqual("/tmp/my_cc", os.environ["KRB5CCNAME"])
443535

444-
@patch('gssapi.Credentials')
445-
@patch.dict(os.environ, {'KRB5CCNAME': '/tmp/my_cc'}, clear=True)
536+
@patch("gssapi.Credentials")
537+
@patch.dict(os.environ, {"KRB5CCNAME": "/tmp/my_cc"}, clear=True)
446538
def test_do_nothing_if_unnecessary_to_init(self, Credentials):
447-
with krbContext(using_keytab=True,
448-
principal='app/hostname@EXAMPLE.COM'):
539+
with krbContext(
540+
using_keytab=True, principal="app/hostname@EXAMPLE.COM"
541+
):
449542
# Nothing is changed, but original KRB5CCNAME must be removed
450543
# since default ccache is used.
451-
self.assertNotIn('KRB5CCNAME', os.environ)
544+
self.assertNotIn("KRB5CCNAME", os.environ)
452545

453546
# Original ccache must be restored.
454-
self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME'])
547+
self.assertEqual("/tmp/my_cc", os.environ["KRB5CCNAME"])
455548

456549

457550
class TestGetLogin(unittest.TestCase):
458551
"""Test get_login"""
459552

460-
@patch('os.getuid', return_value=1001)
461-
@patch('pwd.getpwuid')
553+
@patch("os.getuid", return_value=1001)
554+
@patch("pwd.getpwuid")
462555
def test_get_login(self, getpwuid, getuid):
463-
getpwuid.return_value.pw_name = 'user'
556+
getpwuid.return_value.pw_name = "user"
464557

465558
user = get_login()
466559

467560
getpwuid.assert_called_once_with(getuid.return_value)
468-
self.assertEqual('user', user)
561+
self.assertEqual("user", user)

‎tox.ini

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[tox]
2-
envlist = py36,py37,py38,py39,flake8
2+
envlist = py36,py37,py38,py39,black,flake8
33

44
[testenv]
55
usedevelop = True
@@ -10,3 +10,7 @@ commands = python3 -m pytest {posargs}
1010
[testenv:flake8]
1111
deps = flake8
1212
commands = flake8 krbcontext/ test/
13+
14+
[testenv:black]
15+
deps = black
16+
commands = black --check --diff --line-length 79 krbcontext/ test/

0 commit comments

Comments
 (0)
This repository has been archived.