Skip to content

Commit

Permalink
Support monitoring ztunnel to adapt istio ambient mode (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Sep 14, 2024
1 parent 1c099fe commit 2331dbe
Show file tree
Hide file tree
Showing 12 changed files with 369 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Release Notes.
* Add `pprof` module for observe self.
* Add detect process from `CRI-O` container in Kubernetes.
* Introduce `MonitorFilter` into access log module.
* Support monitoring ztunnel to adapt istio ambient mode.

#### Bug Fixes
* Fixed the issue where `conntrack` could not find the Reply IP in the access log module.
Expand Down
5 changes: 4 additions & 1 deletion bpf/accesslog/accesslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,7 @@ char __license[] SEC("license") = "Dual MIT/GPL";
// tls monitoring
#include "tls/go_tls.c"
#include "tls/node_tls.c"
#include "tls/openssl.c"
#include "tls/openssl.c"

// ambient istio
#include "ambient/ztunnel.c"
52 changes: 52 additions & 0 deletions bpf/accesslog/ambient/ztunnel.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "ztunnel.h"

static __inline bool get_socket_addr_ip_in_ztunnel(bool success, void * arg, __u32 *ip, __u16 *port) {
if (!success) {
return false;
}
__u8 sockaddr[8];
if (bpf_probe_read(&sockaddr, sizeof(sockaddr), (void *)arg) != 0) {
return false;
}
// ip is stored in sockaddr[2], sockaddr[3], sockaddr[4], sockaddr[5]
*ip = ((__u32)sockaddr[2] << 24) | ((__u32)sockaddr[3] << 16) | ((__u32)sockaddr[4] << 8) | (__u32)sockaddr[5];
if (port != NULL) {
// port is stored in sockaddr[6], sockaddr[7](should convert to big-endian)
*port = ((__u16)sockaddr[7] << 8) | sockaddr[6];
}
return true;
}

SEC("uprobe/connection_manager_track_outbound")
int connection_manager_track_outbound(struct pt_regs* ctx) {
struct ztunnel_socket_mapping_t *event = create_ztunnel_socket_mapping_event();
if (event == NULL) {
return 0;
}
bool success = true;
success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM3(ctx), &event->orginal_src_ip, &event->src_port);
success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM4(ctx), &event->original_dst_ip, &event->dst_port);
success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM5(ctx), &event->lb_dst_ip, NULL);
if (!success) {
return 0;
}
bpf_perf_event_output(ctx, &ztunnel_lb_socket_mapping_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event));
return 0;
}
39 changes: 39 additions & 0 deletions bpf/accesslog/ambient/ztunnel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

struct ztunnel_socket_mapping_t {
__u32 orginal_src_ip; // origin local ip
__u32 original_dst_ip; // origin remote ip(should be service ip)
__u16 src_port; // origin local port
__u16 dst_port; // origin remote port
__u32 lb_dst_ip; // load balanced remote ip(should be real pod ip)
};

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} ztunnel_lb_socket_mapping_event_queue SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct ztunnel_socket_mapping_t);
__uint(max_entries, 1);
} ztunnel_socket_mapping_event_per_cpu_map SEC(".maps");

