Skip to content

Commit fdd8174

Browse files
committed
in_tail: fix gzip data loss on resume and update DB schema
Previously, when tailing gzip files, there was no mechanism to persistently store the uncompressed position ('skip_bytes'). This meant that upon restart, the plugin could not correctly locate the reading position, identifying it as a rotation or new file case, potentially leading to data loss. To fix this, 'skip_bytes' is now stored in the database to persist the uncompressed offset. Additionally, 'exclude_bytes' is introduced to track runtime skipping without interfering with the persistent value. The SQLite schema has been updated to include 'anchor_offset' and 'skip_bytes' columns to support these features. Signed-off-by: jinyong.choi <[email protected]>
1 parent 9cc380f commit fdd8174

File tree

6 files changed

+235
-26
lines changed

6 files changed

+235
-26
lines changed

plugins/in_tail/tail_db.c

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
#include "tail_sql.h"
2626
#include "tail_file.h"
2727

28-
struct query_status {
29-
int id;
30-
int rows;
31-
int64_t offset;
32-
};
28+
/* Callback to detect if a query returned any rows */
29+
static int cb_column_exists(void *data, int argc, char **argv, char **cols)
30+
{
31+
int *found = (int *)data;
32+
*found = 1;
33+
return 0;
34+
}
3335

