Skip to content

Commit ef23664

Browse files
committed
Use the latest zmq_msg_hdr_v3 from ntop_typedefs.h
Update the ZMQ header to the latest version in as of ntop 6.4. The goal here is to just use the latest thing so we have longer before we have to worry about being deprecated. The work here was: * New header with new version constant * New header requires knowing compress/uncompressed sizes so move the compress code from Format to Transport. We're still only compressing JSON. TESTING DONE: Tested TLV, JSON, and compressed JSON with ntopng 6.4 built from source. Saw flows with all three. In order to get 6.4 to work with compressed JSON, I had to fix a bug in ntopng as well as disable the pro license checks. Without this fix, I cannot see how ntopng decompresses anything that has zmq_message_header_v3. I confirmed with them this is fixed in their dev branch, but broken in v6.4
1 parent ef2beb5 commit ef23664

File tree

3 files changed

+82
-60
lines changed

3 files changed

+82
-60
lines changed

cmd/netflow2ng.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ func main() {
303303
} else {
304304
if errors.Is(err, netflow.ErrorTemplateNotFound) {
305305
log.Debug("Netflow packet received before template was set. Discarding")
306+
log.Trace("More info: ", err)
306307
} else if errors.Is(err, debug.PanicError) {
307308
var pErrMsg *debug.PanicErrorMessage
308309
log.Error("Intercepted panic", pErrMsg)

formatter/ntopng_json.go

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package formatter
22

33
import (
4-
"bytes"
5-
"compress/zlib"
64
"encoding/binary"
75
"encoding/json"
86
"net"
@@ -13,7 +11,6 @@ import (
1311
)
1412

1513
type NtopngJson struct {
16-
compress bool
1714
}
1815

1916
func (d *NtopngJson) Prepare() error {
@@ -150,25 +147,5 @@ func (d *NtopngJson) toJSON(extFlow *proto.ExtendedFlowMessage) ([]byte, error)
150147
}
151148

152149
// convert to JSON
153-
jdata, err := json.Marshal(retmap)
154-
if err != nil {
155-
return jdata, err
156-
}
157-
158-
// TODO
159-
if d.compress {
160-
var zbuf bytes.Buffer
161-
z := zlib.NewWriter(&zbuf)
162-
if _, err = z.Write(jdata); err != nil {
163-
return []byte{}, err
164-
}
165-
if err = z.Close(); err != nil {
166-
return []byte{}, err
167-
}
168-
// must set jdata[0] = '\0' to indicate compressed data
169-
jdata = nil // zero current buffer
170-
jdata = append(jdata, 0)
171-
jdata = append(jdata, zbuf.Bytes()...)
172-
}
173-
return jdata, nil
150+
return json.Marshal(retmap)
174151
}

transport/zmq.go

Lines changed: 80 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package transport
1212

1313
import (
1414
"bytes"
15+
"compress/zlib"
1516
"encoding/binary"
1617
"encoding/hex"
1718
"fmt"
@@ -26,9 +27,11 @@ import (
2627
* include/ntop_typedefs.h, include/ntop_defines.h & src/ZMQCollectorInterface.cpp from
2728
* https://github.com/ntop/ntopng
2829
*/
29-
const ZMQ_MSG_VERSION_OLD = 2 // ntopng message version 2 for ntopng less than v6.4
30-
const ZMQ_MSG_VERSION_TLV = 3 // ntop message version for TLV for ntopng v6.4 and later.
31-
const ZMQ_TOPIC = "flow" // ntopng only really cares about the first character!
30+
const ZMQ_MSG_VERSION_4 = 4 // ntop message version for zmq_msg_hdr_v3 in ntop_defines.h
31+
const ZMQ_TOPIC = "flow" // ntopng only really cares about the first character!
32+
33+
const ZMQ_MSG_V4_FLAG_TLV = 2
34+
const ZMQ_MSG_V4_FLAG_COMPRESSED = 4
3235

3336
const (
3437
PBUF MsgFormat = iota
@@ -48,12 +51,15 @@ type ZmqDriver struct {
4851
lock *sync.RWMutex
4952
}
5053

51-
type zmqHeader struct {
52-
url string
53-
version uint8
54-
source_id uint8
55-
length uint16
56-
msg_id uint32
54+
// This is the latest header as of ntopng 6.4
55+
type zmqHeaderV3 struct {
56+
url string // must be 16 bytes long
57+
version uint8 // use only with ZMQ_MSG_VERSION_4
58+
flags uint8
59+
uncompressed_size uint32
60+
compressed_size uint32
61+
msg_id uint32
62+
source_id uint32
5763
}
5864

5965
var messageId uint32 = 1 // Every ZMQ message we send should have a uniq ID
@@ -82,18 +88,34 @@ func (d *ZmqDriver) Init() error {
8288

8389
func (d *ZmqDriver) Send(key, data []byte) error {
8490
var err error
91+
orig_len := uint32(len(data))
92+
compressed_len := orig_len
93+
94+
// Should only compress JSON
95+
if d.msgType == JSON && d.compress {
96+
var zbuf bytes.Buffer
97+
z := zlib.NewWriter(&zbuf)
98+
if _, err = z.Write(data); err != nil {
99+
return err
100+
}
101+
if err = z.Close(); err != nil {
102+
return err
103+
}
104+
// replace data with zlib compressed buffer
105+
data = zbuf.Bytes()
106+
compressed_len = uint32(len(data))
107+
}
108+
109+
// Lock before accessing messageId or zmq header to ensure messageId is unique
110+
d.lock.Lock()
111+
defer d.lock.Unlock()
85112

86113
if messageId == 1 {
87114
log.Info("Sending first ZMQ message.")
88115
} else if messageId%1000 == 0 {
89116
log.Debugf("Sending ZMQ message id %d.", messageId)
90117
}
91-
92-
msg_len := uint16(len(data))
93-
// Lock before creating zmq header to ensure messageId is unique
94-
d.lock.Lock()
95-
defer d.lock.Unlock()
96-
header := d.newZmqHeader(msg_len)
118+
header := d.newZmqHeaderV3(orig_len, compressed_len)
97119

98120
// send our header with the topic first as a multi-part message
99121
hbytes, err := header.bytes()
@@ -120,17 +142,17 @@ func (d *ZmqDriver) Send(key, data []byte) error {
120142

121143
switch d.msgType {
122144
case PBUF:
123-
log.Tracef("Sent %d bytes of pbuf:\n%s", msg_len, hex.Dump(data))
145+
log.Tracef("Sent %d bytes of pbuf:\n%s", orig_len, hex.Dump(data))
124146
case JSON:
125147
if d.compress {
126-
log.Tracef("Sent %d bytes of zlib json:\n%s", msg_len, hex.Dump(data))
148+
log.Tracef("Sent %d bytes of zlib json:\n%s", compressed_len, hex.Dump(data))
127149
} else {
128-
log.Tracef("Sent %d bytes of json: %s", msg_len, string(data))
150+
log.Tracef("Sent %d bytes of json: %s", orig_len, string(data))
129151
}
130152
case TLV:
131-
log.Tracef("Sent %d bytes of ntop tlv:\n%s", msg_len, hex.Dump(data))
153+
log.Tracef("Sent %d bytes of ntop tlv:\n%s", orig_len, hex.Dump(data))
132154
default:
133-
log.Errorf("Sent %d bytes of unknown message type %d", msg_len, d.msgType)
155+
log.Errorf("Sent %d bytes of unknown message type %d", orig_len, d.msgType)
134156
}
135157

136158
return err
@@ -141,25 +163,30 @@ func (d *ZmqDriver) Close() error {
141163
return nil
142164
}
143165

144-
func (d *ZmqDriver) newZmqHeader(length uint16) *zmqHeader {
145-
var version uint8 = ZMQ_MSG_VERSION_TLV
146-
if d.msgType != TLV {
147-
version = ZMQ_MSG_VERSION_OLD
166+
func (d *ZmqDriver) newZmqHeaderV3(orig_length uint32, compressed_len uint32) *zmqHeaderV3 {
167+
var flags uint8 = 0
168+
if d.msgType == TLV {
169+
flags |= ZMQ_MSG_V4_FLAG_TLV
148170
}
149-
z := &zmqHeader{
150-
url: ZMQ_TOPIC,
151-
version: version,
152-
source_id: uint8(d.sourceId),
153-
length: length,
154-
msg_id: messageId,
171+
if d.compress {
172+
flags |= ZMQ_MSG_V4_FLAG_COMPRESSED
173+
}
174+
175+
z := &zmqHeaderV3{
176+
url: ZMQ_TOPIC,
177+
version: ZMQ_MSG_VERSION_4,
178+
flags: flags,
179+
uncompressed_size: orig_length,
180+
compressed_size: compressed_len,
181+
msg_id: messageId,
182+
source_id: uint32(d.sourceId),
155183
}
156184
messageId++
157185

158186
return z
159187
}
160188

161-
// Serialize our zmqHeader into a byte array
162-
func (zh *zmqHeader) bytes() ([]byte, error) {
189+
func (zh *zmqHeaderV3) bytes() ([]byte, error) {
163190
header := []byte{}
164191
bBuf := bytes.NewBuffer(header)
165192

@@ -186,13 +213,24 @@ func (zh *zmqHeader) bytes() ([]byte, error) {
186213
return nil, fmt.Errorf("URL was %d bytes instead of 16", i)
187214
}
188215

189-
if _, err = bBuf.Write([]byte{zh.version, zh.source_id}); err != nil {
216+
if _, err = bBuf.Write([]byte{zh.version, zh.flags}); err != nil {
217+
return nil, err
218+
}
219+
// Need two bytes of padding to align next uint32 on 4-byte boundary
220+
if _, err = bBuf.Write([]byte{0, 0}); err != nil {
221+
return nil, err
222+
}
223+
224+
// Both uncompressed_size and compressed_size need to be in little-endian
225+
le32Buf := make([]byte, 4)
226+
binary.LittleEndian.PutUint32(le32Buf, zh.uncompressed_size)
227+
if _, err = bBuf.Write(le32Buf); err != nil {
190228
return nil, err
191229
}
192230

193-
be16Buf := make([]byte, 2)
194-
binary.BigEndian.PutUint16(be16Buf, zh.length)
195-
if _, err = bBuf.Write(be16Buf); err != nil {
231+
le32Buf = make([]byte, 4)
232+
binary.LittleEndian.PutUint32(le32Buf, zh.compressed_size)
233+
if _, err = bBuf.Write(le32Buf); err != nil {
196234
return nil, err
197235
}
198236

@@ -201,5 +239,11 @@ func (zh *zmqHeader) bytes() ([]byte, error) {
201239
if _, err = bBuf.Write(be32Buf); err != nil {
202240
return nil, err
203241
}
242+
243+
be32Buf = make([]byte, 4)
244+
binary.BigEndian.PutUint32(be32Buf, zh.source_id)
245+
if _, err = bBuf.Write(be32Buf); err != nil {
246+
return nil, err
247+
}
204248
return bBuf.Bytes(), nil
205249
}

0 commit comments

Comments
 (0)