Skip to content

Commit 3857374

Browse files
committed
Handle BatchAssociateScramSecret errors
1 parent c40ba8f commit 3857374

File tree

7 files changed

+68
-25
lines changed

7 files changed

+68
-25
lines changed

apis/v1alpha1/ack-generate-metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
ack_generate_info:
2-
build_date: "2025-02-27T21:18:28Z"
2+
build_date: "2025-03-04T23:55:37Z"
33
build_hash: a326346bd3a6973254d247c9ab2dc76790c36241
44
go_version: go1.24.0
55
version: v0.43.2

config/controller/kustomization.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ kind: Kustomization
66
images:
77
- name: controller
88
newName: public.ecr.aws/aws-controllers-k8s/kafka-controller
9-
newTag: 1.0.2
9+
newTag: 1.0.5

helm/Chart.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
apiVersion: v1
22
name: kafka-chart
33
description: A Helm chart for the ACK service controller for Amazon Managed Streaming for Apache Kafka (MSK)
4-
version: 1.0.2
5-
appVersion: 1.0.2
4+
version: 1.0.5
5+
appVersion: 1.0.5
66
home: https://github.com/aws-controllers-k8s/kafka-controller
77
icon: https://raw.githubusercontent.com/aws/eks-charts/master/docs/logo/aws.png
88
sources:

helm/templates/NOTES.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{{ .Chart.Name }} has been installed.
2-
This chart deploys "public.ecr.aws/aws-controllers-k8s/kafka-controller:1.0.2".
2+
This chart deploys "public.ecr.aws/aws-controllers-k8s/kafka-controller:1.0.5".
33

44
Check its status by running:
55
kubectl --namespace {{ .Release.Namespace }} get pods -l "app.kubernetes.io/instance={{ .Release.Name }}"

helm/values.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
image:
66
repository: public.ecr.aws/aws-controllers-k8s/kafka-controller
7-
tag: 1.0.2
7+
tag: 1.0.5
88
pullPolicy: IfNotPresent
99
pullSecrets: []
1010

pkg/resource/cluster/hooks.go

+31-7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
svcapitypes "github.com/aws-controllers-k8s/kafka-controller/apis/v1alpha1"
2424
ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
2525
ackcondition "github.com/aws-controllers-k8s/runtime/pkg/condition"
26+
ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors"
2627
ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue"
2728
ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
2829
ackutil "github.com/aws-controllers-k8s/runtime/pkg/util"
@@ -354,6 +355,16 @@ func (rm *resourceManager) getAssociatedScramSecrets(
354355
return res, err
355356
}
356357

