Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/mo-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: mo
spec:
imageRepository: matrixorigin/matrixone
version: #TAG
version: 1.2.3
logService:
replicas: 3
sharedStorage:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
k8s.io/apiserver v0.27.2
k8s.io/client-go v0.27.2
k8s.io/component-helpers v0.27.2
k8s.io/klog v1.0.0
k8s.io/kubelet v0.27.2
k8s.io/kubernetes v1.27.2
k8s.io/utils v0.0.0-20230209194617-a36077c30491
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8b
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
Expand Down Expand Up @@ -869,6 +870,8 @@ k8s.io/component-base v0.27.2 h1:neju+7s/r5O4x4/txeUONNTS9r1HsPbyoPBAtHsDCpo=
k8s.io/component-base v0.27.2/go.mod h1:5UPk7EjfgrfgRIuDBFtsEFAe4DAvP3U+M8RTzoSJkpo=
k8s.io/component-helpers v0.27.2 h1:i9TgWJ6TH8lQ9x4ExHOwhVitrRpBOr7Wn8aZLbBWxkc=
k8s.io/component-helpers v0.27.2/go.mod h1:NwcpSKo1xzXtUtrUjj5NTSVWex84UPua/z0PYDcCzNo=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg=
Expand Down
10 changes: 7 additions & 3 deletions pkg/controllers/cnset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ func (c *Actor) Observe(ctx *recon.Context[*v1alpha1.CNSet]) (recon.Action[*v1al
return c.with(cs).Scale, nil
}

if cn.Spec.CacheVolume != nil {
if err := common.SyncCloneSetVolumeSize(ctx, cn, cn.Spec.CacheVolume.Size, cs); err != nil {
return nil, errors.WrapPrefix(err, "sync volume size", 0)
}
}

if recon.IsReady(&cn.Status.ConditionalStatus) {
cn.Status.Host = fmt.Sprintf("%s.%s", svc.Name, svc.Namespace)
cn.Status.Port = CNSQLPort
Expand Down Expand Up @@ -263,6 +269,7 @@ func (c *Actor) Create(ctx *recon.Context[*v1alpha1.CNSet]) error {
if err := syncCloneSet(ctx, cnSet); err != nil {
return errors.WrapPrefix(err, "sync clone set", 0)
}
syncPersistentVolumeClaim(cn, cnSet)

// create all resources
err := lo.Reduce[client.Object, error]([]client.Object{
Expand Down Expand Up @@ -331,9 +338,6 @@ func syncCloneSet(ctx *recon.Context[*v1alpha1.CNSet], cs *kruisev1alpha1.CloneS
if ctx.Dep != nil {
syncPodSpec(ctx.Obj, cs, ctx.Dep.Deps.LogSet.Spec.SharedStorage)
}
// support update cacheVolume, NOTE: pvc only updated when pod rolling updated
// ref: https://openkruise.io/zh/docs/next/user-manuals/cloneset/#%E6%94%AF%E6%8C%81-pvc-%E6%A8%A1%E6%9D%BF
syncPersistentVolumeClaim(cn, cs)
if pooling {
if cs.Annotations == nil {
cs.Annotations = map[string]string{}
Expand Down
125 changes: 125 additions & 0 deletions pkg/controllers/common/volume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"fmt"

"github.com/go-errors/errors"
recon "github.com/matrixorigin/controller-runtime/pkg/reconciler"
kruisev1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
kruisev1 "github.com/openkruise/kruise-api/apps/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func SyncCloneSetVolumeSize(kubeCli recon.KubeClient, owner client.Object, size resource.Quantity, cs *kruisev1alpha1.CloneSet) error {
var changed bool
for i := range cs.Spec.VolumeClaimTemplates {
if cs.Spec.VolumeClaimTemplates[i].Name == DataVolume {
oldSize := cs.Spec.VolumeClaimTemplates[i].Spec.Resources.Requests[corev1.ResourceStorage]
c := oldSize.Cmp(size)
if c < 0 {
changed = true
cs.Spec.VolumeClaimTemplates[i].Spec.Resources.Requests[corev1.ResourceStorage] = size
} else if c > 0 {
return errors.New(fmt.Sprintf("volume size cannot be decreased from %s to %s", oldSize.String(), size.String()))
}
}
}
if !changed {
return nil
}
podList := &corev1.PodList{}
err := kubeCli.List(podList, client.InNamespace(owner.GetNamespace()), client.MatchingLabels(SubResourceLabels(owner)))
if err != nil {
return errors.WrapPrefix(err, "list pods", 0)
}
for i := range podList.Items {
pod := &podList.Items[i]
instanceId := pod.Labels[kruisev1alpha1.CloneSetInstanceID]
if instanceId == "" {
continue
}
pvcList := &corev1.PersistentVolumeClaimList{}
err := kubeCli.List(pvcList, client.InNamespace(owner.GetNamespace()), client.MatchingLabels(map[string]string{
kruisev1alpha1.CloneSetInstanceID: instanceId,
}))

klog.Infof("sync volume size for %s, pvc list: %v", owner.GetName(), pvcList.Items)
if err != nil {
return errors.WrapPrefix(err, "list volumes", 0)
}
for j := range pvcList.Items {
pvc := &pvcList.Items[j]
current := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
if current.Cmp(size) < 0 {
if err := kubeCli.Patch(pvc, func() error {
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = size
return nil
}); err != nil {
return errors.WrapPrefix(err, "patch volume size", 0)
}
}
}
}
if err := kubeCli.Update(cs); err != nil {
return errors.WrapPrefix(err, "sync volume size", 0)
}
return nil
}

// SyncStsVolumeSize syncs the volume size of component backed by kruise statefuset
func SyncStsVolumeSize(kubeCli recon.KubeClient, owner client.Object, size resource.Quantity, sts *kruisev1.StatefulSet) error {
var changed bool
for i := range sts.Spec.VolumeClaimTemplates {
if sts.Spec.VolumeClaimTemplates[i].Name == DataVolume {
oldSize := sts.Spec.VolumeClaimTemplates[i].Spec.Resources.Requests[corev1.ResourceStorage]
c := oldSize.Cmp(size)
if c < 0 {
changed = true
sts.Spec.VolumeClaimTemplates[i].Spec.Resources.Requests[corev1.ResourceStorage] = size
} else if c > 0 {
return errors.New(fmt.Sprintf("volume size cannot be decreased from %s to %s", oldSize.String(), size.String()))
}
}
}
if !changed {
return nil
}
pvcList := &corev1.PersistentVolumeClaimList{}
err := kubeCli.List(pvcList, client.InNamespace(owner.GetNamespace()), client.MatchingLabels(SubResourceLabels(owner)))
if err != nil {
return errors.WrapPrefix(err, "list volumes", 0)
}
for i := range pvcList.Items {
pvc := &pvcList.Items[i]
current := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
if current.Cmp(size) < 0 {
if err := kubeCli.Patch(pvc, func() error {
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = size
return nil
}); err != nil {
return errors.WrapPrefix(err, "patch volume size", 0)
}
}
}
if err := kubeCli.Update(sts); err != nil {
return errors.WrapPrefix(err, "sync volume size", 0)
}
return nil
}
10 changes: 8 additions & 2 deletions pkg/controllers/dnset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package dnset

import (
"github.com/matrixorigin/matrixone-operator/api/features"
"github.com/matrixorigin/matrixone-operator/pkg/utils"
"strconv"
"time"

"github.com/matrixorigin/matrixone-operator/api/features"
"github.com/matrixorigin/matrixone-operator/pkg/utils"

"github.com/go-errors/errors"
recon "github.com/matrixorigin/controller-runtime/pkg/reconciler"
"github.com/matrixorigin/controller-runtime/pkg/util"
Expand Down Expand Up @@ -132,6 +133,11 @@ func (d *Actor) Observe(ctx *recon.Context[*v1alpha1.DNSet]) (recon.Action[*v1al
return d.with(sts, svc).Update, nil
}

if dn.Spec.CacheVolume != nil {
if err := common.SyncStsVolumeSize(ctx, dn, dn.Spec.CacheVolume.Size, sts); err != nil {
return nil, errors.WrapPrefix(err, "sync volume size", 0)
}
}
if err := d.syncMetricService(ctx); err != nil {
return nil, errors.WrapPrefix(err, "sync metric service", 0)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/logset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (r *Actor) Observe(ctx *recon.Context[*v1alpha1.LogSet]) (recon.Action[*v1a
return r.with(sts).Update, nil
}

if err = common.SyncStsVolumeSize(ctx, ls, ls.Spec.Volume.Size, sts); err != nil {
return nil, errors.WrapPrefix(err, "sync volume size", 0)
}

if err = r.syncBucketClaim(ctx, sts); err != nil {
return nil, errors.WrapPrefix(err, "sync bucket claim", 0)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/webhook/cnset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (c *cnSetValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runt
return nil, unexpectedKindError("CNSet", newObj)
}
errs = append(errs, validatePodSetUpdate(&oldCN.Spec.PodSet, &newCN.Spec.PodSet, field.NewPath("spec"))...)
errs = append(errs, validateVolumeUpdate(oldCN.Spec.CacheVolume, newCN.Spec.CacheVolume, field.NewPath("spec").Child("cacheVolume"))...)
return nil, invalidOrNil(errs, newCN)
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/webhook/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,17 @@ func validatePodSetUpdate(oldPodSet, newPodSet *v1alpha1.PodSet, path *field.Pat
}
return errs
}

func validateVolumeUpdate(oldVolume, newVolume *v1alpha1.Volume, path *field.Path) field.ErrorList {
var errs field.ErrorList
if oldVolume == nil || newVolume == nil {
return nil
}
if newVolume.StorageClassName != nil && oldVolume.StorageClassName != nil && *newVolume.StorageClassName != *oldVolume.StorageClassName {
errs = append(errs, field.Invalid(path.Child("storageClassName"), *newVolume.StorageClassName, "storageClassName is immutable"))
}
if newVolume.Size.Cmp(oldVolume.Size) < 0 {
errs = append(errs, field.Invalid(path.Child("size"), newVolume.Size, "volume size cannot be decreased"))
}
return errs
}
14 changes: 12 additions & 2 deletions pkg/webhook/dnset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,22 @@ func (d *dnSetValidator) ValidateCreate(_ context.Context, obj runtime.Object) (
return nil, invalidOrNil(errs, dnSet)
}

func (d *dnSetValidator) ValidateUpdate(ctx context.Context, _, newObj runtime.Object) (warnings admission.Warnings, err error) {
func (d *dnSetValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (warnings admission.Warnings, err error) {
var errs field.ErrorList
warnings, err = d.ValidateCreate(ctx, newObj)
if err != nil {
return warnings, err
}
return warnings, nil
oldDN, ok := oldObj.(*v1alpha1.DNSet)
if !ok {
return nil, unexpectedKindError("DNSet", oldObj)
}
newDN, ok := newObj.(*v1alpha1.DNSet)
if !ok {
return nil, unexpectedKindError("DNSet", newObj)
}
errs = append(errs, validateVolumeUpdate(oldDN.Spec.CacheVolume, newDN.Spec.CacheVolume, field.NewPath("spec").Child("cacheVolume"))...)
return warnings, invalidOrNil(errs, newDN)
}

func (d *dnSetValidator) ValidateDelete(_ context.Context, _ runtime.Object) (warnings admission.Warnings, err error) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/webhook/logset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,11 @@ func (l *logSetValidator) ValidateCreate(_ context.Context, obj runtime.Object)
}

func (l *logSetValidator) ValidateUpdate(_ context.Context, oldObj, newObj runtime.Object) (warnings admission.Warnings, err error) {
var errs field.ErrorList
old := oldObj.(*v1alpha1.LogSet)
logSet := newObj.(*v1alpha1.LogSet)
errs := l.ValidateSpecUpdate(&old.Spec, &logSet.Spec, logSet.ObjectMeta)
errs = append(errs, l.ValidateSpecUpdate(&old.Spec, &logSet.Spec, logSet.ObjectMeta)...)
errs = append(errs, validateVolumeUpdate(&old.Spec.Volume, &logSet.Spec.Volume, field.NewPath("spec").Child("volume"))...)
return nil, invalidOrNil(errs, logSet)
}

Expand Down
3 changes: 2 additions & 1 deletion test/e2e/claim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package e2e
import (
"context"
"fmt"
"time"

"github.com/matrixorigin/controller-runtime/pkg/util"
"github.com/matrixorigin/matrixone-operator/api/core/v1alpha1"
. "github.com/onsi/ginkgo/v2"
Expand All @@ -27,7 +29,6 @@ import (
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"time"
)

const (
Expand Down
Loading