Skip to content

Commit c40ba8f

Browse files
committed
Add update operations for VolumeSize, InstanceType, and BrokerCount
Also added reconcile before deletion attempt if cluster is not active. Before, if we tried deleting a cluster that was not active, the controller would set it to terminal, and does not reconcile.
1 parent acdc0c7 commit c40ba8f

17 files changed

+235
-22
lines changed
+3-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
ack_generate_info:
2-
build_date: "2025-02-20T18:10:11Z"
2+
build_date: "2025-02-27T21:18:28Z"
33
build_hash: a326346bd3a6973254d247c9ab2dc76790c36241
44
go_version: go1.24.0
55
version: v0.43.2
6-
api_directory_checksum: eda989f20dde9f1b4331ffa67dc3b9a5ef0d64e4
6+
api_directory_checksum: 36fbfad1e0bff98a14b120ba292a7f6b4e546fb4
77
api_version: v1alpha1
88
aws_sdk_go_version: v1.32.6
99
generator_config_info:
10-
file_checksum: 5ea49df43c7aef08a9ac8b7171e9f50c3ed82e13
10+
file_checksum: c641b5dd9aa81f1f42655f2afe9fcfb9dc7de696
1111
original_file_name: generator.yaml
1212
last_modification:
1313
reason: API generation

apis/v1alpha1/cluster.go

+3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apis/v1alpha1/generator.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ resources:
2323
CreateCluster:
2424
input_fields:
2525
ClusterName: Name
26+
DescribeCluster:
27+
input_fields:
28+
ClusterName: Name
2629
hooks:
2730
sdk_read_one_post_set_output:
2831
template_path: hooks/cluster/sdk_read_one_post_set_output.go.tpl
@@ -95,6 +98,11 @@ resources:
9598
BootstrapBrokerStringVpcConnectivityTls:
9699
type: string
97100
is_read_only: true
101+
CurrentVersion:
102+
from:
103+
operation: DescribeCluster
104+
path: ClusterInfo.CurrentVersion
105+
is_read_only: true
98106
tags:
99107
# TODO(jaypipes): Ignore tags for now... we will add support later
100108
ignore: true

apis/v1alpha1/zz_generated.deepcopy.go

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/controller/kustomization.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ kind: Kustomization
66
images:
77
- name: controller
88
newName: public.ecr.aws/aws-controllers-k8s/kafka-controller
9-
newTag: 1.0.5
9+
newTag: 1.0.2

config/crd/bases/kafka.services.k8s.aws_clusters.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,9 @@ spec:
356356
- type
357357
type: object
358358
type: array
359+
currentVersion:
360+
description: The current version of the MSK cluster.
361+
type: string
359362
state:
360363
description: |-
361364
The state of the cluster. The possible states are ACTIVE, CREATING, DELETING,

generator.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ resources:
2323
CreateCluster:
2424
input_fields:
2525
ClusterName: Name
26+
DescribeCluster:
27+
input_fields:
28+
ClusterName: Name
2629
hooks:
2730
sdk_read_one_post_set_output:
2831
template_path: hooks/cluster/sdk_read_one_post_set_output.go.tpl
@@ -95,6 +98,11 @@ resources:
9598
BootstrapBrokerStringVpcConnectivityTls:
9699
type: string
97100
is_read_only: true
101+
CurrentVersion:
102+
from:
103+
operation: DescribeCluster
104+
path: ClusterInfo.CurrentVersion
105+
is_read_only: true
98106
tags:
99107
# TODO(jaypipes): Ignore tags for now... we will add support later
100108
ignore: true

helm/Chart.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
apiVersion: v1
22
name: kafka-chart
33
description: A Helm chart for the ACK service controller for Amazon Managed Streaming for Apache Kafka (MSK)
4-
version: 1.0.5
5-
appVersion: 1.0.5
4+
version: 1.0.2
5+
appVersion: 1.0.2
66
home: https://github.com/aws-controllers-k8s/kafka-controller
77
icon: https://raw.githubusercontent.com/aws/eks-charts/master/docs/logo/aws.png
88
sources:

helm/crds/kafka.services.k8s.aws_clusters.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,9 @@ spec:
356356
- type
357357
type: object
358358
type: array
359+
currentVersion:
360+
description: The current version of the MSK cluster.
361+
type: string
359362
state:
360363
description: |-
361364
The state of the cluster. The possible states are ACTIVE, CREATING, DELETING,

helm/templates/NOTES.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{{ .Chart.Name }} has been installed.
2-
This chart deploys "public.ecr.aws/aws-controllers-k8s/kafka-controller:1.0.5".
2+
This chart deploys "public.ecr.aws/aws-controllers-k8s/kafka-controller:1.0.2".
33