static __inline struct ztunnel_socket_mapping_t* create_ztunnel_socket_mapping_event() {
__u32 kZero = 0;
return bpf_map_lookup_elem(&ztunnel_socket_mapping_event_per_cpu_map, &kZero);
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
k8s.io/apimachinery v0.23.5
k8s.io/client-go v0.23.5
k8s.io/utils v0.0.0-20211116205334-6203023598ed
skywalking.apache.org/repo/goapi v0.0.0-20240604102541-64f9001abe03
skywalking.apache.org/repo/goapi v0.0.0-20240914024804-703f701836e6
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1062,5 +1062,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLz
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
skywalking.apache.org/repo/goapi v0.0.0-20240604102541-64f9001abe03 h1:y0f+3gNmeyC/V5Bt8VE9aL9mmWESj+WvyqJ2tOb7qrk=
skywalking.apache.org/repo/goapi v0.0.0-20240604102541-64f9001abe03/go.mod h1:+n8BMuS8eRdzdnGh15ElRGBXPi0eYZSs2TKySBDmRTE=
skywalking.apache.org/repo/goapi v0.0.0-20240914024804-703f701836e6 h1:ZGcxRsuAF+Q/IHzNzunHTeYPSCbXcLIjonEFkDlAfPc=
skywalking.apache.org/repo/goapi v0.0.0-20240914024804-703f701836e6/go.mod h1:+n8BMuS8eRdzdnGh15ElRGBXPi0eYZSs2TKySBDmRTE=
4 changes: 4 additions & 0 deletions pkg/accesslog/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package collector

import (
"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
)

var log = logger.GetLogger("accesslog", "collector")

type Collector interface {
Start(mgr *module.Manager, context *common.AccessLogContext) error
Stop()
Expand All @@ -35,5 +38,6 @@ func Collectors() []Collector {
connectCollectInstance,
tlsCollectInstance,
processCollectInstance,
zTunnelCollectInstance,
}
}
200 changes: 200 additions & 0 deletions pkg/accesslog/collector/ztunnel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package collector

import (
"context"
"fmt"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/cache"

"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/tools/elf"
"github.com/apache/skywalking-rover/pkg/tools/host"

v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"

"github.com/shirou/gopsutil/process"
)

var (
// ZTunnelProcessFinderInterval is the interval to find ztunnel process
ZTunnelProcessFinderInterval = time.Second * 30
// ZTunnelTrackBoundSymbolPrefix is the prefix of the symbol name to track outbound connections in ztunnel process
// ztunnel::proxy::connection_manager::ConnectionManager::track_outbound
ZTunnelTrackBoundSymbolPrefix = "_ZN7ztunnel5proxy18connection_manager17ConnectionManager14track_outbound"
)

var zTunnelCollectInstance = NewZTunnelCollector(time.Minute)

// ZTunnelCollector is a collector for ztunnel process in the Ambient Istio scenario
type ZTunnelCollector struct {
ctx context.Context
cancel context.CancelFunc
alc *common.AccessLogContext

collectingProcess *process.Process
ipMappingCache *cache.Expiring
ipMappingExpireDuration time.Duration
}

func NewZTunnelCollector(expireTime time.Duration) *ZTunnelCollector {
return &ZTunnelCollector{
ipMappingCache: cache.NewExpiring(),
ipMappingExpireDuration: expireTime,
}
}

func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx *common.AccessLogContext) error {
z.ctx, z.cancel = context.WithCancel(ctx.RuntimeContext)
z.alc = ctx
ctx.ConnectionMgr.RegisterNewFlushListener(z)

err := z.findZTunnelProcessAndCollect()
if err != nil {
return err
}

ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue, func(data interface{}) {
event := data.(*events.ZTunnelSocketMappingEvent)
localIP := z.convertBPFIPToString(event.OriginalSrcIP)
localPort := event.OriginalSrcPort
remoteIP := z.convertBPFIPToString(event.OriginalDestIP)
remotePort := event.OriginalDestPort
lbIP := z.convertBPFIPToString(event.LoadBalancedDestIP)
log.Debugf("received ztunnel lb socket mapping event: %s:%d -> %s:%d, lb: %s", localIP, localPort, remoteIP, remotePort, lbIP)

key := z.buildIPMappingCacheKey(localIP, int(localPort), remoteIP, int(remotePort))
z.ipMappingCache.Set(key, lbIP, z.ipMappingExpireDuration)
}, func() interface{} {
return &events.ZTunnelSocketMappingEvent{}
})
go func() {
ticker := time.NewTicker(ZTunnelProcessFinderInterval)
for {
select {
case <-ticker.C:
err := z.findZTunnelProcessAndCollect()
if err != nil {
log.Error("failed to find and collect ztunnel process: ", err)
}
case <-z.ctx.Done():
ticker.Stop()
return
}
}
}()
return nil
}

