Skip to content

Commit 648a318

Browse files
Resolved comments given by acroca
Signed-off-by: swatimodi-scout <[email protected]>
1 parent 90e071c commit 648a318

File tree

2 files changed

+7
-26
lines changed

2 files changed

+7
-26
lines changed

bindings/aws/kinesis/kinesis.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,10 @@ type AWSKinesis struct {
4949
consumerARN *string
5050
logger logger.Logger
5151
consumerMode string
52+
applicationName string
5253
closed atomic.Bool
5354
closeCh chan struct{}
5455
wg sync.WaitGroup
55-
// applicationName is required for KCL (Kinesis Client Library) worker configuration
56-
// in shared throughput mode. It identifies the consumer application and is used
57-
// for DynamoDB table naming and checkpointing.
58-
applicationName string
5956
}
6057

6158
// TODO: we need to clean up the metadata fields here and update this binding to use the builtin aws auth provider and reflect in metadata.yaml

common/authentication/aws/client.go

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,6 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string
183183
stream, err := c.Kinesis.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{
184184
StreamName: aws.String(streamName),
185185
})
186-
/**
187-
* If the error is not nil, do not proceed to the next step
188-
* as it may cause a nil pointer error on stream.StreamDescription.StreamARN.
189-
*/
190186
if err != nil {
191187
return nil, err
192188
}
@@ -198,25 +194,13 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string
198194

199195
func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration {
200196
const sharedMode = "shared"
201-
if c.Kinesis != nil {
202-
if mode == sharedMode {
203-
// Try v2 default config first (standard approach for v2 components)
204-
v2Config, err := awsv2config.LoadDefaultConfig(ctx, awsv2config.WithRegion(region))
205-
if err == nil {
206-
kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Config.Credentials)
207-
return kclConfig
208-
}
209-
// Fallback to v1 credentials if v2 fails
210-
v1Creds, v1Err := c.Credentials.Get()
211-
if v1Err != nil {
212-
// Both v2 and v1 failed, return nil
213-
return nil
214-
}
215-
// Convert v1 credentials to v2 format
216-
v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken)
217-
kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds)
218-
return kclConfig
197+
if c.Kinesis != nil && mode == sharedMode {
198+
v1Creds, err := c.Credentials.Get()
199+
if err != nil {
200+
return nil
219201
}
202+
v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken)
203+
return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds)
220204
}
221205
return nil
222206
}

0 commit comments

Comments
 (0)