Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce MonitorFilter into access log module #140

Merged
merged 3 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Release Notes.
* Support propagation the excluding namespaces in the access log to the backend.
* Add `pprof` module for observe self.
* Add detect process from `CRI-O` container in Kubernetes.
* Introduce `MonitorFilter` into access log module.

#### Bug Fixes
* Fixed the issue where `conntrack` could not find the Reply IP in the access log module.
Expand Down
46 changes: 5 additions & 41 deletions pkg/accesslog/common/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -109,8 +108,7 @@ type ConnectionManager struct {
processMonitorMap *ebpf.Map
activeConnectionMap *ebpf.Map

excludeNamespaces map[string]bool
excludeClusters map[string]bool
monitorFilter MonitorFilter

processors []ConnectionProcessor
processListeners []ProcessListener
Expand Down Expand Up @@ -139,15 +137,7 @@ type ConnectionInfo struct {
PID uint32
}

func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *bpf.Loader) *ConnectionManager {
excludeNamespaces := make(map[string]bool)
for _, ns := range strings.Split(config.ExcludeNamespaces, ",") {
excludeNamespaces[ns] = true
}
excludeClusters := make(map[string]bool)
for _, cluster := range strings.Split(config.ExcludeClusters, ",") {
excludeClusters[cluster] = true
}
func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *bpf.Loader, filter MonitorFilter) *ConnectionManager {
mgr := &ConnectionManager{
moduleMgr: moduleMgr,
processOP: moduleMgr.FindModule(process.ModuleName).(process.Operator),
Expand All @@ -159,8 +149,7 @@ func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *
processMonitorMap: bpfLoader.ProcessMonitorControl,
activeConnectionMap: bpfLoader.ActiveConnectionMap,
allUnfinishedConnections: make(map[string]*bool),
excludeNamespaces: excludeNamespaces,
excludeClusters: excludeClusters,
monitorFilter: filter,
}
return mgr
}
Expand Down Expand Up @@ -229,11 +218,7 @@ func (c *ConnectionManager) OnNewProcessExecuting(pid int32) {
}

func (c *ConnectionManager) GetExcludeNamespaces() []string {
namespaces := make([]string, len(c.excludeNamespaces))
for namespace := range c.excludeNamespaces {
namespaces = append(namespaces, namespace)
}
return namespaces
return c.monitorFilter.ExcludeNamespaces()
}

func (c *ConnectionManager) Find(event events.Event) *ConnectionInfo {
Expand Down Expand Up @@ -365,9 +350,6 @@ func (c *ConnectionManager) buildAddressFromLocalKubernetesProcess(pid uint32, p
for _, pi := range c.monitoringProcesses[int32(pid)] {
if pi.DetectType() == api.Kubernetes {
entity := pi.Entity()
if cluster, _, found := strings.Cut(entity.ServiceName, "::"); found && c.excludeClusters[cluster] {
continue
}
podContainer := pi.DetectProcess().(*kubernetes.Process).PodContainer()
return &v3.ConnectionAddress{
Address: &v3.ConnectionAddress_Kubernetes{
Expand Down Expand Up @@ -528,25 +510,7 @@ func (c *ConnectionManager) printTotalAddressesWithPid(prefix string) {
}

func (c *ConnectionManager) shouldExcludeTheProcess(entities []api.ProcessInterface) bool {
// when the process contains multiple entity, and contains the cluster not exclude, then should not exclude the process
containsNotExcludeCluster := false
for _, entity := range entities {
if entity.DetectType() == api.Kubernetes { // for now, we only have the kubernetes detected process
namespace := entity.DetectProcess().(*kubernetes.Process).PodContainer().Pod.Namespace
if c.excludeNamespaces[namespace] {
return true
}
if cluster, _, found := strings.Cut(entity.Entity().ServiceName, "::"); found {
if !c.excludeClusters[cluster] {
containsNotExcludeCluster = true
}
} else {
containsNotExcludeCluster = true
break
}
}
}
return !containsNotExcludeCluster
return c.monitorFilter.ShouldExclude(entities)
}

func (c *ConnectionManager) RemoveProcess(pid int32, entities []api.ProcessInterface) {
Expand Down
80 changes: 80 additions & 0 deletions pkg/accesslog/common/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package common

import (
"strings"

"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/process/finders/kubernetes"
)

type MonitorFilter interface {
// ShouldExclude returns true if the process should be excluded from monitoring.
ShouldExclude(process []api.ProcessInterface) bool
// ExcludeNamespaces returns a list of namespaces that should be excluded from monitoring.
ExcludeNamespaces() []string
}

type StaticMonitorFilter struct {
namespaces map[string]bool
clusters map[string]bool
originalNamespaces []string
}

func NewStaticMonitorFilter(namespaces, clusters []string) *StaticMonitorFilter {
return &StaticMonitorFilter{
namespaces: convertArrayToMapBool(namespaces),
clusters: convertArrayToMapBool(clusters),
originalNamespaces: namespaces,
}
}

func (s *StaticMonitorFilter) ShouldExclude(processes []api.ProcessInterface) bool {
containsNotExcludeCluster := false
for _, entity := range processes {
if entity.DetectType() != api.Kubernetes { // for now, we only have the kubernetes detected processes
continue
}
namespace := entity.DetectProcess().(*kubernetes.Process).PodContainer().Pod.Namespace
if s.namespaces[namespace] {
return true
}
if cluster, _, found := strings.Cut(entity.Entity().ServiceName, "::"); found {
if !s.clusters[cluster] {
containsNotExcludeCluster = true
}
} else {
containsNotExcludeCluster = true
break
}
}
return !containsNotExcludeCluster
}

func (s *StaticMonitorFilter) ExcludeNamespaces() []string {
return s.originalNamespaces
}

func convertArrayToMapBool(a []string) map[string]bool {
m := make(map[string]bool, len(a))
for _, v := range a {
m[v] = true
}
return m
}
4 changes: 3 additions & 1 deletion pkg/accesslog/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package accesslog
import (
"context"
"fmt"
"strings"
"time"

process2 "github.com/shirou/gopsutil/process"
Expand Down Expand Up @@ -68,11 +69,12 @@ func NewRunner(mgr *module.Manager, config *common.Config) (*Runner, error) {
coreModule := mgr.FindModule(core.ModuleName).(core.Operator)
backendOP := coreModule.BackendOperator()
clusterName := coreModule.ClusterName()
monitorFilter := common.NewStaticMonitorFilter(strings.Split(config.ExcludeNamespaces, ","), strings.Split(config.ExcludeClusters, ","))
runner := &Runner{
context: &common.AccessLogContext{
BPF: bpfLoader,
Config: config,
ConnectionMgr: common.NewConnectionManager(config, mgr, bpfLoader),
ConnectionMgr: common.NewConnectionManager(config, mgr, bpfLoader, monitorFilter),
},
collectors: collector.Collectors(),
mgr: mgr,
Expand Down
Loading