From 1e60f57828c6fdb078c530d914ef3c7005de1fa9 Mon Sep 17 00:00:00 2001 From: Quenby Mitchell Date: Tue, 16 Sep 2025 17:54:53 -0600 Subject: [PATCH 1/3] Supporting using pod default credentials for Integration Source and Sink --- config/core/resources/integrationsink.yaml | 3 + config/core/resources/integrationsource.yaml | 3 + pkg/apis/common/integration/v1alpha1/auth.go | 6 +- .../integration_sink_validation_test.go | 49 +++++ .../v1alpha1/integration_validation_test.go | 80 ++++++++ .../sink/resources/container_image.go | 23 +++ .../sink/resources/container_image_test.go | 174 ++++++++++++++++++ .../source/resources/containersource.go | 23 +++ .../source/resources/containersource_test.go | 108 +++++++++++ 9 files changed, 467 insertions(+), 2 deletions(-) create mode 100644 pkg/reconciler/integration/sink/resources/container_image_test.go diff --git a/config/core/resources/integrationsink.yaml b/config/core/resources/integrationsink.yaml index 0c41963a9d4..7f47b9f28f6 100644 --- a/config/core/resources/integrationsink.yaml +++ b/config/core/resources/integrationsink.yaml @@ -323,6 +323,9 @@ spec: name: description: 'Secret name' type: string + serviceAccountName: + description: 'Optional ServiceAccount to assign to pod. This enables the pod default credentials to be used instead of the auth secret.' + type: string status: description: Status represents the current state of the IntegrationSink. This data may be out of date. type: object diff --git a/config/core/resources/integrationsource.yaml b/config/core/resources/integrationsource.yaml index ebd976f5106..5cd90571ecc 100644 --- a/config/core/resources/integrationsource.yaml +++ b/config/core/resources/integrationsource.yaml @@ -322,6 +322,9 @@ spec: name: description: 'Secret name' type: string + serviceAccountName: + description: 'Optional ServiceAccount to assign to pod. This enables the pod default credentials to be used instead of the auth secret.' + type: string template: type: object x-kubernetes-preserve-unknown-fields: true diff --git a/pkg/apis/common/integration/v1alpha1/auth.go b/pkg/apis/common/integration/v1alpha1/auth.go index 8f815119bc9..e1afdd95084 100644 --- a/pkg/apis/common/integration/v1alpha1/auth.go +++ b/pkg/apis/common/integration/v1alpha1/auth.go @@ -25,11 +25,13 @@ type Auth struct { // SecretKey is the AWS secret access key. SecretKey string `json:"secretKey,omitempty"` + + ServiceAccountName string `json:"serviceAccountName,omitempty"` } func (a *Auth) HasAuth() bool { - return a != nil && a.Secret != nil && - a.Secret.Ref != nil && a.Secret.Ref.Name != "" + return a != nil && ((a.Secret != nil && + a.Secret.Ref != nil && a.Secret.Ref.Name != "") || a.ServiceAccountName != "") } type Secret struct { diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go index 704262505d7..aa126371fc3 100644 --- a/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go +++ b/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go @@ -63,6 +63,23 @@ func TestIntegrationSinkSpecValidation(t *testing.T) { }, want: nil, }, + { + name: "valid AWS S3 sink with service account and region", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + S3: &v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-bucket", + }, + Auth: &v1alpha1.Auth{ + ServiceAccountName: "aws-service-account", + }, + }, + }, + want: nil, + }, { name: "valid AWS SQS sink with auth and region", spec: IntegrationSinkSpec{ @@ -84,6 +101,23 @@ func TestIntegrationSinkSpecValidation(t *testing.T) { }, want: nil, }, + { + name: "valid AWS SQS sink with service account and region", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + SQS: &v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-queue", + }, + Auth: &v1alpha1.Auth{ + ServiceAccountName: "aws-service-account", + }, + }, + }, + want: nil, + }, { name: "multiple sinks set (invalid)", spec: IntegrationSinkSpec{ @@ -188,6 +222,21 @@ func TestIntegrationSinkSpecValidation(t *testing.T) { }, want: apis.ErrMissingField("aws.auth.secret.ref.name"), }, + { + name: "AWS sink without auth credentials (invalid)", + spec: IntegrationSinkSpec{ + Aws: &Aws{ + S3: &v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-bucket", + }, + Auth: &v1alpha1.Auth{}, + }, + }, + want: apis.ErrMissingField("aws.auth.secret.ref.name"), + }, { name: "AWS S3 sink without region (invalid)", spec: IntegrationSinkSpec{ diff --git a/pkg/apis/sources/v1alpha1/integration_validation_test.go b/pkg/apis/sources/v1alpha1/integration_validation_test.go index 5894376feab..8cb38f9c2b0 100644 --- a/pkg/apis/sources/v1alpha1/integration_validation_test.go +++ b/pkg/apis/sources/v1alpha1/integration_validation_test.go @@ -64,6 +64,23 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { }, want: nil, }, + { + name: "valid AWS S3 source with service account and region", + spec: IntegrationSourceSpec{ + Aws: &Aws{ + S3: &v1alpha1.AWSS3{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-bucket", + }, + Auth: &v1alpha1.Auth{ + ServiceAccountName: "aws-service-account", + }, + }, + }, + want: nil, + }, { name: "valid AWS SQS source with auth and region", spec: IntegrationSourceSpec{ @@ -85,6 +102,23 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { }, want: nil, }, + { + name: "valid AWS SQS source with service account and region", + spec: IntegrationSourceSpec{ + Aws: &Aws{ + SQS: &v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-queue", + }, + Auth: &v1alpha1.Auth{ + ServiceAccountName: "aws-service-account", + }, + }, + }, + want: nil, + }, { name: "valid AWS DDBStreams source with auth and region", spec: IntegrationSourceSpec{ @@ -106,6 +140,23 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { }, want: nil, }, + { + name: "valid AWS DDBStreams source with service account and region", + spec: IntegrationSourceSpec{ + Aws: &Aws{ + DDBStreams: &v1alpha1.AWSDDBStreams{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Table: "example-table", + }, + Auth: &v1alpha1.Auth{ + ServiceAccountName: "aws-service-account", + }, + }, + }, + want: nil, + }, { name: "multiple sources set (invalid)", spec: IntegrationSourceSpec{ @@ -172,6 +223,35 @@ func TestIntegrationSourceSpecValidation(t *testing.T) { }, want: apis.ErrMissingField("aws.sqs.arn"), }, + { + name: "AWS SQS source without Auth (invalid)", + spec: IntegrationSourceSpec{ + Aws: &Aws{ + SQS: &v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-queue", + }, + }, + }, + want: apis.ErrMissingField("aws.auth.secret.ref.name"), + }, + { + name: "AWS SQS source without Auth credentials (invalid)", + spec: IntegrationSourceSpec{ + Aws: &Aws{ + SQS: &v1alpha1.AWSSQS{ + AWSCommon: v1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "example-queue", + }, + Auth: &v1alpha1.Auth{}, + }, + }, + want: apis.ErrMissingField("aws.auth.secret.ref.name"), + }, { name: "AWS DDBStreams source without Table (invalid)", spec: IntegrationSourceSpec{ diff --git a/pkg/reconciler/integration/sink/resources/container_image.go b/pkg/reconciler/integration/sink/resources/container_image.go index 52b7528acca..edb9d6824c3 100644 --- a/pkg/reconciler/integration/sink/resources/container_image.go +++ b/pkg/reconciler/integration/sink/resources/container_image.go @@ -107,6 +107,7 @@ func MakeDeploymentSpec(sink *v1alpha1.IntegrationSink, authProxyImage string, f }, }, }, + ServiceAccountName: makeServiceAccountName(sink), }, }, }, @@ -367,6 +368,11 @@ func makeEnv(sink *v1alpha1.IntegrationSink, featureFlags feature.Flags) []corev integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SINK_ACCESSKEY", commonv1a1.AwsAccessKey, secretName), integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SINK_SECRETKEY", commonv1a1.AwsSecretKey, secretName), }...) + } else { + envVars = append(envVars, corev1.EnvVar{ + Name: "CAMEL_KAMELET_AWS_S3_SINK_USEDEFAULTCREDENTIALSPROVIDER", + Value: "true", + }) } return envVars } @@ -379,6 +385,11 @@ func makeEnv(sink *v1alpha1.IntegrationSink, featureFlags feature.Flags) []corev integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SINK_ACCESSKEY", commonv1a1.AwsAccessKey, secretName), integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SINK_SECRETKEY", commonv1a1.AwsSecretKey, secretName), }...) + } else { + envVars = append(envVars, corev1.EnvVar{ + Name: "CAMEL_KAMELET_AWS_SQS_SINK_USEDEFAULTCREDENTIALSPROVIDER", + Value: "true", + }) } return envVars } @@ -391,6 +402,11 @@ func makeEnv(sink *v1alpha1.IntegrationSink, featureFlags feature.Flags) []corev integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SNS_SINK_ACCESSKEY", commonv1a1.AwsAccessKey, secretName), integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SNS_SINK_SECRETKEY", commonv1a1.AwsSecretKey, secretName), }...) + } else { + envVars = append(envVars, corev1.EnvVar{ + Name: "CAMEL_KAMELET_AWS_SNS_SINK_USEDEFAULTCREDENTIALSPROVIDER", + Value: "true", + }) } return envVars } @@ -399,6 +415,13 @@ func makeEnv(sink *v1alpha1.IntegrationSink, featureFlags feature.Flags) []corev return envVars } +func makeServiceAccountName(sink *v1alpha1.IntegrationSink) string { + if sink.Spec.Aws != nil && sink.Spec.Aws.Auth != nil && sink.Spec.Aws.Auth.ServiceAccountName != "" { + return sink.Spec.Aws.Auth.ServiceAccountName + } + return "" +} + func selectImage(sink *v1alpha1.IntegrationSink) string { // Injected in ./config/core/deployments/controller.yaml switch { diff --git a/pkg/reconciler/integration/sink/resources/container_image_test.go b/pkg/reconciler/integration/sink/resources/container_image_test.go new file mode 100644 index 00000000000..873f762e919 --- /dev/null +++ b/pkg/reconciler/integration/sink/resources/container_image_test.go @@ -0,0 +1,174 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "fmt" + "sort" + "testing" + + "knative.dev/eventing/pkg/reconciler/integration" + + "knative.dev/pkg/kmeta" + + "github.com/google/go-cmp/cmp" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + commonintegrationv1alpha1 "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/apis/sinks/v1alpha1" +) + +const ( + testName = "test-integrationsink" + testNamespace = "test-namespace" + testUID = "test-uid" +) + +func TestNewSQSContainerSink(t *testing.T) { + const timerImage = "quay.io/sqs-image" + t.Setenv("INTEGRATION_SINK_AWS_SQS_IMAGE", timerImage) + sink := &v1alpha1.IntegrationSink{ + ObjectMeta: metav1.ObjectMeta{ + Name: testName, + Namespace: testNamespace, + UID: testUID, + }, + Spec: v1alpha1.IntegrationSinkSpec{ + Aws: &v1alpha1.Aws{ + SQS: &commonintegrationv1alpha1.AWSSQS{ + AWSCommon: commonintegrationv1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "arn:aws:sqs:us-east-1:123456789012:knative-integration-source", + DeleteAfterRead: true, + AutoCreateQueue: false, + Host: "amazonaws.com", + Protocol: "https", + Greedy: false, + Delay: 500, + MaxMessagesPerPoll: 1, + WaitTimeSeconds: 0, + VisibilityTimeout: 0, + }, + Auth: &commonintegrationv1alpha1.Auth{ + ServiceAccountName: "aws-service-account", + }, + }, + }, + } + + want := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-deployment", testName), + Namespace: testNamespace, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(sink), + }, + Labels: integration.Labels(testName), + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: integration.Labels(testName), + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: integration.Labels(testName), + }, + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{ + { + Name: fmt.Sprintf("%s-server-tls", testName), + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: fmt.Sprintf("%s-server-tls", testName), + Optional: ptr.To(true), + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: "sink", + Image: timerImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Ports: []corev1.ContainerPort{ + { + ContainerPort: 8080, + Protocol: corev1.ProtocolTCP, + Name: "http", + }, + { + ContainerPort: 8443, + Protocol: corev1.ProtocolTCP, + Name: "https", + }}, + Env: []corev1.EnvVar{ + {Name: "QUARKUS_HTTP_SSL_CERTIFICATE_KEY-FILES", Value: "/etc/test-integrationsink-server-tls/tls.key"}, + {Name: "QUARKUS_HTTP_SSL_CERTIFICATE_FILES", Value: "/etc/test-integrationsink-server-tls/tls.crt"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_REGION", Value: "us-east-1"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_OVERRIDEENDPOINT", Value: "false"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_QUEUE_NAME_OR_ARN", Value: "arn:aws:sqs:us-east-1:123456789012:knative-integration-source"}, + + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_DELETEAFTERREAD", Value: "true"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_AUTOCREATEQUEUE", Value: "false"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_GREEDY", Value: "false"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_DELAY", Value: "500"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_MAXMESSAGESPERPOLL", Value: "1"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_AMAZONAWSHOST", Value: "amazonaws.com"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_PROTOCOL", Value: "https"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_WAITTIMESECONDS", Value: "0"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_VISIBILITYTIMEOUT", Value: "0"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_USEDEFAULTCREDENTIALSPROVIDER", Value: "true"}, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: fmt.Sprintf("%s-server-tls", testName), + MountPath: "/etc/" + fmt.Sprintf("%s-server-tls", testName), + ReadOnly: true, + }, + }, + }, + }, + ServiceAccountName: "aws-service-account", + }, + }, + }, + } + + got, _ := MakeDeploymentSpec(sink, "unused", feature.Flags{}, nil, nil) + sortOpts := []cmp.Option{ + cmp.Transformer("SortEnvVars", func(in corev1.Container) corev1.Container { + out := in + if len(out.Env) > 0 { + sort.Slice(out.Env, func(i, j int) bool { + return out.Env[i].Name < out.Env[j].Name + }) + } + return out + }), + } + if diff := cmp.Diff(want, got, sortOpts...); diff != "" { + t.Errorf("MakeDeploymentSpec() mismatch (-want +got):\n%s", diff) + } +} diff --git a/pkg/reconciler/integration/source/resources/containersource.go b/pkg/reconciler/integration/source/resources/containersource.go index c043089aeb7..39f0d390a8b 100644 --- a/pkg/reconciler/integration/source/resources/containersource.go +++ b/pkg/reconciler/integration/source/resources/containersource.go @@ -47,6 +47,7 @@ func NewContainerSource(source *v1alpha1.IntegrationSource, oidc bool) *sourcesv Env: makeEnv(source, oidc), }, } + source.Spec.Template.Spec.ServiceAccountName = makeServiceAccountName(source) return &sourcesv1.ContainerSource{ ObjectMeta: metav1.ObjectMeta{ @@ -101,6 +102,11 @@ func makeEnv(source *v1alpha1.IntegrationSource, oidc bool) []corev1.EnvVar { integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_ACCESSKEY", commonv1a1.AwsAccessKey, secretName), integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_SECRETKEY", commonv1a1.AwsSecretKey, secretName), }...) + } else { + envVars = append(envVars, corev1.EnvVar{ + Name: "CAMEL_KAMELET_AWS_S3_SOURCE_USEDEFAULTCREDENTIALSPROVIDER", + Value: "true", + }) } return envVars } @@ -113,6 +119,11 @@ func makeEnv(source *v1alpha1.IntegrationSource, oidc bool) []corev1.EnvVar { integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_ACCESSKEY", commonv1a1.AwsAccessKey, secretName), integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_SECRETKEY", commonv1a1.AwsSecretKey, secretName), }...) + } else { + envVars = append(envVars, corev1.EnvVar{ + Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_USEDEFAULTCREDENTIALSPROVIDER", + Value: "true", + }) } return envVars } @@ -125,6 +136,11 @@ func makeEnv(source *v1alpha1.IntegrationSource, oidc bool) []corev1.EnvVar { integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_ACCESSKEY", commonv1a1.AwsAccessKey, secretName), integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_SECRETKEY", commonv1a1.AwsSecretKey, secretName), }...) + } else { + envVars = append(envVars, corev1.EnvVar{ + Name: "CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_USEDEFAULTCREDENTIALSPROVIDER", + Value: "true", + }) } return envVars } @@ -133,6 +149,13 @@ func makeEnv(source *v1alpha1.IntegrationSource, oidc bool) []corev1.EnvVar { return envVars } +func makeServiceAccountName(source *v1alpha1.IntegrationSource) string { + if source.Spec.Aws != nil && source.Spec.Aws.Auth != nil && source.Spec.Aws.Auth.ServiceAccountName != "" { + return source.Spec.Aws.Auth.ServiceAccountName + } + return "" +} + func selectImage(source *v1alpha1.IntegrationSource) string { // Injected in ./config/core/deployments/controller.yaml switch { diff --git a/pkg/reconciler/integration/source/resources/containersource_test.go b/pkg/reconciler/integration/source/resources/containersource_test.go index e342b1900cd..7528916c939 100644 --- a/pkg/reconciler/integration/source/resources/containersource_test.go +++ b/pkg/reconciler/integration/source/resources/containersource_test.go @@ -18,6 +18,7 @@ package resources import ( "fmt" + "sort" "testing" "knative.dev/eventing/pkg/reconciler/integration" @@ -29,6 +30,7 @@ import ( "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + commonintegrationv1alpha1 "knative.dev/eventing/pkg/apis/common/integration/v1alpha1" "knative.dev/eventing/pkg/apis/sources/v1alpha1" duckv1 "knative.dev/pkg/apis/duck/v1" ) @@ -110,3 +112,109 @@ func TestNewContainerSource(t *testing.T) { t.Errorf("NewContainerSource() mismatch (-want +got):\n%s", diff) } } + +func TestNewSQSContainerSource(t *testing.T) { + const timerImage = "quay.io/sqs-image" + t.Setenv("INTEGRATION_SOURCE_AWS_SQS_IMAGE", timerImage) + source := &v1alpha1.IntegrationSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: testName, + Namespace: testNamespace, + UID: testUID, + }, + Spec: v1alpha1.IntegrationSourceSpec{ + Aws: &v1alpha1.Aws{ + SQS: &commonintegrationv1alpha1.AWSSQS{ + AWSCommon: commonintegrationv1alpha1.AWSCommon{ + Region: "us-east-1", + }, + Arn: "arn:aws:sqs:us-east-1:123456789012:knative-integration-source", + DeleteAfterRead: true, + AutoCreateQueue: false, + Host: "amazonaws.com", + Protocol: "https", + Greedy: false, + Delay: 500, + MaxMessagesPerPoll: 1, + WaitTimeSeconds: 0, + VisibilityTimeout: 0, + }, + Auth: &commonintegrationv1alpha1.Auth{ + ServiceAccountName: "aws-service-account", + }, + }, + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + URI: apis.HTTP("http://test-sink"), + }, + }, + }, + } + + want := &sourcesv1.ContainerSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-containersource", testName), + Namespace: testNamespace, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(source), + }, + Labels: integration.Labels(source.Name), + }, + Spec: sourcesv1.ContainerSourceSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: integration.Labels(source.Name), + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "source", + Image: timerImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Env: []corev1.EnvVar{ + {Name: "CAMEL_KNATIVE_CLIENT_SSL_ENABLED", Value: "true"}, + {Name: "CAMEL_KNATIVE_CLIENT_SSL_CERT_PATH", Value: "/knative-custom-certs/knative-eventing-bundle.pem"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_REGION", Value: "us-east-1"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_OVERRIDEENDPOINT", Value: "false"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_QUEUE_NAME_OR_ARN", Value: "arn:aws:sqs:us-east-1:123456789012:knative-integration-source"}, + + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_DELETEAFTERREAD", Value: "true"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_AUTOCREATEQUEUE", Value: "false"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_GREEDY", Value: "false"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_DELAY", Value: "500"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_MAXMESSAGESPERPOLL", Value: "1"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_AMAZONAWSHOST", Value: "amazonaws.com"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_PROTOCOL", Value: "https"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_WAITTIMESECONDS", Value: "0"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_VISIBILITYTIMEOUT", Value: "0"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_USEDEFAULTCREDENTIALSPROVIDER", Value: "true"}, + }, + }, + }, + ServiceAccountName: "aws-service-account", + }, + }, + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + URI: apis.HTTP("http://test-sink"), + }, + }, + }, + } + + got := NewContainerSource(source, false) + sortOpts := []cmp.Option{ + cmp.Transformer("SortEnvVars", func(in corev1.Container) corev1.Container { + out := in + if len(out.Env) > 0 { + sort.Slice(out.Env, func(i, j int) bool { + return out.Env[i].Name < out.Env[j].Name + }) + } + return out + }), + } + if diff := cmp.Diff(want, got, sortOpts...); diff != "" { + t.Errorf("NewContainerSource() mismatch (-want +got):\n%s", diff) + } +} From fb160b511b598b01f836b1b4abc85284dfefa2e5 Mon Sep 17 00:00:00 2001 From: Quenby Mitchell Date: Wed, 1 Oct 2025 15:45:03 -0600 Subject: [PATCH 2/3] use different environment variables --- .../integration/sink/resources/container_image.go | 6 +++--- .../integration/sink/resources/container_image_test.go | 2 +- .../integration/source/resources/containersource.go | 6 +++--- .../integration/source/resources/containersource_test.go | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/reconciler/integration/sink/resources/container_image.go b/pkg/reconciler/integration/sink/resources/container_image.go index edb9d6824c3..c706207f5e1 100644 --- a/pkg/reconciler/integration/sink/resources/container_image.go +++ b/pkg/reconciler/integration/sink/resources/container_image.go @@ -370,7 +370,7 @@ func makeEnv(sink *v1alpha1.IntegrationSink, featureFlags feature.Flags) []corev }...) } else { envVars = append(envVars, corev1.EnvVar{ - Name: "CAMEL_KAMELET_AWS_S3_SINK_USEDEFAULTCREDENTIALSPROVIDER", + Name: "CAMEL_KAMELET_AWS_S3_SINK_USE_DEFAULT_CREDENTIALS_PROVIDER", Value: "true", }) } @@ -387,7 +387,7 @@ func makeEnv(sink *v1alpha1.IntegrationSink, featureFlags feature.Flags) []corev }...) } else { envVars = append(envVars, corev1.EnvVar{ - Name: "CAMEL_KAMELET_AWS_SQS_SINK_USEDEFAULTCREDENTIALSPROVIDER", + Name: "CAMEL_KAMELET_AWS_SQS_SINK_USE_DEFAULT_CREDENTIALS_PROVIDER", Value: "true", }) } @@ -404,7 +404,7 @@ func makeEnv(sink *v1alpha1.IntegrationSink, featureFlags feature.Flags) []corev }...) } else { envVars = append(envVars, corev1.EnvVar{ - Name: "CAMEL_KAMELET_AWS_SNS_SINK_USEDEFAULTCREDENTIALSPROVIDER", + Name: "CAMEL_KAMELET_AWS_SNS_SINK_USE_DEFAULT_CREDENTIALS_PROVIDER", Value: "true", }) } diff --git a/pkg/reconciler/integration/sink/resources/container_image_test.go b/pkg/reconciler/integration/sink/resources/container_image_test.go index 873f762e919..504122d2407 100644 --- a/pkg/reconciler/integration/sink/resources/container_image_test.go +++ b/pkg/reconciler/integration/sink/resources/container_image_test.go @@ -139,7 +139,7 @@ func TestNewSQSContainerSink(t *testing.T) { {Name: "CAMEL_KAMELET_AWS_SQS_SINK_PROTOCOL", Value: "https"}, {Name: "CAMEL_KAMELET_AWS_SQS_SINK_WAITTIMESECONDS", Value: "0"}, {Name: "CAMEL_KAMELET_AWS_SQS_SINK_VISIBILITYTIMEOUT", Value: "0"}, - {Name: "CAMEL_KAMELET_AWS_SQS_SINK_USEDEFAULTCREDENTIALSPROVIDER", Value: "true"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SINK_USE_DEFAULT_CREDENTIALS_PROVIDER", Value: "true"}, }, VolumeMounts: []corev1.VolumeMount{ { diff --git a/pkg/reconciler/integration/source/resources/containersource.go b/pkg/reconciler/integration/source/resources/containersource.go index 39f0d390a8b..19c21317c18 100644 --- a/pkg/reconciler/integration/source/resources/containersource.go +++ b/pkg/reconciler/integration/source/resources/containersource.go @@ -104,7 +104,7 @@ func makeEnv(source *v1alpha1.IntegrationSource, oidc bool) []corev1.EnvVar { }...) } else { envVars = append(envVars, corev1.EnvVar{ - Name: "CAMEL_KAMELET_AWS_S3_SOURCE_USEDEFAULTCREDENTIALSPROVIDER", + Name: "CAMEL_KAMELET_AWS_S3_SOURCE_USE_DEFAULT_CREDENTIALS_PROVIDER", Value: "true", }) } @@ -121,7 +121,7 @@ func makeEnv(source *v1alpha1.IntegrationSource, oidc bool) []corev1.EnvVar { }...) } else { envVars = append(envVars, corev1.EnvVar{ - Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_USEDEFAULTCREDENTIALSPROVIDER", + Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_USE_DEFAULT_CREDENTIALS_PROVIDER", Value: "true", }) } @@ -138,7 +138,7 @@ func makeEnv(source *v1alpha1.IntegrationSource, oidc bool) []corev1.EnvVar { }...) } else { envVars = append(envVars, corev1.EnvVar{ - Name: "CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_USEDEFAULTCREDENTIALSPROVIDER", + Name: "CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_USE_DEFAULT_CREDENTIALS_PROVIDER", Value: "true", }) } diff --git a/pkg/reconciler/integration/source/resources/containersource_test.go b/pkg/reconciler/integration/source/resources/containersource_test.go index 7528916c939..c49bcbb6397 100644 --- a/pkg/reconciler/integration/source/resources/containersource_test.go +++ b/pkg/reconciler/integration/source/resources/containersource_test.go @@ -187,7 +187,7 @@ func TestNewSQSContainerSource(t *testing.T) { {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_PROTOCOL", Value: "https"}, {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_WAITTIMESECONDS", Value: "0"}, {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_VISIBILITYTIMEOUT", Value: "0"}, - {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_USEDEFAULTCREDENTIALSPROVIDER", Value: "true"}, + {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_USE_DEFAULT_CREDENTIALS_PROVIDER", Value: "true"}, }, }, }, From f282427faa4e60949d7e4581bb6ca71b0e685158 Mon Sep 17 00:00:00 2001 From: Dave Protasowski Date: Wed, 8 Oct 2025 10:56:04 -0400 Subject: [PATCH 3/3] increase timeout in TLS test to allow cert manager to clean itself up --- test/rekt/integrationsource_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/rekt/integrationsource_test.go b/test/rekt/integrationsource_test.go index 5071b48da49..06a4de3079a 100644 --- a/test/rekt/integrationsource_test.go +++ b/test/rekt/integrationsource_test.go @@ -21,6 +21,7 @@ package rekt import ( "testing" + "time" "knative.dev/pkg/system" "knative.dev/reconciler-test/pkg/environment" @@ -56,6 +57,7 @@ func TestIntegrationSourceWithTLS(t *testing.T) { k8s.WithEventListener, environment.Managed(t), eventshub.WithTLS(t), + environment.WithPollTimings(5*time.Second, 4*time.Minute), ) env.ParallelTest(ctx, t, integrationsource.SendEventsWithTLSRecieverAsSink())