Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
freeznet committed Jun 20, 2024
1 parent 3ef1bd8 commit 0adfd1f
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 17 deletions.
32 changes: 32 additions & 0 deletions pkg/admin/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,35 @@ func (d *DummyPulsarAdmin) DeleteCluster(name string) error {
func (d *DummyPulsarAdmin) CheckClusterExist(name string) (bool, error) {
return true, nil
}

func (d *DummyPulsarAdmin) DeletePulsarPackage(packageURL string) error {

Check warning on line 132 in pkg/admin/dummy.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported method DummyPulsarAdmin.DeletePulsarPackage should have comment or be unexported (revive)
return nil
}

func (d *DummyPulsarAdmin) ApplyPulsarPackage(packageURL, filePath, description, contact string, properties map[string]string, changed bool) error {

Check warning on line 136 in pkg/admin/dummy.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported method DummyPulsarAdmin.ApplyPulsarPackage should have comment or be unexported (revive)
return nil
}

func (d *DummyPulsarAdmin) DeletePulsarFunction(tenant, namespace, name string) error {
return nil
}

func (d *DummyPulsarAdmin) ApplyPulsarFunction(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarFunctionSpec, changed bool) error {
return nil
}

func (d *DummyPulsarAdmin) DeletePulsarSink(tenant, namespace, name string) error {
return nil
}

func (d *DummyPulsarAdmin) ApplyPulsarSink(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSinkSpec, changed bool) error {
return nil
}

func (d *DummyPulsarAdmin) DeletePulsarSource(tenant, namespace, name string) error {
return nil
}

func (d *DummyPulsarAdmin) ApplyPulsarSource(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSourceSpec, changed bool) error {
return nil
}
36 changes: 28 additions & 8 deletions pkg/admin/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,12 +661,16 @@ func (p *PulsarAdminClient) DeletePulsarPackage(packageURL string) error {
return p.adminClient.Packages().Delete(packageURL)
}

func (p *PulsarAdminClient) ApplyPulsarPackage(packageURL, filePath, description, contact string, properties map[string]string) error {
func (p *PulsarAdminClient) ApplyPulsarPackage(packageURL, filePath, description, contact string, properties map[string]string, changed bool) error {
packageName, err := utils.GetPackageName(packageURL)
if err != nil {
return err
}
err = p.adminClient.Packages().Upload(packageName.GetCompleteName(), filePath, description, contact, properties)
if changed {
err = p.adminClient.Packages().UpdateMetadata(packageName.GetCompleteName(), description, contact, properties)
} else {
err = p.adminClient.Packages().Upload(packageName.GetCompleteName(), filePath, description, contact, properties)
}
if err != nil {
if !IsAlreadyExist(err) {
return err
Expand All @@ -680,7 +684,7 @@ func (p *PulsarAdminClient) DeletePulsarFunction(tenant, namespace, name string)
return p.adminClient.Functions().DeleteFunction(tenant, namespace, name)
}

func (p *PulsarAdminClient) ApplyPulsarFunction(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarFunctionSpec) error {
func (p *PulsarAdminClient) ApplyPulsarFunction(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarFunctionSpec, changed bool) error {

functionConfig := utils.FunctionConfig{
Tenant: tenant,
Expand Down Expand Up @@ -818,7 +822,12 @@ func (p *PulsarAdminClient) ApplyPulsarFunction(tenant, namespace, name, package
return errors.New("FunctionConfig need to specify Jar, Py, or Go package URL")
}

err := p.adminClient.Functions().CreateFuncWithURL(&functionConfig, packageURL)
var err error
if changed {
err = p.adminClient.Functions().UpdateFunctionWithURL(&functionConfig, packageURL, nil)
} else {
err = p.adminClient.Functions().CreateFuncWithURL(&functionConfig, packageURL)
}
if err != nil {
if !IsAlreadyExist(err) {
return err
Expand All @@ -832,7 +841,7 @@ func (p *PulsarAdminClient) DeletePulsarSink(tenant, namespace, name string) err
return p.adminClient.Sinks().DeleteSink(tenant, namespace, name)
}

func (p *PulsarAdminClient) ApplyPulsarSink(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSinkSpec) error {
func (p *PulsarAdminClient) ApplyPulsarSink(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSinkSpec, changed bool) error {
sinkConfig := utils.SinkConfig{
Tenant: tenant,
Namespace: namespace,
Expand Down Expand Up @@ -923,7 +932,12 @@ func (p *PulsarAdminClient) ApplyPulsarSink(tenant, namespace, name, packageURL
sinkConfig.Secrets = secrets
}

err := p.adminClient.Sinks().CreateSinkWithURL(&sinkConfig, packageURL)
var err error
if changed {
err = p.adminClient.Sinks().UpdateSinkWithURL(&sinkConfig, packageURL, nil)
} else {
err = p.adminClient.Sinks().CreateSinkWithURL(&sinkConfig, packageURL)
}
if err != nil {
if !IsAlreadyExist(err) {
return err
Expand All @@ -937,7 +951,7 @@ func (p *PulsarAdminClient) DeletePulsarSource(tenant, namespace, name string) e
return p.adminClient.Sources().DeleteSource(tenant, namespace, name)
}

func (p *PulsarAdminClient) ApplyPulsarSource(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSourceSpec) error {
func (p *PulsarAdminClient) ApplyPulsarSource(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSourceSpec, changed bool) error {
sourceConfig := utils.SourceConfig{
Tenant: tenant,
Namespace: namespace,
Expand Down Expand Up @@ -1012,7 +1026,13 @@ func (p *PulsarAdminClient) ApplyPulsarSource(tenant, namespace, name, packageUR
sourceConfig.CustomRuntimeOptions = string(jByte)
}

err := p.adminClient.Sources().CreateSourceWithURL(&sourceConfig, packageURL)
var err error

if changed {
err = p.adminClient.Sources().UpdateSourceWithURL(&sourceConfig, packageURL, nil)
} else {
err = p.adminClient.Sources().CreateSourceWithURL(&sourceConfig, packageURL)
}
if err != nil {
if !IsAlreadyExist(err) {
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/admin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,25 +156,25 @@ type PulsarAdmin interface {
DeletePulsarPackage(packageURL string) error

// ApplyPulsarPackage apply pulsar package
ApplyPulsarPackage(packageURL, filePath, description, contact string, properties map[string]string) error
ApplyPulsarPackage(packageURL, filePath, description, contact string, properties map[string]string, changed bool) error

// DeletePulsarFunction delete pulsar function
DeletePulsarFunction(tenant, namespace, name string) error

// ApplyPulsarFunction apply pulsar function
ApplyPulsarFunction(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarFunctionSpec) error
ApplyPulsarFunction(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarFunctionSpec, changed bool) error

// DeletePulsarSink delete pulsar sink
DeletePulsarSink(tenant, namespace, name string) error

// ApplyPulsarSink apply pulsar sink
ApplyPulsarSink(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSinkSpec) error
ApplyPulsarSink(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSinkSpec, changed bool) error

// DeletePulsarSource delete pulsar source
DeletePulsarSource(tenant, namespace, name string) error

// ApplyPulsarSource apply pulsar source
ApplyPulsarSource(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSourceSpec) error
ApplyPulsarSource(tenant, namespace, name, packageURL string, param *v1alpha1.PulsarSourceSpec, changed bool) error
}

// PulsarAdminCreator is the function type to create a PulsarAdmin with config
Expand Down
2 changes: 1 addition & 1 deletion pkg/connection/reconcile_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (r *PulsarFunctionReconciler) ReconcileFunction(ctx context.Context, pulsar
return err
}

if err := pulsarAdmin.ApplyPulsarFunction(instance.Spec.Tenant, instance.Spec.Namespace, instance.Spec.Name, packageUrl, &instance.Spec); err != nil {
if err := pulsarAdmin.ApplyPulsarFunction(instance.Spec.Tenant, instance.Spec.Namespace, instance.Spec.Name, packageUrl, &instance.Spec, instance.Status.ObservedGeneration > 1); err != nil {
meta.SetStatusCondition(&instance.Status.Conditions, *NewErrorCondition(instance.Generation, err.Error()))
log.Error(err, "Failed to apply function")
if err := r.conn.client.Status().Update(ctx, instance); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/connection/reconcile_package.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (r *PulsarPackageReconciler) ReconcilePackage(ctx context.Context, pulsarAd
}
defer os.Remove(filePath)

if err := pulsarAdmin.ApplyPulsarPackage(pkg.Spec.PackageURL, filePath, pkg.Spec.Description, pkg.Spec.Contact, pkg.Spec.Properties); err != nil {
if err := pulsarAdmin.ApplyPulsarPackage(pkg.Spec.PackageURL, filePath, pkg.Spec.Description, pkg.Spec.Contact, pkg.Spec.Properties, pkg.Status.ObservedGeneration > 1); err != nil {
meta.SetStatusCondition(&pkg.Status.Conditions, *NewErrorCondition(pkg.Generation, err.Error()))
log.Error(err, "Failed to apply package")
if err := r.conn.client.Status().Update(ctx, pkg); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/connection/reconcile_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (r *PulsarSinkReconciler) ReconcileSink(ctx context.Context, pulsarAdmin ad
return err
}

if err := pulsarAdmin.ApplyPulsarSink(sink.Spec.Tenant, sink.Spec.Namespace, sink.Spec.Name, packageUrl, &sink.Spec); err != nil {
if err := pulsarAdmin.ApplyPulsarSink(sink.Spec.Tenant, sink.Spec.Namespace, sink.Spec.Name, packageUrl, &sink.Spec, sink.Status.ObservedGeneration > 1); err != nil {
meta.SetStatusCondition(&sink.Status.Conditions, *NewErrorCondition(sink.Generation, err.Error()))
log.Error(err, "Failed to apply sink")
if err := r.conn.client.Status().Update(ctx, sink); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/connection/reconcile_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (r *PulsarSourceReconciler) ReconcileSource(ctx context.Context, pulsarAdmi
return err
}

if err := pulsarAdmin.ApplyPulsarSource(source.Spec.Tenant, source.Spec.Namespace, source.Spec.Name, packageUrl, &source.Spec); err != nil {
if err := pulsarAdmin.ApplyPulsarSource(source.Spec.Tenant, source.Spec.Namespace, source.Spec.Name, packageUrl, &source.Spec, source.Status.ObservedGeneration > 1); err != nil {
meta.SetStatusCondition(&source.Status.Conditions, *NewErrorCondition(source.Generation, err.Error()))
log.Error(err, "Failed to apply source")
if err := r.conn.client.Status().Update(ctx, source); err != nil {
Expand Down
78 changes: 77 additions & 1 deletion tests/operator/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ var _ = Describe("Resources", func() {
},
},
}
ppackage *v1alphav1.PulsarPackage
ppackageurl string = "source://public/default/test-source@latest"
pfuncName string = "test-func"
pfuncClassName string = "org.apache.pulsar.functions.api.examples.ExclamationFunction"
pfunc *v1alphav1.PulsarFunction
psinkpackageurl string = "builtin://data-generator"
psink *v1alphav1.PulsarSink
psinkClassName string = "org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink"
psource *v1alphav1.PulsarSource
psourcepackageurl string = "builtin://data-generator"
psourceClassName string = "org.apache.pulsar.io.datagenerator.DataGeneratorSource"
)

BeforeEach(func() {
Expand All @@ -94,7 +105,10 @@ var _ = Describe("Resources", func() {
roles := []string{"ironman"}
actions := []string{"produce", "consume", "functions"}
ppermission = utils.MakePulsarPermission(namespaceName, ppermissionName, topicName, pconnName, v1alphav1.PulsarResourceTypeTopic, roles, actions, v1alphav1.CleanUpAfterDeletion)

ppackage = utils.MakePulsarPackage(namespaceName, pfuncName, ppackageurl, pconnName, lifecyclePolicy)
pfunc = utils.MakePulsarFunction(namespaceName, pfuncName, "file:///pulsar/examples/api-examples.jar", pconnName, lifecyclePolicy)
psink = utils.MakePulsarSink(namespaceName, pfuncName, psinkpackageurl, pconnName, lifecyclePolicy)
psource = utils.MakePulsarSource(namespaceName, pfuncName, psourcepackageurl, pconnName, lifecyclePolicy)
})

Describe("Basic resource operations", Ordered, func() {
Expand Down Expand Up @@ -293,6 +307,68 @@ var _ = Describe("Resources", func() {
})
})

Context("PulsarFunction & PulsarPackage operation", func() {
It("should create the pulsarpackage successfully", func() {
err := k8sClient.Create(ctx, pfuncpackage)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
})

It("the package should be ready", func() {
Eventually(func() bool {
p := &v1alphav1.PulsarPackage{}
tns := types.NamespacedName{Namespace: namespaceName, Name: pfuncName}
Expect(k8sClient.Get(ctx, tns, p)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(p)
}, "20s", "100ms").Should(BeTrue())
})

It("should create the pulsarfunction successfully", func() {
err := k8sClient.Create(ctx, pfunc)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
})

It("the function should be ready", func() {
Eventually(func() bool {
f := &v1alphav1.PulsarFunction{}
tns := types.NamespacedName{Namespace: namespaceName, Name: pfuncName}
Expect(k8sClient.Get(ctx, tns, f)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(f)
}, "20s", "100ms").Should(BeTrue())
})
})

Context("PulsarSink operation", func() {
It("should create the pulsarsink successfully", func() {
err := k8sClient.Create(ctx, psink)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
})

It("the sink should be ready", func() {
Eventually(func() bool {
s := &v1alphav1.PulsarSink{}
tns := types.NamespacedName{Namespace: namespaceName, Name: pfuncName}
Expect(k8sClient.Get(ctx, tns, s)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(s)
}, "20s", "100ms").Should(BeTrue())
})
})

Context("PulsarSource operation", func() {
It("should create the pulsarsource successfully", func() {
err := k8sClient.Create(ctx, psource)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
})

It("the source should be ready", func() {
Eventually(func() bool {
s := &v1alphav1.PulsarSource{}
tns := types.NamespacedName{Namespace: namespaceName, Name: pfuncName}
Expect(k8sClient.Get(ctx, tns, s)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(s)
}, "20s", "100ms").Should(BeTrue())
})
})

AfterAll(func() {
Eventually(func(g Gomega) {
t := &v1alphav1.PulsarTopic{}
Expand Down
50 changes: 50 additions & 0 deletions tests/utils/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ package utils

import (
"bytes"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"

Expand Down Expand Up @@ -79,3 +83,49 @@ func ExecInPod(config *rest.Config, namespace, podName, containerName, command s
}
return strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), nil
}

func TransferFileFromPodToPod(fromNamespace, fromPodName, fromContainerName, fromFilePath, toNamespace, toPodName, toContainerName, toFilePath string) error {
// Create a temporary directory to store the file locally
tempDir, err := ioutil.TempDir("", "transfer-file")
if err != nil {
return fmt.Errorf("failed to create temp directory: %v", err)
}
defer os.RemoveAll(tempDir)

// Temporary file path
localFilePath := filepath.Join(tempDir, "transfer-file")

// Step 1: Copy the file from the source pod to the local temporary file
err = CopyFileFromPod(fromNamespace, fromPodName, fromContainerName, fromFilePath, localFilePath)
if err != nil {
return fmt.Errorf("failed to copy file from pod: %v", err)
}

// Step 2: Copy the file from the local temporary file to the destination pod
err = CopyFileToPod(localFilePath, toNamespace, toPodName, toContainerName, toFilePath)
if err != nil {
return fmt.Errorf("failed to copy file to pod: %v", err)
}

return nil
}

// CopyFileFromPod copies a file from a pod to a local path
func CopyFileFromPod(namespace, podName, containerName, srcPath, dstPath string) error {
cmd := []string{"kubectl", "cp", fmt.Sprintf("%s/%s:%s", namespace, podName, srcPath), dstPath, "-c", containerName}
out, err := exec.Command(cmd[0], cmd[1:]...).CombinedOutput()
if err != nil {
return fmt.Errorf("failed to execute command %s: %v\nOutput: %s", cmd, err, string(out))
}
return nil
}

// CopyFileToPod copies a local file to a pod
func CopyFileToPod(srcPath, namespace, podName, containerName, dstPath string) error {
cmd := []string{"kubectl", "cp", srcPath, fmt.Sprintf("%s/%s:%s", namespace, podName, dstPath), "-c", containerName}
out, err := exec.Command(cmd[0], cmd[1:]...).CombinedOutput()
if err != nil {
return fmt.Errorf("failed to execute command %s: %v\nOutput: %s", cmd, err, string(out))
}
return nil
}
Loading

0 comments on commit 0adfd1f

Please sign in to comment.