Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
2eddb15
make the v2 creds provider be the default
swatimodi-scout Nov 3, 2025
cabcec5
feat: Upgrade kinesis github.com/vmware/vmware-go-kcl-v1 to v2.
devendrapohekar-scout Oct 31, 2025
112fd8d
Resolved conflicts
swatimodi-scout Nov 11, 2025
de8646f
Update common/authentication/aws/client.go
swatimodi-scout Nov 11, 2025
90e071c
Update bindings/aws/kinesis/kinesis.go
swatimodi-scout Nov 11, 2025
648a318
Resolved comments given by acroca
swatimodi-scout Nov 11, 2025
b686d94
Removed v1 credentials for kinesis
swatimodi-scout Nov 13, 2025
9a70082
updated client.go
swatimodi-scout Nov 13, 2025
9dafc6d
Update AWS SDK to v2 and refactor Kinesis integration
rideshnath-scout Nov 13, 2025
8b93022
Merge branch 'main' into feat/kinesis-binding-vmware-go-kcl-v2-latest
swatimodi-scout Nov 14, 2025
2ee207c
Merge branch 'main' into feat/kinesis-binding-vmware-go-kcl-v2-latest
swatimodi-scout Nov 14, 2025
9b003aa
refactor: migrate Kinesis integration to AWS SDK v2 and update relate…
rideshnath-scout Nov 18, 2025
6ebe1a4
refactor: streamline AWS Kinesis client creation and remove unused au…
rideshnath-scout Nov 19, 2025
c064187
feat: add applicationName metadata field to Kinesis binding
rideshnath-scout Nov 19, 2025
716b8a1
refactor: update AWS SDK v2 dependencies and improve import organization
rideshnath-scout Nov 20, 2025
bf33f52
refactor: correct variable naming and improve context handling in tes…
rideshnath-scout Nov 20, 2025
d8f5103
refactor: reorder AWS SDK imports (resolve lint issues)
rideshnath-scout Nov 20, 2025
bd2639a
Merge branch 'main' into feat/kinesis-binding-vmware-go-kcl-v2-latest
swatimodi-scout Nov 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions bindings/aws/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
"github.com/vmware/vmware-go-kcl/clientlibrary/worker"
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"