358+
type unprocessedSecret struct {
359+
errorCode string
360+
errorMessage string
361+
secretArn string
362+
}
363+
364+
func (us unprocessedSecret) String() string {
365+
return fmt.Sprintf("ErrorCode: %s, ErrorMessage %s, SecretArn: %s", us.errorCode, us.errorMessage, us.secretArn)
366+
}
367+
357368
// batchAssociateScramSecret associates the supplied scram secrets to the supplied Cluster
358369
// resource
359370
func (rm *resourceManager) batchAssociateScramSecret(
@@ -367,14 +378,27 @@ func (rm *resourceManager) batchAssociateScramSecret(
367378

368379
input := &svcsdk.BatchAssociateScramSecretInput{}
369380
input.ClusterArn = (*string)(r.ko.Status.ACKResourceMetadata.ARN)
370-
// Convert []*string to []string
371-
unrefSecrets := make([]string, len(secretARNs))
372-
for i, s := range secretARNs {
373-
unrefSecrets[i] = *s
374-
}
375-
input.SecretArnList = unrefSecrets
376-
_, err = rm.sdkapi.BatchAssociateScramSecret(ctx, input)
381+
input.SecretArnList = aws.ToStringSlice(secretARNs)
382+
resp, err := rm.sdkapi.BatchAssociateScramSecret(ctx, input)
377383
rm.metrics.RecordAPICall("UPDATE", "BatchAssociateScramSecret", err)
384+
if err != nil {
385+
return err
386+
}
387+
388+
if len(resp.UnprocessedScramSecrets) > 0 {
389+
unprocessedSecrets := []unprocessedSecret{}
390+
for _, uss := range resp.UnprocessedScramSecrets {
391+
us := unprocessedSecret{
392+
errorCode: aws.ToString(uss.ErrorCode),
393+
errorMessage: aws.ToString(uss.ErrorMessage),
394+
secretArn: aws.ToString(uss.SecretArn),
395+
}
396+
unprocessedSecrets = append(unprocessedSecrets, us)
397+
}
398+
399+
return ackerr.NewTerminalError(fmt.Errorf("Cant attach secret arns: %v", unprocessedSecrets))
400+
}
401+
378402
return err
379403
}
380404

test/e2e/tests/test_cluster.py

+31-12
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
# even trying to fetch a cluster's state.
3434
CREATE_WAIT_AFTER_SECONDS = 180
3535
DELETE_WAIT_SECONDS = 300
36-
MODIFY_WAIT_AFTER_SECONDS = 1800
36+
MODIFY_WAIT_AFTER_SECONDS = 10
37+
LONG_UPDATE_WAIT = 600
3738

3839
# Time to wait after the cluster has changed status, for the CR to update
3940
CHECK_STATUS_WAIT_SECONDS = 60
@@ -127,23 +128,14 @@ def test_crud(self, simple_cluster):
127128
assert len(latest_secrets) == 1
128129
assert secret_1 in latest_secrets
129130

130-
updated_volume_size = cr['spec']['brokerNodeGroupInfo']['storageInfo']['ebsStorageInfo']['volumeSize'] + 10
131-
132131
# associate one more secret to the cluster
133132
updates = {
134133
"spec": {
135134
"associatedSCRAMSecrets": [secret_1, secret_2],
136-
'brokerNodeGroupInfo': {
137-
"storageInfo": {
138-
"ebsStorageInfo": {
139-
"volumeSize": updated_volume_size
140-
}
141-
}
142-
}
143135
},
144136
}
145137
k8s.patch_custom_resource(ref, updates)
146-
138+
time.sleep(CHECK_STATUS_WAIT_SECONDS)
147139
assert k8s.wait_on_condition(
148140
ref,
149141
"ACK.ResourceSynced",
@@ -160,14 +152,41 @@ def test_crud(self, simple_cluster):
160152
assert len(latest_secrets) == 2
161153
assert secret_1 in latest_secrets and secret_2 in latest_secrets
162154

155+
updated_volume_size = cr['spec']['brokerNodeGroupInfo']['storageInfo']['ebsStorageInfo']['volumeSize'] + 10
156+
updates = {
157+
"spec": {
158+
'brokerNodeGroupInfo': {
159+
"storageInfo": {
160+
"ebsStorageInfo": {
161+
"volumeSize": updated_volume_size
162+
}
163+
}
164+
}
165+
}
166+
}
167+
k8s.patch_custom_resource(ref, updates)
168+
time.sleep(CHECK_STATUS_WAIT_SECONDS)
169+
assert k8s.wait_on_condition(
170+
ref,
171+
"ACK.ResourceSynced",
172+
"True",
173+
wait_periods=LONG_UPDATE_WAIT,
174+
)
175+
176+
cluster.wait_until(
177+
cluster_arn,
178+
cluster.state_matches("ACTIVE"),
179+
)
163180

164181
latest_cluster = cluster.get_by_arn(cluster_arn)
165182
assert latest_cluster is not None
183+
184+
cr = k8s.get_resource(ref)
166185

167186
latest_volume = latest_cluster['BrokerNodeGroupInfo']["StorageInfo"]["EbsStorageInfo"]["VolumeSize"]
168187
desired_volume = cr['spec']['brokerNodeGroupInfo']['storageInfo']['ebsStorageInfo']['volumeSize']
169188

170-
assert latest_volume == desired_volume == updated_volume_size
189+
assert latest_volume == desired_volume
171190

172191
# remove all associated secrets
173192
updates = {

0 commit comments

Comments
 (0)