Skip to content

Commit

Permalink
Fix wrong conntrack data from eBPF (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu committed Jan 16, 2024
1 parent d76ee40 commit afee353
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 5 deletions.
28 changes: 26 additions & 2 deletions bpf/accesslog/syscalls/connect_conntrack.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ static __always_inline void nf_conntrack_read_in6_addr(__u64 *addr_h, __u64 *add
bpf_probe_read(addr_l, sizeof(*addr_l), &in6->s6_addr32[2]);
}

static __always_inline int nf_conntrack_tuple_to_conntrack_tuple(conntrack_tuple_t *t, const struct nf_conntrack_tuple *ct) {
static __always_inline int nf_conntrack_tuple_to_conntrack_tuple(struct connect_args_t *connect_args, conntrack_tuple_t *t, const struct nf_conntrack_tuple *ct) {
__builtin_memset(t, 0, sizeof(conntrack_tuple_t));

switch (ct->dst.protonum) {
Expand Down Expand Up @@ -60,6 +60,25 @@ static __always_inline int nf_conntrack_tuple_to_conntrack_tuple(conntrack_tuple
return 0;
}
}

struct sock *sock = connect_args->sock;
struct socket *tmps = _(sock->sk_socket);
if (tmps != NULL) {
struct sock* s;
BPF_CORE_READ_INTO(&s, tmps, sk);
short unsigned int skc_family;
BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family);
if (skc_family == AF_INET) {
__u16 local_port;
BPF_CORE_READ_INTO(&local_port, s, __sk_common.skc_num);
__u32 local_addr_v4;
BPF_CORE_READ_INTO(&local_addr_v4, s, __sk_common.skc_rcv_saddr);
// make sure connntrack with the same socket address
if (local_addr_v4 != t->daddr_l || local_port != t->dport) {
return 0;
}
}
}
return 1;
}

Expand All @@ -73,6 +92,11 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx, struct nf_conn *ct
return 0;
}

// already contains the remote address
if (&(connect_args->remote) != NULL) {
return 0;
}

__u32 status;
if (bpf_probe_read(&status, sizeof(status), &(ct->status)) != 0) {
return 0; // Invalid ct pointer
Expand All @@ -93,7 +117,7 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx, struct nf_conn *ct
struct nf_conntrack_tuple reply = tuplehash[IP_CT_DIR_REPLY].tuple;

conntrack_tuple_t reply_conn = {};
if (!nf_conntrack_tuple_to_conntrack_tuple(&reply_conn, &reply)) {
if (!nf_conntrack_tuple_to_conntrack_tuple(connect_args, &reply_conn, &reply)) {
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/accesslog/collector/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (c *ConnectCollector) buildSocketFromConnectEvent(event *events.SocketConne

pair, err := ip.ParseSocket(event.PID, event.SocketFD)
if err != nil {
connectLogger.Warnf("cannot found the socket, pid: %d, socket FD: %d", event.PID, event.SocketFD)
connectLogger.Debugf("cannot found the socket, pid: %d, socket FD: %d", event.PID, event.SocketFD)
return nil
}
connectLogger.Debugf("found the connection from the socket, connection ID: %d, randomID: %d",
Expand Down
33 changes: 32 additions & 1 deletion pkg/accesslog/collector/protocols/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package protocols

import (
"context"
"errors"
"fmt"
"os"
"sync"
"time"

"github.com/cilium/ebpf"

"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
"github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
Expand Down Expand Up @@ -212,7 +215,14 @@ func (p *PartitionContext) processEvents() {
p.processConnectionEvents(info)

// if the connection already closed and not contains any buffer data, then delete the connection
if info.closed && info.dataBuffer.DataLength() == 0 {
bufLen := info.dataBuffer.DataLength()
if bufLen > 0 {
return
}
if !info.closed {
p.checkTheConnectionIsAlreadyClose(info)
}
if info.closed {
if info.closeCallback != nil {
info.closeCallback()
}
Expand All @@ -225,6 +235,26 @@ func (p *PartitionContext) processEvents() {
}
}

func (p *PartitionContext) checkTheConnectionIsAlreadyClose(con *PartitionConnection) {
if time.Since(con.lastCheckCloseTime) <= time.Second*30 {
return
}
con.lastCheckCloseTime = time.Now()
var activateConn common.ActiveConnection
if err := p.context.BPF.ActiveConnectionMap.Lookup(con.connectionID, &activateConn); err != nil {
if errors.Is(err, ebpf.ErrKeyNotExist) {
con.closed = true
return
}
log.Warnf("cannot found the active connection: %d-%d, err: %v", con.connectionID, con.randomID, err)
return
} else if activateConn.RandomID != 0 && activateConn.RandomID != con.randomID {
log.Debugf("detect the connection: %d-%d is already closed, so remove from the activate connection",
con.connectionID, con.randomID)
con.closed = true
}
}

func (p *PartitionContext) processExpireEvents() {
// the expiry must be mutual exclusion with events processor
p.analyzeLocker.Lock()
Expand Down Expand Up @@ -267,6 +297,7 @@ type PartitionConnection struct {
closed bool
closeCallback common.ConnectionProcessFinishCallback
skipAllDataAnalyze bool
lastCheckCloseTime time.Time
}

func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext, detail *events.SocketDetailEvent) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/accesslog/common/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,10 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() {
func (c *ConnectionManager) SkipAllDataAnalyze(conID, ranID uint64) {
var activateConn ActiveConnection
if err := c.activeConnectionMap.Lookup(conID, &activateConn); err != nil {
log.Warnf("cannot found the active connection: %d-%d", conID, ranID)
if errors.Is(err, ebpf.ErrKeyNotExist) {
return
}
log.Warnf("cannot found the active connection: %d-%d, err: %v", conID, ranID, err)
return
}
if activateConn.RandomID != ranID {
Expand Down

0 comments on commit afee353

Please sign in to comment.