44
Check its status by running:
55
kubectl --namespace {{ .Release.Namespace }} get pods -l "app.kubernetes.io/instance={{ .Release.Name }}"

helm/values.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
image:
66
repository: public.ecr.aws/aws-controllers-k8s/kafka-controller
7-
tag: 1.0.5
7+
tag: 1.0.2
88
pullPolicy: IfNotPresent
99
pullSecrets: []
1010

pkg/resource/cluster/hooks.go

+122-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"fmt"
2020
"strings"
21+
"time"
2122

2223
svcapitypes "github.com/aws-controllers-k8s/kafka-controller/apis/v1alpha1"
2324
ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
@@ -38,6 +39,7 @@ var (
3839
string(svcsdktypes.ClusterStateDeleting),
3940
string(svcsdktypes.ClusterStateFailed),
4041
}
42+
RequeueAfterUpdateDuration = 15 * time.Second
4143
)
4244

4345
var (
@@ -113,6 +115,18 @@ func clusterDeleting(r *resource) bool {
113115
return cs == strings.ToLower(string(svcsdktypes.ClusterStateDeleting))
114116
}
115117

118+
// requeueAfterAsyncUpdate returns a `ackrequeue.RequeueNeededAfter` struct
119+
// explaining the cluster cannot be modified until after the asynchronous update
120+
// has (first, started and then) completed and the cluster reaches an active
121+
// status.
122+
func requeueAfterAsyncUpdate() *ackrequeue.RequeueNeededAfter {
123+
return ackrequeue.NeededAfter(
124+
fmt.Errorf("cluster has started asynchronously updating, cannot be modified until '%s'",
125+
"Active"),
126+
RequeueAfterUpdateDuration,
127+
)
128+
}
129+
116130
func (rm *resourceManager) customUpdate(
117131
ctx context.Context,
118132
desired *resource,
@@ -133,12 +147,6 @@ func (rm *resourceManager) customUpdate(
133147
// Copy status from latest since it has the current cluster state
134148
updatedRes.ko.Status = latest.ko.Status
135149

136-
if clusterDeleting(latest) {
137-
msg := "Cluster is currently being deleted"
138-
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &msg, nil)
139-
return updatedRes, requeueWaitWhileDeleting
140-
}
141-
142150
if !clusterActive(latest) {
143151
msg := "Cluster is in '" + *latest.ko.Status.State + "' state"
144152
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &msg, nil)
@@ -149,16 +157,120 @@ func (rm *resourceManager) customUpdate(
149157
return updatedRes, requeueWaitUntilCanModify(latest)
150158
}
151159

152-
if delta.DifferentAt("Spec.AssociatedSCRAMSecrets") {
160+
switch {
161+
case delta.DifferentAt("Spec.ClientAuthentication"):
162+
input := &svcsdk.UpdateSecurityInput{}
163+
if desired.ko.Status.CurrentVersion != nil {
164+
input.CurrentVersion = desired.ko.Status.CurrentVersion
165+
}
166+
if desired.ko.Status.ACKResourceMetadata.ARN != nil {
167+
input.ClusterArn = (*string)(desired.ko.Status.ACKResourceMetadata.ARN)
168+
}
169+
if desired.ko.Spec.ClientAuthentication != nil {
170+
f0 := &svcsdktypes.ClientAuthentication{}
171+
if desired.ko.Spec.ClientAuthentication.SASL != nil {
172+
f0f0 := &svcsdktypes.Sasl{}
173+
if desired.ko.Spec.ClientAuthentication.SASL.IAM != nil &&
174+
desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled != nil {
175+
f0f0f0 := &svcsdktypes.Iam{
176+
Enabled: desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled,
177+
}
178+
f0f0.Iam = f0f0f0
179+
}
180+
if desired.ko.Spec.ClientAuthentication.SASL.SCRAM != nil &&
181+
desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled != nil {
182+
f0f0f1 := &svcsdktypes.Scram{
183+
Enabled: desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled,
184+
}
185+
f0f0.Scram = f0f0f1
186+
}
187+
f0.Sasl = f0f0
188+
}
189+
if desired.ko.Spec.ClientAuthentication.TLS != nil {
190+
f0f1 := &svcsdktypes.Tls{}
191+
if desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList != nil {
192+
f0f1.CertificateAuthorityArnList = aws.ToStringSlice(desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList)
193+
}
194+
if desired.ko.Spec.ClientAuthentication.TLS.Enabled != nil {
195+
f0f1.Enabled = desired.ko.Spec.ClientAuthentication.TLS.Enabled
196+
}
197+
f0.Tls = f0f1
198+
}
199+
if desired.ko.Spec.ClientAuthentication.Unauthenticated != nil &&
200+
desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled != nil {
201+
f0.Unauthenticated = &svcsdktypes.Unauthenticated{
202+
Enabled: desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled,
203+
}
204+
}
205+
input.ClientAuthentication = f0
206+
}
207+
208+
_, err = rm.sdkapi.UpdateSecurity(ctx, input)
209+
rm.metrics.RecordAPICall("UPDATE", "UpdateSecurity", err)
210+
if err != nil {
211+
return nil, err
212+
}
213+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)
214+
err = requeueAfterAsyncUpdate()
215+
216+
case delta.DifferentAt("Spec.AssociatedSCRAMSecrets"):
153217
err = rm.syncAssociatedScramSecrets(ctx, updatedRes, latest)
154218
if err != nil {
155219
return nil, err
156220
}
221+
// Set synced condition to True after successful update
222+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)
223+
224+
case delta.DifferentAt("Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize"):
225+
_, err := rm.sdkapi.UpdateBrokerStorage(ctx, &svcsdk.UpdateBrokerStorageInput{
226+
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
227+
CurrentVersion: latest.ko.Status.CurrentVersion,
228+
TargetBrokerEBSVolumeInfo: []svcsdktypes.BrokerEBSVolumeInfo{
229+
{
230+
KafkaBrokerNodeId: aws.String("ALL"),
231+
VolumeSizeGB: aws.Int32(int32(*desired.ko.Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize)),
232+
},
233+
},
234+
})
235+
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerStorage", err)
236+
if err != nil {
237+
return nil, err
238+
}
239+
message := fmt.Sprintf("kafka is updating broker storage")
240+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
241+
err = requeueAfterAsyncUpdate()
242+
243+
case delta.DifferentAt("Spec.BrokerNodeGroupInfo.InstanceType"):
244+
_, err := rm.sdkapi.UpdateBrokerType(ctx, &svcsdk.UpdateBrokerTypeInput{
245+
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
246+
CurrentVersion: latest.ko.Status.CurrentVersion,
247+
TargetInstanceType: desired.ko.Spec.BrokerNodeGroupInfo.InstanceType,
248+
})
249+
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerType", err)
250+
if err != nil {
251+
return nil, err
252+
}
253+
message := fmt.Sprintf("kafka is updating broker instanceType")
254+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
255+
err = requeueAfterAsyncUpdate()
256+
257+
case delta.DifferentAt("Spec.NumberOfBrokerNodes"):
258+
_, err := rm.sdkapi.UpdateBrokerCount(ctx, &svcsdk.UpdateBrokerCountInput{
259+
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
260+
CurrentVersion: latest.ko.Status.CurrentVersion,
261+
TargetNumberOfBrokerNodes: aws.Int32(int32(*desired.ko.Spec.NumberOfBrokerNodes)),
262+
})
263+
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerCount", err)
264+
if err != nil {
265+
return nil, err
266+
}
267+
message := fmt.Sprintf("kafka is updating broker instanceType")
268+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
269+
err = requeueAfterAsyncUpdate()
270+
157271
}
158272

159-
// Set synced condition to True after successful update
160-
ackcondition.SetSynced(updatedRes, corev1.ConditionTrue, nil, nil)
161-
return updatedRes, nil
273+
return updatedRes, err
162274
}
163275

