diff --git a/api/v1alpha1/pulsartopic_types.go b/api/v1alpha1/pulsartopic_types.go index 29dbe431..922d0f00 100644 --- a/api/v1alpha1/pulsartopic_types.go +++ b/api/v1alpha1/pulsartopic_types.go @@ -120,6 +120,13 @@ type PulsarTopicSpec struct { // between two Pulsar instances. // +optional GeoReplicationRefs []*corev1.LocalObjectReference `json:"geoReplicationRefs,omitempty"` + + // ReplicationClusters is the list of clusters to which the topic is replicated + // This is **ONLY** used if you are replicating clusters within the same Pulsar instance. + // Please use `GeoReplicationRefs` instead if you are setting up geo-replication + // between two Pulsar instances. + // +optional + ReplicationClusters []string `json:"replicationClusters,omitempty"` } // SchemaInfo defines the Pulsar Schema for a topic. diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 9f971246..0eddbf01 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -1551,6 +1551,11 @@ func (in *PulsarTopicSpec) DeepCopyInto(out *PulsarTopicSpec) { } } } + if in.ReplicationClusters != nil { + in, out := &in.ReplicationClusters, &out.ReplicationClusters + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarTopicSpec. 
diff --git a/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml b/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml index b8ce0ad8..ba162567 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml @@ -62,7 +62,10 @@ spec: name: v1alpha1 schema: openAPIV3Schema: - description: PulsarConnection is the Schema for the pulsarconnections API + description: |- + PulsarConnection is the Schema for the pulsarconnections API + It represents a connection to a Pulsar cluster and includes both the desired state (Spec) + and the observed state (Status) of the connection. properties: apiVersion: description: |- @@ -82,34 +85,63 @@ spec: metadata: type: object spec: - description: PulsarConnectionSpec defines the desired state of PulsarConnection + description: |- + PulsarConnectionSpec defines the desired state of PulsarConnection + It specifies the configuration for connecting to a Pulsar cluster. + + + For plaintext (non-TLS) Pulsar clusters: + - Set AdminServiceURL to "http://<admin-service-host>:<port>" + - Set BrokerServiceURL to "pulsar://<broker-service-host>:<port>" + + + For TLS-enabled Pulsar clusters: + - Set AdminServiceSecureURL to "https://<admin-service-host>:<port>" + - Set BrokerServiceSecureURL to "pulsar+ssl://<broker-service-host>:<port>" + - Optionally set BrokerClientTrustCertsFilePath if using custom CA certificates properties: adminServiceSecureURL: - description: AdminServiceSecureURL is the admin service url for secure - connection. + description: |- + AdminServiceSecureURL is the HTTPS URL for secure connections to the Pulsar admin service. + Use this for encrypted administrative operations. pattern: ^https://.+$ type: string adminServiceURL: - description: AdminServiceURL is the admin service url of the pulsar - cluster + description: |- + AdminServiceURL is the HTTP(S) URL for the Pulsar cluster's admin service. + This URL is used for administrative operations. 
pattern: ^https?://.+$ type: string authentication: - description: Authentication defines authentication configurations + description: |- + Authentication defines the authentication configuration for connecting to the Pulsar cluster. + It supports both token-based and OAuth2-based authentication methods. properties: oauth2: - description: PulsarAuthenticationOAuth2 indicates the parameters - which are need by pulsar OAuth2 + description: |- + OAuth2 specifies the configuration for OAuth2-based authentication. + This includes all necessary parameters for setting up OAuth2 authentication with Pulsar. + For detailed information on the OAuth2 fields, refer to the PulsarAuthenticationOAuth2 struct. properties: audience: + description: |- + Audience is the intended recipient of the token. In Pulsar's context, this is usually + the URL of your Pulsar cluster or a specific identifier for your Pulsar service. type: string clientID: + description: ClientID is the OAuth2 client identifier issued + to the client during the registration process. type: string issuerEndpoint: + description: |- + IssuerEndpoint is the URL of the OAuth2 authorization server. + This is typically the base URL of your identity provider's OAuth2 service. type: string key: - description: ValueOrSecretRef is a string or a secret reference - of the authentication + description: |- + Key is either the client secret or the path to a JSON credentials file. + For confidential clients, this would be the client secret. + For public clients using JWT authentication, this would be the path to the JSON credentials file. properties: secretRef: description: SecretKeyRef indicates a secret name and @@ -127,6 +159,9 @@ spec: type: string type: object scope: + description: |- + Scope is an optional field to request specific permissions from the OAuth2 server. + If not specified, the default scope defined by the OAuth2 server will be used. 
type: string required: - audience @@ -135,8 +170,10 @@ spec: - key type: object token: - description: ValueOrSecretRef is a string or a secret reference - of the authentication + description: |- + Token specifies the configuration for token-based authentication. + This can be either a direct token value or a reference to a secret containing the token. + If using a secret, the token should be stored under the specified key in the secret. properties: secretRef: description: SecretKeyRef indicates a secret name and key @@ -154,31 +191,38 @@ spec: type: object type: object brokerClientTrustCertsFilePath: - description: BrokerClientTrustCertsFilePath Path for the trusted TLS - certificate file for outgoing connection to a server (broker) + description: |- + BrokerClientTrustCertsFilePath is the file path to the trusted TLS certificate + for outgoing connections to Pulsar brokers. This is used for TLS verification. type: string brokerServiceSecureURL: - description: BrokerServiceSecureURL is the broker service url for - secure connection. + description: |- + BrokerServiceSecureURL is the TLS-enabled URL for secure connections to Pulsar brokers. + Use this for encrypted communications with the Pulsar cluster. pattern: ^pulsar\+ssl://.+$ type: string brokerServiceURL: - description: BrokerServiceURL is the broker service url of the pulsar - cluster + description: |- + BrokerServiceURL is the non-TLS URL for connecting to Pulsar brokers. + Use this for non-secure connections to the Pulsar cluster. pattern: ^pulsar?://.+$ type: string clusterName: description: |- - ClusterName indicates the local cluster name of the pulsar cluster. It should - set when enabling the Geo Replication + ClusterName specifies the name of the local Pulsar cluster. + When setting up Geo-Replication between Pulsar instances, this should be set to identify the cluster. 
type: string type: object status: - description: PulsarConnectionStatus defines the observed state of PulsarConnection + description: |- + PulsarConnectionStatus defines the observed state of PulsarConnection. + It provides information about the current status of the Pulsar connection. properties: conditions: - description: Represents the observations of a connection's current - state. + description: |- + Conditions represent the latest available observations of the connection's current state. + It follows the Kubernetes conventions for condition types and status. + The "Ready" condition type is typically used to indicate the overall status. items: description: |- Condition contains details for one aspect of the current state of this API Resource. @@ -260,10 +304,14 @@ spec: description: |- ObservedGeneration is the most recent generation observed for this resource. It corresponds to the metadata generation, which is updated on mutation by the API Server. + This field is used to track whether the controller has processed the latest changes. format: int64 type: integer secretKeyHash: - description: SecretKeyHash is the hash of the secret ref + description: |- + SecretKeyHash is the hash of the secret reference used for authentication. + This is used to detect changes in the secret without exposing sensitive information. + The controller should update this hash when the secret changes. 
type: string type: object type: object diff --git a/config/crd/bases/resource.streamnative.io_pulsarfunctions.yaml b/config/crd/bases/resource.streamnative.io_pulsarfunctions.yaml index 0b63ecf3..d7f2bb89 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarfunctions.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarfunctions.yaml @@ -197,9 +197,10 @@ spec: type: object lifecyclePolicy: description: |- - PulsarResourceLifeCyclePolicy indicates whether it will keep or delete the resource - in pulsar cluster after resource is deleted by controller - KeepAfterDeletion or CleanUpAfterDeletion + PulsarResourceLifeCyclePolicy defines the behavior for managing Pulsar resources + when the corresponding custom resource (CR) is deleted from the Kubernetes cluster. + This policy allows users to control whether Pulsar resources should be retained or + removed from the Pulsar cluster after the CR is deleted. enum: - CleanUpAfterDeletion - KeepAfterDeletion diff --git a/config/crd/bases/resource.streamnative.io_pulsargeoreplications.yaml b/config/crd/bases/resource.streamnative.io_pulsargeoreplications.yaml index 03c7d2d6..4870e67c 100644 --- a/config/crd/bases/resource.streamnative.io_pulsargeoreplications.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsargeoreplications.yaml @@ -79,9 +79,10 @@ spec: x-kubernetes-map-type: atomic lifecyclePolicy: description: |- - PulsarResourceLifeCyclePolicy indicates whether it will keep or delete the resource - in pulsar cluster after resource is deleted by controller - KeepAfterDeletion or CleanUpAfterDeletion + PulsarResourceLifeCyclePolicy defines the behavior for managing Pulsar resources + when the corresponding custom resource (CR) is deleted from the Kubernetes cluster. + This policy allows users to control whether Pulsar resources should be retained or + removed from the Pulsar cluster after the CR is deleted. 
enum: - CleanUpAfterDeletion - KeepAfterDeletion diff --git a/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml b/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml index d2bd07f4..f8aef445 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml @@ -49,7 +49,10 @@ spec: name: v1alpha1 schema: openAPIV3Schema: - description: PulsarNamespace is the Schema for the pulsarnamespaces API + description: |- + PulsarNamespace is the Schema for the pulsarnamespaces API + It represents a Pulsar namespace in the Kubernetes cluster and includes both + the desired state (Spec) and the observed state (Status) of the namespace. properties: apiVersion: description: |- @@ -69,36 +72,47 @@ spec: metadata: type: object spec: - description: PulsarNamespaceSpec defines the desired state of PulsarNamespace + description: |- + PulsarNamespaceSpec defines the desired state of a Pulsar namespace. + It corresponds to the configuration options available in Pulsar's namespace admin API. properties: backlogQuotaLimitSize: anyOf: - type: integer - type: string + description: |- + BacklogQuotaLimitSize specifies the size limit for message backlog. + When the limit is reached, older messages will be removed or handled according to the retention policy. pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true backlogQuotaLimitTime: description: |- - Backlog - Should set at least one of them if setting backlog + BacklogQuotaLimitTime specifies the time limit for message backlog. + Messages older than this limit will be removed or handled according to the retention policy. type: string backlogQuotaRetentionPolicy: + description: |- + BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded. 
+ Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". type: string backlogQuotaType: description: |- - BacklogQuotaType controls the backlog by setting the type to destination_storage or message_age - destination_storage limits backlog by size (in bytes). message_age limits backlog by time, - that is, message timestamp (broker or publish timestamp) + BacklogQuotaType controls how the backlog quota is enforced. + "destination_storage" limits backlog by size (in bytes), while "message_age" limits by time. enum: - destination_storage - message_age type: string bundles: + description: |- + Bundles specifies the number of bundles to split the namespace into. + This affects how the namespace is distributed across the cluster. format: int32 type: integer connectionRef: - description: ConnectionRef is the reference to the PulsarConnection - resource + description: |- + ConnectionRef is the reference to the PulsarConnection resource + used to connect to the Pulsar cluster for this namespace. properties: name: description: |- @@ -109,8 +123,12 @@ spec: type: object x-kubernetes-map-type: atomic geoReplicationRefs: - description: GeoReplicationRefs is the reference list to the PulsarGeoReplication - resource + description: |- + GeoReplicationRefs is a list of references to PulsarGeoReplication resources, + used to configure geo-replication for this namespace. + This is **ONLY** used when you are using PulsarGeoReplication for setting up geo-replication + between two Pulsar instances. + Please use `ReplicationClusters` instead if you are replicating clusters within the same Pulsar instance. 
items: description: |- LocalObjectReference contains enough information to let you locate the @@ -127,32 +145,42 @@ spec: type: array lifecyclePolicy: description: |- - PulsarResourceLifeCyclePolicy indicates whether it will keep or delete the resource - in pulsar cluster after resource is deleted by controller - KeepAfterDeletion or CleanUpAfterDeletion + LifecyclePolicy determines whether to keep or delete the Pulsar namespace + when the Kubernetes resource is deleted. enum: - CleanUpAfterDeletion - KeepAfterDeletion type: string maxConsumersPerSubscription: + description: MaxConsumersPerSubscription sets the maximum number of + consumers allowed on a single subscription in the namespace. format: int32 type: integer maxConsumersPerTopic: + description: MaxConsumersPerTopic sets the maximum number of consumers + allowed on a single topic in the namespace. format: int32 type: integer maxProducersPerTopic: - description: Tenant Policy Setting + description: MaxProducersPerTopic sets the maximum number of producers + allowed on a single topic in the namespace. format: int32 type: integer messageTTL: - description: MessageTTL indicates the message ttl for the namespace + description: |- + MessageTTL specifies the Time to Live (TTL) for messages in the namespace. + Messages older than this TTL will be automatically marked as consumed. type: string name: - description: Name is the namespace name + description: Name is the fully qualified namespace name in the format + "tenant/namespace". type: string replicationClusters: - description: ReplicationClusters is the list of clusters to which - the namespace is replicated + description: |- + ReplicationClusters is the list of clusters to which the namespace is replicated + This is **ONLY** used if you are replicating clusters within the same Pulsar instance. + Please use `GeoReplicationRefs` instead if you are setting up geo-replication + between two Pulsar instances. 
items: type: string type: array @@ -160,12 +188,15 @@ spec: anyOf: - type: integer - type: string + description: |- + RetentionSize specifies the maximum size of backlog retained in the namespace. + Should be set in conjunction with RetentionTime for effective retention policy. pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true retentionTime: description: |- - Retention - Should set at least one of them if setting retention + RetentionTime specifies the minimum time to retain messages in the namespace. + Should be set in conjunction with RetentionSize for effective retention policy. Retention Quota must exceed configured backlog quota for namespace type: string required: @@ -176,8 +207,10 @@ spec: description: PulsarNamespaceStatus defines the observed state of PulsarNamespace properties: conditions: - description: Represents the observations of a connection's current - state. + description: |- + Conditions represent the latest available observations of the namespace's current state. + It follows the Kubernetes conventions for condition types and status. + The "Ready" condition type is typically used to indicate the overall status of the namespace. items: description: |- Condition contains details for one aspect of the current state of this API Resource. @@ -256,13 +289,15 @@ spec: - type x-kubernetes-list-type: map geoReplicationEnabled: - description: GeoReplicationEnabled indicates whether geo-replication + description: |- + GeoReplicationEnabled indicates whether geo-replication between two Pulsar instances (via PulsarGeoReplication) is enabled for the namespace type: boolean observedGeneration: description: |- ObservedGeneration is the most recent generation observed for this resource. It corresponds to the metadata generation, which is updated on mutation by the API Server. 
+ This field is used to track whether the controller has processed the latest changes. format: int64 type: integer type: object diff --git a/config/crd/bases/resource.streamnative.io_pulsarpackages.yaml b/config/crd/bases/resource.streamnative.io_pulsarpackages.yaml index eb3b62b8..0539160a 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarpackages.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarpackages.yaml @@ -94,9 +94,10 @@ spec: type: string lifecyclePolicy: description: |- - PulsarResourceLifeCyclePolicy indicates whether it will keep or delete the resource - in pulsar cluster after resource is deleted by controller - KeepAfterDeletion or CleanUpAfterDeletion + PulsarResourceLifeCyclePolicy defines the behavior for managing Pulsar resources + when the corresponding custom resource (CR) is deleted from the Kubernetes cluster. + This policy allows users to control whether Pulsar resources should be retained or + removed from the Pulsar cluster after the CR is deleted. enum: - CleanUpAfterDeletion - KeepAfterDeletion diff --git a/config/crd/bases/resource.streamnative.io_pulsarpermissions.yaml b/config/crd/bases/resource.streamnative.io_pulsarpermissions.yaml index 2ab07071..8505f079 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarpermissions.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarpermissions.yaml @@ -58,7 +58,9 @@ spec: name: v1alpha1 schema: openAPIV3Schema: - description: PulsarPermission is the Schema for the pulsarpermissions API + description: |- + PulsarPermission is the Schema for the pulsarpermissions API. + It represents a set of permissions granted to specific roles for a Pulsar resource (namespace or topic). properties: apiVersion: description: |- @@ -78,18 +80,21 @@ spec: metadata: type: object spec: - description: PulsarPermissionSpec defines the desired state of PulsarPermission + description: |- + PulsarPermissionSpec defines the desired state of PulsarPermission. 
+ It specifies the configuration for granting permissions to Pulsar resources. properties: actions: description: |- - Actions contains a list of action to grant. - the options include produce,consume,functions + Actions is a list of permissions to grant. + Valid options include "produce", "consume", and "functions". items: type: string type: array connectionRef: - description: ConnectionRef is the reference to the PulsarConnection - resource + description: |- + ConnectionRef is the reference to the PulsarConnection resource + used to connect to the Pulsar cluster for this permission. properties: name: description: |- @@ -101,24 +106,25 @@ spec: x-kubernetes-map-type: atomic lifecyclePolicy: description: |- - LifecyclePolicy is the policy that how to deal with pulsar resource when - PulsarPermission is deleted + LifecyclePolicy determines how to handle the Pulsar permissions + when the PulsarPermission resource is deleted. type: string resourceName: - description: ResourceName name of the target resource which will be - granted the permssions + description: |- + ResourceName is the name of the target resource (namespace or topic) + to which the permissions will be granted. type: string resourceType: - description: ResourceType indicates the resource type, the options - include namespace and topic + description: ResourceType indicates whether the permission is for + a namespace or a topic. enum: - namespace - topic type: string roles: description: |- - Roles contains a list of role which will be granted the same permissions - for the same target + Roles is a list of role names that will be granted the specified permissions + for the target resource. items: type: string type: array @@ -129,11 +135,15 @@ spec: - roles type: object status: - description: PulsarPermissionStatus defines the observed state of PulsarPermission + description: |- + PulsarPermissionStatus defines the observed state of PulsarPermission. 
+ It provides information about the current status of the Pulsar permission configuration. properties: conditions: - description: Represents the observations of a connection's current - state. + description: |- + Conditions represent the latest available observations of the PulsarPermission's current state. + It follows the Kubernetes conventions for condition types and status. + The "Ready" condition type is typically used to indicate the overall status of the permission configuration. items: description: |- Condition contains details for one aspect of the current state of this API Resource. @@ -215,6 +225,7 @@ spec: description: |- ObservedGeneration is the most recent generation observed for this resource. It corresponds to the metadata generation, which is updated on mutation by the API Server. + This field is used to track whether the controller has processed the latest changes. format: int64 type: integer type: object diff --git a/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml b/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml index e5cffe7e..bbb0ef5f 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarsinks.yaml @@ -161,9 +161,10 @@ spec: type: array lifecyclePolicy: description: |- - PulsarResourceLifeCyclePolicy indicates whether it will keep or delete the resource - in pulsar cluster after resource is deleted by controller - KeepAfterDeletion or CleanUpAfterDeletion + PulsarResourceLifeCyclePolicy defines the behavior for managing Pulsar resources + when the corresponding custom resource (CR) is deleted from the Kubernetes cluster. + This policy allows users to control whether Pulsar resources should be retained or + removed from the Pulsar cluster after the CR is deleted. 
enum: - CleanUpAfterDeletion - KeepAfterDeletion diff --git a/config/crd/bases/resource.streamnative.io_pulsarsources.yaml b/config/crd/bases/resource.streamnative.io_pulsarsources.yaml index 73a35945..c3deca9e 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarsources.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarsources.yaml @@ -112,9 +112,10 @@ spec: x-kubernetes-preserve-unknown-fields: true lifecyclePolicy: description: |- - PulsarResourceLifeCyclePolicy indicates whether it will keep or delete the resource - in pulsar cluster after resource is deleted by controller - KeepAfterDeletion or CleanUpAfterDeletion + PulsarResourceLifeCyclePolicy defines the behavior for managing Pulsar resources + when the corresponding custom resource (CR) is deleted from the Kubernetes cluster. + This policy allows users to control whether Pulsar resources should be retained or + removed from the Pulsar cluster after the CR is deleted. enum: - CleanUpAfterDeletion - KeepAfterDeletion diff --git a/config/crd/bases/resource.streamnative.io_pulsartenants.yaml b/config/crd/bases/resource.streamnative.io_pulsartenants.yaml index d4d83ba4..a098576b 100644 --- a/config/crd/bases/resource.streamnative.io_pulsartenants.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsartenants.yaml @@ -69,19 +69,30 @@ spec: metadata: type: object spec: - description: PulsarTenantSpec defines the desired state of PulsarTenant + description: |- + PulsarTenantSpec defines the desired state of PulsarTenant. + It corresponds to the configuration options available in Pulsar's tenant admin API. properties: adminRoles: + description: |- + AdminRoles is a list of roles that have administrative privileges for this tenant. + These roles can perform actions like creating namespaces, topics, and managing permissions. items: type: string type: array allowedClusters: + description: |- + AllowedClusters is a list of clusters that this tenant is allowed to access. 
+ This field is optional and can be used to restrict the clusters a tenant can connect to. + Please use `GeoReplicationRefs` instead if you are setting up geo-replication + between multiple Pulsar instances. items: type: string type: array connectionRef: - description: ConnectionRef is the reference to the PulsarConnection - resource + description: |- + ConnectionRef is the reference to the PulsarConnection resource + used to connect to the Pulsar cluster for this tenant. properties: name: description: |- @@ -92,8 +103,13 @@ spec: type: object x-kubernetes-map-type: atomic geoReplicationRefs: - description: GeoReplicationRefs is the reference list to the PulsarGeoReplication - resource + description: |- + GeoReplicationRefs is a list of references to PulsarGeoReplication resources, + used to configure geo-replication for this tenant across multiple Pulsar instances. + This is **ONLY** used when you are using PulsarGeoReplication for setting up geo-replication + between multiple Pulsar instances. + Please use `AllowedClusters` instead if you are allowing a tenant to be available within + specific clusters in the same Pulsar instance. items: description: |- LocalObjectReference contains enough information to let you locate the @@ -110,26 +126,32 @@ spec: type: array lifecyclePolicy: description: |- - PulsarResourceLifeCyclePolicy indicates whether it will keep or delete the resource - in pulsar cluster after resource is deleted by controller - KeepAfterDeletion or CleanUpAfterDeletion + LifecyclePolicy determines whether to keep or delete the Pulsar tenant + when the Kubernetes resource is deleted. enum: - CleanUpAfterDeletion - KeepAfterDeletion type: string name: - description: Name is the tenant name + description: |- + Name is the tenant name. + This field is required and must be unique within the Pulsar cluster. 
type: string required: - connectionRef - name type: object status: - description: PulsarTenantStatus defines the observed state of PulsarTenant + description: |- + PulsarTenantStatus defines the observed state of PulsarTenant. + It contains information about the current state of the Pulsar tenant. properties: conditions: - description: Represents the observations of a connection's current - state. + description: |- + Conditions represent the latest available observations of the PulsarTenant's current state. + It follows the Kubernetes conventions for condition types and status. + The "Ready" condition type is typically used to indicate the overall status of the tenant. + Other condition types may be used to provide more detailed status information. items: description: |- Condition contains details for one aspect of the current state of this API Resource. @@ -211,6 +233,7 @@ spec: description: |- ObservedGeneration is the most recent generation observed for this resource. It corresponds to the metadata generation, which is updated on mutation by the API Server. + This field is used to track whether the controller has processed the latest changes. format: int64 type: integer type: object diff --git a/config/crd/bases/resource.streamnative.io_pulsartopics.yaml b/config/crd/bases/resource.streamnative.io_pulsartopics.yaml index c86fc997..b205f16b 100644 --- a/config/crd/bases/resource.streamnative.io_pulsartopics.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsartopics.yaml @@ -52,7 +52,10 @@ spec: name: v1alpha1 schema: openAPIV3Schema: - description: PulsarTopic is the Schema for the pulsartopics API + description: |- + PulsarTopic is the Schema for the pulsartopics API + It represents a Pulsar topic in the Kubernetes cluster and includes both + the desired state (Spec) and the observed state (Status) of the topic. 
properties: apiVersion: description: |- @@ -72,24 +75,33 @@ spec: metadata: type: object spec: - description: PulsarTopicSpec defines the desired state of PulsarTopic + description: |- + PulsarTopicSpec defines the desired state of PulsarTopic. + It corresponds to the configuration options available in Pulsar's topic admin API. properties: backlogQuotaLimitSize: anyOf: - type: integer - type: string + description: |- + BacklogQuotaLimitSize specifies the size limit for message backlog. + When the limit is reached, older messages will be removed or handled according to the retention policy. pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true backlogQuotaLimitTime: description: |- - Backlog - Should set at least one of them if setting backlog + BacklogQuotaLimitTime specifies the time limit for message backlog. + Messages older than this limit will be removed or handled according to the retention policy. type: string backlogQuotaRetentionPolicy: + description: |- + BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded. + Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". type: string connectionRef: - description: ConnectionRef is the reference to the PulsarConnection - resource + description: |- + ConnectionRef is the reference to the PulsarConnection resource + used to connect to the Pulsar cluster for this topic. properties: name: description: |- @@ -100,8 +112,11 @@ spec: type: object x-kubernetes-map-type: atomic geoReplicationRefs: - description: GeoReplicationRefs is the reference list to the PulsarGeoReplication - resource + description: |- + GeoReplicationRefs is a list of references to PulsarGeoReplication resources, + used to configure geo-replication for this topic across multiple Pulsar instances. 
+ This is **ONLY** used when you are using PulsarGeoReplication for setting up geo-replication + between two Pulsar instances. items: description: |- LocalObjectReference contains enough information to let you locate the @@ -118,68 +133,103 @@ spec: type: array lifecyclePolicy: description: |- - PulsarResourceLifeCyclePolicy indicates whether it will keep or delete the resource - in pulsar cluster after resource is deleted by controller - KeepAfterDeletion or CleanUpAfterDeletion + LifecyclePolicy determines whether to keep or delete the Pulsar topic + when the Kubernetes resource is deleted. enum: - CleanUpAfterDeletion - KeepAfterDeletion type: string maxConsumers: + description: MaxConsumers sets the maximum number of consumers allowed + on the topic. format: int32 type: integer maxProducers: - description: Topic Policy Setting + description: MaxProducers sets the maximum number of producers allowed + on the topic. format: int32 type: integer maxUnAckedMessagesPerConsumer: - description: Max unacked messages + description: |- + MaxUnAckedMessagesPerConsumer sets the maximum number of unacknowledged + messages allowed for a consumer before it's blocked from receiving more messages. format: int32 type: integer maxUnAckedMessagesPerSubscription: + description: |- + MaxUnAckedMessagesPerSubscription sets the maximum number of unacknowledged + messages allowed for a subscription before it's blocked from receiving more messages. format: int32 type: integer messageTTL: - description: MessageTTL indicates the message ttl for the topic + description: |- + MessageTTL specifies the Time to Live (TTL) for messages on the topic. + Messages older than this TTL will be automatically marked as deleted. type: string name: description: Name is the topic name type: string partitions: default: 0 + description: |- + Partitions specifies the number of partitions for a partitioned topic. + Set to 0 for a non-partitioned topic. 
format: int32 type: integer persistent: default: true + description: |- + Persistent determines if the topic is persistent (true) or non-persistent (false). + Defaults to true if not specified. type: boolean + replicationClusters: + description: |- + ReplicationClusters is the list of clusters to which the topic is replicated + This is **ONLY** used if you are replicating clusters within the same Pulsar instance. + Please use `GeoReplicationRefs` instead if you are setting up geo-replication + between two Pulsar instances. + items: + type: string + type: array retentionSize: anyOf: - type: integer - type: string + description: |- + RetentionSize specifies the maximum size of backlog retained on the topic. + Should be set in conjunction with RetentionTime for effective retention policy. + Retention Quota must exceed configured backlog quota for topic pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ x-kubernetes-int-or-string: true retentionTime: description: |- - Retention - Should set at least one of them if setting retention + RetentionTime specifies the minimum time to retain messages on the topic. + Should be set in conjunction with RetentionSize for effective retention policy. Retention Quota must exceed configured backlog quota for topic type: string schemaInfo: - description: |- - SchemaInfo defines the Pulsar Schema. - It is stored and enforced on a per-topic basis and cannot be stored at the namespace or tenant level. + description: SchemaInfo defines the schema for the topic, if any. properties: properties: additionalProperties: type: string - description: Properties is a user defined properties as a string/string - map + description: |- + Properties is a map of user-defined properties associated with the schema. + These can be used to store additional metadata about the schema. 
type: object schema: - description: Schema is schema data + description: |- + Schema contains the actual schema definition. + For AVRO and JSON schemas, this should be a JSON string of the schema definition. + For PROTOBUF schemas, this should be the protobuf definition string. + For BYTES or NONE schemas, this field can be empty. type: string type: - description: Type determines how to interpret the schema data + description: |- + Type determines how to interpret the schema data. + Valid values include: "AVRO", "JSON", "PROTOBUF", "PROTOBUF_NATIVE", "KEY_VALUE", "BYTES", or "NONE". + For KEY_VALUE schemas, use the format "KEY_VALUE(KeyType,ValueType)" where KeyType and ValueType + are one of the other schema types. type: string type: object required: @@ -190,8 +240,11 @@ spec: description: PulsarTopicStatus defines the observed state of PulsarTopic properties: conditions: - description: Represents the observations of a connection's current - state. + description: |- + Conditions represent the latest available observations of the PulsarTopic's current state. + It follows the Kubernetes conventions for condition types and status. + The "Ready" condition type indicates the overall status of the topic. + The "PolicyReady" condition type indicates whether the topic policies have been successfully applied. items: description: |- Condition contains details for one aspect of the current state of this API Resource. @@ -270,12 +323,15 @@ spec: - type x-kubernetes-list-type: map geoReplicationEnabled: - description: GeoReplicationEnabled + description: |- + GeoReplicationEnabled indicates whether geo-replication is enabled for this topic. + This is set to true when GeoReplicationRefs are configured in the spec and successfully applied. type: boolean observedGeneration: description: |- ObservedGeneration is the most recent generation observed for this resource. It corresponds to the metadata generation, which is updated on mutation by the API Server. 
+ This field is used to track whether the controller has processed the latest changes. format: int64 type: integer type: object diff --git a/docs/pulsar_topic.md b/docs/pulsar_topic.md index 3a3d0638..02ab249b 100644 --- a/docs/pulsar_topic.md +++ b/docs/pulsar_topic.md @@ -8,28 +8,48 @@ The `PulsarTopic` resource defines a topic in a Pulsar cluster. It allows you to ## Specifications -| Field | Description | Required | -|-------|-------------|----------| -| `name` | The fully qualified topic name in the format "persistent://tenant/namespace/topic" or "non-persistent://tenant/namespace/topic". | Yes | -| `connectionRef` | Reference to the PulsarConnection resource used to connect to the Pulsar cluster for this topic. | Yes | -| `persistent` | Whether the topic is persistent or non-persistent. Default is false. Can also be set by topic name prefix. | No | -| `partitions` | Number of partitions for the topic. Default is 0. | No | -| `maxProducers` | Maximum number of producers allowed on the topic. | No | -| `maxConsumers` | Maximum number of consumers allowed on the topic. | No | -| `messageTTL` | Time to Live (TTL) for messages in the topic. Messages older than this TTL will be automatically marked as consumed. | No | -| `maxUnAckedMessagesPerConsumer` | Maximum number of unacknowledged messages allowed per consumer. | No | -| `maxUnAckedMessagesPerSubscription` | Maximum number of unacknowledged messages allowed per subscription. | No | -| `retentionTime` | Minimum time to retain messages in the topic. Should be set in conjunction with retentionSize for effective retention policy. | No | -| `retentionSize` | Maximum size of backlog retained in the topic. Should be set in conjunction with retentionTime for effective retention policy. | No | -| `backlogQuotaLimitTime` | Time limit for message backlog. Messages older than this limit will be removed or handled according to the retention policy. | No | -| `backlogQuotaLimitSize` | Size limit for message backlog. 
When the limit is reached, older messages will be removed or handled according to the retention policy. | No | -| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". | No | +| Field | Description | Required | +|-------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------| +| `name` | The fully qualified topic name in the format "persistent://tenant/namespace/topic" or "non-persistent://tenant/namespace/topic". | Yes | +| `connectionRef` | Reference to the PulsarConnection resource used to connect to the Pulsar cluster for this topic. | Yes | +| `persistent` | Whether the topic is persistent or non-persistent. Default is true. Can also be set by topic name prefix. | No | +| `partitions` | Number of partitions for the topic. Default is 0. | No | +| `maxProducers` | Maximum number of producers allowed on the topic. | No | +| `maxConsumers` | Maximum number of consumers allowed on the topic. | No | +| `messageTTL` | Time to Live (TTL) for messages in the topic. Messages older than this TTL will be automatically marked as consumed. | No | +| `maxUnAckedMessagesPerConsumer` | Maximum number of unacknowledged messages allowed per consumer. | No | +| `maxUnAckedMessagesPerSubscription` | Maximum number of unacknowledged messages allowed per subscription. | No | +| `retentionTime` | Minimum time to retain messages in the topic. Should be set in conjunction with retentionSize for effective retention policy. | No | +| `retentionSize` | Maximum size of backlog retained in the topic. Should be set in conjunction with retentionTime for effective retention policy. | No | +| `backlogQuotaLimitTime` | Time limit for message backlog. 
Messages older than this limit will be removed or handled according to the retention policy. | No | +| `backlogQuotaLimitSize` | Size limit for message backlog. When the limit is reached, older messages will be removed or handled according to the retention policy. | No | +| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". | No | | `lifecyclePolicy` | Determines whether to keep or delete the Pulsar topic when the Kubernetes resource is deleted. Options: `CleanUpAfterDeletion`, `KeepAfterDeletion`. Default is `CleanUpAfterDeletion`. | No | -| `schemaInfo` | Schema information for the topic. See [schemaInfo](#schemainfo) for more details. | No | -| `geoReplicationRefs` | List of references to PulsarGeoReplication resources, used to enable geo-replication at the topic level. | No | +| `schemaInfo` | Schema information for the topic. See [schemaInfo](#schemainfo) for more details. | No | +| `geoReplicationRefs` | List of references to PulsarGeoReplication resources, used to enable geo-replication at the topic level. | No | +| `replicationClusters` | List of clusters to which the topic is replicated. Use only if replicating clusters within the same Pulsar instance. | No | Note: Valid time units for duration fields are "s" (seconds), "m" (minutes), "h" (hours), "d" (days), "w" (weeks). +## replicationClusters vs geoReplicationRefs + +The `replicationClusters` and `geoReplicationRefs` fields serve different purposes in configuring replication for a Pulsar topic: + +1. `replicationClusters`: + - Use this when replicating data between clusters within the same Pulsar instance. + - It's a simple list of cluster names to which the topic should be replicated. + - This is suitable for scenarios where all clusters are managed by the same Pulsar instance and have direct connectivity. 
+ - Example use case: Replicating data between regions within a single Pulsar instance. + +2. `geoReplicationRefs`: + - Use this when setting up geo-replication between separate Pulsar instances. + - It references PulsarGeoReplication resources, which contain more detailed configuration for connecting to external Pulsar clusters. + - This is appropriate for scenarios involving separate Pulsar deployments, possibly in different data centers or cloud providers. + - Example use case: Replicating data between two independent Pulsar instances in different geographical locations. + +Choose `replicationClusters` for simpler, intra-instance replication, and `geoReplicationRefs` for more complex, inter-instance geo-replication scenarios. These fields are mutually exclusive; use only one depending on your replication requirements. + + ## Create A Pulsar Topic 1. Define a topic named `persistent://test-tenant/testns/topic123` by using the YAML file and save the YAML file `topic.yaml`. diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 5a673adc..c262368e 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -126,7 +126,7 @@ func (p *PulsarAdminClient) SetNamespaceClusters(completeNSName string, clusters // ApplyTopic creates a topic with policies func (p *PulsarAdminClient) ApplyTopic(name string, params *TopicParams) (creationErr error, policyErr error) { - completeTopicName := makeCompleteTopicName(name, params.Persistent) + completeTopicName := MakeCompleteTopicName(name, params.Persistent) topicName, err := utils.GetTopicName(completeTopicName) if err != nil { return err, nil @@ -295,7 +295,7 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param // GetTopicClusters get the assigned clusters of the topic to the local default cluster func (p *PulsarAdminClient) GetTopicClusters(name string, persistent *bool) ([]string, error) { - completeTopicName := makeCompleteTopicName(name, persistent) + completeTopicName := 
MakeCompleteTopicName(name, persistent) topicName, err := utils.GetTopicName(completeTopicName) if err != nil { return []string{}, err @@ -309,7 +309,7 @@ func (p *PulsarAdminClient) GetTopicClusters(name string, persistent *bool) ([]s // SetTopicClusters resets the assigned clusters of the topic to the local default cluster func (p *PulsarAdminClient) SetTopicClusters(name string, persistent *bool, clusters []string) error { - completeTopicName := makeCompleteTopicName(name, persistent) + completeTopicName := MakeCompleteTopicName(name, persistent) topicName, err := utils.GetTopicName(completeTopicName) if err != nil { return err @@ -428,7 +428,7 @@ func (p *PulsarAdminClient) applyTenantPolicies(completeNSName string, params *N return nil } -func makeCompleteTopicName(topicName string, persistent *bool) string { +func MakeCompleteTopicName(topicName string, persistent *bool) string { if strings.Contains(topicName, TopicDomainSeparator) { return topicName } diff --git a/pkg/connection/reconcile_namespace.go b/pkg/connection/reconcile_namespace.go index 4eca508a..df5d5650 100644 --- a/pkg/connection/reconcile_namespace.go +++ b/pkg/connection/reconcile_namespace.go @@ -155,30 +155,33 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls } if refs := namespace.Spec.GeoReplicationRefs; len(refs) != 0 || len(namespace.Spec.ReplicationClusters) > 0 { - for _, ref := range refs { - geoReplication := &resourcev1alpha1.PulsarGeoReplication{} - namespacedName := types.NamespacedName{ - Namespace: namespace.Namespace, - Name: ref.Name, - } - if err := r.conn.client.Get(ctx, namespacedName, geoReplication); err != nil { - return err - } - log.V(1).Info("Found geo replication", "GEO Replication", geoReplication.Name) - destConnection := &resourcev1alpha1.PulsarConnection{} - namespacedName = types.NamespacedName{ - Name: geoReplication.Spec.DestinationConnectionRef.Name, - Namespace: geoReplication.Namespace, - } - if err := 
r.conn.client.Get(ctx, namespacedName, destConnection); err != nil { - log.Error(err, "Failed to get destination connection for geo replication") - return err - } - params.ReplicationClusters = append(params.ReplicationClusters, destConnection.Spec.ClusterName) - params.ReplicationClusters = append(params.ReplicationClusters, r.conn.connection.Spec.ClusterName) + if len(refs) > 0 && len(namespace.Spec.ReplicationClusters) > 0 { + return fmt.Errorf("GeoReplicationRefs and ReplicationClusters cannot be set at the same time") } - - if len(namespace.Spec.ReplicationClusters) > 0 { + if len(refs) > 0 { + for _, ref := range refs { + geoReplication := &resourcev1alpha1.PulsarGeoReplication{} + namespacedName := types.NamespacedName{ + Namespace: namespace.Namespace, + Name: ref.Name, + } + if err := r.conn.client.Get(ctx, namespacedName, geoReplication); err != nil { + return err + } + log.V(1).Info("Found geo replication", "GEO Replication", geoReplication.Name) + destConnection := &resourcev1alpha1.PulsarConnection{} + namespacedName = types.NamespacedName{ + Name: geoReplication.Spec.DestinationConnectionRef.Name, + Namespace: geoReplication.Namespace, + } + if err := r.conn.client.Get(ctx, namespacedName, destConnection); err != nil { + log.Error(err, "Failed to get destination connection for geo replication") + return err + } + params.ReplicationClusters = append(params.ReplicationClusters, destConnection.Spec.ClusterName) + params.ReplicationClusters = append(params.ReplicationClusters, r.conn.connection.Spec.ClusterName) + } + } else if len(namespace.Spec.ReplicationClusters) > 0 { parts := strings.Split(namespace.Spec.Name, "/") if len(parts) != 2 { err := fmt.Errorf("invalid namespace name %s", namespace.Spec.Name) diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index e12b76d9..921a0971 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -18,7 +18,9 @@ import ( "context" "fmt" "reflect" + 
"slices" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/go-logr/logr" "github.com/streamnative/pulsar-resources-operator/pkg/feature" corev1 "k8s.io/api/core/v1" @@ -175,16 +177,48 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin r.applyDefault(params) - if refs := topic.Spec.GeoReplicationRefs; len(refs) != 0 { - for _, ref := range refs { - if err := r.applyGeo(ctx, params, ref, topic); err != nil { - log.Error(err, "Failed to get destination connection for geo replication "+ref.Name) - policyErrs = append(policyErrs, err) + if refs := topic.Spec.GeoReplicationRefs; len(refs) != 0 || len(topic.Spec.ReplicationClusters) > 0 { + if len(refs) > 0 && len(topic.Spec.ReplicationClusters) > 0 { + return fmt.Errorf("GeoReplicationRefs and ReplicationClusters cannot be set at the same time") + } + + if len(refs) > 0 { + for _, ref := range refs { + if err := r.applyGeo(ctx, params, ref, topic); err != nil { + log.Error(err, "Failed to get destination connection for geo replication "+ref.Name) + policyErrs = append(policyErrs, err) + } + } + } else if len(topic.Spec.ReplicationClusters) > 0 { + topicNameStr := admin.MakeCompleteTopicName(topic.Spec.Name, topic.Spec.Persistent) + topicName, err := utils.GetTopicName(topicNameStr) + if err != nil { + return err + } + tenantName := topicName.GetTenant() + allowedClusters, err := pulsarAdmin.GetTenantAllowedClusters(tenantName) + if err != nil { + return err + } + for _, cluster := range topic.Spec.ReplicationClusters { + if !slices.Contains(allowedClusters, cluster) { + err := fmt.Errorf("cluster %s is not allowed in tenant %s", cluster, tenantName) + meta.SetStatusCondition(&topic.Status.Conditions, *NewErrorCondition(topic.Generation, err.Error())) + log.Error(err, "Failed to apply topic") + if err := r.conn.client.Status().Update(ctx, topic); err != nil { + log.Error(err, "Failed to update the topic status") + return err + } + return err + } + 
params.ReplicationClusters = append(params.ReplicationClusters, topic.Spec.ReplicationClusters...) } } - log.Info("apply topic with replication clusters", "clusters", params.ReplicationClusters) - topic.Status.GeoReplicationEnabled = true + if len(params.ReplicationClusters) > 0 { + log.Info("apply topic with replication clusters", "clusters", params.ReplicationClusters) + topic.Status.GeoReplicationEnabled = true + } } else if topic.Status.GeoReplicationEnabled { // when GeoReplicationRefs is removed, it should reset the topic clusters params.ReplicationClusters = []string{r.conn.connection.Spec.ClusterName}