Merged
9 changes: 9 additions & 0 deletions .github/TODO-AUTOMATION.md
@@ -9,13 +9,15 @@ This repository uses git hooks to automatically sync between `TODO.md` and GitHu
Use these keywords in your commit messages to automatically manage issues:

**Close an issue:**

```bash
git commit -m "Fixes #3: Add exponential backoff"
git commit -m "Closes #5: Fixed memory leak"
git commit -m "Resolves #7: Implemented dynamic table creation"
```

**Reference an issue (adds a comment):**

```bash
git commit -m "Updates #6: Added initial rolling window support"
git commit -m "Refs #8: Investigating streaming insert API"
@@ -43,6 +45,7 @@ You can also manually update `TODO.md`:
```

Then commit:

```bash
git add TODO.md
git commit -m "Update TODO: mark tasks complete"
@@ -82,6 +85,7 @@ Added configurable retry strategy with exponential backoff and jitter.
```

**Result:**

- ✅ TODO.md updated: `- [x] **Add exponential backoff...** [#3]`
- 💬 Issue #3 gets a comment with commit details
- 🏷️ Issue #3 ready to be closed manually (or via PR)
@@ -98,6 +102,7 @@ Still TODO:
```

**Result:**

- 💬 Issue #6 gets a progress update comment
- 📋 TODO.md stays as `[ ]` (not completed yet)

@@ -111,6 +116,7 @@ See issue discussions for details."
```

**Result:**

- 💬 Both issues get update comments
- 📋 Both stay incomplete in TODO.md

@@ -136,18 +142,21 @@ GitHub will automatically close these issues when the PR is merged.
## Troubleshooting

**Hooks not running?**

```bash
ls -la .husky/
npx husky install
```

**GitHub CLI not authenticated?**

```bash
gh auth status
gh auth login
```

**TODO.md not updating?**

- Check that issue numbers match exactly
- Verify `sed` is available (standard on macOS/Linux)
- Manual fallback: Edit TODO.md directly
38 changes: 37 additions & 1 deletion README.md
@@ -66,7 +66,7 @@ npx maritime-data-synthesizer initialize realistic
- `clear` - Clear all data (keeps schema)
- `reset N` - Delete and reload with N days of data

**Note:** Multi-table mode (current default config) supports `initialize` command. For continuous generation with `start`, use single-table config format.
**Note:** Both single-table and multi-table modes now support all commands, including `start` for continuous generation with rolling windows.

**Documentation:**

@@ -184,6 +184,42 @@ If you need to combine data from multiple BigQuery tables, sync them to separate

See `config.multi-table.yaml` for a complete example.

### Retry Configuration

The plugin implements exponential backoff with jitter for transient BigQuery errors (rate limits, quota exceeded, temporary service unavailability):

```yaml
bigquery:
projectId: your-project
credentials: service-account-key.json
location: US

# Optional retry configuration (defaults shown)
maxRetries: 5 # Maximum number of retry attempts
initialRetryDelay: 1000 # Initial delay in milliseconds (doubles each retry)

tables:
- id: vessel_positions
# ... table configuration
```

**Retry Behavior:**

- **Retryable errors**: Rate limits, quota exceeded, internal errors, service unavailable (503), too many requests (429)
- **Non-retryable errors**: Invalid queries, permission errors, and schema mismatches fail immediately
- **Backoff strategy**: Initial delay × 2^attempt with random jitter, capped at 30 seconds
- **Logging**: Warnings on retry attempts, errors on final failure with detailed error information

**Example backoff delays** (with jitter):

- Attempt 1: 1000-2000ms
- Attempt 2: 2000-4000ms
- Attempt 3: 4000-8000ms
- Attempt 4: 8000-16000ms
- Attempt 5: 16000-30000ms (capped)

This prevents overwhelming the BigQuery API during transient issues while failing quickly on permanent errors.
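The delay formula described above can be sketched as follows (an illustration of the documented behavior, not the plugin's actual code; the function name is hypothetical):

```javascript
// Backoff delay: initialDelay * 2^attempt, plus random jitter in
// [0, exponentialDelay], capped at 30 seconds. attempt is 0-based,
// so "Attempt 1" in the table above corresponds to attempt = 0.
function backoffDelay(attempt, initialDelay = 1000, maxDelay = 30000) {
  const exponential = initialDelay * 2 ** attempt; // 1000, 2000, 4000, ...
  const jitter = Math.random() * exponential; // spreads concurrent retries apart
  return Math.min(exponential + jitter, maxDelay);
}
```

With the defaults, `backoffDelay(0)` falls in the 1000-2000ms range and `backoffDelay(4)` in the 16000-30000ms range, matching the table above.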

### Data Storage

BigQuery records are stored as-is at the top level:
6 changes: 3 additions & 3 deletions TODO.md
@@ -6,7 +6,7 @@ Consolidated task list for Harper BigQuery Sync plugin and Maritime Data Synthes

### Production Readiness

- [ ] **Add exponential backoff for transient BigQuery errors** [#3](https://github.com/HarperFast/harper-bigquery-sync/issues/3)
- [x] **Add exponential backoff for transient BigQuery errors** [#3](https://github.com/HarperFast/harper-bigquery-sync/issues/3)
- Currently errors are retried with simple logic
- Need exponential backoff strategy for transient failures
- Prevents overwhelming BigQuery API during issues
@@ -19,7 +19,7 @@ Consolidated task list for Harper BigQuery Sync plugin and Maritime Data Synthes

### Code Quality

- [ ] **Fix memory leak in journey tracking** [#5](https://github.com/HarperFast/harper-bigquery-sync/issues/5)
- [x] **Fix memory leak in journey tracking** [#5](https://github.com/HarperFast/harper-bigquery-sync/issues/5)
- Memory leak in `src/generator.js` journey tracking system
- Blocks re-enabling certain tests
- Related to vessel position generation
@@ -28,7 +28,7 @@ Consolidated task list for Harper BigQuery Sync plugin and Maritime Data Synthes

### Feature Enhancements

- [ ] **Multi-table rolling window support** [#6](https://github.com/HarperFast/harper-bigquery-sync/issues/6)
- [x] **Multi-table rolling window support** [#6](https://github.com/HarperFast/harper-bigquery-sync/issues/6)
- Currently multi-table orchestrator only supports `initialize` command
- Add `start` command for continuous generation with rolling window
- Add `backfill` capability
4 changes: 2 additions & 2 deletions package.json
@@ -33,8 +33,8 @@
"README.md"
],
"scripts": {
"test": "node --test test/config-loader.test.js test/sync-engine.test.js",
"test:coverage": "node --test --experimental-test-coverage test/config-loader.test.js test/sync-engine.test.js",
"test": "node --test test/config-loader.test.js test/sync-engine.test.js test/bigquery-client.test.js test/generator.test.js test/multi-table-orchestrator.test.js",
"test:coverage": "node --test --experimental-test-coverage test/config-loader.test.js test/sync-engine.test.js test/bigquery-client.test.js test/generator.test.js test/multi-table-orchestrator.test.js",
"test:config": "node examples/test-bigquery-config.js",
"lint": "eslint .",
"format": "prettier .",
131 changes: 112 additions & 19 deletions src/bigquery-client.js
@@ -40,6 +40,10 @@ export class BigQueryClient {
this.timestampColumn = config.bigquery.timestampColumn;
this.columns = config.bigquery.columns || ['*'];

// Retry configuration with exponential backoff
this.maxRetries = config.bigquery.maxRetries || 5;
this.initialRetryDelay = config.bigquery.initialRetryDelay || 1000; // 1 second

// Initialize query builder with column selection
this.queryBuilder = new QueryBuilder({
dataset: this.dataset,
Expand All @@ -49,6 +53,9 @@ export class BigQueryClient {
});

logger.info(`[BigQueryClient] Client initialized successfully with columns: ${this.queryBuilder.getColumnList()}`);
logger.info(
`[BigQueryClient] Retry configuration - maxRetries: ${this.maxRetries}, initialDelay: ${this.initialRetryDelay}ms`
);
}

/**
@@ -63,6 +70,99 @@ return Object.fromEntries(resolvedEntries);
return Object.fromEntries(resolvedEntries);
}

/**
* Determines if a BigQuery error is transient and should be retried
* @param {Error} error - The error to check
* @returns {boolean} True if the error is retryable
* @private
*/
isRetryableError(error) {
if (!error) return false;

// Check error code for retryable conditions
const retryableCodes = [
'rateLimitExceeded',
'quotaExceeded',
'internalError',
'backendError',
'serviceUnavailable',
'timeout',
503, // Service Unavailable
429, // Too Many Requests
];

// Check error.code
if (error.code && retryableCodes.includes(error.code)) {
return true;
}

// Check nested errors array (BigQuery specific)
if (error.errors && Array.isArray(error.errors)) {
return error.errors.some((e) => e.reason && retryableCodes.includes(e.reason));
}

// Check HTTP status code
if (error.response && retryableCodes.includes(error.response.status)) {
return true;
}

return false;
}

/**
* Executes a query with exponential backoff retry logic
* @param {Function} queryFn - Async function that executes the query
* @param {string} operation - Name of the operation (for logging)
* @returns {Promise<*>} Query result
* @private
*/
async executeWithRetry(queryFn, operation) {
let lastError;

for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
try {
return await queryFn();
} catch (error) {
lastError = error;

// Check if we should retry
const isRetryable = this.isRetryableError(error);
const attemptsRemaining = this.maxRetries - attempt;

if (!isRetryable || attemptsRemaining === 0) {
// Log detailed error information
logger.error(
`[BigQueryClient.${operation}] Query failed (attempt ${attempt + 1}/${this.maxRetries + 1}): ${error.message}`
);
if (error.errors) {
error.errors.forEach((e) => logger.error(` ${e.reason} at ${e.location}: ${e.message}`));
}
throw error;
}

// Calculate backoff delay with jitter
// Exponential backoff: initialDelay * 2^attempt
// Jitter: random value between 0 and calculated delay
const exponentialDelay = this.initialRetryDelay * Math.pow(2, attempt);
const jitter = Math.random() * exponentialDelay;
const delay = Math.min(exponentialDelay + jitter, 30000); // Cap at 30 seconds

logger.warn(
`[BigQueryClient.${operation}] Transient error (attempt ${attempt + 1}/${this.maxRetries + 1}): ${error.message}. Retrying in ${Math.round(delay)}ms...`
);
if (error.errors) {
error.errors.forEach((e) => logger.warn(` ${e.reason}: ${e.message}`));
}

// Wait before retrying
await new Promise((resolve) => setTimeout(resolve, delay));
}
}

// Should never reach here, but just in case
throw lastError;
}

/**
* Pulls a partition of data from BigQuery
* Uses modulo-based partitioning for distributed workload
@@ -104,7 +204,7 @@

logger.trace(`[BigQueryClient.pullPartition] Generated SQL query: ${query}`);

try {
return await this.executeWithRetry(async () => {
logger.debug('[BigQueryClient.pullPartition] Executing BigQuery query...');
const startTime = Date.now();
const [rows] = await this.client.query(options);
Expand All @@ -114,13 +214,7 @@ export class BigQueryClient {
`[BigQueryClient.pullPartition] First row timestamp: ${rows.length > 0 ? new Date(rows[0][this.timestampColumn]) : 'N/A'}`
);
return rows;
} catch (error) {
logger.error(`[BigQueryClient.pullPartition] BigQuery query failed: ${error.message}`, error);
if (error.errors) {
error.errors.forEach((e) => logger.error(` ${e.reason} at ${e.location}: ${e.message}`));
}
throw error;
}
}, 'pullPartition');
}

/**
@@ -177,7 +271,7 @@
params: { clusterSize, nodeId },
};

try {
return await this.executeWithRetry(async () => {
logger.debug('[BigQueryClient.countPartition] Executing count query...');
const startTime = Date.now();
const [rows] = await this.client.query(options);
Expand All @@ -187,10 +281,7 @@ export class BigQueryClient {
`[BigQueryClient.countPartition] Count complete - ${count} records in partition (took ${duration}ms)`
);
return count;
} catch (error) {
logger.error(`[BigQueryClient.countPartition] Count query error: ${error.message}`, error);
throw error;
}
}, 'countPartition');
}

/**
@@ -221,13 +312,15 @@
};

try {
logger.debug('[BigQueryClient.verifyRecord] Executing verification query...');
const [rows] = await this.client.query(options);
const exists = rows.length > 0;
logger.debug(`[BigQueryClient.verifyRecord] Record ${exists ? 'EXISTS' : 'NOT FOUND'} in BigQuery`);
return exists;
return await this.executeWithRetry(async () => {
logger.debug('[BigQueryClient.verifyRecord] Executing verification query...');
const [rows] = await this.client.query(options);
const exists = rows.length > 0;
logger.debug(`[BigQueryClient.verifyRecord] Record ${exists ? 'EXISTS' : 'NOT FOUND'} in BigQuery`);
return exists;
}, 'verifyRecord');
} catch (error) {
logger.error(`[BigQueryClient.verifyRecord] Verification error: ${error.message}`, error);
logger.error(`[BigQueryClient.verifyRecord] Verification error after retries: ${error.message}`, error);
return false;
}
}