Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support monitoring ztunnel to adapt istio ambient mode #142

Merged
merged 3 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Release Notes.
* Add `pprof` module for observe self.
* Add detect process from `CRI-O` container in Kubernetes.
* Introduce `MonitorFilter` into access log module.
* Support monitoring ztunnel to adapt istio ambient mode.

#### Bug Fixes
* Fixed the issue where `conntrack` could not find the Reply IP in the access log module.
Expand Down
5 changes: 4 additions & 1 deletion bpf/accesslog/accesslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,7 @@ char __license[] SEC("license") = "Dual MIT/GPL";
// tls monitoring
#include "tls/go_tls.c"
#include "tls/node_tls.c"
#include "tls/openssl.c"
#include "tls/openssl.c"

// ambient istio
#include "ambient/ztunnel.c"
52 changes: 52 additions & 0 deletions bpf/accesslog/ambient/ztunnel.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "ztunnel.h"

static __inline bool get_socket_addr_ip_in_ztunnel(bool success, void * arg, __u32 *ip, __u16 *port) {
if (!success) {
return false;
}
__u8 sockaddr[8];
if (bpf_probe_read(&sockaddr, sizeof(sockaddr), (void *)arg) != 0) {
return false;
}
// ip is stored in sockaddr[2], sockaddr[3], sockaddr[4], sockaddr[5]
*ip = ((__u32)sockaddr[2] << 24) | ((__u32)sockaddr[3] << 16) | ((__u32)sockaddr[4] << 8) | (__u32)sockaddr[5];
if (port != NULL) {
// port is stored in sockaddr[6], sockaddr[7](should convert to big-endian)
*port = ((__u16)sockaddr[7] << 8) | sockaddr[6];
}
return true;
}

SEC("uprobe/connection_manager_track_outbound")
int connection_manager_track_outbound(struct pt_regs* ctx) {
struct ztunnel_socket_mapping_t *event = create_ztunnel_socket_mapping_event();
if (event == NULL) {
return 0;
}
bool success = true;
success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM3(ctx), &event->orginal_src_ip, &event->src_port);
success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM4(ctx), &event->original_dst_ip, &event->dst_port);
success = get_socket_addr_ip_in_ztunnel(success, (void *)PT_REGS_PARM5(ctx), &event->lb_dst_ip, NULL);
if (!success) {
return 0;
}
bpf_perf_event_output(ctx, &ztunnel_lb_socket_mapping_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event));
return 0;
}
39 changes: 39 additions & 0 deletions bpf/accesslog/ambient/ztunnel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

struct ztunnel_socket_mapping_t {
__u32 orginal_src_ip; // origin local ip
__u32 original_dst_ip; // origin remote ip(should be service ip)
__u16 src_port; // origin local port
__u16 dst_port; // origin remote port
__u32 lb_dst_ip; // load balanced remote ip(should be real pod ip)
};

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} ztunnel_lb_socket_mapping_event_queue SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct ztunnel_socket_mapping_t);
__uint(max_entries, 1);
} ztunnel_socket_mapping_event_per_cpu_map SEC(".maps");

