Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 122 additions & 18 deletions conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func ConntrackDeleteFilters(table ConntrackTableType, family InetFamily, filters
return pkgHandle.ConntrackDeleteFilters(table, family, filters...)
}

func ConntrackTableListStream(table ConntrackTableType, family InetFamily, handle chan *ConntrackFlow) error {
return pkgHandle.ConntrackTableListStream(table, family, handle)
}

// ConntrackTableList returns the flow list of a table of a specific family using the netlink handle passed
// conntrack -L [table] [options] List conntrack or expectation table
//
Expand Down Expand Up @@ -195,6 +199,17 @@ func (h *Handle) ConntrackDeleteFilters(table ConntrackTableType, family InetFam
return matched, finalErr
}

func (h *Handle) ConntrackTableListStream(table ConntrackTableType, family InetFamily, handle chan *ConntrackFlow) error {
req := h.newConntrackRequest(table, family, nl.IPCTNL_MSG_CT_GET, unix.NLM_F_DUMP)

err := req.ExecuteIter(unix.NETLINK_NETFILTER, 0, func(dataRaw []byte) bool {
handle <- parseRawData(dataRaw)
return true
})

return err
}

func (h *Handle) newConntrackRequest(table ConntrackTableType, family InetFamily, operation, flags int) *nl.NetlinkRequest {
// Create the Netlink request object
req := h.newNetlinkRequest((int(table)<<8)|operation, flags)
Expand All @@ -221,9 +236,12 @@ type ProtoInfo interface {
}

// ProtoInfoTCP corresponds to the `tcp` struct of the __nfct_protoinfo union.
// Only TCP state is currently supported.
type ProtoInfoTCP struct {
State uint8
State uint8
WsacleOriginal uint8
WsacleReply uint8
FlagsOriginal uint16
FlagsReply uint16
}

// Protocol returns "tcp".
Expand All @@ -233,6 +251,14 @@ func (p *ProtoInfoTCP) toNlData() ([]*nl.RtAttr, error) {
ctProtoInfoTCP := nl.NewRtAttr(unix.NLA_F_NESTED|nl.CTA_PROTOINFO_TCP, []byte{})
ctProtoInfoTCPState := nl.NewRtAttr(nl.CTA_PROTOINFO_TCP_STATE, nl.Uint8Attr(p.State))
ctProtoInfoTCP.AddChild(ctProtoInfoTCPState)
ctProtoInfoTCPWscaleOriginal := nl.NewRtAttr(nl.CTA_PROTOINFO_TCP_WSCALE_ORIGINAL, nl.Uint8Attr(p.WsacleOriginal))
ctProtoInfoTCP.AddChild(ctProtoInfoTCPWscaleOriginal)
ctProtoInfoTCPWscaleReply := nl.NewRtAttr(nl.CTA_PROTOINFO_TCP_WSCALE_REPLY, nl.Uint8Attr(p.WsacleReply))
ctProtoInfoTCP.AddChild(ctProtoInfoTCPWscaleReply)
ctProtoInfoTCPFlagsOriginal := nl.NewRtAttr(nl.CTA_PROTOINFO_TCP_FLAGS_ORIGINAL, nl.BEUint16Attr(p.FlagsOriginal))
ctProtoInfoTCP.AddChild(ctProtoInfoTCPFlagsOriginal)
ctProtoInfoTCPFlagsReply := nl.NewRtAttr(nl.CTA_PROTOINFO_TCP_FLAGS_REPLY, nl.BEUint16Attr(p.FlagsReply))
ctProtoInfoTCP.AddChild(ctProtoInfoTCPFlagsReply)
ctProtoInfo.AddChild(ctProtoInfoTCP)

return []*nl.RtAttr{ctProtoInfo}, nil
Expand Down Expand Up @@ -261,6 +287,11 @@ type IPTuple struct {
Protocol uint8
SrcIP net.IP
SrcPort uint16

// ICMP only
ICMPID uint16
ICMPType uint8
ICMPCode uint8
}

// toNlData generates the inner fields of a nested tuple netlink datastructure
Expand Down Expand Up @@ -304,7 +335,11 @@ type ConntrackFlow struct {
TimeStart uint64
TimeStop uint64
TimeOut uint32
Status uint32
Use uint32
ID uint32
Labels []byte
LabelsMask []byte
ProtoInfo ProtoInfo
}

Expand All @@ -315,19 +350,37 @@ func (s *ConntrackFlow) String() string {
start := time.Unix(0, int64(s.TimeStart))
stop := time.Unix(0, int64(s.TimeStop))
timeout := int32(s.TimeOut)
res := fmt.Sprintf("%s\t%d src=%s dst=%s sport=%d dport=%d packets=%d bytes=%d\tsrc=%s dst=%s sport=%d dport=%d packets=%d bytes=%d mark=0x%x ",
nl.L4ProtoMap[s.Forward.Protocol], s.Forward.Protocol,
s.Forward.SrcIP.String(), s.Forward.DstIP.String(), s.Forward.SrcPort, s.Forward.DstPort, s.Forward.Packets, s.Forward.Bytes,
s.Reverse.SrcIP.String(), s.Reverse.DstIP.String(), s.Reverse.SrcPort, s.Reverse.DstPort, s.Reverse.Packets, s.Reverse.Bytes,
s.Mark)

var out string
if s.Forward.Protocol == unix.IPPROTO_ICMP || s.Forward.Protocol == unix.IPPROTO_ICMPV6 {
out = fmt.Sprintf("%s\t%d src=%s dst=%s id=%d type=%d code=%d packets=%d bytes=%d\tsrc=%s dst=%s id=%d type=%d code=%d packets=%d bytes=%d",
nl.L4ProtoMap[s.Forward.Protocol], s.Forward.Protocol,
s.Forward.SrcIP.String(), s.Forward.DstIP.String(), s.Forward.ICMPID, s.Forward.ICMPType, s.Forward.ICMPCode, s.Forward.Packets, s.Forward.Bytes,
s.Reverse.SrcIP.String(), s.Reverse.DstIP.String(), s.Reverse.ICMPID, s.Reverse.ICMPType, s.Reverse.ICMPCode, s.Reverse.Packets, s.Reverse.Bytes)
} else {
out = fmt.Sprintf("%s\t%d src=%s dst=%s sport=%d dport=%d packets=%d bytes=%d\tsrc=%s dst=%s sport=%d dport=%d packets=%d bytes=%d",
nl.L4ProtoMap[s.Forward.Protocol], s.Forward.Protocol,
s.Forward.SrcIP.String(), s.Forward.DstIP.String(), s.Forward.SrcPort, s.Forward.DstPort, s.Forward.Packets, s.Forward.Bytes,
s.Reverse.SrcIP.String(), s.Reverse.DstIP.String(), s.Reverse.SrcPort, s.Reverse.DstPort, s.Reverse.Packets, s.Reverse.Bytes)
}
out += fmt.Sprintf(" mark=0x%x", s.Mark)
if len(s.Labels) > 0 {
res += fmt.Sprintf("labels=0x%x ", s.Labels)
out += fmt.Sprintf(" labels=0x%x", s.Labels)
}
if len(s.LabelsMask) > 0 {
out += fmt.Sprintf("/0x%x", s.LabelsMask)
}
if s.Status != 0 {
out += fmt.Sprintf(" status=0x%x", s.Status)
}
if s.Zone != 0 {
res += fmt.Sprintf("zone=%d ", s.Zone)
out += fmt.Sprintf(" zone=%d", s.Zone)
}
res += fmt.Sprintf("start=%v stop=%v timeout=%d(sec)", start, stop, timeout)
return res
if s.Use != 0 {
out += fmt.Sprintf(" use=0x%x", s.Use)
}
out += fmt.Sprintf(" start=%v stop=%v timeout=%d(sec)", start, stop, timeout)
return out
}

// toNlData generates netlink messages representing the flow.
Expand Down Expand Up @@ -444,8 +497,8 @@ func parseIpTuple(reader *bytes.Reader, tpl *IPTuple) uint8 {
if t == nl.CTA_PROTO_NUM {
tpl.Protocol = uint8(v[0])
}
// We only parse TCP & UDP headers. Skip the others.
if tpl.Protocol != unix.IPPROTO_TCP && tpl.Protocol != unix.IPPROTO_UDP {
// We only parse TCP, UDP, ICMP, ICMPv6 headers. Skip the others.
if tpl.Protocol != unix.IPPROTO_TCP && tpl.Protocol != unix.IPPROTO_UDP && tpl.Protocol != unix.IPPROTO_ICMP && tpl.Protocol != unix.IPPROTO_ICMPV6 {
// skip the rest
bytesRemaining := protoInfoTotalLen - protoInfoBytesRead
reader.Seek(int64(bytesRemaining), seekCurrent)
Expand All @@ -454,7 +507,12 @@ func parseIpTuple(reader *bytes.Reader, tpl *IPTuple) uint8 {
// Skip 3 bytes of padding
reader.Seek(3, seekCurrent)
protoInfoBytesRead += 3
for i := 0; i < 2; i++ {
loopCount := 2
if tpl.Protocol == unix.IPPROTO_ICMP || tpl.Protocol == unix.IPPROTO_ICMPV6 {
loopCount = 3 // ID, Type, Code
}
var ICMPCodeDone, ICMPTypeDone bool
for i := 0; i < loopCount; i++ {
_, t, _ := parseNfAttrTL(reader)
protoInfoBytesRead += uint16(nl.SizeofNfattr)
switch t {
Expand All @@ -464,6 +522,26 @@ func parseIpTuple(reader *bytes.Reader, tpl *IPTuple) uint8 {
case nl.CTA_PROTO_DST_PORT:
parseBERaw16(reader, &tpl.DstPort)
protoInfoBytesRead += 2
case nl.CTA_PROTO_ICMP_ID:
fallthrough
case nl.CTA_PROTO_ICMPV6_ID:
parseBERaw16(reader, &tpl.ICMPID)
protoInfoBytesRead += 2
case nl.CTA_PROTO_ICMP_CODE:
fallthrough
case nl.CTA_PROTO_ICMPV6_CODE:
parseU8(reader, &tpl.ICMPCode)
protoInfoBytesRead += 1
ICMPCodeDone = true
case nl.CTA_PROTO_ICMP_TYPE:
fallthrough
case nl.CTA_PROTO_ICMPV6_TYPE:
parseU8(reader, &tpl.ICMPType)
protoInfoBytesRead += 1
ICMPTypeDone = true
}
if (t == nl.CTA_PROTO_ICMP_CODE || t == nl.CTA_PROTO_ICMP_TYPE) && (!ICMPCodeDone || !ICMPTypeDone) {
continue
}
// Skip 2 bytes of padding
reader.Seek(2, seekCurrent)
Expand Down Expand Up @@ -503,6 +581,10 @@ func skipNfAttrValue(r *bytes.Reader, len uint16) uint16 {
return len
}

func parseU8(r *bytes.Reader, v *uint8) {
binary.Read(r, binary.BigEndian, v)
}

func parseBERaw16(r *bytes.Reader, v *uint16) {
binary.Read(r, binary.BigEndian, v)
}
Expand Down Expand Up @@ -576,6 +658,22 @@ func parseProtoInfoTCP(r *bytes.Reader, attrLen uint16) *ProtoInfoTCP {
case nl.CTA_PROTOINFO_TCP_STATE:
p.State = parseProtoInfoTCPState(r)
bytesRead += nl.SizeofNfattr
case nl.CTA_PROTOINFO_TCP_WSCALE_ORIGINAL:
parseU8(r, &p.WsacleOriginal)
r.Seek(nl.SizeofNfattr-1, seekCurrent)
bytesRead += nl.SizeofNfattr
case nl.CTA_PROTOINFO_TCP_WSCALE_REPLY:
parseU8(r, &p.WsacleReply)
r.Seek(nl.SizeofNfattr-1, seekCurrent)
bytesRead += nl.SizeofNfattr
case nl.CTA_PROTOINFO_TCP_FLAGS_ORIGINAL:
parseBERaw16(r, &p.FlagsOriginal)
r.Seek(nl.SizeofNfattr-2, seekCurrent)
bytesRead += nl.SizeofNfattr
case nl.CTA_PROTOINFO_TCP_FLAGS_REPLY:
parseBERaw16(r, &p.FlagsReply)
r.Seek(nl.SizeofNfattr-2, seekCurrent)
bytesRead += nl.SizeofNfattr
default:
bytesRead += int(skipNfAttrValue(r, l))
}
Expand Down Expand Up @@ -679,14 +777,20 @@ func parseRawData(data []byte) *ConntrackFlow {
switch t {
case nl.CTA_MARK:
s.Mark = parseConnectionMark(reader)
case nl.CTA_ZONE:
s.Zone = parseConnectionZone(reader)
case nl.CTA_LABELS:
s.Labels = parseConnectionLabels(reader)
case nl.CTA_LABELS_MASK:
s.LabelsMask = parseConnectionLabels(reader)
case nl.CTA_TIMEOUT:
s.TimeOut = parseTimeOut(reader)
case nl.CTA_ID, nl.CTA_STATUS, nl.CTA_USE:
skipNfAttrValue(reader, l)
case nl.CTA_ZONE:
s.Zone = parseConnectionZone(reader)
case nl.CTA_STATUS:
parseBERaw32(reader, &s.Status)
case nl.CTA_USE:
parseBERaw32(reader, &s.Use)
case nl.CTA_ID:
parseBERaw32(reader, &s.ID)
default:
skipNfAttrValue(reader, l)
}
Expand Down
23 changes: 17 additions & 6 deletions conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,10 +934,14 @@ func TestParseRawData(t *testing.T) {
12, 0, 1, 0,
22, 134, 80, 142, 230, 127, 74, 166,
/* >> CTA_LABELS */
20, 0, 22, 0,
0, 0, 0, 0, 5, 0, 18, 172, 66, 2, 1, 0, 0, 0, 0, 0},
16, 0, 22, 0,
34, 65, 12, 12, 91, 134, 145, 211, 123, 93, 13, 47, 95, 34, 15, 77,
/* >> CTA_LABELS_MASK */
16, 0, 23, 0,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255},
expConntrackFlow: "udp\t17 src=192.168.0.10 dst=192.168.0.3 sport=48385 dport=53 packets=1 bytes=55\t" +
"src=192.168.0.3 dst=192.168.0.10 sport=53 dport=48385 packets=1 bytes=71 mark=0x5 labels=0x00000000050012ac4202010000000000 " +
"src=192.168.0.3 dst=192.168.0.10 sport=53 dport=48385 packets=1 bytes=71 mark=0x5 " +
"labels=0x22410c0c5b8691d37b5d0d2f5f220f4d/0xffffffffffffffffffffffffffffffff status=0x18a use=0x1 " +
"start=2021-06-07 13:41:30.39632247 +0000 UTC stop=1970-01-01 00:00:00 +0000 UTC timeout=32(sec)",
},
{
Expand Down Expand Up @@ -1033,10 +1037,17 @@ func TestParseRawData(t *testing.T) {
16, 0, 20, 128,
/* >>>> CTA_TIMESTAMP_START */
12, 0, 1, 0,
22, 134, 80, 175, 134, 10, 182, 221},
22, 134, 80, 175, 134, 10, 182, 221,
/* >> CTA_LABELS */
16, 0, 22, 0,
34, 65, 12, 12, 91, 134, 145, 211, 123, 93, 13, 47, 95, 34, 15, 77,
/* >> CTA_LABELS_MASK */
16, 0, 23, 0,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255},
expConntrackFlow: "tcp\t6 src=192.168.0.10 dst=192.168.77.73 sport=42625 dport=3333 packets=11 bytes=1914\t" +
"src=192.168.77.73 dst=192.168.0.10 sport=3333 dport=42625 packets=10 bytes=1858 mark=0x5 zone=100 " +
"start=2021-06-07 13:43:50.511990493 +0000 UTC stop=1970-01-01 00:00:00 +0000 UTC timeout=152(sec)",
"src=192.168.77.73 dst=192.168.0.10 sport=3333 dport=42625 packets=10 bytes=1858 mark=0x5 " +
"labels=0x22410c0c5b8691d37b5d0d2f5f220f4d/0xffffffffffffffffffffffffffffffff " +
"status=0x18e zone=100 use=0x1 start=2021-06-07 13:43:50.511990493 +0000 UTC stop=1970-01-01 00:00:00 +0000 UTC timeout=152(sec)",
},
}

Expand Down
Loading