From 01b83570431d41d3f8cdbf42495211337c230dd0 Mon Sep 17 00:00:00 2001 From: irjudson Date: Fri, 14 Nov 2025 08:38:28 -0700 Subject: [PATCH 1/2] Add optional streaming insert API for BigQuery Implements #8 - adds opt-in streaming insert API alongside existing load job API. Features: - New useStreamingAPIs config option (defaults to false for backward compatibility) - Streaming API: lower latency (sub-second), has costs ($0.01 per 200 MB) - Load Job API: free tier compatible, higher latency (default) - Configurable per-table in multi-table setup - Comprehensive error handling with retry logic - 16 unit tests covering all scenarios (TDD approach) Implementation follows Test-Driven Development: 1. RED: Wrote 16 unit tests (all failed) 2. GREEN: Implemented _insertStreaming() and routing logic (all passed) 3. REFACTOR: Clean separation between streaming and load job methods All 107 tests passing (91 existing + 16 new). --- config.yaml | 21 + .../2025-11-14-streaming-insert-api-design.md | 374 ++++++++++++++++++ src/bigquery.js | 92 ++++- test/bigquery-streaming.test.js | 314 +++++++++++++++ 4 files changed, 799 insertions(+), 2 deletions(-) create mode 100644 docs/plans/2025-11-14-streaming-insert-api-design.md create mode 100644 test/bigquery-streaming.test.js diff --git a/config.yaml b/config.yaml index e3d602e..9e10ecb 100644 --- a/config.yaml +++ b/config.yaml @@ -55,6 +55,8 @@ bigquery: initialBatchSize: 5000 catchupBatchSize: 500 steadyBatchSize: 100 + # Optional: Enable streaming inserts for lower latency (has cost implications) + # useStreamingAPIs: true # Vessel metadata updates - id: vessel_metadata @@ -68,6 +70,25 @@ bigquery: catchupBatchSize: 100 steadyBatchSize: 10 +# BigQuery Insert Method Configuration +# Two options available (configurable per table): +# +# 1. Load Job API (default): +# - Free tier compatible (no per-row costs) +# - Higher latency (seconds to minutes) +# - Best for: development, batch workloads, cost-sensitive deployments +# - Set useStreamingAPIs: false (or omit) +# +# 2. Streaming Insert API: +# - Lower latency (sub-second to few seconds) +# - Costs: $0.01 per 200 MB (minimum $0.01/day) +# - Best for: production, real-time dashboards, low-latency requirements +# - Set useStreamingAPIs: true +# +# Example: Enable streaming for time-sensitive events +# - id: port_events +# useStreamingAPIs: true # Real-time events benefit from low latency + # Maritime vessel data synthesizer configuration # When bigquery.tables is present, synthesizer uses multi-table orchestrator # to generate data for ALL tables (vessel_positions, port_events, vessel_metadata). diff --git a/docs/plans/2025-11-14-streaming-insert-api-design.md b/docs/plans/2025-11-14-streaming-insert-api-design.md new file mode 100644 index 0000000..06b209a --- /dev/null +++ b/docs/plans/2025-11-14-streaming-insert-api-design.md @@ -0,0 +1,374 @@ +# Streaming Insert API Support for BigQuery + +**Date:** 2025-11-14 +**Status:** Planning +**Issue:** [#8](https://github.com/HarperFast/harper-bigquery-sync/issues/8) +**Approach:** Test-Driven Development + +## Overview + +Add opt-in streaming insert API for BigQuery alongside the current load job API. Streaming inserts provide lower latency for real-time use cases but have a different cost model. + +## Current Implementation + +**File:** `src/bigquery.js` +**Method:** `insertBatch(records, maxRetries)` +**API Used:** Load Job API (`table.load()`) + +**Current Behavior:** +1. Writes records to temporary NDJSON file +2. Uses `table.load()` to upload file +3. 
Waits for job completion +4. Cleans up temporary file +5. Retry logic with exponential backoff + +**Benefits of Load Job API:** +- ✅ Free tier compatible (no per-row costs) +- ✅ Efficient for large batches +- ✅ Good for batch workloads + +**Limitations:** +- ❌ Higher latency (seconds to minutes) +- ❌ Requires temporary file I/O +- ❌ Not suitable for real-time requirements + +## Proposed: Streaming Insert API + +**API:** `table.insert(rows)` method from @google-cloud/bigquery +**When to use:** Real-time use cases, lower latency requirements, production deployments + +**Benefits:** +- ✅ Lower latency (sub-second to few seconds) +- ✅ No temporary files needed +- ✅ Direct insert to table +- ✅ Simpler code path + +**Tradeoffs:** +- ❌ Costs money ($.01 per 200 MB, minimum $0.01/day) +- ❌ Row size limits (1 MB per row, 10 MB per request) +- ❌ Best-effort deduplication (not exact-once guarantees) + +## Design + +### Configuration + +Add `useStreamingAPIs` flag to BigQuery config: + +**config.yaml:** +```yaml +bigquery: + projectId: my-project + dataset: maritime_tracking + table: vessel_positions + timestampColumn: timestamp + credentials: service-account-key.json + location: US + + # Streaming insert API (off by default) + useStreamingAPIs: false # Set to true for lower latency +``` + +**Per-table configuration:** +```yaml +bigquery: + # ... shared config ... + tables: + - id: vessel_positions + dataset: maritime_tracking + table: vessel_positions + timestampColumn: timestamp + columns: ['*'] + useStreamingAPIs: false # High volume, batch is fine + + - id: port_events + dataset: maritime_tracking + table: port_events + timestampColumn: event_time + columns: ['*'] + useStreamingAPIs: true # Real-time events, use streaming +``` + +### Implementation Strategy + +**Option 1: Single method with conditional logic** +```javascript +async insertBatch(records, maxRetries = 5) { + if (this.useStreamingAPIs) { + return await this._insertStreaming(records); + } else { + return await this._insertLoadJob(records, maxRetries); + } +} +``` + +**Option 2: Strategy pattern (cleaner, testable)** +```javascript +class LoadJobStrategy { + async insert(table, records, schema) { /* current implementation */ } +} + +class StreamingInsertStrategy { + async insert(table, records) { /* streaming implementation */ } +} + +class MaritimeBigQueryClient { + constructor(config) { + this.insertStrategy = config.useStreamingAPIs + ? new StreamingInsertStrategy() + : new LoadJobStrategy(); + } + + async insertBatch(records, maxRetries) { + return await this.insertStrategy.insert(this.table, records, this.getSchema()); + } +} +``` + +**Recommendation:** Start with Option 1 (simpler), refactor to Option 2 if needed. 
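
Either option keeps the `insertBatch()` call site unchanged for existing callers. As a usage illustration, here is a minimal caller-side sketch (hypothetical code, not part of this patch — only the `{ success, recordCount, method }` result shape and the config-driven dispatch are defined by this design) showing how the `method` field can be used to confirm which path handled a batch, e.g. for logging or metrics:

```javascript
// Hypothetical caller (assumption: the client and records come from whatever
// sync loop already invokes insertBatch). Only the result shape
// { success, recordCount, method } is defined by this design.
async function syncBatch(client, records) {
  const result = await client.insertBatch(records);

  // method is 'streaming' or 'load_job', depending on useStreamingAPIs
  console.log(
    `Inserted ${result.recordCount} records via ${result.method} ` +
      `(success: ${result.success})`
  );

  return result;
}
```
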
+ +### Streaming Insert Implementation + +```javascript +async _insertStreaming(records) { + if (!records || records.length === 0) { + throw new Error('No records to insert'); + } + + try { + // BigQuery streaming insert API + const response = await this.table.insert(records, { + // Skip invalid rows (log and continue) + skipInvalidRows: false, + // Don't ignore unknown values (fail on schema mismatch) + ignoreUnknownValues: false, + // Template suffix for best-effort deduplication (optional) + // templateSuffix: '_streaming' + }); + + return { + success: true, + recordCount: records.length, + method: 'streaming' + }; + } catch (error) { + // Handle partial failures + if (error.name === 'PartialFailureError') { + console.error('Some rows failed to insert:', error.errors); + + // Log failed rows for debugging + error.errors.forEach((err, index) => { + console.error(`Row ${index} failed:`, err); + }); + + throw new Error(`Partial failure: ${error.errors.length} rows failed`); + } + + throw error; + } +} +``` + +### Error Handling + +**Load Job API errors:** +- Network timeouts → Retry +- Rate limits (429) → Retry with backoff +- Server errors (5xx) → Retry +- Schema errors → Fail immediately +- Permissions errors → Fail immediately + +**Streaming Insert API errors:** +- Partial failures → Log failed rows, throw error +- Quota exceeded → Retry with backoff +- Invalid schema → Fail immediately +- Row size too large → Fail immediately (log which row) + +### Retry Logic + +**Load Job API:** Already has retry logic with exponential backoff (up to 5 attempts) + +**Streaming Insert API:** Add retry for transient errors +```javascript +async _insertStreaming(records, maxRetries = 3) { + let lastError; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + return await this.table.insert(records, options); + } catch (error) { + lastError = error; + + // Check if retryable + const isRetryable = + error.code === 429 || // Quota exceeded + error.code === 503 || // Service unavailable + (error.code >= 500 && error.code < 600); // Server errors + + if (!isRetryable || attempt === maxRetries) { + throw error; + } + + // Exponential backoff: 1s, 2s, 4s + const backoffMs = Math.pow(2, attempt - 1) * 1000; + console.log(`Streaming insert failed (attempt ${attempt}/${maxRetries}): ${error.message}`); + console.log(`Retrying in ${backoffMs / 1000}s...`); + + await new Promise(resolve => setTimeout(resolve, backoffMs)); + } + } + + throw lastError; +} +``` + +## Testing Strategy (TDD) + +### Unit Tests + +**Test file:** `test/bigquery-streaming.test.js` + +```javascript +describe('BigQuery Streaming Inserts', () => { + describe('Configuration', () => { + it('should default to load job API when useStreamingAPIs is false'); + it('should use streaming API when useStreamingAPIs is true'); + it('should validate streaming config at initialization'); + }); + + describe('Streaming insert method', () => { + it('should successfully insert records using streaming API'); + it('should handle empty record array'); + it('should include record count in success response'); + it('should indicate method used (streaming vs load job)'); + }); + + describe('Error handling', () => { + it('should handle partial failures with detailed logging'); + it('should retry on quota exceeded (429)'); + it('should retry on service unavailable (503)'); + it('should not retry on schema errors'); + it('should not retry on invalid row data'); + it('should respect maxRetries limit'); + }); + + describe('Performance', () => { + 
it('should be faster than load job API for small batches'); + it('should handle row size limits gracefully'); + it('should handle request size limits (10 MB)'); + }); + + describe('Backward compatibility', () => { + it('should use load job API by default (existing behavior)'); + it('should maintain retry logic for load jobs'); + it('should clean up temp files with load jobs'); + }); +}); +``` + +### Integration Tests + +**Test file:** `test/integration/bigquery-streaming.integration.test.js` + +```javascript +describe('BigQuery Streaming Integration', () => { + // Only run if BIGQUERY_INTEGRATION_TESTS=true + before(function() { + if (!process.env.BIGQUERY_INTEGRATION_TESTS) { + this.skip(); + } + }); + + it('should insert records using streaming API against real BigQuery'); + it('should verify records are queryable immediately'); + it('should compare latency: streaming vs load job'); + it('should handle concurrent streaming inserts'); +}); +``` + +## Implementation Checklist + +### Phase 1: Tests (Write First) +- [ ] Create `test/bigquery-streaming.test.js` +- [ ] Write unit tests for configuration +- [ ] Write unit tests for streaming insert method +- [ ] Write unit tests for error handling +- [ ] Write unit tests for backward compatibility +- [ ] Run tests (should all fail - RED phase) + +### Phase 2: Implementation (Make Tests Pass) +- [ ] Add `useStreamingAPIs` config option +- [ ] Extract current load job logic to `_insertLoadJob()` method +- [ ] Implement `_insertStreaming()` method +- [ ] Update `insertBatch()` to dispatch based on config +- [ ] Add retry logic for streaming inserts +- [ ] Handle partial failure errors +- [ ] Run tests (should all pass - GREEN phase) + +### Phase 3: Refactoring (Clean Up) +- [ ] Extract common error handling logic +- [ ] Add JSDoc documentation +- [ ] Consider strategy pattern if code gets complex +- [ ] Optimize retry backoff timing +- [ ] Run tests (should still pass - REFACTOR phase) + +### Phase 4: Documentation +- [ ] Update config.yaml with examples +- [ ] Update README with streaming insert option +- [ ] Document cost implications +- [ ] Add decision guide: when to use streaming vs load jobs +- [ ] Update multi-table configuration examples + +### Phase 5: Integration Testing +- [ ] Create integration test with real BigQuery (gated by env var) +- [ ] Test with maritime synthesizer +- [ ] Verify latency improvements +- [ ] Test error scenarios + +## Cost Estimation + +**Streaming Insert Costs (as of 2024):** +- $0.01 per 200 MB (compressed) +- Minimum $0.01 per day if any streaming inserts used +- No charge for load job API + +**Example:** +- 144K records/day × 1 KB/record = 144 MB/day +- Cost: $0.01/day = $0.30/month +- Load job API: $0/month (free) + +**Decision Guide:** +- **Use Load Job API (default):** Development, testing, batch workloads, cost-sensitive +- **Use Streaming API:** Production, real-time dashboards, low-latency requirements + +## Migration Path + +**Existing deployments:** No changes required (defaults to load job API) + +**Enable streaming for a table:** +```yaml +tables: + - id: vessel_positions + useStreamingAPIs: true # Add this line +``` + +**Test in development:** +1. Enable streaming for one table +2. Monitor latency and costs +3. 
Roll out to other tables if beneficial + +## Success Metrics + +- ✅ All tests pass (unit + integration) +- ✅ Backward compatible (default behavior unchanged) +- ✅ Configurable per table +- ✅ Lower latency verified (< 5s vs > 30s for load jobs) +- ✅ Error handling comprehensive +- ✅ Retry logic functional +- ✅ Documentation complete + +## References + +- [BigQuery Streaming Insert API](https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery) +- [BigQuery Load Jobs](https://cloud.google.com/bigquery/docs/loading-data) +- [@google-cloud/bigquery Node.js SDK](https://googleapis.dev/nodejs/bigquery/latest/Table.html#insert) +- [Issue #8](https://github.com/HarperFast/harper-bigquery-sync/issues/8) diff --git a/src/bigquery.js b/src/bigquery.js index 76d0bf1..38beb57 100644 --- a/src/bigquery.js +++ b/src/bigquery.js @@ -16,6 +16,9 @@ class MaritimeBigQueryClient { this.retentionDays = config.retentionDays || parseInt(process.env.RETENTION_DAYS || '30', 10); this.location = config.location || process.env.BIGQUERY_LOCATION || 'US'; + // Streaming insert API option (off by default for backward compatibility) + this.useStreamingAPIs = config.useStreamingAPIs || false; + if (!this.projectId) { throw new Error('projectId must be set in config or GCP_PROJECT_ID environment variable'); } @@ -109,10 +112,94 @@ class MaritimeBigQueryClient { } /** - * Insert batch of records into BigQuery using Load Job (free tier compatible) - * Includes retry logic for transient network errors + * Insert batch of records into BigQuery + * Dispatches to streaming or load job API based on configuration + * @param {Array} records - Records to insert + * @param {number} maxRetries - Maximum retry attempts (default: 5) + * @returns {Promise} - Result with success flag, recordCount, and method */ async insertBatch(records, maxRetries = 5) { + if (this.useStreamingAPIs) { + return await this._insertStreaming(records, maxRetries); + } else { + return await this._insertLoadJob(records, maxRetries); + } + } + + /** + * Insert batch using Streaming Insert API + * Lower latency but has cost implications + * @param {Array} records - Records to insert + * @param {number} maxRetries - Maximum retry attempts (default: 3) + * @returns {Promise} - Result with success flag, recordCount, and method + */ + async _insertStreaming(records, maxRetries = 3) { + if (!records || records.length === 0) { + throw new Error('No records to insert'); + } + + let lastError; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + // BigQuery streaming insert API + await this.table.insert(records, { + skipInvalidRows: false, + ignoreUnknownValues: false, + }); + + // Success + return { + success: true, + recordCount: records.length, + method: 'streaming', + }; + } catch (error) { + lastError = error; + + // Handle partial failures + if (error.name === 'PartialFailureError') { + console.error('Partial failure - some rows failed to insert:', error.errors); + + // Log failed rows for debugging + error.errors.forEach((err, index) => { + console.error(`Row ${index} failed:`, err); + }); + + throw new Error(`Partial failure: ${error.errors.length} rows failed`); + } + + // Check if this is a retryable error + const isRetryable = + error.code === 429 || // Quota exceeded + error.code === 503 || // Service unavailable + (error.code >= 500 && error.code < 600); // Server errors + + if (!isRetryable || attempt === maxRetries) { + throw error; + } + + // Exponential backoff: 1s, 2s, 4s + const backoffMs = Math.pow(2, attempt - 1) * 1000; 
+ console.log(`Streaming insert failed (attempt ${attempt}/${maxRetries}): ${error.message}`); + console.log(`Retrying in ${backoffMs / 1000}s...`); + + await new Promise((resolve) => setTimeout(resolve, backoffMs)); + } + } + + // Should never reach here, but just in case + throw lastError; + } + + /** + * Insert batch using Load Job API (free tier compatible) + * Includes retry logic for transient network errors + * @param {Array} records - Records to insert + * @param {number} maxRetries - Maximum retry attempts (default: 5) + * @returns {Promise} - Result with success flag, recordCount, and method + */ + async _insertLoadJob(records, maxRetries = 5) { if (!records || records.length === 0) { throw new Error('No records to insert'); } @@ -146,6 +233,7 @@ class MaritimeBigQueryClient { return { success: true, recordCount: records.length, + method: 'load_job', }; } catch (loadError) { lastError = loadError; diff --git a/test/bigquery-streaming.test.js b/test/bigquery-streaming.test.js new file mode 100644 index 0000000..cfe9428 --- /dev/null +++ b/test/bigquery-streaming.test.js @@ -0,0 +1,314 @@ +/** + * Tests for BigQuery Streaming Insert API + * + * TDD approach: Write tests first, then implement functionality + */ + +import { describe, it, beforeEach } from 'node:test'; +import assert from 'node:assert'; +import MaritimeBigQueryClient from '../src/bigquery.js'; + +describe('BigQuery Streaming Inserts', () => { + describe('Configuration', () => { + it('should default to load job API when useStreamingAPIs is false', () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: false, + }); + + assert.strictEqual(client.useStreamingAPIs, false); + }); + + it('should use streaming API when useStreamingAPIs is true', () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: true, + }); + + assert.strictEqual(client.useStreamingAPIs, true); + }); + + it('should default to load job API when useStreamingAPIs not specified', () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + }); + + assert.strictEqual(client.useStreamingAPIs, false); + }); + }); + + describe('Insert method selection', () => { + it('should call _insertLoadJob when useStreamingAPIs is false', async () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: false, + }); + + let loadJobCalled = false; + client._insertLoadJob = async () => { + loadJobCalled = true; + return { success: true, recordCount: 1, method: 'load_job' }; + }; + + const records = [{ mmsi: '123456789', timestamp: new Date().toISOString() }]; + await client.insertBatch(records); + + assert.strictEqual(loadJobCalled, true); + }); + + it('should call _insertStreaming when useStreamingAPIs is true', async () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: true, + }); + + let streamingCalled = false; + client._insertStreaming = async () => { + streamingCalled = true; + return { success: true, recordCount: 1, method: 'streaming' }; + }; + + const records = [{ mmsi: '123456789', timestamp: new Date().toISOString() }]; + await client.insertBatch(records); + + 
assert.strictEqual(streamingCalled, true); + }); + }); + + describe('Streaming insert method', () => { + let client; + + beforeEach(() => { + client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: true, + }); + }); + + it('should successfully insert records using streaming API', async () => { + // Mock the BigQuery table.insert method + client.table = { + insert: async (records, _options) => { + assert.strictEqual(Array.isArray(records), true); + assert.strictEqual(records.length > 0, true); + return {}; // Success (no errors thrown) + }, + }; + + const records = [{ mmsi: '123456789', vessel_name: 'Test Vessel', timestamp: new Date().toISOString() }]; + + const result = await client._insertStreaming(records); + + assert.strictEqual(result.success, true); + assert.strictEqual(result.recordCount, 1); + assert.strictEqual(result.method, 'streaming'); + }); + + it('should handle empty record array', async () => { + await assert.rejects( + async () => { + await client._insertStreaming([]); + }, + { + message: 'No records to insert', + } + ); + }); + + it('should include record count in success response', async () => { + client.table = { + insert: async () => ({}), + }; + + const records = [ + { mmsi: '123', timestamp: new Date().toISOString() }, + { mmsi: '456', timestamp: new Date().toISOString() }, + { mmsi: '789', timestamp: new Date().toISOString() }, + ]; + + const result = await client._insertStreaming(records); + + assert.strictEqual(result.recordCount, 3); + }); + + it('should indicate method used as streaming', async () => { + client.table = { + insert: async () => ({}), + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + const result = await client._insertStreaming(records); + + assert.strictEqual(result.method, 'streaming'); + }); + }); + + describe('Error handling', () => { + let client; + + beforeEach(() => { + client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: true, + }); + }); + + it('should handle partial failures with detailed error', async () => { + client.table = { + insert: async () => { + const error = new Error('Partial failure'); + error.name = 'PartialFailureError'; + error.errors = [{ message: 'Row 0 failed: Invalid field' }]; + throw error; + }, + }; + + const records = [{ mmsi: 'invalid', timestamp: new Date().toISOString() }]; + + await assert.rejects( + async () => { + await client._insertStreaming(records); + }, + { + message: /Partial failure/, + } + ); + }); + + it('should retry on quota exceeded (429)', async () => { + let attempts = 0; + + client.table = { + insert: async () => { + attempts++; + if (attempts < 2) { + const error = new Error('Quota exceeded'); + error.code = 429; + throw error; + } + return {}; // Success on second attempt + }, + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + const result = await client._insertStreaming(records, 3); + + assert.strictEqual(attempts, 2); + assert.strictEqual(result.success, true); + }); + + it('should retry on service unavailable (503)', async () => { + let attempts = 0; + + client.table = { + insert: async () => { + attempts++; + if (attempts < 2) { + const error = new Error('Service unavailable'); + error.code = 503; + throw error; + } + return {}; + }, + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + const result = await 
client._insertStreaming(records, 3); + + assert.strictEqual(attempts, 2); + assert.strictEqual(result.success, true); + }); + + it('should not retry on schema errors', async () => { + let attempts = 0; + + client.table = { + insert: async () => { + attempts++; + const error = new Error('Schema mismatch'); + error.code = 400; + throw error; + }, + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + + await assert.rejects(async () => { + await client._insertStreaming(records, 3); + }); + + assert.strictEqual(attempts, 1); // Should not retry + }); + + it('should respect maxRetries limit', async () => { + let attempts = 0; + + client.table = { + insert: async () => { + attempts++; + const error = new Error('Service unavailable'); + error.code = 503; + throw error; + }, + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + + await assert.rejects(async () => { + await client._insertStreaming(records, 2); + }); + + assert.strictEqual(attempts, 2); // Should try exactly maxRetries times + }); + }); + + describe('Backward compatibility', () => { + it('should use load job API by default (existing behavior)', () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + }); + + assert.strictEqual(client.useStreamingAPIs, false); + }); + + it('should maintain existing insertBatch signature', async () => { + const client = new MaritimeBigQueryClient({ + projectId: 'test-project', + datasetId: 'test_dataset', + tableId: 'test_table', + useStreamingAPIs: false, + }); + + // Mock load job method + client._insertLoadJob = async (records, maxRetries) => { + assert.strictEqual(Array.isArray(records), true); + assert.strictEqual(typeof maxRetries, 'number'); + return { success: true, recordCount: records.length, method: 'load_job' }; + }; + + const records = [{ mmsi: '123', timestamp: new Date().toISOString() }]; + + // Should accept records and maxRetries parameters + const result = await client.insertBatch(records, 5); + + assert.strictEqual(result.success, true); + assert.strictEqual(result.method, 'load_job'); + }); + }); +}); From 72150e68f1f842bda7d0dd5e33583c3bd5a0c05d Mon Sep 17 00:00:00 2001 From: irjudson Date: Fri, 14 Nov 2025 08:43:32 -0700 Subject: [PATCH 2/2] Fix formatting in streaming insert API design doc --- .../2025-11-14-streaming-insert-api-design.md | 140 ++++++++++-------- 1 file changed, 82 insertions(+), 58 deletions(-) diff --git a/docs/plans/2025-11-14-streaming-insert-api-design.md b/docs/plans/2025-11-14-streaming-insert-api-design.md index 06b209a..967ae2b 100644 --- a/docs/plans/2025-11-14-streaming-insert-api-design.md +++ b/docs/plans/2025-11-14-streaming-insert-api-design.md @@ -16,6 +16,7 @@ Add opt-in streaming insert API for BigQuery alongside the current load job API. **API Used:** Load Job API (`table.load()`) **Current Behavior:** + 1. Writes records to temporary NDJSON file 2. Uses `table.load()` to upload file 3. Waits for job completion @@ -23,11 +24,13 @@ Add opt-in streaming insert API for BigQuery alongside the current load job API. 5. 
Retry logic with exponential backoff **Benefits of Load Job API:** + - ✅ Free tier compatible (no per-row costs) - ✅ Efficient for large batches - ✅ Good for batch workloads **Limitations:** + - ❌ Higher latency (seconds to minutes) - ❌ Requires temporary file I/O - ❌ Not suitable for real-time requirements @@ -38,12 +41,14 @@ Add opt-in streaming insert API for BigQuery alongside the current load job API. **When to use:** Real-time use cases, lower latency requirements, production deployments **Benefits:** + - ✅ Lower latency (sub-second to few seconds) - ✅ No temporary files needed - ✅ Direct insert to table - ✅ Simpler code path **Tradeoffs:** + - ❌ Costs money ($.01 per 200 MB, minimum $0.01/day) - ❌ Row size limits (1 MB per row, 10 MB per request) - ❌ Best-effort deduplication (not exact-once guarantees) @@ -55,6 +60,7 @@ Add opt-in streaming insert API for BigQuery alongside the current load job API. Add `useStreamingAPIs` flag to BigQuery config: **config.yaml:** + ```yaml bigquery: projectId: my-project @@ -65,10 +71,11 @@ bigquery: location: US # Streaming insert API (off by default) - useStreamingAPIs: false # Set to true for lower latency + useStreamingAPIs: false # Set to true for lower latency ``` **Per-table configuration:** + ```yaml bigquery: # ... shared config ... @@ -78,19 +85,20 @@ bigquery: table: vessel_positions timestampColumn: timestamp columns: ['*'] - useStreamingAPIs: false # High volume, batch is fine + useStreamingAPIs: false # High volume, batch is fine - id: port_events dataset: maritime_tracking table: port_events timestampColumn: event_time columns: ['*'] - useStreamingAPIs: true # Real-time events, use streaming + useStreamingAPIs: true # Real-time events, use streaming ``` ### Implementation Strategy **Option 1: Single method with conditional logic** + ```javascript async insertBatch(records, maxRetries = 5) { if (this.useStreamingAPIs) { @@ -102,25 +110,28 @@ async insertBatch(records, maxRetries = 5) { ``` **Option 2: Strategy pattern (cleaner, testable)** + ```javascript class LoadJobStrategy { - async insert(table, records, schema) { /* current implementation */ } + async insert(table, records, schema) { + /* current implementation */ + } } class StreamingInsertStrategy { - async insert(table, records) { /* streaming implementation */ } + async insert(table, records) { + /* streaming implementation */ + } } class MaritimeBigQueryClient { - constructor(config) { - this.insertStrategy = config.useStreamingAPIs - ? new StreamingInsertStrategy() - : new LoadJobStrategy(); - } + constructor(config) { + this.insertStrategy = config.useStreamingAPIs ? 
new StreamingInsertStrategy() : new LoadJobStrategy(); + } - async insertBatch(records, maxRetries) { - return await this.insertStrategy.insert(this.table, records, this.getSchema()); - } + async insertBatch(records, maxRetries) { + return await this.insertStrategy.insert(this.table, records, this.getSchema()); + } } ``` @@ -171,6 +182,7 @@ async _insertStreaming(records) { ### Error Handling **Load Job API errors:** + - Network timeouts → Retry - Rate limits (429) → Retry with backoff - Server errors (5xx) → Retry @@ -178,6 +190,7 @@ async _insertStreaming(records) { - Permissions errors → Fail immediately **Streaming Insert API errors:** + - Partial failures → Log failed rows, throw error - Quota exceeded → Retry with backoff - Invalid schema → Fail immediately @@ -188,6 +201,7 @@ async _insertStreaming(records) { **Load Job API:** Already has retry logic with exponential backoff (up to 5 attempts) **Streaming Insert API:** Add retry for transient errors + ```javascript async _insertStreaming(records, maxRetries = 3) { let lastError; @@ -229,39 +243,39 @@ async _insertStreaming(records, maxRetries = 3) { ```javascript describe('BigQuery Streaming Inserts', () => { - describe('Configuration', () => { - it('should default to load job API when useStreamingAPIs is false'); - it('should use streaming API when useStreamingAPIs is true'); - it('should validate streaming config at initialization'); - }); - - describe('Streaming insert method', () => { - it('should successfully insert records using streaming API'); - it('should handle empty record array'); - it('should include record count in success response'); - it('should indicate method used (streaming vs load job)'); - }); - - describe('Error handling', () => { - it('should handle partial failures with detailed logging'); - it('should retry on quota exceeded (429)'); - it('should retry on service unavailable (503)'); - it('should not retry on schema errors'); - it('should not retry on invalid row data'); - it('should respect maxRetries limit'); - }); - - describe('Performance', () => { - it('should be faster than load job API for small batches'); - it('should handle row size limits gracefully'); - it('should handle request size limits (10 MB)'); - }); - - describe('Backward compatibility', () => { - it('should use load job API by default (existing behavior)'); - it('should maintain retry logic for load jobs'); - it('should clean up temp files with load jobs'); - }); + describe('Configuration', () => { + it('should default to load job API when useStreamingAPIs is false'); + it('should use streaming API when useStreamingAPIs is true'); + it('should validate streaming config at initialization'); + }); + + describe('Streaming insert method', () => { + it('should successfully insert records using streaming API'); + it('should handle empty record array'); + it('should include record count in success response'); + it('should indicate method used (streaming vs load job)'); + }); + + describe('Error handling', () => { + it('should handle partial failures with detailed logging'); + it('should retry on quota exceeded (429)'); + it('should retry on service unavailable (503)'); + it('should not retry on schema errors'); + it('should not retry on invalid row data'); + it('should respect maxRetries limit'); + }); + + describe('Performance', () => { + it('should be faster than load job API for small batches'); + it('should handle row size limits gracefully'); + it('should handle request size limits (10 MB)'); + }); + + describe('Backward compatibility', () 
=> { + it('should use load job API by default (existing behavior)'); + it('should maintain retry logic for load jobs'); + it('should clean up temp files with load jobs'); + }); }); ``` @@ -271,23 +285,24 @@ describe('BigQuery Streaming Inserts', () => { ```javascript describe('BigQuery Streaming Integration', () => { - // Only run if BIGQUERY_INTEGRATION_TESTS=true - before(function() { - if (!process.env.BIGQUERY_INTEGRATION_TESTS) { - this.skip(); - } - }); - - it('should insert records using streaming API against real BigQuery'); - it('should verify records are queryable immediately'); - it('should compare latency: streaming vs load job'); - it('should handle concurrent streaming inserts'); + // Only run if BIGQUERY_INTEGRATION_TESTS=true + before(function () { + if (!process.env.BIGQUERY_INTEGRATION_TESTS) { + this.skip(); + } + }); + + it('should insert records using streaming API against real BigQuery'); + it('should verify records are queryable immediately'); + it('should compare latency: streaming vs load job'); + it('should handle concurrent streaming inserts'); }); ``` ## Implementation Checklist ### Phase 1: Tests (Write First) + - [ ] Create `test/bigquery-streaming.test.js` - [ ] Write unit tests for configuration - [ ] Write unit tests for streaming insert method @@ -296,6 +311,7 @@ describe('BigQuery Streaming Integration', () => { - [ ] Run tests (should all fail - RED phase) ### Phase 2: Implementation (Make Tests Pass) + - [ ] Add `useStreamingAPIs` config option - [ ] Extract current load job logic to `_insertLoadJob()` method - [ ] Implement `_insertStreaming()` method @@ -305,6 +321,7 @@ describe('BigQuery Streaming Integration', () => { - [ ] Run tests (should all pass - GREEN phase) ### Phase 3: Refactoring (Clean Up) + - [ ] Extract common error handling logic - [ ] Add JSDoc documentation - [ ] Consider strategy pattern if code gets complex @@ -312,6 +329,7 @@ describe('BigQuery Streaming Integration', () => { - [ ] Run tests (should still pass - REFACTOR phase) ### Phase 4: Documentation + - [ ] Update config.yaml with examples - [ ] Update README with streaming insert option - [ ] Document cost implications @@ -319,6 +337,7 @@ describe('BigQuery Streaming Integration', () => { - [ ] Update multi-table configuration examples ### Phase 5: Integration Testing + - [ ] Create integration test with real BigQuery (gated by env var) - [ ] Test with maritime synthesizer - [ ] Verify latency improvements @@ -327,16 +346,19 @@ describe('BigQuery Streaming Integration', () => { ## Cost Estimation **Streaming Insert Costs (as of 2024):** + - $0.01 per 200 MB (compressed) - Minimum $0.01 per day if any streaming inserts used - No charge for load job API **Example:** + - 144K records/day × 1 KB/record = 144 MB/day - Cost: $0.01/day = $0.30/month - Load job API: $0/month (free) **Decision Guide:** + - **Use Load Job API (default):** Development, testing, batch workloads, cost-sensitive - **Use Streaming API:** Production, real-time dashboards, low-latency requirements @@ -345,13 +367,15 @@ describe('BigQuery Streaming Integration', () => { **Existing deployments:** No changes required (defaults to load job API) **Enable streaming for a table:** + ```yaml tables: - id: vessel_positions - useStreamingAPIs: true # Add this line + useStreamingAPIs: true # Add this line ``` **Test in development:** + 1. Enable streaming for one table 2. Monitor latency and costs 3. Roll out to other tables if beneficial