Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions cloud/pulsar_cluster_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2024 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cloud

import (
"testing"

cloudv1alpha1 "github.com/streamnative/cloud-api-server/pkg/apis/cloud/v1alpha1"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestSetPulsarClusterIdentityStateHosted(t *testing.T) {
resourceData := resourcePulsarCluster().TestResourceData()
resourceData.Set("organization", "stale-org")
resourceData.Set("name", "stale-name")
resourceData.Set("instance_name", "stale-instance")
resourceData.Set("location", "stale-location")
resourceData.Set("pool_member_name", "")

cluster := &cloudv1alpha1.PulsarCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-a",
Namespace: "org-a",
},
Spec: cloudv1alpha1.PulsarClusterSpec{
InstanceName: "instance-a",
Location: "us-central1",
PoolMemberRef: cloudv1alpha1.PoolMemberReference{
Name: "pool-member-a",
Namespace: "org-a",
},
},
}

diagErr := setPulsarClusterIdentityState(resourceData, cluster)
assert.Nil(t, diagErr)
assert.Equal(t, "org-a", resourceData.Get("organization"))
assert.Equal(t, "cluster-a", resourceData.Get("name"))
assert.Equal(t, "instance-a", resourceData.Get("instance_name"))
assert.Equal(t, "us-central1", resourceData.Get("location"))
assert.Equal(t, "", resourceData.Get("pool_member_name"))
}

func TestSetPulsarClusterIdentityStateBYOC(t *testing.T) {
resourceData := resourcePulsarCluster().TestResourceData()
resourceData.Set("organization", "stale-org")
resourceData.Set("name", "stale-name")
resourceData.Set("instance_name", "stale-instance")
resourceData.Set("location", "")
resourceData.Set("pool_member_name", "stale-pool-member")

cluster := &cloudv1alpha1.PulsarCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-b",
Namespace: "org-b",
},
Spec: cloudv1alpha1.PulsarClusterSpec{
InstanceName: "instance-b",
Location: "us-central1",
PoolMemberRef: cloudv1alpha1.PoolMemberReference{
Name: "pool-member-b",
Namespace: "org-b",
},
},
}

diagErr := setPulsarClusterIdentityState(resourceData, cluster)
assert.Nil(t, diagErr)
assert.Equal(t, "org-b", resourceData.Get("organization"))
assert.Equal(t, "cluster-b", resourceData.Get("name"))
assert.Equal(t, "instance-b", resourceData.Get("instance_name"))
assert.Equal(t, "", resourceData.Get("location"))
assert.Equal(t, "pool-member-b", resourceData.Get("pool_member_name"))
}

func TestSetPulsarClusterIdentityStateImportHosted(t *testing.T) {
resourceData := resourcePulsarCluster().TestResourceData()
resourceData.Set("organization", "")
resourceData.Set("name", "")
resourceData.Set("instance_name", "")
resourceData.Set("location", "")
resourceData.Set("pool_member_name", "")

cluster := &cloudv1alpha1.PulsarCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-c",
Namespace: "org-c",
},
Spec: cloudv1alpha1.PulsarClusterSpec{
InstanceName: "instance-c",
Location: "europe-west1",
PoolMemberRef: cloudv1alpha1.PoolMemberReference{
Name: "pool-member-c",
Namespace: "org-c",
},
},
}

diagErr := setPulsarClusterIdentityState(resourceData, cluster)
assert.Nil(t, diagErr)
assert.Equal(t, "org-c", resourceData.Get("organization"))
assert.Equal(t, "cluster-c", resourceData.Get("name"))
assert.Equal(t, "instance-c", resourceData.Get("instance_name"))
assert.Equal(t, "europe-west1", resourceData.Get("location"))
assert.Equal(t, "", resourceData.Get("pool_member_name"))
}

func TestSetPulsarClusterIdentityStateImportBYOC(t *testing.T) {
resourceData := resourcePulsarCluster().TestResourceData()
resourceData.Set("organization", "")
resourceData.Set("name", "")
resourceData.Set("instance_name", "")
resourceData.Set("location", "")
resourceData.Set("pool_member_name", "")

cluster := &cloudv1alpha1.PulsarCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster-d",
Namespace: "org-d",
},
Spec: cloudv1alpha1.PulsarClusterSpec{
InstanceName: "instance-d",
PoolMemberRef: cloudv1alpha1.PoolMemberReference{
Name: "pool-member-d",
Namespace: "org-d",
},
},
}

