Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add crd for pulsar package & function & connector #206

Merged
merged 55 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
f1a9f8f
add crd for pulsar package & function & connector
freeznet Jun 18, 2024
3ef1bd8
add controllers
freeznet Jun 20, 2024
0adfd1f
add tests
freeznet Jun 20, 2024
1ed859e
Update api/v1alpha1/zz_generated.deepcopy.go
freeznet Jun 20, 2024
9524d0d
fix ci
freeznet Jun 20, 2024
da1a663
fix lint
freeznet Jun 20, 2024
705296e
fix build
freeznet Jun 20, 2024
de4ffda
fix build
freeznet Jun 20, 2024
1a99291
fix test
freeznet Jun 20, 2024
e7381a5
fix lint
freeznet Jun 20, 2024
f8e1322
fix golint
freeznet Jun 20, 2024
4ecfb0f
fix install
freeznet Jun 20, 2024
2f47057
add to charts
freeznet Jun 20, 2024
0a83daa
fix role
freeznet Jun 20, 2024
885d421
fix controller
freeznet Jun 20, 2024
b098130
fix controller
freeznet Jun 20, 2024
471aea5
fix package
freeznet Jun 20, 2024
bf8bbd2
remove package test
freeznet Jun 20, 2024
f38328f
fix ci
freeznet Jun 20, 2024
0b7daca
fix
freeznet Jun 20, 2024
230f190
use functions worker
freeznet Jun 20, 2024
ba13847
revert
freeznet Jun 20, 2024
4fab3d3
fix admin cli
freeznet Jun 21, 2024
4d10922
fix
freeznet Jun 21, 2024
2ef3153
fix charts
freeznet Jun 21, 2024
c710d84
fix license
freeznet Jun 21, 2024
220f28d
fix client version
freeznet Jun 21, 2024
9700005
fix
freeznet Jun 21, 2024
8aefd4a
skip function package
freeznet Jun 21, 2024
c0df511
fix reconcile
freeznet Jun 21, 2024
4e12e77
fix ci
freeznet Jun 21, 2024
126c42f
fix
freeznet Jun 21, 2024
69e4cdf
fix pulsar
freeznet Jun 21, 2024
9ae5c00
fix rep
freeznet Jun 21, 2024
e149496
fix url
freeznet Jun 21, 2024
031cd09
fix tab
freeznet Jun 21, 2024
49edc20
fix
freeznet Jun 21, 2024
01576b4
cleanup proxy
freeznet Jun 21, 2024
c04b98b
fix container name
freeznet Jun 21, 2024
ebc43d2
fix builtin
freeznet Jun 21, 2024
55470bc
fix connectors
freeznet Jun 21, 2024
be7e33e
use pulsar-all
freeznet Jun 21, 2024
2a0594b
docker hub login
freeznet Jun 21, 2024
6043a88
fix pulsarctl
freeznet Jun 21, 2024
35f6e79
fix narExtractionDirectory
freeznet Jun 21, 2024
de600cf
timeout
freeznet Jun 21, 2024
d5fc9f4
fix name
freeznet Jun 21, 2024
1e232dd
add docs
freeznet Jun 21, 2024
35c1eb2
fix license
freeznet Jun 21, 2024
8806ed9
fix chart role
freeznet Jun 21, 2024
96dc91c
Merge branch 'main' into freeznet/add-functions-connectors-admin
freeznet Jun 24, 2024
60bfcda
fix
freeznet Jun 24, 2024
e3bb921
Merge branch 'main' into freeznet/add-functions-connectors-admin
nlu90 Jun 24, 2024
24bf63b
fix license and lint
nlu90 Jun 24, 2024
087c81e
fix lint
nlu90 Jun 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/confi
CONTROLLER_GEN = $(shell pwd)/bin/controller-gen
.PHONY: controller-gen
controller-gen: ## Download controller-gen locally if necessary.
$(call go-get-tool,$(CONTROLLER_GEN),sigs.k8s.io/controller-tools/cmd/controller-gen@v0.8.0)
$(call go-get-tool,$(CONTROLLER_GEN),sigs.k8s.io/controller-tools/cmd/controller-gen@v0.15.0)

KUSTOMIZE = $(shell pwd)/bin/kustomize
.PHONY: kustomize
Expand Down
86 changes: 86 additions & 0 deletions api/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package v1alpha1

import (
"encoding/json"

Check failure on line 18 in api/v1alpha1/common.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"reflect"

"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -86,3 +89,86 @@
instance.GetGeneration() == observedGeneration &&
meta.IsStatusConditionTrue(conditions, ConditionReady)
}

