Skip to content

Commit f08fa1a

Browse files
authored
Address PR comments for Support updates (#56)
Issue #55 Description of changes: * wrapped update operations in functions * made `unprocessedSecrets` implement error By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
1 parent 1721e84 commit f08fa1a

File tree

1 file changed

+197
-114
lines changed

1 file changed

+197
-114
lines changed

pkg/resource/cluster/hooks.go

+197-114
Original file line numberDiff line numberDiff line change
@@ -160,118 +160,175 @@ func (rm *resourceManager) customUpdate(
160160

161161
switch {
162162
case delta.DifferentAt("Spec.ClientAuthentication"):
163-
input := &svcsdk.UpdateSecurityInput{}
164-
if desired.ko.Status.CurrentVersion != nil {
165-
input.CurrentVersion = desired.ko.Status.CurrentVersion
166-
}
167-
if desired.ko.Status.ACKResourceMetadata.ARN != nil {
168-
input.ClusterArn = (*string)(desired.ko.Status.ACKResourceMetadata.ARN)
169-
}
170-
if desired.ko.Spec.ClientAuthentication != nil {
171-
f0 := &svcsdktypes.ClientAuthentication{}
172-
if desired.ko.Spec.ClientAuthentication.SASL != nil {
173-
f0f0 := &svcsdktypes.Sasl{}
174-
if desired.ko.Spec.ClientAuthentication.SASL.IAM != nil &&
175-
desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled != nil {
176-
f0f0f0 := &svcsdktypes.Iam{
177-
Enabled: desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled,
178-
}
179-
f0f0.Iam = f0f0f0
180-
}
181-
if desired.ko.Spec.ClientAuthentication.SASL.SCRAM != nil &&
182-
desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled != nil {
183-
f0f0f1 := &svcsdktypes.Scram{
184-
Enabled: desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled,
185-
}
186-
f0f0.Scram = f0f0f1
187-
}
188-
f0.Sasl = f0f0
189-
}
190-
if desired.ko.Spec.ClientAuthentication.TLS != nil {
191-
f0f1 := &svcsdktypes.Tls{}
192-
if desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList != nil {
193-
f0f1.CertificateAuthorityArnList = aws.ToStringSlice(desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList)
194-
}
195-
if desired.ko.Spec.ClientAuthentication.TLS.Enabled != nil {
196-
f0f1.Enabled = desired.ko.Spec.ClientAuthentication.TLS.Enabled
197-
}
198-
f0.Tls = f0f1
199-
}
200-
if desired.ko.Spec.ClientAuthentication.Unauthenticated != nil &&
201-
desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled != nil {
202-
f0.Unauthenticated = &svcsdktypes.Unauthenticated{
203-
Enabled: desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled,
204-
}
205-
}
206-
input.ClientAuthentication = f0
207-
}
208-
209-
_, err = rm.sdkapi.UpdateSecurity(ctx, input)
210-
rm.metrics.RecordAPICall("UPDATE", "UpdateSecurity", err)
211-
if err != nil {
212-
return nil, err
213-
}
214-
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)
215-
err = requeueAfterAsyncUpdate()
216-
163+
return rm.updateClientAuthentication(ctx, updatedRes, latest)
217164
case delta.DifferentAt("Spec.AssociatedSCRAMSecrets"):
218165
err = rm.syncAssociatedScramSecrets(ctx, updatedRes, latest)
219166
if err != nil {
220-
return nil, err
167+
return latest, err
221168
}
222-
// Set synced condition to True after successful update
223-
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)
169+
return updatedRes, requeueAfterAsyncUpdate()
224170

225171
case delta.DifferentAt("Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize"):
226-
_, err := rm.sdkapi.UpdateBrokerStorage(ctx, &svcsdk.UpdateBrokerStorageInput{
227-
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
228-
CurrentVersion: latest.ko.Status.CurrentVersion,
229-
TargetBrokerEBSVolumeInfo: []svcsdktypes.BrokerEBSVolumeInfo{
230-
{
231-
KafkaBrokerNodeId: aws.String("ALL"),
232-
VolumeSizeGB: aws.Int32(int32(*desired.ko.Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize)),
233-
},
234-
},
235-
})
236-
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerStorage", err)
237-
if err != nil {
238-
return nil, err
239-
}
240-
message := fmt.Sprintf("kafka is updating broker storage")
241-
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
242-
err = requeueAfterAsyncUpdate()
172+
return rm.updateBrokerStorage(ctx, updatedRes, latest)
243173

244174
case delta.DifferentAt("Spec.BrokerNodeGroupInfo.InstanceType"):
245-
_, err := rm.sdkapi.UpdateBrokerType(ctx, &svcsdk.UpdateBrokerTypeInput{
246-
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
247-
CurrentVersion: latest.ko.Status.CurrentVersion,
248-
TargetInstanceType: desired.ko.Spec.BrokerNodeGroupInfo.InstanceType,
249-
})
250-
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerType", err)
251-
if err != nil {
252-
return nil, err
253-
}
254-
message := fmt.Sprintf("kafka is updating broker instanceType")
255-
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
256-
err = requeueAfterAsyncUpdate()
175+
return rm.updateBrokerType(ctx, desired, latest)
257176

258177
case delta.DifferentAt("Spec.NumberOfBrokerNodes"):
259-
_, err := rm.sdkapi.UpdateBrokerCount(ctx, &svcsdk.UpdateBrokerCountInput{
260-
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
261-
CurrentVersion: latest.ko.Status.CurrentVersion,
262-
TargetNumberOfBrokerNodes: aws.Int32(int32(*desired.ko.Spec.NumberOfBrokerNodes)),
263-
})
264-
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerCount", err)
265-
if err != nil {
266-
return nil, err
267-
}
268-
message := fmt.Sprintf("kafka is updating broker instanceType")
269-
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
270-
err = requeueAfterAsyncUpdate()
178+
return rm.updateNumberOfBrokerNodes(ctx, desired, latest)
179+
}
180+
181+
return updatedRes, nil
182+
}
183+
184+
// updateNumberOfBrokerNodes updates the number of broker
185+
// nodes for the kafka cluster
186+
func (rm *resourceManager) updateNumberOfBrokerNodes(
187+
ctx context.Context,
188+
desired *resource,
189+
latest *resource,
190+
) (updatedRes *resource, err error) {
191+
rlog := ackrtlog.FromContext(ctx)
192+
exit := rlog.Trace("rm.updateNumberOfBrokerNodes")
193+
defer func() { exit(err) }()
194+
195+
_, err = rm.sdkapi.UpdateBrokerCount(ctx, &svcsdk.UpdateBrokerCountInput{
196+
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
197+
CurrentVersion: latest.ko.Status.CurrentVersion,
198+
TargetNumberOfBrokerNodes: aws.Int32(int32(*desired.ko.Spec.NumberOfBrokerNodes)),
199+
})
200+
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerCount", err)
201+
if err != nil {
202+
return latest, err
203+
}
204+
message := fmt.Sprintf("kafka is updating broker number of broker nodes")
205+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
206+
207+
return desired, requeueAfterAsyncUpdate()
208+
}
209+
210+
211+
// updateBrokerType updates the broker type of the
212+
// kafka cluster
213+
func (rm *resourceManager) updateBrokerType(
214+
ctx context.Context,
215+
desired *resource,
216+
latest *resource,
217+
) (updatedRes *resource, err error) {
218+
rlog := ackrtlog.FromContext(ctx)
219+
exit := rlog.Trace("rm.updateBrokerType")
220+
defer func() { exit(err) }()
221+
_, err = rm.sdkapi.UpdateBrokerType(ctx, &svcsdk.UpdateBrokerTypeInput{
222+
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
223+
CurrentVersion: latest.ko.Status.CurrentVersion,
224+
TargetInstanceType: desired.ko.Spec.BrokerNodeGroupInfo.InstanceType,
225+
})
226+
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerType", err)
227+
if err != nil {
228+
return nil, err
229+
}
230+
message := fmt.Sprintf("kafka is updating broker instanceType")
231+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
232+
233+
return desired, requeueAfterAsyncUpdate()
234+
}
235+
236+
// updateBrokerStorate updates the volumeSize of the
237+
// kafka cluster broker storage
238+
func (rm *resourceManager) updateBrokerStorage(
239+
ctx context.Context,
240+
desired *resource,
241+
latest *resource,
242+
) (updatedRes *resource, err error) {
243+
rlog := ackrtlog.FromContext(ctx)
244+
exit := rlog.Trace("rm.updateBrokerStorage")
245+
defer func() { exit(err) }()
271246

247+
_, err = rm.sdkapi.UpdateBrokerStorage(ctx, &svcsdk.UpdateBrokerStorageInput{
248+
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
249+
CurrentVersion: latest.ko.Status.CurrentVersion,
250+
TargetBrokerEBSVolumeInfo: []svcsdktypes.BrokerEBSVolumeInfo{
251+
{
252+
KafkaBrokerNodeId: aws.String("ALL"),
253+
VolumeSizeGB: aws.Int32(int32(*desired.ko.Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize)),
254+
},
255+
},
256+
})
257+
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerStorage", err)
258+
if err != nil {
259+
return nil, err
272260
}
261+
message := fmt.Sprintf("kafka is updating broker storage")
262+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
263+
return desired, requeueAfterAsyncUpdate()
264+
}
265+
266+
// updateClientAuthentication updates the kafka cluster
267+
// authentication settings
268+
func (rm *resourceManager) updateClientAuthentication(
269+
ctx context.Context,
270+
desired *resource,
271+
latest *resource,
272+
) (updatedRes *resource, err error) {
273+
rlog := ackrtlog.FromContext(ctx)
274+
exit := rlog.Trace("rm.updateClientAuthentication")
275+
defer func() { exit(err) }()
273276

274-
return updatedRes, err
277+
input := &svcsdk.UpdateSecurityInput{}
278+
if latest.ko.Status.CurrentVersion != nil {
279+
input.CurrentVersion = desired.ko.Status.CurrentVersion
280+
}
281+
if latest.ko.Status.ACKResourceMetadata.ARN != nil {
282+
input.ClusterArn = (*string)(desired.ko.Status.ACKResourceMetadata.ARN)
283+
}
284+
if desired.ko.Spec.ClientAuthentication != nil {
285+
f0 := &svcsdktypes.ClientAuthentication{}
286+
if desired.ko.Spec.ClientAuthentication.SASL != nil {
287+
f0f0 := &svcsdktypes.Sasl{}
288+
if desired.ko.Spec.ClientAuthentication.SASL.IAM != nil &&
289+
desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled != nil {
290+
f0f0f0 := &svcsdktypes.Iam{
291+
Enabled: desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled,
292+
}
293+
f0f0.Iam = f0f0f0
294+
}
295+
if desired.ko.Spec.ClientAuthentication.SASL.SCRAM != nil &&
296+
desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled != nil {
297+
f0f0f1 := &svcsdktypes.Scram{
298+
Enabled: desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled,
299+
}
300+
f0f0.Scram = f0f0f1
301+
}
302+
f0.Sasl = f0f0
303+
}
304+
if desired.ko.Spec.ClientAuthentication.TLS != nil {
305+
f0f1 := &svcsdktypes.Tls{}
306+
if desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList != nil {
307+
f0f1.CertificateAuthorityArnList = aws.ToStringSlice(desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList)
308+
}
309+
if desired.ko.Spec.ClientAuthentication.TLS.Enabled != nil {
310+
f0f1.Enabled = desired.ko.Spec.ClientAuthentication.TLS.Enabled
311+
}
312+
f0.Tls = f0f1
313+
}
314+
if desired.ko.Spec.ClientAuthentication.Unauthenticated != nil &&
315+
desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled != nil {
316+
f0.Unauthenticated = &svcsdktypes.Unauthenticated{
317+
Enabled: desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled,
318+
}
319+
}
320+
input.ClientAuthentication = f0
321+
}
322+
323+
_, err = rm.sdkapi.UpdateSecurity(ctx, input)
324+
rm.metrics.RecordAPICall("UPDATE", "UpdateSecurity", err)
325+
if err != nil {
326+
return nil, err
327+
}
328+
message := "kafka is updating the client authentication"
329+
ackcondition.SetSynced(desired, corev1.ConditionFalse, &message, nil)
330+
331+
return desired, err
275332
}
276333

