Skip to content

Commit

Permalink
fix: refactor unstructured_util and reduce k8s api call (#339)
Browse files Browse the repository at this point in the history
Signed-off-by: chandankumar4 <[email protected]>
  • Loading branch information
chandankumar4 authored Oct 24, 2024
1 parent 5908451 commit 7da779d
Show file tree
Hide file tree
Showing 17 changed files with 185 additions and 298 deletions.
7 changes: 4 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,15 @@ func main() {
numaLogger.Fatal(err, "Failed to start configmap watcher")
}

if err := kubernetes.SetDynamicClient(newRawConfig); err != nil {
numaLogger.Fatal(err, "Failed to set dynamic client")
}

//+kubebuilder:scaffold:builder

pipelineRolloutReconciler := controller.NewPipelineRolloutReconciler(
mgr.GetClient(),
mgr.GetScheme(),
newRawConfig,
customMetrics,
mgr.GetEventRecorderFor(apiv1.RolloutPipeline),
)
Expand Down Expand Up @@ -179,7 +182,6 @@ func main() {
isbServiceRolloutReconciler := controller.NewISBServiceRolloutReconciler(
mgr.GetClient(),
mgr.GetScheme(),
newRawConfig,
customMetrics,
mgr.GetEventRecorderFor(apiv1.RolloutISBSvc),
)
Expand All @@ -191,7 +193,6 @@ func main() {
monoVertexRolloutReconciler := controller.NewMonoVertexRolloutReconciler(
mgr.GetClient(),
mgr.GetScheme(),
newRawConfig,
customMetrics,
mgr.GetEventRecorderFor(apiv1.RolloutMonoVertex),
)
Expand Down
11 changes: 11 additions & 0 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,21 @@ const (
// EnvLogLevel log level that is defined by `--loglevel` option
EnvLogLevel = "NUMAPLANE_LOG_LEVEL"

// NumaflowAPIGroup is the group of the Numaflow API
NumaflowAPIGroup = "numaflow.numaproj.io"

// NumaflowAPIVersion is the version of the Numaflow API
NumaflowAPIVersion = "v1alpha1"

// NumaflowPipelineKind is the kind of the Numaflow Pipeline
NumaflowPipelineKind = "Pipeline"

// NumaflowMonoVertexKind is the kind of the Numaflow MonoVertex
NumaflowMonoVertexKind = "MonoVertex"

// NumaflowISBServiceKind is the kind of the Numaflow ISB Service
NumaflowISBServiceKind = "InterStepBufferService"

// LABELS:

// LabelKeyNumaplaneInstance Resource metadata labels (keys and values) used for tracking
Expand Down
11 changes: 6 additions & 5 deletions internal/controller/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ import (
"testing"
"time"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
numaflowversioned "github.com/numaproj/numaflow/pkg/client/clientset/versioned"
"github.com/numaproj/numaplane/internal/common"
"github.com/numaproj/numaplane/internal/controller/config"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/yaml"
k8sclientgo "k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
numaflowversioned "github.com/numaproj/numaflow/pkg/client/clientset/versioned"
"github.com/numaproj/numaplane/internal/common"
"github.com/numaproj/numaplane/internal/controller/config"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
)

func createPipelineRolloutInK8S(ctx context.Context, t *testing.T, numaplaneClient client.Client, pipelineRollout *apiv1.PipelineRollout) {
Expand Down
12 changes: 4 additions & 8 deletions internal/controller/isbservicerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -64,7 +63,6 @@ const (
type ISBServiceRolloutReconciler struct {
client client.Client
scheme *runtime.Scheme
restConfig *rest.Config
customMetrics *metrics.CustomMetrics
// the recorder is used to record events
recorder record.EventRecorder
Expand All @@ -76,15 +74,13 @@ type ISBServiceRolloutReconciler struct {
func NewISBServiceRolloutReconciler(
c client.Client,
s *runtime.Scheme,
restConfig *rest.Config,
customMetrics *metrics.CustomMetrics,
recorder record.EventRecorder,
) *ISBServiceRolloutReconciler {

r := &ISBServiceRolloutReconciler{
c,
s,
restConfig,
customMetrics,
recorder,
nil,
Expand Down Expand Up @@ -230,8 +226,8 @@ func (r *ISBServiceRolloutReconciler) reconcile(ctx context.Context, isbServiceR

newISBServiceDef := &kubernetes.GenericObject{
TypeMeta: metav1.TypeMeta{
Kind: "InterStepBufferService",
APIVersion: "numaflow.numaproj.io/v1alpha1",
Kind: common.NumaflowISBServiceKind,
APIVersion: common.NumaflowAPIGroup + "/" + common.NumaflowAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Name: isbServiceRollout.Name,
Expand Down Expand Up @@ -453,7 +449,7 @@ func (r *ISBServiceRolloutReconciler) isISBServiceUpdating(ctx context.Context,
}

func (r *ISBServiceRolloutReconciler) getPipelineList(ctx context.Context, rolloutNamespace string, rolloutName string) ([]*kubernetes.GenericObject, error) {
gvk := schema.GroupVersionKind{Group: common.NumaflowAPIGroup, Version: common.NumaflowAPIVersion, Kind: "Pipeline"}
gvk := schema.GroupVersionKind{Group: common.NumaflowAPIGroup, Version: common.NumaflowAPIVersion, Kind: common.NumaflowPipelineKind}
return kubernetes.ListResources(ctx, r.client, gvk,
client.InNamespace(rolloutNamespace),
client.MatchingLabels{common.LabelKeyISBServiceNameForPipeline: rolloutName},
Expand Down Expand Up @@ -615,7 +611,7 @@ func (r *ISBServiceRolloutReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Watch InterStepBufferServices
isbServiceUns := &unstructured.Unstructured{}
isbServiceUns.SetGroupVersionKind(schema.GroupVersionKind{
Kind: "InterStepBufferService",
Kind: common.NumaflowISBServiceKind,
Group: common.NumaflowAPIGroup,
Version: common.NumaflowAPIVersion,
})
Expand Down
12 changes: 6 additions & 6 deletions internal/controller/isbservicerollout_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
Expand All @@ -42,9 +41,9 @@ import (
"github.com/numaproj/numaplane/internal/common"
"github.com/numaproj/numaplane/internal/controller/config"
"github.com/numaproj/numaplane/internal/util"
"github.com/numaproj/numaplane/internal/util/kubernetes"
"github.com/numaproj/numaplane/internal/util/logger"
"github.com/numaproj/numaplane/internal/util/metrics"

apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
commontest "github.com/numaproj/numaplane/tests/common"
)
Expand Down Expand Up @@ -255,6 +254,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {

restConfig, numaflowClientSet, numaplaneClient, k8sClientSet, err := commontest.PrepareK8SEnvironment()
assert.Nil(t, err)
assert.Nil(t, kubernetes.SetDynamicClient(restConfig))

config.GetConfigManagerInstance().UpdateUSDEConfig(config.USDEConfig{DefaultUpgradeStrategy: config.PPNDStrategyID})

Expand All @@ -265,7 +265,7 @@ func Test_reconcile_isbservicerollout_PPND(t *testing.T) {

recorder := record.NewFakeRecorder(64)

r := NewISBServiceRolloutReconciler(numaplaneClient, scheme.Scheme, restConfig, customMetrics, recorder)
r := NewISBServiceRolloutReconciler(numaplaneClient, scheme.Scheme, customMetrics, recorder)

trueValue := true
falseValue := false
Expand Down Expand Up @@ -513,8 +513,8 @@ func createDefaultISBService(jetstreamVersion string, phase numaflowv1.ISBSvcPha
}
return &numaflowv1.InterStepBufferService{
TypeMeta: metav1.TypeMeta{
Kind: "InterStepBufferService",
APIVersion: "numaflow.numaproj.io/v1alpha1",
Kind: common.NumaflowISBServiceKind,
APIVersion: common.NumaflowAPIGroup + "/" + common.NumaflowAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Name: defaultISBSvcRolloutName,
Expand Down
39 changes: 23 additions & 16 deletions internal/controller/monovertexrollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,14 @@ import (
"fmt"
"time"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/util/kubernetes"
"github.com/numaproj/numaplane/internal/util/logger"
"github.com/numaproj/numaplane/internal/util/metrics"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/apimachinery/pkg/runtime/schema"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -41,6 +37,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

numaflowv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaplane/internal/common"
"github.com/numaproj/numaplane/internal/util/kubernetes"
"github.com/numaproj/numaplane/internal/util/logger"
"github.com/numaproj/numaplane/internal/util/metrics"
apiv1 "github.com/numaproj/numaplane/pkg/apis/numaplane/v1alpha1"
)

const (
Expand All @@ -51,7 +54,6 @@ const (
type MonoVertexRolloutReconciler struct {
client client.Client
scheme *runtime.Scheme
restConfig *rest.Config
customMetrics *metrics.CustomMetrics
// the recorder is used to record events
recorder record.EventRecorder
Expand All @@ -60,15 +62,13 @@ type MonoVertexRolloutReconciler struct {
func NewMonoVertexRolloutReconciler(
client client.Client,
s *runtime.Scheme,
restConfig *rest.Config,
customMetrics *metrics.CustomMetrics,
recorder record.EventRecorder,
) *MonoVertexRolloutReconciler {

return &MonoVertexRolloutReconciler{
client,
s,
restConfig,
customMetrics,
recorder,
}
Expand Down Expand Up @@ -182,8 +182,8 @@ func (r *MonoVertexRolloutReconciler) reconcile(ctx context.Context, monoVertexR

newMonoVertexDef := &kubernetes.GenericObject{
TypeMeta: metav1.TypeMeta{
Kind: "MonoVertex",
APIVersion: "numaflow.numaproj.io/v1alpha1",
Kind: common.NumaflowMonoVertexKind,
APIVersion: common.NumaflowAPIGroup + "/" + common.NumaflowAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Name: monoVertexRollout.Name,
Expand All @@ -193,13 +193,14 @@ func (r *MonoVertexRolloutReconciler) reconcile(ctx context.Context, monoVertexR
Spec: monoVertexRollout.Spec.MonoVertex.Spec,
}

existingMonoVertexDef, err := kubernetes.GetLiveResource(ctx, r.restConfig, newMonoVertexDef, "monovertices")
existingMonoVertexDef, err := kubernetes.GetResource(ctx, r.client, newMonoVertexDef.GroupVersionKind(),
k8stypes.NamespacedName{Namespace: newMonoVertexDef.Namespace, Name: newMonoVertexDef.Name})
if err != nil {
if apierrors.IsNotFound(err) {
numaLogger.Debugf("MonoVertex %s/%s doesn't exist so creating", monoVertexRollout.Namespace, monoVertexRollout.Name)
monoVertexRollout.Status.MarkPending()

if err := kubernetes.CreateCR(ctx, r.restConfig, newMonoVertexDef, "monovertices"); err != nil {
if err := kubernetes.CreateResource(ctx, r.client, newMonoVertexDef); err != nil {
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -243,7 +244,13 @@ func (r *MonoVertexRolloutReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

// Watch MonoVertices
if err := controller.Watch(source.Kind(mgr.GetCache(), &numaflowv1.MonoVertex{}),
monoVertexUns := &unstructured.Unstructured{}
monoVertexUns.SetGroupVersionKind(schema.GroupVersionKind{
Kind: common.NumaflowMonoVertexKind,
Group: common.NumaflowAPIGroup,
Version: common.NumaflowAPIVersion,
})
if err := controller.Watch(source.Kind(mgr.GetCache(), monoVertexUns),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &apiv1.MonoVertexRollout{}, handler.OnlyControllerOwner()),
predicate.ResourceVersionChangedPredicate{}); err != nil {
return err
Expand All @@ -252,7 +259,7 @@ func (r *MonoVertexRolloutReconciler) SetupWithManager(mgr ctrl.Manager) error {
return nil
}

func mergeMonoVertex(existingMonoVertex *kubernetes.GenericObject, newMonoVertex *kubernetes.GenericObject) (*kubernetes.GenericObject, error) {
func mergeMonoVertex(existingMonoVertex, newMonoVertex *kubernetes.GenericObject) (*kubernetes.GenericObject, error) {
resultMonoVertex := existingMonoVertex.DeepCopy()
resultMonoVertex.Spec = *newMonoVertex.Spec.DeepCopy()
// Use the same replicas as the existing MonoVertex
Expand Down Expand Up @@ -325,7 +332,7 @@ func (r *MonoVertexRolloutReconciler) needsUpdate(old, new *apiv1.MonoVertexRoll
}

func (r *MonoVertexRolloutReconciler) updateMonoVertex(ctx context.Context, monoVertexRollout *apiv1.MonoVertexRollout, newMonoVertexDef *kubernetes.GenericObject) error {
err := kubernetes.UpdateCR(ctx, r.restConfig, newMonoVertexDef, "monovertices")
err := kubernetes.UpdateResource(ctx, r.client, newMonoVertexDef)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (r *NumaflowControllerRolloutReconciler) getChildTypeString() string {
}

func (r *NumaflowControllerRolloutReconciler) getPipelineList(ctx context.Context, rolloutNamespace string, _ string) ([]*kubernetes.GenericObject, error) {
return kubernetes.ListLiveResource(ctx, r.restConfig, common.NumaflowAPIGroup, common.NumaflowAPIVersion, "pipelines", rolloutNamespace, common.LabelKeyPipelineRolloutForPipeline, "")
return kubernetes.ListLiveResource(ctx, common.NumaflowAPIGroup, common.NumaflowAPIVersion, "pipelines", rolloutNamespace, common.LabelKeyPipelineRolloutForPipeline, "")
}

func (r *NumaflowControllerRolloutReconciler) getRolloutKey(rolloutNamespace string, rolloutName string) string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ func Test_reconcile_numaflowcontrollerrollout_PPND(t *testing.T) {

restConfig, numaflowClientSet, numaplaneClient, k8sClientSet, err := commontest.PrepareK8SEnvironment()
assert.Nil(t, err)
assert.Nil(t, kubernetes.SetDynamicClient(restConfig))

config.GetConfigManagerInstance().UpdateUSDEConfig(config.USDEConfig{DefaultUpgradeStrategy: config.PPNDStrategyID})
controllerDefinitions, err := getNumaflowControllerDefinitions("../../tests/config/controller-definitions-config.yaml")
Expand Down
17 changes: 9 additions & 8 deletions internal/controller/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"fmt"
"sync"

"github.com/numaproj/numaplane/internal/util/kubernetes"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/numaproj/numaplane/internal/util/kubernetes"
)

var (
Expand Down Expand Up @@ -72,19 +73,19 @@ func (pm *PauseModule) getPauseRequest(requester string) (*bool, bool) {
}

// pause pipeline
func (pm *PauseModule) pausePipeline(ctx context.Context, restConfig *rest.Config, pipeline *kubernetes.GenericObject) error {
func (pm *PauseModule) pausePipeline(ctx context.Context, c client.Client, pipeline *kubernetes.GenericObject) error {
var existingPipelineSpec PipelineSpec
if err := json.Unmarshal(pipeline.Spec.Raw, &existingPipelineSpec); err != nil {
return err
}

return pm.updatePipelineLifecycle(ctx, restConfig, pipeline, "Paused")
return pm.updatePipelineLifecycle(ctx, c, pipeline, "Paused")
}

// resume pipeline
// lock the maps while we change pipeline lifecycle so nobody changes their pause request
// while we run; otherwise, they may think they are pausing the pipeline while it's running
func (pm *PauseModule) runPipelineIfSafe(ctx context.Context, restConfig *rest.Config, pipeline *kubernetes.GenericObject) (bool, error) {
func (pm *PauseModule) runPipelineIfSafe(ctx context.Context, c client.Client, pipeline *kubernetes.GenericObject) (bool, error) {
pm.lock.RLock()
defer pm.lock.RUnlock()

Expand All @@ -101,21 +102,21 @@ func (pm *PauseModule) runPipelineIfSafe(ctx context.Context, restConfig *rest.C
return false, nil
}

err := pm.updatePipelineLifecycle(ctx, restConfig, pipeline, "Running")
err := pm.updatePipelineLifecycle(ctx, c, pipeline, "Running")
if err != nil {
return false, err
}
return true, nil
}

func (pm *PauseModule) updatePipelineLifecycle(ctx context.Context, restConfig *rest.Config, pipeline *kubernetes.GenericObject, phase string) error {
func (pm *PauseModule) updatePipelineLifecycle(ctx context.Context, c client.Client, pipeline *kubernetes.GenericObject, phase string) error {

err := withDesiredPhase(pipeline, phase)
if err != nil {
return err
}

err = kubernetes.UpdateCR(ctx, restConfig, pipeline, "pipelines")
err = kubernetes.UpdateResource(ctx, c, pipeline)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 7da779d

Please sign in to comment.