-
Notifications
You must be signed in to change notification settings - Fork 542
Feat/kinesis binding vmware go kcl v2 latest #4082
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 12 commits
2eddb15
cabcec5
112fd8d
de8646f
90e071c
648a318
b686d94
9a70082
9dafc6d
8b93022
2ee207c
9b003aa
6ebe1a4
c064187
716b8a1
bf33f52
d8f5103
bd2639a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,13 +22,16 @@ import ( | |
| "sync/atomic" | ||
| "time" | ||
|
|
||
| "github.com/aws/aws-sdk-go/aws" | ||
| "github.com/aws/aws-sdk-go/aws/request" | ||
| "github.com/aws/aws-sdk-go/service/kinesis" | ||
| "github.com/aws/aws-sdk-go-v2/aws" | ||
| awsv2config "github.com/aws/aws-sdk-go-v2/config" | ||
| v2creds "github.com/aws/aws-sdk-go-v2/credentials" | ||
| "github.com/aws/aws-sdk-go-v2/service/kinesis" | ||
| "github.com/aws/aws-sdk-go-v2/service/kinesis/types" | ||
| "github.com/cenkalti/backoff/v4" | ||
| "github.com/google/uuid" | ||
| "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces" | ||
| "github.com/vmware/vmware-go-kcl/clientlibrary/worker" | ||
| "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" | ||
| "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" | ||
| "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker" | ||
|
|
||
| "github.com/dapr/components-contrib/bindings" | ||
| awsAuth "github.com/dapr/components-contrib/common/authentication/aws" | ||
|
|
@@ -42,17 +45,19 @@ type AWSKinesis struct { | |
| authProvider awsAuth.Provider | ||
| metadata *kinesisMetadata | ||
|
|
||
| worker *worker.Worker | ||
|
|
||
| streamName string | ||
| consumerName string | ||
| consumerARN *string | ||
| logger logger.Logger | ||
| consumerMode string | ||
|
|
||
| closed atomic.Bool | ||
| closeCh chan struct{} | ||
| wg sync.WaitGroup | ||
| worker *worker.Worker | ||
| kinesisClient *kinesis.Client | ||
| v2Credentials aws.CredentialsProvider | ||
|
|
||
| streamName string | ||
| consumerName string | ||
| consumerARN *string | ||
| logger logger.Logger | ||
| consumerMode string | ||
| applicationName string | ||
| closed atomic.Bool | ||
| closeCh chan struct{} | ||
| wg sync.WaitGroup | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add kinesis client here and init with sdk v2
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Done |
||
|
|
||
| // TODO: we need to clean up the metadata fields here and update this binding to use the builtin aws auth provider and reflect in metadata.yaml | ||
|
|
@@ -65,6 +70,7 @@ type kinesisMetadata struct { | |
| SecretKey string `json:"secretKey" mapstructure:"secretKey"` | ||
| SessionToken string `json:"sessionToken" mapstructure:"sessionToken"` | ||
| KinesisConsumerMode string `json:"mode" mapstructure:"mode"` | ||
| ApplicationName string `json:"applicationName" mapstructure:"applicationName"` | ||
| } | ||
|
|
||
| const ( | ||
|
|
@@ -116,6 +122,7 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error | |
| a.consumerMode = m.KinesisConsumerMode | ||
| a.streamName = m.StreamName | ||
| a.consumerName = m.ConsumerName | ||
| a.applicationName = m.ApplicationName | ||
| a.metadata = m | ||
|
|
||
| opts := awsAuth.Options{ | ||
|
|
@@ -132,6 +139,12 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error | |
| return err | ||
| } | ||
| a.authProvider = provider | ||
|
||
|
|
||
| // Create AWS SDK v2 client | ||
| if err := a.createKinesisClient(ctx); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -144,7 +157,7 @@ func (a *AWSKinesis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (* | |
| if partitionKey == "" { | ||
| partitionKey = uuid.New().String() | ||
| } | ||
| _, err := a.authProvider.Kinesis().Kinesis.PutRecordWithContext(ctx, &kinesis.PutRecordInput{ | ||
| _, err := a.kinesisClient.PutRecord(ctx, &kinesis.PutRecordInput{ | ||
| StreamName: &a.metadata.StreamName, | ||
| Data: req.Data, | ||
| PartitionKey: &partitionKey, | ||
|
|
@@ -158,21 +171,23 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er | |
| return errors.New("binding is closed") | ||
| } | ||
|
|
||
| if a.metadata.KinesisConsumerMode == SharedThroughput { | ||
| switch a.metadata.KinesisConsumerMode { | ||
| case SharedThroughput: | ||
| // initialize worker configuration | ||
| config := a.workerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName) | ||
| // Configure the KCL worker with custom endpoints for LocalStack | ||
| config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.consumerName, a.consumerMode) | ||
| if a.metadata.Endpoint != "" { | ||
| config.KinesisEndpoint = a.metadata.Endpoint | ||
| config.DynamoDBEndpoint = a.metadata.Endpoint | ||
| config = config.WithKinesisEndpoint(a.metadata.Endpoint) | ||
| config = config.WithDynamoDBEndpoint(a.metadata.Endpoint) | ||
| } | ||
| a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), config) | ||
| err = a.worker.Start() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } else if a.metadata.KinesisConsumerMode == ExtendedFanout { | ||
| case ExtendedFanout: | ||
| var stream *kinesis.DescribeStreamOutput | ||
| stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName}) | ||
| stream, err = a.kinesisClient.DescribeStream(ctx, &kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName}) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
@@ -182,7 +197,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er | |
| } | ||
| } | ||
|
|
||
| stream, err := a.authProvider.Kinesis().Stream(ctx, a.streamName) | ||
| stream, err := a.getStreamARN(ctx, a.streamName) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get kinesis stream arn: %v", err) | ||
| } | ||
|
|
@@ -194,9 +209,10 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er | |
| case <-ctx.Done(): | ||
| case <-a.closeCh: | ||
| } | ||
| if a.metadata.KinesisConsumerMode == SharedThroughput { | ||
| switch a.metadata.KinesisConsumerMode { | ||
| case SharedThroughput: | ||
| a.worker.Shutdown() | ||
| } else if a.metadata.KinesisConsumerMode == ExtendedFanout { | ||
| case ExtendedFanout: | ||
| a.deregisterConsumer(ctx, stream, a.consumerARN) | ||
| } | ||
| }() | ||
|
|
@@ -205,7 +221,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er | |
| } | ||
|
|
||
| // Subscribe to all shards. | ||
| func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDescription, handler bindings.Handler) error { | ||
| func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc types.StreamDescription, handler bindings.Handler) error { | ||
| consumerARN, err := a.ensureConsumer(ctx, streamDesc.StreamARN) | ||
| if err != nil { | ||
| a.logger.Error(err) | ||
|
|
@@ -216,7 +232,7 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes | |
|
|
||
| a.wg.Add(len(streamDesc.Shards)) | ||
| for i, shard := range streamDesc.Shards { | ||
| go func(idx int, s *kinesis.Shard) { | ||
| go func(idx int, s types.Shard) { | ||
| defer a.wg.Done() | ||
|
|
||
| // Reconnection backoff | ||
|
|
@@ -232,14 +248,14 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes | |
| return | ||
| default: | ||
| } | ||
| sub, err := a.authProvider.Kinesis().Kinesis.SubscribeToShardWithContext(ctx, &kinesis.SubscribeToShardInput{ | ||
| sub, err := a.kinesisClient.SubscribeToShard(ctx, &kinesis.SubscribeToShardInput{ | ||
| ConsumerARN: consumerARN, | ||
| ShardId: s.ShardId, | ||
| StartingPosition: &kinesis.StartingPosition{Type: aws.String(kinesis.ShardIteratorTypeLatest)}, | ||
| StartingPosition: &types.StartingPosition{Type: types.ShardIteratorTypeLatest}, | ||
| }) | ||
| if err != nil { | ||
| wait := bo.NextBackOff() | ||
| a.logger.Errorf("Error while reading from shard %v: %v. Attempting to reconnect in %s...", s.ShardId, err, wait) | ||
| a.logger.Errorf("Error while reading from shard %v: %v. Attempting to reconnect in %s...", *s.ShardId, err, wait) | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
|
|
@@ -252,10 +268,10 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes | |
| bo.Reset() | ||
|
|
||
| // Process events | ||
| for event := range sub.EventStream.Events() { | ||
| for event := range sub.GetStream().Events() { | ||
| switch e := event.(type) { | ||
| case *kinesis.SubscribeToShardEvent: | ||
| for _, rec := range e.Records { | ||
| case *types.SubscribeToShardEventStreamMemberSubscribeToShardEvent: | ||
| for _, rec := range e.Value.Records { | ||
| handler(ctx, &bindings.ReadResponse{ | ||
| Data: rec.Data, | ||
| }) | ||
|
|
@@ -284,7 +300,7 @@ func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*st | |
| // Only set timeout on consumer call. | ||
| conCtx, cancel := context.WithTimeout(ctx, 30*time.Second) | ||
| defer cancel() | ||
| consumer, err := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumerWithContext(conCtx, &kinesis.DescribeStreamConsumerInput{ | ||
| consumer, err := a.kinesisClient.DescribeStreamConsumer(conCtx, &kinesis.DescribeStreamConsumerInput{ | ||
| ConsumerName: &a.metadata.ConsumerName, | ||
| StreamARN: streamARN, | ||
| }) | ||
|
|
@@ -296,7 +312,7 @@ func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*st | |
| } | ||
|
|
||
| func (a *AWSKinesis) registerConsumer(ctx context.Context, streamARN *string) (*string, error) { | ||
| consumer, err := a.authProvider.Kinesis().Kinesis.RegisterStreamConsumerWithContext(ctx, &kinesis.RegisterStreamConsumerInput{ | ||
| consumer, err := a.kinesisClient.RegisterStreamConsumer(ctx, &kinesis.RegisterStreamConsumerInput{ | ||
| ConsumerName: &a.metadata.ConsumerName, | ||
| StreamARN: streamARN, | ||
| }) | ||
|
|
@@ -319,7 +335,7 @@ func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string, | |
| if a.consumerARN != nil { | ||
| // Use a background context because the running context may have been canceled already | ||
| ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | ||
| _, err := a.authProvider.Kinesis().Kinesis.DeregisterStreamConsumerWithContext(ctx, &kinesis.DeregisterStreamConsumerInput{ | ||
| _, err := a.kinesisClient.DeregisterStreamConsumer(ctx, &kinesis.DeregisterStreamConsumerInput{ | ||
| ConsumerARN: consumerARN, | ||
| StreamARN: streamARN, | ||
| ConsumerName: &a.metadata.ConsumerName, | ||
|
|
@@ -332,34 +348,19 @@ func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string, | |
| return nil | ||
| } | ||
|
|
||
| func (a *AWSKinesis) waitUntilConsumerExists(ctx aws.Context, input *kinesis.DescribeStreamConsumerInput, opts ...request.WaiterOption) error { | ||
| w := request.Waiter{ | ||
| Name: "WaitUntilConsumerExists", | ||
| MaxAttempts: 18, | ||
| Delay: request.ConstantWaiterDelay(10 * time.Second), | ||
| Acceptors: []request.WaiterAcceptor{ | ||
| { | ||
| State: request.SuccessWaiterState, | ||
| Matcher: request.PathWaiterMatch, Argument: "ConsumerDescription.ConsumerStatus", | ||
| Expected: "ACTIVE", | ||
| }, | ||
| }, | ||
| NewRequest: func(opts []request.Option) (*request.Request, error) { | ||
| var inCpy *kinesis.DescribeStreamConsumerInput | ||
| if input != nil { | ||
| tmp := *input | ||
| inCpy = &tmp | ||
| } | ||
| req, _ := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumerRequest(inCpy) | ||
| req.SetContext(ctx) | ||
| req.ApplyOptions(opts...) | ||
|
|
||
| return req, nil | ||
| }, | ||
| func (a *AWSKinesis) waitUntilConsumerExists(ctx context.Context, input *kinesis.DescribeStreamConsumerInput) error { | ||
| // Poll until consumer is active | ||
| for i := 0; i < 18; i++ { | ||
| consumer, err := a.kinesisClient.DescribeStreamConsumer(ctx, input) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if consumer.ConsumerDescription.ConsumerStatus == types.ConsumerStatusActive { | ||
| return nil | ||
| } | ||
| time.Sleep(10 * time.Second) | ||
| } | ||
| w.ApplyOptions(opts...) | ||
|
|
||
| return w.WaitWithContext(ctx) | ||
| return fmt.Errorf("consumer did not become active within timeout") | ||
| } | ||
|
|
||
| func (a *AWSKinesis) parseMetadata(meta bindings.Metadata) (*kinesisMetadata, error) { | ||
|
|
@@ -388,7 +389,7 @@ func (r *recordProcessorFactory) CreateProcessor() interfaces.IRecordProcessor { | |
| } | ||
|
|
||
| func (p *recordProcessor) Initialize(input *interfaces.InitializationInput) { | ||
| p.logger.Infof("Processing ShardId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber)) | ||
| p.logger.Infof("Processing ShardId: %v at checkpoint: %v", input.ShardId, *input.ExtendedSequenceNumber.SequenceNumber) | ||
| } | ||
|
|
||
| func (p *recordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) { | ||
|
|
@@ -414,6 +415,46 @@ func (p *recordProcessor) Shutdown(input *interfaces.ShutdownInput) { | |
| } | ||
| } | ||
|
|
||
| func (a *AWSKinesis) createKinesisClient(ctx context.Context) error { | ||
|
||
| // Convert v1 credentials to v2 | ||
| if v1Creds, err := a.authProvider.Kinesis().Credentials.Get(); err == nil { | ||
| a.v2Credentials = v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) | ||
| } else { | ||
| // Fallback to default v2 config if conversion failed | ||
| v2Config, err := awsv2config.LoadDefaultConfig(ctx, awsv2config.WithRegion(a.authProvider.Kinesis().Region)) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| a.v2Credentials = v2Config.Credentials | ||
| } | ||
|
|
||
| // Create v2 config and Kinesis client | ||
| v2Config := aws.Config{ | ||
| Region: a.authProvider.Kinesis().Region, | ||
| Credentials: a.v2Credentials, | ||
| } | ||
| a.kinesisClient = kinesis.NewFromConfig(v2Config) | ||
| return nil | ||
| } | ||
|
|
||
| func (a *AWSKinesis) getStreamARN(ctx context.Context, streamName string) (*string, error) { | ||
| stream, err := a.kinesisClient.DescribeStream(ctx, &kinesis.DescribeStreamInput{ | ||
| StreamName: &streamName, | ||
| }) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return stream.StreamDescription.StreamARN, nil | ||
| } | ||
|
|
||
| func (a *AWSKinesis) workerCfg(_ context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration { | ||
| const sharedMode = "shared" | ||
| if mode == sharedMode { | ||
| return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", a.v2Credentials) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // GetComponentMetadata returns the metadata of the component. | ||
| func (a *AWSKinesis) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { | ||
| metadataStruct := &kinesisMetadata{} | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need the authProvider?