3436
/* Open or create database required by tail plugin */
3537
struct flb_sqldb *flb_tail_db_open(const char *path,
@@ -38,6 +40,7 @@ struct flb_sqldb *flb_tail_db_open(const char *path,
3840
struct flb_config *config)
3941
{
4042
int ret;
43+
int column_found;
4144
char tmp[64];
4245
struct flb_sqldb *db;
4346

@@ -55,6 +58,54 @@ struct flb_sqldb *flb_tail_db_open(const char *path,
5558
return NULL;
5659
}
5760

61+
/* Check if 'skip' column exists (migration for older databases) */
62+
column_found = 0;
63+
ret = flb_sqldb_query(db,
64+
"SELECT 1 FROM pragma_table_info('in_tail_files') "
65+
"WHERE name='skip';",
66+
cb_column_exists, &column_found);
67+
if (ret != FLB_OK) {
68+
flb_plg_error(ctx->ins, "db: could not query table info for 'skip' column");
69+
flb_sqldb_close(db);
70+
return NULL;
71+
}
72+
if (column_found == 0) {
73+
flb_plg_debug(ctx->ins, "db: migrating database, adding 'skip' column");
74+
ret = flb_sqldb_query(db,
75+
"ALTER TABLE in_tail_files "
76+
"ADD COLUMN skip INTEGER DEFAULT 0;",
77+
NULL, NULL);
78+
if (ret != FLB_OK) {
79+
flb_plg_error(ctx->ins, "db: could not add 'skip' column");
80+
flb_sqldb_close(db);
81+
return NULL;
82+
}
83+
}
84+
85+
/* Check if 'anchor' column exists (migration for older databases) */
86+
column_found = 0;
87+
ret = flb_sqldb_query(db,
88+
"SELECT 1 FROM pragma_table_info('in_tail_files') "
89+
"WHERE name='anchor';",
90+
cb_column_exists, &column_found);
91+
if (ret != FLB_OK) {
92+
flb_plg_error(ctx->ins, "db: could not query table info for 'anchor' column");
93+
flb_sqldb_close(db);
94+
return NULL;
95+
}
96+
if (column_found == 0) {
97+
flb_plg_debug(ctx->ins, "db: migrating database, adding 'anchor' column");
98+
ret = flb_sqldb_query(db,
99+
"ALTER TABLE in_tail_files "
100+
"ADD COLUMN anchor INTEGER DEFAULT 0;",
101+
NULL, NULL);
102+
if (ret != FLB_OK) {
103+
flb_plg_error(ctx->ins, "db: could not add 'anchor' column");
104+
flb_sqldb_close(db);
105+
return NULL;
106+
}
107+
}
108+
58109
if (ctx->db_sync >= 0) {
59110
snprintf(tmp, sizeof(tmp) - 1, SQL_PRAGMA_SYNC,
60111
ctx->db_sync);
@@ -130,7 +181,8 @@ static int flb_tail_db_file_delete_by_id(struct flb_tail_config *ctx,
130181
*/
131182
static int db_file_exists(struct flb_tail_file *file,
132183
struct flb_tail_config *ctx,
133-
uint64_t *id, uint64_t *inode, off_t *offset)
184+
uint64_t *id, uint64_t *inode,
185+
int64_t *offset, uint64_t *skip, int64_t *anchor)
134186
{
135187
int ret;
136188
int exists = FLB_FALSE;
@@ -149,6 +201,8 @@ static int db_file_exists(struct flb_tail_file *file,
149201
/* name: column 1 */
150202
name = sqlite3_column_text(ctx->stmt_get_file, 1);
151203
if (ctx->compare_filename && name == NULL) {
204+
sqlite3_clear_bindings(ctx->stmt_get_file);
205+
sqlite3_reset(ctx->stmt_get_file);
152206
flb_plg_error(ctx->ins, "db: error getting name: id=%"PRIu64, *id);
153207
return -1;
154208
}
@@ -159,12 +213,18 @@ static int db_file_exists(struct flb_tail_file *file,
159213
/* inode: column 3 */
160214
*inode = sqlite3_column_int64(ctx->stmt_get_file, 3);
161215

216+
/* skip: column 6 */
217+
*skip = sqlite3_column_int64(ctx->stmt_get_file, 6);
218+
219+
/* anchor: column 7 */
220+
*anchor = sqlite3_column_int64(ctx->stmt_get_file, 7);
221+
162222
/* Checking if the file's name and inode match exactly */
163223
if (ctx->compare_filename) {
164224
if (flb_tail_target_file_name_cmp((char *) name, file) != 0) {
165225
exists = FLB_FALSE;
166226
flb_plg_debug(ctx->ins, "db: exists stale file from database:"
167-
" id=%"PRIu64" inode=%"PRIu64" offset=%"PRIu64
227+
" id=%"PRIu64" inode=%"PRIu64" offset=%"PRId64
168228
" name=%s file_inode=%"PRIu64" file_name=%s",
169229
*id, *inode, *offset, name, file->inode,
170230
file->name);
@@ -199,6 +259,8 @@ static int db_file_insert(struct flb_tail_file *file, struct flb_tail_config *ct
199259
sqlite3_bind_int64(ctx->stmt_insert_file, 2, file->offset);
200260
sqlite3_bind_int64(ctx->stmt_insert_file, 3, file->inode);
201261
sqlite3_bind_int64(ctx->stmt_insert_file, 4, created);
262+
sqlite3_bind_int64(ctx->stmt_insert_file, 5, file->skip_bytes);
263+
sqlite3_bind_int64(ctx->stmt_insert_file, 6, file->anchor_offset);
202264

203265
/* Run the insert */
204266
ret = sqlite3_step(ctx->stmt_insert_file);
@@ -258,11 +320,13 @@ int flb_tail_db_file_set(struct flb_tail_file *file,
258320
{
259321
int ret;
260322
uint64_t id = 0;
261-
off_t offset = 0;
323+
int64_t offset = 0;
324+
uint64_t skip = 0;
325+
int64_t anchor = 0;
262326
uint64_t inode = 0;
263327

264328
/* Check if the file exists */
265-
ret = db_file_exists(file, ctx, &id, &inode, &offset);
329+
ret = db_file_exists(file, ctx, &id, &inode, &offset, &skip, &anchor);
266330
if (ret == -1) {
267331
flb_plg_error(ctx->ins, "cannot execute query to check inode: %" PRIu64,
268332
file->inode);
@@ -281,6 +345,18 @@ int flb_tail_db_file_set(struct flb_tail_file *file,
281345
else {
282346
file->db_id = id;
283347
file->offset = offset;
348+
file->skip_bytes = skip;
349+
file->anchor_offset = anchor;
350+
351+
/* Initialize skipping mode if needed */
352+
if (file->skip_bytes > 0) {
353+
file->exclude_bytes = file->skip_bytes;
354+
file->skipping_mode = FLB_TRUE;
355+
}
356+
else {
357+
file->exclude_bytes = 0;
358+
file->skipping_mode = FLB_FALSE;
359+
}
284360
}
285361

286362
return 0;
@@ -294,7 +370,9 @@ int flb_tail_db_file_offset(struct flb_tail_file *file,
294370

295371
/* Bind parameters */
296372
sqlite3_bind_int64(ctx->stmt_offset, 1, file->offset);
297-
sqlite3_bind_int64(ctx->stmt_offset, 2, file->db_id);
373+
sqlite3_bind_int64(ctx->stmt_offset, 2, file->skip_bytes);
374+
sqlite3_bind_int64(ctx->stmt_offset, 3, file->anchor_offset);
375+
sqlite3_bind_int64(ctx->stmt_offset, 4, file->db_id);
298376

299377
ret = sqlite3_step(ctx->stmt_offset);
300378

0 commit comments

Comments
 (0)