static __inline struct ztunnel_socket_mapping_t* create_ztunnel_socket_mapping_event() {
__u32 kZero = 0;
return bpf_map_lookup_elem(&ztunnel_socket_mapping_event_per_cpu_map, &kZero);
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
k8s.io/apimachinery v0.23.5
k8s.io/client-go v0.23.5
k8s.io/utils v0.0.0-20211116205334-6203023598ed
skywalking.apache.org/repo/goapi v0.0.0-20240604102541-64f9001abe03
skywalking.apache.org/repo/goapi v0.0.0-20240914024804-703f701836e6
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1062,5 +1062,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLz
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
skywalking.apache.org/repo/goapi v0.0.0-20240604102541-64f9001abe03 h1:y0f+3gNmeyC/V5Bt8VE9aL9mmWESj+WvyqJ2tOb7qrk=
skywalking.apache.org/repo/goapi v0.0.0-20240604102541-64f9001abe03/go.mod h1:+n8BMuS8eRdzdnGh15ElRGBXPi0eYZSs2TKySBDmRTE=
skywalking.apache.org/repo/goapi v0.0.0-20240914024804-703f701836e6 h1:ZGcxRsuAF+Q/IHzNzunHTeYPSCbXcLIjonEFkDlAfPc=
skywalking.apache.org/repo/goapi v0.0.0-20240914024804-703f701836e6/go.mod h1:+n8BMuS8eRdzdnGh15ElRGBXPi0eYZSs2TKySBDmRTE=
4 changes: 4 additions & 0 deletions pkg/accesslog/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package collector

import (
"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
)

var log = logger.GetLogger("accesslog", "collector")

type Collector interface {
Start(mgr *module.Manager, context *common.AccessLogContext) error
Stop()
Expand All @@ -35,5 +38,6 @@ func Collectors() []Collector {
connectCollectInstance,
tlsCollectInstance,
processCollectInstance,
zTunnelCollectInstance,
}
}
200 changes: 200 additions & 0 deletions pkg/accesslog/collector/ztunnel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package collector

import (
"context"
"fmt"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/cache"

"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/tools/elf"
"github.com/apache/skywalking-rover/pkg/tools/host"

v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"

"github.com/shirou/gopsutil/process"
)

var (
// ZTunnelProcessFinderInterval is the interval to find ztunnel process
ZTunnelProcessFinderInterval = time.Second * 30
// ZTunnelTrackBoundSymbolPrefix is the prefix of the symbol name to track outbound connections in ztunnel process
// ztunnel::proxy::connection_manager::ConnectionManager::track_outbound
ZTunnelTrackBoundSymbolPrefix = "_ZN7ztunnel5proxy18connection_manager17ConnectionManager14track_outbound"
)

var zTunnelCollectInstance = NewZTunnelCollector(time.Minute)

// ZTunnelCollector is a collector for ztunnel process in the Ambient Istio scenario
type ZTunnelCollector struct {
ctx context.Context
cancel context.CancelFunc
alc *common.AccessLogContext

collectingProcess *process.Process
ipMappingCache *cache.Expiring
ipMappingExpireDuration time.Duration
}

func NewZTunnelCollector(expireTime time.Duration) *ZTunnelCollector {
return &ZTunnelCollector{
ipMappingCache: cache.NewExpiring(),
ipMappingExpireDuration: expireTime,
}
}

func (z *ZTunnelCollector) Start(mgr *module.Manager, ctx *common.AccessLogContext) error {
z.ctx, z.cancel = context.WithCancel(ctx.RuntimeContext)
z.alc = ctx
ctx.ConnectionMgr.RegisterNewFlushListener(z)

err := z.findZTunnelProcessAndCollect()
if err != nil {
return err
}

ctx.BPF.ReadEventAsync(ctx.BPF.ZtunnelLbSocketMappingEventQueue, func(data interface{}) {
event := data.(*events.ZTunnelSocketMappingEvent)
localIP := z.convertBPFIPToString(event.OriginalSrcIP)
localPort := event.OriginalSrcPort
remoteIP := z.convertBPFIPToString(event.OriginalDestIP)
remotePort := event.OriginalDestPort
lbIP := z.convertBPFIPToString(event.LoadBalancedDestIP)
log.Debugf("received ztunnel lb socket mapping event: %s:%d -> %s:%d, lb: %s", localIP, localPort, remoteIP, remotePort, lbIP)

key := z.buildIPMappingCacheKey(localIP, int(localPort), remoteIP, int(remotePort))
z.ipMappingCache.Set(key, lbIP, z.ipMappingExpireDuration)
}, func() interface{} {
return &events.ZTunnelSocketMappingEvent{}
})
go func() {
ticker := time.NewTicker(ZTunnelProcessFinderInterval)
for {
select {
case <-ticker.C:
err := z.findZTunnelProcessAndCollect()
if err != nil {
log.Error("failed to find and collect ztunnel process: ", err)
}
case <-z.ctx.Done():
ticker.Stop()
return
}
}
}()
return nil
}

func (z *ZTunnelCollector) ReadyToFlushConnection(connection *common.ConnectionInfo, _ events.Event) {
if connection == nil || connection.Socket == nil || connection.RPCConnection == nil || connection.RPCConnection.Attachment != nil {
return
}
key := z.buildIPMappingCacheKey(connection.Socket.SrcIP, int(connection.Socket.SrcPort),
connection.Socket.DestIP, int(connection.Socket.DestPort))
lbIPObj, found := z.ipMappingCache.Get(key)
if !found {
log.Debugf("there no ztunnel mapped IP address found for connection ID: %d, random ID: %d",
connection.ConnectionID, connection.RandomID)
return
}
lbIP := lbIPObj.(string)
log.Debugf("found the ztunnel load balanced IP for the connection: %s, connectionID: %d, randomID: %d", lbIP,
connection.ConnectionID, connection.RandomID)
connection.RPCConnection.Attachment = &v3.ConnectionAttachment{
Environment: &v3.ConnectionAttachment_ZTunnel{
ZTunnel: &v3.ZTunnelAttachmentEnvironment{
RealDestinationIp: lbIP,
By: v3.ZTunnelAttachmentEnvironmentDetectBy_ZTUNNEL_OUTBOUND_FUNC,
},
},
}
}

func (z *ZTunnelCollector) convertBPFIPToString(ip uint32) string {
return fmt.Sprintf("%d.%d.%d.%d", ip>>24, ip>>16&0xff, ip>>8&0xff, ip&0xff)
}

func (z *ZTunnelCollector) buildIPMappingCacheKey(localIP string, localPort int, remoteIP string, remotePort int) string {
return fmt.Sprintf("%s:%d-%s:%d", localIP, localPort, remoteIP, remotePort)
}

func (z *ZTunnelCollector) Stop() {
if z.cancel != nil {
z.cancel()
}
}

func (z *ZTunnelCollector) findZTunnelProcessAndCollect() error {
if z.collectingProcess != nil {
running, err := z.collectingProcess.IsRunning()
if err == nil && running {
// already collecting the process
return nil
}
log.Warnf("detected ztunnel process is not running, should re-scan process to find and collect it")
}

processes, err := process.Processes()
if err != nil {
return err
}
var zTunnelProcess *process.Process
for _, p := range processes {
name, err := p.Exe()
if err != nil {
continue
}
if strings.HasSuffix(name, "/ztunnel") {
zTunnelProcess = p
break
}
}

if zTunnelProcess == nil {
log.Debugf("ztunnel process not found is current node")
return nil
}

log.Infof("ztunnel process founded in current node, pid: %d", zTunnelProcess.Pid)
z.collectingProcess = zTunnelProcess
return z.collectZTunnelProcess(zTunnelProcess)
}

func (z *ZTunnelCollector) collectZTunnelProcess(p *process.Process) error {
pidExeFile := host.GetFileInHost(fmt.Sprintf("/proc/%d/exe", p.Pid))
elfFile, err := elf.NewFile(pidExeFile)
if err != nil {
return fmt.Errorf("read executable file error: %v", err)
}
trackBoundSymbol := elfFile.FilterSymbol(func(name string) bool {
return strings.HasPrefix(name, ZTunnelTrackBoundSymbolPrefix)
}, true)
if len(trackBoundSymbol) == 0 {
return fmt.Errorf("failed to find track outbound symbol in ztunnel process")
}

uprobeFile := z.alc.BPF.OpenUProbeExeFile(pidExeFile)
uprobeFile.AddLink(trackBoundSymbol[0].Name, z.alc.BPF.ConnectionManagerTrackOutbound, nil)
return nil
}
Loading
Loading