Skip to content

Commit 8e79793

Browse files
authored
Merge pull request kubernetes-sigs#692 from wzshiming/feat/crd-stage
Support CRD for stage
2 parents beff3a0 + e0afbd0 commit 8e79793

File tree

22 files changed

+372
-180
lines changed

22 files changed

+372
-180
lines changed

.github/workflows/test.yaml

+8-27
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ jobs:
6969
BUILDER=${{ matrix.builder }} make build-cluster-image
7070
7171
test-kwok:
72+
strategy:
73+
fail-fast: false
74+
matrix:
75+
case:
76+
- kwok
77+
- kwok-with-cni
78+
continue-on-error: false
7279
runs-on: ubuntu-latest
7380
steps:
7481
- uses: actions/checkout@v3
@@ -92,33 +99,7 @@ jobs:
9299
- name: Test Workable
93100
shell: bash
94101
run: |
95-
./hack/e2e-test.sh kwok/kwok
96-
97-
test-kwok-with-cni:
98-
runs-on: ubuntu-latest
99-
steps:
100-
- uses: actions/checkout@v3
101-
- name: Set up Go
102-
uses: actions/setup-go@v4
103-
with:
104-
go-version: "1.20"
105-
106-
- name: Install Kind
107-
shell: bash
108-
run: |
109-
./hack/requirements.sh kind
110-
kind version
111-
112-
- name: Install kubectl
113-
shell: bash
114-
run: |
115-
./hack/requirements.sh kubectl
116-
kubectl version || :
117-
118-
- name: Test Workable
119-
shell: bash
120-
run: |
121-
./hack/e2e-test.sh kwok-with-cni/kwok-with-cni
102+
./hack/e2e-test.sh ${{ matrix.case }}/${{ matrix.case }}
122103
123104
test-kwokctl:
124105
# https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs

kustomize/kwok-with-cni/deployment-patch.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ spec:
2929
- --node-ip=$(POD_IP)
3030
- --node-port=10247
3131
- --node-lease-duration-seconds=40
32+
- --enable-crd=Stage,Attach,Exec,PortForward,Logs,ClusterAttach,ClusterExec,ClusterPortForward,ClusterLogs
3233
- --experimental-enable-cni=true
3334
volumeMounts:
3435
- name: etc-cni

kustomize/kwok/deployment.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ spec:
2828
- --node-port=10247
2929
- --cidr=10.0.0.1/24
3030
- --node-lease-duration-seconds=40
31+
- --enable-crd=Stage,Attach,Exec,PortForward,Logs,ClusterAttach,ClusterExec,ClusterPortForward,ClusterLogs
3132
env:
3233
- name: POD_IP
3334
valueFrom:

pkg/config/resources/cache.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resources
18+
19+
import (
20+
"sync"
21+
)
22+
23+
type cacheGetter[O any] struct {
24+
getter Getter[O]
25+
26+
currentVer string
27+
data O
28+
29+
mut sync.RWMutex
30+
}
31+
32+
func withCache[O any](getter Getter[O]) Getter[O] {
33+
return &cacheGetter[O]{getter: getter}
34+
}
35+
36+
func (g *cacheGetter[O]) Get() O {
37+
g.mut.RLock()
38+
latestVer := g.getter.Version()
39+
if g.currentVer == latestVer {
40+
data := g.data
41+
g.mut.RUnlock()
42+
return data
43+
}
44+
g.mut.RUnlock()
45+
46+
g.mut.Lock()
47+
defer g.mut.Unlock()
48+
if g.currentVer == latestVer {
49+
data := g.data
50+
return data
51+
}
52+
53+
data := g.getter.Get()
54+
g.data = data
55+
g.currentVer = latestVer
56+
return data
57+
}
58+
59+
func (g *cacheGetter[O]) Version() string {
60+
return g.getter.Version()
61+
}

pkg/config/resources/dynamic.go

+13-28
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package resources
1818