type FileRef struct {

Check warning on line 93 in api/v1alpha1/common.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported type FileRef should have comment or be unexported (revive)
ConfigMapRef *corev1.LocalObjectReference `json:"configMapRef"`
Key string `json:"key"`
}

type PackageContentRef struct {

Check warning on line 98 in api/v1alpha1/common.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported type PackageContentRef should have comment or be unexported (revive)
// +optional
FileRef *FileRef `json:"fileRef,omitempty"`

// +optional
URL string `json:"url,omitempty"`
}

type Resources struct {

Check warning on line 106 in api/v1alpha1/common.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported type Resources should have comment or be unexported (revive)
CPU float64 `json:"cpu"`
Disk int64 `json:"disk"`
RAM int64 `json:"ram"`
}

type ProducerConfig struct {

Check warning on line 112 in api/v1alpha1/common.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported type ProducerConfig should have comment or be unexported (revive)
MaxPendingMessages int `json:"maxPendingMessages" yaml:"maxPendingMessages"`
MaxPendingMessagesAcrossPartitions int `json:"maxPendingMessagesAcrossPartitions" yaml:"maxPendingMessagesAcrossPartitions"`

UseThreadLocalProducers bool `json:"useThreadLocalProducers" yaml:"useThreadLocalProducers"`
CryptoConfig *CryptoConfig `json:"cryptoConfig" yaml:"cryptoConfig"`
BatchBuilder string `json:"batchBuilder" yaml:"batchBuilder"`
CompressionType string `json:"compressionType" yaml:"compressionType"`
}

type ConsumerConfig struct {

Check warning on line 122 in api/v1alpha1/common.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported type ConsumerConfig should have comment or be unexported (revive)
SchemaType string `json:"schemaType,omitempty" yaml:"schemaType"`
SerdeClassName string `json:"serdeClassName,omitempty" yaml:"serdeClassName"`
RegexPattern bool `json:"regexPattern,omitempty" yaml:"regexPattern"`
ReceiverQueueSize int `json:"receiverQueueSize,omitempty" yaml:"receiverQueueSize"`
SchemaProperties map[string]string `json:"schemaProperties,omitempty" yaml:"schemaProperties"`
ConsumerProperties map[string]string `json:"consumerProperties,omitempty" yaml:"consumerProperties"`
CryptoConfig *CryptoConfig `json:"cryptoConfig,omitempty" yaml:"cryptoConfig"`
PoolMessages bool `json:"poolMessages,omitempty" yaml:"poolMessages"`
}

type CryptoConfig struct {

Check warning on line 133 in api/v1alpha1/common.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported type CryptoConfig should have comment or be unexported (revive)
CryptoKeyReaderClassName string `json:"cryptoKeyReaderClassName" yaml:"cryptoKeyReaderClassName"`
CryptoKeyReaderConfig map[string]string `json:"cryptoKeyReaderConfig" yaml:"cryptoKeyReaderConfig"`

EncryptionKeys []string `json:"encryptionKeys" yaml:"encryptionKeys"`
ProducerCryptoFailureAction string `json:"producerCryptoFailureAction" yaml:"producerCryptoFailureAction"`
ConsumerCryptoFailureAction string `json:"consumerCryptoFailureAction" yaml:"consumerCryptoFailureAction"`
}

// Config represents untyped YAML configuration.
type Config struct {
// Data holds the configuration keys and values.
// This field exists to work around https://github.com/kubernetes-sigs/kubebuilder/issues/528
Data map[string]interface{} `json:"-"`
}

// NewConfig constructs a Config with the given unstructured configuration data.
func NewConfig(cfg map[string]interface{}) Config {
return Config{Data: cfg}
}

// MarshalJSON implements the Marshaler interface.
func (c *Config) MarshalJSON() ([]byte, error) {
return json.Marshal(c.Data)
}

// UnmarshalJSON implements the Unmarshaler interface.
func (c *Config) UnmarshalJSON(data []byte) error {
var out map[string]interface{}
err := json.Unmarshal(data, &out)

Check failure on line 162 in api/v1alpha1/common.go

View workflow job for this annotation

GitHub Actions / lint

variable 'err' is only used in the if-statement (api/v1alpha1/common.go:163:2); consider using short syntax (ifshort)
if err != nil {
return err
}
c.Data = out
return nil
}