diagErr := setPulsarClusterIdentityState(resourceData, cluster)
assert.Nil(t, diagErr)
assert.Equal(t, "org-d", resourceData.Get("organization"))
assert.Equal(t, "cluster-d", resourceData.Get("name"))
assert.Equal(t, "instance-d", resourceData.Get("instance_name"))
assert.Equal(t, "", resourceData.Get("location"))
assert.Equal(t, "pool-member-d", resourceData.Get("pool_member_name"))
}
25 changes: 25 additions & 0 deletions cloud/pulsar_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func TestPulsarClusterWithMaintenanceWindow(t *testing.T) {
"us-central1", "rapid"),
Check: resource.ComposeTestCheckFunc(
testCheckPulsarClusterExists("streamnative_pulsar_cluster.test-pulsar-cluster"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "organization", "sndev"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "name", clusterGeneratedName),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "instance_name", clusterGeneratedName),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "location", "us-central1"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "pool_member_name", ""),
),
},
},
Expand Down Expand Up @@ -154,6 +159,11 @@ func TestPulsarClusterRemoveMaintenanceWindow(t *testing.T) {
"us-central1", "rapid"),
Check: resource.ComposeTestCheckFunc(
testCheckPulsarClusterExists("streamnative_pulsar_cluster.test-pulsar-cluster"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "organization", "sndev"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "name", clusterGeneratedName),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "instance_name", clusterGeneratedName),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "location", "us-central1"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "pool_member_name", ""),
),
},
{
Expand Down Expand Up @@ -189,6 +199,11 @@ func TestPulsarClusterUpdateMaintenanceWindow(t *testing.T) {
"us-central1", "rapid"),
Check: resource.ComposeTestCheckFunc(
testCheckPulsarClusterExists("streamnative_pulsar_cluster.test-pulsar-cluster"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "organization", "sndev"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "name", clusterGeneratedName),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "instance_name", clusterGeneratedName),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "location", "us-central1"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "pool_member_name", ""),
),
},
{
Expand Down Expand Up @@ -224,6 +239,11 @@ func TestPulsarClusterMaintenanceWindowConfigDrift(t *testing.T) {
"us-central1", "rapid"),
Check: resource.ComposeTestCheckFunc(
testCheckPulsarClusterExists("streamnative_pulsar_cluster.test-pulsar-cluster"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "organization", "sndev"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "name", clusterGeneratedName),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "instance_name", clusterGeneratedName),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "location", "us-central1"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "pool_member_name", ""),
),
},
{
Expand Down Expand Up @@ -258,6 +278,11 @@ func TestPulsarClusterConfigDrift(t *testing.T) {
"us-central1", "rapid"),
Check: resource.ComposeTestCheckFunc(
testCheckPulsarClusterExists("streamnative_pulsar_cluster.test-pulsar-cluster"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "organization", "sndev"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "name", clusterGeneratedName),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "instance_name", clusterGeneratedName),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "location", "us-central1"),
resource.TestCheckResourceAttr("streamnative_pulsar_cluster.test-pulsar-cluster", "pool_member_name", ""),
),
},
{
Expand Down
53 changes: 53 additions & 0 deletions cloud/resource_pulsar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,10 @@ func resourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, meta
}
return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER: %w", err))
}
if diagErr := setPulsarClusterIdentityState(d, pulsarCluster); diagErr != nil {
return diagErr
}
namespace = pulsarCluster.Namespace
_ = d.Set("ready", "False")
if pulsarCluster.Status.Conditions != nil {
for _, condition := range pulsarCluster.Status.Conditions {
Expand Down Expand Up @@ -917,6 +921,55 @@ func resourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, meta
return nil
}

func setPulsarClusterIdentityState(d *schema.ResourceData, pulsarCluster *cloudv1alpha1.PulsarCluster) diag.Diagnostics {
if err := d.Set("organization", pulsarCluster.Namespace); err != nil {
return diag.FromErr(fmt.Errorf("ERROR_SET_ORGANIZATION: %w", err))
}
if err := d.Set("name", pulsarCluster.Name); err != nil {
return diag.FromErr(fmt.Errorf("ERROR_SET_NAME: %w", err))
}
if err := d.Set("instance_name", pulsarCluster.Spec.InstanceName); err != nil {
return diag.FromErr(fmt.Errorf("ERROR_SET_INSTANCE_NAME: %w", err))
}

currentPoolMemberName := d.Get("pool_member_name").(string)
currentLocation := d.Get("location").(string)
remotePoolMemberName := pulsarCluster.Spec.PoolMemberRef.Name
remoteLocation := pulsarCluster.Spec.Location

switch {
case currentPoolMemberName != "":
if err := d.Set("location", ""); err != nil {
return diag.FromErr(fmt.Errorf("ERROR_CLEAR_LOCATION: %w", err))
}
if err := d.Set("pool_member_name", remotePoolMemberName); err != nil {
return diag.FromErr(fmt.Errorf("ERROR_SET_POOL_MEMBER_NAME: %w", err))
}
case currentLocation != "":
if err := d.Set("location", remoteLocation); err != nil {
return diag.FromErr(fmt.Errorf("ERROR_SET_LOCATION: %w", err))
}
if err := d.Set("pool_member_name", ""); err != nil {
return diag.FromErr(fmt.Errorf("ERROR_CLEAR_POOL_MEMBER_NAME: %w", err))
}
case remoteLocation != "":
if err := d.Set("location", remoteLocation); err != nil {
return diag.FromErr(fmt.Errorf("ERROR_SET_LOCATION: %w", err))
}
if err := d.Set("pool_member_name", ""); err != nil {
return diag.FromErr(fmt.Errorf("ERROR_CLEAR_POOL_MEMBER_NAME: %w", err))
}
default:
if err := d.Set("location", ""); err != nil {
return diag.FromErr(fmt.Errorf("ERROR_CLEAR_LOCATION: %w", err))
}
if err := d.Set("pool_member_name", remotePoolMemberName); err != nil {
return diag.FromErr(fmt.Errorf("ERROR_SET_POOL_MEMBER_NAME: %w", err))
}
}
return nil
}

func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
serverless := d.Get("type")
displayNameChanged := d.HasChange("display_name")
Expand Down
29 changes: 26 additions & 3 deletions cloud/resource_service_account_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,20 @@ func resourceServiceAccountBindingCreate(ctx context.Context, d *schema.Resource
return diag.FromErr(fmt.Errorf("ERROR_CREATE_SERVICE_ACCOUNT_BINDING: %w", err))
}
_ = d.Set("name", serviceAccountBinding.Name)
// Don't retry too frequently to avoid affecting the api-server.
err = retry.RetryContext(ctx, 5*time.Second, func() *retry.RetryError {
err = retry.RetryContext(ctx, 2*time.Minute, func() *retry.RetryError {
serviceAccountBinding, err := clientSet.CloudV1alpha1().ServiceAccountBindings(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return retry.RetryableError(fmt.Errorf("CONTINUE_RETRY_CREATE_SERVICE_ACCOUNT_BINDING"))
Comment thread
freeznet marked this conversation as resolved.
}
return retry.NonRetryableError(fmt.Errorf("ERROR_RETRY_CREATE_SERVICE_ACCOUNT_BINDING: %w", err))
}
if !isServiceAccountBindingReady(serviceAccountBinding) {
return retry.RetryableError(fmt.Errorf("CONTINUE_RETRY_CREATE_SERVICE_ACCOUNT_BINDING"))
}
dia := resourceServiceAccountBindingRead(ctx, d, meta)
if dia.HasError() {
return retry.NonRetryableError(fmt.Errorf("ERROR_RETRY_CREATE_SERVICE_ACCOUNT_BINDING: %s", dia[0].Summary))
return retry.NonRetryableError(fmt.Errorf("ERROR_READ_SERVICE_ACCOUNT_BINDING: %s", dia[0].Summary))
}
return nil
})
Expand Down Expand Up @@ -234,3 +243,17 @@ func resourceServiceAccountBindingUpdate(ctx context.Context, d *schema.Resource
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_SERVICE_ACCOUNT_BINDING: " +
"The service account binding does not support updates, please recreate it"))
}

func isServiceAccountBindingReady(serviceAccountBinding *v1alpha1.ServiceAccountBinding) bool {
iamReady := !serviceAccountBinding.Spec.EnableIAMAccountCreation
ready := false
for _, condition := range serviceAccountBinding.Status.Conditions {
if condition.Type == "IAMAccountReady" && condition.Status == "True" {
iamReady = true
}
if condition.Type == "Ready" && condition.Status == "True" {
ready = true
}
}
return iamReady && ready
}
Loading
Loading