1919
import (
2020
"context"
21-
"sync"
2221
"time"
2322

2423
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -39,10 +38,18 @@ type ConvertFunc[O any, T runtime.Object, S ~[]T] func(objs S) O
3938

4039
// NewDynamicGetter returns a new Getter that returns the latest list of resources.
4140
func NewDynamicGetter[O any, T runtime.Object, L runtime.Object](syncer Syncer[T, L], convertFunc ConvertFunc[O, T, []T]) DynamicGetter[O] {
42-
return &dynamicGetter[O, T, L]{
41+
getter := &dynamicGetter[O, T, L]{
4342
syncer: syncer,
4443
convertFunc: convertFunc,
4544
}
45+
46+
return struct {
47+
Getter[O]
48+
Starter
49+
}{
50+
Getter: withCache[O](getter),
51+
Starter: getter,
52+
}
4653
}
4754

4855
type dynamicGetter[O any, T runtime.Object, L runtime.Object] struct {
@@ -52,11 +59,6 @@ type dynamicGetter[O any, T runtime.Object, L runtime.Object] struct {
5259

5360
store cache.Store
5461
controller cache.Controller
55-
56-
currentVer string
57-
data O
58-
59-
mut sync.RWMutex
6062
}
6163

6264
func (c *dynamicGetter[O, T, L]) Start(ctx context.Context) error {
@@ -82,33 +84,16 @@ func (c *dynamicGetter[O, T, L]) Start(ctx context.Context) error {
8284
}
8385

8486
func (c *dynamicGetter[O, T, L]) Get() O {
85-
latestVer := c.controller.LastSyncResourceVersion()
86-
87-
c.mut.RLock()
88-
if latestVer == c.currentVer {
89-
data := c.data
90-
c.mut.RUnlock()
91-
return data
92-
}
93-
c.mut.RUnlock()
94-
return c.updateAndReturn(latestVer)
95-
}
96-
97-
func (c *dynamicGetter[O, T, L]) updateAndReturn(latestVer string) O {
98-
c.mut.Lock()
99-
defer c.mut.Unlock()
100-
if latestVer == c.currentVer {
101-
return c.data
102-
}
103-
10487
list := c.store.List()
10588
currentList := make([]T, 0, len(list))
10689
for _, obj := range list {
10790
currentList = append(currentList, obj.(T))
10891
}
10992

11093
data := c.convertFunc(currentList)
111-
c.data = data
112-
c.currentVer = latestVer
11394
return data
11495
}
96+
97+
func (c *dynamicGetter[O, T, L]) Version() string {
98+
return c.controller.LastSyncResourceVersion()
99+
}

pkg/config/resources/filter.go

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resources
18+
19+
type filterGetter[O any, T any] struct {
20+
getter Getter[T]
21+
filterFunc func(T) O
22+
}
23+
24+
// NewFilter returns a new Getter that returns the given list.
25+
func NewFilter[O any, T any](getter Getter[T], filterFunc func(T) O) Getter[O] {
26+
return withCache[O](&filterGetter[O, T]{getter: getter, filterFunc: filterFunc})
27+
}
28+
29+
func (f *filterGetter[O, T]) Get() O {
30+
return f.filterFunc(f.getter.Get())
31+
}
32+
33+
func (f *filterGetter[O, T]) Version() string {
34+
return f.getter.Version()
35+
}

pkg/config/resources/resource.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@ limitations under the License.
1616

1717
package resources
1818

19-
import "context"
19+
import (
20+
"context"
21+
)
2022

2123
// Getter is an interface for getting resources.
2224
type Getter[O any] interface {
2325
Get() O
26+
Version() string
2427
}
2528

2629
// DynamicGetter is an interface for getting resources.

pkg/config/resources/static.go

+4
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,7 @@ func NewStaticGetter[T any](data T) Getter[T] {
2828
func (s *staticGetter[T]) Get() T {
2929
return s.data
3030
}
31+
32+
func (s *staticGetter[T]) Version() string {
33+
return ""
34+
}

pkg/kwok/cmd/root.go

+25-14
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"k8s.io/utils/clock"
3030

3131
"sigs.k8s.io/kwok/pkg/apis/internalversion"
32+
"sigs.k8s.io/kwok/pkg/apis/v1alpha1"
3233
"sigs.k8s.io/kwok/pkg/config"
3334
"sigs.k8s.io/kwok/pkg/kwok/controllers"
3435
"sigs.k8s.io/kwok/pkg/kwok/server"
@@ -84,6 +85,7 @@ func NewCommand(ctx context.Context) *cobra.Command {
8485
cmd.Flags().StringVar(&flags.Master, "master", flags.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig).")
8586
cmd.Flags().StringVar(&flags.Options.ServerAddress, "server-address", flags.Options.ServerAddress, "Address to expose the server on")
8687
cmd.Flags().UintVar(&flags.Options.NodeLeaseDurationSeconds, "node-lease-duration-seconds", flags.Options.NodeLeaseDurationSeconds, "Duration of node lease seconds")
88+
cmd.Flags().StringArrayVar(&flags.Options.EnableCRDs, "enable-crd", flags.Options.EnableCRDs, "List of CRDs to enable")
8789

8890
cmd.Flags().BoolVar(&flags.Options.EnableCNI, "experimental-enable-cni", flags.Options.EnableCNI, "Experimental support for getting pod ip from CNI, for CNI-related components, Only works with Linux")
8991
if config.GOOS != "linux" {
@@ -145,26 +147,34 @@ func runE(ctx context.Context, flags *flagpole) error {
145147
}
146148

147149
stagesData := config.FilterWithTypeFromContext[*internalversion.Stage](ctx)
150+
var nodeStages []*internalversion.Stage
151+
var podStages []*internalversion.Stage
148152

149-
nodeStages := filterStages(stagesData, "v1", "Node")
150-
if len(nodeStages) == 0 {
151-
nodeStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages))
152-
if err != nil {
153-
return err
153+
if slices.Contains(flags.Options.EnableCRDs, v1alpha1.StageKind) {
154+
if len(stagesData) != 0 {
155+
return fmt.Errorf("stage already exists, cannot watch CRD")
154156
}
155-
if flags.Options.NodeLeaseDurationSeconds == 0 {
156-
nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages))
157+
} else {
158+
nodeStages = filterStages(stagesData, "v1", "Node")
159+
if len(nodeStages) == 0 {
160+
nodeStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultNodeStages))
157161
if err != nil {
158162
return err
159163
}
160-
nodeStages = append(nodeStages, nodeHeartbeatStages...)
164+
if flags.Options.NodeLeaseDurationSeconds == 0 {
165+
nodeHeartbeatStages, err := controllers.NewStagesFromYaml([]byte(stages.DefaultNodeHeartbeatStages))
166+
if err != nil {
167+
return err
168+
}
169+
nodeStages = append(nodeStages, nodeHeartbeatStages...)
170+
}
161171
}
162-
}
163-
podStages := filterStages(stagesData, "v1", "Pod")
164-
if len(podStages) == 0 {
165-
podStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultPodStages))
166-
if err != nil {
167-
return err
172+
podStages = filterStages(stagesData, "v1", "Pod")
173+
if len(podStages) == 0 {
174+
podStages, err = controllers.NewStagesFromYaml([]byte(stages.DefaultPodStages))
175+
if err != nil {
176+
return err
177+
}
168178
}
169179
}
170180

@@ -179,6 +189,7 @@ func runE(ctx context.Context, flags *flagpole) error {
179189
ctr, err := controllers.NewController(controllers.Config{
180190
Clock: clock.RealClock{},
181191
TypedClient: typedClient,
192+
TypedKwokClient: typedKwokClient,
182193
EnableCNI: flags.Options.EnableCNI,
183194
ManageAllNodes: flags.Options.ManageAllNodes,
184195
ManageNodesWithAnnotationSelector: flags.Options.ManageNodesWithAnnotationSelector,

0 commit comments

Comments
 (0)