Skip to content

Commit baf7abf

Browse files
committed
Implementation with unit tests
1 parent 8cadba4 commit baf7abf

File tree

252 files changed

+28806
-68
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

252 files changed

+28806
-68
lines changed

cmd/gce-pd-csi-driver/main.go

+35-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"strings"
2828
"time"
2929

30+
"k8s.io/client-go/kubernetes"
31+
"k8s.io/client-go/rest"
3032
"k8s.io/klog/v2"
3133
"k8s.io/utils/strings/slices"
3234
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
@@ -94,6 +96,8 @@ var (
9496

9597
extraTagsStr = flag.String("extra-tags", "", "Extra tags to attach to each Compute Disk, Image, Snapshot created. It is a comma separated list of parent id, key and value like '<parent_id1>/<tag_key1>/<tag_value1>,...,<parent_idN>/<tag_keyN>/<tag_valueN>'. parent_id is the Organization or the Project ID or Project name where the tag key and the tag value resources exist. A maximum of 50 tags bindings is allowed for a resource. See https://cloud.google.com/resource-manager/docs/tags/tags-overview, https://cloud.google.com/resource-manager/docs/tags/tags-creating-and-managing for details")
9698

99+
diskTopology = flag.Bool("disk-topology", false, "If set to true, the driver will add a topology.gke.io/[disk-type] topology label to the Topologies returned in CreateVolumeResponse")
100+
97101
version string
98102
)
99103

@@ -227,7 +231,10 @@ func handle() {
227231
}
228232
initialBackoffDuration := time.Duration(*errorBackoffInitialDurationMs) * time.Millisecond
229233
maxBackoffDuration := time.Duration(*errorBackoffMaxDurationMs) * time.Millisecond
230-
controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag)
234+
args := &driver.GCEControllerServerArgs{
235+
EnableDiskTopology: *diskTopology,
236+
}
237+
controllerServer = driver.NewControllerServer(gceDriver, cloudProvider, initialBackoffDuration, maxBackoffDuration, fallbackRequisiteZones, *enableStoragePoolsFlag, *enableDataCacheFlag, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig, *enableHdHAFlag, args)
231238
} else if *cloudConfigFilePath != "" {
232239
klog.Warningf("controller service is disabled but cloud config given - it has no effect")
233240
}
@@ -239,19 +246,33 @@ func handle() {
239246
if err != nil {
240247
klog.Fatalf("Failed to get safe mounter: %v", err.Error())
241248
}
249+
242250
deviceUtils := deviceutils.NewDeviceUtils()
243251
statter := mountmanager.NewStatter(mounter)
244252
meta, err := metadataservice.NewMetadataService()
245253
if err != nil {
246254
klog.Fatalf("Failed to set up metadata service: %v", err.Error())
247255
}
256+
248257
nsArgs := driver.NodeServerArgs{
249258
EnableDeviceInUseCheck: *enableDeviceInUseCheck,
250259
DeviceInUseTimeout: *deviceInUseTimeout,
251260
EnableDataCache: *enableDataCacheFlag,
252261
DataCacheEnabledNodePool: isDataCacheEnabledNodePool(ctx, *nodeName),
262+
EnableDiskTopology: *diskTopology,
253263
}
254-
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, nsArgs)
264+
265+
if *diskTopology {
266+
klog.V(2).Infof("Setting up kubeClient")
267+
kubeClient, err := instantiateKubeClient()
268+
if err != nil {
269+
klog.Fatalf("Failed to instantiate Kubernetes client: %v", err)
270+
}
271+
nsArgs.KubeClient = kubeClient
272+
}
273+
274+
nodeServer = driver.NewNodeServer(gceDriver, mounter, deviceUtils, meta, statter, &nsArgs)
275+
255276
if *maxConcurrentFormatAndMount > 0 {
256277
nodeServer = nodeServer.WithSerializedFormatAndMount(*formatAndMountTimeout, *maxConcurrentFormatAndMount)
257278
}
@@ -288,6 +309,18 @@ func handle() {
288309
gceDriver.Run(*endpoint, *grpcLogCharCap, *enableOtelTracing, metricsManager)
289310
}
290311

312+
func instantiateKubeClient() (*kubernetes.Clientset, error) {
313+
cfg, err := rest.InClusterConfig()
314+
if err != nil {
315+
return nil, fmt.Errorf("failed to create REST Config for k8s client: %w", err)
316+
}
317+
kubeClient, err := kubernetes.NewForConfig(cfg)
318+
if err != nil {
319+
return nil, fmt.Errorf("failed to create k8s client: %w", err)
320+
}
321+
return kubeClient, nil
322+
}
323+
291324
func notEmpty(v string) bool {
292325
return v != ""
293326
}

