
Commit 00062fa

Merge branch 'main' into test-fix-kafka

2 parents: c09244d + 5d4b864

81 files changed: +2090 -511 lines changed

.github/pull_request_template.md

Lines changed: 4 additions & 1 deletion
@@ -14,4 +14,7 @@ Please make sure you've completed the relevant tasks for this PR, out of the fol
 
 * [ ] Code compiles correctly
 * [ ] Created/updated tests
-* [ ] Extended the documentation / Created issue in the https://github.com/dapr/docs/ repo: dapr/docs#_[issue number]_
+* [ ] Extended the documentation
+* [ ] Created the dapr/docs PR: <insert PR link here>
+
+**Note:** We expect contributors to open a corresponding documentation PR in the [dapr/docs](https://github.com/dapr/docs/) repository. As the implementer, you are the best person to document your work! Implementation PRs will not be merged until the documentation PR is opened and ready for review.

.github/workflows/components-contrib-all.yml

Lines changed: 4 additions & 3 deletions
@@ -68,19 +68,20 @@ jobs:
       GOLANGCI_LINT_VER: "v1.64.6"
     strategy:
       matrix:
-        os: [ubuntu-latest, windows-latest, macOS-latest]
+        # TODO: @nelson-parente Upgrade macos latest once dns is fixed.
+        os: [ubuntu-latest, windows-latest, macos-14]
        target_arch: [arm, amd64]
        include:
          - os: ubuntu-latest
            target_os: linux
          - os: windows-latest
            target_os: windows
-          - os: macOS-latest
+          - os: macos-14
            target_os: darwin
        exclude:
          - os: windows-latest
            target_arch: arm
-          - os: macOS-latest
+          - os: macos-14
            target_arch: arm
     steps:
       - name: Set default payload repo and ref

bindings/aws/kinesis/kinesis.go

Lines changed: 8 additions & 1 deletion
@@ -156,8 +156,15 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
 	if a.closed.Load() {
 		return errors.New("binding is closed")
 	}
+
 	if a.metadata.KinesisConsumerMode == SharedThroughput {
-		a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.consumerName, a.consumerMode))
+		// Configure the KCL worker with custom endpoints for LocalStack
+		config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.consumerName, a.consumerMode)
+		if a.metadata.Endpoint != "" {
+			config.KinesisEndpoint = a.metadata.Endpoint
+			config.DynamoDBEndpoint = a.metadata.Endpoint
+		}
+		a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), config)
 		err = a.worker.Start()
 		if err != nil {
 			return err
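As an aside, here is a minimal standalone sketch of the endpoint-override pattern this hunk introduces, not part of the commit. The workerCfg struct below is a hypothetical stand-in that models only the two KCL worker configuration fields touched by the change, and http://localhost:4566 is assumed as a typical LocalStack edge endpoint.

package main

import "fmt"

// workerCfg is a hypothetical stand-in for the KCL worker configuration used
// by the binding; only the two endpoint fields set above are modeled here.
type workerCfg struct {
	KinesisEndpoint  string
	DynamoDBEndpoint string
}

// applyCustomEndpoint mirrors the change above: when a custom endpoint (for
// example a LocalStack URL) is configured, both the Kinesis and DynamoDB
// endpoints of the worker point at it; otherwise the defaults are kept.
func applyCustomEndpoint(cfg *workerCfg, endpoint string) {
	if endpoint != "" {
		cfg.KinesisEndpoint = endpoint
		cfg.DynamoDBEndpoint = endpoint
	}
}

func main() {
	cfg := &workerCfg{}
	applyCustomEndpoint(cfg, "http://localhost:4566") // assumed LocalStack edge endpoint
	fmt.Println(cfg.KinesisEndpoint, cfg.DynamoDBEndpoint)
}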

bindings/aws/s3/s3.go

Lines changed: 52 additions & 84 deletions
@@ -24,20 +24,17 @@ import (
 	"net/http"
 	"os"
 	"reflect"
-	"slices"
 	"strings"
 	"time"
 
-	"github.com/aws/aws-sdk-go-v2/service/s3/types"
 	"github.com/aws/aws-sdk-go/aws"
-	awsCommon "github.com/dapr/components-contrib/common/aws"
-	awsCommonAuth "github.com/dapr/components-contrib/common/aws/auth"
-
-	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
-	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
 	"github.com/google/uuid"
 
 	"github.com/dapr/components-contrib/bindings"
+	awsAuth "github.com/dapr/components-contrib/common/authentication/aws"
 	commonutils "github.com/dapr/components-contrib/common/utils"
 	"github.com/dapr/components-contrib/metadata"
 	"github.com/dapr/kit/logger"
@@ -63,12 +60,9 @@ const (
 
 // AWSS3 is a binding for an AWS S3 storage bucket.
 type AWSS3 struct {
-	metadata        *s3Metadata
-	logger          logger.Logger
-	s3Client        *s3.Client
-	s3Uploader      *manager.Uploader
-	s3Downloader    *manager.Downloader
-	s3PresignClient *s3.PresignClient
+	metadata     *s3Metadata
+	authProvider awsAuth.Provider
+	logger       logger.Logger
 }
 
 type s3Metadata struct {
@@ -112,42 +106,10 @@ func NewAWSS3(logger logger.Logger) bindings.OutputBinding {
 	return &AWSS3{logger: logger}
 }
 
-// Init does metadata parsing and connection creation.
-func (s *AWSS3) Init(ctx context.Context, metadata bindings.Metadata) error {
-	m, err := s.parseMetadata(metadata)
-	if err != nil {
-		return err
-	}
-	s.metadata = m
-
-	authOpts := awsCommonAuth.Options{
-		Logger: s.logger,
-
-		Properties: metadata.Properties,
-
-		Region:       m.Region,
-		Endpoint:     m.Endpoint,
-		AccessKey:    m.AccessKey,
-		SecretKey:    m.SecretKey,
-		SessionToken: m.SessionToken,
-	}
-
-	var configOptions []awsCommon.ConfigOption
-
-	var s3Options []func(options *s3.Options)
-
-	if s.metadata.DisableSSL {
-		s3Options = append(s3Options, func(options *s3.Options) {
-			options.EndpointOptions.DisableHTTPS = true
-		})
-	}
-
-	if !s.metadata.ForcePathStyle {
-		s3Options = append(s3Options, func(options *s3.Options) {
-			options.UsePathStyle = true
-		})
-	}
+func (s *AWSS3) getAWSConfig(opts awsAuth.Options) *aws.Config {
+	cfg := awsAuth.GetConfig(opts).WithS3ForcePathStyle(s.metadata.ForcePathStyle).WithDisableSSL(s.metadata.DisableSSL)
 
+	// Use a custom HTTP client to allow self-signed certs
 	if s.metadata.InsecureSSL {
 		customTransport := http.DefaultTransport.(*http.Transport).Clone()
 		customTransport.TLSClientConfig = &tls.Config{
@@ -157,27 +119,44 @@ func (s *AWSS3) Init(ctx context.Context, metadata bindings.Metadata) error {
 		client := &http.Client{
 			Transport: customTransport,
 		}
-		configOptions = append(configOptions, awsCommon.WithHTTPClient(client))
+		cfg = cfg.WithHTTPClient(client)
 
 		s.logger.Infof("aws s3: you are using 'insecureSSL' to skip server config verify which is unsafe!")
 	}
+	return cfg
+}
 
-	awsConfig, err := awsCommon.NewConfig(ctx, authOpts, configOptions...)
+// Init does metadata parsing and connection creation.
+func (s *AWSS3) Init(ctx context.Context, metadata bindings.Metadata) error {
+	m, err := s.parseMetadata(metadata)
 	if err != nil {
-		return fmt.Errorf("s3 binding error: failed to create AWS config: %w", err)
+		return err
 	}
+	s.metadata = m
 
-	s.s3Client = s3.NewFromConfig(awsConfig, s3Options...)
-
-	s.s3Uploader = manager.NewUploader(s.s3Client)
-	s.s3Downloader = manager.NewDownloader(s.s3Client)
-
-	s.s3PresignClient = s3.NewPresignClient(s.s3Client)
+	opts := awsAuth.Options{
+		Logger:       s.logger,
+		Properties:   metadata.Properties,
+		Region:       m.Region,
+		Endpoint:     m.Endpoint,
+		AccessKey:    m.AccessKey,
+		SecretKey:    m.SecretKey,
+		SessionToken: m.SessionToken,
+	}
+	// extra configs needed per component type
+	provider, err := awsAuth.NewProvider(ctx, opts, s.getAWSConfig(opts))
+	if err != nil {
+		return err
+	}
+	s.authProvider = provider
 
 	return nil
 }
 
 func (s *AWSS3) Close() error {
+	if s.authProvider != nil {
+		return s.authProvider.Close()
+	}
 	return nil
 }
 
@@ -236,25 +215,19 @@ func (s *AWSS3) create(ctx context.Context, req *bindings.InvokeRequest) (*bindi
 		r = b64.NewDecoder(b64.StdEncoding, r)
 	}
 
-	var storageClass types.StorageClass
+	var storageClass *string
 	if metadata.StorageClass != "" {
-		// assert storageclass exists in the types.storageclass.values() slice
-		storageClass = types.StorageClass(strings.ToUpper(metadata.StorageClass))
-		if !slices.Contains(storageClass.Values(), storageClass) {
-			return nil, fmt.Errorf("s3 binding error: invalid storage class '%s' provided", metadata.StorageClass)
-		}
+		storageClass = aws.String(metadata.StorageClass)
 	}
 
-	s3UploaderPutObjectInput := &s3.PutObjectInput{
+	resultUpload, err := s.authProvider.S3().Uploader.UploadWithContext(ctx, &s3manager.UploadInput{
 		Bucket:       ptr.Of(metadata.Bucket),
 		Key:          ptr.Of(key),
 		Body:         r,
 		ContentType:  contentType,
 		StorageClass: storageClass,
 		Tagging:      tagging,
-	}
-
-	resultUpload, err := s.s3Uploader.Upload(ctx, s3UploaderPutObjectInput)
+	})
 	if err != nil {
 		return nil, fmt.Errorf("s3 binding error: uploading failed: %w", err)
 	}
@@ -323,21 +296,16 @@ func (s *AWSS3) presignObject(ctx context.Context, bucket, key, ttl string) (str
 	if err != nil {
 		return "", fmt.Errorf("s3 binding error: cannot parse duration %s: %w", ttl, err)
 	}
-	s3GetObjectInput := &s3.GetObjectInput{
+	objReq, _ := s.authProvider.S3().S3.GetObjectRequest(&s3.GetObjectInput{
 		Bucket: ptr.Of(bucket),
 		Key:    ptr.Of(key),
-	}
-
-	presignedObjectRequest, err := s.s3PresignClient.PresignGetObject(
-		ctx,
-		s3GetObjectInput,
-		s3.WithPresignExpires(d),
-	)
+	})
+	url, err := objReq.Presign(d)
 	if err != nil {
 		return "", fmt.Errorf("s3 binding error: failed to presign URL: %w", err)
 	}
 
-	return presignedObjectRequest.URL, nil
+	return url, nil
 }
 
 func (s *AWSS3) get(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
@@ -352,16 +320,16 @@ func (s *AWSS3) get(ctx context.Context, req *bindings.InvokeRequest) (*bindings
 	}
 
 	buff := &aws.WriteAtBuffer{}
-	_, err = s.s3Downloader.Download(ctx,
+	_, err = s.authProvider.S3().Downloader.DownloadWithContext(ctx,
 		buff,
 		&s3.GetObjectInput{
 			Bucket: ptr.Of(s.metadata.Bucket),
 			Key:    ptr.Of(key),
 		},
 	)
 	if err != nil {
-		var awsErr *types.NoSuchKey
-		if errors.As(err, &awsErr) {
+		var awsErr awserr.Error
+		if errors.As(err, &awsErr) && awsErr.Code() == s3.ErrCodeNoSuchKey {
 			return nil, errors.New("object not found")
 		}
 		return nil, fmt.Errorf("s3 binding error: error downloading S3 object: %w", err)
@@ -386,16 +354,16 @@ func (s *AWSS3) delete(ctx context.Context, req *bindings.InvokeRequest) (*bindi
 	if key == "" {
 		return nil, fmt.Errorf("s3 binding error: required metadata '%s' missing", metadataKey)
 	}
-	_, err := s.s3Client.DeleteObject(
+	_, err := s.authProvider.S3().S3.DeleteObjectWithContext(
 		ctx,
 		&s3.DeleteObjectInput{
 			Bucket: ptr.Of(s.metadata.Bucket),
 			Key:    ptr.Of(key),
 		},
 	)
 	if err != nil {
-		var awsErr *types.NoSuchKey
-		if errors.As(err, &awsErr) {
+		var awsErr awserr.Error
+		if errors.As(err, &awsErr) && awsErr.Code() == s3.ErrCodeNoSuchKey {
 			return nil, errors.New("object not found")
 		}
 		return nil, fmt.Errorf("s3 binding error: delete operation failed: %w", err)
@@ -415,9 +383,9 @@ func (s *AWSS3) list(ctx context.Context, req *bindings.InvokeRequest) (*binding
 	if payload.MaxResults < 1 {
 		payload.MaxResults = defaultMaxResults
 	}
-	result, err := s.s3Client.ListObjects(ctx, &s3.ListObjectsInput{
+	result, err := s.authProvider.S3().S3.ListObjectsWithContext(ctx, &s3.ListObjectsInput{
 		Bucket:    ptr.Of(s.metadata.Bucket),
-		MaxKeys:   ptr.Of(payload.MaxResults),
+		MaxKeys:   ptr.Of(int64(payload.MaxResults)),
 		Marker:    ptr.Of(payload.Marker),
 		Prefix:    ptr.Of(payload.Prefix),
 		Delimiter: ptr.Of(payload.Delimiter),
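As an aside, the get and delete paths above replace the SDK v2 typed *types.NoSuchKey check with a v1 awserr.Error code comparison. Below is a minimal sketch of that v1 pattern in isolation, not part of the commit, assuming only aws-sdk-go packages this file already imports; the isNoSuchKey helper name is hypothetical and awserr.New is used purely to simulate a service error.

package main

import (
	"errors"
	"fmt"

	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/s3"
)

// isNoSuchKey mirrors the check used by the get and delete operations above:
// unwrap to the SDK v1 awserr.Error interface and compare the error code.
func isNoSuchKey(err error) bool {
	var awsErr awserr.Error
	return errors.As(err, &awsErr) && awsErr.Code() == s3.ErrCodeNoSuchKey
}

func main() {
	// Build a synthetic service error for demonstration only.
	err := awserr.New(s3.ErrCodeNoSuchKey, "The specified key does not exist.", nil)
	fmt.Println(isNoSuchKey(err)) // true
}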

bindings/azure/servicebusqueues/servicebusqueues.go

Lines changed: 1 addition & 1 deletion
@@ -62,7 +62,7 @@ func (a *AzureServiceBusQueues) Init(ctx context.Context, metadata bindings.Meta
 		return err
 	}
 
-	a.client, err = impl.NewClient(a.metadata, metadata.Properties)
+	a.client, err = impl.NewClient(a.metadata, metadata.Properties, a.logger)
 	if err != nil {
 		return err
 	}

bindings/gcp/bucket/bucket.go

Lines changed: 36 additions & 7 deletions
@@ -110,13 +110,7 @@ func (g *GCPStorage) Init(ctx context.Context, metadata bindings.Metadata) error
 		return err
 	}
 
-	b, err := json.Marshal(m)
-	if err != nil {
-		return err
-	}
-
-	clientOptions := option.WithCredentialsJSON(b)
-	client, err := storage.NewClient(ctx, clientOptions)
+	client, err := g.getClient(ctx, m)
 	if err != nil {
 		return err
 	}
@@ -127,6 +121,41 @@ func (g *GCPStorage) Init(ctx context.Context, metadata bindings.Metadata) error
 	return nil
 }
 
+func (g *GCPStorage) getClient(ctx context.Context, m *gcpMetadata) (*storage.Client, error) {
+	var client *storage.Client
+	var err error
+
+	if m.Bucket == "" {
+		return nil, errors.New("missing property `bucket` in metadata")
+	}
+	if m.ProjectID == "" {
+		return nil, errors.New("missing property `project_id` in metadata")
+	}
+
+	// Explicit authentication
+	if m.PrivateKeyID != "" {
+		var b []byte
+		b, err = json.Marshal(m)
+		if err != nil {
+			return nil, err
+		}
+
+		clientOptions := option.WithCredentialsJSON(b)
+		client, err = storage.NewClient(ctx, clientOptions)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		// Implicit authentication, using GCP Application Default Credentials (ADC)
+		// Credentials search order: https://cloud.google.com/docs/authentication/application-default-credentials#order
+		client, err = storage.NewClient(ctx)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return client, nil
+}
+
 func (g *GCPStorage) parseMetadata(meta bindings.Metadata) (*gcpMetadata, error) {
 	m := gcpMetadata{}
 	err := kitmd.DecodeMetadata(meta.Properties, &m)
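As an aside, the new getClient falls back to Application Default Credentials when no explicit key is configured. Here is a minimal standalone sketch of that selection logic, not part of the commit, assuming the same cloud.google.com/go/storage and google.golang.org/api/option packages; the newStorageClient helper name is hypothetical.

package main

import (
	"context"
	"log"

	"cloud.google.com/go/storage"
	"google.golang.org/api/option"
)

// newStorageClient mirrors the selection above: explicit service-account JSON
// when provided, otherwise Application Default Credentials (ADC), which are
// resolved from GOOGLE_APPLICATION_CREDENTIALS, gcloud, or the metadata server.
func newStorageClient(ctx context.Context, credsJSON []byte) (*storage.Client, error) {
	if len(credsJSON) > 0 {
		return storage.NewClient(ctx, option.WithCredentialsJSON(credsJSON))
	}
	return storage.NewClient(ctx)
}

func main() {
	client, err := newStorageClient(context.Background(), nil) // nil -> ADC
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}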
