diff --git a/pkg/admin/dummy.go b/pkg/admin/dummy.go index b19b5473..50d0e4e4 100644 --- a/pkg/admin/dummy.go +++ b/pkg/admin/dummy.go @@ -128,3 +128,35 @@ func (d *DummyPulsarAdmin) DeleteCluster(name string) error { func (d *DummyPulsarAdmin) CheckClusterExist(name string) (bool, error) { return true, nil } + +func (d *DummyPulsarAdmin) DeletePulsarPackage(packageURL string) error { + return nil +} + +func (d *DummyPulsarAdmin) ApplyPulsarPackage(packageURL, filePath, description, contact string, properties map[string]string, changed bool) error { + return nil +} + +func (d *DummyPulsarAdmin) DeletePulsarFunction(tenant, namespace, name string) error { + return nil +} + +func (d *DummyPulsarAdmin) ApplyPulsarFunction(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarFunctionSpec, changed bool) error { + return nil +} + +func (d *DummyPulsarAdmin) DeletePulsarSink(tenant, namespace, name string) error { + return nil +} + +func (d *DummyPulsarAdmin) ApplyPulsarSink(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSinkSpec, changed bool) error { + return nil +} + +func (d *DummyPulsarAdmin) DeletePulsarSource(tenant, namespace, name string) error { + return nil +} + +func (d *DummyPulsarAdmin) ApplyPulsarSource(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSourceSpec, changed bool) error { + return nil +} diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 275c1ab4..d4623ffa 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -661,12 +661,16 @@ func (p *PulsarAdminClient) DeletePulsarPackage(packageURL string) error { return p.adminClient.Packages().Delete(packageURL) } -func (p *PulsarAdminClient) ApplyPulsarPackage(packageURL, filePath, description, contact string, properties map[string]string) error { +func (p *PulsarAdminClient) ApplyPulsarPackage(packageURL, filePath, description, contact string, properties map[string]string, changed bool) error { packageName, err := utils.GetPackageName(packageURL) if err != nil { return err } - err = p.adminClient.Packages().Upload(packageName.GetCompleteName(), filePath, description, contact, properties) + if changed { + err = p.adminClient.Packages().UpdateMetadata(packageName.GetCompleteName(), description, contact, properties) + } else { + err = p.adminClient.Packages().Upload(packageName.GetCompleteName(), filePath, description, contact, properties) + } if err != nil { if !IsAlreadyExist(err) { return err @@ -680,7 +684,7 @@ func (p *PulsarAdminClient) DeletePulsarFunction(tenant, namespace, name string) return p.adminClient.Functions().DeleteFunction(tenant, namespace, name) } -func (p *PulsarAdminClient) ApplyPulsarFunction(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarFunctionSpec) error { +func (p *PulsarAdminClient) ApplyPulsarFunction(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarFunctionSpec, changed bool) error { functionConfig := utils.FunctionConfig{ Tenant: tenant, @@ -818,7 +822,12 @@ func (p *PulsarAdminClient) ApplyPulsarFunction(tenant, namespace, name, package return errors.New("FunctionConfig need to specify Jar, Py, or Go package URL") } - err := p.adminClient.Functions().CreateFuncWithURL(&functionConfig, packageURL) + var err error + if changed { + err = p.adminClient.Functions().UpdateFunctionWithURL(&functionConfig, packageURL, nil) + } else { + err = p.adminClient.Functions().CreateFuncWithURL(&functionConfig, packageURL) + } if err != nil { if !IsAlreadyExist(err) { return err @@ -832,7 +841,7 @@ func (p *PulsarAdminClient) DeletePulsarSink(tenant, namespace, name string) err return p.adminClient.Sinks().DeleteSink(tenant, namespace, name) } -func (p *PulsarAdminClient) ApplyPulsarSink(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSinkSpec) error { +func (p *PulsarAdminClient) ApplyPulsarSink(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSinkSpec, changed bool) error { sinkConfig := utils.SinkConfig{ Tenant: tenant, Namespace: namespace, @@ -923,7 +932,12 @@ func (p *PulsarAdminClient) ApplyPulsarSink(tenant, namespace, name, packageURL sinkConfig.Secrets = secrets } - err := p.adminClient.Sinks().CreateSinkWithURL(&sinkConfig, packageURL) + var err error + if changed { + err = p.adminClient.Sinks().UpdateSinkWithURL(&sinkConfig, packageURL, nil) + } else { + err = p.adminClient.Sinks().CreateSinkWithURL(&sinkConfig, packageURL) + } if err != nil { if !IsAlreadyExist(err) { return err @@ -937,7 +951,7 @@ func (p *PulsarAdminClient) DeletePulsarSource(tenant, namespace, name string) e return p.adminClient.Sources().DeleteSource(tenant, namespace, name) } -func (p *PulsarAdminClient) ApplyPulsarSource(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSourceSpec) error { +func (p *PulsarAdminClient) ApplyPulsarSource(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSourceSpec, changed bool) error { sourceConfig := utils.SourceConfig{ Tenant: tenant, Namespace: namespace, @@ -1012,7 +1026,13 @@ func (p *PulsarAdminClient) ApplyPulsarSource(tenant, namespace, name, packageUR sourceConfig.CustomRuntimeOptions = string(jByte) } - err := p.adminClient.Sources().CreateSourceWithURL(&sourceConfig, packageURL) + var err error + + if changed { + err = p.adminClient.Sources().UpdateSourceWithURL(&sourceConfig, packageURL, nil) + } else { + err = p.adminClient.Sources().CreateSourceWithURL(&sourceConfig, packageURL) + } if err != nil { if !IsAlreadyExist(err) { return err diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index c231e327..7aa4c536 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -156,25 +156,25 @@ type PulsarAdmin interface { DeletePulsarPackage(packageURL string) error // ApplyPulsarPackage apply pulsar package - ApplyPulsarPackage(packageURL, filePath, description, contact string, properties map[string]string) error + ApplyPulsarPackage(packageURL, filePath, description, contact string, properties map[string]string, changed bool) error // DeletePulsarFunction delete pulsar function DeletePulsarFunction(tenant, namespace, name string) error // ApplyPulsarFunction apply pulsar function - ApplyPulsarFunction(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarFunctionSpec) error + ApplyPulsarFunction(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarFunctionSpec, changed bool) error // DeletePulsarSink delete pulsar sink DeletePulsarSink(tenant, namespace, name string) error // ApplyPulsarSink apply pulsar sink - ApplyPulsarSink(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSinkSpec) error + ApplyPulsarSink(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSinkSpec, changed bool) error // DeletePulsarSource delete pulsar source DeletePulsarSource(tenant, namespace, name string) error // ApplyPulsarSource apply pulsar source - ApplyPulsarSource(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSourceSpec) error + ApplyPulsarSource(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSourceSpec, changed bool) error } // PulsarAdminCreator is the function type to create a PulsarAdmin with config diff --git a/pkg/connection/reconcile_function.go b/pkg/connection/reconcile_function.go index 54ed010d..d4641e68 100644 --- a/pkg/connection/reconcile_function.go +++ b/pkg/connection/reconcile_function.go @@ -136,7 +136,7 @@ func (r *PulsarFunctionReconciler) ReconcileFunction(ctx context.Context, pulsar return err } - if err := pulsarAdmin.ApplyPulsarFunction(instance.Spec.Tenant, instance.Spec.Namespace, instance.Spec.Name, packageUrl, &instance.Spec); err != nil { + if err := pulsarAdmin.ApplyPulsarFunction(instance.Spec.Tenant, instance.Spec.Namespace, instance.Spec.Name, packageUrl, &instance.Spec, instance.Status.ObservedGeneration > 1); err != nil { meta.SetStatusCondition(&instance.Status.Conditions, *NewErrorCondition(instance.Generation, err.Error())) log.Error(err, "Failed to apply function") if err := r.conn.client.Status().Update(ctx, instance); err != nil { diff --git a/pkg/connection/reconcile_package.go b/pkg/connection/reconcile_package.go index 24f9c6e0..7e4d4a54 100644 --- a/pkg/connection/reconcile_package.go +++ b/pkg/connection/reconcile_package.go @@ -128,7 +128,7 @@ func (r *PulsarPackageReconciler) ReconcilePackage(ctx context.Context, pulsarAd } defer os.Remove(filePath) - if err := pulsarAdmin.ApplyPulsarPackage(pkg.Spec.PackageURL, filePath, pkg.Spec.Description, pkg.Spec.Contact, pkg.Spec.Properties); err != nil { + if err := pulsarAdmin.ApplyPulsarPackage(pkg.Spec.PackageURL, filePath, pkg.Spec.Description, pkg.Spec.Contact, pkg.Spec.Properties, pkg.Status.ObservedGeneration > 1); err != nil { meta.SetStatusCondition(&pkg.Status.Conditions, *NewErrorCondition(pkg.Generation, err.Error())) log.Error(err, "Failed to apply package") if err := r.conn.client.Status().Update(ctx, pkg); err != nil { diff --git a/pkg/connection/reconcile_sink.go b/pkg/connection/reconcile_sink.go index ec87e90d..90f3f74e 100644 --- a/pkg/connection/reconcile_sink.go +++ b/pkg/connection/reconcile_sink.go @@ -130,7 +130,7 @@ func (r *PulsarSinkReconciler) ReconcileSink(ctx context.Context, pulsarAdmin ad return err } - if err := pulsarAdmin.ApplyPulsarSink(sink.Spec.Tenant, sink.Spec.Namespace, sink.Spec.Name, packageUrl, &sink.Spec); err != nil { + if err := pulsarAdmin.ApplyPulsarSink(sink.Spec.Tenant, sink.Spec.Namespace, sink.Spec.Name, packageUrl, &sink.Spec, sink.Status.ObservedGeneration > 1); err != nil { meta.SetStatusCondition(&sink.Status.Conditions, *NewErrorCondition(sink.Generation, err.Error())) log.Error(err, "Failed to apply sink") if err := r.conn.client.Status().Update(ctx, sink); err != nil { diff --git a/pkg/connection/reconcile_source.go b/pkg/connection/reconcile_source.go index 52139501..d9ae5391 100644 --- a/pkg/connection/reconcile_source.go +++ b/pkg/connection/reconcile_source.go @@ -130,7 +130,7 @@ func (r *PulsarSourceReconciler) ReconcileSource(ctx context.Context, pulsarAdmi return err } - if err := pulsarAdmin.ApplyPulsarSource(source.Spec.Tenant, source.Spec.Namespace, source.Spec.Name, packageUrl, &source.Spec); err != nil { + if err := pulsarAdmin.ApplyPulsarSource(source.Spec.Tenant, source.Spec.Namespace, source.Spec.Name, packageUrl, &source.Spec, source.Status.ObservedGeneration > 1); err != nil { meta.SetStatusCondition(&source.Status.Conditions, *NewErrorCondition(source.Generation, err.Error())) log.Error(err, "Failed to apply source") if err := r.conn.client.Status().Update(ctx, source); err != nil { diff --git a/tests/operator/resources_test.go b/tests/operator/resources_test.go index a57fc0c5..832db839 100644 --- a/tests/operator/resources_test.go +++ b/tests/operator/resources_test.go @@ -74,6 +74,17 @@ var _ = Describe("Resources", func() { }, }, } + ppackage *v1alphav1.PulsarPackage + ppackageurl string = "source://public/default/test-source@latest" + pfuncName string = "test-func" + pfuncClassName string = "org.apache.pulsar.functions.api.examples.ExclamationFunction" + pfunc *v1alphav1.PulsarFunction + psinkpackageurl string = "builtin://data-generator" + psink *v1alphav1.PulsarSink + psinkClassName string = "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink" + psource *v1alphav1.PulsarSource + psourcepackageurl string = "builtin://data-generator" + psourceClassName string = "org.apache.pulsar.io.datagenerator.DataGeneratorSource" ) BeforeEach(func() { @@ -94,7 +105,10 @@ var _ = Describe("Resources", func() { roles := []string{"ironman"} actions := []string{"produce", "consume", "functions"} ppermission = utils.MakePulsarPermission(namespaceName, ppermissionName, topicName, pconnName, v1alphav1.PulsarResourceTypeTopic, roles, actions, v1alphav1.CleanUpAfterDeletion) - + ppackage = utils.MakePulsarPackage(namespaceName, pfuncName, ppackageurl, pconnName, lifecyclePolicy) + pfunc = utils.MakePulsarFunction(namespaceName, pfuncName, "file:///pulsar/examples/api-examples.jar", pconnName, lifecyclePolicy) + psink = utils.MakePulsarSink(namespaceName, pfuncName, psinkpackageurl, pconnName, lifecyclePolicy) + psource = utils.MakePulsarSource(namespaceName, pfuncName, psourcepackageurl, pconnName, lifecyclePolicy) }) Describe("Basic resource operations", Ordered, func() { @@ -293,6 +307,68 @@ var _ = Describe("Resources", func() { }) }) + Context("PulsarFunction & PulsarPackage operation", func() { + It("should create the pulsarpackage successfully", func() { + err := k8sClient.Create(ctx, pfuncpackage) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + }) + + It("the package should be ready", func() { + Eventually(func() bool { + p := &v1alphav1.PulsarPackage{} + tns := types.NamespacedName{Namespace: namespaceName, Name: pfuncName} + Expect(k8sClient.Get(ctx, tns, p)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(p) + }, "20s", "100ms").Should(BeTrue()) + }) + + It("should create the pulsarfunction successfully", func() { + err := k8sClient.Create(ctx, pfunc) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + }) + + It("the function should be ready", func() { + Eventually(func() bool { + f := &v1alphav1.PulsarFunction{} + tns := types.NamespacedName{Namespace: namespaceName, Name: pfuncName} + Expect(k8sClient.Get(ctx, tns, f)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(f) + }, "20s", "100ms").Should(BeTrue()) + }) + }) + + Context("PulsarSink operation", func() { + It("should create the pulsarsink successfully", func() { + err := k8sClient.Create(ctx, psink) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + }) + + It("the sink should be ready", func() { + Eventually(func() bool { + s := &v1alphav1.PulsarSink{} + tns := types.NamespacedName{Namespace: namespaceName, Name: pfuncName} + Expect(k8sClient.Get(ctx, tns, s)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(s) + }, "20s", "100ms").Should(BeTrue()) + }) + }) + + Context("PulsarSource operation", func() { + It("should create the pulsarsource successfully", func() { + err := k8sClient.Create(ctx, psource) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + }) + + It("the source should be ready", func() { + Eventually(func() bool { + s := &v1alphav1.PulsarSource{} + tns := types.NamespacedName{Namespace: namespaceName, Name: pfuncName} + Expect(k8sClient.Get(ctx, tns, s)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(s) + }, "20s", "100ms").Should(BeTrue()) + }) + }) + AfterAll(func() { Eventually(func(g Gomega) { t := &v1alphav1.PulsarTopic{} diff --git a/tests/utils/k8s.go b/tests/utils/k8s.go index a7333248..658bbec8 100644 --- a/tests/utils/k8s.go +++ b/tests/utils/k8s.go @@ -16,6 +16,10 @@ package utils import ( "bytes" + "fmt" + "io/ioutil" + "os" + "os/exec" "path/filepath" "strings" @@ -79,3 +83,49 @@ func ExecInPod(config *rest.Config, namespace, podName, containerName, command s } return strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), nil } + +func TransferFileFromPodToPod(fromNamespace, fromPodName, fromContainerName, fromFilePath, toNamespace, toPodName, toContainerName, toFilePath string) error { + // Create a temporary directory to store the file locally + tempDir, err := ioutil.TempDir("", "transfer-file") + if err != nil { + return fmt.Errorf("failed to create temp directory: %v", err) + } + defer os.RemoveAll(tempDir) + + // Temporary file path + localFilePath := filepath.Join(tempDir, "transfer-file") + + // Step 1: Copy the file from the source pod to the local temporary file + err = CopyFileFromPod(fromNamespace, fromPodName, fromContainerName, fromFilePath, localFilePath) + if err != nil { + return fmt.Errorf("failed to copy file from pod: %v", err) + } + + // Step 2: Copy the file from the local temporary file to the destination pod + err = CopyFileToPod(localFilePath, toNamespace, toPodName, toContainerName, toFilePath) + if err != nil { + return fmt.Errorf("failed to copy file to pod: %v", err) + } + + return nil +} + +// CopyFileFromPod copies a file from a pod to a local path +func CopyFileFromPod(namespace, podName, containerName, srcPath, dstPath string) error { + cmd := []string{"kubectl", "cp", fmt.Sprintf("%s/%s:%s", namespace, podName, srcPath), dstPath, "-c", containerName} + out, err := exec.Command(cmd[0], cmd[1:]...).CombinedOutput() + if err != nil { + return fmt.Errorf("failed to execute command %s: %v\nOutput: %s", cmd, err, string(out)) + } + return nil +} + +// CopyFileToPod copies a local file to a pod +func CopyFileToPod(srcPath, namespace, podName, containerName, dstPath string) error { + cmd := []string{"kubectl", "cp", srcPath, fmt.Sprintf("%s/%s:%s", namespace, podName, dstPath), "-c", containerName} + out, err := exec.Command(cmd[0], cmd[1:]...).CombinedOutput() + if err != nil { + return fmt.Errorf("failed to execute command %s: %v\nOutput: %s", cmd, err, string(out)) + } + return nil +} diff --git a/tests/utils/spec.go b/tests/utils/spec.go index ca0a2f4a..6b0b818c 100644 --- a/tests/utils/spec.go +++ b/tests/utils/spec.go @@ -18,6 +18,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" rsutils "github.com/streamnative/pulsar-resources-operator/pkg/utils" @@ -115,3 +116,152 @@ func MakePulsarPermission(namespace, name, resourceName, connectionName string, }, } } + +// MakePulsarPackage will generate a object of PulsarPackage +func MakePulsarPackage(namespace, name, packageURL, connectionName string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarPackage { + return &v1alpha1.PulsarPackage{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1alpha1.PulsarPackageSpec{ + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + PackageURL: packageURL, + FileURL: "https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.10.4/connectors/pulsar-io-file-2.10.4.nar", + Description: "pulsar-io-file-2.10.4", + LifecyclePolicy: policy, + }, + } +} + +// MakePulsarFunction will generate a object of PulsarFunction +func MakePulsarFunction(namespace, name, functionPackageUrl, connectionName string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarFunction { + return &v1alpha1.PulsarFunction{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1alpha1.PulsarFunctionSpec{ + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + LifecyclePolicy: policy, + Jar: &v1alpha1.PackageContentRef{URL: functionPackageUrl}, + Tenant: "public", + Namespace: "default", + Name: name, + Inputs: []string{"input"}, + Output: "output", + Parallelism: 1, + ProcessingGuarantees: "ATLEAST_ONCE", + ClassName: "org.apache.pulsar.functions.api.examples.ExclamationFunction", + SubName: "test-sub", + SubscriptionPosition: "Latest", + CleanupSubscription: true, + SkipToLatest: true, + ForwardSourceMessageProperty: true, + RetainKeyOrdering: true, + AutoAck: true, + MaxMessageRetries: pointer.Int(101), + DeadLetterTopic: "dl-topic", + LogTopic: "func-log", + TimeoutMs: pointer.Int64(6666), + Secrets: map[string]v1alpha1.SecretKeyRef{ + "SECRET1": { + "sectest", "hello", + }, + }, + CustomRuntimeOptions: &v1alpha1.Config{Data: map[string]interface{}{ + "env": map[string]string{ + "HELLO": "WORLD", + }, + }}, + }, + } +} + +// MakePulsarSink will generate a object of PulsarSink +func MakePulsarSink(namespace, name, sinkPackageUrl, connectionName string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarSink { + return &v1alpha1.PulsarSink{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1alpha1.PulsarSinkSpec{ + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + LifecyclePolicy: policy, + Archive: &v1alpha1.PackageContentRef{URL: sinkPackageUrl}, + Tenant: "public", + Namespace: "default", + Name: name, + Inputs: []string{"sink-input"}, + Parallelism: 1, + ProcessingGuarantees: "EFFECTIVELY_ONCE", + CleanupSubscription: false, + SourceSubscriptionPosition: "Latest", + AutoAck: true, + ClassName: "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink", + Resources: &v1alpha1.Resources{ + CPU: 1, + RAM: 2048, + Disk: 102400, + }, + Secrets: map[string]v1alpha1.SecretKeyRef{ + "SECRET1": { + "sectest", "hello", + }, + }, + CustomRuntimeOptions: &v1alpha1.Config{Data: map[string]interface{}{ + "env": map[string]string{ + "HELLO": "WORLD", + }, + }}, + }, + } +} + +// MakePulsarSource will generate a object of PulsarSource +func MakePulsarSource(namespace, name, sourcePackageUrl, connectionName string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarSource { + return &v1alpha1.PulsarSource{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1alpha1.PulsarSourceSpec{ + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + LifecyclePolicy: policy, + Archive: &v1alpha1.PackageContentRef{URL: sourcePackageUrl}, + Tenant: "public", + Namespace: "default", + Name: name, + TopicName: "sink-input", + Parallelism: 1, + ProcessingGuarantees: "EFFECTIVELY_ONCE", + ClassName: "org.apache.pulsar.io.datagenerator.DataGeneratorSource", + Configs: &v1alpha1.Config{Data: map[string]interface{}{ + "sleepBetweenMessages": 1000, + }}, + Resources: &v1alpha1.Resources{ + CPU: 1, + RAM: 512, + Disk: 102400, + }, + Secrets: map[string]v1alpha1.SecretKeyRef{ + "SECRET1": { + "sectest", "hello", + }, + }, + CustomRuntimeOptions: &v1alpha1.Config{Data: map[string]interface{}{ + "env": map[string]string{ + "HELLO": "WORLD", + }, + }}, + }, + } +}