Skip to content
Draft
1 change: 1 addition & 0 deletions include/fluent-bit/flb_compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

/*
 * Identifiers for the compression algorithm handled by a decompression
 * context. NONE means no decompressor is in use; GZIP and ZSTD select the
 * matching backend when a context is created, dispatched or destroyed.
 */
#define FLB_COMPRESSION_ALGORITHM_NONE 0
#define FLB_COMPRESSION_ALGORITHM_GZIP 1
#define FLB_COMPRESSION_ALGORITHM_ZSTD 2

/* Values stored in the decompression context's state field */
#define FLB_DECOMPRESSOR_STATE_FAILED -1
#define FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER 0
Expand Down
8 changes: 8 additions & 0 deletions include/fluent-bit/flb_zstd.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@

#include <fluent-bit/flb_info.h>
#include <zstd.h>
#include <zstd_errors.h>

/* Forward declaration: the prototypes below only need a pointer to it */
struct flb_decompression_context;

/*
 * Buffer-to-buffer zstd compress / uncompress of in_data (in_len bytes).
 * NOTE(review): presumably the result is handed back through out_data /
 * out_len and owned by the caller; the return-value convention is not
 * visible from this header -- confirm against the implementation.
 */
size_t flb_zstd_compress(void *in_data, size_t in_len, void **out_data, size_t *out_len);
size_t flb_zstd_uncompress(void *in_data, size_t in_len, void **out_data, size_t *out_len);

/*
 * Zstd backend of the generic decompressor: flb_decompress() forwards its
 * context, output_buffer and output_length here when the context algorithm
 * is FLB_COMPRESSION_ALGORITHM_ZSTD, and returns this function's result.
 */
int flb_zstd_decompressor_dispatch(struct flb_decompression_context *context,
void *output_buffer,
size_t *output_length);
/*
 * Create / destroy the zstd-specific state that the generic decompression
 * context keeps in its inner_context field.
 */
void *flb_zstd_decompression_context_create(void);
void flb_zstd_decompression_context_destroy(void *context);
#endif
8 changes: 8 additions & 0 deletions plugins/in_forward/fw_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_
conn->buf_size = ctx->buffer_chunk_size;
conn->in = ctx->ins;

conn->compression_type = FLB_COMPRESSION_ALGORITHM_NONE;
conn->d_ctx = NULL;

