@@ -115,19 +115,19 @@ class FeatureProcessorLineageHandler:
115
115
116
116
def create_lineage (self , tags : Optional [List [Dict [str , str ]]] = None ) -> None :
117
117
"""Create and Update Feature Processor Lineage"""
118
- input_feature_group_contexts : List [
119
- FeatureGroupContexts
120
- ] = self . _retrieve_input_feature_group_contexts ( )
118
+ input_feature_group_contexts : List [FeatureGroupContexts ] = (
119
+ self . _retrieve_input_feature_group_contexts ()
120
+ )
121
121
output_feature_group_contexts : FeatureGroupContexts = (
122
122
self ._retrieve_output_feature_group_contexts ()
123
123
)
124
124
input_raw_data_artifacts : List [Artifact ] = self ._retrieve_input_raw_data_artifacts ()
125
- transformation_code_artifact : Optional [
126
- Artifact
127
- ] = S3LineageEntityHandler . create_transformation_code_artifact (
128
- transformation_code = self .transformation_code ,
129
- pipeline_last_update_time = self .pipeline [ LAST_MODIFIED_TIME ]. strftime ( "%s" ) ,
130
- sagemaker_session = self . sagemaker_session ,
125
+ transformation_code_artifact : Optional [Artifact ] = (
126
+ S3LineageEntityHandler . create_transformation_code_artifact (
127
+ transformation_code = self . transformation_code ,
128
+ pipeline_last_update_time = self .pipeline [ LAST_MODIFIED_TIME ]. strftime ( "%s" ) ,
129
+ sagemaker_session = self .sagemaker_session ,
130
+ )
131
131
)
132
132
if transformation_code_artifact is not None :
133
133
logger .info ("Created Transformation Code Artifact: %s" , transformation_code_artifact )
@@ -362,40 +362,40 @@ def _update_pipeline_lineage(
362
362
current_pipeline_version_context : Context = self ._get_pipeline_version_context (
363
363
last_update_time = pipeline_context .properties [LAST_UPDATE_TIME ]
364
364
)
365
- upstream_feature_group_associations : Iterator [
366
- AssociationSummary
367
- ] = LineageAssociationHandler . list_upstream_associations (
368
- # pylint: disable=no-member
369
- entity_arn = current_pipeline_version_context . context_arn ,
370
- source_type = FEATURE_GROUP_PIPELINE_VERSION_CONTEXT_TYPE ,
371
- sagemaker_session = self . sagemaker_session ,
365
+ upstream_feature_group_associations : Iterator [AssociationSummary ] = (
366
+ LineageAssociationHandler . list_upstream_associations (
367
+ # pylint: disable=no-member
368
+ entity_arn = current_pipeline_version_context . context_arn ,
369
+ source_type = FEATURE_GROUP_PIPELINE_VERSION_CONTEXT_TYPE ,
370
+ sagemaker_session = self . sagemaker_session ,
371
+ )
372
372
)
373
373
374
- upstream_raw_data_associations : Iterator [
375
- AssociationSummary
376
- ] = LineageAssociationHandler . list_upstream_associations (
377
- # pylint: disable=no-member
378
- entity_arn = current_pipeline_version_context . context_arn ,
379
- source_type = DATA_SET ,
380
- sagemaker_session = self . sagemaker_session ,
374
+ upstream_raw_data_associations : Iterator [AssociationSummary ] = (
375
+ LineageAssociationHandler . list_upstream_associations (
376
+ # pylint: disable=no-member
377
+ entity_arn = current_pipeline_version_context . context_arn ,
378
+ source_type = DATA_SET ,
379
+ sagemaker_session = self . sagemaker_session ,
380
+ )
381
381
)
382
382
383
- upstream_transformation_code : Iterator [
384
- AssociationSummary
385
- ] = LineageAssociationHandler . list_upstream_associations (
386
- # pylint: disable=no-member
387
- entity_arn = current_pipeline_version_context . context_arn ,
388
- source_type = TRANSFORMATION_CODE ,
389
- sagemaker_session = self . sagemaker_session ,
383
+ upstream_transformation_code : Iterator [AssociationSummary ] = (
384
+ LineageAssociationHandler . list_upstream_associations (
385
+ # pylint: disable=no-member
386
+ entity_arn = current_pipeline_version_context . context_arn ,
387
+ source_type = TRANSFORMATION_CODE ,
388
+ sagemaker_session = self . sagemaker_session ,
389
+ )
390
390
)
391
391
392
- downstream_feature_group_associations : Iterator [
393
- AssociationSummary
394
- ] = LineageAssociationHandler . list_downstream_associations (
395
- # pylint: disable=no-member
396
- entity_arn = current_pipeline_version_context . context_arn ,
397
- destination_type = FEATURE_GROUP_PIPELINE_VERSION_CONTEXT_TYPE ,
398
- sagemaker_session = self . sagemaker_session ,
392
+ downstream_feature_group_associations : Iterator [AssociationSummary ] = (
393
+ LineageAssociationHandler . list_downstream_associations (
394
+ # pylint: disable=no-member
395
+ entity_arn = current_pipeline_version_context . context_arn ,
396
+ destination_type = FEATURE_GROUP_PIPELINE_VERSION_CONTEXT_TYPE ,
397
+ sagemaker_session = self . sagemaker_session ,
398
+ )
399
399
)
400
400
401
401
is_upstream_feature_group_equal : bool = self ._compare_upstream_feature_groups (
@@ -598,9 +598,9 @@ def _update_last_transformation_code(
598
598
last_transformation_code_artifact .properties ["state" ]
599
599
== TRANSFORMATION_CODE_STATUS_ACTIVE
600
600
):
601
- last_transformation_code_artifact .properties [
602
- "state"
603
- ] = TRANSFORMATION_CODE_STATUS_INACTIVE
601
+ last_transformation_code_artifact .properties ["state" ] = (
602
+ TRANSFORMATION_CODE_STATUS_INACTIVE
603
+ )
604
604
last_transformation_code_artifact .properties ["exclusive_end_date" ] = self .pipeline [
605
605
LAST_MODIFIED_TIME
606
606
].strftime ("%s" )
0 commit comments