3333import shutil
3434import subprocess # nosec B404
3535import sys
36- from typing import TYPE_CHECKING , Any , Dict , Iterable , List , Optional , Tuple , Union
36+ from abc import ABC , abstractmethod
37+ from typing import (
38+ TYPE_CHECKING ,
39+ Any ,
40+ Dict ,
41+ Iterable ,
42+ List ,
43+ NamedTuple ,
44+ Optional ,
45+ Tuple ,
46+ Union ,
47+ )
3748from urllib .parse import urlparse , urlunparse
3849
3950from dulwich .config import StackedConfig
@@ -54,17 +65,38 @@ class CredentialNotFoundError(SCMError):
5465 """Error occurred while retrieving credentials/no credentials available."""
5566
5667
57- class CredentialHelper :
58- """Helper for retrieving credentials for http/https git remotes
68+ class CredentialHelper (ABC ):
69+ """Base git-credential helper."""
70+
71+ @abstractmethod
72+ def get (self , ** kwargs ) -> "Credential" :
73+ """Get a matching credential from this helper.
74+
75+ Raises:
76+ CredentialNotFoundError: No matching credential was found.
77+ """
78+
79+ @abstractmethod
80+ def store (self , ** kwargs ):
81+ """Store the credential, if applicable to the helper"""
82+
83+ @abstractmethod
84+ def erase (self , ** kwargs ):
85+ """Remove a matching credential, if any, from the helper’s storage"""
86+
87+
88+ class GitCredentialHelper (CredentialHelper ):
89+ """Helper for retrieving credentials through git-credential-<helper> commands.
5990
6091 Usage:
61- >>> helper = CredentialHelper ("store") # Use `git credential-store`
92+ >>> helper = GitCredentialHelper ("store") # Use `git credential-store`
6293 >>> credentials = helper.get("https://github.com/dtrifiro/aprivaterepo")
6394 >>> username = credentials["username"]
6495 >>> password = credentials["password"]
6596 """
6697
6798 def __init__ (self , command : str ):
99+ super ().__init__ ()
68100 self ._command = command
69101 self ._run_kwargs : Dict [str , Any ] = {}
70102 if self ._command [0 ] == "!" :
@@ -102,10 +134,7 @@ def _prepare_command(self, action: Optional[str] = None) -> Union[str, List[str]
102134
103135 return [executable , * argv [1 :]]
104136
105- def get (
106- self ,
107- ** kwargs ,
108- ) -> "Credential" :
137+ def get (self , ** kwargs ) -> "Credential" :
109138 if kwargs .get ("protocol" , kwargs .get ("hostname" )) is None :
110139 raise ValueError ("One of protocol, hostname must be provided" )
111140 cmd = self ._prepare_command ("get" )
@@ -175,26 +204,111 @@ def erase(self, **kwargs):
175204 except FileNotFoundError :
176205 logger .debug ("Helper not found" , exc_info = True )
177206
207+ @staticmethod
208+ def get_matching_commands (
209+ base_url : str , config : Optional [Union ["ConfigDict" , "StackedConfig" ]] = None
210+ ):
211+ config = config or StackedConfig .default ()
212+ if isinstance (config , StackedConfig ):
213+ backends : Iterable ["ConfigDict" ] = config .backends
214+ else :
215+ backends = [config ]
216+
217+ for conf in backends :
218+ # We will try to match credential sections' url with the given url,
219+ # falling back to the generic section if there's no match
220+ for section in urlmatch_credential_sections (conf , base_url ):
221+ try :
222+ command = conf .get (section , "helper" )
223+ except KeyError :
224+ # no helper configured
225+ continue
226+ yield command .decode (conf .encoding or sys .getdefaultencoding ())
227+
228+
229+ class _CredentialKey (NamedTuple ):
230+ protocol : str
231+ host : Optional [str ]
232+ path : Optional [str ]
178233
179- def get_matching_helper_commands (
180- base_url : str , config : Optional [Union ["ConfigDict" , "StackedConfig" ]] = None
181- ):
182- config = config or StackedConfig .default ()
183- if isinstance (config , StackedConfig ):
184- backends : Iterable ["ConfigDict" ] = config .backends
185- else :
186- backends = [config ]
187234
188- for conf in backends :
189- # We will try to match credential sections' url with the given url,
190- # falling back to the generic section if there's no match
191- for section in urlmatch_credential_sections (conf , base_url ):
235+ class MemoryCredentialHelper (CredentialHelper ):
236+ """Memory credential helper that supports optional interactive input."""
237+
238+ def __init__ (self ):
239+ super ().__init__ ()
240+ self ._credentials : Dict ["_CredentialKey" , "Credential" ] = {}
241+
242+ def get (self , * , interactive : bool = False , ** kwargs ) -> "Credential" :
243+ """Get a matching credential from this helper.
244+
245+ Raises:
246+ CredentialNotFoundError: No matching credential was found.
247+ """
248+ from getpass import getpass
249+
250+ key = self ._key (** kwargs )
251+ if key .path :
252+ try_keys = [key , _CredentialKey (key .protocol , key .host , None )]
253+ else :
254+ try_keys = [key ]
255+ for try_key in try_keys :
192256 try :
193- command = conf . get ( section , "helper" )
257+ return self . _credentials [ try_key ]
194258 except KeyError :
195- # no helper configured
196- continue
197- yield command .decode (conf .encoding or sys .getdefaultencoding ())
259+ pass
260+ if not interactive or os .environ .get ("GIT_TERMINAL_PROMPT" ) == "0" :
261+ raise CredentialNotFoundError ("Interactive input is disabled" )
262+
263+ scheme = f"{ key .protocol } ://" if key .protocol else ""
264+ netloc = f"{ key .host } " if key .host else ""
265+ url = f"{ scheme } { netloc } "
266+ try :
267+ username = kwargs .get ("username" , "" )
268+ if not username :
269+ username = input (f"Username for '{ url } ': " )
270+ password = kwargs .get ("password" , "" )
271+ if not password :
272+ url = f"{ scheme } { username } @{ netloc } "
273+ password = getpass (f"Password for '{ url } ': " )
274+ except KeyboardInterrupt :
275+ raise CredentialNotFoundError ("User cancelled prompt" )
276+ return Credential (
277+ protocol = key .protocol ,
278+ host = key .host ,
279+ path = key .path ,
280+ username = username ,
281+ password = password ,
282+ memory_only = True ,
283+ )
284+
285+ def store (self , ** kwargs ):
286+ """Store the credential, if applicable to the helper"""
287+ cred = Credential (** kwargs )
288+ cred .memory_only = True
289+ key = self ._key (** kwargs )
290+ self ._credentials [key ] = cred
291+
292+ def erase (self , ** kwargs ):
293+ """Remove a matching credential, if any, from the helper’s storage"""
294+ key = self ._key (** kwargs )
295+ try :
296+ del self ._credentials [key ]
297+ except KeyError :
298+ pass
299+
300+ @staticmethod
301+ def _key (
302+ * ,
303+ protocol : str = "" ,
304+ host : Optional [str ] = None ,
305+ path : Optional [str ] = None ,
306+ ** kwargs ,
307+ ) -> _CredentialKey :
308+ return _CredentialKey (protocol , host , path )
309+
310+
311+ memory_helper = MemoryCredentialHelper ()
198312
199313
200314class Credential :
@@ -232,13 +346,15 @@ def __init__(
232346 password : Optional [str ] = None ,
233347 password_expiry_utc : Optional [int ] = None ,
234348 url : Optional [str ] = None ,
349+ memory_only : bool = False ,
235350 ):
236351 self .protocol = protocol
237352 self .host = host
238353 self .path = path
239354 self .username = username
240355 self .password = password
241356 self .password_expiry_utc = password_expiry_utc
357+ self .memory_only = memory_only
242358 self ._approved = False
243359 if url :
244360 parsed = urlparse (url )
@@ -281,28 +397,43 @@ def _helper_kwargs(self) -> Dict[str, str]:
281397 def helpers (self ) -> List ["CredentialHelper" ]:
282398 url = self .url
283399 return [
284- CredentialHelper (command ) for command in get_matching_helper_commands (url )
400+ GitCredentialHelper (command )
401+ for command in GitCredentialHelper .get_matching_commands (url )
285402 ]
286403
287404 def fill (self ) -> "Credential" :
288405 """Return a new credential with filled username and password."""
406+ try :
407+ return memory_helper .get (interactive = False , ** self ._helper_kwargs )
408+ except CredentialNotFoundError :
409+ pass
410+
289411 for helper in self .helpers :
290412 try :
291413 return helper .get (** self ._helper_kwargs )
292414 except CredentialNotFoundError :
293415 continue
416+
417+ try :
418+ return memory_helper .get (interactive = True , ** self ._helper_kwargs )
419+ except CredentialNotFoundError :
420+ pass
421+
294422 raise CredentialNotFoundError (f"No available credentials for '{ self .url } '" )
295423
296424 def approve (self ):
297425 """Store this credential in available helpers."""
298426 if self ._approved or not (self .username and self .password ):
299427 return
300- for helper in self .helpers :
301- helper .store (** self ._helper_kwargs )
428+ if not self .memory_only :
429+ for helper in self .helpers :
430+ helper .store (** self ._helper_kwargs )
431+ memory_helper .store (** self ._helper_kwargs )
302432 self ._approved = True
303433
304434 def reject (self ):
305435 """Remove this credential from available helpers."""
306436 for helper in self .helpers :
307437 helper .erase (** self ._helper_kwargs )
438+ memory_helper .erase (** self ._helper_kwargs )
308439 self ._approved = False
0 commit comments