diff --git a/pkg/resource/cluster/hooks.go b/pkg/resource/cluster/hooks.go index b47a3a2..043410b 100644 --- a/pkg/resource/cluster/hooks.go +++ b/pkg/resource/cluster/hooks.go @@ -160,118 +160,175 @@ func (rm *resourceManager) customUpdate( switch { case delta.DifferentAt("Spec.ClientAuthentication"): - input := &svcsdk.UpdateSecurityInput{} - if desired.ko.Status.CurrentVersion != nil { - input.CurrentVersion = desired.ko.Status.CurrentVersion - } - if desired.ko.Status.ACKResourceMetadata.ARN != nil { - input.ClusterArn = (*string)(desired.ko.Status.ACKResourceMetadata.ARN) - } - if desired.ko.Spec.ClientAuthentication != nil { - f0 := &svcsdktypes.ClientAuthentication{} - if desired.ko.Spec.ClientAuthentication.SASL != nil { - f0f0 := &svcsdktypes.Sasl{} - if desired.ko.Spec.ClientAuthentication.SASL.IAM != nil && - desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled != nil { - f0f0f0 := &svcsdktypes.Iam{ - Enabled: desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled, - } - f0f0.Iam = f0f0f0 - } - if desired.ko.Spec.ClientAuthentication.SASL.SCRAM != nil && - desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled != nil { - f0f0f1 := &svcsdktypes.Scram{ - Enabled: desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled, - } - f0f0.Scram = f0f0f1 - } - f0.Sasl = f0f0 - } - if desired.ko.Spec.ClientAuthentication.TLS != nil { - f0f1 := &svcsdktypes.Tls{} - if desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList != nil { - f0f1.CertificateAuthorityArnList = aws.ToStringSlice(desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList) - } - if desired.ko.Spec.ClientAuthentication.TLS.Enabled != nil { - f0f1.Enabled = desired.ko.Spec.ClientAuthentication.TLS.Enabled - } - f0.Tls = f0f1 - } - if desired.ko.Spec.ClientAuthentication.Unauthenticated != nil && - desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled != nil { - f0.Unauthenticated = &svcsdktypes.Unauthenticated{ - Enabled: desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled, - } - } - input.ClientAuthentication = f0 - } - - _, err = rm.sdkapi.UpdateSecurity(ctx, input) - rm.metrics.RecordAPICall("UPDATE", "UpdateSecurity", err) - if err != nil { - return nil, err - } - ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil) - err = requeueAfterAsyncUpdate() - + return rm.updateClientAuthentication(ctx, updatedRes, latest) case delta.DifferentAt("Spec.AssociatedSCRAMSecrets"): err = rm.syncAssociatedScramSecrets(ctx, updatedRes, latest) if err != nil { - return nil, err + return latest, err } - // Set synced condition to True after successful update - ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil) + return updatedRes, requeueAfterAsyncUpdate() case delta.DifferentAt("Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize"): - _, err := rm.sdkapi.UpdateBrokerStorage(ctx, &svcsdk.UpdateBrokerStorageInput{ - ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN), - CurrentVersion: latest.ko.Status.CurrentVersion, - TargetBrokerEBSVolumeInfo: []svcsdktypes.BrokerEBSVolumeInfo{ - { - KafkaBrokerNodeId: aws.String("ALL"), - VolumeSizeGB: aws.Int32(int32(*desired.ko.Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize)), - }, - }, - }) - rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerStorage", err) - if err != nil { - return nil, err - } - message := fmt.Sprintf("kafka is updating broker storage") - ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil) - err = requeueAfterAsyncUpdate() + return rm.updateBrokerStorage(ctx, updatedRes, latest) case delta.DifferentAt("Spec.BrokerNodeGroupInfo.InstanceType"): - _, err := rm.sdkapi.UpdateBrokerType(ctx, &svcsdk.UpdateBrokerTypeInput{ - ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN), - CurrentVersion: latest.ko.Status.CurrentVersion, - TargetInstanceType: desired.ko.Spec.BrokerNodeGroupInfo.InstanceType, - }) - rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerType", err) - if err != nil { - return nil, err - } - message := fmt.Sprintf("kafka is updating broker instanceType") - ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil) - err = requeueAfterAsyncUpdate() + return rm.updateBrokerType(ctx, desired, latest) case delta.DifferentAt("Spec.NumberOfBrokerNodes"): - _, err := rm.sdkapi.UpdateBrokerCount(ctx, &svcsdk.UpdateBrokerCountInput{ - ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN), - CurrentVersion: latest.ko.Status.CurrentVersion, - TargetNumberOfBrokerNodes: aws.Int32(int32(*desired.ko.Spec.NumberOfBrokerNodes)), - }) - rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerCount", err) - if err != nil { - return nil, err - } - message := fmt.Sprintf("kafka is updating broker instanceType") - ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil) - err = requeueAfterAsyncUpdate() + return rm.updateNumberOfBrokerNodes(ctx, desired, latest) + } + + return updatedRes, nil +} + +// updateNumberOfBrokerNodes updates the number of broker +// nodes for the kafka cluster +func (rm *resourceManager) updateNumberOfBrokerNodes( + ctx context.Context, + desired *resource, + latest *resource, +) (updatedRes *resource, err error) { + rlog := ackrtlog.FromContext(ctx) + exit := rlog.Trace("rm.updateNumberOfBrokerNodes") + defer func() { exit(err) }() + + _, err = rm.sdkapi.UpdateBrokerCount(ctx, &svcsdk.UpdateBrokerCountInput{ + ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN), + CurrentVersion: latest.ko.Status.CurrentVersion, + TargetNumberOfBrokerNodes: aws.Int32(int32(*desired.ko.Spec.NumberOfBrokerNodes)), + }) + rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerCount", err) + if err != nil { + return latest, err + } + message := fmt.Sprintf("kafka is updating broker number of broker nodes") + ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil) + + return desired, requeueAfterAsyncUpdate() +} + + +// updateBrokerType updates the broker type of the +// kafka cluster +func (rm *resourceManager) updateBrokerType( + ctx context.Context, + desired *resource, + latest *resource, +) (updatedRes *resource, err error) { + rlog := ackrtlog.FromContext(ctx) + exit := rlog.Trace("rm.updateBrokerType") + defer func() { exit(err) }() + _, err = rm.sdkapi.UpdateBrokerType(ctx, &svcsdk.UpdateBrokerTypeInput{ + ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN), + CurrentVersion: latest.ko.Status.CurrentVersion, + TargetInstanceType: desired.ko.Spec.BrokerNodeGroupInfo.InstanceType, + }) + rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerType", err) + if err != nil { + return nil, err + } + message := fmt.Sprintf("kafka is updating broker instanceType") + ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil) + + return desired, requeueAfterAsyncUpdate() +} + +// updateBrokerStorate updates the volumeSize of the +// kafka cluster broker storage +func (rm *resourceManager) updateBrokerStorage( + ctx context.Context, + desired *resource, + latest *resource, +) (updatedRes *resource, err error) { + rlog := ackrtlog.FromContext(ctx) + exit := rlog.Trace("rm.updateBrokerStorage") + defer func() { exit(err) }() + _, err = rm.sdkapi.UpdateBrokerStorage(ctx, &svcsdk.UpdateBrokerStorageInput{ + ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN), + CurrentVersion: latest.ko.Status.CurrentVersion, + TargetBrokerEBSVolumeInfo: []svcsdktypes.BrokerEBSVolumeInfo{ + { + KafkaBrokerNodeId: aws.String("ALL"), + VolumeSizeGB: aws.Int32(int32(*desired.ko.Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize)), + }, + }, + }) + rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerStorage", err) + if err != nil { + return nil, err } + message := fmt.Sprintf("kafka is updating broker storage") + ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil) + return desired, requeueAfterAsyncUpdate() +} + +// updateClientAuthentication updates the kafka cluster +// authentication settings +func (rm *resourceManager) updateClientAuthentication( + ctx context.Context, + desired *resource, + latest *resource, +) (updatedRes *resource, err error) { + rlog := ackrtlog.FromContext(ctx) + exit := rlog.Trace("rm.updateClientAuthentication") + defer func() { exit(err) }() - return updatedRes, err + input := &svcsdk.UpdateSecurityInput{} + if latest.ko.Status.CurrentVersion != nil { + input.CurrentVersion = desired.ko.Status.CurrentVersion + } + if latest.ko.Status.ACKResourceMetadata.ARN != nil { + input.ClusterArn = (*string)(desired.ko.Status.ACKResourceMetadata.ARN) + } + if desired.ko.Spec.ClientAuthentication != nil { + f0 := &svcsdktypes.ClientAuthentication{} + if desired.ko.Spec.ClientAuthentication.SASL != nil { + f0f0 := &svcsdktypes.Sasl{} + if desired.ko.Spec.ClientAuthentication.SASL.IAM != nil && + desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled != nil { + f0f0f0 := &svcsdktypes.Iam{ + Enabled: desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled, + } + f0f0.Iam = f0f0f0 + } + if desired.ko.Spec.ClientAuthentication.SASL.SCRAM != nil && + desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled != nil { + f0f0f1 := &svcsdktypes.Scram{ + Enabled: desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled, + } + f0f0.Scram = f0f0f1 + } + f0.Sasl = f0f0 + } + if desired.ko.Spec.ClientAuthentication.TLS != nil { + f0f1 := &svcsdktypes.Tls{} + if desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList != nil { + f0f1.CertificateAuthorityArnList = aws.ToStringSlice(desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList) + } + if desired.ko.Spec.ClientAuthentication.TLS.Enabled != nil { + f0f1.Enabled = desired.ko.Spec.ClientAuthentication.TLS.Enabled + } + f0.Tls = f0f1 + } + if desired.ko.Spec.ClientAuthentication.Unauthenticated != nil && + desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled != nil { + f0.Unauthenticated = &svcsdktypes.Unauthenticated{ + Enabled: desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled, + } + } + input.ClientAuthentication = f0 + } + + _, err = rm.sdkapi.UpdateSecurity(ctx, input) + rm.metrics.RecordAPICall("UPDATE", "UpdateSecurity", err) + if err != nil { + return nil, err + } + message := "kafka is updating the client authentication" + ackcondition.SetSynced(desired, corev1.ConditionFalse, &message, nil) + + return desired, err } // syncAssociatedScramSecrets examines the Secret ARNs in the supplied Cluster @@ -318,6 +375,9 @@ func (rm *resourceManager) syncAssociatedScramSecrets( } } + // Set synced condition to True after successful update + ackcondition.SetSynced(desired, corev1.ConditionFalse, nil, nil) + return nil } @@ -355,14 +415,28 @@ func (rm *resourceManager) getAssociatedScramSecrets( return res, err } -type unprocessedSecret struct { - errorCode string - errorMessage string - secretArn string +// unprocessedSecrets is an error returned by the +// BatchAssociateScramSecret or Disassociate. It represents the +// secretArns that could not be associated and the reason +type unprocessedSecrets struct { + error + errorCodes []string + errorMessages []string + secretArns []string } -func (us unprocessedSecret) String() string { - return fmt.Sprintf("ErrorCode: %s, ErrorMessage %s, SecretArn: %s", us.errorCode, us.errorMessage, us.secretArn) +// Error implementation of unprocessedSecrets loops over the errorCodes +// errorMessages, and failedSecretArns +func (us unprocessedSecrets) Error() string { + // I don't see a case where the lengths will differ + // getting the minimum just in case, so we can avoid + // an index out of bounds + lenErrs := min(len(us.errorCodes), len(us.errorMessages), len(us.secretArns)) + errorMessage := "" + for i := range lenErrs { + errorMessage += fmt.Sprintf("ErrorCode: %s, ErrorMessage %s, SecretArn: %s\n", us.errorCodes[i], us.errorMessages[i], us.secretArns[i]) + } + return errorMessage } // batchAssociateScramSecret associates the supplied scram secrets to the supplied Cluster @@ -386,17 +460,14 @@ func (rm *resourceManager) batchAssociateScramSecret( } if len(resp.UnprocessedScramSecrets) > 0 { - unprocessedSecrets := []unprocessedSecret{} + unprocessedSecrets := unprocessedSecrets{} for _, uss := range resp.UnprocessedScramSecrets { - us := unprocessedSecret{ - errorCode: aws.ToString(uss.ErrorCode), - errorMessage: aws.ToString(uss.ErrorMessage), - secretArn: aws.ToString(uss.SecretArn), - } - unprocessedSecrets = append(unprocessedSecrets, us) + unprocessedSecrets.errorCodes = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorCode)) + unprocessedSecrets.errorMessages = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorMessage)) + unprocessedSecrets.secretArns = append(unprocessedSecrets.errorCodes, aws.ToString(uss.SecretArn)) } - return ackerr.NewTerminalError(fmt.Errorf("Cant attach secret arns: %v", unprocessedSecrets)) + return ackerr.NewTerminalError(unprocessedSecrets) } return err @@ -421,8 +492,20 @@ func (rm *resourceManager) batchDisassociateScramSecret( unrefSecrets[i] = *s } input.SecretArnList = unrefSecrets - _, err = rm.sdkapi.BatchDisassociateScramSecret(ctx, input) + resp, err := rm.sdkapi.BatchDisassociateScramSecret(ctx, input) rm.metrics.RecordAPICall("UPDATE", "BatchDisassociateScramSecret", err) + + if len(resp.UnprocessedScramSecrets) > 0 { + unprocessedSecrets := unprocessedSecrets{} + for _, uss := range resp.UnprocessedScramSecrets { + unprocessedSecrets.errorCodes = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorCode)) + unprocessedSecrets.errorMessages = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorMessage)) + unprocessedSecrets.secretArns = append(unprocessedSecrets.errorCodes, aws.ToString(uss.SecretArn)) + } + + return ackerr.NewTerminalError(unprocessedSecrets) + } + return err }