diff --git a/krbcontext/context.py b/krbcontext/context.py index 0798ca0..55508e2 100644 --- a/krbcontext/context.py +++ b/krbcontext/context.py @@ -27,12 +27,12 @@ from threading import Lock -__all__ = ('krbContext',) +__all__ = ("krbContext",) -DEFAULT_CCACHE = 'DEFAULT_CCACHE' -DEFAULT_KEYTAB = 'DEFAULT_KEYTAB' -ENV_KRB5CCNAME = 'KRB5CCNAME' +DEFAULT_CCACHE = "DEFAULT_CCACHE" +DEFAULT_KEYTAB = "DEFAULT_KEYTAB" +ENV_KRB5CCNAME = "KRB5CCNAME" def get_login(): @@ -60,8 +60,14 @@ class krbContext(object): krbContext can work with ``with`` statement to simplify your code. """ - def __init__(self, using_keytab=False, principal=None, keytab_file=None, - ccache_file=None, password=None): + def __init__( + self, + using_keytab=False, + principal=None, + keytab_file=None, + ccache_file=None, + password=None, + ): """Initialize context :param bool using_keytab: indicate whether to initialize credential @@ -82,20 +88,26 @@ def __init__(self, using_keytab=False, principal=None, keytab_file=None, omitted, program will be blocked and prompts to enter a password from command line, which requires program runs in a terminal. """ - self._cleaned_options = self.clean_options(using_keytab=using_keytab, - principal=principal, - keytab_file=keytab_file, - ccache_file=ccache_file, - password=password) + self._cleaned_options = self.clean_options( + using_keytab=using_keytab, + principal=principal, + keytab_file=keytab_file, + ccache_file=ccache_file, + password=password, + ) self._original_krb5ccname = None self._inited = False self._init_lock = Lock() - def clean_options(self, - using_keytab=False, principal=None, - keytab_file=None, ccache_file=None, - password=None): + def clean_options( + self, + using_keytab=False, + principal=None, + keytab_file=None, + ccache_file=None, + password=None, + ): """Clean argument to related object :param bool using_keytab: refer to ``krbContext.__init__``. @@ -114,42 +126,43 @@ def clean_options(self, if using_keytab: if principal is None: - raise ValueError('Principal is required when using key table.') + raise ValueError("Principal is required when using key table.") princ_name = gssapi.Name( - principal, gssapi.NameType.kerberos_principal) + principal, gssapi.NameType.kerberos_principal + ) if keytab_file is None: - cleaned['keytab'] = DEFAULT_KEYTAB + cleaned["keytab"] = DEFAULT_KEYTAB elif not os.path.exists(keytab_file): - raise ValueError(f'Keytab file {keytab_file} does not exist.') + raise ValueError(f"Keytab file {keytab_file} does not exist.") else: - cleaned['keytab'] = keytab_file + cleaned["keytab"] = keytab_file else: if principal is None: principal = get_login() princ_name = gssapi.Name(principal, gssapi.NameType.user) - cleaned['using_keytab'] = using_keytab - cleaned['principal'] = princ_name - cleaned['ccache'] = ccache_file or DEFAULT_CCACHE - cleaned['password'] = password + cleaned["using_keytab"] = using_keytab + cleaned["principal"] = princ_name + cleaned["ccache"] = ccache_file or DEFAULT_CCACHE + cleaned["password"] = password return cleaned def init_with_keytab(self): """Initialize credential cache with keytab""" creds_opts = { - 'usage': 'initiate', - 'name': self._cleaned_options['principal'], + "usage": "initiate", + "name": self._cleaned_options["principal"], } store = {} - if self._cleaned_options['keytab'] != DEFAULT_KEYTAB: - store['client_keytab'] = self._cleaned_options['keytab'] - if self._cleaned_options['ccache'] != DEFAULT_CCACHE: - store['ccache'] = self._cleaned_options['ccache'] + if self._cleaned_options["keytab"] != DEFAULT_KEYTAB: + store["client_keytab"] = self._cleaned_options["keytab"] + if self._cleaned_options["ccache"] != DEFAULT_CCACHE: + store["ccache"] = self._cleaned_options["ccache"] if store: - creds_opts['store'] = store + creds_opts["store"] = store creds = gssapi.Credentials(**creds_opts) try: @@ -157,20 +170,24 @@ def init_with_keytab(self): except gssapi.exceptions.ExpiredCredentialsError: new_creds_opts = copy.deepcopy(creds_opts) # Get new credential and put it into a temporary ccache - temp_directory = tempfile.mkdtemp('-krbcontext') - temp_ccache = os.path.join(temp_directory, 'ccache') + temp_directory = tempfile.mkdtemp("-krbcontext") + temp_ccache = os.path.join(temp_directory, "ccache") try: - new_creds_opts.setdefault('store', {})['ccache'] = temp_ccache + new_creds_opts.setdefault("store", {})["ccache"] = temp_ccache creds = gssapi.Credentials(**new_creds_opts) # Then, store new credential back to original specified ccache, # whatever a given ccache file or the default one. _store = None # If default ccache is used, no need to specify ccache in # store parameter passed to ``creds.store``. - if self._cleaned_options['ccache'] != DEFAULT_CCACHE: - _store = {'ccache': store['ccache']} - creds.store(usage='initiate', store=_store, set_default=True, - overwrite=True) + if self._cleaned_options["ccache"] != DEFAULT_CCACHE: + _store = {"ccache": store["ccache"]} + creds.store( + usage="initiate", + store=_store, + set_default=True, + overwrite=True, + ) finally: shutil.rmtree(temp_directory, ignore_errors=True) @@ -188,24 +205,25 @@ def init_with_password(self): line but no attry is available. """ creds_opts = { - 'usage': 'initiate', - 'name': self._cleaned_options['principal'], + "usage": "initiate", + "name": self._cleaned_options["principal"], } - if self._cleaned_options['ccache'] != DEFAULT_CCACHE: - creds_opts['store'] = {'ccache': self._cleaned_options['ccache']} + if self._cleaned_options["ccache"] != DEFAULT_CCACHE: + creds_opts["store"] = {"ccache": self._cleaned_options["ccache"]} cred = gssapi.Credentials(**creds_opts) try: cred.lifetime except gssapi.exceptions.ExpiredCredentialsError: - password = self._cleaned_options['password'] + password = self._cleaned_options["password"] if not password: if not sys.stdin.isatty(): raise IOError( - 'krbContext is not running from a terminal. So, you ' - 'need to run kinit with your principal manually before' - ' anything goes.') + "krbContext is not running from a terminal. So, you " + "need to run kinit with your principal manually before" + " anything goes." + ) # If there is no password specified via API call, prompt to # enter one in order to continue to get credential. BUT, in @@ -218,19 +236,24 @@ def init_with_password(self): password = getpass.getpass() cred = gssapi.raw.acquire_cred_with_password( - self._cleaned_options['principal'], password.encode('utf-8')) + self._cleaned_options["principal"], password.encode("utf-8") + ) - ccache = self._cleaned_options['ccache'] + ccache = self._cleaned_options["ccache"] if ccache == DEFAULT_CCACHE: gssapi.raw.store_cred( cred.creds, - usage='initiate', overwrite=True, set_default=True, + usage="initiate", + overwrite=True, + set_default=True, ) else: - gssapi.raw.store_cred_into({'ccache': ccache}, - cred.creds, - usage='initiate', - overwrite=True) + gssapi.raw.store_cred_into( + {"ccache": ccache}, + cred.creds, + usage="initiate", + overwrite=True, + ) def _prepare_context(self): """Prepare context @@ -242,7 +265,7 @@ def _prepare_context(self): Internal use only. """ - ccache = self._cleaned_options['ccache'] + ccache = self._cleaned_options["ccache"] # Whatever there is KRB5CCNAME was set in current process, # original_krb5ccname will contain current value even if None if @@ -260,7 +283,7 @@ def _prepare_context(self): # us point to the given ccache by KRB5CCNAME. os.environ[ENV_KRB5CCNAME] = ccache - if self._cleaned_options['using_keytab']: + if self._cleaned_options["using_keytab"]: self.init_with_keytab() else: self.init_with_password() @@ -281,7 +304,7 @@ def __exit__(self, exc_type, exc_value, traceback): restored correctly, if there was. And, lock gets released as well. """ try: - if self._cleaned_options['ccache'] == DEFAULT_CCACHE: + if self._cleaned_options["ccache"] == DEFAULT_CCACHE: if self._original_krb5ccname: os.environ[ENV_KRB5CCNAME] = self._original_krb5ccname else: diff --git a/test/test_krbcontext.py b/test/test_krbcontext.py index 779fd29..87d04d7 100644 --- a/test/test_krbcontext.py +++ b/test/test_krbcontext.py @@ -19,97 +19,109 @@ def test_missing_principal(self): self.assertRaises(ValueError, krbContext, using_keytab=True) def test_all_defaults(self): - context = krbContext(using_keytab=True, - principal='HTTP/hostname@EXAMPLE.COM') + context = krbContext( + using_keytab=True, principal="HTTP/hostname@EXAMPLE.COM" + ) - self.assertTrue(context._cleaned_options['using_keytab']) + self.assertTrue(context._cleaned_options["using_keytab"]) expected_princ = gssapi.Name( - 'HTTP/hostname@EXAMPLE.COM', - gssapi.NameType.kerberos_principal) - self.assertEqual(expected_princ, context._cleaned_options['principal']) - self.assertEqual(kctx.DEFAULT_CCACHE, - context._cleaned_options['ccache']) - self.assertEqual(kctx.DEFAULT_KEYTAB, - context._cleaned_options['keytab']) - - @patch('os.path.exists') + "HTTP/hostname@EXAMPLE.COM", gssapi.NameType.kerberos_principal + ) + self.assertEqual(expected_princ, context._cleaned_options["principal"]) + self.assertEqual( + kctx.DEFAULT_CCACHE, context._cleaned_options["ccache"] + ) + self.assertEqual( + kctx.DEFAULT_KEYTAB, context._cleaned_options["keytab"] + ) + + @patch("os.path.exists") def test_specify_existing_keytab(self, exists): exists.return_value = True - context = krbContext(using_keytab=True, - principal='HTTP/hostname@EXAMPLE.COM', - keytab_file='/etc/app/app.keytab') - self.assertEqual('/etc/app/app.keytab', - context._cleaned_options['keytab']) + context = krbContext( + using_keytab=True, + principal="HTTP/hostname@EXAMPLE.COM", + keytab_file="/etc/app/app.keytab", + ) + self.assertEqual( + "/etc/app/app.keytab", context._cleaned_options["keytab"] + ) - @patch('os.path.exists') + @patch("os.path.exists") def test_specify_nonexisting_keytab(self, exists): exists.return_value = False - self.assertRaises(ValueError, - krbContext, - using_keytab=True, - principal='HTTP/hostname@EXAMPLE.COM', - keytab_file='/etc/app/app.keytab') + self.assertRaises( + ValueError, + krbContext, + using_keytab=True, + principal="HTTP/hostname@EXAMPLE.COM", + keytab_file="/etc/app/app.keytab", + ) def test_specify_ccache(self): - context = krbContext(using_keytab=True, - principal='HTTP/hostname@EXAMPLE.COM', - ccache_file='/var/app/krb5_ccache') - self.assertEqual('/var/app/krb5_ccache', - context._cleaned_options['ccache']) + context = krbContext( + using_keytab=True, + principal="HTTP/hostname@EXAMPLE.COM", + ccache_file="/var/app/krb5_ccache", + ) + self.assertEqual( + "/var/app/krb5_ccache", context._cleaned_options["ccache"] + ) class CleanArgumentsAsRegularUserTest(unittest.TestCase): """Test clean_context_options for not using keytab""" - @patch('krbcontext.context.get_login') + @patch("krbcontext.context.get_login") def test_all_defaults(self, get_login): - get_login.return_value = 'cqi' + get_login.return_value = "cqi" context = krbContext() - expected_princ = gssapi.Name(get_login.return_value, - gssapi.NameType.user) - self.assertEqual(expected_princ, - context._cleaned_options['principal']) - self.assertEqual(kctx.DEFAULT_CCACHE, - context._cleaned_options['ccache']) - self.assertFalse(context._cleaned_options['using_keytab']) + expected_princ = gssapi.Name( + get_login.return_value, gssapi.NameType.user + ) + self.assertEqual(expected_princ, context._cleaned_options["principal"]) + self.assertEqual( + kctx.DEFAULT_CCACHE, context._cleaned_options["ccache"] + ) + self.assertFalse(context._cleaned_options["using_keytab"]) def test_specify_ccache(self): - context = krbContext(principal='cqi', - ccache_file='/var/app/krb5_ccache') - self.assertEqual('/var/app/krb5_ccache', - context._cleaned_options['ccache']) + context = krbContext( + principal="cqi", ccache_file="/var/app/krb5_ccache" + ) + self.assertEqual( + "/var/app/krb5_ccache", context._cleaned_options["ccache"] + ) def test_specify_principal(self): - context = krbContext(principal='cqi') - expected_princ = gssapi.Name('cqi', gssapi.names.NameType.user) - self.assertEqual(expected_princ, - context._cleaned_options['principal']) + context = krbContext(principal="cqi") + expected_princ = gssapi.Name("cqi", gssapi.names.NameType.user) + self.assertEqual(expected_princ, context._cleaned_options["principal"]) class TestInitWithKeytab(unittest.TestCase): """Test krbContext.init_with_keytab""" def setUp(self): - self.service_principal = 'HTTP/hostname@EXAMPLE.COM' + self.service_principal = "HTTP/hostname@EXAMPLE.COM" self.princ_name = gssapi.Name( - self.service_principal, - gssapi.NameType.kerberos_principal) + self.service_principal, gssapi.NameType.kerberos_principal + ) - self.Lock = patch('krbcontext.context.Lock') + self.Lock = patch("krbcontext.context.Lock") self.Lock.start() # No need to create a real temp file for tests - self.tmp_dir = '/tmp/test-krbcontext' - self.tmp_ccache = os.path.join(self.tmp_dir, 'ccache') - self.mkdtemp = patch('tempfile.mkdtemp', - return_value=self.tmp_dir) + self.tmp_dir = "/tmp/test-krbcontext" + self.tmp_ccache = os.path.join(self.tmp_dir, "ccache") + self.mkdtemp = patch("tempfile.mkdtemp", return_value=self.tmp_dir) self.mkdtemp.start() - self.os_close = patch('os.close') + self.os_close = patch("os.close") self.os_close.start() def tearDown(self): @@ -117,196 +129,255 @@ def tearDown(self): self.mkdtemp.stop() self.Lock.stop() - @patch('gssapi.Credentials') + @patch("gssapi.Credentials") def test_cred_not_expired(self, Credentials): - context = krbContext(using_keytab=True, - principal=self.service_principal) + context = krbContext( + using_keytab=True, principal=self.service_principal + ) context.init_with_keytab() self.assertEqual(1, Credentials.call_count) Credentials.return_value.store.assert_not_called() - @patch('gssapi.Credentials') + @patch("gssapi.Credentials") def test_init_in_default_ccache_with_default_keytab(self, Credentials): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) - context = krbContext(using_keytab=True, - principal=self.service_principal) + context = krbContext( + using_keytab=True, principal=self.service_principal + ) context.init_with_keytab() - Credentials.assert_has_calls([ - call(usage='initiate', name=self.princ_name), - call(usage='initiate', name=self.princ_name, - store={'ccache': self.tmp_ccache}), - ]) + Credentials.assert_has_calls( + [ + call(usage="initiate", name=self.princ_name), + call( + usage="initiate", + name=self.princ_name, + store={"ccache": self.tmp_ccache}, + ), + ] + ) Credentials.return_value.store.assert_called_once_with( - store=None, - usage='initiate', - set_default=True, - overwrite=True) + store=None, usage="initiate", set_default=True, overwrite=True + ) - @patch('gssapi.Credentials') - @patch('os.path.exists', return_value=True) - def test_init_in_default_ccache_with_given_keytab(self, - exists, - Credentials): + @patch("gssapi.Credentials") + @patch("os.path.exists", return_value=True) + def test_init_in_default_ccache_with_given_keytab( + self, exists, Credentials + ): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) - keytab = '/etc/app/app.keytab' - context = krbContext(using_keytab=True, - principal=self.service_principal, - keytab_file=keytab) + keytab = "/etc/app/app.keytab" + context = krbContext( + using_keytab=True, + principal=self.service_principal, + keytab_file=keytab, + ) context.init_with_keytab() - Credentials.assert_has_calls([ - call(usage='initiate', name=self.princ_name, - store={'client_keytab': keytab}), - call(usage='initiate', name=self.princ_name, - store={'ccache': self.tmp_ccache, 'client_keytab': keytab}), - call().store(usage='initiate', store=None, - set_default=True, overwrite=True), - ]) + Credentials.assert_has_calls( + [ + call( + usage="initiate", + name=self.princ_name, + store={"client_keytab": keytab}, + ), + call( + usage="initiate", + name=self.princ_name, + store={"ccache": self.tmp_ccache, "client_keytab": keytab}, + ), + call().store( + usage="initiate", + store=None, + set_default=True, + overwrite=True, + ), + ] + ) Credentials.return_value.store.assert_called_once_with( - store=None, - usage='initiate', - set_default=True, - overwrite=True) + store=None, usage="initiate", set_default=True, overwrite=True + ) - @patch('gssapi.Credentials') + @patch("gssapi.Credentials") def test_init_in_given_ccache_with_default_keytab(self, Credentials): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) - ccache = '/tmp/mycc' - context = krbContext(using_keytab=True, - principal=self.service_principal, - ccache_file=ccache) + ccache = "/tmp/mycc" + context = krbContext( + using_keytab=True, + principal=self.service_principal, + ccache_file=ccache, + ) context.init_with_keytab() - Credentials.assert_has_calls([ - call(usage='initiate', name=self.princ_name, - store={'ccache': ccache}), - call(usage='initiate', name=self.princ_name, - store={'ccache': self.tmp_ccache}), - ]) + Credentials.assert_has_calls( + [ + call( + usage="initiate", + name=self.princ_name, + store={"ccache": ccache}, + ), + call( + usage="initiate", + name=self.princ_name, + store={"ccache": self.tmp_ccache}, + ), + ] + ) Credentials.return_value.store.assert_called_once_with( - store={'ccache': ccache}, - usage='initiate', + store={"ccache": ccache}, + usage="initiate", set_default=True, - overwrite=True) + overwrite=True, + ) - @patch('gssapi.Credentials') - @patch('os.path.exists', return_value=True) + @patch("gssapi.Credentials") + @patch("os.path.exists", return_value=True) def test_init_with_given_keytab_and_ccache(self, exists, Credentials): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) - - keytab = '/etc/app/app.keytab' - ccache = '/tmp/mycc' - context = krbContext(using_keytab=True, - principal=self.service_principal, - keytab_file=keytab, - ccache_file=ccache) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) + + keytab = "/etc/app/app.keytab" + ccache = "/tmp/mycc" + context = krbContext( + using_keytab=True, + principal=self.service_principal, + keytab_file=keytab, + ccache_file=ccache, + ) context.init_with_keytab() - Credentials.assert_has_calls([ - call(usage='initiate', name=self.princ_name, - store={'client_keytab': keytab, 'ccache': ccache}), - call(usage='initiate', name=self.princ_name, - store={'client_keytab': keytab, 'ccache': self.tmp_ccache}), - ]) + Credentials.assert_has_calls( + [ + call( + usage="initiate", + name=self.princ_name, + store={"client_keytab": keytab, "ccache": ccache}, + ), + call( + usage="initiate", + name=self.princ_name, + store={"client_keytab": keytab, "ccache": self.tmp_ccache}, + ), + ] + ) Credentials.return_value.store.assert_called_once_with( - store={'ccache': ccache}, - usage='initiate', + store={"ccache": ccache}, + usage="initiate", set_default=True, - overwrite=True) + overwrite=True, + ) class TestInitWithPassword(unittest.TestCase): """Test krbContext.init_with_password""" def setUp(self): - self.principal = 'cqi' + self.principal = "cqi" self.princ_name = gssapi.Name(self.principal, gssapi.NameType.user) - @patch('gssapi.Credentials') - @patch('gssapi.raw.acquire_cred_with_password') - @patch('gssapi.raw.store_cred_into') + @patch("gssapi.Credentials") + @patch("gssapi.raw.acquire_cred_with_password") + @patch("gssapi.raw.store_cred_into") def test_no_need_init_if_not_expired( - self, store_cred_into, acquire_cred_with_password, Credentials): - context = krbContext(using_keytab=False, - principal=self.principal, - password='security') + self, store_cred_into, acquire_cred_with_password, Credentials + ): + context = krbContext( + using_keytab=False, principal=self.principal, password="security" + ) context.init_with_password() self.assertEqual(1, Credentials.call_count) store_cred_into.assert_not_called() acquire_cred_with_password.assert_not_called() - @patch('gssapi.Credentials') - @patch('gssapi.raw.acquire_cred_with_password') - @patch('gssapi.raw.store_cred') + @patch("gssapi.Credentials") + @patch("gssapi.raw.acquire_cred_with_password") + @patch("gssapi.raw.store_cred") def test_init_in_default_ccache( - self, store_cred, acquire_cred_with_password, Credentials): + self, store_cred, acquire_cred_with_password, Credentials + ): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) - context = krbContext(using_keytab=False, - principal=self.principal, - password='security') + context = krbContext( + using_keytab=False, principal=self.principal, password="security" + ) context.init_with_password() acquire_cred_with_password.assert_called_once_with( - self.princ_name, b'security') + self.princ_name, b"security" + ) store_cred.assert_called_once_with( acquire_cred_with_password.return_value.creds, - usage='initiate', overwrite=True, set_default=True + usage="initiate", + overwrite=True, + set_default=True, ) - @patch('gssapi.Credentials') - @patch('gssapi.raw.acquire_cred_with_password') - @patch('gssapi.raw.store_cred_into') + @patch("gssapi.Credentials") + @patch("gssapi.raw.acquire_cred_with_password") + @patch("gssapi.raw.store_cred_into") def test_init_in_given_ccache( - self, store_cred_into, acquire_cred_with_password, Credentials): + self, store_cred_into, acquire_cred_with_password, Credentials + ): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) - ccache = '/tmp/mycc' - context = krbContext(using_keytab=False, - principal=self.principal, - ccache_file=ccache, - password='security') + ccache = "/tmp/mycc" + context = krbContext( + using_keytab=False, + principal=self.principal, + ccache_file=ccache, + password="security", + ) context.init_with_password() Credentials.assert_called_once_with( - usage='initiate', - name=self.princ_name, - store={'ccache': ccache}) + usage="initiate", name=self.princ_name, store={"ccache": ccache} + ) acquire_cred_with_password.assert_called_once_with( - self.princ_name, b'security') + self.princ_name, b"security" + ) store_cred_into.assert_called_once_with( - {'ccache': '/tmp/mycc'}, + {"ccache": "/tmp/mycc"}, acquire_cred_with_password.return_value.creds, - usage='initiate', - overwrite=True) - - @patch('gssapi.Credentials') - @patch('sys.stdin.isatty', return_value=True) - @patch('getpass.getpass') - @patch('gssapi.raw.acquire_cred_with_password') - @patch('gssapi.raw.store_cred') - def test_init_cred_with_need_enter_password(self, store_cred, - acquire_cred_with_password, - getpass, isatty, - Credentials): + usage="initiate", + overwrite=True, + ) + + @patch("gssapi.Credentials") + @patch("sys.stdin.isatty", return_value=True) + @patch("getpass.getpass") + @patch("gssapi.raw.acquire_cred_with_password") + @patch("gssapi.raw.store_cred") + def test_init_cred_with_need_enter_password( + self, + store_cred, + acquire_cred_with_password, + getpass, + isatty, + Credentials, + ): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) - getpass.return_value = 'mypassword' + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) + getpass.return_value = "mypassword" context = krbContext(using_keytab=False, principal=self.principal) context.init_with_password() @@ -315,31 +386,36 @@ def test_init_cred_with_need_enter_password(self, store_cred, # Ensure this must be called. getpass.assert_called_once() - Credentials.assert_called_once_with(usage='initiate', - name=self.princ_name) + Credentials.assert_called_once_with( + usage="initiate", name=self.princ_name + ) acquire_cred_with_password.assert_called_once_with( - self.princ_name, b'mypassword') + self.princ_name, b"mypassword" + ) store_cred.assert_called_once_with( acquire_cred_with_password.return_value.creds, - usage='initiate', overwrite=True, set_default=True + usage="initiate", + overwrite=True, + set_default=True, ) - @patch('gssapi.Credentials') - @patch('sys.stdin.isatty', return_value=False) - def test_init_with_entering_password_but_not_in_atty(self, - isatty, - Credentials): + @patch("gssapi.Credentials") + @patch("sys.stdin.isatty", return_value=False) + def test_init_with_entering_password_but_not_in_atty( + self, isatty, Credentials + ): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) context = krbContext(using_keytab=False, principal=self.principal) self.assertRaises(IOError, context.init_with_password) - context = krbContext(using_keytab=False, - principal=self.principal, - password='') + context = krbContext( + using_keytab=False, principal=self.principal, password="" + ) self.assertRaises(IOError, context.init_with_password) @@ -348,121 +424,138 @@ class TestKrbContextManager(unittest.TestCase): def setUp(self): # Do not actually operate threading lock - self.init_Lock = patch('krbcontext.context.Lock') + self.init_Lock = patch("krbcontext.context.Lock") self.init_Lock.start() def tearDown(self): self.init_Lock.stop() - @patch('gssapi.Credentials') - @patch.dict('os.environ', {}, clear=True) + @patch("gssapi.Credentials") + @patch.dict("os.environ", {}, clear=True) def test_init_with_default_keytab(self, Credentials): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) - - with krbContext(using_keytab=True, - principal='app/hostname@EXAMPLE.COM', - ccache_file='/tmp/my_cc'): - self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME']) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) - @patch('gssapi.Credentials') - @patch('gssapi.raw.acquire_cred_with_password') - @patch('gssapi.raw.store_cred') - @patch.dict('os.environ', {}, clear=True) + with krbContext( + using_keytab=True, + principal="app/hostname@EXAMPLE.COM", + ccache_file="/tmp/my_cc", + ): + self.assertEqual("/tmp/my_cc", os.environ["KRB5CCNAME"]) + + @patch("gssapi.Credentials") + @patch("gssapi.raw.acquire_cred_with_password") + @patch("gssapi.raw.store_cred") + @patch.dict("os.environ", {}, clear=True) def test_init_in_default_ccache_with_password( - self, store_cred, acquire_cred_with_password, Credentials): + self, store_cred, acquire_cred_with_password, Credentials + ): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) - with krbContext(using_keytab=False, - principal='cqi', - password='security'): - self.assertNotIn('KRB5CCNAME', os.environ) + with krbContext( + using_keytab=False, principal="cqi", password="security" + ): + self.assertNotIn("KRB5CCNAME", os.environ) - self.assertNotIn('KRB5CCNAME', os.environ) + self.assertNotIn("KRB5CCNAME", os.environ) - @patch('gssapi.Credentials') - @patch('gssapi.raw.acquire_cred_with_password') - @patch('gssapi.raw.store_cred') - @patch.dict('os.environ', {'KRB5CCNAME': '/tmp/my_cc'}, clear=True) + @patch("gssapi.Credentials") + @patch("gssapi.raw.acquire_cred_with_password") + @patch("gssapi.raw.store_cred") + @patch.dict("os.environ", {"KRB5CCNAME": "/tmp/my_cc"}, clear=True) def test_after_init_in_default_ccache_original_ccache_should_be_restored( - self, store_cred, acquire_cred_with_password, Credentials): + self, store_cred, acquire_cred_with_password, Credentials + ): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) - with krbContext(using_keytab=False, - principal='cqi', - password='security'): - self.assertNotIn('KRB5CCNAME', os.environ) + with krbContext( + using_keytab=False, principal="cqi", password="security" + ): + self.assertNotIn("KRB5CCNAME", os.environ) - self.assertIn('KRB5CCNAME', os.environ) - self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME']) + self.assertIn("KRB5CCNAME", os.environ) + self.assertEqual("/tmp/my_cc", os.environ["KRB5CCNAME"]) - @patch('gssapi.Credentials') - @patch.dict('os.environ', {'KRB5CCNAME': '/tmp/my_cc'}, clear=True) + @patch("gssapi.Credentials") + @patch.dict("os.environ", {"KRB5CCNAME": "/tmp/my_cc"}, clear=True) def test_original_ccache_should_be_restored(self, Credentials): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) - with krbContext(using_keytab=True, - principal='app/hostname@EXAMPLE.COM', - ccache_file='/tmp/app_pid_cc'): + with krbContext( + using_keytab=True, + principal="app/hostname@EXAMPLE.COM", + ccache_file="/tmp/app_pid_cc", + ): # Inside context, given ccache should be used. - self.assertEqual('/tmp/app_pid_cc', os.environ['KRB5CCNAME']) + self.assertEqual("/tmp/app_pid_cc", os.environ["KRB5CCNAME"]) - self.assertIn('KRB5CCNAME', os.environ) - self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME']) + self.assertIn("KRB5CCNAME", os.environ) + self.assertEqual("/tmp/my_cc", os.environ["KRB5CCNAME"]) - @patch('gssapi.Credentials') - @patch.dict('os.environ', {}, clear=True) + @patch("gssapi.Credentials") + @patch.dict("os.environ", {}, clear=True) def test_init_in_default_ccache_without_original_krb5ccname_is_set( - self, Credentials): + self, Credentials + ): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) - with krbContext(using_keytab=True, - principal='app/hostname@EXAMPLE.COM'): - self.assertNotIn('KRB5CCNAME', os.environ) + with krbContext( + using_keytab=True, principal="app/hostname@EXAMPLE.COM" + ): + self.assertNotIn("KRB5CCNAME", os.environ) # Originally, no KRB5CCNAME is set, it should be cleaned after exit. - self.assertNotIn('KRB5CCNAME', os.environ) + self.assertNotIn("KRB5CCNAME", os.environ) - @patch('gssapi.Credentials') - @patch.dict('os.environ', {'KRB5CCNAME': '/tmp/my_cc'}, clear=True) + @patch("gssapi.Credentials") + @patch.dict("os.environ", {"KRB5CCNAME": "/tmp/my_cc"}, clear=True) def test_init_in_default_ccache_and_original_krb5ccname_is_set( - self, Credentials): + self, Credentials + ): type(Credentials.return_value).lifetime = PropertyMock( - side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1)) + side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1) + ) - with krbContext(using_keytab=True, - principal='app/hostname@EXAMPLE.COM'): - self.assertNotIn('KRB5CCNAME', os.environ) + with krbContext( + using_keytab=True, principal="app/hostname@EXAMPLE.COM" + ): + self.assertNotIn("KRB5CCNAME", os.environ) - self.assertIn('KRB5CCNAME', os.environ) - self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME']) + self.assertIn("KRB5CCNAME", os.environ) + self.assertEqual("/tmp/my_cc", os.environ["KRB5CCNAME"]) - @patch('gssapi.Credentials') - @patch.dict(os.environ, {'KRB5CCNAME': '/tmp/my_cc'}, clear=True) + @patch("gssapi.Credentials") + @patch.dict(os.environ, {"KRB5CCNAME": "/tmp/my_cc"}, clear=True) def test_do_nothing_if_unnecessary_to_init(self, Credentials): - with krbContext(using_keytab=True, - principal='app/hostname@EXAMPLE.COM'): + with krbContext( + using_keytab=True, principal="app/hostname@EXAMPLE.COM" + ): # Nothing is changed, but original KRB5CCNAME must be removed # since default ccache is used. - self.assertNotIn('KRB5CCNAME', os.environ) + self.assertNotIn("KRB5CCNAME", os.environ) # Original ccache must be restored. - self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME']) + self.assertEqual("/tmp/my_cc", os.environ["KRB5CCNAME"]) class TestGetLogin(unittest.TestCase): """Test get_login""" - @patch('os.getuid', return_value=1001) - @patch('pwd.getpwuid') + @patch("os.getuid", return_value=1001) + @patch("pwd.getpwuid") def test_get_login(self, getpwuid, getuid): - getpwuid.return_value.pw_name = 'user' + getpwuid.return_value.pw_name = "user" user = get_login() getpwuid.assert_called_once_with(getuid.return_value) - self.assertEqual('user', user) + self.assertEqual("user", user) diff --git a/tox.ini b/tox.ini index 83dde93..3e1489f 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py36,py37,py38,py39,flake8 +envlist = py36,py37,py38,py39,black,flake8 [testenv] usedevelop = True @@ -10,3 +10,7 @@ commands = python3 -m pytest {posargs} [testenv:flake8] deps = flake8 commands = flake8 krbcontext/ test/ + +[testenv:black] +deps = black +commands = black --check --diff --line-length 79 krbcontext/ test/