Skip to content

Commit 03673b8

Browse files
committed
Add support for custom WrapTransport for Kubernetes Client
1 parent b1c2a5d commit 03673b8

12 files changed

+54
-40
lines changed

authz/authz.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import (
88
"github.com/rancher/rke/templates"
99
)
1010

11-
func ApplyJobDeployerServiceAccount(ctx context.Context, kubeConfigPath string) error {
11+
func ApplyJobDeployerServiceAccount(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
1212
log.Infof(ctx, "[authz] Creating rke-job-deployer ServiceAccount")
13-
k8sClient, err := k8s.NewClient(kubeConfigPath)
13+
k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
1414
if err != nil {
1515
return err
1616
}
@@ -24,9 +24,9 @@ func ApplyJobDeployerServiceAccount(ctx context.Context, kubeConfigPath string)
2424
return nil
2525
}
2626

27-
func ApplySystemNodeClusterRoleBinding(ctx context.Context, kubeConfigPath string) error {
27+
func ApplySystemNodeClusterRoleBinding(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
2828
log.Infof(ctx, "[authz] Creating system:node ClusterRoleBinding")
29-
k8sClient, err := k8s.NewClient(kubeConfigPath)
29+
k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
3030
if err != nil {
3131
return err
3232
}

authz/psp.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import (
88
"github.com/rancher/rke/templates"
99
)
1010

11-
func ApplyDefaultPodSecurityPolicy(ctx context.Context, kubeConfigPath string) error {
11+
func ApplyDefaultPodSecurityPolicy(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
1212
log.Infof(ctx, "[authz] Applying default PodSecurityPolicy")
13-
k8sClient, err := k8s.NewClient(kubeConfigPath)
13+
k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
1414
if err != nil {
1515
return err
1616
}
@@ -21,9 +21,9 @@ func ApplyDefaultPodSecurityPolicy(ctx context.Context, kubeConfigPath string) e
2121
return nil
2222
}
2323

24-
func ApplyDefaultPodSecurityPolicyRole(ctx context.Context, kubeConfigPath string) error {
24+
func ApplyDefaultPodSecurityPolicyRole(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
2525
log.Infof(ctx, "[authz] Applying default PodSecurityPolicy Role and RoleBinding")
26-
k8sClient, err := k8s.NewClient(kubeConfigPath)
26+
k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
2727
if err != nil {
2828
return err
2929
}

cluster/addons.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (c *Cluster) doAddonDeploy(ctx context.Context, addonYaml, resourceName str
8888

8989
func (c *Cluster) StoreAddonConfigMap(ctx context.Context, addonYaml string, addonName string) error {
9090
log.Infof(ctx, "[addons] Saving addon ConfigMap to Kubernetes")
91-
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath)
91+
kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
9292
if err != nil {
9393
return err
9494
}
@@ -116,7 +116,7 @@ func (c *Cluster) StoreAddonConfigMap(ctx context.Context, addonYaml string, add
116116

117117
func (c *Cluster) ApplySystemAddonExcuteJob(addonJob string) error {
118118

119-
if err := k8s.ApplyK8sSystemJob(addonJob, c.LocalKubeConfigPath); err != nil {
119+
if err := k8s.ApplyK8sSystemJob(addonJob, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil {
120120
fmt.Println(err)
121121
return err
122122
}

cluster/cluster.go

+14-11
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type Cluster struct {
3838
DockerDialerFactory hosts.DialerFactory
3939
LocalConnDialerFactory hosts.DialerFactory
4040
PrivateRegistriesMap map[string]v3.PrivateRegistry
41+
K8sWrapTransport k8s.WrapTransport
4142
}
4243

4344
const (
@@ -122,14 +123,16 @@ func ParseCluster(
122123
rkeConfig *v3.RancherKubernetesEngineConfig,
123124
clusterFilePath, configDir string,
124125
dockerDialerFactory,
125-
localConnDialerFactory hosts.DialerFactory) (*Cluster, error) {
126+
localConnDialerFactory hosts.DialerFactory,
127+
k8sWrapTransport k8s.WrapTransport) (*Cluster, error) {
126128
var err error
127129
c := &Cluster{
128130
RancherKubernetesEngineConfig: *rkeConfig,
129131
ConfigPath: clusterFilePath,
130132
DockerDialerFactory: dockerDialerFactory,
131133
LocalConnDialerFactory: localConnDialerFactory,
132134
PrivateRegistriesMap: make(map[string]v3.PrivateRegistry),
135+
K8sWrapTransport: k8sWrapTransport,
133136
}
134137
// Setting cluster Defaults
135138
c.setClusterDefaults(ctx)
@@ -187,7 +190,7 @@ func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error {
187190
return fmt.Errorf("Failed to redeploy local admin config with new host")
188191
}
189192
workingConfig = newConfig
190-
if _, err := GetK8sVersion(kubeCluster.LocalKubeConfigPath); err == nil {
193+
if _, err := GetK8sVersion(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport); err == nil {
191194
log.Infof(ctx, "[reconcile] host [%s] is active master on the cluster", cpHost.Address)
192195
break
193196
}
@@ -197,8 +200,8 @@ func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error {
197200
return nil
198201
}
199202

200-
func isLocalConfigWorking(ctx context.Context, localKubeConfigPath string) bool {
201-
if _, err := GetK8sVersion(localKubeConfigPath); err != nil {
203+
func isLocalConfigWorking(ctx context.Context, localKubeConfigPath string, k8sWrapTransport k8s.WrapTransport) bool {
204+
if _, err := GetK8sVersion(localKubeConfigPath, k8sWrapTransport); err != nil {
202205
log.Infof(ctx, "[reconcile] Local config is not valid, rebuilding admin config")
203206
return false
204207
}
@@ -230,22 +233,22 @@ func getLocalAdminConfigWithNewAddress(localConfigPath, cpAddress string) string
230233
}
231234

232235
func (c *Cluster) ApplyAuthzResources(ctx context.Context) error {
233-
if err := authz.ApplyJobDeployerServiceAccount(ctx, c.LocalKubeConfigPath); err != nil {
236+
if err := authz.ApplyJobDeployerServiceAccount(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil {
234237
return fmt.Errorf("Failed to apply the ServiceAccount needed for job execution: %v", err)
235238
}
236239
if c.Authorization.Mode == NoneAuthorizationMode {
237240
return nil
238241
}
239242
if c.Authorization.Mode == services.RBACAuthorizationMode {
240-
if err := authz.ApplySystemNodeClusterRoleBinding(ctx, c.LocalKubeConfigPath); err != nil {
243+
if err := authz.ApplySystemNodeClusterRoleBinding(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil {
241244
return fmt.Errorf("Failed to apply the ClusterRoleBinding needed for node authorization: %v", err)
242245
}
243246
}
244247
if c.Authorization.Mode == services.RBACAuthorizationMode && c.Services.KubeAPI.PodSecurityPolicy {
245-
if err := authz.ApplyDefaultPodSecurityPolicy(ctx, c.LocalKubeConfigPath); err != nil {
248+
if err := authz.ApplyDefaultPodSecurityPolicy(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil {
246249
return fmt.Errorf("Failed to apply default PodSecurityPolicy: %v", err)
247250
}
248-
if err := authz.ApplyDefaultPodSecurityPolicyRole(ctx, c.LocalKubeConfigPath); err != nil {
251+
if err := authz.ApplyDefaultPodSecurityPolicyRole(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil {
249252
return fmt.Errorf("Failed to apply default PodSecurityPolicy ClusterRole and ClusterRoleBinding: %v", err)
250253
}
251254
}
@@ -262,7 +265,7 @@ func (c *Cluster) deployAddons(ctx context.Context) error {
262265
func (c *Cluster) SyncLabelsAndTaints(ctx context.Context) error {
263266
if len(c.ControlPlaneHosts) > 0 {
264267
log.Infof(ctx, "[sync] Syncing nodes Labels and Taints")
265-
k8sClient, err := k8s.NewClient(c.LocalKubeConfigPath)
268+
k8sClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
266269
if err != nil {
267270
return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
268271
}
@@ -297,9 +300,9 @@ func (c *Cluster) PrePullK8sImages(ctx context.Context) error {
297300
return nil
298301
}
299302

300-
func ConfigureCluster(ctx context.Context, rkeConfig v3.RancherKubernetesEngineConfig, crtBundle map[string]pki.CertificatePKI, clusterFilePath, configDir string) error {
303+
func ConfigureCluster(ctx context.Context, rkeConfig v3.RancherKubernetesEngineConfig, crtBundle map[string]pki.CertificatePKI, clusterFilePath, configDir string, k8sWrapTransport k8s.WrapTransport) error {
301304
// dialer factories are not needed here since we only use k8s jobs, not docker
302-
kubeCluster, err := ParseCluster(ctx, &rkeConfig, clusterFilePath, configDir, nil, nil)
305+
kubeCluster, err := ParseCluster(ctx, &rkeConfig, clusterFilePath, configDir, nil, nil, k8sWrapTransport)
303306
if err != nil {
304307
return err
305308
}

cluster/plan.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const (
1818

1919
func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) (v3.RKEPlan, error) {
2020
clusterPlan := v3.RKEPlan{}
21-
myCluster, _ := ParseCluster(ctx, rkeConfig, "", "", nil, nil)
21+
myCluster, _ := ParseCluster(ctx, rkeConfig, "", "", nil, nil, nil)
2222
// rkeConfig.Nodes are already unique. But they don't have role flags. So I will use the parsed cluster.Hosts to make use of the role flags.
2323
uniqHosts := hosts.GetUniqueHostList(myCluster.EtcdHosts, myCluster.ControlPlaneHosts, myCluster.WorkerHosts)
2424
for _, host := range uniqHosts {

cluster/reconcile.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster)
2727
return nil
2828
}
2929

30-
kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath)
30+
kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
3131
if err != nil {
3232
return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
3333
}
@@ -90,7 +90,7 @@ func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster,
9090
}
9191

9292
for _, toDeleteHost := range cpToDelete {
93-
kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath)
93+
kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
9494
if err != nil {
9595
return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
9696
}

cluster/state.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func (c *Cluster) SaveClusterState(ctx context.Context, rkeConfig *v3.RancherKub
1919
if len(c.ControlPlaneHosts) > 0 {
2020
// Reinitialize kubernetes Client
2121
var err error
22-
c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath)
22+
c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
2323
if err != nil {
2424
return fmt.Errorf("Failed to re-initialize Kubernetes Client: %v", err)
2525
}
@@ -44,14 +44,14 @@ func (c *Cluster) GetClusterState(ctx context.Context) (*Cluster, error) {
4444
log.Infof(ctx, "[state] Found local kube config file, trying to get state from cluster")
4545

4646
// to handle if current local admin is down and we need to use new cp from the list
47-
if !isLocalConfigWorking(ctx, c.LocalKubeConfigPath) {
47+
if !isLocalConfigWorking(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport) {
4848
if err := rebuildLocalAdminConfig(ctx, c); err != nil {
4949
return nil, err
5050
}
5151
}
5252

5353
// initiate kubernetes client
54-
c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath)
54+
c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
5555
if err != nil {
5656
log.Warnf(ctx, "Failed to initiate new Kubernetes Client: %v", err)
5757
return nil, nil
@@ -140,9 +140,9 @@ func getStateFromKubernetes(ctx context.Context, kubeClient *kubernetes.Clientse
140140
}
141141
}
142142

143-
func GetK8sVersion(localConfigPath string) (string, error) {
143+
func GetK8sVersion(localConfigPath string, k8sWrapTransport k8s.WrapTransport) (string, error) {
144144
logrus.Debugf("[version] Using %s to connect to Kubernetes cluster..", localConfigPath)
145-
k8sClient, err := k8s.NewClient(localConfigPath)
145+
k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
146146
if err != nil {
147147
return "", fmt.Errorf("Failed to create Kubernetes Client: %v", err)
148148
}

cmd/remove.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/rancher/rke/cluster"
1111
"github.com/rancher/rke/hosts"
12+
"github.com/rancher/rke/k8s"
1213
"github.com/rancher/rke/log"
1314
"github.com/rancher/rke/pki"
1415
"github.com/rancher/types/apis/management.cattle.io/v3"
@@ -45,10 +46,11 @@ func ClusterRemove(
4546
ctx context.Context,
4647
rkeConfig *v3.RancherKubernetesEngineConfig,
4748
dialerFactory hosts.DialerFactory,
49+
k8sWrapTransport k8s.WrapTransport,
4850
local bool, configDir string) error {
4951

5052
log.Infof(ctx, "Tearing down Kubernetes cluster")
51-
kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dialerFactory, nil)
53+
kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dialerFactory, nil, k8sWrapTransport)
5254
if err != nil {
5355
return err
5456
}
@@ -94,7 +96,7 @@ func clusterRemoveFromCli(ctx *cli.Context) error {
9496
if err != nil {
9597
return fmt.Errorf("Failed to parse cluster file: %v", err)
9698
}
97-
return ClusterRemove(context.Background(), rkeConfig, nil, false, "")
99+
return ClusterRemove(context.Background(), rkeConfig, nil, nil, false, "")
98100
}
99101

100102
func clusterRemoveLocal(ctx *cli.Context) error {
@@ -111,5 +113,5 @@ func clusterRemoveLocal(ctx *cli.Context) error {
111113
}
112114
rkeConfig.Nodes = []v3.RKEConfigNode{*cluster.GetLocalRKENodeConfig()}
113115
}
114-
return ClusterRemove(context.Background(), rkeConfig, nil, true, "")
116+
return ClusterRemove(context.Background(), rkeConfig, nil, nil, true, "")
115117
}

cmd/up.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/rancher/rke/cluster"
88
"github.com/rancher/rke/hosts"
9+
"github.com/rancher/rke/k8s"
910
"github.com/rancher/rke/log"
1011
"github.com/rancher/rke/pki"
1112
"github.com/rancher/types/apis/management.cattle.io/v3"
@@ -40,11 +41,12 @@ func ClusterUp(
4041
ctx context.Context,
4142
rkeConfig *v3.RancherKubernetesEngineConfig,
4243
dockerDialerFactory, localConnDialerFactory hosts.DialerFactory,
44+
k8sWrapTransport k8s.WrapTransport,
4345
local bool, configDir string) (string, string, string, string, error) {
4446

4547
log.Infof(ctx, "Building Kubernetes cluster")
4648
var APIURL, caCrt, clientCert, clientKey string
47-
kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dockerDialerFactory, localConnDialerFactory)
49+
kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport)
4850
if err != nil {
4951
return APIURL, caCrt, clientCert, clientKey, err
5052
}
@@ -102,7 +104,7 @@ func ClusterUp(
102104
return APIURL, caCrt, clientCert, clientKey, err
103105
}
104106

105-
err = cluster.ConfigureCluster(ctx, kubeCluster.RancherKubernetesEngineConfig, kubeCluster.Certificates, clusterFilePath, configDir)
107+
err = cluster.ConfigureCluster(ctx, kubeCluster.RancherKubernetesEngineConfig, kubeCluster.Certificates, clusterFilePath, configDir, k8sWrapTransport)
106108
if err != nil {
107109
return APIURL, caCrt, clientCert, clientKey, err
108110
}
@@ -131,7 +133,7 @@ func clusterUpFromCli(ctx *cli.Context) error {
131133
if err != nil {
132134
return fmt.Errorf("Failed to parse cluster file: %v", err)
133135
}
134-
_, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil, false, "")
136+
_, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil, nil, false, "")
135137
return err
136138
}
137139

@@ -149,6 +151,6 @@ func clusterUpLocal(ctx *cli.Context) error {
149151
}
150152
rkeConfig.Nodes = []v3.RKEConfigNode{*cluster.GetLocalRKENodeConfig()}
151153
}
152-
_, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, hosts.LocalHealthcheckFactory, true, "")
154+
_, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, hosts.LocalHealthcheckFactory, nil, true, "")
153155
return err
154156
}

cmd/version.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ func VersionCommand() cli.Command {
2727

2828
func getClusterVersion(ctx *cli.Context) error {
2929
localKubeConfig := pki.GetLocalKubeConfig(ctx.String("config"), "")
30-
serverVersion, err := cluster.GetK8sVersion(localKubeConfig)
30+
// not going to use a k8s dialer here; this is a CLI command
31+
serverVersion, err := cluster.GetK8sVersion(localKubeConfig, nil)
3132
if err != nil {
3233
return err
3334
}

k8s/job.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ import (
1212
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1313
)
1414

15-
func ApplyK8sSystemJob(jobYaml, kubeConfigPath string) error {
15+
func ApplyK8sSystemJob(jobYaml, kubeConfigPath string, k8sWrapTransport WrapTransport) error {
1616
job := v1.Job{}
1717
if err := decodeYamlResource(&job, jobYaml); err != nil {
1818
return err
1919
}
2020
if job.Namespace == metav1.NamespaceNone {
2121
job.Namespace = metav1.NamespaceSystem
2222
}
23-
k8sClient, err := NewClient(kubeConfigPath)
23+
k8sClient, err := NewClient(kubeConfigPath, k8sWrapTransport)
2424
if err != nil {
2525
return err
2626
}

k8s/k8s.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package k8s
22

33
import (
44
"bytes"
5+
"net/http"
56
"time"
67

78
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
@@ -16,12 +17,17 @@ const (
1617

1718
type k8sCall func(*kubernetes.Clientset, interface{}) error
1819

19-
func NewClient(kubeConfigPath string) (*kubernetes.Clientset, error) {
20+
type WrapTransport func(rt http.RoundTripper) http.RoundTripper
21+
22+
func NewClient(kubeConfigPath string, k8sWrapTransport WrapTransport) (*kubernetes.Clientset, error) {
2023
// use the current admin kubeconfig
2124
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
2225
if err != nil {
2326
return nil, err
2427
}
28+
if k8sWrapTransport != nil {
29+
config.WrapTransport = k8sWrapTransport
30+
}
2531
K8sClientSet, err := kubernetes.NewForConfig(config)
2632
if err != nil {
2733
return nil, err

0 commit comments

Comments
 (0)