277334
// syncAssociatedScramSecrets examines the Secret ARNs in the supplied Cluster
@@ -318,6 +375,9 @@ func (rm *resourceManager) syncAssociatedScramSecrets(
318375
}
319376
}
320377

378+
// Set synced condition to True after successful update
379+
ackcondition.SetSynced(desired, corev1.ConditionFalse, nil, nil)
380+
321381
return nil
322382
}
323383

@@ -355,14 +415,28 @@ func (rm *resourceManager) getAssociatedScramSecrets(
355415
return res, err
356416
}
357417

358-
type unprocessedSecret struct {
359-
errorCode string
360-
errorMessage string
361-
secretArn string
418+
// unprocessedSecrets is an error returned by the
419+
// BatchAssociateScramSecret or Disassociate. It represents the
420+
// secretArns that could not be associated and the reason
421+
type unprocessedSecrets struct {
422+
error
423+
errorCodes []string
424+
errorMessages []string
425+
secretArns []string
362426
}
363427

364-
func (us unprocessedSecret) String() string {
365-
return fmt.Sprintf("ErrorCode: %s, ErrorMessage %s, SecretArn: %s", us.errorCode, us.errorMessage, us.secretArn)
428+
// Error implementation of unprocessedSecrets loops over the errorCodes
429+
// errorMessages, and failedSecretArns
430+
func (us unprocessedSecrets) Error() string {
431+
// I don't see a case where the lengths will differ
432+
// getting the minimum just in case, so we can avoid
433+
// an index out of bounds
434+
lenErrs := min(len(us.errorCodes), len(us.errorMessages), len(us.secretArns))
435+
errorMessage := ""
436+
for i := range lenErrs {
437+
errorMessage += fmt.Sprintf("ErrorCode: %s, ErrorMessage %s, SecretArn: %s\n", us.errorCodes[i], us.errorMessages[i], us.secretArns[i])
438+
}
439+
return errorMessage
366440
}
367441

368442
// batchAssociateScramSecret associates the supplied scram secrets to the supplied Cluster
@@ -386,17 +460,14 @@ func (rm *resourceManager) batchAssociateScramSecret(
386460
}
387461

388462
if len(resp.UnprocessedScramSecrets) > 0 {
389-
unprocessedSecrets := []unprocessedSecret{}
463+
unprocessedSecrets := unprocessedSecrets{}
390464
for _, uss := range resp.UnprocessedScramSecrets {
391-
us := unprocessedSecret{
392-
errorCode: aws.ToString(uss.ErrorCode),
393-
errorMessage: aws.ToString(uss.ErrorMessage),
394-
secretArn: aws.ToString(uss.SecretArn),
395-
}
396-
unprocessedSecrets = append(unprocessedSecrets, us)
465+
unprocessedSecrets.errorCodes = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorCode))
466+
unprocessedSecrets.errorMessages = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorMessage))
467+
unprocessedSecrets.secretArns = append(unprocessedSecrets.errorCodes, aws.ToString(uss.SecretArn))
397468
}
398469

