diff --git a/config.yaml b/config.yaml index e3d602e..9e10ecb 100644 --- a/config.yaml +++ b/config.yaml @@ -55,6 +55,8 @@ bigquery: initialBatchSize: 5000 catchupBatchSize: 500 steadyBatchSize: 100 + # Optional: Enable streaming inserts for lower latency (has cost implications) + # useStreamingAPIs: true # Vessel metadata updates - id: vessel_metadata @@ -68,6 +70,25 @@ bigquery: catchupBatchSize: 100 steadyBatchSize: 10 +# BigQuery Insert Method Configuration +# Two options available (configurable per table): +# +# 1. Load Job API (default): +# - Free tier compatible (no per-row costs) +# - Higher latency (seconds to minutes) +# - Best for: development, batch workloads, cost-sensitive deployments +# - Set useStreamingAPIs: false (or omit) +# +# 2. Streaming Insert API: +# - Lower latency (sub-second to few seconds) +# - Costs: $0.01 per 200 MB (minimum $0.01/day) +# - Best for: production, real-time dashboards, low-latency requirements +# - Set useStreamingAPIs: true +# +# Example: Enable streaming for time-sensitive events +# - id: port_events +# useStreamingAPIs: true # Real-time events benefit from low latency + # Maritime vessel data synthesizer configuration # When bigquery.tables is present, synthesizer uses multi-table orchestrator # to generate data for ALL tables (vessel_positions, port_events, vessel_metadata). diff --git a/docs/plans/2025-11-14-streaming-insert-api-design.md b/docs/plans/2025-11-14-streaming-insert-api-design.md new file mode 100644 index 0000000..967ae2b --- /dev/null +++ b/docs/plans/2025-11-14-streaming-insert-api-design.md @@ -0,0 +1,398 @@ +# Streaming Insert API Support for BigQuery + +**Date:** 2025-11-14 +**Status:** Planning +**Issue:** [#8](https://github.com/HarperFast/harper-bigquery-sync/issues/8) +**Approach:** Test-Driven Development + +## Overview + +Add opt-in streaming insert API for BigQuery alongside the current load job API. Streaming inserts provide lower latency for real-time use cases but have a different cost model. + +## Current Implementation + +**File:** `src/bigquery.js` +**Method:** `insertBatch(records, maxRetries)` +**API Used:** Load Job API (`table.load()`) + +**Current Behavior:** + +1. Writes records to temporary NDJSON file +2. Uses `table.load()` to upload file +3. Waits for job completion +4. Cleans up temporary file +5. 
Retry logic with exponential backoff

**Benefits of Load Job API:**

- ✅ Free tier compatible (no per-row costs)
- ✅ Efficient for large batches
- ✅ Good for batch workloads

**Limitations:**

- ❌ Higher latency (seconds to minutes)
- ❌ Requires temporary file I/O
- ❌ Not suitable for real-time requirements

## Proposed: Streaming Insert API

**API:** `table.insert(rows)` method from @google-cloud/bigquery
**When to use:** Real-time use cases, lower latency requirements, production deployments

**Benefits:**

- ✅ Lower latency (sub-second to few seconds)
- ✅ No temporary files needed
- ✅ Direct insert to table
- ✅ Simpler code path

**Tradeoffs:**

- ❌ Costs money ($0.01 per 200 MB, minimum $0.01/day)
- ❌ Row size limits (1 MB per row, 10 MB per request)
- ❌ Best-effort deduplication (no exactly-once guarantee)

## Design

### Configuration

Add `useStreamingAPIs` flag to BigQuery config:

**config.yaml:**

```yaml
bigquery:
  projectId: my-project
  dataset: maritime_tracking
  table: vessel_positions
  timestampColumn: timestamp
  credentials: service-account-key.json
  location: US

  # Streaming insert API (off by default)
  useStreamingAPIs: false # Set to true for lower latency
```

**Per-table configuration:**

```yaml
bigquery:
  # ... shared config ...
  tables:
    - id: vessel_positions
      dataset: maritime_tracking
      table: vessel_positions
      timestampColumn: timestamp
      columns: ['*']
      useStreamingAPIs: false # High volume, batch is fine

    - id: port_events
      dataset: maritime_tracking
      table: port_events
      timestampColumn: event_time
      columns: ['*']
      useStreamingAPIs: true # Real-time events, use streaming
```

### Implementation Strategy

**Option 1: Single method with conditional logic**

```javascript
async insertBatch(records, maxRetries = 5) {
	if (this.useStreamingAPIs) {
		return await this._insertStreaming(records);
	} else {
		return await this._insertLoadJob(records, maxRetries);
	}
}
```

**Option 2: Strategy pattern (cleaner, testable)**

```javascript
class LoadJobStrategy {
	async insert(table, records, schema) {
		/* current implementation */
	}
}

class StreamingInsertStrategy {
	async insert(table, records) {
		/* streaming implementation */
	}
}

class MaritimeBigQueryClient {
	constructor(config) {
		this.insertStrategy = config.useStreamingAPIs ? new StreamingInsertStrategy() : new LoadJobStrategy();
	}

	async insertBatch(records, maxRetries) {
		return await this.insertStrategy.insert(this.table, records, this.getSchema());
	}
}
```

**Recommendation:** Start with Option 1 (simpler), refactor to Option 2 if needed.
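Whichever option is chosen, per-table behaviour ultimately comes from how each client is constructed. A minimal sketch of that wiring, assuming a hypothetical `buildTableClients` helper (the existing multi-table orchestrator may merge shared and per-table settings differently):

```javascript
// Hypothetical helper: merge shared bigquery settings with each entry under
// `tables`, so useStreamingAPIs (and any other field) can be overridden per table.
function buildTableClients(bigqueryConfig) {
	const { tables = [], ...shared } = bigqueryConfig;
	return tables.map((tableConfig) => new MaritimeBigQueryClient({ ...shared, ...tableConfig }));
}

// Example: port_events gets a streaming client, vessel_positions keeps load jobs.
const clients = buildTableClients({
	projectId: 'my-project',
	location: 'US',
	tables: [
		{ id: 'vessel_positions', dataset: 'maritime_tracking', table: 'vessel_positions', useStreamingAPIs: false },
		{ id: 'port_events', dataset: 'maritime_tracking', table: 'port_events', useStreamingAPIs: true },
	],
});
```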
### Streaming Insert Implementation

```javascript
async _insertStreaming(records) {
	if (!records || records.length === 0) {
		throw new Error('No records to insert');
	}

	try {
		// BigQuery streaming insert API
		await this.table.insert(records, {
			// Fail the request if any row is invalid (don't silently drop rows)
			skipInvalidRows: false,
			// Fail on schema mismatch instead of ignoring unknown fields
			ignoreUnknownValues: false,
			// Template suffix for best-effort deduplication (optional)
			// templateSuffix: '_streaming'
		});

		return {
			success: true,
			recordCount: records.length,
			method: 'streaming'
		};
	} catch (error) {
		// Handle partial failures
		if (error.name === 'PartialFailureError') {
			console.error('Some rows failed to insert:', error.errors);

			// Log failed rows for debugging
			error.errors.forEach((err, index) => {
				console.error(`Row ${index} failed:`, err);
			});

			throw new Error(`Partial failure: ${error.errors.length} rows failed`);
		}

		throw error;
	}
}
```

### Error Handling

**Load Job API errors:**

- Network timeouts → Retry
- Rate limits (429) → Retry with backoff
- Server errors (5xx) → Retry
- Schema errors → Fail immediately
- Permission errors → Fail immediately

**Streaming Insert API errors:**

- Partial failures → Log failed rows, throw error
- Quota exceeded → Retry with backoff
- Invalid schema → Fail immediately
- Row size too large → Fail immediately (log which row)

### Retry Logic

**Load Job API:** Already has retry logic with exponential backoff (up to 5 attempts)

**Streaming Insert API:** Add retry for transient errors

```javascript
async _insertStreaming(records, maxRetries = 3) {
	let lastError;

	for (let attempt = 1; attempt <= maxRetries; attempt++) {
		try {
			return await this.table.insert(records, { skipInvalidRows: false, ignoreUnknownValues: false });
		} catch (error) {
			lastError = error;

			// Check if retryable
			const isRetryable =
				error.code === 429 || // Quota exceeded
				error.code === 503 || // Service unavailable
				(error.code >= 500 && error.code < 600); // Server errors

			if (!isRetryable || attempt === maxRetries) {
				throw error;
			}

			// Exponential backoff: 1s, 2s, 4s
			const backoffMs = Math.pow(2, attempt - 1) * 1000;
			console.log(`Streaming insert failed (attempt ${attempt}/${maxRetries}): ${error.message}`);
			console.log(`Retrying in ${backoffMs / 1000}s...`);

			await new Promise(resolve => setTimeout(resolve, backoffMs));
		}
	}

	throw lastError;
}
```

## Testing Strategy (TDD)

### Unit Tests

**Test file:** `test/bigquery-streaming.test.js`

```javascript
describe('BigQuery Streaming Inserts', () => {
	describe('Configuration', () => {
		it('should default to load job API when useStreamingAPIs is false');
		it('should use streaming API when useStreamingAPIs is true');
		it('should validate streaming config at initialization');
	});

	describe('Streaming insert method', () => {
		it('should successfully insert records using streaming API');
		it('should handle empty record array');
		it('should include record count in success response');
		it('should indicate method used (streaming vs load job)');
	});

	describe('Error handling', () => {
		it('should handle partial failures with detailed logging');
		it('should retry on quota exceeded (429)');
		it('should retry on service unavailable (503)');
		it('should not retry on schema errors');
		it('should not retry on invalid row data');
		it('should respect maxRetries limit');
	});

	describe('Performance', () => {
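		// Note: streaming inserts cap rows at ~1 MB and requests at 10 MB (see
		// Tradeoffs above), so these tests should exercise batches near those limits.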
it('should be faster than load job API for small batches'); + it('should handle row size limits gracefully'); + it('should handle request size limits (10 MB)'); + }); + + describe('Backward compatibility', () => { + it('should use load job API by default (existing behavior)'); + it('should maintain retry logic for load jobs'); + it('should clean up temp files with load jobs'); + }); +}); +``` + +### Integration Tests + +**Test file:** `test/integration/bigquery-streaming.integration.test.js` + +```javascript +describe('BigQuery Streaming Integration', () => { + // Only run if BIGQUERY_INTEGRATION_TESTS=true + before(function () { + if (!process.env.BIGQUERY_INTEGRATION_TESTS) { + this.skip(); + } + }); + + it('should insert records using streaming API against real BigQuery'); + it('should verify records are queryable immediately'); + it('should compare latency: streaming vs load job'); + it('should handle concurrent streaming inserts'); +}); +``` + +## Implementation Checklist + +### Phase 1: Tests (Write First) + +- [ ] Create `test/bigquery-streaming.test.js` +- [ ] Write unit tests for configuration +- [ ] Write unit tests for streaming insert method +- [ ] Write unit tests for error handling +- [ ] Write unit tests for backward compatibility +- [ ] Run tests (should all fail - RED phase) + +### Phase 2: Implementation (Make Tests Pass) + +- [ ] Add `useStreamingAPIs` config option +- [ ] Extract current load job logic to `_insertLoadJob()` method +- [ ] Implement `_insertStreaming()` method +- [ ] Update `insertBatch()` to dispatch based on config +- [ ] Add retry logic for streaming inserts +- [ ] Handle partial failure errors +- [ ] Run tests (should all pass - GREEN phase) + +### Phase 3: Refactoring (Clean Up) + +- [ ] Extract common error handling logic +- [ ] Add JSDoc documentation +- [ ] Consider strategy pattern if code gets complex +- [ ] Optimize retry backoff timing +- [ ] Run tests (should still pass - REFACTOR phase) + +### Phase 4: Documentation + +- [ ] Update config.yaml with examples +- [ ] Update README with streaming insert option +- [ ] Document cost implications +- [ ] Add decision guide: when to use streaming vs load jobs +- [ ] Update multi-table configuration examples + +### Phase 5: Integration Testing + +- [ ] Create integration test with real BigQuery (gated by env var) +- [ ] Test with maritime synthesizer +- [ ] Verify latency improvements +- [ ] Test error scenarios + +## Cost Estimation + +**Streaming Insert Costs (as of 2024):** + +- $0.01 per 200 MB (compressed) +- Minimum $0.01 per day if any streaming inserts used +- No charge for load job API + +**Example:** + +- 144K records/day × 1 KB/record = 144 MB/day +- Cost: $0.01/day = $0.30/month +- Load job API: $0/month (free) + +**Decision Guide:** + +- **Use Load Job API (default):** Development, testing, batch workloads, cost-sensitive +- **Use Streaming API:** Production, real-time dashboards, low-latency requirements + +## Migration Path + +**Existing deployments:** No changes required (defaults to load job API) + +**Enable streaming for a table:** + +```yaml +tables: + - id: vessel_positions + useStreamingAPIs: true # Add this line +``` + +**Test in development:** + +1. Enable streaming for one table +2. Monitor latency and costs +3. 
Roll out to other tables if beneficial + +## Success Metrics + +- ✅ All tests pass (unit + integration) +- ✅ Backward compatible (default behavior unchanged) +- ✅ Configurable per table +- ✅ Lower latency verified (< 5s vs > 30s for load jobs) +- ✅ Error handling comprehensive +- ✅ Retry logic functional +- ✅ Documentation complete + +## References + +- [BigQuery Streaming Insert API](https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery) +- [BigQuery Load Jobs](https://cloud.google.com/bigquery/docs/loading-data) +- [@google-cloud/bigquery Node.js SDK](https://googleapis.dev/nodejs/bigquery/latest/Table.html#insert) +- [Issue #8](https://github.com/HarperFast/harper-bigquery-sync/issues/8) diff --git a/src/bigquery.js b/src/bigquery.js index 76d0bf1..38beb57 100644 --- a/src/bigquery.js +++ b/src/bigquery.js @@ -16,6 +16,9 @@ class MaritimeBigQueryClient { this.retentionDays = config.retentionDays || parseInt(process.env.RETENTION_DAYS || '30', 10); this.location = config.location || process.env.BIGQUERY_LOCATION || 'US'; + // Streaming insert API option (off by default for backward compatibility) + this.useStreamingAPIs = config.useStreamingAPIs || false; + if (!this.projectId) { throw new Error('projectId must be set in config or GCP_PROJECT_ID environment variable'); } @@ -109,10 +112,94 @@ class MaritimeBigQueryClient { } /** - * Insert batch of records into BigQuery using Load Job (free tier compatible) - * Includes retry logic for transient network errors + * Insert batch of records into BigQuery + * Dispatches to streaming or load job API based on configuration + * @param {Array} records - Records to insert + * @param {number} maxRetries - Maximum retry attempts (default: 5) + * @returns {Promise} - Result with success flag, recordCount, and method */ async insertBatch(records, maxRetries = 5) { + if (this.useStreamingAPIs) { + return await this._insertStreaming(records, maxRetries); + } else { + return await this._insertLoadJob(records, maxRetries); + } + } + + /** + * Insert batch using Streaming Insert API + * Lower latency but has cost implications + * @param {Array} records - Records to insert + * @param {number} maxRetries - Maximum retry attempts (default: 3) + * @returns {Promise} - Result with success flag, recordCount, and method + */ + async _insertStreaming(records, maxRetries = 3) { + if (!records || records.length === 0) { + throw new Error('No records to insert'); + } + + let lastError; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + // BigQuery streaming insert API + await this.table.insert(records, { + skipInvalidRows: false, + ignoreUnknownValues: false, + }); + + // Success + return { + success: true, + recordCount: records.length, + method: 'streaming', + }; + } catch (error) { + lastError = error; + + // Handle partial failures + if (error.name === 'PartialFailureError') { + console.error('Partial failure - some rows failed to insert:', error.errors); + + // Log failed rows for debugging + error.errors.forEach((err, index) => { + console.error(`Row ${index} failed:`, err); + }); + + throw new Error(`Partial failure: ${error.errors.length} rows failed`); + } + + // Check if this is a retryable error + const isRetryable = + error.code === 429 || // Quota exceeded + error.code === 503 || // Service unavailable + (error.code >= 500 && error.code < 600); // Server errors + + if (!isRetryable || attempt === maxRetries) { + throw error; + } + + // Exponential backoff: 1s, 2s, 4s + const backoffMs = Math.pow(2, attempt - 1) * 1000; 
+ console.log(`Streaming insert failed (attempt ${attempt}/${maxRetries}): ${error.message}`); + console.log(`Retrying in ${backoffMs / 1000}s...`); + + await new Promise((resolve) => setTimeout(resolve, backoffMs)); + } + } + + // Should never reach here, but just in case + throw lastError; + } + + /** + * Insert batch using Load Job API (free tier compatible) + * Includes retry logic for transient network errors + * @param {Array} records - Records to insert + * @param {number} maxRetries - Maximum retry attempts (default: 5) + * @returns {Promise} - Result with success flag, recordCount, and method + */ + async _insertLoadJob(records, maxRetries = 5) { if (!records || records.length === 0) { throw new Error('No records to insert'); } @@ -146,6 +233,7 @@ class MaritimeBigQueryClient { return { success: true, recordCount: records.length, + method: 'load_job', }; } catch (loadError) { lastError = loadError; diff --git a/test/bigquery-streaming.test.js b/test/bigquery-streaming.test.js new file mode 100644 index 0000000..cfe9428 --- /dev/null +++ b/test/bigquery-streaming.test.js @@ -0,0 +1,314 @@ +/** + * Tests for BigQuery Streaming Insert API + * + * TDD approach: Write tests first, then implement functionality + */ + +import { describe, it, beforeEach } from 'node:test'; +import assert from 'node:assert'; +import MaritimeBigQueryClient from '../src/bigquery.js'; + +describe('BigQuery Streaming Inserts', () => { + describe('Configuration', () => { + it('should default to load job API when useStreamingAPIs is false', () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: false, + }); + + assert.strictEqual(client.useStreamingAPIs, false); + }); + + it('should use streaming API when useStreamingAPIs is true', () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: true, + }); + + assert.strictEqual(client.useStreamingAPIs, true); + }); + + it('should default to load job API when useStreamingAPIs not specified', () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + }); + + assert.strictEqual(client.useStreamingAPIs, false); + }); + }); + + describe('Insert method selection', () => { + it('should call _insertLoadJob when useStreamingAPIs is false', async () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: false, + }); + + let loadJobCalled = false; + client._insertLoadJob = async () => { + loadJobCalled = true; + return { success: true, recordCount: 1, method: 'load_job' }; + }; + + const records = [{ mmsi: '123456789', timestamp: new Date().toISOString() }]; + await client.insertBatch(records); + + assert.strictEqual(loadJobCalled, true); + }); + + it('should call _insertStreaming when useStreamingAPIs is true', async () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: true, + }); + + let streamingCalled = false; + client._insertStreaming = async () => { + streamingCalled = true; + return { success: true, recordCount: 1, method: 'streaming' }; + }; + + const records = [{ mmsi: '123456789', timestamp: new Date().toISOString() }]; + await client.insertBatch(records); + + 
assert.strictEqual(streamingCalled, true); + }); + }); + + describe('Streaming insert method', () => { + let client; + + beforeEach(() => { + client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: true, + }); + }); + + it('should successfully insert records using streaming API', async () => { + // Mock the BigQuery table.insert method + client.table = { + insert: async (records, _options) => { + assert.strictEqual(Array.isArray(records), true); + assert.strictEqual(records.length > 0, true); + return {}; // Success (no errors thrown) + }, + }; + + const records = [{ mmsi: '123456789', vessel_name: 'Test Vessel', timestamp: new Date().toISOString() }]; + + const result = await client._insertStreaming(records); + + assert.strictEqual(result.success, true); + assert.strictEqual(result.recordCount, 1); + assert.strictEqual(result.method, 'streaming'); + }); + + it('should handle empty record array', async () => { + await assert.rejects( + async () => { + await client._insertStreaming([]); + }, + { + message: 'No records to insert', + } + ); + }); + + it('should include record count in success response', async () => { + client.table = { + insert: async () => ({}), + }; + + const records = [ + { mmsi: '123', timestamp: new Date().toISOString() }, + { mmsi: '456', timestamp: new Date().toISOString() }, + { mmsi: '789', timestamp: new Date().toISOString() }, + ]; + + const result = await client._insertStreaming(records); + + assert.strictEqual(result.recordCount, 3); + }); + + it('should indicate method used as streaming', async () => { + client.table = { + insert: async () => ({}), + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + const result = await client._insertStreaming(records); + + assert.strictEqual(result.method, 'streaming'); + }); + }); + + describe('Error handling', () => { + let client; + + beforeEach(() => { + client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: true, + }); + }); + + it('should handle partial failures with detailed error', async () => { + client.table = { + insert: async () => { + const error = new Error('Partial failure'); + error.name = 'PartialFailureError'; + error.errors = [{ message: 'Row 0 failed: Invalid field' }]; + throw error; + }, + }; + + const records = [{ mmsi: 'invalid', timestamp: new Date().toISOString() }]; + + await assert.rejects( + async () => { + await client._insertStreaming(records); + }, + { + message: /Partial failure/, + } + ); + }); + + it('should retry on quota exceeded (429)', async () => { + let attempts = 0; + + client.table = { + insert: async () => { + attempts++; + if (attempts < 2) { + const error = new Error('Quota exceeded'); + error.code = 429; + throw error; + } + return {}; // Success on second attempt + }, + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + const result = await client._insertStreaming(records, 3); + + assert.strictEqual(attempts, 2); + assert.strictEqual(result.success, true); + }); + + it('should retry on service unavailable (503)', async () => { + let attempts = 0; + + client.table = { + insert: async () => { + attempts++; + if (attempts < 2) { + const error = new Error('Service unavailable'); + error.code = 503; + throw error; + } + return {}; + }, + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + const result = await 
client._insertStreaming(records, 3); + + assert.strictEqual(attempts, 2); + assert.strictEqual(result.success, true); + }); + + it('should not retry on schema errors', async () => { + let attempts = 0; + + client.table = { + insert: async () => { + attempts++; + const error = new Error('Schema mismatch'); + error.code = 400; + throw error; + }, + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + + await assert.rejects(async () => { + await client._insertStreaming(records, 3); + }); + + assert.strictEqual(attempts, 1); // Should not retry + }); + + it('should respect maxRetries limit', async () => { + let attempts = 0; + + client.table = { + insert: async () => { + attempts++; + const error = new Error('Service unavailable'); + error.code = 503; + throw error; + }, + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + + await assert.rejects(async () => { + await client._insertStreaming(records, 2); + }); + + assert.strictEqual(attempts, 2); // Should try exactly maxRetries times + }); + }); + + describe('Backward compatibility', () => { + it('should use load job API by default (existing behavior)', () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + }); + + assert.strictEqual(client.useStreamingAPIs, false); + }); + + it('should maintain existing insertBatch signature', async () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: false, + }); + + // Mock load job method + client._insertLoadJob = async (records, maxRetries) => { + assert.strictEqual(Array.isArray(records), true); + assert.strictEqual(typeof maxRetries, 'number'); + return { success: true, recordCount: records.length, method: 'load_job' }; + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + + // Should accept records and maxRetries parameters + const result = await client.insertBatch(records, 5); + + assert.strictEqual(result.success, true); + assert.strictEqual(result.method, 'load_job'); + }); + }); +});
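// These tests use the built-in node:test runner imported above; assuming the default
// setup they can be run directly with `node --test test/bigquery-streaming.test.js`.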