Skip to content
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_ring_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,7 @@ int flb_ring_buffer_add_event_loop(struct flb_ring_buffer *rb, void *evl, uint8_

int flb_ring_buffer_write(struct flb_ring_buffer *rb, void *ptr, size_t size);
int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size);
int flb_ring_buffer_peek(struct flb_ring_buffer *rb, int skip_count, void *ptr, size_t size);
int flb_ring_buffer_skip(struct flb_ring_buffer *rb, size_t size);

#endif
16 changes: 16 additions & 0 deletions plugins/filter_rewrite_tag/rewrite_tag.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ static int emitter_create(struct flb_rewrite_tag *ctx)
{
int ret;
struct flb_input_instance *ins;
char ring_buffer_size[42];

ret = flb_input_name_exists(ctx->emitter_name, ctx->config);
if (ret == FLB_TRUE) {
Expand Down Expand Up @@ -73,6 +74,16 @@ static int emitter_create(struct flb_rewrite_tag *ctx)
flb_plg_error(ctx->ins, "cannot set storage.type");
}

/* Set ring_buffer_size */
if (ctx->emitter_ring_buffer_size > 0) {
snprintf(ring_buffer_size, sizeof(ring_buffer_size)-1, "%zd",
ctx->emitter_ring_buffer_size);
ret = flb_input_set_property(ins, "ring_buffer_size", ring_buffer_size);
if (ret == -1) {
flb_plg_error(ins, "cannot set ring buffer size");
}
}

/* Initialize emitter plugin */
ret = flb_input_instance_init(ins, ctx->config);
if (ret == -1) {
Expand Down Expand Up @@ -602,6 +613,11 @@ static struct flb_config_map config_map[] = {
FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_storage_type),
NULL
},
{
FLB_CONFIG_MAP_SIZE, "emitter_ring_buffer_size", "0",
FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_ring_buffer_size),
"set the emitter ring buffer size, must be set to > 0 to enable it"
},
{
FLB_CONFIG_MAP_SIZE, "emitter_mem_buf_limit", FLB_RTAG_MEM_BUF_LIMIT_DEFAULT,
FLB_FALSE, FLB_TRUE, offsetof(struct flb_rewrite_tag, emitter_mem_buf_limit),
Expand Down
2 changes: 1 addition & 1 deletion plugins/filter_rewrite_tag/rewrite_tag.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ struct flb_rewrite_tag {
flb_sds_t emitter_name; /* emitter input plugin name */
flb_sds_t emitter_storage_type; /* emitter storage type */
size_t emitter_mem_buf_limit; /* Emitter buffer limit */
size_t emitter_ring_buffer_size; /* ring buffer size of the emitter */
struct mk_list rules; /* processed rules */
struct mk_list *cm_rules; /* config_map rules (only strings) */
struct flb_input_instance *ins_emitter; /* emitter input plugin instance */
struct flb_filter_instance *ins; /* self-filter instance */
struct flb_config *config; /* Fluent Bit context */

#ifdef FLB_HAVE_METRICS
struct cmt_counter *cmt_emitted;
#endif
Expand Down
6 changes: 5 additions & 1 deletion plugins/in_emitter/emitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,16 @@ static int in_emitter_ingest_ring_buffer(struct flb_input_instance *in,
(void) in;


while ((ret = flb_ring_buffer_read(ctx->msgs, (void *)&ec,
while ((ret = flb_ring_buffer_peek(ctx->msgs, 0, (void *)&ec,
sizeof(struct em_chunk))) == 0) {
ret = flb_input_log_append(in,
ec.tag, flb_sds_len(ec.tag),
ec.mp_sbuf.data,
ec.mp_sbuf.size);
if (ret < 0) {
return ret;
}
flb_ring_buffer_skip(ctx->msgs, sizeof(struct em_chunk));
flb_sds_destroy(ec.tag);
msgpack_sbuffer_destroy(&ec.mp_sbuf);
}
Expand Down
22 changes: 22 additions & 0 deletions src/flb_ring_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,26 @@ int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size)
return 0;
}

int flb_ring_buffer_peek(struct flb_ring_buffer *rb, int skip_count, void *ptr, size_t size)
{
size_t ret;

ret = lwrb_peek(rb->ctx, skip_count, ptr, size);
if (ret == 0) {
return -1;
}

return 0;
}

int flb_ring_buffer_skip(struct flb_ring_buffer *rb, size_t size)
{
size_t ret;

ret = lwrb_skip(rb->ctx, size);
if (ret == 0) {
return -1;
}

return 0;
}
50 changes: 50 additions & 0 deletions tests/internal/ring_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,56 @@ static void test_smart_flush()
flb_bucket_queue_destroy(bktq);
mk_event_loop_destroy(evl);
}

void test_peek_seek()
{
int i;
int ret;
int elements;
struct check *c;
struct check *tmp;
struct flb_ring_buffer *rb;

elements = sizeof(checks) / sizeof(struct check);

rb = flb_ring_buffer_create(sizeof(struct check *) * elements);
TEST_CHECK(rb != NULL);
if (!rb) {
exit(EXIT_FAILURE);
}

for (i = 0; i < elements; i++) {
c = &checks[i];
ret = flb_ring_buffer_write(rb, (void *) &c, sizeof(c));
TEST_CHECK(ret == 0);
}

/* try to write another record, it must fail */
tmp = c;
ret = flb_ring_buffer_write(rb, (void *) &tmp, sizeof(tmp));
TEST_CHECK(ret == -1);

c = NULL;

/* consume one entry */
ret = flb_ring_buffer_peek(rb, 0, (void *) &c, sizeof(c));
TEST_CHECK(ret == 0);

/* the consumed entry must be equal to the first one */
c = &checks[0];
TEST_CHECK(strcmp(c->buf_a, "a1") == 0 && strcmp(c->buf_b, "a2") ==0);

/* consume one entry */
ret = flb_ring_buffer_peek(rb, 0, (void *) &c, sizeof(c));
TEST_CHECK(ret == 0);

/* the consumed entry must be equal to the first one */
c = &checks[0];
TEST_CHECK(strcmp(c->buf_a, "a1") == 0 && strcmp(c->buf_b, "a2") ==0);

flb_ring_buffer_destroy(rb);
}

TEST_LIST = {
{ "basic", test_basic},
{ "smart_flush", test_smart_flush},
Expand Down
56 changes: 56 additions & 0 deletions tests/runtime/filter_rewrite_tag.c
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,61 @@ static void flb_test_matched()
filter_test_destroy(ctx);
}

/*
* Original tag: rewrite
* Rewritten tag: updated
*/
static void flb_test_ring_buffer()
{
struct flb_lib_out_cb cb_data;
struct filter_test *ctx;
int ret;
int not_used = 0;
int bytes;
int got;
char *p = "[0, {\"key\":\"rewrite\"}]";

/* Prepare output callback with expected result */
cb_data.cb = cb_count_msgpack;
cb_data.data = &not_used;

/* Create test context */
ctx = filter_test_create((void *) &cb_data);
if (!ctx) {
exit(EXIT_FAILURE);
}
clear_output_num();
/* Configure filter */
ret = flb_filter_set(ctx->flb, ctx->f_ffd,
"emitter_ring_buffer_size", "128",
"Rule", "$key ^(rewrite)$ updated false",
NULL);
TEST_CHECK(ret == 0);

/* Configure output */
ret = flb_output_set(ctx->flb, ctx->o_ffd,
"Match", "updated",
NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

/* ingest record */
bytes = flb_lib_push(ctx->flb, ctx->i_ffd, p, strlen(p));
TEST_CHECK(bytes == strlen(p));

flb_time_msleep(1500); /* waiting flush */
got = get_output_num();

if (!TEST_CHECK(got != 0)) {
TEST_MSG("expect: %d got: %d", 1, got);
}

filter_test_destroy(ctx);
}

/*
* Original tag: rewrite
* Rewritten tag: updated
Expand Down Expand Up @@ -556,6 +611,7 @@ TEST_LIST = {
{"matched", flb_test_matched},
{"not_matched", flb_test_not_matched},
{"keep_true", flb_test_keep_true},
{"ring_buffer", flb_test_ring_buffer},
{"heavy_input_pause_emitter", flb_test_heavy_input_pause_emitter},
{"issue_4518", flb_test_issue_4518},
{"issue_4793", flb_test_issue_4793},
Expand Down
Loading