// DeepCopyInto is an ~autogenerated~ deepcopy function, copying the receiver, writing into out. in must be non-nil.
// This exists here to work around https://github.com/kubernetes/code-generator/issues/50
func (c *Config) DeepCopyInto(out *Config) {
out.Data = runtime.DeepCopyJSON(c.Data)
}
274 changes: 274 additions & 0 deletions api/v1alpha1/pulsarfunction_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
// Copyright 2022 StreamNative
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// PulsarFunctionSpec defines the desired state of PulsarFunction
type PulsarFunctionSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file

// TimeoutMs is the function timeout in milliseconds
// +optional
TimeoutMs *int64 `json:"timeoutMs,omitempty"`

// TopicsPattern is the topics pattern that the function subscribes to
// +optional
TopicsPattern *string `json:"topicsPattern,omitempty"`

// CleanupSubscription is the flag to indicate whether the subscription should be cleaned up when the function is deleted
// +optional
CleanupSubscription bool `json:"cleanupSubscription"`

// RetainOrdering is the flag to indicate whether the function should retain ordering
// +optional
RetainOrdering bool `json:"retainOrdering"`

// RetainKeyOrdering is the flag to indicate whether the function should retain key ordering
// +optional
RetainKeyOrdering bool `json:"retainKeyOrdering"`

// BatchBuilder is the batch builder that the function uses
// +optional
BatchBuilder *string `json:"batchBuilder,omitempty"`

// ForwardSourceMessageProperty is the flag to indicate whether the function should forward source message properties
// +optional
ForwardSourceMessageProperty bool `json:"forwardSourceMessageProperty"`

// AutoAck is the flag to indicate whether the function should auto ack
// +optional
AutoAck bool `json:"autoAck"`

// Parallelism is the parallelism of the function
// +optional
Parallelism int `json:"parallelism,omitempty"`

// MaxMessageRetries is the max message retries of the function
// +optional
MaxMessageRetries *int `json:"maxMessageRetries,omitempty"`

// Output is the output of the function
// +optional
Output string `json:"output,omitempty"`

// ProducerConfig is the producer config of the function
// +optional
ProducerConfig *ProducerConfig `json:"producerConfig,omitempty"`

// CustomSchemaOutputs is the custom schema outputs of the function
// +optional
CustomSchemaOutputs map[string]string `json:"customSchemaOutputs,omitempty"`

// OutputSerdeClassName is the output serde class name of the function
// +optional
OutputSerdeClassName string `json:"outputSerdeClassName,omitempty"`

// LogTopic is the log topic of the function
// +optional
LogTopic string `json:"logTopic,omitempty"`

// ProcessingGuarantees is the processing guarantees of the function
// +optional
ProcessingGuarantees string `json:"processingGuarantees,omitempty"`

// OutputSchemaType is the output schema type of the function
// +optional
OutputSchemaType string `json:"outputSchemaType,omitempty"`

// OutputTypeClassName is the output type class name of the function
// +optional
OutputTypeClassName string `json:"outputTypeClassName,omitempty"`

// Runtime is the runtime of the function
// +optional
Runtime string `json:"runtime,omitempty"`

// DeadLetterTopic is the dead letter topic of the function
// +optional
DeadLetterTopic string `json:"deadLetterTopic,omitempty"`

// SubName is the sub name of the function
// +optional
SubName string `json:"subName,omitempty"`

// FQFN is the FQFN of the function
// +optional
FQFN string `json:"fqfn,omitempty"`

// Jar is the jar of the function
// +optional
Jar *PackageContentRef `json:"jar,omitempty"`

// Py is the py of the function
// +optional
Py *PackageContentRef `json:"py,omitempty"`

// Go is the go of the function
// +optional
Go *PackageContentRef `json:"go,omitempty"`

// FunctionType is the function type of the function
// +optional
FunctionType string `json:"functionType,omitempty"`

// RuntimeFlags is the runtime flags of the function
// +optional
RuntimeFlags string `json:"runtimeFlags,omitempty"`

// Tenant is the tenant of the function
// +optional
Tenant string `json:"tenant,omitempty"`

// Namespace is the namespace of the function
// +optional
Namespace string `json:"namespace,omitempty"`

// Name is the name of the function
// +optional
Name string `json:"name,omitempty"`

// ClassName is the class name of the function
// +optional
ClassName string `json:"className,omitempty"`

// Resources is the resources of the function
// +optional
Resources *Resources `json:"resources,omitempty"`

