Skip to content

Commit

Permalink
Fix partitioned topic not change (#185)
Browse files Browse the repository at this point in the history
* fix

* improve

* fix test

* fix test

* fix test
  • Loading branch information
labuladong authored Jan 29, 2024
1 parent 24d1187 commit 99df860
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 3 deletions.
15 changes: 12 additions & 3 deletions pkg/admin/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,18 @@ func (p *PulsarAdminClient) ApplyTopic(name string, params *TopicParams) error {
if err != nil {
return err
}
err = p.adminClient.Topics().Create(*topicName, int(*params.Partitions))
if err != nil && !IsAlreadyExist(err) {
return err
partitionNum := int(*params.Partitions)
err = p.adminClient.Topics().Create(*topicName, partitionNum)
if err != nil {
if !IsAlreadyExist(err) {
return err
}
if partitionNum > 0 {
// for partitioned topic, allow to change the partition number
if err = p.adminClient.Topics().Update(*topicName, partitionNum); err != nil {
return err
}
}
}

err = p.applyTopicPolicies(topicName, params)
Expand Down
43 changes: 43 additions & 0 deletions tests/operator/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
"github.com/onsi/gomega/format"
"github.com/streamnative/pulsar-resources-operator/pkg/feature"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"

v1alphav1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
"github.com/streamnative/pulsar-resources-operator/tests/utils"
Expand Down Expand Up @@ -58,6 +61,19 @@ var _ = Describe("Resources", func() {
ppermissionName string = "test-permission"
exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
partitionedTopic = &v1alphav1.PulsarTopic{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespaceName,
Name: "test-partitioned-topic",
},
Spec: v1alphav1.PulsarTopicSpec{
Name: "persistent://cloud/stage/partitioned-topic",
Partitions: pointer.Int32Ptr(1),
ConnectionRef: corev1.LocalObjectReference{
Name: pconnName,
},
},
}
)

BeforeEach(func() {
Expand Down Expand Up @@ -168,6 +184,8 @@ var _ = Describe("Resources", func() {
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
err = k8sClient.Create(ctx, ptopic2)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
err = k8sClient.Create(ctx, partitionedTopic)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
})

It("should be ready", func() {
Expand All @@ -177,6 +195,12 @@ var _ = Describe("Resources", func() {
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(t)
}, "20s", "100ms").Should(BeTrue())
Eventually(func() bool {
t := &v1alphav1.PulsarTopic{}
tns := types.NamespacedName{Namespace: partitionedTopic.Namespace, Name: partitionedTopic.Name}
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(t)
}, "20s", "100ms").Should(BeTrue())
})

It("should have the schema set", func() {
Expand Down Expand Up @@ -232,6 +256,25 @@ var _ = Describe("Resources", func() {
g.Expect(stderr).Should(ContainSubstring("404"))
}, "5s", "100ms").Should(Succeed())
})

It("should increase the partitions successfully", func() {
curTopic := &v1alphav1.PulsarTopic{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Namespace: partitionedTopic.Namespace,
Name: partitionedTopic.Name,
}, curTopic)).ShouldNot(HaveOccurred())

curTopic.Spec.Partitions = pointer.Int32Ptr(2)
err := k8sClient.Update(ctx, curTopic)
Expect(err).ShouldNot(HaveOccurred())
Eventually(func() bool {
t := &v1alphav1.PulsarTopic{}
tns := types.NamespacedName{Namespace: curTopic.Namespace, Name: curTopic.Name}
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(t)
}).Should(BeTrue())
})

})

Context("PulsarPermission operation", func() {
Expand Down

0 comments on commit 99df860

Please sign in to comment.