Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion plugins/in_syslog/syslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_syslog, source_address_key),
"Key where the source address will be injected"
},

{
FLB_CONFIG_MAP_STR, "format", (char *) NULL,
0, FLB_TRUE, offsetof(struct flb_syslog, format_str),
"Format of TCP framing: newline (default) or octet_counting (RFC 6587)"
},

/* EOF */
{0}
Expand Down
8 changes: 8 additions & 0 deletions plugins/in_syslog/syslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
/* 32KB chunk size */
#define FLB_SYSLOG_CHUNK "32768"

/* TCP framing */
#define FLB_SYSLOG_FRAME_NEWLINE 0
#define FLB_SYSLOG_FRAME_OCTET_COUNTING 1

struct syslog_conn;

/* Context / Config*/
Expand Down Expand Up @@ -67,6 +71,10 @@ struct flb_syslog {
flb_sds_t raw_message_key;
flb_sds_t source_address_key;

/* TCP framing */
flb_sds_t format_str;
int frame_type;

int dgram_mode_flag;
int collector_id;
struct mk_event *collector_event;
Expand Down
13 changes: 13 additions & 0 deletions plugins/in_syslog/syslog_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ struct flb_syslog *syslog_conf_create(struct flb_input_instance *ins,
ctx->mode = FLB_SYSLOG_UNIX_UDP;
}

/* TCP Frame type (only applies to stream modes; default newline) */
ctx->frame_type = FLB_SYSLOG_FRAME_NEWLINE;
if (ctx->format_str != NULL) {
if (strcasecmp(ctx->format_str, "octet_counting") == 0 ||
strcasecmp(ctx->format_str, "octet") == 0) {
ctx->frame_type = FLB_SYSLOG_FRAME_OCTET_COUNTING;
}
else if (strcasecmp(ctx->format_str, "newline") != 0) {
flb_plg_warn(ins, "[in_syslog] unknown frame '%s', using 'newline'",
ctx->format_str);
}
}

/* Check if TCP mode was requested */
if (ctx->mode == FLB_SYSLOG_TCP || ctx->mode == FLB_SYSLOG_UDP) {
/* Listen interface (if not set, defaults to 0.0.0.0:5140) */
Expand Down
2 changes: 2 additions & 0 deletions plugins/in_syslog/syslog_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ struct syslog_conn *syslog_conn_add(struct flb_connection *connection,
conn->ins = ctx->ins;
conn->buf_len = 0;
conn->buf_parsed = 0;
conn->frame_expected_len = 0;
conn->frame_have_len = 0;

/* Allocate read buffer */
conn->buf_data = flb_malloc(ctx->buffer_chunk_size);
Expand Down
3 changes: 3 additions & 0 deletions plugins/in_syslog/syslog_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ struct syslog_conn {
size_t buf_size; /* Buffer size */
size_t buf_len; /* Buffer length */
size_t buf_parsed; /* Parsed buffer (offset) */
/* Octet-counting framing state */
size_t frame_expected_len; /* remaining message bytes needed */
int frame_have_len; /* 0 = need length, 1 = have length */
struct flb_input_instance *ins; /* Parent plugin instance */
struct flb_syslog *ctx; /* Plugin configuration context */
struct flb_connection *connection;
Expand Down
67 changes: 56 additions & 11 deletions plugins/in_syslog/syslog_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,21 +218,55 @@ int syslog_prot_process(struct syslog_conn *conn)

flb_log_event_encoder_reset(ctx->log_encoder);

/* Always parse while some remaining bytes exists */
/* Always parse while some remaining bytes exist */
while (eof < end) {
/* Lookup the ending byte */
eof = p = conn->buf_data + conn->buf_parsed;
while (*eof != '\n' && *eof != '\0' && eof < end) {
eof++;
if (ctx->frame_type == FLB_SYSLOG_FRAME_NEWLINE) {
/* newline framing (current behavior) */
eof = p = conn->buf_data + conn->buf_parsed;
while (*eof != '\n' && *eof != '\0' && eof < end) {
eof++;
}
/* Incomplete message */
if (eof == end || (*eof != '\n' && *eof != '\0')) {
break;
}
len = (eof - p);
}

/* Incomplete message */
if (eof == end || (*eof != '\n' && *eof != '\0')) {
break;
else {
/* RFC 6587 octet-counting framing: <len> SP <msg> */
p = conn->buf_data + conn->buf_parsed;

if (!conn->frame_have_len) {
char *sp = p;
size_t n = 0;
while (sp < end && *sp >= '0' && *sp <= '9') {
if (n > SIZE_MAX / 10) {
n = SIZE_MAX;
break;
}
n = n * 10 + (size_t)(*sp - '0');
sp++;
}
if (sp == end) {
break;
}
if (*sp != ' ') {
flb_plg_warn(ctx->ins, "invalid octet-counting length");
return -1;
}
conn->buf_parsed += (sp - p) + 1;
conn->frame_expected_len = n;
conn->frame_have_len = 1;
p = conn->buf_data + conn->buf_parsed;
end = conn->buf_data + conn->buf_len;
}
if ((size_t)(end - p) < conn->frame_expected_len) {
break;
}
len = (int)conn->frame_expected_len;
}

/* No data ? */
len = (eof - p);
if (len == 0) {
consume_bytes(conn->buf_data, 1, conn->buf_len);
conn->buf_len--;
Expand Down Expand Up @@ -266,7 +300,18 @@ int syslog_prot_process(struct syslog_conn *conn)
flb_plg_debug(ctx->ins, "unparsed log message: %.*s", len, p);
}

conn->buf_parsed += len + 1;
if (ctx->frame_type == FLB_SYSLOG_FRAME_NEWLINE) {
conn->buf_parsed += len + 1;
}
else {
conn->buf_parsed += len;
conn->frame_expected_len = 0;
conn->frame_have_len = 0;
if (conn->buf_parsed < conn->buf_len &&
conn->buf_data[conn->buf_parsed] == '\n') {
conn->buf_parsed += 1;
}
}
end = conn->buf_data + conn->buf_len;
eof = conn->buf_data + conn->buf_parsed;
}
Expand Down
2 changes: 2 additions & 0 deletions plugins/in_syslog/syslog_prot.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <fluent-bit/flb_info.h>

#include <stdint.h>

#include "syslog.h"

#define FLB_MAP_EXPAND_SUCCESS 0
Expand Down
Loading
Loading