// WindowConfig is the window config of the function
// +optional
WindowConfig *WindowConfig `json:"windowConfig,omitempty"`

// Inputs is the inputs of the function
// +optional
Inputs []string `json:"inputs,omitempty"`

// UserConfig is the user config of the function
// +optional
UserConfig *Config `json:"userConfig,omitempty"`

// CustomSerdeInputs is the custom serde inputs of the function
// +optional
CustomSerdeInputs map[string]string `json:"customSerdeInputs,omitempty"`

// CustomSchemaInputs is the custom schema inputs of the function
// +optional
CustomSchemaInputs map[string]string `json:"customSchemaInputs,omitempty"`

// InputSpecs is the input specs of the function
// +optional
InputSpecs map[string]ConsumerConfig `json:"inputSpecs,omitempty"`

// InputTypeClassName is the input type class name of the function
// +optional
InputTypeClassName string `json:"inputTypeClassName,omitempty"`

// CustomRuntimeOptions is the custom runtime options of the function
// +optional
CustomRuntimeOptions *Config `json:"customRuntimeOptions,omitempty"`

// Secrets is the secrets of the function
// +optional
Secrets map[string]SecretKeyRef `json:"secrets,omitempty"`

// MaxPendingAsyncRequests is the max pending async requests of the function
// +optional
MaxPendingAsyncRequests int `json:"maxPendingAsyncRequests,omitempty"`

// ExposePulsarAdminClientEnabled is the flag to indicate whether the function should expose pulsar admin client
// +optional
ExposePulsarAdminClientEnabled bool `json:"exposePulsarAdminClientEnabled"`

// SkipToLatest is the flag to indicate whether the function should skip to latest
// +optional
SkipToLatest bool `json:"skipToLatest"`

// SubscriptionPosition is the subscription position of the function
// +optional
SubscriptionPosition string `json:"subscriptionPosition,omitempty"`

// ConnectionRef is the reference to the PulsarConnection resource
ConnectionRef corev1.LocalObjectReference `json:"connectionRef"`
}

type WindowConfig struct {

Check warning on line 214 in api/v1alpha1/pulsarfunction_types.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported type WindowConfig should have comment or be unexported (revive)
WindowLengthCount *int `json:"windowLengthCount" yaml:"windowLengthCount"`
WindowLengthDurationMs *int64 `json:"windowLengthDurationMs" yaml:"windowLengthDurationMs"`
SlidingIntervalCount *int `json:"slidingIntervalCount" yaml:"slidingIntervalCount"`
SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs" yaml:"slidingIntervalDurationMs"`
LateDataTopic *string `json:"lateDataTopic" yaml:"lateDataTopic"`
MaxLagMs *int64 `json:"maxLagMs" yaml:"maxLagMs"`
WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs" yaml:"watermarkEmitIntervalMs"`
TimestampExtractorClassName *string `json:"timestampExtractorClassName" yaml:"timestampExtractorClassName"`
ActualWindowFunctionClassName *string `json:"actualWindowFunctionClassName" yaml:"actualWindowFunctionClassName"`
ProcessingGuarantees *string `json:"processingGuarantees" yaml:"processingGuarantees"`
}

// PulsarFunctionStatus defines the observed state of PulsarFunction
type PulsarFunctionStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file

// ObservedGeneration is the most recent generation observed for this resource.
// It corresponds to the metadata generation, which is updated on mutation by the API Server.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`

// Represents the observations of a connection's current state.
// +patchMergeKey=type
// +patchStrategy=merge
// +listType=map
// +listMapKey=type
// +optional
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:resource:categories=pulsar;pulsarres,shortName=pfunction
//+kubebuilder:printcolumn:name="RESOURCE_NAME",type=string,JSONPath=`.spec.name`
//+kubebuilder:printcolumn:name="GENERATION",type=string,JSONPath=`.metadata.generation`
//+kubebuilder:printcolumn:name="OBSERVED_GENERATION",type=string,JSONPath=`.status.observedGeneration`
//+kubebuilder:printcolumn:name="READY",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].status`

// PulsarFunction is the Schema for the pulsar functions API
type PulsarFunction struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec PulsarFunctionSpec `json:"spec,omitempty"`
Status PulsarFunctionStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// PulsarFunctionList contains a list of PulsarFunction
type PulsarFunctionList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []PulsarFunction `json:"items"`
}

func init() {
SchemeBuilder.Register(&PulsarFunction{}, &PulsarFunctionList{})
}
Loading
Loading