Skip to content
This repository was archived by the owner on May 30, 2022. It is now read-only.

Format and check codebase by black #40

Merged
merged 1 commit into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 79 additions & 56 deletions krbcontext/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@

from threading import Lock

__all__ = ('krbContext',)
__all__ = ("krbContext",)


DEFAULT_CCACHE = 'DEFAULT_CCACHE'
DEFAULT_KEYTAB = 'DEFAULT_KEYTAB'
ENV_KRB5CCNAME = 'KRB5CCNAME'
DEFAULT_CCACHE = "DEFAULT_CCACHE"
DEFAULT_KEYTAB = "DEFAULT_KEYTAB"
ENV_KRB5CCNAME = "KRB5CCNAME"


def get_login():
Expand Down Expand Up @@ -60,8 +60,14 @@ class krbContext(object):
krbContext can work with ``with`` statement to simplify your code.
"""

def __init__(self, using_keytab=False, principal=None, keytab_file=None,
ccache_file=None, password=None):
def __init__(
self,
using_keytab=False,
principal=None,
keytab_file=None,
ccache_file=None,
password=None,
):
"""Initialize context

:param bool using_keytab: indicate whether to initialize credential
Expand All @@ -82,20 +88,26 @@ def __init__(self, using_keytab=False, principal=None, keytab_file=None,
omitted, program will be blocked and prompts to enter a password
from command line, which requires program runs in a terminal.
"""
self._cleaned_options = self.clean_options(using_keytab=using_keytab,
principal=principal,
keytab_file=keytab_file,
ccache_file=ccache_file,
password=password)
self._cleaned_options = self.clean_options(
using_keytab=using_keytab,
principal=principal,
keytab_file=keytab_file,
ccache_file=ccache_file,
password=password,
)
self._original_krb5ccname = None
self._inited = False

self._init_lock = Lock()

def clean_options(self,
using_keytab=False, principal=None,
keytab_file=None, ccache_file=None,
password=None):
def clean_options(
self,
using_keytab=False,
principal=None,
keytab_file=None,
ccache_file=None,
password=None,
):
"""Clean argument to related object

:param bool using_keytab: refer to ``krbContext.__init__``.
Expand All @@ -114,63 +126,68 @@ def clean_options(self,

if using_keytab:
if principal is None:
raise ValueError('Principal is required when using key table.')
raise ValueError("Principal is required when using key table.")
princ_name = gssapi.Name(
principal, gssapi.NameType.kerberos_principal)
principal, gssapi.NameType.kerberos_principal
)

if keytab_file is None:
cleaned['keytab'] = DEFAULT_KEYTAB
cleaned["keytab"] = DEFAULT_KEYTAB
elif not os.path.exists(keytab_file):
raise ValueError(f'Keytab file {keytab_file} does not exist.')
raise ValueError(f"Keytab file {keytab_file} does not exist.")
else:
cleaned['keytab'] = keytab_file
cleaned["keytab"] = keytab_file
else:
if principal is None:
principal = get_login()
princ_name = gssapi.Name(principal, gssapi.NameType.user)

cleaned['using_keytab'] = using_keytab
cleaned['principal'] = princ_name
cleaned['ccache'] = ccache_file or DEFAULT_CCACHE
cleaned['password'] = password
cleaned["using_keytab"] = using_keytab
cleaned["principal"] = princ_name
cleaned["ccache"] = ccache_file or DEFAULT_CCACHE
cleaned["password"] = password

return cleaned

def init_with_keytab(self):
"""Initialize credential cache with keytab"""
creds_opts = {
'usage': 'initiate',
'name': self._cleaned_options['principal'],
"usage": "initiate",
"name": self._cleaned_options["principal"],
}

store = {}
if self._cleaned_options['keytab'] != DEFAULT_KEYTAB:
store['client_keytab'] = self._cleaned_options['keytab']
if self._cleaned_options['ccache'] != DEFAULT_CCACHE:
store['ccache'] = self._cleaned_options['ccache']
if self._cleaned_options["keytab"] != DEFAULT_KEYTAB:
store["client_keytab"] = self._cleaned_options["keytab"]
if self._cleaned_options["ccache"] != DEFAULT_CCACHE:
store["ccache"] = self._cleaned_options["ccache"]
if store:
creds_opts['store'] = store
creds_opts["store"] = store

creds = gssapi.Credentials(**creds_opts)
try:
creds.lifetime
except gssapi.exceptions.ExpiredCredentialsError:
new_creds_opts = copy.deepcopy(creds_opts)
# Get new credential and put it into a temporary ccache
temp_directory = tempfile.mkdtemp('-krbcontext')
temp_ccache = os.path.join(temp_directory, 'ccache')
temp_directory = tempfile.mkdtemp("-krbcontext")
temp_ccache = os.path.join(temp_directory, "ccache")
try:
new_creds_opts.setdefault('store', {})['ccache'] = temp_ccache
new_creds_opts.setdefault("store", {})["ccache"] = temp_ccache
creds = gssapi.Credentials(**new_creds_opts)
# Then, store new credential back to original specified ccache,
# whatever a given ccache file or the default one.
_store = None
# If default ccache is used, no need to specify ccache in
# store parameter passed to ``creds.store``.
if self._cleaned_options['ccache'] != DEFAULT_CCACHE:
_store = {'ccache': store['ccache']}
creds.store(usage='initiate', store=_store, set_default=True,
overwrite=True)
if self._cleaned_options["ccache"] != DEFAULT_CCACHE:
_store = {"ccache": store["ccache"]}
creds.store(
usage="initiate",
store=_store,
set_default=True,
overwrite=True,
)
finally:
shutil.rmtree(temp_directory, ignore_errors=True)

Expand All @@ -188,24 +205,25 @@ def init_with_password(self):
line but no attry is available.
"""
creds_opts = {
'usage': 'initiate',
'name': self._cleaned_options['principal'],
"usage": "initiate",
"name": self._cleaned_options["principal"],
}
if self._cleaned_options['ccache'] != DEFAULT_CCACHE:
creds_opts['store'] = {'ccache': self._cleaned_options['ccache']}
if self._cleaned_options["ccache"] != DEFAULT_CCACHE:
creds_opts["store"] = {"ccache": self._cleaned_options["ccache"]}

cred = gssapi.Credentials(**creds_opts)
try:
cred.lifetime
except gssapi.exceptions.ExpiredCredentialsError:
password = self._cleaned_options['password']
password = self._cleaned_options["password"]

if not password:
if not sys.stdin.isatty():
raise IOError(
'krbContext is not running from a terminal. So, you '
'need to run kinit with your principal manually before'
' anything goes.')
"krbContext is not running from a terminal. So, you "
"need to run kinit with your principal manually before"
" anything goes."
)

# If there is no password specified via API call, prompt to
# enter one in order to continue to get credential. BUT, in
Expand All @@ -218,19 +236,24 @@ def init_with_password(self):
password = getpass.getpass()

cred = gssapi.raw.acquire_cred_with_password(
self._cleaned_options['principal'], password.encode('utf-8'))
self._cleaned_options["principal"], password.encode("utf-8")
)

ccache = self._cleaned_options['ccache']
ccache = self._cleaned_options["ccache"]
if ccache == DEFAULT_CCACHE:
gssapi.raw.store_cred(
cred.creds,
usage='initiate', overwrite=True, set_default=True,
usage="initiate",
overwrite=True,
set_default=True,
)
else:
gssapi.raw.store_cred_into({'ccache': ccache},
cred.creds,
usage='initiate',
overwrite=True)
gssapi.raw.store_cred_into(
{"ccache": ccache},
cred.creds,
usage="initiate",
overwrite=True,
)

def _prepare_context(self):
"""Prepare context
Expand All @@ -242,7 +265,7 @@ def _prepare_context(self):

Internal use only.
"""
ccache = self._cleaned_options['ccache']
ccache = self._cleaned_options["ccache"]

# Whatever there is KRB5CCNAME was set in current process,
# original_krb5ccname will contain current value even if None if
Expand All @@ -260,7 +283,7 @@ def _prepare_context(self):
# us point to the given ccache by KRB5CCNAME.
os.environ[ENV_KRB5CCNAME] = ccache

if self._cleaned_options['using_keytab']:
if self._cleaned_options["using_keytab"]:
self.init_with_keytab()
else:
self.init_with_password()
Expand All @@ -281,7 +304,7 @@ def __exit__(self, exc_type, exc_value, traceback):
restored correctly, if there was. And, lock gets released as well.
"""
try:
if self._cleaned_options['ccache'] == DEFAULT_CCACHE:
if self._cleaned_options["ccache"] == DEFAULT_CCACHE:
if self._original_krb5ccname:
os.environ[ENV_KRB5CCNAME] = self._original_krb5ccname
else:
Expand Down
Loading