@@ -448,6 +448,26 @@ def create_manifest_claim_message(digest: str, reference: str) -> str:
448448 }
449449 return base64 .b64encode (json .dumps (manifest_claim ).encode ("latin1" )).decode ("latin1" )
450450
451+ def _prepare_messages (self , operation : ContainerSignOperation ) -> List [List [MsgMessage ]]:
452+ fargs = []
453+ for digest , reference in zip (operation .digests , operation .references ):
454+ repo = reference .split ("/" , 1 )[1 ].split (":" )[0 ]
455+ fargs .append (
456+ FData (
457+ args = [
458+ self .create_manifest_claim_message (digest = digest , reference = reference ),
459+ repo ,
460+ operation ,
461+ SignRequestType .CONTAINER ,
462+ ],
463+ kwargs = {
464+ "extra_attrs" : {"pub_task_id" : operation .task_id , "manifest_digest" : digest }
465+ },
466+ )
467+ )
468+ ret = run_in_parallel (self ._create_msg_messages , fargs )
469+ return list (ret .values ())
470+
451471 def container_sign (self : MsgSigner , operation : ContainerSignOperation ) -> SigningResults :
452472 """Run container signing operation.
453473
@@ -476,24 +496,9 @@ def container_sign(self: MsgSigner, operation: ContainerSignOperation) -> Signin
476496
477497 LOG .info (f"Container sign operation for { len (operation .digests )} " )
478498
479- fargs = []
480- for digest , reference in zip (operation .digests , operation .references ):
481- repo = reference .split ("/" , 1 )[1 ].split (":" )[0 ]
482- fargs .append (
483- FData (
484- args = [
485- self .create_manifest_claim_message (digest = digest , reference = reference ),
486- repo ,
487- operation ,
488- SignRequestType .CONTAINER ,
489- ],
490- kwargs = {
491- "extra_attrs" : {"pub_task_id" : operation .task_id , "manifest_digest" : digest }
492- },
493- )
494- )
495- ret = run_in_parallel (self ._create_msg_messages , fargs )
496- for n , _key_messages in ret .items ():
499+ ret = self ._prepare_messages (operation )
500+
501+ for _key_messages in ret :
497502 for message in _key_messages :
498503 message_to_data [message .body ["request_id" ]] = message
499504 messages .append (message )
@@ -613,6 +618,175 @@ def container_sign(self: MsgSigner, operation: ContainerSignOperation) -> Signin
613618 return signing_results
614619
615620
621+ class MsgBatchSigner (MsgSigner ):
622+ """Messaging batch signer class."""
623+
624+ _signer_config_key : str = "msg_batch_signer"
625+
626+ chunk_size : int = field (
627+ init = False ,
628+ metadata = {
629+ "description" : "Identify how many signing claims should be send in one message" ,
630+ "sample" : 10 ,
631+ },
632+ )
633+
634+ SUPPORTED_OPERATIONS : ClassVar [List [Type [SignOperation ]]] = [
635+ ContainerSignOperation ,
636+ ]
637+
638+ def _construct_signing_batch_message (
639+ self : Self ,
640+ claims : List [str ],
641+ signing_keys : List [str ],
642+ repo : str ,
643+ signing_key_names : List [str ] = [],
644+ extra_attrs : Optional [Dict [str , Any ]] = None ,
645+ sig_type : str = SignRequestType .CONTAINER ,
646+ ) -> dict [str , Any ]:
647+ data_attr = "claims" if sig_type == SignRequestType .CONTAINER else "data"
648+ _extra_attrs = extra_attrs or {}
649+ processed_claims = [
650+ {
651+ "claim_file" : claim ,
652+ "sig_keyname" : signing_key_names ,
653+ "sig_key_ids" : [sig_key [- 8 :] for sig_key in signing_keys ],
654+ "manifest_digest" : digest ,
655+ }
656+ for claim , digest in zip (claims , _extra_attrs .get ("manifest_digest" , "" ))
657+ ]
658+ message = {
659+ data_attr : processed_claims ,
660+ "request_id" : str (uuid .uuid4 ()),
661+ "created" : isodate_now (),
662+ "requested_by" : self .creator ,
663+ "repo" : repo ,
664+ }
665+ _extra_attrs .pop ("manifest_digest" , None )
666+ message .update (_extra_attrs )
667+ return message
668+
669+ def _create_msg_batch_message (
670+ self : Self ,
671+ data : List [str ],
672+ repo : str ,
673+ operation : SignOperation ,
674+ sig_type : SignRequestType ,
675+ extra_attrs : Optional [Dict [str , Any ]] = None ,
676+ ) -> List [MsgMessage ]:
677+ messages = []
678+ signing_keys = []
679+ for _signing_key in operation .signing_keys :
680+ if _signing_key in self .key_aliases :
681+ signing_keys .append (self .key_aliases [_signing_key ])
682+ LOG .info (
683+ f"Using signing key alias { self .key_aliases [_signing_key ]} for { _signing_key } "
684+ )
685+ else :
686+ signing_keys .append (_signing_key )
687+
688+ extra_attrs = extra_attrs or {}
689+ headers = self ._construct_headers (sig_type , extra_attrs = extra_attrs )
690+ if isinstance (operation , ContainerSignOperation ):
691+ extra_attrs ["manifest_digest" ] = operation .digests
692+ ret = MsgMessage (
693+ headers = headers ,
694+ body = self ._construct_signing_batch_message (
695+ data ,
696+ signing_keys ,
697+ repo ,
698+ signing_key_names = (
699+ operation .signing_key_names
700+ if operation .signing_key_names
701+ else ["" * len (signing_keys )]
702+ ),
703+ extra_attrs = extra_attrs ,
704+ sig_type = sig_type .value ,
705+ ),
706+ address = self .topic_send_to .format (
707+ ** dict (list (asdict (self ).items ()) + list (asdict (operation ).items ()))
708+ ),
709+ )
710+ LOG .debug (f"Construted message with request_id { ret .body ['request_id' ]} " )
711+ messages .append (ret )
712+ return messages
713+
714+ def _prepare_messages (self : Self , operation : ContainerSignOperation ) -> List [List [MsgMessage ]]:
715+ messages : List [List [MsgMessage ]] = []
716+ repo_groups : Dict [str , Dict [str , List [str ]]] = {}
717+ for digest , reference in zip (operation .digests , operation .references ):
718+ repo = reference .split ("/" , 1 )[1 ].split (":" )[0 ]
719+ if repo not in repo_groups :
720+ repo_groups [repo ] = cast (dict [str , list [str ]], {"digests" : [], "references" : []})
721+ repo_groups [repo ]["digests" ].append (digest )
722+ repo_groups [repo ]["references" ].append (reference )
723+
724+ batch_data : List [FData ] = []
725+ for repo , group in repo_groups .items ():
726+ claims = []
727+ digests = []
728+
729+ for digest , reference in zip (group ["digests" ], group ["references" ]):
730+ claims .append (
731+ self .create_manifest_claim_message (digest = digest , reference = reference )
732+ )
733+ digests .append (digest )
734+ if len (claims ) >= self .chunk_size :
735+ fdata = FData (
736+ args = [claims , repo , operation , SignRequestType .CONTAINER ],
737+ kwargs = {
738+ "extra_attrs" : {
739+ "pub_task_id" : operation .task_id ,
740+ "manifest_digest" : digests ,
741+ }
742+ },
743+ )
744+ batch_data .append (fdata )
745+ claims = []
746+ digests = []
747+ if claims :
748+ fdata = FData (
749+ args = [claims , repo , operation , SignRequestType .CONTAINER ],
750+ kwargs = {
751+ "extra_attrs" : {
752+ "pub_task_id" : operation .task_id ,
753+ "manifest_digest" : digests ,
754+ }
755+ },
756+ )
757+ batch_data .append (fdata )
758+
759+ ret = run_in_parallel (self ._create_msg_batch_message , batch_data )
760+ messages .extend (list (ret .values ()))
761+ return messages
762+
763+ def load_config (self : Self , config_data : Dict [str , Any ]) -> None :
764+ """Load configuration of messaging signer.
765+
766+ Arguments:
767+ config_data (dict): configuration data to load
768+ """
769+ self .messaging_brokers = config_data ["msg_batch_signer" ]["messaging_brokers" ]
770+ self .messaging_cert_key = os .path .expanduser (
771+ config_data ["msg_batch_signer" ]["messaging_cert_key" ]
772+ )
773+ self .messaging_ca_cert = os .path .expanduser (
774+ config_data ["msg_batch_signer" ]["messaging_ca_cert" ]
775+ )
776+ self .topic_send_to = config_data ["msg_batch_signer" ]["topic_send_to" ]
777+ self .topic_listen_to = config_data ["msg_batch_signer" ]["topic_listen_to" ]
778+ self .environment = config_data ["msg_batch_signer" ]["environment" ]
779+ self .service = config_data ["msg_batch_signer" ]["service" ]
780+ self .message_id_key = config_data ["msg_batch_signer" ]["message_id_key" ]
781+ self .retries = config_data ["msg_batch_signer" ]["retries" ]
782+ self .send_retries = config_data ["msg_batch_signer" ]["send_retries" ]
783+ self .log_level = config_data ["msg_batch_signer" ]["log_level" ]
784+ self .timeout = config_data ["msg_batch_signer" ]["timeout" ]
785+ self .creator = self ._get_cert_subject_cn ()
786+ self .key_aliases = config_data ["msg_batch_signer" ].get ("key_aliases" , {})
787+ self .chunk_size = config_data ["msg_batch_signer" ]["chunk_size" ]
788+
789+
616790def msg_clear_sign (
617791 inputs : List [str ],
618792 signing_keys : List [str ] = [],
@@ -671,9 +845,14 @@ def msg_container_sign(
671845 digest : list [str ] = [],
672846 reference : list [str ] = [],
673847 requester : str = "" ,
848+ signer_type : str = "single" ,
674849) -> Dict [str , Any ]:
675850 """Run containersign operation with cli arguments."""
676- msg_signer = MsgSigner ()
851+ if signer_type == "single" :
852+ msg_signer = MsgSigner ()
853+ elif signer_type == "batch" :
854+ msg_signer = MsgBatchSigner ()
855+
677856 config = _get_config_file (config_file )
678857 msg_signer .load_config (load_config (os .path .expanduser (config )))
679858 if requester :
@@ -819,6 +998,9 @@ def msg_clear_sign_main(
819998 default = "INFO" ,
820999 help = "Set log level" ,
8211000)
1001+ @click .option (
1002+ "--signer-type" , type = click .Choice (["single" , "batch" ]), default = "single" , help = "Signer type"
1003+ )
8221004def msg_container_sign_main (
8231005 signing_key : List [str ] = [],
8241006 signing_key_name : List [str ] = [],
@@ -829,6 +1011,7 @@ def msg_container_sign_main(
8291011 requester : str = "" ,
8301012 raw : bool = False ,
8311013 log_level : str = "INFO" ,
1014+ signer_type : str = "single" ,
8321015) -> None :
8331016 """Entry point method for containersign operation.
8341017
@@ -856,6 +1039,7 @@ def msg_container_sign_main(
8561039 digest = digest ,
8571040 reference = reference ,
8581041 requester = requester ,
1042+ signer_type = signer_type ,
8591043 )
8601044 if not raw :
8611045 click .echo (json .dumps (ret ))
0 commit comments