From c40ba8fa6dca57d1db73d6fe68e231438725d604 Mon Sep 17 00:00:00 2001
From: michaelhtm <98621731+michaelhtm@users.noreply.github.com>
Date: Wed, 26 Feb 2025 13:58:14 -0800
Subject: [PATCH 1/2] Add update operations for VolumeSize, InstanceType, and
 BrokerCount

Also added a requeue before the deletion attempt if the cluster is not
active. Previously, if we tried deleting a cluster that was not active,
the controller would set it to a terminal state and never reconcile it
again.
---
 apis/v1alpha1/ack-generate-metadata.yaml      |   6 +-
 apis/v1alpha1/cluster.go                      |   3 +
 apis/v1alpha1/generator.yaml                  |   8 ++
 apis/v1alpha1/zz_generated.deepcopy.go        |   5 +
 config/controller/kustomization.yaml          |   2 +-
 .../kafka.services.k8s.aws_clusters.yaml      |   3 +
 generator.yaml                                |   8 ++
 helm/Chart.yaml                               |   4 +-
 .../crds/kafka.services.k8s.aws_clusters.yaml |   3 +
 helm/templates/NOTES.txt                      |   2 +-
 helm/values.yaml                              |   2 +-
 pkg/resource/cluster/hooks.go                 | 132 ++++++++++++++++--
 pkg/resource/cluster/sdk.go                   |  25 ++++
 .../sdk_delete_pre_build_request.go.tpl       |   7 +
 .../sdk_read_one_post_set_output.go.tpl       |   5 +
 test/e2e/resources/cluster_simple.yaml        |   3 +
 test/e2e/tests/test_cluster.py                |  39 +++++-
 17 files changed, 235 insertions(+), 22 deletions(-)

diff --git a/apis/v1alpha1/ack-generate-metadata.yaml b/apis/v1alpha1/ack-generate-metadata.yaml
index 47c1cf5..75da985 100755
--- a/apis/v1alpha1/ack-generate-metadata.yaml
+++ b/apis/v1alpha1/ack-generate-metadata.yaml
@@ -1,13 +1,13 @@
 ack_generate_info:
-  build_date: "2025-02-20T18:10:11Z"
+  build_date: "2025-02-27T21:18:28Z"
   build_hash: a326346bd3a6973254d247c9ab2dc76790c36241
   go_version: go1.24.0
   version: v0.43.2
-api_directory_checksum: eda989f20dde9f1b4331ffa67dc3b9a5ef0d64e4
+api_directory_checksum: 36fbfad1e0bff98a14b120ba292a7f6b4e546fb4
 api_version: v1alpha1
 aws_sdk_go_version: v1.32.6
 generator_config_info:
-  file_checksum: 5ea49df43c7aef08a9ac8b7171e9f50c3ed82e13
+  file_checksum: c641b5dd9aa81f1f42655f2afe9fcfb9dc7de696
   original_file_name: generator.yaml
 last_modification:
   reason: API generation
diff --git a/apis/v1alpha1/cluster.go b/apis/v1alpha1/cluster.go
index fb03356..f84e65d 100644
--- a/apis/v1alpha1/cluster.go
+++ b/apis/v1alpha1/cluster.go
@@ -90,6 +90,9 @@ type ClusterStatus struct {
 	BootstrapBrokerStringVPCConnectivitySASLSCRAM *string `json:"bootstrapBrokerStringVPCConnectivitySASLSCRAM,omitempty"`
 	// +kubebuilder:validation:Optional
 	BootstrapBrokerStringVPCConnectivityTLS *string `json:"bootstrapBrokerStringVPCConnectivityTLS,omitempty"`
+	// The current version of the MSK cluster.
+	// +kubebuilder:validation:Optional
+	CurrentVersion *string `json:"currentVersion,omitempty"`
 	// The state of the cluster. The possible states are ACTIVE, CREATING, DELETING,
 	// FAILED, HEALING, MAINTENANCE, REBOOTING_BROKER, and UPDATING.
 	// +kubebuilder:validation:Optional
diff --git a/apis/v1alpha1/generator.yaml b/apis/v1alpha1/generator.yaml
index ab93797..1926819 100644
--- a/apis/v1alpha1/generator.yaml
+++ b/apis/v1alpha1/generator.yaml
@@ -23,6 +23,9 @@ resources:
       CreateCluster:
         input_fields:
           ClusterName: Name
+      DescribeCluster:
+        input_fields:
+          ClusterName: Name
     hooks:
       sdk_read_one_post_set_output:
         template_path: hooks/cluster/sdk_read_one_post_set_output.go.tpl
@@ -95,6 +98,11 @@ resources:
       BootstrapBrokerStringVpcConnectivityTls:
         type: string
         is_read_only: true
+      CurrentVersion:
+        from:
+          operation: DescribeCluster
+          path: ClusterInfo.CurrentVersion
+        is_read_only: true
     tags:
       # TODO(jaypipes): Ignore tags for now... we will add support later
       ignore: true
\ No newline at end of file
diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go
index 1d08671..b3db8f6 100644
--- a/apis/v1alpha1/zz_generated.deepcopy.go
+++ b/apis/v1alpha1/zz_generated.deepcopy.go
@@ -861,6 +861,11 @@ func (in *ClusterStatus) DeepCopyInto(out *ClusterStatus) {
 		*out = new(string)
 		**out = **in
 	}
+	if in.CurrentVersion != nil {
+		in, out := &in.CurrentVersion, &out.CurrentVersion
+		*out = new(string)
+		**out = **in
+	}
 	if in.State != nil {
 		in, out := &in.State, &out.State
 		*out = new(string)
diff --git a/config/controller/kustomization.yaml b/config/controller/kustomization.yaml
index 82c6b9e..8b59930 100644
--- a/config/controller/kustomization.yaml
+++ b/config/controller/kustomization.yaml
@@ -6,4 +6,4 @@ kind: Kustomization
 images:
 - name: controller
   newName: public.ecr.aws/aws-controllers-k8s/kafka-controller
-  newTag: 1.0.5
+  newTag: 1.0.2
diff --git a/config/crd/bases/kafka.services.k8s.aws_clusters.yaml b/config/crd/bases/kafka.services.k8s.aws_clusters.yaml
index 5e1d32d..77e9b6d 100644
--- a/config/crd/bases/kafka.services.k8s.aws_clusters.yaml
+++ b/config/crd/bases/kafka.services.k8s.aws_clusters.yaml
@@ -356,6 +356,9 @@ spec:
                 - type
                 type: object
               type: array
+            currentVersion:
+              description: The current version of the MSK cluster.
+              type: string
             state:
               description: |-
                 The state of the cluster. The possible states are ACTIVE, CREATING, DELETING,
diff --git a/generator.yaml b/generator.yaml
index ab93797..1926819 100644
--- a/generator.yaml
+++ b/generator.yaml
@@ -23,6 +23,9 @@ resources:
       CreateCluster:
         input_fields:
           ClusterName: Name
+      DescribeCluster:
+        input_fields:
+          ClusterName: Name
     hooks:
       sdk_read_one_post_set_output:
         template_path: hooks/cluster/sdk_read_one_post_set_output.go.tpl
@@ -95,6 +98,11 @@ resources:
       BootstrapBrokerStringVpcConnectivityTls:
         type: string
         is_read_only: true
+      CurrentVersion:
+        from:
+          operation: DescribeCluster
+          path: ClusterInfo.CurrentVersion
+        is_read_only: true
     tags:
       # TODO(jaypipes): Ignore tags for now... we will add support later
       ignore: true
\ No newline at end of file
diff --git a/helm/Chart.yaml b/helm/Chart.yaml
index b8638f0..b5a618f 100644
--- a/helm/Chart.yaml
+++ b/helm/Chart.yaml
@@ -1,8 +1,8 @@
 apiVersion: v1
 name: kafka-chart
 description: A Helm chart for the ACK service controller for Amazon Managed Streaming for Apache Kafka (MSK)
-version: 1.0.5
-appVersion: 1.0.5
+version: 1.0.2
+appVersion: 1.0.2
 home: https://github.com/aws-controllers-k8s/kafka-controller
 icon: https://raw.githubusercontent.com/aws/eks-charts/master/docs/logo/aws.png
 sources:
diff --git a/helm/crds/kafka.services.k8s.aws_clusters.yaml b/helm/crds/kafka.services.k8s.aws_clusters.yaml
index 5e1d32d..77e9b6d 100644
--- a/helm/crds/kafka.services.k8s.aws_clusters.yaml
+++ b/helm/crds/kafka.services.k8s.aws_clusters.yaml
@@ -356,6 +356,9 @@ spec:
                 - type
                 type: object
              type: array
+            currentVersion:
+              description: The current version of the MSK cluster.
+              type: string
             state:
               description: |-
                 The state of the cluster. The possible states are ACTIVE, CREATING, DELETING,
diff --git a/helm/templates/NOTES.txt b/helm/templates/NOTES.txt
index fbf1052..9164992 100644
--- a/helm/templates/NOTES.txt
+++ b/helm/templates/NOTES.txt
@@ -1,5 +1,5 @@
 {{ .Chart.Name }} has been installed.
-This chart deploys "public.ecr.aws/aws-controllers-k8s/kafka-controller:1.0.5".
+This chart deploys "public.ecr.aws/aws-controllers-k8s/kafka-controller:1.0.2".
 
 Check its status by running:
   kubectl --namespace {{ .Release.Namespace }} get pods -l "app.kubernetes.io/instance={{ .Release.Name }}"
diff --git a/helm/values.yaml b/helm/values.yaml
index 23a68a0..a0fc7ef 100644
--- a/helm/values.yaml
+++ b/helm/values.yaml
@@ -4,7 +4,7 @@
 image:
   repository: public.ecr.aws/aws-controllers-k8s/kafka-controller
-  tag: 1.0.5
+  tag: 1.0.2
   pullPolicy: IfNotPresent
   pullSecrets: []
diff --git a/pkg/resource/cluster/hooks.go b/pkg/resource/cluster/hooks.go
index 3206806..b858e64 100644
--- a/pkg/resource/cluster/hooks.go
+++ b/pkg/resource/cluster/hooks.go
@@ -18,6 +18,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"time"
 
 	svcapitypes "github.com/aws-controllers-k8s/kafka-controller/apis/v1alpha1"
 	ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
@@ -38,6 +39,7 @@ var (
 		string(svcsdktypes.ClusterStateDeleting),
 		string(svcsdktypes.ClusterStateFailed),
 	}
+	RequeueAfterUpdateDuration = 15 * time.Second
 )
 
 var (
@@ -113,6 +115,18 @@ func clusterDeleting(r *resource) bool {
 	return cs == strings.ToLower(string(svcsdktypes.ClusterStateDeleting))
 }
 
+// requeueAfterAsyncUpdate returns an `ackrequeue.RequeueNeededAfter`
+// error explaining that the cluster cannot be modified until the
+// asynchronous update that has just started completes and the cluster
+// returns to an active state.
+func requeueAfterAsyncUpdate() *ackrequeue.RequeueNeededAfter {
+	return ackrequeue.NeededAfter(
+		fmt.Errorf("cluster has started asynchronously updating, cannot be modified until '%s'",
+			"Active"),
+		RequeueAfterUpdateDuration,
+	)
+}
+
 func (rm *resourceManager) customUpdate(
 	ctx context.Context,
 	desired *resource,
@@ -133,12 +147,6 @@ func (rm *resourceManager) customUpdate(
 	// Copy status from latest since it has the current cluster state
 	updatedRes.ko.Status = latest.ko.Status
 
-	if clusterDeleting(latest) {
-		msg := "Cluster is currently being deleted"
-		ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &msg, nil)
-		return updatedRes, requeueWaitWhileDeleting
-	}
-
 	if !clusterActive(latest) {
 		msg := "Cluster is in '" + *latest.ko.Status.State + "' state"
 		ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &msg, nil)
@@ -149,16 +157,120 @@ func (rm *resourceManager) customUpdate(
 		return updatedRes, requeueWaitUntilCanModify(latest)
 	}
 
-	if delta.DifferentAt("Spec.AssociatedSCRAMSecrets") {
+	switch {
+	case delta.DifferentAt("Spec.ClientAuthentication"):
+		input := &svcsdk.UpdateSecurityInput{}
+		if desired.ko.Status.CurrentVersion != nil {
+			input.CurrentVersion = desired.ko.Status.CurrentVersion
+		}
+		if desired.ko.Status.ACKResourceMetadata.ARN != nil {
+			input.ClusterArn = (*string)(desired.ko.Status.ACKResourceMetadata.ARN)
+		}
+		if desired.ko.Spec.ClientAuthentication != nil {
+			f0 := &svcsdktypes.ClientAuthentication{}
+			if desired.ko.Spec.ClientAuthentication.SASL != nil {
+				f0f0 := &svcsdktypes.Sasl{}
+				if desired.ko.Spec.ClientAuthentication.SASL.IAM != nil &&
+					desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled != nil {
+					f0f0f0 := &svcsdktypes.Iam{
+						Enabled: desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled,
+					}
+					f0f0.Iam = f0f0f0
+				}
+				if desired.ko.Spec.ClientAuthentication.SASL.SCRAM != nil &&
+					desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled != nil {
+					f0f0f1 := &svcsdktypes.Scram{
+						Enabled: desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled,
+					}
+					f0f0.Scram = f0f0f1
+				}
+				f0.Sasl = f0f0
+			}
+			if desired.ko.Spec.ClientAuthentication.TLS != nil {
+				f0f1 := &svcsdktypes.Tls{}
+				if desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList != nil {
+					f0f1.CertificateAuthorityArnList = aws.ToStringSlice(desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList)
+				}
+				if desired.ko.Spec.ClientAuthentication.TLS.Enabled != nil {
+					f0f1.Enabled = desired.ko.Spec.ClientAuthentication.TLS.Enabled
+				}
+				f0.Tls = f0f1
+			}
+			if desired.ko.Spec.ClientAuthentication.Unauthenticated != nil &&
+				desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled != nil {
+				f0.Unauthenticated = &svcsdktypes.Unauthenticated{
+					Enabled: desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled,
+				}
+			}
+			input.ClientAuthentication = f0
+		}
+
+		_, err = rm.sdkapi.UpdateSecurity(ctx, input)
+		rm.metrics.RecordAPICall("UPDATE", "UpdateSecurity", err)
+		if err != nil {
+			return nil, err
+		}
+		ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)
+		err = requeueAfterAsyncUpdate()
+
+	case delta.DifferentAt("Spec.AssociatedSCRAMSecrets"):
 		err = rm.syncAssociatedScramSecrets(ctx, updatedRes, latest)
 		if err != nil {
 			return nil, err
 		}
+		// Keep the synced condition False until the next reconcile confirms the associations
+		ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)
+
+	case delta.DifferentAt("Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize"):
+		_, err := rm.sdkapi.UpdateBrokerStorage(ctx, &svcsdk.UpdateBrokerStorageInput{
+			ClusterArn:     (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
+			CurrentVersion: latest.ko.Status.CurrentVersion,
+			TargetBrokerEBSVolumeInfo: []svcsdktypes.BrokerEBSVolumeInfo{
+				{
+					KafkaBrokerNodeId: aws.String("ALL"),
+					VolumeSizeGB:      aws.Int32(int32(*desired.ko.Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize)),
+				},
+			},
+		})
+		rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerStorage", err)
+		if err != nil {
+			return nil, err
+		}
+		message := "kafka is updating broker storage"
+		ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
+		err = requeueAfterAsyncUpdate()
+
+	case delta.DifferentAt("Spec.BrokerNodeGroupInfo.InstanceType"):
+		_, err := rm.sdkapi.UpdateBrokerType(ctx, &svcsdk.UpdateBrokerTypeInput{
+			ClusterArn:         (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
+			CurrentVersion:     latest.ko.Status.CurrentVersion,
+			TargetInstanceType: desired.ko.Spec.BrokerNodeGroupInfo.InstanceType,
+		})
+		rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerType", err)
+		if err != nil {
+			return nil, err
+		}
+		message := "kafka is updating the broker instance type"
+		ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
+		err = requeueAfterAsyncUpdate()
+
+	case delta.DifferentAt("Spec.NumberOfBrokerNodes"):
+		_, err := rm.sdkapi.UpdateBrokerCount(ctx, &svcsdk.UpdateBrokerCountInput{
+			ClusterArn:                (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
+			CurrentVersion:            latest.ko.Status.CurrentVersion,
+			TargetNumberOfBrokerNodes: aws.Int32(int32(*desired.ko.Spec.NumberOfBrokerNodes)),
+		})
+		rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerCount", err)
+		if err != nil {
+			return nil, err
+		}
+		message := "kafka is updating the broker count"
+		ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
+		err = requeueAfterAsyncUpdate()
 	}
 
-	// Set synced condition to True after successful update
-	ackcondition.SetSynced(updatedRes, corev1.ConditionTrue, nil, nil)
-	return updatedRes, nil
+	return updatedRes, err
 }
 
 // syncAssociatedScramSecrets examines the Secret ARNs in the supplied Cluster
diff --git a/pkg/resource/cluster/sdk.go b/pkg/resource/cluster/sdk.go
index bff6d1d..77edd59 100644
--- a/pkg/resource/cluster/sdk.go
+++ b/pkg/resource/cluster/sdk.go
@@ -191,6 +191,16 @@ func (rm *resourceManager) sdkFind(
 		arn := ackv1alpha1.AWSResourceName(*resp.ClusterInfo.ClusterArn)
 		ko.Status.ACKResourceMetadata.ARN = &arn
 	}
+	if resp.ClusterInfo.ClusterName != nil {
+		ko.Spec.Name = resp.ClusterInfo.ClusterName
+	} else {
+		ko.Spec.Name = nil
+	}
+	if resp.ClusterInfo.CurrentVersion != nil {
+		ko.Status.CurrentVersion = resp.ClusterInfo.CurrentVersion
+	} else {
+		ko.Status.CurrentVersion = nil
+	}
 	if resp.ClusterInfo.EncryptionInfo != nil {
 		f9 := &svcapitypes.EncryptionInfo{}
 		if resp.ClusterInfo.EncryptionInfo.EncryptionAtRest != nil {
@@ -319,6 +329,11 @@ func (rm *resourceManager) sdkFind(
 	}
 
 	rm.setStatusDefaults(ko)
+	if resp.ClusterInfo.CurrentVersion != nil {
+		ko.Status.CurrentVersion = resp.ClusterInfo.CurrentVersion
+	} else {
+		ko.Status.CurrentVersion = nil
+	}
 	if !clusterActive(&resource{ko}) {
 		// Setting resource synced condition to false will trigger a requeue of
 		// the resource. No need to return a requeue error here.
@@ -675,6 +690,13 @@ func (rm *resourceManager) sdkDelete(
 	if err := rm.syncAssociatedScramSecrets(ctx, &resource{ko: groupCpy}, r); err != nil {
 		return nil, err
 	}
+	if !clusterActive(r) {
+		// requeue until the cluster is active; deleting a non-active cluster returns a BadRequestException
+		return r, ackrequeue.NeededAfter(
+			fmt.Errorf("waiting for cluster to be active before deletion"),
+			ackrequeue.DefaultRequeueAfterDuration,
+		)
+	}
 
 	input, err := rm.newDeleteRequestPayload(r)
 	if err != nil {
@@ -697,6 +719,9 @@ func (rm *resourceManager) newDeleteRequestPayload(
 	if r.ko.Status.ACKResourceMetadata != nil && r.ko.Status.ACKResourceMetadata.ARN != nil {
 		res.ClusterArn = (*string)(r.ko.Status.ACKResourceMetadata.ARN)
 	}
+	if r.ko.Status.CurrentVersion != nil {
+		res.CurrentVersion = r.ko.Status.CurrentVersion
+	}
 
 	return res, nil
 }
diff --git a/templates/hooks/cluster/sdk_delete_pre_build_request.go.tpl b/templates/hooks/cluster/sdk_delete_pre_build_request.go.tpl
index 403aace..58a27e6 100644
--- a/templates/hooks/cluster/sdk_delete_pre_build_request.go.tpl
+++ b/templates/hooks/cluster/sdk_delete_pre_build_request.go.tpl
@@ -9,3 +9,10 @@
 	if err := rm.syncAssociatedScramSecrets(ctx, &resource{ko: groupCpy}, r); err != nil {
 		return nil, err
 	}
+	if !clusterActive(r) {
+		// requeue until the cluster is active; deleting a non-active cluster returns a BadRequestException
+		return r, ackrequeue.NeededAfter(
+			fmt.Errorf("waiting for cluster to be active before deletion"),
+			ackrequeue.DefaultRequeueAfterDuration,
+		)
+	}
diff --git a/templates/hooks/cluster/sdk_read_one_post_set_output.go.tpl b/templates/hooks/cluster/sdk_read_one_post_set_output.go.tpl
index b37a69e..381130f 100644
--- a/templates/hooks/cluster/sdk_read_one_post_set_output.go.tpl
+++ b/templates/hooks/cluster/sdk_read_one_post_set_output.go.tpl
@@ -1,3 +1,8 @@
+	if resp.ClusterInfo.CurrentVersion != nil {
+		ko.Status.CurrentVersion = resp.ClusterInfo.CurrentVersion
+	} else {
+		ko.Status.CurrentVersion = nil
+	}
 	if !clusterActive(&resource{ko}) {
 		// Setting resource synced condition to false will trigger a requeue of
 		// the resource. No need to return a requeue error here.
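For illustration, a minimal sketch of a Cluster manifest exercising the new
update paths (the metadata name and concrete values are hypothetical; the
field paths follow the e2e test below). A change to volumeSize now triggers
UpdateBrokerStorage, a change to instanceType triggers UpdateBrokerType, and
a change to numberOfBrokerNodes triggers UpdateBrokerCount, each followed by
a requeue until the cluster returns to ACTIVE:

    apiVersion: kafka.services.k8s.aws/v1alpha1
    kind: Cluster
    metadata:
      name: example            # hypothetical
    spec:
      name: example
      kafkaVersion: "3.3.1"
      numberOfBrokerNodes: 4   # changed -> UpdateBrokerCount
      brokerNodeGroupInfo:
        instanceType: kafka.m5.large   # changed -> UpdateBrokerType
        storageInfo:
          ebsStorageInfo:
            volumeSize: 20     # changed -> UpdateBrokerStorage (GiB)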
diff --git a/test/e2e/resources/cluster_simple.yaml b/test/e2e/resources/cluster_simple.yaml
index 55280eb..a9a7f07 100644
--- a/test/e2e/resources/cluster_simple.yaml
+++ b/test/e2e/resources/cluster_simple.yaml
@@ -16,6 +16,9 @@ spec:
     clientSubnets:
     - $SUBNET_ID_1
     - $SUBNET_ID_2
+    storageInfo:
+      ebsStorageInfo:
+        volumeSize: 10
   kafkaVersion: "3.3.1"
   # NOTE(jaypipes): Number of broker nodes need to be a multiple of the number
   # of subnets
diff --git a/test/e2e/tests/test_cluster.py b/test/e2e/tests/test_cluster.py
index e0f8d20..5cf5beb 100644
--- a/test/e2e/tests/test_cluster.py
+++ b/test/e2e/tests/test_cluster.py
@@ -32,8 +32,8 @@
 # often 25+ minutes even for small clusters. We wait this amount of time before
 # even trying to fetch a cluster's state.
 CREATE_WAIT_AFTER_SECONDS = 180
-DELETE_WAIT_SECONDS = 30
-MODIFY_WAIT_AFTER_SECONDS = 10
+DELETE_WAIT_SECONDS = 300
+MODIFY_WAIT_AFTER_SECONDS = 1800
 
 # Time to wait after the cluster has changed status, for the CR to update
 CHECK_STATUS_WAIT_SECONDS = 60
@@ -127,17 +127,48 @@ def test_crud(self, simple_cluster):
         assert len(latest_secrets) == 1
         assert secret_1 in latest_secrets
 
+        updated_volume_size = cr['spec']['brokerNodeGroupInfo']['storageInfo']['ebsStorageInfo']['volumeSize'] + 10
+
         # associate one more secret to the cluster
         updates = {
-            "spec": {"associatedSCRAMSecrets": [secret_1, secret_2]},
+            "spec": {
+                "associatedSCRAMSecrets": [secret_1, secret_2],
+                'brokerNodeGroupInfo': {
+                    "storageInfo": {
+                        "ebsStorageInfo": {
+                            "volumeSize": updated_volume_size
+                        }
+                    }
+                }
+            },
         }
         k8s.patch_custom_resource(ref, updates)
-        time.sleep(MODIFY_WAIT_AFTER_SECONDS)
+
+        assert k8s.wait_on_condition(
+            ref,
+            "ACK.ResourceSynced",
+            "True",
+            wait_periods=MODIFY_WAIT_AFTER_SECONDS,
+        )
+
+        cluster.wait_until(
+            cluster_arn,
+            cluster.state_matches("ACTIVE"),
+        )
 
         latest_secrets = cluster.get_associated_scram_secrets(cluster_arn)
         assert len(latest_secrets) == 2
         assert secret_1 in latest_secrets and secret_2 in latest_secrets
+
+        latest_cluster = cluster.get_by_arn(cluster_arn)
+        assert latest_cluster is not None
+
+        latest_volume = latest_cluster['BrokerNodeGroupInfo']["StorageInfo"]["EbsStorageInfo"]["VolumeSize"]
+        desired_volume = cr['spec']['brokerNodeGroupInfo']['storageInfo']['ebsStorageInfo']['volumeSize']
+
+        assert latest_volume == desired_volume == updated_volume_size
+
         # remove all associated secrets
         updates = {
             "spec": {"associatedSCRAMSecrets": []},

From 385737432c5441e36677f0f95de4075fd194c3de Mon Sep 17 00:00:00 2001
From: michaelhtm <98621731+michaelhtm@users.noreply.github.com>
Date: Fri, 28 Feb 2025 12:51:35 -0800
Subject: [PATCH 2/2] Handle BatchAssociateScramSecret errors

---
 apis/v1alpha1/ack-generate-metadata.yaml |  2 +-
 config/controller/kustomization.yaml     |  2 +-
 helm/Chart.yaml                          |  4 +--
 helm/templates/NOTES.txt                 |  2 +-
 helm/values.yaml                         |  2 +-
 pkg/resource/cluster/hooks.go            | 38 +++++++++++++++++----
 test/e2e/tests/test_cluster.py           | 43 +++++++++++++++++-------
 7 files changed, 68 insertions(+), 25 deletions(-)

diff --git a/apis/v1alpha1/ack-generate-metadata.yaml b/apis/v1alpha1/ack-generate-metadata.yaml
index 75da985..946dc04 100755
--- a/apis/v1alpha1/ack-generate-metadata.yaml
+++ b/apis/v1alpha1/ack-generate-metadata.yaml
@@ -1,5 +1,5 @@
 ack_generate_info:
-  build_date: "2025-02-27T21:18:28Z"
+  build_date: "2025-03-04T23:55:37Z"
   build_hash: a326346bd3a6973254d247c9ab2dc76790c36241
   go_version: go1.24.0
   version: v0.43.2
diff --git a/config/controller/kustomization.yaml b/config/controller/kustomization.yaml
index 8b59930..82c6b9e 100644
--- a/config/controller/kustomization.yaml
+++ b/config/controller/kustomization.yaml
@@ -6,4 +6,4 @@ kind: Kustomization
 images:
 - name: controller
   newName: public.ecr.aws/aws-controllers-k8s/kafka-controller
-  newTag: 1.0.2
+  newTag: 1.0.5
diff --git a/helm/Chart.yaml b/helm/Chart.yaml
index b5a618f..b8638f0 100644
--- a/helm/Chart.yaml
+++ b/helm/Chart.yaml
@@ -1,8 +1,8 @@
 apiVersion: v1
 name: kafka-chart
 description: A Helm chart for the ACK service controller for Amazon Managed Streaming for Apache Kafka (MSK)
-version: 1.0.2
-appVersion: 1.0.2
+version: 1.0.5
+appVersion: 1.0.5
 home: https://github.com/aws-controllers-k8s/kafka-controller
 icon: https://raw.githubusercontent.com/aws/eks-charts/master/docs/logo/aws.png
 sources:
diff --git a/helm/templates/NOTES.txt b/helm/templates/NOTES.txt
index 9164992..fbf1052 100644
--- a/helm/templates/NOTES.txt
+++ b/helm/templates/NOTES.txt
@@ -1,5 +1,5 @@
 {{ .Chart.Name }} has been installed.
-This chart deploys "public.ecr.aws/aws-controllers-k8s/kafka-controller:1.0.2".
+This chart deploys "public.ecr.aws/aws-controllers-k8s/kafka-controller:1.0.5".
 
 Check its status by running:
   kubectl --namespace {{ .Release.Namespace }} get pods -l "app.kubernetes.io/instance={{ .Release.Name }}"
diff --git a/helm/values.yaml b/helm/values.yaml
index a0fc7ef..23a68a0 100644
--- a/helm/values.yaml
+++ b/helm/values.yaml
@@ -4,7 +4,7 @@
 image:
   repository: public.ecr.aws/aws-controllers-k8s/kafka-controller
-  tag: 1.0.2
+  tag: 1.0.5
   pullPolicy: IfNotPresent
   pullSecrets: []
diff --git a/pkg/resource/cluster/hooks.go b/pkg/resource/cluster/hooks.go
index b858e64..b47a3a2 100644
--- a/pkg/resource/cluster/hooks.go
+++ b/pkg/resource/cluster/hooks.go
@@ -23,6 +23,7 @@ import (
 	svcapitypes "github.com/aws-controllers-k8s/kafka-controller/apis/v1alpha1"
 	ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
 	ackcondition "github.com/aws-controllers-k8s/runtime/pkg/condition"
+	ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors"
 	ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue"
 	ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
 	ackutil "github.com/aws-controllers-k8s/runtime/pkg/util"
@@ -354,6 +355,16 @@ func (rm *resourceManager) getAssociatedScramSecrets(
 	return res, err
 }
 
+type unprocessedSecret struct {
+	errorCode    string
+	errorMessage string
+	secretArn    string
+}
+
+func (us unprocessedSecret) String() string {
+	return fmt.Sprintf("ErrorCode: %s, ErrorMessage: %s, SecretArn: %s", us.errorCode, us.errorMessage, us.secretArn)
+}
+
 // batchAssociateScramSecret associates the supplied scram secrets to the supplied Cluster
 // resource
 func (rm *resourceManager) batchAssociateScramSecret(
@@ -367,14 +378,27 @@ func (rm *resourceManager) batchAssociateScramSecret(
 	input := &svcsdk.BatchAssociateScramSecretInput{}
 	input.ClusterArn = (*string)(r.ko.Status.ACKResourceMetadata.ARN)
-	// Convert []*string to []string
-	unrefSecrets := make([]string, len(secretARNs))
-	for i, s := range secretARNs {
-		unrefSecrets[i] = *s
-	}
-	input.SecretArnList = unrefSecrets
-	_, err = rm.sdkapi.BatchAssociateScramSecret(ctx, input)
+	input.SecretArnList = aws.ToStringSlice(secretARNs)
+	resp, err := rm.sdkapi.BatchAssociateScramSecret(ctx, input)
 	rm.metrics.RecordAPICall("UPDATE", "BatchAssociateScramSecret", err)
+	if err != nil {
+		return err
+	}
+
+	if len(resp.UnprocessedScramSecrets) > 0 {
+		unprocessedSecrets := []unprocessedSecret{}
+		for _, uss := range resp.UnprocessedScramSecrets {
+			us := unprocessedSecret{
+				errorCode:    aws.ToString(uss.ErrorCode),
+				errorMessage: aws.ToString(uss.ErrorMessage),
+				secretArn:    aws.ToString(uss.SecretArn),
+			}
+			unprocessedSecrets = append(unprocessedSecrets, us)
+		}
+
+		return ackerr.NewTerminalError(fmt.Errorf("cannot associate secret ARNs: %v", unprocessedSecrets))
+	}
+
 	return err
 }
diff --git a/test/e2e/tests/test_cluster.py b/test/e2e/tests/test_cluster.py
index 5cf5beb..f6117c1 100644
--- a/test/e2e/tests/test_cluster.py
+++ b/test/e2e/tests/test_cluster.py
@@ -33,7 +33,8 @@
 # even trying to fetch a cluster's state.
 CREATE_WAIT_AFTER_SECONDS = 180
 DELETE_WAIT_SECONDS = 300
-MODIFY_WAIT_AFTER_SECONDS = 1800
+MODIFY_WAIT_AFTER_SECONDS = 10
+LONG_UPDATE_WAIT = 600
 
 # Time to wait after the cluster has changed status, for the CR to update
 CHECK_STATUS_WAIT_SECONDS = 60
@@ -127,23 +128,14 @@ def test_crud(self, simple_cluster):
         assert len(latest_secrets) == 1
         assert secret_1 in latest_secrets
 
-        updated_volume_size = cr['spec']['brokerNodeGroupInfo']['storageInfo']['ebsStorageInfo']['volumeSize'] + 10
-
         # associate one more secret to the cluster
         updates = {
             "spec": {
                 "associatedSCRAMSecrets": [secret_1, secret_2],
-                'brokerNodeGroupInfo': {
-                    "storageInfo": {
-                        "ebsStorageInfo": {
-                            "volumeSize": updated_volume_size
-                        }
-                    }
-                }
             },
         }
         k8s.patch_custom_resource(ref, updates)
-
+        time.sleep(CHECK_STATUS_WAIT_SECONDS)
         assert k8s.wait_on_condition(
             ref,
             "ACK.ResourceSynced",
             "True",
             wait_periods=MODIFY_WAIT_AFTER_SECONDS,
         )
@@ -160,14 +152,41 @@ def test_crud(self, simple_cluster):
         assert len(latest_secrets) == 2
         assert secret_1 in latest_secrets and secret_2 in latest_secrets
 
+        updated_volume_size = cr['spec']['brokerNodeGroupInfo']['storageInfo']['ebsStorageInfo']['volumeSize'] + 10
+        updates = {
+            "spec": {
+                'brokerNodeGroupInfo': {
+                    "storageInfo": {
+                        "ebsStorageInfo": {
+                            "volumeSize": updated_volume_size
+                        }
+                    }
+                }
+            }
+        }
+        k8s.patch_custom_resource(ref, updates)
+        time.sleep(CHECK_STATUS_WAIT_SECONDS)
+        assert k8s.wait_on_condition(
+            ref,
+            "ACK.ResourceSynced",
+            "True",
+            wait_periods=LONG_UPDATE_WAIT,
+        )
+
+        cluster.wait_until(
+            cluster_arn,
+            cluster.state_matches("ACTIVE"),
+        )
 
         latest_cluster = cluster.get_by_arn(cluster_arn)
         assert latest_cluster is not None
+
+        cr = k8s.get_resource(ref)
 
         latest_volume = latest_cluster['BrokerNodeGroupInfo']["StorageInfo"]["EbsStorageInfo"]["VolumeSize"]
         desired_volume = cr['spec']['brokerNodeGroupInfo']['storageInfo']['ebsStorageInfo']['volumeSize']
 
-        assert latest_volume == desired_volume == updated_volume_size
+        assert latest_volume == desired_volume
 
         # remove all associated secrets
         updates = {
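With the second patch, a secret that MSK reports back in
UnprocessedScramSecrets now surfaces as a terminal error (an ACK.Terminal
condition carrying the error code, message, and ARN from the response)
instead of being silently ignored. A minimal sketch of the spec field
involved (the region, account ID, and secret name are hypothetical; note
that MSK requires SCRAM secrets to be named with the AmazonMSK_ prefix and
encrypted with a customer-managed KMS key):

    spec:
      associatedSCRAMSecrets:
      - arn:aws:secretsmanager:us-west-2:111122223333:secret:AmazonMSK_example-AbCdEf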