164276
// syncAssociatedScramSecrets examines the Secret ARNs in the supplied Cluster

pkg/resource/cluster/sdk.go

+25
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

templates/hooks/cluster/sdk_delete_pre_build_request.go.tpl

+7
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,10 @@
99
if err := rm.syncAssociatedScramSecrets(ctx, &resource{ko: groupCpy}, r); err != nil {
1010
return nil, err
1111
}
12+
if !clusterActive(r) {
13+
// doing this to avoid BadRequestException
14+
return r, ackrequeue.NeededAfter(
15+
fmt.Errorf("waiting for cluster to be active before deletion"),
16+
ackrequeue.DefaultRequeueAfterDuration,
17+
)
18+
}

templates/hooks/cluster/sdk_read_one_post_set_output.go.tpl

+5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
if resp.ClusterInfo.CurrentVersion != nil {
2+
ko.Status.CurrentVersion = resp.ClusterInfo.CurrentVersion
3+
} else {
4+
ko.Status.CurrentVersion = nil
5+
}
16
if !clusterActive(&resource{ko}) {
27
// Setting resource synced condition to false will trigger a requeue of
38
// the resource. No need to return a requeue error here.

test/e2e/resources/cluster_simple.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ spec:
1616
clientSubnets:
1717
- $SUBNET_ID_1
1818
- $SUBNET_ID_2
19+
storageInfo:
20+
ebsStorageInfo:
21+
volumeSize: 10
1922
kafkaVersion: "3.3.1"
2023
# NOTE(jaypipes): Number of broker nodes need to be a multiple of the number
2124
# of subnets

0 commit comments

Comments
 (0)