deploy/kubernetes/base/controller/cluster_setup.yaml

+4-1
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ rules:
205205
verbs: ['use']
206206
resourceNames:
207207
- csi-gce-pd-node-psp
208-
- apiGroups: [""]
208+
- apiGroups: [""] # The core API group
209209
resources: ["nodes"]
210210
verbs: ["get", "list"]
211211
---
@@ -220,6 +220,9 @@ rules:
220220
verbs: ['use']
221221
resourceNames:
222222
- csi-gce-pd-node-psp-win
223+
- apiGroups: [""] # The core API group
224+
resources: ["nodes"]
225+
verbs: ["get", "list"]
223226
---
224227

225228
apiVersion: rbac.authorization.k8s.io/v1

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ require (
5656
github.com/cespare/xxhash/v2 v2.3.0 // indirect
5757
github.com/davecgh/go-spew v1.1.1 // indirect
5858
github.com/emicklei/go-restful/v3 v3.12.2 // indirect
59+
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
5960
github.com/felixge/httpsnoop v1.0.4 // indirect
6061
github.com/go-logr/logr v1.4.2 // indirect
6162
github.com/go-logr/stdr v1.2.2 // indirect

go.sum

+1
Original file line numberDiff line numberDiff line change
@@ -1007,6 +1007,7 @@ github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
10071007
github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
10081008
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
10091009
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
1010+
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
10101011
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
10111012
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d/go.mod h1:ZZMPRZwes7CROmyNKgQzC3XPs6L/G2EJLHddWejkmf4=
10121013
github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc=

pkg/common/constants.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ package common
1818

1919
const (
2020
// Keys for Topology. This key will be shared amongst drivers from GCP
21-
TopologyKeyZone = "topology.gke.io/zone"
21+
TopologyKeyPrefix = "topology.gke.io"
22+
TopologyKeyZone = TopologyKeyPrefix + "/zone"
2223

2324
// VolumeAttributes for Partition
2425
VolumeAttributePartition = "partition"

pkg/common/utils.go

+8
Original file line numberDiff line numberDiff line change
@@ -749,3 +749,11 @@ func ShortString(s string) string {
749749
}
750750
return string(short)
751751
}
752+
753+
func IsGKETopologyLabel(key string) bool {
754+
return strings.HasPrefix(key, TopologyKeyPrefix)
755+
}
756+
757+
func TopologyLabelKey(diskType string) string {
758+
return fmt.Sprintf("%s/%s", TopologyKeyPrefix, diskType)
759+
}

pkg/gce-cloud-provider/metadata/fake.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ type fakeServiceManager struct{}
2020

2121
var _ MetadataService = &fakeServiceManager{}
2222

23-
const (
24-
FakeZone = "country-region-zone"
25-
FakeProject = "test-project"
23+
var (
24+
FakeMachineType = "n1-standard-1"
25+
FakeZone = "country-region-zone"
26+
FakeProject = "test-project"
27+
FakeName = "test-name"
2628
)
2729

28-
var FakeMachineType = "n1-standard-1"
29-
3030
func NewFakeService() MetadataService {
3131
return &fakeServiceManager{}
3232
}
@@ -40,7 +40,7 @@ func (manager *fakeServiceManager) GetProject() string {
4040
}
4141

4242
func (manager *fakeServiceManager) GetName() string {
43-
return "test-name"
43+
return FakeName
4444
}
4545

4646
func (manager *fakeServiceManager) GetMachineType() string {
@@ -50,3 +50,11 @@ func (manager *fakeServiceManager) GetMachineType() string {
5050
func SetMachineType(s string) {
5151
FakeMachineType = s
5252
}
53+
54+
func SetZone(s string) {
55+
FakeZone = s
56+
}
57+
58+
func SetName(s string) {
59+
FakeName = s
60+
}

pkg/gce-pd-csi-driver/controller.go

+27-10
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ type GCEControllerServer struct {
120120
// Embed UnimplementedControllerServer to ensure the driver returns Unimplemented for any
121121
// new RPC methods that might be introduced in future versions of the spec.
122122
csi.UnimplementedControllerServer
123+
124+
EnableDiskTopology bool
125+
}
126+
127+
type GCEControllerServerArgs struct {
128+
EnableDiskTopology bool
123129
}
124130

125131
type MultiZoneVolumeHandleConfig struct {
@@ -320,7 +326,7 @@ func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req
320326
if len(req.GetName()) == 0 {
321327
return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
322328
}
323-
if volumeCapabilities == nil || len(volumeCapabilities) == 0 {
329+
if len(volumeCapabilities) == 0 {
324330
return nil, status.Error(codes.InvalidArgument, "CreateVolume Volume capabilities must be provided")
325331
}
326332

@@ -509,7 +515,7 @@ func (gceCS *GCEControllerServer) createMultiZoneDisk(ctx context.Context, req *
509515
// Use the first response as a template
510516
volumeId := fmt.Sprintf("projects/%s/zones/%s/disks/%s", gceCS.CloudProvider.GetDefaultProject(), common.MultiZoneValue, req.GetName())
511517
klog.V(4).Infof("CreateVolume succeeded for multi-zone disks in zones %s: %v", zones, multiZoneVolKey)
512-
return generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, dataCacheParams, enableDataCache, volumeId), nil
518+
return gceCS.generateCreateVolumeResponseWithVolumeId(createdDisks[0], zones, params, dataCacheParams, enableDataCache, volumeId), nil
513519
}
514520

515521
func (gceCS *GCEControllerServer) getZonesWithDiskNameAndType(ctx context.Context, name string, diskType string) ([]string, error) {
@@ -603,7 +609,7 @@ func (gceCS *GCEControllerServer) createSingleDeviceDisk(ctx context.Context, re
603609
return nil, common.LoggedError("CreateVolume failed: %v", err)
604610
}
605611

606-
return generateCreateVolumeResponseWithVolumeId(disk, zones, params, dataCacheParams, enableDataCache, volumeID), err
612+
return gceCS.generateCreateVolumeResponseWithVolumeId(disk, zones, params, dataCacheParams, enableDataCache, volumeID), err
607613
}
608614

609615
func (gceCS *GCEControllerServer) createSingleDisk(ctx context.Context, req *csi.CreateVolumeRequest, params common.DiskParameters, volKey *meta.Key, zones []string) (*gce.CloudDisk, error) {
@@ -2278,9 +2284,11 @@ func getZonesFromTopology(topList []*csi.Topology) ([]string, error) {
22782284
func getZoneFromSegment(seg map[string]string) (string, error) {
22792285
var zone string
22802286
for k, v := range seg {
2281-
switch k {
2282-
case common.TopologyKeyZone:
2287+
switch {
2288+
case k == common.TopologyKeyZone:
22832289
zone = v
2290+
case common.IsGKETopologyLabel(k):
2291+
continue
22842292
default:
22852293
return "", fmt.Errorf("topology segment has unknown key %v", k)
22862294
}
@@ -2370,21 +2378,30 @@ func extractVolumeContext(context map[string]string) (*PDCSIContext, error) {
23702378
case contextForceAttach:
23712379
b, err := common.ConvertStringToBool(val)
23722380
if err != nil {
2373-
return nil, fmt.Errorf("Bad volume context force attach: %v", err)
2381+
return nil, fmt.Errorf("bad volume context force attach: %w", err)
23742382
}
23752383
info.ForceAttach = b
23762384
}
23772385
}
23782386
return info, nil
23792387
}
23802388

2381-
func generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool, volumeId string) *csi.CreateVolumeResponse {
2389+
func (gceCS *GCEControllerServer) generateCreateVolumeResponseWithVolumeId(disk *gce.CloudDisk, zones []string, params common.DiskParameters, dataCacheParams common.DataCacheParameters, enableDataCache bool, volumeId string) *csi.CreateVolumeResponse {
23822390
tops := []*csi.Topology{}
23832391
for _, zone := range zones {
2384-
tops = append(tops, &csi.Topology{
2385-
Segments: map[string]string{common.TopologyKeyZone: zone},
2386-
})
2392+
top := &csi.Topology{
2393+
Segments: map[string]string{
2394+
common.TopologyKeyZone: zone,
2395+
},
2396+
}
2397+
2398+
if gceCS.EnableDiskTopology {
2399+
top.Segments[common.TopologyLabelKey(params.DiskType)] = "true"
2400+
}
2401+
2402+
tops = append(tops, top)
23872403
}
2404+
23882405
realDiskSizeBytes := common.GbToBytes(disk.GetSizeGb())
23892406
createResp := &csi.CreateVolumeResponse{
23902407
Volume: &csi.Volume{

0 commit comments

Comments
 (0)