/* Register instance into the event loop */
ret = mk_event_add(flb_engine_evl_get(),
connection->fd,
Expand Down Expand Up @@ -219,6 +222,11 @@ int fw_conn_del(struct fw_conn *conn)
/* Release resources */
mk_list_del(&conn->_head);

/* Release decompression context if it exists */
if (conn->d_ctx) {
flb_decompression_context_destroy(conn->d_ctx);
}

if (conn->helo != NULL) {
if (conn->helo->nonce != NULL) {
flb_sds_destroy(conn->helo->nonce);
Expand Down
6 changes: 6 additions & 0 deletions plugins/in_forward/fw_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#ifndef FLB_IN_FW_CONN_H
#define FLB_IN_FW_CONN_H

#include <fluent-bit/flb_compression.h>

/*
 * Buffer size limits, expressed in bytes. Note these two are string
 * literals, not integers.
 * NOTE(review): presumably parsed as configuration defaults -- confirm.
 */
#define FLB_IN_FW_CHUNK_SIZE "1024000" /* 1MB */
#define FLB_IN_FW_CHUNK_MAX_SIZE "6144000" /* =FLB_IN_FW_CHUNK_SIZE * 6. 6MB */
#define FLB_IN_FW_NONCE_SIZE 16
Expand Down Expand Up @@ -48,6 +50,10 @@ struct fw_conn {
int buf_size; /* Buffer size */
size_t rest; /* Unpacking offset */

/* Decompression context */
int compression_type; /* e.g., FLB_COMPRESSION_ALGORITHM_GZIP */
struct flb_decompression_context *d_ctx; /* Stateful decompressor context */

struct flb_in_fw_helo *helo; /* secure forward HELO phase */

struct flb_input_instance *in; /* Parent plugin instance */
Expand Down
183 changes: 96 additions & 87 deletions plugins/in_forward/fw_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,50 +90,33 @@ static int get_chunk_event_type(struct flb_input_instance *ins, msgpack_object o
return type;
}

static int is_gzip_compressed(msgpack_object options)
static int get_compression_type(msgpack_object options)
{
int i;
msgpack_object k;
msgpack_object v;
msgpack_object k, v;

if (options.type != MSGPACK_OBJECT_MAP) {
return -1;
}


for (i = 0; i < options.via.map.size; i++) {
k = options.via.map.ptr[i].key;
v = options.via.map.ptr[i].val;

if (k.type != MSGPACK_OBJECT_STR) {
return -1;
}

if (k.via.str.size != 10) {
continue;
}

if (strncmp(k.via.str.ptr, "compressed", 10) == 0) {
if (v.type != MSGPACK_OBJECT_STR) {
return -1;
}

if (v.via.str.size != 4) {
return -1;
}

if (strncmp(v.via.str.ptr, "gzip", 4) == 0) {
return FLB_TRUE;
}
else if (strncmp(v.via.str.ptr, "text", 4) == 0) {
return FLB_FALSE;
if (k.type == MSGPACK_OBJECT_STR && k.via.str.size == 10 &&
strncmp(k.via.str.ptr, "compressed", 10) == 0) {
if (v.type == MSGPACK_OBJECT_STR) {
if (v.via.str.size == 4 && strncmp(v.via.str.ptr, "gzip", 4) == 0) {
return FLB_COMPRESSION_ALGORITHM_GZIP;
}
if (v.via.str.size == 4 && strncmp(v.via.str.ptr, "zstd", 4) == 0) {
return FLB_COMPRESSION_ALGORITHM_ZSTD;
}
}

return -1;
}
}

return FLB_FALSE;
return FLB_COMPRESSION_ALGORITHM_NONE;
}

static inline void print_msgpack_error_code(struct flb_input_instance *in,
Expand Down Expand Up @@ -1269,6 +1252,8 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
msgpack_unpacked result;
msgpack_unpacker *unp;
size_t all_used = 0;
const char *payload_data = NULL;
size_t payload_len = 0;
struct flb_in_fw_config *ctx = conn->ctx;

/*
Expand Down Expand Up @@ -1524,91 +1509,115 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
}

if (data) {
ret = is_gzip_compressed(root.via.array.ptr[2]);
if (ret == -1) {
flb_plg_error(ctx->ins, "invalid 'compressed' option");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
/* Get event type early for use in both compressed/uncompressed paths */
event_type = FLB_EVENT_TYPE_LOGS;
if (contain_options) {
ret = get_chunk_event_type(ins, root.via.array.ptr[2]);
if (ret == -1) {
flb_plg_error(ctx->ins, "invalid chunk event type");
msgpack_unpacked_destroy(&result);
flb_sds_destroy(out_tag);
msgpack_unpacker_free(unp);
return -1;
}
event_type = ret;
}

if (ret == FLB_TRUE) {
size_t remaining = len;

while (remaining > 0) {
ret = flb_gzip_uncompress_multi((void *) (data + (len - remaining)), remaining,
&gz_data, &gz_size, &remaining);

if (ret == -1) {
flb_plg_error(ctx->ins, "gzip uncompress failure");
/* Initialize decompressor on first compressed chunk */
if (conn->d_ctx == NULL && contain_options) {
int type = get_compression_type(root.via.array.ptr[2]);
if (type > 0) {
conn->compression_type = type;
conn->d_ctx = flb_decompression_context_create(
conn->compression_type,
FLB_DECOMPRESSION_BUFFER_SIZE);
if (!conn->d_ctx) {
flb_plg_error(ctx->ins, "failed to create decompression context");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
msgpack_unpacker_free(unp);
return -1;
}
}
}

if (conn->compression_type != FLB_COMPRESSION_ALGORITHM_NONE) {
char *decomp_buf = NULL;
uint8_t *append_ptr;
size_t available_space;
size_t decomp_len;
int decomp_ret;
size_t required_size;

available_space = flb_decompression_context_get_available_space(conn->d_ctx);
if (len > available_space) {
required_size = conn->d_ctx->input_buffer_length + len;
if (flb_decompression_context_resize_buffer(conn->d_ctx, required_size) != 0) {
flb_plg_error(ctx->ins, "cannot resize decompression buffer");
return -1;
}
}
append_ptr = flb_decompression_context_get_append_buffer(conn->d_ctx);
memcpy(append_ptr, data, len);
conn->d_ctx->input_buffer_length += len;

event_type = FLB_EVENT_TYPE_LOGS;
if (contain_options) {
ret = get_chunk_event_type(ins, root.via.array.ptr[2]);
if (ret == -1) {
decomp_buf = flb_malloc(ctx->buffer_chunk_size);
if (!decomp_buf) {
flb_errno();
return -1;
}

do {
decomp_len = ctx->buffer_chunk_size;
decomp_ret = flb_decompress(conn->d_ctx, decomp_buf, &decomp_len);

if (decomp_ret == FLB_DECOMPRESSOR_FAILURE) {
if (decomp_len > 0) {
flb_plg_error(ctx->ins, "decompression failed, data may be corrupt");
flb_free(decomp_buf);
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
flb_free(gz_data);
return -1;
}
event_type = ret;
break;
}

ret = append_log(ins, conn,
event_type,
out_tag, gz_data, gz_size);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
flb_free(gz_data);
return -1;
if (decomp_len > 0) {
if (append_log(ins, conn, event_type, out_tag, decomp_buf, decomp_len) == -1) {
flb_free(decomp_buf);
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}
}
flb_free(gz_data);
}
} while (decomp_len > 0);

flb_free(decomp_buf);

flb_decompression_context_destroy(conn->d_ctx);
conn->d_ctx = NULL;
conn->compression_type = FLB_COMPRESSION_ALGORITHM_NONE;
}
else {
event_type = FLB_EVENT_TYPE_LOGS;
if (contain_options) {
ret = get_chunk_event_type(ins, root.via.array.ptr[2]);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
return -1;
}
event_type = ret;
}

ret = append_log(ins, conn,
event_type,
out_tag, data, len);
if (ret == -1) {
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);
if (append_log(ins, conn, event_type, out_tag, data, len) == -1) {
return -1;
}
}
}

/* Handle ACK response */
if (chunk_id != -1) {
chunk = root.via.array.ptr[2].via.map.ptr[chunk_id].val;
send_ack(ctx->ins, conn, chunk);
}
/* Handle ACK response (common to all paths) */
if (chunk_id != -1) {
chunk = root.via.array.ptr[2].via.map.ptr[chunk_id].val;
send_ack(ctx->ins, conn, chunk);
}
}
else {
flb_plg_warn(ctx->ins, "invalid data format, type=%i",
entry.type);
msgpack_unpacked_destroy(&result);
flb_sds_destroy(out_tag);
msgpack_unpacker_free(unp);
return -1;
}
Expand Down
26 changes: 23 additions & 3 deletions src/flb_compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_zstd.h>
#include <fluent-bit/flb_compression.h>

static size_t flb_decompression_context_get_read_buffer_offset(
Expand Down Expand Up @@ -131,7 +132,12 @@ void flb_decompression_context_destroy(struct flb_decompression_context *context
}

if (context->inner_context != NULL) {
flb_gzip_decompression_context_destroy(context->inner_context);
if (context->algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
flb_gzip_decompression_context_destroy(context->inner_context);
}
else if (context->algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
flb_zstd_decompression_context_destroy(context->inner_context);
}

context->inner_context = NULL;
}
Expand Down Expand Up @@ -178,6 +184,9 @@ struct flb_decompression_context *flb_decompression_context_create(int algorithm
if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
context->inner_context = flb_gzip_decompression_context_create();
}
else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
context->inner_context = flb_zstd_decompression_context_create();
}
else {
flb_error("invalid compression algorithm : %d", algorithm);

Expand All @@ -197,9 +206,14 @@ struct flb_decompression_context *flb_decompression_context_create(int algorithm
}

context->input_buffer_size = input_buffer_size;
context->read_buffer = context->read_buffer;
context->read_buffer = context->input_buffer;
context->algorithm = algorithm;
context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER;
if (algorithm == FLB_COMPRESSION_ALGORITHM_GZIP) {
context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_HEADER;
}
else if (algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
context->state = FLB_DECOMPRESSOR_STATE_EXPECTING_BODY;
}

return context;
}
Expand All @@ -215,6 +229,12 @@ int flb_decompress(struct flb_decompression_context *context,
output_length);

}
else if (context->algorithm == FLB_COMPRESSION_ALGORITHM_ZSTD) {
return flb_zstd_decompressor_dispatch(context,
output_buffer,
output_length);

}
}

return FLB_DECOMPRESSOR_FAILURE;
Expand Down
8 changes: 5 additions & 3 deletions src/flb_gzip.c
Original file line number Diff line number Diff line change
Expand Up @@ -690,9 +690,11 @@ static int flb_gzip_decompressor_process_header(

/* Minimal length: header + crc32 */
if (context->input_buffer_length < FLB_GZIP_HEADER_SIZE) {
flb_error("[gzip] unexpected content length");

return FLB_DECOMPRESSOR_FAILURE;
/*
* This is not a fatal error; it's the expected condition when waiting
* for more data. Return INSUFFICIENT_DATA without logging an error.
*/
return FLB_DECOMPRESSOR_INSUFFICIENT_DATA;
}

memcpy(&inner_context->gzip_header,
Expand Down
Loading
Loading