-
Notifications
You must be signed in to change notification settings - Fork 542
Feat/kinesis binding vmware go kcl v2 latest #4082
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
2eddb15
cabcec5
112fd8d
de8646f
90e071c
648a318
b686d94
9a70082
9dafc6d
8b93022
2ee207c
9b003aa
6ebe1a4
c064187
716b8a1
bf33f52
d8f5103
bd2639a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,8 +27,8 @@ import ( | |
| "github.com/aws/aws-sdk-go/service/kinesis" | ||
| "github.com/cenkalti/backoff/v4" | ||
| "github.com/google/uuid" | ||
| "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces" | ||
| "github.com/vmware/vmware-go-kcl/clientlibrary/worker" | ||
| "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" | ||
| "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker" | ||
|
|
||
| "github.com/dapr/components-contrib/bindings" | ||
| awsAuth "github.com/dapr/components-contrib/common/authentication/aws" | ||
|
|
@@ -49,10 +49,13 @@ type AWSKinesis struct { | |
| consumerARN *string | ||
| logger logger.Logger | ||
| consumerMode string | ||
|
|
||
| closed atomic.Bool | ||
| closeCh chan struct{} | ||
| wg sync.WaitGroup | ||
| closed atomic.Bool | ||
| closeCh chan struct{} | ||
| wg sync.WaitGroup | ||
| // applicationName is required for KCL (Kinesis Client Library) worker configuration | ||
| // in shared throughput mode. It identifies the consumer application and is used | ||
| // for DynamoDB table naming and checkpointing. | ||
| applicationName string | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add kinesis client here and init with sdk v2
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Done |
||
|
|
||
| // TODO: we need to clean up the metadata fields here and update this binding to use the builtin aws auth provider and reflect in metadata.yaml | ||
|
|
@@ -65,6 +68,7 @@ type kinesisMetadata struct { | |
| SecretKey string `json:"secretKey" mapstructure:"secretKey"` | ||
| SessionToken string `json:"sessionToken" mapstructure:"sessionToken"` | ||
| KinesisConsumerMode string `json:"mode" mapstructure:"mode"` | ||
| ApplicationName string `json:"applicationName" mapstructure:"applicationName"` | ||
| } | ||
|
|
||
| const ( | ||
|
|
@@ -116,6 +120,7 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error | |
| a.consumerMode = m.KinesisConsumerMode | ||
| a.streamName = m.StreamName | ||
| a.consumerName = m.ConsumerName | ||
| a.applicationName = m.ApplicationName | ||
| a.metadata = m | ||
|
|
||
| opts := awsAuth.Options{ | ||
|
|
@@ -158,19 +163,21 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er | |
| return errors.New("binding is closed") | ||
| } | ||
|
|
||
| if a.metadata.KinesisConsumerMode == SharedThroughput { | ||
| switch a.metadata.KinesisConsumerMode { | ||
| case SharedThroughput: | ||
| // initalize worker configuration | ||
| config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName) | ||
| // Configure the KCL worker with custom endpoints for LocalStack | ||
| config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.consumerName, a.consumerMode) | ||
| if a.metadata.Endpoint != "" { | ||
| config.KinesisEndpoint = a.metadata.Endpoint | ||
| config.DynamoDBEndpoint = a.metadata.Endpoint | ||
| config.WithKinesisEndpoint(a.metadata.Endpoint) | ||
| config.WithDynamoDBEndpoint(a.metadata.Endpoint) | ||
swatimodi-scout marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), config) | ||
| err = a.worker.Start() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } else if a.metadata.KinesisConsumerMode == ExtendedFanout { | ||
| case ExtendedFanout: | ||
| var stream *kinesis.DescribeStreamOutput | ||
| stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName}) | ||
| if err != nil { | ||
|
|
@@ -194,9 +201,10 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er | |
| case <-ctx.Done(): | ||
| case <-a.closeCh: | ||
| } | ||
| if a.metadata.KinesisConsumerMode == SharedThroughput { | ||
| switch a.metadata.KinesisConsumerMode { | ||
| case SharedThroughput: | ||
| a.worker.Shutdown() | ||
| } else if a.metadata.KinesisConsumerMode == ExtendedFanout { | ||
| case ExtendedFanout: | ||
| a.deregisterConsumer(ctx, stream, a.consumerARN) | ||
| } | ||
| }() | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think these changes to the old auth package should be reverted, as they are not necessary as part of this PR unless a full cleanup takes place. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,9 @@ import ( | |
| "errors" | ||
| "sync" | ||
|
|
||
| "github.com/aws/aws-sdk-go/aws" | ||
| "github.com/aws/aws-sdk-go-v2/aws" | ||
| awsv2config "github.com/aws/aws-sdk-go-v2/config" | ||
| v2creds "github.com/aws/aws-sdk-go-v2/credentials" | ||
| "github.com/aws/aws-sdk-go/aws/credentials" | ||
| "github.com/aws/aws-sdk-go/aws/session" | ||
| "github.com/aws/aws-sdk-go/service/dynamodb" | ||
|
|
@@ -36,7 +38,7 @@ import ( | |
| "github.com/aws/aws-sdk-go/service/ssm" | ||
| "github.com/aws/aws-sdk-go/service/ssm/ssmiface" | ||
| "github.com/aws/aws-sdk-go/service/sts" | ||
| "github.com/vmware/vmware-go-kcl/clientlibrary/config" | ||
| "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" | ||
| ) | ||
|
|
||
| type Clients struct { | ||
|
|
@@ -181,27 +183,41 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string | |
| stream, err := c.Kinesis.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{ | ||
| StreamName: aws.String(streamName), | ||
| }) | ||
| if stream != nil { | ||
| return stream.StreamDescription.StreamARN, err | ||
| /** | ||
| * If the error is not nil, do not proceed to the next step | ||
| * as it may cause a nil pointer error on stream.StreamDescription.StreamARN. | ||
| */ | ||
|
||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return stream.StreamDescription.StreamARN, err | ||
swatimodi-scout marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| return nil, errors.New("unable to get stream arn due to empty client") | ||
| } | ||
|
|
||
| func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode string) *config.KinesisClientLibConfiguration { | ||
| func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration { | ||
| const sharedMode = "shared" | ||
| if c.Kinesis != nil { | ||
| if mode == sharedMode { | ||
| if c.Credentials != nil { | ||
| kclConfig := config.NewKinesisClientLibConfigWithCredential(consumer, | ||
| stream, c.Region, consumer, | ||
| c.Credentials) | ||
| // Try v2 default config first (standard approach for v2 components) | ||
| v2Config, err := awsv2config.LoadDefaultConfig(ctx, awsv2config.WithRegion(region)) | ||
| if err == nil { | ||
| kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Config.Credentials) | ||
| return kclConfig | ||
| } | ||
| // Fallback to v1 credentials if v2 fails | ||
|
||
| v1Creds, v1Err := c.Credentials.Get() | ||
| if v1Err != nil { | ||
| // Both v2 and v1 failed, return nil | ||
| return nil | ||
| } | ||
| // Convert v1 credentials to v2 format | ||
| v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) | ||
| kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds) | ||
| return kclConfig | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please share some insights on why the additional field here?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
applicationName is required for KCL (Kinesis Client Library) worker configuration in shared throughput mode. It identifies the consumer application and is used for DynamoDB table naming and checkpointing.
Without applicationName we were facing an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd move this field next to
consumerMode. And the comment is not necessary if you ask me, I'd remove it for consistency, but feel free to keep if you think it's useful :)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done