diff --git a/go.mod b/go.mod index 3f7e55b7..cee2e4c6 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24.0 require ( github.com/hashicorp/go-multierror v1.1.1 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.21.1 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.10.0 k8s.io/api v0.32.3 @@ -13,6 +14,8 @@ require ( ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.12.2 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect @@ -29,11 +32,15 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/mailru/easyjson v0.9.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.62.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/x448/float16 v0.8.4 // indirect golang.org/x/net v0.37.0 // indirect diff --git a/go.sum b/go.sum index d36e954e..9927f179 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -43,10 +47,14 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4= github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -65,6 +73,14 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.21.1 h1:DOvXXTqVzvkIewV/CDPFdejpMCGeMcbGCQ8YOmu+Ibk= +github.com/prometheus/client_golang v1.21.1/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= diff --git a/main.go b/main.go index 136dc8b2..d80a3d08 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,9 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" ) var f flags @@ -88,32 +91,35 @@ func main() { client = kubernetes.NewForConfigOrDie(config) + reg := prometheus.NewRegistry() + metrics := common.NewMetrics(reg) + if f.ReplicateSecrets { - secretRepl := secret.NewReplicator(client, f.ResyncPeriod, f.AllowAll, f.SyncByContent) + secretRepl := secret.NewReplicator(client, f.ResyncPeriod, f.AllowAll, f.SyncByContent, metrics) go secretRepl.Run() enabledReplicators = append(enabledReplicators, secretRepl) } if f.ReplicateConfigMaps { - configMapRepl := configmap.NewReplicator(client, f.ResyncPeriod, f.AllowAll, f.SyncByContent) + configMapRepl := configmap.NewReplicator(client, f.ResyncPeriod, f.AllowAll, f.SyncByContent, metrics) go configMapRepl.Run() enabledReplicators = append(enabledReplicators, configMapRepl) } if f.ReplicateRoles { - roleRepl := role.NewReplicator(client, f.ResyncPeriod, f.AllowAll) + roleRepl := role.NewReplicator(client, f.ResyncPeriod, f.AllowAll, metrics) go roleRepl.Run() enabledReplicators = append(enabledReplicators, roleRepl) } if f.ReplicateRoleBindings { - roleBindingRepl := rolebinding.NewReplicator(client, f.ResyncPeriod, f.AllowAll) + roleBindingRepl := rolebinding.NewReplicator(client, f.ResyncPeriod, f.AllowAll, metrics) go roleBindingRepl.Run() enabledReplicators = append(enabledReplicators, roleBindingRepl) } if f.ReplicateServiceAccounts { - serviceAccountRepl := serviceaccount.NewReplicator(client, f.ResyncPeriod, f.AllowAll) + serviceAccountRepl := serviceaccount.NewReplicator(client, f.ResyncPeriod, f.AllowAll, metrics) go serviceAccountRepl.Run() enabledReplicators = append(enabledReplicators, serviceAccountRepl) } @@ -126,6 +132,7 @@ func main() { http.Handle("/healthz", &h) http.Handle("/readyz", &h) + http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) err = http.ListenAndServe(f.StatusAddr, nil) if err != nil { log.Fatal(err) diff --git a/replicate/common/generic-replicator.go b/replicate/common/generic-replicator.go index eb5305a6..4b087f40 100644 --- a/replicate/common/generic-replicator.go +++ b/replicate/common/generic-replicator.go @@ -31,6 +31,7 @@ type ReplicatorConfig struct { ListFunc cache.ListFunc WatchFunc cache.WatchFunc ObjType runtime.Object + Metrics *ReplicatorMetrics } type UpdateFuncs struct { diff --git a/replicate/common/metrics.go b/replicate/common/metrics.go new file mode 100644 index 00000000..364f3ec9 --- /dev/null +++ b/replicate/common/metrics.go @@ -0,0 +1,44 @@ +package common + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +type ReplicatorMetrics struct { + Kind string + OperationCounter *prometheus.CounterVec +} + +type Operation string + +const ( + Update Operation = "Update" + Patch Operation = "Patch" + Create Operation = "Create" + Delete Operation = "Delete" +) + +func NewMetrics(reg prometheus.Registerer) *ReplicatorMetrics { + m := &ReplicatorMetrics{ + OperationCounter: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "kubernetes_replicator", + Subsystem: "kube_api", + Name: "operation_count", + Help: "Counter for each operation to change a resource", + }, + []string{"kind", "namespace", "name", "operation"}, + ), + } + reg.MustRegister(m.OperationCounter) + return m +} + +func (self ReplicatorMetrics) WithKind(kind string) *ReplicatorMetrics { + self.Kind = kind + return &self +} + +func (self *ReplicatorMetrics) OperationCounterInc(namespace string, name string, operation Operation) { + self.OperationCounter.With(prometheus.Labels{"kind": self.Kind, "namespace": namespace, "name": name, "operation": string(operation)}).Inc() +} diff --git a/replicate/configmap/configmaps.go b/replicate/configmap/configmaps.go index a0b00db3..d72d32ce 100644 --- a/replicate/configmap/configmaps.go +++ b/replicate/configmap/configmaps.go @@ -26,7 +26,7 @@ type Replicator struct { } // NewReplicator creates a new config map replicator -func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll, syncByContent bool) common.Replicator { +func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll, syncByContent bool, metrics *common.ReplicatorMetrics) common.Replicator { repl := Replicator{ GenericReplicator: common.NewGenericReplicator(common.ReplicatorConfig{ Kind: "ConfigMap", @@ -41,6 +41,7 @@ func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allo WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { return client.CoreV1().ConfigMaps("").Watch(context.TODO(), lo) }, + Metrics: metrics.WithKind("ConfigMap"), }), } repl.UpdateFuncs = common.UpdateFuncs{ @@ -140,6 +141,7 @@ func (r *Replicator) ReplicateDataFrom(sourceObj interface{}, targetObj interfac targetCopy.Annotations[common.ReplicatedFromVersionAnnotation] = source.ResourceVersion targetCopy.Annotations[common.ReplicatedKeysAnnotation] = strings.Join(replicatedKeys, ",") + r.Metrics.OperationCounterInc(target.Namespace, targetCopy.Name, common.Update) s, err := r.Client.CoreV1().ConfigMaps(target.Namespace).Update(context.TODO(), targetCopy, metav1.UpdateOptions{}) if err != nil { err = errors.Wrapf(err, "Failed updating target %s/%s", target.Namespace, targetCopy.Name) @@ -243,9 +245,11 @@ func (r *Replicator) ReplicateObjectTo(sourceObj interface{}, target *v1.Namespa var obj interface{} if exists { logger.Debugf("Updating existing secret %s/%s", target.Name, resourceCopy.Name) + r.Metrics.OperationCounterInc(target.Name, resourceCopy.Name, common.Update) obj, err = r.Client.CoreV1().ConfigMaps(target.Name).Update(context.TODO(), resourceCopy, metav1.UpdateOptions{}) } else { logger.Debugf("Creating a new secret secret %s/%s", target.Name, resourceCopy.Name) + r.Metrics.OperationCounterInc(target.Name, resourceCopy.Name, common.Create) obj, err = r.Client.CoreV1().ConfigMaps(target.Name).Create(context.TODO(), resourceCopy, metav1.CreateOptions{}) } if err != nil { @@ -284,6 +288,7 @@ func (r *Replicator) PatchDeleteDependent(sourceKey string, target interface{}) logger.Debugf("clearing dependent config map %s", dependentKey) logger.Tracef("patch body: %s", string(patchBody)) + r.Metrics.OperationCounterInc(targetObject.Namespace, targetObject.Name, common.Patch) s, err := r.Client.CoreV1().ConfigMaps(targetObject.Namespace).Patch(context.TODO(), targetObject.Name, types.JSONPatchType, patchBody, metav1.PatchOptions{}) if err != nil { return nil, errors.Wrapf(err, "error while patching secret %s: %v", dependentKey, err) @@ -309,6 +314,7 @@ func (r *Replicator) DeleteReplicatedResource(targetResource interface{}) error if strings.Join(resourceKeys, ",") == object.Annotations[common.ReplicatedKeysAnnotation] { logger.Debugf("Deleting %s", targetLocation) + r.Metrics.OperationCounterInc(object.Namespace, object.Name, common.Delete) if err := r.Client.CoreV1().ConfigMaps(object.Namespace).Delete(context.TODO(), object.Name, metav1.DeleteOptions{}); err != nil { return errors.Wrapf(err, "Failed deleting %s: %v", targetLocation, err) } @@ -330,6 +336,7 @@ func (r *Replicator) DeleteReplicatedResource(targetResource interface{}) error return errors.Wrapf(err, "error while building patch body for confimap %s: %v", object, err) } + r.Metrics.OperationCounterInc(object.Namespace, object.Name, common.Patch) s, err := r.Client.CoreV1().ConfigMaps(object.Namespace).Patch(context.TODO(), object.Name, types.JSONPatchType, patchBody, metav1.PatchOptions{}) if err != nil { return errors.Wrapf(err, "error while patching secret %s: %v", s, err) diff --git a/replicate/role/roles.go b/replicate/role/roles.go index ec28f9e1..e26c7ce6 100644 --- a/replicate/role/roles.go +++ b/replicate/role/roles.go @@ -24,7 +24,7 @@ type Replicator struct { } // NewReplicator creates a new role replicator -func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll bool) common.Replicator { +func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll bool, metrics *common.ReplicatorMetrics) common.Replicator { repl := Replicator{ GenericReplicator: common.NewGenericReplicator(common.ReplicatorConfig{ Kind: "Role", @@ -38,6 +38,7 @@ func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allo WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { return client.RbacV1().Roles("").Watch(context.TODO(), lo) }, + Metrics: metrics.WithKind("Role"), }), } repl.UpdateFuncs = common.UpdateFuncs{ @@ -80,6 +81,7 @@ func (r *Replicator) ReplicateDataFrom(sourceObj interface{}, targetObj interfac targetCopy.Annotations[common.ReplicatedAtAnnotation] = time.Now().Format(time.RFC3339) targetCopy.Annotations[common.ReplicatedFromVersionAnnotation] = source.ResourceVersion + r.Metrics.OperationCounterInc(target.Namespace, targetCopy.Name, common.Update) s, err := r.Client.RbacV1().Roles(target.Namespace).Update(context.TODO(), targetCopy, metav1.UpdateOptions{}) if err != nil { err = errors.Wrapf(err, "Failed updating target %s/%s", target.Namespace, targetCopy.Name) @@ -154,9 +156,11 @@ func (r *Replicator) ReplicateObjectTo(sourceObj interface{}, target *v1.Namespa var obj interface{} if exists { logger.Debugf("Updating existing role %s/%s", target.Name, targetCopy.Name) + r.Metrics.OperationCounterInc(target.Name, targetCopy.Name, common.Update) obj, err = r.Client.RbacV1().Roles(target.Name).Update(context.TODO(), targetCopy, metav1.UpdateOptions{}) } else { logger.Debugf("Creating a new role %s/%s", target.Name, targetCopy.Name) + r.Metrics.OperationCounterInc(target.Name, targetCopy.Name, common.Create) obj, err = r.Client.RbacV1().Roles(target.Name).Create(context.TODO(), targetCopy, metav1.CreateOptions{}) } if err != nil { @@ -194,6 +198,7 @@ func (r *Replicator) PatchDeleteDependent(sourceKey string, target interface{}) logger.Debugf("clearing dependent role %s", dependentKey) logger.Tracef("patch body: %s", string(patchBody)) + r.Metrics.OperationCounterInc(targetObject.Namespace, targetObject.Name, common.Patch) s, err := r.Client.RbacV1().Roles(targetObject.Namespace).Patch(context.TODO(), targetObject.Name, types.JSONPatchType, patchBody, metav1.PatchOptions{}) if err != nil { return nil, errors.Wrapf(err, "error while patching role %s: %v", dependentKey, err) @@ -211,6 +216,7 @@ func (r *Replicator) DeleteReplicatedResource(targetResource interface{}) error object := targetResource.(*rbacv1.Role) logger.Debugf("Deleting %s", targetLocation) + r.Metrics.OperationCounterInc(object.Namespace, object.Name, common.Delete) if err := r.Client.RbacV1().Roles(object.Namespace).Delete(context.TODO(), object.Name, metav1.DeleteOptions{}); err != nil { return errors.Wrapf(err, "Failed deleting %s: %v", targetLocation, err) } diff --git a/replicate/role/roles_test.go b/replicate/role/roles_test.go index 176b530a..52505522 100644 --- a/replicate/role/roles_test.go +++ b/replicate/role/roles_test.go @@ -4,8 +4,6 @@ import ( "bytes" "context" "fmt" - rbacv1 "k8s.io/api/rbac/v1" - "k8s.io/apimachinery/pkg/types" "os" "path/filepath" "strings" @@ -13,8 +11,12 @@ import ( "testing" "time" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/types" + "github.com/mittwald/kubernetes-replicator/replicate/common" pkgerrors "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" @@ -79,7 +81,10 @@ func TestRoleReplicator(t *testing.T) { prefix := namespacePrefix() client := kubernetes.NewForConfigOrDie(config) - repl := NewReplicator(client, 60*time.Second, false) + reg := prometheus.NewRegistry() + metrics := common.NewMetrics(reg) + + repl := NewReplicator(client, 60*time.Second, false, metrics) go repl.Run() time.Sleep(200 * time.Millisecond) diff --git a/replicate/rolebinding/rolebindings.go b/replicate/rolebinding/rolebindings.go index f895863d..24a50ce0 100644 --- a/replicate/rolebinding/rolebindings.go +++ b/replicate/rolebinding/rolebindings.go @@ -26,7 +26,7 @@ type Replicator struct { const sleepTime = 100 * time.Millisecond // NewReplicator creates a new secret replicator -func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll bool) common.Replicator { +func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll bool, metrics *common.ReplicatorMetrics) common.Replicator { repl := Replicator{ GenericReplicator: common.NewGenericReplicator(common.ReplicatorConfig{ Kind: "RoleBinding", @@ -40,6 +40,7 @@ func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allo WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { return client.RbacV1().RoleBindings("").Watch(context.TODO(), lo) }, + Metrics: metrics.WithKind("RoleBinding"), }), } repl.UpdateFuncs = common.UpdateFuncs{ @@ -82,6 +83,7 @@ func (r *Replicator) ReplicateDataFrom(sourceObj interface{}, targetObj interfac targetCopy.Annotations[common.ReplicatedAtAnnotation] = time.Now().Format(time.RFC3339) targetCopy.Annotations[common.ReplicatedFromVersionAnnotation] = source.ResourceVersion + r.Metrics.OperationCounterInc(target.Namespace, targetCopy.Name, common.Update) s, err := r.Client.RbacV1().RoleBindings(target.Namespace).Update(context.TODO(), targetCopy, metav1.UpdateOptions{}) if err != nil { err = errors.Wrapf(err, "Failed updating target %s/%s", target.Namespace, targetCopy.Name) @@ -159,11 +161,13 @@ func (r *Replicator) ReplicateObjectTo(sourceObj interface{}, target *v1.Namespa if exists { if err == nil { logger.Debugf("Updating existing roleBinding %s/%s", target.Name, targetCopy.Name) + r.Metrics.OperationCounterInc(target.Name, targetCopy.Name, common.Update) obj, err = r.Client.RbacV1().RoleBindings(target.Name).Update(context.TODO(), targetCopy, metav1.UpdateOptions{}) } } else { if err == nil { logger.Debugf("Creating a new roleBinding %s/%s", target.Name, targetCopy.Name) + r.Metrics.OperationCounterInc(target.Name, targetCopy.Name, common.Create) obj, err = r.Client.RbacV1().RoleBindings(target.Name).Create(context.TODO(), targetCopy, metav1.CreateOptions{}) } } @@ -178,7 +182,7 @@ func (r *Replicator) ReplicateObjectTo(sourceObj interface{}, target *v1.Namespa return nil } -//Checks if Role required for RoleBinding exists. Retries a few times before returning error to allow replication to catch up +// Checks if Role required for RoleBinding exists. Retries a few times before returning error to allow replication to catch up func (r *Replicator) canReplicate(targetNameSpace string, roleRef string) (err error) { for i := 0; i < 5; i++ { _, err = r.Client.RbacV1().Roles(targetNameSpace).Get(context.TODO(), roleRef, metav1.GetOptions{}) @@ -216,6 +220,7 @@ func (r *Replicator) PatchDeleteDependent(sourceKey string, target interface{}) logger.Debugf("clearing dependent roleBinding %s", dependentKey) logger.Tracef("patch body: %s", string(patchBody)) + r.Metrics.OperationCounterInc(targetObject.Namespace, targetObject.Name, common.Patch) s, err := r.Client.RbacV1().RoleBindings(targetObject.Namespace).Patch(context.TODO(), targetObject.Name, types.JSONPatchType, patchBody, metav1.PatchOptions{}) if err != nil { return nil, errors.Wrapf(err, "error while patching role %s: %v", dependentKey, err) @@ -233,6 +238,7 @@ func (r *Replicator) DeleteReplicatedResource(targetResource interface{}) error object := targetResource.(*rbacv1.RoleBinding) logger.Debugf("Deleting %s", targetLocation) + r.Metrics.OperationCounterInc(object.Namespace, object.Name, common.Delete) if err := r.Client.RbacV1().RoleBindings(object.Namespace).Delete(context.TODO(), object.Name, metav1.DeleteOptions{}); err != nil { return errors.Wrapf(err, "Failed deleting %s: %v", targetLocation, err) } diff --git a/replicate/secret/secrets.go b/replicate/secret/secrets.go index 81812d9a..787dbb3b 100644 --- a/replicate/secret/secrets.go +++ b/replicate/secret/secrets.go @@ -26,7 +26,7 @@ type Replicator struct { } // NewReplicator creates a new secret replicator -func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll, syncByContent bool) common.Replicator { +func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll, syncByContent bool, metrics *common.ReplicatorMetrics) common.Replicator { repl := Replicator{ GenericReplicator: common.NewGenericReplicator(common.ReplicatorConfig{ Kind: "Secret", @@ -41,6 +41,7 @@ func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allo WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { return client.CoreV1().Secrets("").Watch(context.TODO(), lo) }, + Metrics: metrics.WithKind("Secret"), }), } repl.UpdateFuncs = common.UpdateFuncs{ @@ -123,6 +124,7 @@ func (r *Replicator) ReplicateDataFrom(sourceObj interface{}, targetObj interfac targetCopy.Annotations[common.ReplicatedFromVersionAnnotation] = source.ResourceVersion targetCopy.Annotations[common.ReplicatedKeysAnnotation] = strings.Join(replicatedKeys, ",") + r.Metrics.OperationCounterInc(target.Namespace, targetCopy.Name, common.Update) s, err := r.Client.CoreV1().Secrets(target.Namespace).Update(context.TODO(), targetCopy, metav1.UpdateOptions{}) if err != nil { err = errors.Wrapf(err, "Failed updating target %s/%s", target.Namespace, targetCopy.Name) @@ -203,9 +205,11 @@ func (r *Replicator) ReplicateObjectTo(sourceObj interface{}, target *v1.Namespa var obj interface{} if exists { logger.Debugf("Updating existing secret %s/%s", target.Name, resourceCopy.Name) + r.Metrics.OperationCounterInc(target.Name, resourceCopy.Name, common.Update) obj, err = r.Client.CoreV1().Secrets(target.Name).Update(context.TODO(), resourceCopy, metav1.UpdateOptions{}) } else { logger.Debugf("Creating a new secret secret %s/%s", target.Name, resourceCopy.Name) + r.Metrics.OperationCounterInc(target.Name, resourceCopy.Name, common.Create) obj, err = r.Client.CoreV1().Secrets(target.Name).Create(context.TODO(), resourceCopy, metav1.CreateOptions{}) } if err != nil { @@ -268,6 +272,7 @@ func (r *Replicator) PatchDeleteDependent(sourceKey string, target interface{}) logger.Debugf("clearing dependent %s %s", r.Kind, dependentKey) logger.Tracef("patch body: %s", string(patchBody)) + r.Metrics.OperationCounterInc(targetObject.Namespace, targetObject.Name, common.Patch) s, err := r.Client.CoreV1().Secrets(targetObject.Namespace).Patch(context.TODO(), targetObject.Name, types.JSONPatchType, patchBody, metav1.PatchOptions{}) if err != nil { return nil, errors.Wrapf(err, "error while patching secret %s: %v", dependentKey, err) @@ -287,6 +292,7 @@ func (r *Replicator) DeleteReplicatedResource(targetResource interface{}) error resourceKeys := strings.Join(common.GetKeysFromBinaryMap(object.Data), ",") if resourceKeys == object.Annotations[common.ReplicatedKeysAnnotation] { logger.Debugf("Deleting %s", targetLocation) + r.Metrics.OperationCounterInc(object.Namespace, object.Name, common.Delete) if err := r.Client.CoreV1().Secrets(object.Namespace).Delete(context.TODO(), object.Name, metav1.DeleteOptions{}); err != nil { return errors.Wrapf(err, "Failed deleting %s: %v", targetLocation, err) } @@ -308,6 +314,7 @@ func (r *Replicator) DeleteReplicatedResource(targetResource interface{}) error return errors.Wrapf(err, "error while building patch body for confimap %s: %v", object, err) } + r.Metrics.OperationCounterInc(object.Namespace, object.Name, common.Patch) s, err := r.Client.CoreV1().Secrets(object.Namespace).Patch(context.TODO(), object.Name, types.JSONPatchType, patchBody, metav1.PatchOptions{}) if err != nil { return errors.Wrapf(err, "error while patching secret %s: %v", s, err) diff --git a/replicate/secret/secrets_test.go b/replicate/secret/secrets_test.go index 2b3a0172..ca5214ea 100644 --- a/replicate/secret/secrets_test.go +++ b/replicate/secret/secrets_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "k8s.io/client-go/tools/clientcmd" "os" "path/filepath" "reflect" @@ -13,8 +12,11 @@ import ( "testing" "time" + "k8s.io/client-go/tools/clientcmd" + "github.com/mittwald/kubernetes-replicator/replicate/common" pkgerrors "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" @@ -83,7 +85,10 @@ func TestSecretReplicator(t *testing.T) { client := setupRealClientSet(t) - repl := NewReplicator(client, 60*time.Second, false, false) + reg := prometheus.NewRegistry() + metrics := common.NewMetrics(reg) + + repl := NewReplicator(client, 60*time.Second, false, false, metrics) go repl.Run() time.Sleep(200 * time.Millisecond) @@ -1293,7 +1298,10 @@ func TestSecretReplicatorSyncByContent(t *testing.T) { client := setupRealClientSet(t) ctx := context.TODO() - repl := NewReplicator(client, 60*time.Second, false, true) + reg := prometheus.NewRegistry() + metrics := common.NewMetrics(reg) + + repl := NewReplicator(client, 60*time.Second, false, true, metrics) go repl.Run() time.Sleep(200 * time.Millisecond) diff --git a/replicate/serviceaccount/serviceaccounts.go b/replicate/serviceaccount/serviceaccounts.go index ebe736de..4a3d49ad 100644 --- a/replicate/serviceaccount/serviceaccounts.go +++ b/replicate/serviceaccount/serviceaccounts.go @@ -24,7 +24,7 @@ type Replicator struct { } // NewReplicator creates a new serviceaccount replicator -func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll bool) common.Replicator { +func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allowAll bool, metrics *common.ReplicatorMetrics) common.Replicator { repl := Replicator{ GenericReplicator: common.NewGenericReplicator(common.ReplicatorConfig{ Kind: "ServiceAccount", @@ -38,6 +38,7 @@ func NewReplicator(client kubernetes.Interface, resyncPeriod time.Duration, allo WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { return client.CoreV1().ServiceAccounts("").Watch(context.TODO(), lo) }, + Metrics: metrics.WithKind("ServiceAccount"), }), } repl.UpdateFuncs = common.UpdateFuncs{ @@ -80,6 +81,7 @@ func (r *Replicator) ReplicateDataFrom(sourceObj interface{}, targetObj interfac targetCopy.Annotations[common.ReplicatedAtAnnotation] = time.Now().Format(time.RFC3339) targetCopy.Annotations[common.ReplicatedFromVersionAnnotation] = source.ResourceVersion + r.Metrics.OperationCounterInc(target.Namespace, targetCopy.Name, common.Update) s, err := r.Client.CoreV1().ServiceAccounts(target.Namespace).Update(context.TODO(), targetCopy, metav1.UpdateOptions{}) if err != nil { err = errors.Wrapf(err, "Failed updating target %s/%s", target.Namespace, targetCopy.Name) @@ -154,11 +156,13 @@ func (r *Replicator) ReplicateObjectTo(sourceObj interface{}, target *v1.Namespa if exists { if err == nil { logger.Debugf("Updating existing serviceAccount %s/%s", target.Name, targetCopy.Name) + r.Metrics.OperationCounterInc(target.Name, targetCopy.Name, common.Update) obj, err = r.Client.CoreV1().ServiceAccounts(target.Name).Update(context.TODO(), targetCopy, metav1.UpdateOptions{}) } } else { if err == nil { logger.Debugf("Creating a new serviceAccount %s/%s", target.Name, targetCopy.Name) + r.Metrics.OperationCounterInc(target.Name, targetCopy.Name, common.Create) obj, err = r.Client.CoreV1().ServiceAccounts(target.Name).Create(context.TODO(), targetCopy, metav1.CreateOptions{}) } } @@ -198,6 +202,7 @@ func (r *Replicator) PatchDeleteDependent(sourceKey string, target interface{}) logger.Debugf("clearing dependent serviceAccount %s", dependentKey) logger.Tracef("patch body: %s", string(patchBody)) + r.Metrics.OperationCounterInc(targetObject.Namespace, targetObject.Name, common.Patch) s, err := r.Client.CoreV1().ServiceAccounts(targetObject.Namespace).Patch(context.TODO(), targetObject.Name, types.JSONPatchType, patchBody, metav1.PatchOptions{}) if err != nil { return nil, errors.Wrapf(err, "error while patching serviceAccount %s: %v", dependentKey, err) @@ -215,6 +220,7 @@ func (r *Replicator) DeleteReplicatedResource(targetResource interface{}) error object := targetResource.(*corev1.ServiceAccount) logger.Debugf("Deleting %s", targetLocation) + r.Metrics.OperationCounterInc(object.Namespace, object.Name, common.Delete) if err := r.Client.CoreV1().ServiceAccounts(object.Namespace).Delete(context.TODO(), object.Name, metav1.DeleteOptions{}); err != nil { return errors.Wrapf(err, "Failed deleting %s: %v", targetLocation, err) }