"github.com/dapr/components-contrib/bindings"
awsAuth "github.com/dapr/components-contrib/common/authentication/aws"
Expand All @@ -49,10 +49,10 @@ type AWSKinesis struct {
consumerARN *string
logger logger.Logger
consumerMode string

closed atomic.Bool
closeCh chan struct{}
wg sync.WaitGroup
applicationName string
closed atomic.Bool
closeCh chan struct{}
wg sync.WaitGroup
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add kinesis client here and init with sdk v2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add kinesis client here and init with sdk v2

Done


// TODO: we need to clean up the metadata fields here and update this binding to use the builtin aws auth provider and reflect in metadata.yaml
Expand All @@ -65,6 +65,7 @@ type kinesisMetadata struct {
SecretKey string `json:"secretKey" mapstructure:"secretKey"`
SessionToken string `json:"sessionToken" mapstructure:"sessionToken"`
KinesisConsumerMode string `json:"mode" mapstructure:"mode"`
ApplicationName string `json:"applicationName" mapstructure:"applicationName"`
}

const (
Expand Down Expand Up @@ -116,6 +117,7 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error
a.consumerMode = m.KinesisConsumerMode
a.streamName = m.StreamName
a.consumerName = m.ConsumerName
a.applicationName = m.ApplicationName
a.metadata = m

opts := awsAuth.Options{
Expand Down Expand Up @@ -158,19 +160,21 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
return errors.New("binding is closed")
}

if a.metadata.KinesisConsumerMode == SharedThroughput {
switch a.metadata.KinesisConsumerMode {
case SharedThroughput:
// initalize worker configuration
config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName)
// Configure the KCL worker with custom endpoints for LocalStack
config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.consumerName, a.consumerMode)
if a.metadata.Endpoint != "" {
config.KinesisEndpoint = a.metadata.Endpoint
config.DynamoDBEndpoint = a.metadata.Endpoint
config = config.WithKinesisEndpoint(a.metadata.Endpoint)
config = config.WithDynamoDBEndpoint(a.metadata.Endpoint)
}
a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), config)
err = a.worker.Start()
if err != nil {
return err
}
} else if a.metadata.KinesisConsumerMode == ExtendedFanout {
case ExtendedFanout:
var stream *kinesis.DescribeStreamOutput
stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName})
if err != nil {
Expand All @@ -194,9 +198,10 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
case <-ctx.Done():
case <-a.closeCh:
}
if a.metadata.KinesisConsumerMode == SharedThroughput {
switch a.metadata.KinesisConsumerMode {
case SharedThroughput:
a.worker.Shutdown()
} else if a.metadata.KinesisConsumerMode == ExtendedFanout {
case ExtendedFanout:
a.deregisterConsumer(ctx, stream, a.consumerARN)
}
}()
Expand Down
18 changes: 10 additions & 8 deletions bindings/aws/kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import (
func TestParseMetadata(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{
"accessKey": "key",
"region": "region",
"secretKey": "secret",
"consumerName": "test",
"streamName": "stream",
"mode": "extended",
"endpoint": "endpoint",
"sessionToken": "token",
"accessKey": "key",
"region": "region",
"secretKey": "secret",
"consumerName": "test",
"streamName": "stream",
"mode": "extended",
"endpoint": "endpoint",
"sessionToken": "token",
"applicationName": "applicationName",
}
kinesis := AWSKinesis{}
meta, err := kinesis.parseMetadata(m)
Expand All @@ -45,4 +46,5 @@ func TestParseMetadata(t *testing.T) {
assert.Equal(t, "endpoint", meta.Endpoint)
assert.Equal(t, "token", meta.SessionToken)
assert.Equal(t, "extended", meta.KinesisConsumerMode)
assert.Equal(t, "applicationName", meta.ApplicationName)
}
28 changes: 14 additions & 14 deletions common/authentication/aws/client.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these changes to the old auth package should be reverted, as they are not necessary as part of this PR unless a full cleanup takes place.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"errors"
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go-v2/aws"
awsv2config "github.com/aws/aws-sdk-go-v2/config"
v2creds "github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
Expand All @@ -36,7 +38,7 @@ import (
"github.com/aws/aws-sdk-go/service/ssm"
"github.com/aws/aws-sdk-go/service/ssm/ssmiface"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/vmware/vmware-go-kcl/clientlibrary/config"
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
)

type Clients struct {
Expand Down Expand Up @@ -181,27 +183,25 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string
stream, err := c.Kinesis.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{
StreamName: aws.String(streamName),
})
if stream != nil {
return stream.StreamDescription.StreamARN, err
if err != nil {
return nil, err
}
return stream.StreamDescription.StreamARN, nil
}

return nil, errors.New("unable to get stream arn due to empty client")
}

func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode string) *config.KinesisClientLibConfiguration {
func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration {
const sharedMode = "shared"
if c.Kinesis != nil {
if mode == sharedMode {
if c.Credentials != nil {
kclConfig := config.NewKinesisClientLibConfigWithCredential(consumer,
stream, c.Region, consumer,
c.Credentials)
return kclConfig
}
if c.Kinesis != nil && mode == sharedMode {
v1Creds, err := c.Credentials.Get()
if err != nil {
return nil
}
v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels wrong to have both v1 and v2 living together. Can't we migrate completely to v2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels wrong to have both v1 and v2 living together. Can't we migrate completely to v2?

Done please review it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this works, it's not wired up to the refresh at all.
I think it was better before, but the conversion from v1 credentials to v2 should be done in the KinesisClients.New, which is called in the refresh operations.
This way, you can convert all kinesis to v2 and only do the v1->v2 in the New right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thansks @swatimodi-scout! What if you fully migrate the aws client to v2? you do not need to migrate all other components, it would be something similar to this PR that @mikeee is working on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thansks @swatimodi-scout! What if you fully migrate the aws client to v2? you do not need to migrate all other components, it would be something similar to this PR that @mikeee is working on.

@javier-aliaga Yes, that makes sense. However, we’re not migrating the entire AWS SDK version at this stage — my current change focuses only on updating vmware-go-kcl to vmware-go-kcl-v2.

Would you prefer that I continue implementing the AWS SDK v2 changes for Kinesis within this same PR, or should I move that work to a separate PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this works, it's not wired up to the refresh at all. I think it was better before, but the conversion from v1 credentials to v2 should be done in the KinesisClients.New, which is called in the refresh operations. This way, you can convert all kinesis to v2 and only do the v1->v2 in the New right?

@acroca I updated in KinesisClients.New. It is working i checked in local. Let me know if i need to revert back to previous version where we were supporting both v1 and v2.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can do it in the same PR as changing to aws-sdk-v2 is related to update the vmware-go-kcl-v2

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mikeee I updated the aws-sdk-v2 issue to track it
#3896

Copy link
Contributor Author

@swatimodi-scout swatimodi-scout Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can do it in the same PR as changing to aws-sdk-v2 is related to update the vmware-go-kcl-v2

@javier-aliaga Please review the code.

return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds)
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion common/authentication/aws/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func TestKinesisClients_WorkerCfg(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.consumer, tt.mode)
cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.kinesisClient.Region, tt.mode, tt.consumer)
if tt.expectedConfig == nil {
assert.Equal(t, tt.expectedConfig, cfg)
return
Expand Down
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ require (
sigs.k8s.io/yaml v1.4.0
)

require (
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 // indirect
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 // indirect
)

require (
cel.dev/expr v0.23.0 // indirect
cloud.google.com/go v0.120.0 // indirect
Expand Down Expand Up @@ -203,7 +208,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.25.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 // indirect
github.com/aws/smithy-go v1.22.5 // indirect
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.4.0 // indirect
Expand Down Expand Up @@ -392,6 +396,7 @@ require (
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/vmware/vmware-go-kcl-v2 v1.0.0
github.com/x448/float16 v0.8.4 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
Expand Down
11 changes: 8 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,12 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:W
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a h1:QFemvMGPnajaeRBkFc1HoEA7qzVjUv+rkYb1/ps1/UE=
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI=
github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.55.6 h1:cSg4pvZ3m8dgYcgqB97MrcdjUmZ1BeMYKUxMMB89IPk=
github.com/aws/aws-sdk-go v1.55.6/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2 v1.9.2/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
github.com/aws/aws-sdk-go-v2 v1.36.5 h1:0OF9RiEMEdDdZEMqF9MRjevyxAQcf6gY+E7vwBILFj0=
github.com/aws/aws-sdk-go-v2 v1.36.5/go.mod h1:EYrzvCCN9CMUTa5+6lf6MM4tq3Zjp8UhSGR/cBsjai0=
Expand Down Expand Up @@ -313,6 +313,9 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.17/go.mod
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.2/go.mod h1:72HRZDLMtmVQiLG2tLfQcaWLCssELvGl+Zf2WVxMmR8=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 h1:t0E6FzREdtCsiLIoLCWsYliNsRBgyGD/MCK571qk4MI=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17/go.mod h1:ygpklyoaypuyDvOM5ujWGrYWpAK3h7ugnmKCU/76Ys4=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I=
github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY=
github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.32.4 h1:BN6+zko+qO9Tl9S0ywUPNvY0gvlFK4Zmj2Y0a8paFkk=
github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.32.4/go.mod h1:hbMVfSdZneCht4UmPOsejDt93QnetQPFuLOOqbuybqs=
github.com/aws/aws-sdk-go-v2/service/sns v1.34.7 h1:OBuZE9Wt8h2imuRktu+WfjiTGrnYdCIJg8IX92aalHE=
Expand All @@ -332,8 +335,8 @@ github.com/aws/rolesanywhere-credential-helper v1.0.4/go.mod h1:QVGNxlDlYhjR0/ZU
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw=
github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f h1:Pf0BjJDga7C98f0vhw+Ip5EaiE07S3lTKpIYPNS0nMo=
github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8=
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc=
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down Expand Up @@ -1714,6 +1717,8 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf2VgHk=
github.com/vmware/vmware-go-kcl v1.5.1/go.mod h1:kXJmQ6h0dRMRrp1uWU9XbIXvwelDpTxSPquvQUBdpbo=
github.com/vmware/vmware-go-kcl-v2 v1.0.0 h1:HPT5vu+khRmGspBSc/+AilEWbRGoTZhjlYqdrBbRMZs=
github.com/vmware/vmware-go-kcl-v2 v1.0.0/go.mod h1:GBDu+P4Neo0vwZAk0ZUCEC8GYsUOWvi3XhFwAZR3SjA=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
Expand Down