func (z *ZTunnelCollector) ReadyToFlushConnection(connection *common.ConnectionInfo, _ events.Event) {
if connection == nil || connection.Socket == nil || connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil {
return
}
key := z.buildIPMappingCacheKey(connection.Socket.SrcIP, int(connection.Socket.SrcPort),
connection.Socket.DestIP, int(connection.Socket.DestPort))
lbIPObj, found := z.ipMappingCache.Get(key)
if !found {
log.Debugf("there no ztunnel mapped IP address found for connection ID: %d, random ID: %d",
connection.ConnectionID, connection.RandomID)
return
}
lbIP := lbIPObj.(string)
log.Debugf("found the ztunnel load balanced IP for the connection: %s, connectionID: %d, randomID: %d", lbIP,
connection.ConnectionID, connection.RandomID)
connection.RPCConnection.Attachment = &v3.ConnectionAttachment{
Environment: &v3.ConnectionAttachment_ZTunnel{
ZTunnel: &v3.ZTunnelAttachmentEnvironment{
RealDestinationIp: lbIP,
By: v3.ZTunnelAttachmentEnvironmentDetectBy_ZTUNNEL_OUTBOUND_FUNC,
},
},
}
}

func (z *ZTunnelCollector) convertBPFIPToString(ip uint32) string {
return fmt.Sprintf("%d.%d.%d.%d", ip>>24, ip>>16&0xff, ip>>8&0xff, ip&0xff)
}

func (z *ZTunnelCollector) buildIPMappingCacheKey(localIP string, localPort int, remoteIP string, remotePort int) string {
return fmt.Sprintf("%s:%d-%s:%d", localIP, localPort, remoteIP, remotePort)
}

func (z *ZTunnelCollector) Stop() {
if z.cancel != nil {
z.cancel()
}
}

func (z *ZTunnelCollector) findZTunnelProcessAndCollect() error {
if z.collectingProcess != nil {
running, err := z.collectingProcess.IsRunning()
if err == nil && running {
// already collecting the process
return nil
}
log.Warnf("detected ztunnel process is not running, should re-scan process to find and collect it")
}

processes, err := process.Processes()
if err != nil {
return err
}
var zTunnelProcess *process.Process
for _, p := range processes {
name, err := p.Exe()
if err != nil {
continue
}
if strings.HasSuffix(name, "/ztunnel") {
zTunnelProcess = p
break
}
}

if zTunnelProcess == nil {
log.Debugf("ztunnel process not found is current node")
return nil
}

log.Infof("ztunnel process founded in current node, pid: %d", zTunnelProcess.Pid)
z.collectingProcess = zTunnelProcess
return z.collectZTunnelProcess(zTunnelProcess)
}

func (z *ZTunnelCollector) collectZTunnelProcess(p *process.Process) error {
pidExeFile := host.GetFileInHost(fmt.Sprintf("/proc/%d/exe", p.Pid))
elfFile, err := elf.NewFile(pidExeFile)
if err != nil {
return fmt.Errorf("read executable file error: %v", err)
}
trackBoundSymbol := elfFile.FilterSymbol(func(name string) bool {
return strings.HasPrefix(name, ZTunnelTrackBoundSymbolPrefix)
}, true)
if len(trackBoundSymbol) == 0 {
return fmt.Errorf("failed to find track outbound symbol in ztunnel process")
}

uprobeFile := z.alc.BPF.OpenUProbeExeFile(pidExeFile)
uprobeFile.AddLink(trackBoundSymbol[0].Name, z.alc.BPF.ConnectionManagerTrackOutbound, nil)
return nil
}
Loading

0 comments on commit 2331dbe

Please sign in to comment.