Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add update operations for VolumeSize, InstanceType, and BrokerCount #55

Merged
merged 2 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions apis/v1alpha1/ack-generate-metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
ack_generate_info:
build_date: "2025-02-20T18:10:11Z"
build_date: "2025-03-04T23:55:37Z"
build_hash: a326346bd3a6973254d247c9ab2dc76790c36241
go_version: go1.24.0
version: v0.43.2
api_directory_checksum: eda989f20dde9f1b4331ffa67dc3b9a5ef0d64e4
api_directory_checksum: 36fbfad1e0bff98a14b120ba292a7f6b4e546fb4
api_version: v1alpha1
aws_sdk_go_version: v1.32.6
generator_config_info:
file_checksum: 5ea49df43c7aef08a9ac8b7171e9f50c3ed82e13
file_checksum: c641b5dd9aa81f1f42655f2afe9fcfb9dc7de696
original_file_name: generator.yaml
last_modification:
reason: API generation
3 changes: 3 additions & 0 deletions apis/v1alpha1/cluster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions apis/v1alpha1/generator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ resources:
CreateCluster:
input_fields:
ClusterName: Name
DescribeCluster:
input_fields:
ClusterName: Name
hooks:
sdk_read_one_post_set_output:
template_path: hooks/cluster/sdk_read_one_post_set_output.go.tpl
Expand Down Expand Up @@ -95,6 +98,11 @@ resources:
BootstrapBrokerStringVpcConnectivityTls:
type: string
is_read_only: true
CurrentVersion:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need both spec.KafkaVersion and `CurrentVersion? do they serve the same purpose?

Copy link
Member Author

@michaelhtm michaelhtm Mar 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CurrentVersion is just the version of the cluster..it's different values from KafkaVersion...also ClusterVersion comes from the Describe output smh

from:
operation: DescribeCluster
path: ClusterInfo.CurrentVersion
is_read_only: true
tags:
# TODO(jaypipes): Ignore tags for now... we will add support later
ignore: true
5 changes: 5 additions & 0 deletions apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions config/crd/bases/kafka.services.k8s.aws_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ spec:
- type
type: object
type: array
currentVersion:
description: The current version of the MSK cluster.
type: string
state:
description: |-
The state of the cluster. The possible states are ACTIVE, CREATING, DELETING,
Expand Down
8 changes: 8 additions & 0 deletions generator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ resources:
CreateCluster:
input_fields:
ClusterName: Name
DescribeCluster:
input_fields:
ClusterName: Name
hooks:
sdk_read_one_post_set_output:
template_path: hooks/cluster/sdk_read_one_post_set_output.go.tpl
Expand Down Expand Up @@ -95,6 +98,11 @@ resources:
BootstrapBrokerStringVpcConnectivityTls:
type: string
is_read_only: true
CurrentVersion:
from:
operation: DescribeCluster
path: ClusterInfo.CurrentVersion
is_read_only: true
tags:
# TODO(jaypipes): Ignore tags for now... we will add support later
ignore: true
3 changes: 3 additions & 0 deletions helm/crds/kafka.services.k8s.aws_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ spec:
- type
type: object
type: array
currentVersion:
description: The current version of the MSK cluster.
type: string
state:
description: |-
The state of the cluster. The possible states are ACTIVE, CREATING, DELETING,
Expand Down
170 changes: 153 additions & 17 deletions pkg/resource/cluster/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"errors"
"fmt"
"strings"
"time"

svcapitypes "github.com/aws-controllers-k8s/kafka-controller/apis/v1alpha1"
ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
ackcondition "github.com/aws-controllers-k8s/runtime/pkg/condition"
ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors"
ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue"
ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
ackutil "github.com/aws-controllers-k8s/runtime/pkg/util"
Expand All @@ -38,6 +40,7 @@ var (
string(svcsdktypes.ClusterStateDeleting),
string(svcsdktypes.ClusterStateFailed),
}
RequeueAfterUpdateDuration = 15 * time.Second
)

var (
Expand Down Expand Up @@ -113,6 +116,18 @@ func clusterDeleting(r *resource) bool {
return cs == strings.ToLower(string(svcsdktypes.ClusterStateDeleting))
}

// requeueAfterAsyncUpdate returns a `ackrequeue.RequeueNeededAfter` struct
// explaining the cluster cannot be modified until after the asynchronous update
// has (first, started and then) completed and the cluster reaches an active
// status.
func requeueAfterAsyncUpdate() *ackrequeue.RequeueNeededAfter {
return ackrequeue.NeededAfter(
fmt.Errorf("cluster has started asynchronously updating, cannot be modified until '%s'",
"Active"),
RequeueAfterUpdateDuration,
)
}

func (rm *resourceManager) customUpdate(
ctx context.Context,
desired *resource,
Expand All @@ -133,12 +148,6 @@ func (rm *resourceManager) customUpdate(
// Copy status from latest since it has the current cluster state
updatedRes.ko.Status = latest.ko.Status

if clusterDeleting(latest) {
msg := "Cluster is currently being deleted"
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &msg, nil)
return updatedRes, requeueWaitWhileDeleting
}

if !clusterActive(latest) {
msg := "Cluster is in '" + *latest.ko.Status.State + "' state"
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &msg, nil)
Expand All @@ -149,16 +158,120 @@ func (rm *resourceManager) customUpdate(
return updatedRes, requeueWaitUntilCanModify(latest)
}

if delta.DifferentAt("Spec.AssociatedSCRAMSecrets") {
switch {
case delta.DifferentAt("Spec.ClientAuthentication"):
input := &svcsdk.UpdateSecurityInput{}
if desired.ko.Status.CurrentVersion != nil {
input.CurrentVersion = desired.ko.Status.CurrentVersion
}
if desired.ko.Status.ACKResourceMetadata.ARN != nil {
input.ClusterArn = (*string)(desired.ko.Status.ACKResourceMetadata.ARN)
}
if desired.ko.Spec.ClientAuthentication != nil {
f0 := &svcsdktypes.ClientAuthentication{}
if desired.ko.Spec.ClientAuthentication.SASL != nil {
f0f0 := &svcsdktypes.Sasl{}
if desired.ko.Spec.ClientAuthentication.SASL.IAM != nil &&
desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled != nil {
f0f0f0 := &svcsdktypes.Iam{
Enabled: desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled,
}
f0f0.Iam = f0f0f0
}
if desired.ko.Spec.ClientAuthentication.SASL.SCRAM != nil &&
desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled != nil {
f0f0f1 := &svcsdktypes.Scram{
Enabled: desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled,
}
f0f0.Scram = f0f0f1
}
f0.Sasl = f0f0
}
if desired.ko.Spec.ClientAuthentication.TLS != nil {
f0f1 := &svcsdktypes.Tls{}
if desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList != nil {
f0f1.CertificateAuthorityArnList = aws.ToStringSlice(desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList)
}
if desired.ko.Spec.ClientAuthentication.TLS.Enabled != nil {
f0f1.Enabled = desired.ko.Spec.ClientAuthentication.TLS.Enabled
}
f0.Tls = f0f1
}
if desired.ko.Spec.ClientAuthentication.Unauthenticated != nil &&
desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled != nil {
f0.Unauthenticated = &svcsdktypes.Unauthenticated{
Enabled: desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled,
}
}
input.ClientAuthentication = f0
}

_, err = rm.sdkapi.UpdateSecurity(ctx, input)
rm.metrics.RecordAPICall("UPDATE", "UpdateSecurity", err)
if err != nil {
return nil, err
}
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)
err = requeueAfterAsyncUpdate()

Comment on lines +163 to +216
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a function to hide this logic?

case delta.DifferentAt("Spec.AssociatedSCRAMSecrets"):
err = rm.syncAssociatedScramSecrets(ctx, updatedRes, latest)
if err != nil {
return nil, err
}
// Set synced condition to True after successful update
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)

case delta.DifferentAt("Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize"):
_, err := rm.sdkapi.UpdateBrokerStorage(ctx, &svcsdk.UpdateBrokerStorageInput{
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
CurrentVersion: latest.ko.Status.CurrentVersion,
TargetBrokerEBSVolumeInfo: []svcsdktypes.BrokerEBSVolumeInfo{
{
KafkaBrokerNodeId: aws.String("ALL"),
VolumeSizeGB: aws.Int32(int32(*desired.ko.Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize)),
},
},
})
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerStorage", err)
if err != nil {
return nil, err
}
message := fmt.Sprintf("kafka is updating broker storage")
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
err = requeueAfterAsyncUpdate()
Comment on lines +226 to +242
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


case delta.DifferentAt("Spec.BrokerNodeGroupInfo.InstanceType"):
_, err := rm.sdkapi.UpdateBrokerType(ctx, &svcsdk.UpdateBrokerTypeInput{
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
CurrentVersion: latest.ko.Status.CurrentVersion,
TargetInstanceType: desired.ko.Spec.BrokerNodeGroupInfo.InstanceType,
})
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerType", err)
if err != nil {
return nil, err
}
message := fmt.Sprintf("kafka is updating broker instanceType")
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
Comment on lines +245 to +255
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

err = requeueAfterAsyncUpdate()

case delta.DifferentAt("Spec.NumberOfBrokerNodes"):
_, err := rm.sdkapi.UpdateBrokerCount(ctx, &svcsdk.UpdateBrokerCountInput{
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
CurrentVersion: latest.ko.Status.CurrentVersion,
TargetNumberOfBrokerNodes: aws.Int32(int32(*desired.ko.Spec.NumberOfBrokerNodes)),
})
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerCount", err)
if err != nil {
return nil, err
}
message := fmt.Sprintf("kafka is updating broker instanceType")
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
err = requeueAfterAsyncUpdate()

}

// Set synced condition to True after successful update
ackcondition.SetSynced(updatedRes, corev1.ConditionTrue, nil, nil)
return updatedRes, nil
return updatedRes, err
}

// syncAssociatedScramSecrets examines the Secret ARNs in the supplied Cluster
Expand Down Expand Up @@ -242,6 +355,16 @@ func (rm *resourceManager) getAssociatedScramSecrets(
return res, err
}

type unprocessedSecret struct {
errorCode string
errorMessage string
secretArn string
}

func (us unprocessedSecret) String() string {
return fmt.Sprintf("ErrorCode: %s, ErrorMessage %s, SecretArn: %s", us.errorCode, us.errorMessage, us.secretArn)
}
Comment on lines +358 to +366
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about making an error type, encapsulating all the unprocessed secrets. And implement an Error method that formats it all for you. I guess that's what you're trying to do in L388+

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm, not sure how we can achieve that since each secret may have its own reason why it can't be attached...i don't think i can return a slice of errors..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless maybe if i make all three fields a list of strings..after i would just need to fix how I represent it as a string


// batchAssociateScramSecret associates the supplied scram secrets to the supplied Cluster
// resource
func (rm *resourceManager) batchAssociateScramSecret(
Expand All @@ -255,14 +378,27 @@ func (rm *resourceManager) batchAssociateScramSecret(

input := &svcsdk.BatchAssociateScramSecretInput{}
input.ClusterArn = (*string)(r.ko.Status.ACKResourceMetadata.ARN)
// Convert []*string to []string
unrefSecrets := make([]string, len(secretARNs))
for i, s := range secretARNs {
unrefSecrets[i] = *s
}
input.SecretArnList = unrefSecrets
_, err = rm.sdkapi.BatchAssociateScramSecret(ctx, input)
input.SecretArnList = aws.ToStringSlice(secretARNs)
resp, err := rm.sdkapi.BatchAssociateScramSecret(ctx, input)
rm.metrics.RecordAPICall("UPDATE", "BatchAssociateScramSecret", err)
if err != nil {
return err
}

if len(resp.UnprocessedScramSecrets) > 0 {
unprocessedSecrets := []unprocessedSecret{}
for _, uss := range resp.UnprocessedScramSecrets {
us := unprocessedSecret{
errorCode: aws.ToString(uss.ErrorCode),
errorMessage: aws.ToString(uss.ErrorMessage),
secretArn: aws.ToString(uss.SecretArn),
}
unprocessedSecrets = append(unprocessedSecrets, us)
}

return ackerr.NewTerminalError(fmt.Errorf("Cant attach secret arns: %v", unprocessedSecrets))
}

return err
}

Expand Down
25 changes: 25 additions & 0 deletions pkg/resource/cluster/sdk.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions templates/hooks/cluster/sdk_delete_pre_build_request.go.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,10 @@
if err := rm.syncAssociatedScramSecrets(ctx, &resource{ko: groupCpy}, r); err != nil {
return nil, err
}
if !clusterActive(r) {
// doing this to avoid BadRequestException
return r, ackrequeue.NeededAfter(
fmt.Errorf("waiting for cluster to be active before deletion"),
ackrequeue.DefaultRequeueAfterDuration,
)
}
Loading