399-
return ackerr.NewTerminalError(fmt.Errorf("Cant attach secret arns: %v", unprocessedSecrets))
470+
return ackerr.NewTerminalError(unprocessedSecrets)
400471
}
401472

402473
return err
@@ -421,8 +492,20 @@ func (rm *resourceManager) batchDisassociateScramSecret(
421492
unrefSecrets[i] = *s
422493
}
423494
input.SecretArnList = unrefSecrets
424-
_, err = rm.sdkapi.BatchDisassociateScramSecret(ctx, input)
495+
resp, err := rm.sdkapi.BatchDisassociateScramSecret(ctx, input)
425496
rm.metrics.RecordAPICall("UPDATE", "BatchDisassociateScramSecret", err)
497+
498+
if len(resp.UnprocessedScramSecrets) > 0 {
499+
unprocessedSecrets := unprocessedSecrets{}
500+
for _, uss := range resp.UnprocessedScramSecrets {
501+
unprocessedSecrets.errorCodes = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorCode))
502+
unprocessedSecrets.errorMessages = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorMessage))
503+
unprocessedSecrets.secretArns = append(unprocessedSecrets.errorCodes, aws.ToString(uss.SecretArn))
504+
}
505+
506+
return ackerr.NewTerminalError(unprocessedSecrets)
507+
}
508+
426509
return err
427510
}
428511

0 commit comments

Comments
 (0)