diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1c0df47..f745134 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -211,7 +211,7 @@ The BigQuery plugin integrates with HarperDB. When modifying plugin code: - `src/validation.js` - Data validation and auditing - `src/query-builder.js` - SQL query construction - `src/config-loader.js` - Configuration parsing (single/multi-table) -- `schema/harper-bigquery-sync.graphql` - GraphQL schema +- `schema/bigquery-ingestor.graphql` - GraphQL schema - `test/` - Unit tests for all core functionality ## Synthesizer Development diff --git a/README.md b/README.md index efc5d36..1c70222 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ Install Harper and add this plugin: # Clone this plugin cd your-harper-project -npm install @harperdb/harper-bigquery-sync +npm install @harperdb/bigquery-ingestor ``` ### 2. Configure (Your Data) @@ -307,22 +307,115 @@ Perfect for testing sync performance, multi-table coordination, and distributed ## Monitoring & Operations -### Check Sync Status +### Distributed Sync Control -Query the REST API: +The plugin provides cluster-wide sync control via REST API. All commands replicate across nodes automatically. + +**Available Commands:** ```bash -# Get current status -curl http://localhost:9926/SyncControl +# Get current status (GET) +curl http://localhost:9926/SyncControl \ + -u admin:HarperRocks! -# Control sync +# Start sync across entire cluster (POST) curl -X POST http://localhost:9926/SyncControl \ + -u admin:HarperRocks! \ -H "Content-Type: application/json" \ - -d '{"action": "start"}' # or "stop" + -d '{"action": "start"}' + +# Stop sync across entire cluster (POST) +curl -X POST http://localhost:9926/SyncControl \ + -u admin:HarperRocks! \ + -H "Content-Type: application/json" \ + -d '{"action": "stop"}' + +# Run validation across cluster (POST) +curl -X POST http://localhost:9926/SyncControl \ + -u admin:HarperRocks! 
\ + -H "Content-Type: application/json" \ + -d '{"action": "validate"}' +``` + +**Status Response Format:** + +```json +{ + "global": { + "command": "start", + "commandedAt": "2025-12-16T20:30:00Z", + "commandedBy": "node1-0", + "version": 42 + }, + "worker": { + "nodeId": "node1-0", + "running": true, + "tables": [ + { "tableId": "vessel_positions", "running": true, "phase": "steady" }, + { "tableId": "port_events", "running": true, "phase": "catchup" } + ], + "failedEngines": [] + }, + "uptime": 3600, + "version": "2.0.0" +} +``` + +- **global**: Cluster-wide sync command state (replicated across all nodes via HarperDB) +- **worker**: This specific worker thread's status +- **nodeId**: Identifies worker as `hostname-workerIndex` +- **tables**: Status per sync engine (one per configured table) +- **failedEngines**: Any engines that failed to start + +### Data Validation + +Run validation to verify data integrity across the cluster: + +```bash +curl -X POST http://localhost:9926/SyncControl \ + -u admin:HarperRocks! \ + -H "Content-Type: application/json" \ + -d '{"action": "validate"}' +``` + +**Validation performs three checks per table:** + +1. **Progress Check** - Verifies sync is advancing, checks for stalled workers + - Status: `healthy`, `lagging`, `severely_lagging`, `stalled`, `no_checkpoint` + +2. **Smoke Test** - Confirms recent data (last 5 minutes) is queryable + - Status: `healthy`, `no_recent_data`, `query_failed`, `table_not_found` + +3. **Spot Check** - Validates data integrity bidirectionally + - Checks if Harper records exist in BigQuery (detects phantom records) + - Checks if BigQuery records exist in Harper (detects missing records) + - Status: `healthy`, `issues_found`, `no_data`, `check_failed` + +**View validation results:** + +```bash +# Get recent validation audits +curl http://localhost:9926/SyncAudit/ \ + -u admin:HarperRocks! 
``` +Each validation run creates audit records with: + +- `timestamp` - When validation ran +- `nodeId` - Which worker performed the validation +- `status` - Overall status: `healthy`, `issues_detected`, or `error` +- `checkResults` - JSON with detailed results per table and check + ### View Checkpoints +```bash +# REST API +curl http://localhost:9926/SyncCheckpoint/ \ + -u admin:HarperRocks! +``` + +Or query via SQL: + ```sql -- Check sync progress per node SELECT * FROM SyncCheckpoint ORDER BY nodeId; @@ -336,15 +429,45 @@ SELECT FROM SyncCheckpoint; ``` -### View Validation Results +### Access Synced Data -```sql --- Check recent audits -SELECT * FROM SyncAudit -WHERE timestamp > NOW() - INTERVAL '1 hour' -ORDER BY timestamp DESC; +All synced tables are accessible via REST API: + +```bash +# Query vessel positions (note trailing slash) +curl http://localhost:9926/VesselPositions/ \ + -u admin:HarperRocks! + +# Query port events +curl http://localhost:9926/PortEvents/ \ + -u admin:HarperRocks! + +# Query vessel metadata +curl http://localhost:9926/VesselMetadata/ \ + -u admin:HarperRocks! +``` + +**Important:** REST endpoints require a trailing slash (`/TableName/`) to return data arrays. Without the trailing slash, you get table metadata instead of records. + +### Postman Collection + +A comprehensive Postman collection is included for testing all endpoints: + +```bash +# Import into Postman +bigquery-ingestor_postman.json ``` +**Collection includes:** + +- **Cluster Control** - Start, stop, validate commands +- **Status Monitoring** - Check sync status and worker health +- **Data Verification** - Query PortEvents, VesselMetadata, VesselPositions +- **Checkpoint Inspection** - View sync progress per node +- **Audit Review** - Check validation results + +**Authentication:** Uses Basic Auth with default credentials (admin / HarperRocks!). Update the collection variables if using different credentials. 
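
The collection's "Verify" requests above boil down to one comparison: does the worker's reported state match the cluster-wide command? The same check can be sketched outside Postman in plain JavaScript. This is illustrative only — `workerConverged` is not part of the plugin; the response shape follows the status format documented earlier:

```javascript
// Decide whether a worker has converged to the cluster-wide command,
// given a status document shaped like the GET /SyncControl response.
// "start" should leave the worker running; "stop" should leave it stopped.
function workerConverged(status) {
  const wantRunning = status.global.command === "start";
  return status.worker.running === wantRunning;
}

// Example: a worker that has picked up a cluster-wide "start".
const status = {
  global: { command: "start", commandedBy: "node1-0", version: 42 },
  worker: { nodeId: "node1-0", running: true, tables: [], failedEngines: [] },
};
console.log(workerConverged(status)); // true
```

A monitoring script could poll `GET /SyncControl` on each node and alert when a worker stays unconverged past a grace period — effectively automating the manual verify-after-command workflow the collection walks through.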
+ ### Query Synced Data ```sql diff --git a/ROADMAP.md b/ROADMAP.md index 0f797d8..9753bb4 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -11,9 +11,9 @@ We've shipped v2.0 with comprehensive multi-table support, column selection, and - ✅ **Multi-table support** - Sync multiple BigQuery tables simultaneously with independent settings - ✅ **Column selection** - Reduce costs by fetching only needed columns from BigQuery - ✅ **Per-table configuration** - Independent batch sizes, sync intervals, and strategies per table -- ✅ **Exponential backoff retry logic** - Smart retry with jitter for transient BigQuery errors ([#3](https://github.com/HarperFast/harper-bigquery-sync/issues/3)) -- ✅ **Comprehensive logging** - Structured logging throughout codebase for Grafana observability ([#11](https://github.com/HarperFast/harper-bigquery-sync/issues/11)) -- ✅ **Optional streaming insert API** - Configurable streaming inserts for production deployments ([#8](https://github.com/HarperFast/harper-bigquery-sync/issues/8)) +- ✅ **Exponential backoff retry logic** - Smart retry with jitter for transient BigQuery errors ([#3](https://github.com/HarperFast/bigquery-ingestor/issues/3)) +- ✅ **Comprehensive logging** - Structured logging throughout codebase for Grafana observability ([#11](https://github.com/HarperFast/bigquery-ingestor/issues/11)) +- ✅ **Optional streaming insert API** - Configurable streaming inserts for production deployments ([#8](https://github.com/HarperFast/bigquery-ingestor/issues/8)) - ✅ **Multi-table validation** - Independent validation and monitoring per table - ✅ **Backward compatibility** - Single-table format still supported @@ -26,7 +26,7 @@ We've shipped v2.0 with comprehensive multi-table support, column selection, and ### Maritime Data Synthesizer -- ✅ **Multi-table orchestrator** - Generate realistic data for multiple related tables ([#6](https://github.com/HarperFast/harper-bigquery-sync/issues/6)) +- ✅ **Multi-table orchestrator** - Generate realistic 
data for multiple related tables ([#6](https://github.com/HarperFast/bigquery-ingestor/issues/6)) - ✅ **Rolling window mode** - Automatic data window maintenance and backfill - ✅ **100K+ vessel simulation** - Realistic maritime tracking data at global scale - ✅ **Physics-based movement** - Realistic navigation patterns @@ -34,7 +34,7 @@ We've shipped v2.0 with comprehensive multi-table support, column selection, and ### Project Quality -- ✅ **Memory leak fixes** - Journey tracking system optimized ([#5](https://github.com/HarperFast/harper-bigquery-sync/issues/5)) +- ✅ **Memory leak fixes** - Journey tracking system optimized ([#5](https://github.com/HarperFast/bigquery-ingestor/issues/5)) - ✅ **CI/CD pipeline** - Automated lint, test, and format checks - ✅ **Comprehensive documentation** - User guides, API docs, design documents - ✅ **Project history** - Development milestones and evolution tracking @@ -44,7 +44,7 @@ We've shipped v2.0 with comprehensive multi-table support, column selection, and ### Multi-Threaded Ingestion -- [ ] **Multi-threaded ingestion per node** ([#9](https://github.com/HarperFast/harper-bigquery-sync/issues/9)) +- [ ] **Multi-threaded ingestion per node** ([#9](https://github.com/HarperFast/bigquery-ingestor/issues/9)) - Better CPU utilization on multi-core nodes - Code already supports durable thread identity via `hostname-workerIndex` - Thread-level checkpointing for fine-grained recovery @@ -52,7 +52,7 @@ We've shipped v2.0 with comprehensive multi-table support, column selection, and ### Dynamic Rebalancing -- [ ] **Dynamic rebalancing for autoscaling** ([#10](https://github.com/HarperFast/harper-bigquery-sync/issues/10)) +- [ ] **Dynamic rebalancing for autoscaling** ([#10](https://github.com/HarperFast/bigquery-ingestor/issues/10)) - Detect topology changes → pause → recalculate → resume - Graceful node additions/removals without manual intervention - Zero-downtime scaling capabilities @@ -69,7 +69,7 @@ We've shipped v2.0 with 
comprehensive multi-table support, column selection, and ### Dynamic Schema Management -- [ ] **Dynamic Harper table creation via Operations API** ([#7](https://github.com/HarperFast/harper-bigquery-sync/issues/7)) +- [ ] **Dynamic Harper table creation via Operations API** ([#7](https://github.com/HarperFast/bigquery-ingestor/issues/7)) - Currently requires manual schema.graphql definition - Could dynamically create tables based on BigQuery schema at runtime - Enables automatic table creation from BigQuery metadata @@ -82,7 +82,7 @@ These are potential enhancements without specific commitments: ### Production Operations -- [ ] **Production deployment documentation** ([#4](https://github.com/HarperFast/harper-bigquery-sync/issues/4)) +- [ ] **Production deployment documentation** ([#4](https://github.com/HarperFast/bigquery-ingestor/issues/4)) - Fabric deployment guide with one-click setup - Self-hosted installation for on-premise clusters - Monitoring dashboards (Grafana/CloudWatch templates) @@ -143,7 +143,7 @@ These are potential enhancements without specific commitments: Want to help build v3.0 or tackle future considerations? See [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines. -Check out [open issues on GitHub](https://github.com/HarperFast/harper-bigquery-sync/issues) for specific tasks you can pick up. +Check out [open issues on GitHub](https://github.com/HarperFast/bigquery-ingestor/issues) for specific tasks you can pick up. --- diff --git a/bigquery-ingestor_postman.json b/bigquery-ingestor_postman.json new file mode 100644 index 0000000..d47237e --- /dev/null +++ b/bigquery-ingestor_postman.json @@ -0,0 +1,908 @@ +{ + "info": { + "name": "bigquery-ingestor", + "description": "Test suite for BigQuery Ingestor component. Tests cluster-wide start/stop/validate commands, status monitoring, and data ingestion verification.\\n\\n**Authentication:** This collection uses Basic Auth with default credentials (username: admin, password: HarperRocks!). 
You can change these in the collection-level auth settings.", + "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json" + }, + "variable": [ + { + "key": "baseUrl", + "value": "http://localhost:9926", + "type": "string" + } + ], + "item": [ + { + "name": "1. Single Node Tests", + "item": [ + { + "name": "1.1 Get Initial Status", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Response has global and worker fields\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData).to.have.property('global');", + " pm.expect(jsonData).to.have.property('worker');", + " pm.expect(jsonData).to.have.property('uptime');", + " pm.expect(jsonData).to.have.property('version');", + "});", + "", + "pm.test(\"Global state has version number\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.global).to.have.property('version');", + " pm.environment.set('currentVersion', jsonData.global.version);", + "});", + "", + "pm.test(\"Worker has nodeId\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.worker).to.have.property('nodeId');", + " pm.expect(jsonData.worker.nodeId).to.match(/.*-\\d+/);", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Get initial sync status. Should show stopped state with version 0." 
+ }, + "response": [] + }, + { + "name": "1.2 Start Sync (Cluster-Wide)", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Response confirms start command\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData).to.have.property('message');", + " pm.expect(jsonData.message).to.include('start');", + " pm.expect(jsonData).to.have.property('version');", + "});", + "", + "pm.test(\"Version number incremented\", function () {", + " var jsonData = pm.response.json();", + " var previousVersion = pm.environment.get('currentVersion') || 0;", + " pm.expect(jsonData.version).to.be.above(previousVersion);", + " pm.environment.set('currentVersion', jsonData.version);", + "});", + "", + "// Wait 2 seconds for engines to start", + "setTimeout(function(){}, 2000);" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"action\": \"start\"\n}" + }, + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Start all sync engines across all workers. Command is issued cluster-wide via SyncControlState table." 
+ }, + "response": [] + }, + { + "name": "1.3 Verify All Engines Running", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Global command is 'start'\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.global.command).to.equal('start');", + "});", + "", + "pm.test(\"Worker is running\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.worker.running).to.be.true;", + "});", + "", + "pm.test(\"All tables have engines\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.worker.tables).to.be.an('array');", + " pm.expect(jsonData.worker.tables.length).to.be.above(0);", + "});", + "", + "pm.test(\"No failed engines\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.worker.failedEngines).to.be.an('array');", + " pm.expect(jsonData.worker.failedEngines.length).to.equal(0);", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Verify all sync engines started successfully. Check global state matches worker state." 
+ }, + "response": [] + }, + { + "name": "1.4 Stop Sync (Cluster-Wide)", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Response confirms stop command\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.message).to.include('stop');", + "});", + "", + "pm.test(\"Version number incremented again\", function () {", + " var jsonData = pm.response.json();", + " var previousVersion = pm.environment.get('currentVersion') || 0;", + " pm.expect(jsonData.version).to.be.above(previousVersion);", + " pm.environment.set('currentVersion', jsonData.version);", + "});", + "", + "// Wait 2 seconds for engines to stop", + "setTimeout(function(){}, 2000);" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"action\": \"stop\"\n}" + }, + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Stop all sync engines across all workers. Command is issued cluster-wide." 
+ }, + "response": [] + }, + { + "name": "1.5 Verify All Engines Stopped", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Global command is 'stop'\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.global.command).to.equal('stop');", + "});", + "", + "pm.test(\"Worker is not running\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.worker.running).to.be.false;", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Verify all sync engines stopped successfully." + }, + "response": [] + }, + { + "name": "1.6 Trigger Validation", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Response confirms validate command\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.message).to.include('validate');", + "});", + "", + "pm.test(\"Version number incremented\", function () {", + " var jsonData = pm.response.json();", + " var previousVersion = pm.environment.get('currentVersion') || 0;", + " pm.expect(jsonData.version).to.be.above(previousVersion);", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"action\": \"validate\"\n}" + }, + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Trigger validation on all workers. Validation can run while engines are stopped." 
+ }, + "response": [] + }, + { + "name": "1.7 Test Invalid Action (Error Handling)", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 400 or 500 (error)\", function () {", + " pm.expect(pm.response.code).to.be.oneOf([400, 500]);", + "});", + "", + "pm.test(\"Error message mentions unknown action\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(JSON.stringify(jsonData).toLowerCase()).to.include('unknown');", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"action\": \"invalid_action\"\n}" + }, + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Test error handling with invalid action. Should reject with error." + }, + "response": [] + } + ], + "description": "Basic single-node testing. Run these tests with `harper dev .` (single worker)." + }, + { + "name": "2. Restart Recovery Tests", + "item": [ + { + "name": "2.1 Start Sync Before Restart", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Start command issued\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.message).to.include('start');", + " pm.environment.set('versionBeforeRestart', jsonData.version);", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"action\": \"start\"\n}" + }, + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Start sync, then manually restart Harper to test state recovery. 
After running this request, restart Harper and run the next request." + }, + "response": [] + }, + { + "name": "2.2 Verify State After Restart", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Engines automatically restarted\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.global.command).to.equal('start');", + " pm.expect(jsonData.worker.running).to.be.true;", + "});", + "", + "pm.test(\"Version preserved from before restart\", function () {", + " var jsonData = pm.response.json();", + " var versionBeforeRestart = pm.environment.get('versionBeforeRestart');", + " if (versionBeforeRestart) {", + " pm.expect(jsonData.global.version).to.equal(parseInt(versionBeforeRestart));", + " }", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "After restarting Harper, verify engines automatically resume based on persisted state in SyncControlState table." + }, + "response": [] + } + ], + "description": "Test that workers automatically recover state after restart by loading from SyncControlState table.\n\n**Manual Steps:**\n1. Run request 2.1 (Start Sync)\n2. Stop Harper (Ctrl+C)\n3. Restart Harper (`harper dev .`)\n4. Run request 2.2 (Verify State)\n\nWorkers should automatically start engines based on last known state." + }, + { + "name": "3. 
Multi-Worker Tests", + "item": [ + { + "name": "3.1 Get Initial Multi-Worker Status", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Worker has nodeId with worker index\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.worker.nodeId).to.match(/.*-\\d+/);", + " console.log('Worker nodeId:', jsonData.worker.nodeId);", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Get status from one worker. With multiple workers, each request may hit a different worker (round-robin)." + }, + "response": [] + }, + { + "name": "3.2 Start Sync (All Workers)", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Start command issued\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.message).to.include('start');", + " console.log('Commanded by:', jsonData.version);", + "});", + "", + "// Wait for all workers to process", + "setTimeout(function(){}, 3000);" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"action\": \"start\"\n}" + }, + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Issue start command. All workers across all threads should receive and process this command via subscription." 
+ }, + "response": [] + }, + { + "name": "3.3 Verify Worker 1", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Worker is running\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.worker.running).to.be.true;", + " console.log('Worker:', jsonData.worker.nodeId, 'running:', jsonData.worker.running);", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Check worker status. Run this multiple times to hit different workers." + }, + "response": [] + }, + { + "name": "3.4 Verify Worker 2", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Worker is running\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.worker.running).to.be.true;", + " console.log('Worker:', jsonData.worker.nodeId, 'running:', jsonData.worker.running);", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Check worker status again (may hit different worker due to round-robin)." + }, + "response": [] + }, + { + "name": "3.5 Verify Worker 3", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Worker is running\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.worker.running).to.be.true;", + " console.log('Worker:', jsonData.worker.nodeId, 'running:', jsonData.worker.running);", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Check worker status again (may hit different worker due to round-robin)." 
+ }, + "response": [] + }, + { + "name": "3.6 Stop Sync (All Workers)", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Stop command issued\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.message).to.include('stop');", + "});", + "", + "// Wait for all workers to process", + "setTimeout(function(){}, 3000);" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "POST", + "header": [ + { + "key": "Content-Type", + "value": "application/json" + } + ], + "body": { + "mode": "raw", + "raw": "{\n \"action\": \"stop\"\n}" + }, + "url": { + "raw": "{{baseUrl}}/SyncControl", + "host": ["{{baseUrl}}"], + "path": ["SyncControl"] + }, + "description": "Issue stop command to all workers." + }, + "response": [] + } + ], + "description": "Multi-worker testing. Start Harper with multiple workers: `harper dev . --threads 3`\n\nEach worker thread subscribes to SyncControlState independently. When you issue a command, all workers should receive and process it.\n\nNote: Harper's HTTP server uses round-robin across workers, so successive GET requests may hit different workers." + }, + { + "name": "4. 
Data Query Tests", + "item": [ + { + "name": "4.1 View Checkpoints", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Returns array of checkpoints\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData).to.be.an('array');", + " if (jsonData.length > 0) {", + " pm.expect(jsonData[0]).to.have.property('nodeId');", + " pm.expect(jsonData[0]).to.have.property('lastTimestamp');", + " }", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/SyncCheckpoint/", + "host": ["{{baseUrl}}"], + "path": ["SyncCheckpoint/"] + }, + "description": "Query checkpoint table to see sync progress per node." + }, + "response": [] + }, + { + "name": "4.2 View Validation Audits", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Returns array of audit records\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData).to.be.an('array');", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/SyncAudit/", + "host": ["{{baseUrl}}"], + "path": ["SyncAudit/"] + }, + "description": "Query audit table to see validation results." + }, + "response": [] + } + ], + "description": "Query Harper tables directly to inspect sync state, checkpoints, and audit logs." + }, + { + "name": "5. 
Data Ingestion Verification", + "item": [ + { + "name": "5.1 Verify PortEvents Data Ingested", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Returns array from PortEvents table\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData).to.be.an('array');", + "});", + "", + "pm.test(\"PortEvents table has records\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.length).to.be.above(0, 'PortEvents table should contain ingested records');", + " console.log('PortEvents record count:', jsonData.length);", + "});", + "", + "pm.test(\"PortEvents records have required fields\", function () {", + " var jsonData = pm.response.json();", + " if (jsonData.length > 0) {", + " var firstRecord = jsonData[0];", + " pm.expect(firstRecord).to.have.property('id');", + " pm.expect(firstRecord).to.have.property('event_time');", + " pm.expect(firstRecord).to.have.property('port_id');", + " pm.expect(firstRecord).to.have.property('port_name');", + " pm.expect(firstRecord).to.have.property('vessel_mmsi');", + " pm.expect(firstRecord).to.have.property('event_type');", + " pm.expect(firstRecord).to.have.property('latitude');", + " pm.expect(firstRecord).to.have.property('longitude');", + " pm.expect(firstRecord).to.have.property('_syncedAt');", + " console.log('Sample PortEvents record:', JSON.stringify(firstRecord, null, 2));", + " }", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/PortEvents/", + "host": ["{{baseUrl}}"], + "path": ["PortEvents/"] + }, + "description": "Verify that port events data has been ingested from BigQuery. Should return records with port arrival/departure events." 
+ }, + "response": [] + }, + { + "name": "5.2 Verify VesselMetadata Data Ingested", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "pm.test(\"Status code is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "pm.test(\"Returns array from VesselMetadata table\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData).to.be.an('array');", + "});", + "", + "pm.test(\"VesselMetadata table has records\", function () {", + " var jsonData = pm.response.json();", + " pm.expect(jsonData.length).to.be.above(0, 'VesselMetadata table should contain ingested records');", + " console.log('VesselMetadata record count:', jsonData.length);", + "});", + "", + "pm.test(\"VesselMetadata records have required fields\", function () {", + " var jsonData = pm.response.json();", + " if (jsonData.length > 0) {", + " var firstRecord = jsonData[0];", + " pm.expect(firstRecord).to.have.property('id');", + " pm.expect(firstRecord).to.have.property('last_updated');", + " pm.expect(firstRecord).to.have.property('mmsi');", + " pm.expect(firstRecord).to.have.property('vessel_name');", + " pm.expect(firstRecord).to.have.property('_syncedAt');", + " console.log('Sample VesselMetadata record:', JSON.stringify(firstRecord, null, 2));", + " }", + "});" + ], + "type": "text/javascript" + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/VesselMetadata/", + "host": ["{{baseUrl}}"], + "path": ["VesselMetadata/"] + }, + "description": "Verify that vessel metadata has been ingested from BigQuery. Should return records with vessel information." 
+          },
+          "response": []
+        },
+        {
+          "name": "5.3 Verify VesselPositions Data Ingested",
+          "event": [
+            {
+              "listen": "test",
+              "script": {
+                "exec": [
+                  "pm.test(\"Status code is 200\", function () {",
+                  "    pm.response.to.have.status(200);",
+                  "});",
+                  "",
+                  "pm.test(\"Returns array from VesselPositions table\", function () {",
+                  "    var jsonData = pm.response.json();",
+                  "    pm.expect(jsonData).to.be.an('array');",
+                  "});",
+                  "",
+                  "pm.test(\"VesselPositions table has records\", function () {",
+                  "    var jsonData = pm.response.json();",
+                  "    pm.expect(jsonData.length).to.be.above(0, 'VesselPositions table should contain ingested records');",
+                  "    console.log('VesselPositions record count:', jsonData.length);",
+                  "});",
+                  "",
+                  "pm.test(\"VesselPositions records have required fields\", function () {",
+                  "    "    var jsonData = pm.response.json();",
+                  "    if (jsonData.length > 0) {",
+                  "        var firstRecord = jsonData[0];",
+                  "        pm.expect(firstRecord).to.have.property('id');",
+                  "        pm.expect(firstRecord).to.have.property('timestamp');",
+                  "        pm.expect(firstRecord).to.have.property('mmsi');",
+                  "        pm.expect(firstRecord).to.have.property('vessel_name');",
+                  "        pm.expect(firstRecord).to.have.property('latitude');",
+                  "        pm.expect(firstRecord).to.have.property('longitude');",
+                  "        pm.expect(firstRecord).to.have.property('speed_knots');",
+                  "        pm.expect(firstRecord).to.have.property('heading');",
+                  "        pm.expect(firstRecord).to.have.property('course');",
+                  "        pm.expect(firstRecord).to.have.property('_syncedAt');",
+                  "        console.log('Sample VesselPositions record:', JSON.stringify(firstRecord, null, 2));",
+                  "    }",
+                  "});"
+                ],
+                "type": "text/javascript"
+              }
+            }
+          ],
+          "request": {
+            "method": "GET",
+            "header": [],
+            "url": {
+              "raw": "{{baseUrl}}/VesselPositions/",
+              "host": ["{{baseUrl}}"],
+              "path": ["VesselPositions/"]
+            },
+            "description": "Verify that vessel position data has been ingested from BigQuery. Should return records with vessel tracking positions."
+ }, + "response": [] + } + ], + "description": "Verify that data has been successfully ingested from BigQuery into Harper tables. These tests query the actual data tables (PortEvents, VesselMetadata, VesselPositions) via REST endpoints to confirm records exist and have the expected fields." + } + ], + "auth": { + "type": "basic", + "basic": [ + { + "key": "username", + "value": "admin", + "type": "string" + }, + { + "key": "password", + "value": "HarperRocks!", + "type": "string" + } + ] + } +} diff --git a/config.multi-table.yaml b/config.multi-table.yaml index 792eac2..58ff94b 100644 --- a/config.multi-table.yaml +++ b/config.multi-table.yaml @@ -16,7 +16,7 @@ pluginModule: ./src/index.js rest: true graphqlSchema: - files: 'schema/harper-bigquery-sync.graphql' + files: 'schema/bigquery-ingestor.graphql' jsResource: files: 'src/resources.js' diff --git a/config.yaml b/config.yaml index 9e10ecb..afd8bfd 100644 --- a/config.yaml +++ b/config.yaml @@ -9,7 +9,7 @@ pluginModule: ./src/index.js rest: true graphqlSchema: - files: 'schema/harper-bigquery-sync.graphql' + files: 'schema/bigquery-ingestor.graphql' jsResource: files: 'src/resources.js' diff --git a/docs/blog-post.md b/docs/blog-post.md index 6fcb94e..6feed38 100644 --- a/docs/blog-post.md +++ b/docs/blog-post.md @@ -527,5 +527,5 @@ This enables true autoscaling but isn't critical yet—quarterly capacity planni - [Design Document](design-document.md) — Full technical details - [Project History](HISTORY.md) — Development milestones - [System Overview](system-overview.md) — Architecture and how it all works together -- [GitHub Repository](https://github.com/HarperFast/harper-bigquery-sync) — Complete implementation +- [GitHub Repository](https://github.com/HarperFast/bigquery-ingestor) — Complete implementation - [HarperDB Docs](https://docs.harperdb.io) — Platform documentation diff --git a/docs/design-document.md b/docs/design-document.md index c31b9e1..69dc70c 100644 --- a/docs/design-document.md +++ 
b/docs/design-document.md @@ -723,7 +723,7 @@ for (const record of harperSample) { ### Schema Design -**Actual schema** (`schema/harper-bigquery-sync.graphql`): +**Actual schema** (`schema/bigquery-ingestor.graphql`): ```graphql # Main data table diff --git a/docs/system-overview.md b/docs/system-overview.md index 469e475..e77945e 100644 --- a/docs/system-overview.md +++ b/docs/system-overview.md @@ -253,7 +253,7 @@ synthesizer: ## File Structure ``` -harper-bigquery-sync/ +bigquery-ingestor/ ├── config.yaml # Unified configuration ├── service-account-key.json # Shared credentials │ @@ -275,7 +275,7 @@ harper-bigquery-sync/ │ └── cli.js # Synthesizer CLI │ ├── schema/ -│ └── harper-bigquery-sync.graphql # Plugin GraphQL schema +│ └── bigquery-ingestor.graphql # Plugin GraphQL schema │ └── docs/ ├── QUICKSTART.md # 5-minute setup guide diff --git a/package-lock.json b/package-lock.json index 212b08f..2d84cd5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,11 +1,11 @@ { - "name": "@harperdb/harper-bigquery-sync", + "name": "@harperdb/bigquery-ingestor", "version": "1.0.0", "lockfileVersion": 3, "requires": true, "packages": { "": { - "name": "@harperdb/harper-bigquery-sync", + "name": "@harperdb/bigquery-ingestor", "version": "1.0.0", "license": "Apache-2.0", "dependencies": { @@ -13,7 +13,7 @@ "yaml": "^2.8.1" }, "bin": { - "maritime-data-synthesizer": "bin/cli.js" + "maritime-data-synthesizer": "tools/maritime-data-synthesizer/cli.js" }, "devDependencies": { "@harperdb/code-guidelines": "^0.0.5", diff --git a/package.json b/package.json index e66499b..cf4675b 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { - "name": "@harperdb/harper-bigquery-sync", + "name": "@harperdb/bigquery-ingestor", "version": "1.0.0", - "description": "BigQuery sync plugin for Harper", + "description": "BigQuery data ingestion plugin for Harper", "license": "Apache-2.0", "author": { "name": "HarperDB, Inc.", diff --git a/schema/harper-bigquery-sync.graphql 
b/schema/bigquery-ingestor.graphql similarity index 73% rename from schema/harper-bigquery-sync.graphql rename to schema/bigquery-ingestor.graphql index 1bfc924..1d32088 100644 --- a/schema/harper-bigquery-sync.graphql +++ b/schema/bigquery-ingestor.graphql @@ -1,20 +1,17 @@ # File: bigquery-ingestor.graphql # GraphQL schema definitions for tables -# Example tables - to be replaced with dynamic tables later +# Example tables - dynamic schema allows BigQuery fields to be added automatically type VesselPositions @table { id: ID @primaryKey - # All BigQuery fields stored directly at top level } type PortEvents @table { id: ID @primaryKey - # All BigQuery fields stored directly at top level } type VesselMetadata @table { id: ID @primaryKey - # All BigQuery fields stored directly at top level } # Checkpoint table for ingestion @@ -43,6 +40,16 @@ type SyncAudit @table { recordSample: String } +# Cluster-wide sync control state +# Single record coordinates all workers across all nodes +type SyncControlState @table { + id: String! @primaryKey # Always "sync-control" (singleton) + command: String! # "start" | "stop" | "validate" + commandedAt: Date! @indexed # When command was issued + commandedBy: String # Node that issued (hostname-workerIndex) + version: Int! 
# Monotonically increasing for ordering +} + # Schema lock table for distributed schema checking # Ensures only one node checks schemas at a time type SchemaLock @table { diff --git a/src/bigquery-client.js b/src/bigquery-client.js index 98331ad..30f3183 100644 --- a/src/bigquery-client.js +++ b/src/bigquery-client.js @@ -197,12 +197,17 @@ export class BigQueryClient { batchSize, }); + // Log actual query parameters for debugging + logger.info( + `[BigQueryClient.pullPartition] Query params: nodeId=${params.nodeId}, clusterSize=${params.clusterSize}, lastTimestamp=${params.lastTimestamp}, batchSize=${params.batchSize}` + ); + const options = { query, params: params, }; - logger.trace(`[BigQueryClient.pullPartition] Generated SQL query: ${query}`); + logger.info(`[BigQueryClient.pullPartition] Generated SQL query: ${query}`); return await this.executeWithRetry(async () => { logger.debug('[BigQueryClient.pullPartition] Executing BigQuery query...'); @@ -210,9 +215,17 @@ export class BigQueryClient { const [rows] = await this.client.query(options); const duration = Date.now() - startTime; logger.info(`[BigQueryClient.pullPartition] Query complete - returned ${rows.length} rows in ${duration}ms`); - logger.debug( - `[BigQueryClient.pullPartition] First row timestamp: ${rows.length > 0 ? Date(rows[0][this.timestampColumn]) : 'N/A'}` - ); + + if (rows.length === 0) { + // Log diagnostic info when no results returned + logger.warn( + `[BigQueryClient.pullPartition] No results! Params were: nodeId=${params.nodeId}, clusterSize=${params.clusterSize}, lastTimestamp=${params.lastTimestamp}, batchSize=${params.batchSize}` + ); + } else { + logger.debug( + `[BigQueryClient.pullPartition] First row timestamp: ${rows.length > 0 ? 
rows[0][this.timestampColumn]?.value || rows[0][this.timestampColumn] : 'N/A'}` + ); + } return rows; }, 'pullPartition'); } diff --git a/src/config-loader.js b/src/config-loader.js index f98de78..b1b1ced 100644 --- a/src/config-loader.js +++ b/src/config-loader.js @@ -12,6 +12,13 @@ import { validateFullConfig as _validateFullConfig, validateAndNormalizeColumns const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); +// Safe logger wrapper for CLI compatibility +const log = { + debug: (msg) => typeof logger !== 'undefined' && logger.debug(msg), + info: (msg) => typeof logger !== 'undefined' && logger.info(msg), + error: (msg) => typeof logger !== 'undefined' && logger.error(msg), +}; + /** * Load configuration from config.yaml or accept a config object * @param {string|Object|null} configPath - Path to config file or config object or options object @@ -27,19 +34,19 @@ export function loadConfig(configPath = null) { if (configPath === null || configPath === undefined) { // Default to config.yaml in project root const path = join(__dirname, '..', 'config.yaml'); - logger.debug(`[ConfigLoader.loadConfig] Loading config from default path: ${path}`); + log.debug(`[ConfigLoader.loadConfig] Loading config from default path: ${path}`); const fileContent = readFileSync(path, 'utf8'); config = parse(fileContent); source = path; } else if (typeof configPath === 'string') { // Path to config file - logger.debug(`[ConfigLoader.loadConfig] Loading config from: ${configPath}`); + log.debug(`[ConfigLoader.loadConfig] Loading config from: ${configPath}`); const fileContent = readFileSync(configPath, 'utf8'); config = parse(fileContent); source = configPath; } else if (typeof configPath === 'object') { // Config object passed directly (for testing) - logger.debug('[ConfigLoader.loadConfig] Using config object passed directly'); + log.debug('[ConfigLoader.loadConfig] Using config object passed directly'); // Check if it's an options object with 
'config' property if (configPath.config) { config = configPath.config; @@ -52,16 +59,16 @@ export function loadConfig(configPath = null) { } if (!config) { - logger.error('[ConfigLoader.loadConfig] Failed to parse configuration'); + log.error('[ConfigLoader.loadConfig] Failed to parse configuration'); throw new Error('Failed to parse configuration'); } - logger.info(`[ConfigLoader.loadConfig] Successfully loaded config from: ${source}`); + log.info(`[ConfigLoader.loadConfig] Successfully loaded config from: ${source}`); // Normalize to multi-table format if needed return normalizeConfig(config); } catch (error) { - logger.error(`[ConfigLoader.loadConfig] Configuration loading failed: ${error.message}`); + log.error(`[ConfigLoader.loadConfig] Configuration loading failed: ${error.message}`); throw new Error(`Failed to load configuration: ${error.message}`); } } @@ -75,13 +82,13 @@ export function loadConfig(configPath = null) { */ function normalizeConfig(config) { if (!config.bigquery) { - logger.error('[ConfigLoader.normalizeConfig] bigquery section missing in configuration'); + log.error('[ConfigLoader.normalizeConfig] bigquery section missing in configuration'); throw new Error('bigquery section missing in configuration'); } // Check if already in multi-table format if (config.bigquery.tables && Array.isArray(config.bigquery.tables)) { - logger.info( + log.info( `[ConfigLoader.normalizeConfig] Config already in multi-table format with ${config.bigquery.tables.length} tables` ); // Validate multi-table configuration @@ -90,7 +97,7 @@ function normalizeConfig(config) { } // Legacy single-table format - wrap in tables array - logger.info('[ConfigLoader.normalizeConfig] Converting legacy single-table config to multi-table format'); + log.info('[ConfigLoader.normalizeConfig] Converting legacy single-table config to multi-table format'); const legacyBigQueryConfig = config.bigquery; // Extract table-specific config @@ -108,7 +115,7 @@ function normalizeConfig(config) 
{ }, }; - logger.debug( + log.debug( `[ConfigLoader.normalizeConfig] Created table config: ${tableConfig.dataset}.${tableConfig.table} -> ${tableConfig.targetTable}` ); @@ -128,7 +135,7 @@ function normalizeConfig(config) { }, }; - logger.info('[ConfigLoader.normalizeConfig] Successfully normalized config to multi-table format'); + log.info('[ConfigLoader.normalizeConfig] Successfully normalized config to multi-table format'); return normalizedConfig; } @@ -139,15 +146,15 @@ function normalizeConfig(config) { * @private */ function validateMultiTableConfig(config) { - logger.debug('[ConfigLoader.validateMultiTableConfig] Validating multi-table configuration'); + log.debug('[ConfigLoader.validateMultiTableConfig] Validating multi-table configuration'); if (!config.bigquery.tables || !Array.isArray(config.bigquery.tables)) { - logger.error('[ConfigLoader.validateMultiTableConfig] bigquery.tables must be an array'); + log.error('[ConfigLoader.validateMultiTableConfig] bigquery.tables must be an array'); throw new Error('bigquery.tables must be an array'); } if (config.bigquery.tables.length === 0) { - logger.error('[ConfigLoader.validateMultiTableConfig] bigquery.tables array cannot be empty'); + log.error('[ConfigLoader.validateMultiTableConfig] bigquery.tables array cannot be empty'); throw new Error('bigquery.tables array cannot be empty'); } @@ -157,36 +164,36 @@ function validateMultiTableConfig(config) { for (const table of config.bigquery.tables) { // Check required fields if (!table.id) { - logger.error('[ConfigLoader.validateMultiTableConfig] Missing required field: table.id'); + log.error('[ConfigLoader.validateMultiTableConfig] Missing required field: table.id'); throw new Error('Missing required field: table.id'); } if (!table.dataset) { - logger.error(`[ConfigLoader.validateMultiTableConfig] Missing 'dataset' for table: ${table.id}`); + log.error(`[ConfigLoader.validateMultiTableConfig] Missing 'dataset' for table: ${table.id}`); throw new Error(`Missing 
required field 'dataset' for table: ${table.id}`); } if (!table.table) { - logger.error(`[ConfigLoader.validateMultiTableConfig] Missing 'table' for table: ${table.id}`); + log.error(`[ConfigLoader.validateMultiTableConfig] Missing 'table' for table: ${table.id}`); throw new Error(`Missing required field 'table' for table: ${table.id}`); } if (!table.timestampColumn) { - logger.error(`[ConfigLoader.validateMultiTableConfig] Missing 'timestampColumn' for table: ${table.id}`); + log.error(`[ConfigLoader.validateMultiTableConfig] Missing 'timestampColumn' for table: ${table.id}`); throw new Error(`Missing required field 'timestampColumn' for table: ${table.id}`); } if (!table.targetTable) { - logger.error(`[ConfigLoader.validateMultiTableConfig] Missing 'targetTable' for table: ${table.id}`); + log.error(`[ConfigLoader.validateMultiTableConfig] Missing 'targetTable' for table: ${table.id}`); throw new Error(`Missing required field 'targetTable' for table: ${table.id}`); } // Check for duplicate IDs if (tableIds.has(table.id)) { - logger.error(`[ConfigLoader.validateMultiTableConfig] Duplicate table ID: ${table.id}`); + log.error(`[ConfigLoader.validateMultiTableConfig] Duplicate table ID: ${table.id}`); throw new Error(`Duplicate table ID: ${table.id}`); } tableIds.add(table.id); // Check for duplicate target Harper tables if (targetTables.has(table.targetTable)) { - logger.error( + log.error( `[ConfigLoader.validateMultiTableConfig] Duplicate targetTable '${table.targetTable}' for: ${table.id}` ); throw new Error( @@ -199,12 +206,12 @@ function validateMultiTableConfig(config) { } targetTables.add(table.targetTable); - logger.debug( + log.debug( `[ConfigLoader.validateMultiTableConfig] Validated table: ${table.id} (${table.dataset}.${table.table} -> ${table.targetTable})` ); } - logger.info( + log.info( `[ConfigLoader.validateMultiTableConfig] Successfully validated ${config.bigquery.tables.length} table configurations` ); } diff --git a/src/generator.js 
b/src/generator.js index 075be9e..4e39a98 100644 --- a/src/generator.js +++ b/src/generator.js @@ -3,6 +3,26 @@ * Generates realistic synthetic vessel tracking data with global distribution */ +/** + * Converts timestamp with fractional milliseconds to ISO string with microsecond precision + * This ensures even distribution across MOD partitioning in BigQuery + * @param {number} timestampMs - Timestamp in milliseconds (can have fractional part) + * @returns {string} ISO 8601 timestamp with microsecond precision + */ +function toISOStringWithMicros(timestampMs) { + // Add random microseconds (0-999) to ensure even distribution + const randomMicros = Math.floor(Math.random() * 1000); + const totalMicros = Math.floor(timestampMs * 1000) + randomMicros; + + const date = new Date(Math.floor(timestampMs)); + const isoBase = date.toISOString(); // e.g., "2023-12-18T18:36:07.890Z" + + // Replace milliseconds with full microsecond precision + // ISO format: YYYY-MM-DDTHH:MM:SS.sssZ -> YYYY-MM-DDTHH:MM:SS.ssssssZ + const microsStr = String(totalMicros % 1000000).padStart(6, '0'); + return isoBase.replace(/\.\d{3}Z$/, `.${microsStr}Z`); +} + // Major ports around the world with coordinates and traffic weight const MAJOR_PORTS = [ // Asia-Pacific (50% of global maritime traffic) @@ -510,7 +530,7 @@ class MaritimeVesselGenerator { status: position.status, destination: position.destination, eta: new Date(recordTime.getTime() + Math.random() * 7 * 24 * 3600000).toISOString(), // Random ETA within 7 days - timestamp: recordTime.toISOString(), + timestamp: toISOStringWithMicros(recordTime.getTime()), report_date: recordTime.toISOString().split('T')[0].replace(/-/g, ''), // YYYYMMDD }; diff --git a/src/globals.js b/src/globals.js index 4ae79d8..f1ce56e 100644 --- a/src/globals.js +++ b/src/globals.js @@ -7,15 +7,19 @@ class Globals { Globals.instance = this; } set(key, value) { - logger.debug(`[Globals.set] Setting '${key}' = ${JSON.stringify(value)}`); + if (typeof logger !== 
'undefined') { + logger.debug(`[Globals.set] Setting '${key}'`); + } this.data[key] = value; } get(key) { const value = this.data[key]; - if (value === undefined) { - logger.debug(`[Globals.get] Key '${key}' not found`); - } else { - logger.debug(`[Globals.get] Retrieved '${key}' = ${JSON.stringify(value)}`); + if (typeof logger !== 'undefined') { + if (value === undefined) { + logger.debug(`[Globals.get] Key '${key}' not found`); + } else { + logger.debug(`[Globals.get] Retrieved '${key}'`); + } } return value; } diff --git a/src/index.js b/src/index.js index 4f9fe23..79a56a1 100644 --- a/src/index.js +++ b/src/index.js @@ -6,6 +6,7 @@ import { getPluginConfig, getTableConfig } from './config-loader.js'; import { ValidationService } from './validation.js'; import { SchemaManager } from './schema-manager.js'; import { BigQueryClient } from './bigquery-client.js'; +import { SyncControlManager } from './sync-control-manager.js'; export async function handleApplication(scope) { const logger = scope.logger; @@ -108,6 +109,12 @@ export async function handleApplication(scope) { // Store all sync engines in globals globals.set('syncEngines', syncEngines); + // Initialize SyncControlManager (controls all engines cluster-wide) + const controlManager = new SyncControlManager(syncEngines); + await controlManager.initialize(); + globals.set('controlManager', controlManager); + logger.info(`[handleApplication] SyncControlManager initialized for ${syncEngines.length} engines`); + // For backward compatibility, also store the first engine as 'syncEngine' if (syncEngines.length > 0) { globals.set('syncEngine', syncEngines[0]); diff --git a/src/resources.js b/src/resources.js index 4901c6d..d65df62 100644 --- a/src/resources.js +++ b/src/resources.js @@ -7,7 +7,7 @@ // File: resources.js // Component entry point with resource definitions -/* global tables, Resource */ +/* global tables, Resource, server, logger */ import { globals } from './globals.js'; // Main data table resource 
@@ -104,37 +104,82 @@ export class SyncAudit extends tables.SyncAudit {
 export class SyncControl extends Resource {
 	async get() {
 		logger.debug('[SyncControl.get] Status request received');
-		const status = await globals.get('syncEngine').getStatus();
+
+		const STATE_ID = 'sync-control';
+		const globalState = await tables.SyncControlState.get(STATE_ID);
+		const controlManager = globals.get('controlManager');
+
+		// Handle case where controlManager not yet initialized (during startup)
+		if (!controlManager) {
+			logger.warn('[SyncControl.get] SyncControlManager not initialized yet');
+			return {
+				global: globalState || { command: 'unknown', version: 0 },
+				worker: {
+					nodeId: `${server.hostname}-${server.workerIndex}`,
+					running: false,
+					tables: [],
+					failedEngines: [],
+					status: 'initializing',
+				},
+				uptime: process.uptime(),
+				version: '2.0.0',
+			};
+		}
+
+		const workerStatus = controlManager.getStatus();
+
 		const response = {
-			status,
+			global: globalState || { command: 'unknown', version: 0 },
+			worker: {
+				nodeId: `${server.hostname}-${server.workerIndex}`,
+				running: workerStatus.currentState === 'start',
+				tables: workerStatus.engines,
+				failedEngines: workerStatus.failedEngines,
+			},
 			uptime: process.uptime(),
-			version: '1.0.0',
+			version: '2.0.0',
 		};
-		logger.info(`[SyncControl.get] Returning status - running: ${status.running}, phase: ${status.phase}`);
+
+		logger.info(`[SyncControl.get] Status: ${globalState?.command} (v${globalState?.version})`);
 		return response;
 	}

 	async post({ action }) {
 		logger.info(`[SyncControl.post] Control action received: ${action}`);
-		switch (action) {
-			case 'start':
-				logger.info('[SyncControl.post] Starting sync engine');
-				await globals.get('syncEngine').start();
-				logger.info('[SyncControl.post] Sync engine started successfully');
-				return { message: 'Sync started' };
-			case 'stop':
-				logger.info('[SyncControl.post] Stopping sync engine');
-				await globals.get('syncEngine').stop();
-				logger.info('[SyncControl.post] Sync engine stopped successfully');
-				return { message: 'Sync stopped' };
-			case 'validate':
-				logger.info('[SyncControl.post] Triggering validation');
-				await globals.get('validator').runValidation();
-				logger.info('[SyncControl.post] Validation completed');
-				return { message: 'Validation triggered' };
-			default:
-				logger.warn(`[SyncControl.post] Unknown action requested: ${action}`);
-				throw new Error(`Unknown action: ${action}`);
+
+		const STATE_ID = 'sync-control';
+
+		// Validate action
+		if (!['start', 'stop', 'validate'].includes(action)) {
+			throw new Error(`Unknown action: ${action}`);
 		}
+
+		// Check if controlManager is initialized (optional warning)
+		const controlManager = globals.get('controlManager');
+		if (!controlManager) {
+			logger.warn(
+				'[SyncControl.post] SyncControlManager not initialized on this worker yet. Command will be processed once initialization completes.'
+			);
+		}
+
+		// Get current version
+		const current = await tables.SyncControlState.get(STATE_ID);
+		const nextVersion = (current?.version || 0) + 1;
+
+		// Update state table (triggers all subscriptions cluster-wide)
+		await tables.SyncControlState.put({
+			id: STATE_ID,
+			command: action,
+			commandedAt: new Date().toISOString(),
+			commandedBy: `${server.hostname}-${server.workerIndex}`,
+			version: nextVersion,
+		});
+
+		logger.info(`[SyncControl.post] Command '${action}' issued (v${nextVersion})`);
+
+		return {
+			message: `${action} command issued to cluster`,
+			version: nextVersion,
+		};
 	}
 }
diff --git a/src/sync-control-manager.js b/src/sync-control-manager.js
new file mode 100644
index 0000000..97ef6b9
--- /dev/null
+++ b/src/sync-control-manager.js
@@ -0,0 +1,181 @@
+// ============================================================================
+// File: sync-control-manager.js
+// Manages cluster-wide sync control via replicated state table
+// ============================================================================
+
+/* global logger, tables */
+import { globals } from './globals.js';
+
+export class SyncControlManager {
+	constructor(syncEngines) {
+		logger.info('[SyncControlManager] Constructor called');
+		this.syncEngines = syncEngines;
+		this.lastProcessedVersion = 0;
+		this.currentState = 'stopped';
+		this.isProcessing = false;
+		this.subscription = null;
+		this.failedEngines = [];
+		logger.info(`[SyncControlManager] Initialized with ${syncEngines.length} engines`);
+	}
+
+	async initialize() {
+		logger.info('[SyncControlManager.initialize] Starting initialization');
+		const STATE_ID = 'sync-control';
+
+		// Load current state from table
+		const currentState = await tables.SyncControlState.get(STATE_ID);
+
+		if (currentState) {
+			logger.info(
+				`[SyncControlManager.initialize] Found existing state: ${currentState.command} (v${currentState.version})`
+			);
+			this.lastProcessedVersion = currentState.version;
+			// Apply current state to engines
+			await this.processCommand(currentState);
+		} else {
+			// Initialize state on first run
+			logger.info('[SyncControlManager.initialize] No state found, initializing to stopped');
+			await tables.SyncControlState.put({
+				id: STATE_ID,
+				command: 'stop',
+				commandedAt: new Date(),
+				commandedBy: 'system-init',
+				version: 0,
+			});
+		}
+
+		// Subscribe to future changes
+		logger.info('[SyncControlManager.initialize] Setting up subscription');
+		this.subscription = await tables.SyncControlState.subscribe({ id: STATE_ID });
+		this.startSubscriptionLoop();
+
+		logger.info('[SyncControlManager.initialize] Initialization complete');
+	}
+
+	async processCommand(state) {
+		if (this.isProcessing) {
+			logger.debug('[SyncControlManager.processCommand] Already processing, skipping');
+			return;
+		}
+
+		this.isProcessing = true;
+
+		try {
+			logger.info(`[SyncControlManager.processCommand] Processing command: ${state.command}`);
+
+			switch (state.command) {
+				case 'start':
+					await this.startAllEngines();
+					break;
+				case 'stop':
+					await this.stopAllEngines();
+					break;
+				case 'validate':
+					await this.runValidation();
+					break;
+				default:
+					logger.warn(`[SyncControlManager.processCommand] Unknown command: ${state.command}`);
+			}
+
+			this.currentState = state.command;
+			logger.info(`[SyncControlManager.processCommand] Command completed: ${state.command}`);
+		} catch (error) {
+			logger.error('[SyncControlManager.processCommand] Error processing command:', error);
+		} finally {
+			this.isProcessing = false;
+		}
+	}
+
+	async startAllEngines() {
+		logger.info(`[SyncControlManager.startAllEngines] Starting ${this.syncEngines.length} engines`);
+
+		const results = await Promise.allSettled(this.syncEngines.map((engine) => engine.start()));
+
+		// Track failures for status reporting
+		this.failedEngines = results
+			.map((result, i) => ({ engine: this.syncEngines[i], result }))
+			.filter(({ result }) => result.status === 'rejected')
+			.map(({ engine, result }) => ({
+				tableId: engine.tableId,
+				error: result.reason?.message || String(result.reason),
+			}));
+
+		if (this.failedEngines.length > 0) {
+			logger.error(
+				`[SyncControlManager.startAllEngines] ${this.failedEngines.length} engines failed to start:`,
+				this.failedEngines
+			);
+		}
+
+		const successCount = this.syncEngines.length - this.failedEngines.length;
+		logger.info(`[SyncControlManager.startAllEngines] Started ${successCount}/${this.syncEngines.length} engines`);
+	}
+
+	async stopAllEngines() {
+		logger.info(`[SyncControlManager.stopAllEngines] Stopping ${this.syncEngines.length} engines`);
+
+		await Promise.allSettled(this.syncEngines.map((engine) => engine.stop()));
+
+		this.failedEngines = []; // Clear failures on stop
+		logger.info('[SyncControlManager.stopAllEngines] All engines stopped');
+	}
+
+	async runValidation() {
+		logger.info('[SyncControlManager.runValidation] Running validation');
+		const validator = globals.get('validator');
+
+		if (validator) {
+			await validator.runValidation();
+			logger.info('[SyncControlManager.runValidation] Validation completed');
+		} else {
+			logger.warn('[SyncControlManager.runValidation] Validator not available');
+		}
+	}
+
+	async startSubscriptionLoop() {
+		logger.info('[SyncControlManager.startSubscriptionLoop] Starting subscription loop');
+
+		const STATE_ID = 'sync-control';
+
+		try {
+			for await (const _update of this.subscription) {
+				// Subscription event received - fetch the actual record
+				const state = await tables.SyncControlState.get(STATE_ID);
+
+				if (!state) {
+					logger.warn('[SyncControlManager.startSubscriptionLoop] State record not found after update');
+					continue;
+				}
+
+				logger.info(`[SyncControlManager.startSubscriptionLoop] Received update: ${state.command} (v${state.version})`);
+
+				// Only process if version is newer
+				if (state.version > this.lastProcessedVersion) {
+					await this.processCommand(state);
+					this.lastProcessedVersion = state.version;
+				} else {
+					logger.debug(
+						`[SyncControlManager.startSubscriptionLoop] Skipping old version ${state.version} (last: ${this.lastProcessedVersion})`
+					);
+				}
+			}
+		} catch (error) {
+			logger.error('[SyncControlManager.startSubscriptionLoop] Subscription failed, restarting in 5s:', error);
+			setTimeout(() => this.initialize(), 5000);
+		}
+	}
+
+	getStatus() {
+		return {
+			currentState: this.currentState,
+			lastProcessedVersion: this.lastProcessedVersion,
+			engines: this.syncEngines.map((engine) => ({
+				tableId: engine.tableId,
+				running: engine.running,
+				phase: engine.currentPhase,
+			})),
+			failedEngines: this.failedEngines,
+			isProcessing: this.isProcessing,
+		};
+	}
+}
diff --git a/src/sync-engine.js b/src/sync-engine.js
index ad0dfc6..695c5e3 100644
--- a/src/sync-engine.js
+++ b/src/sync-engine.js
@@ -15,7 +15,7 @@ export class SyncEngine {
 		logger.info('Hostname: ' + server.hostname);
 		logger.info('Worker Id: ' + server.workerIndex);
-		logger.info('Nodes: ' + server.nodes);
+		logger.info('Worker Count: ' + (server.workerCount || 1));

 		this.initialized = false;
 		this.config = config;
@@ -101,15 +101,54 @@
 		logger.debug('[SyncEngine.discoverCluster] Querying Harper cluster API');
 		const currentNodeId = [server.hostname, server.workerIndex].join('-');
 		logger.info(`[SyncEngine.discoverCluster] Current node ID: ${currentNodeId}`);
+		logger.info(`[SyncEngine.discoverCluster] server.hostname: ${server.hostname}`);
+		logger.info(`[SyncEngine.discoverCluster] server.workerIndex: ${server.workerIndex}`);
+		logger.info(`[SyncEngine.discoverCluster] server.workerCount: ${server.workerCount}`);
+		logger.info(
+			`[SyncEngine.discoverCluster] server.nodes: ${server.nodes ? JSON.stringify(server.nodes) : 'undefined'}`
+		);

-		// Get cluster nodes from server.nodes if available
-		let nodes;
+		let nodes = [];
+		const workerCount = server.workerCount || 1;
+
+		// Check if running in a multi-node cluster
 		if (server.nodes && Array.isArray(server.nodes) && server.nodes.length > 0) {
-			nodes = server.nodes.map((node) => `${node.hostname}-${node.workerIndex || 0}`);
-			logger.info(`[SyncEngine.discoverCluster] Found ${nodes.length} nodes from server.nodes`);
+			// Multi-node cluster: enumerate all nodes and their workers
+			logger.info(
+				`[SyncEngine.discoverCluster] Multi-node cluster: ${server.nodes.length} other nodes, ${workerCount} workers per node`
+			);
+
+			// IMPORTANT: server.nodes only contains OTHER nodes, not the current node
+			// We must add the current node to the list
+
+			// Add current node first
+			for (let i = 0; i < workerCount; i++) {
+				nodes.push(`${server.hostname}-${i}`);
+			}
+
+			// Add other cluster nodes from server.nodes
+			for (const node of server.nodes) {
+				const hostname = node.name || node.hostname || node.host || node.id;
+
+				if (!hostname) {
+					logger.warn(`[SyncEngine.discoverCluster] Node missing name property: ${JSON.stringify(node)}`);
+					continue;
+				}
+
+				for (let i = 0; i < workerCount; i++) {
+					nodes.push(`${hostname}-${i}`);
+				}
+			}
+			logger.info(
+				`[SyncEngine.discoverCluster] Total cluster size: ${nodes.length} workers across ${server.nodes.length + 1} nodes`
+			);
 		} else {
-			logger.info('[SyncEngine.discoverCluster] No cluster nodes found, running in single-node mode');
-			nodes = [currentNodeId];
+			// Single node: generate workers for current node only
+			logger.info(`[SyncEngine.discoverCluster] Single node with ${workerCount} workers`);
+
+			for (let i = 0; i < workerCount; i++) {
+				nodes.push(`${server.hostname}-${i}`);
+			}
 		}

 		// Sort deterministically (lexicographic)
@@ -229,8 +268,8 @@
 		logger.debug(`[SyncEngine.runSyncCycle] Batch size: ${batchSize}`);

 		// Pull records for this node's partition
-		logger.debug(
-			`[SyncEngine.runSyncCycle] Pulling partition data from BigQuery - nodeId: ${this.nodeId}, clusterSize: ${this.clusterSize}, lastTimestamp: ${this.lastCheckpoint.lastTimestamp}`
+		logger.info(
+			`[SyncEngine.runSyncCycle] Pulling partition data from BigQuery - nodeId: ${this.nodeId}, clusterSize: ${this.clusterSize}, lastTimestamp: ${this.lastCheckpoint.lastTimestamp}, batchSize: ${batchSize}`
 		);
 		const records = await this.client.pullPartition({
 			nodeId: this.nodeId,
@@ -299,8 +338,27 @@
 		for (const record of records) {
 			try {
+				// Log first raw record from BigQuery to see structure
+				if (validRecords.length === 0) {
+					logger.info(`[SyncEngine.ingestRecords] First raw BigQuery record keys: ${Object.keys(record).join(', ')}`);
+					logger.info(
+						`[SyncEngine.ingestRecords] First raw BigQuery record sample: ${JSON.stringify(record).substring(0, 200)}`
+					);
+				}
+
 				// Convert BigQuery types to JavaScript primitives using type-converter utility
 				const convertedRecord = convertBigQueryTypes(record);
+
+				// Log first converted record
+				if (validRecords.length === 0) {
+					logger.info(
+						`[SyncEngine.ingestRecords] First converted record keys: ${Object.keys(convertedRecord).join(', ')}`
+					);
+					logger.info(
+						`[SyncEngine.ingestRecords] First converted record sample: ${JSON.stringify(convertedRecord).substring(0, 200)}`
+					);
+				}
+
 				logger.trace(`[SyncEngine.ingestRecords] 
Converted record: ${JSON.stringify(convertedRecord)}`); // Validate timestamp exists @@ -312,22 +370,32 @@ export class SyncEngine { continue; } - // Generate deterministic integer ID for fast indexing + // Generate deterministic ID for indexing // Use hash of timestamp + deterministic fields for uniqueness const timestamp = convertedRecord[timestampColumn]; const hashInput = `${timestamp}-${JSON.stringify(convertedRecord)}`; const hash = createHash('sha256').update(hashInput).digest(); // Convert first 8 bytes of hash to positive integer within safe range const bigIntId = hash.readBigInt64BE(0); - const id = Number(bigIntId < 0n ? -bigIntId : bigIntId) % Number.MAX_SAFE_INTEGER; + const numericId = Number(bigIntId < 0n ? -bigIntId : bigIntId) % Number.MAX_SAFE_INTEGER; + // Convert to string as required by Harper + const id = String(numericId); - // Store BigQuery record with integer ID for fast indexing + // Store BigQuery record with string ID const mappedRecord = { - id, // Integer primary key for fast indexing + id, // String primary key (required by Harper) ...convertedRecord, // All BigQuery fields at top level _syncedAt: new Date(), // Add sync timestamp }; + // Log first mapped record to see what's being written to Harper + if (validRecords.length === 0) { + logger.info(`[SyncEngine.ingestRecords] First mapped record keys: ${Object.keys(mappedRecord).join(', ')}`); + logger.info( + `[SyncEngine.ingestRecords] First mapped record sample: ${JSON.stringify(mappedRecord).substring(0, 200)}` + ); + } + validRecords.push(mappedRecord); } catch (error) { logger.error(`[SyncEngine.ingestRecords] Error processing record: ${error.message}`, error); @@ -354,10 +422,11 @@ export class SyncEngine { } for (const rec of validRecords) { - _lastResult = targetTableObj.create(rec); + // Use put (upsert) instead of create to handle duplicate IDs gracefully + _lastResult = targetTableObj.put(rec); } } catch (error) { - logger.error(`[SyncEngine.ingestRecords] Harper create 
failed: ${error.message}`, error); + logger.error(`[SyncEngine.ingestRecords] Harper put failed: ${error.message}`, error); if (error.errors) { error.errors.forEach((e) => logger.error(` ${e.reason} at ${e.location}: ${e.message}`)); } diff --git a/src/validation.js b/src/validation.js index 6fd4321..0a5c244 100644 --- a/src/validation.js +++ b/src/validation.js @@ -3,7 +3,7 @@ // Validation service for data integrity checks // NOTE: Avoids count-based validation since Harper counts are estimates -/* global harperCluster, tables */ +/* global server, tables */ import { BigQueryClient } from './bigquery-client.js'; import { createHash } from 'node:crypto'; @@ -408,19 +408,61 @@ export class ValidationService { async discoverCluster() { logger.trace('[ValidationService.discoverCluster] Discovering cluster topology for validation'); - const nodes = await harperCluster.getNodes(); - logger.trace(`[ValidationService.discoverCluster] Found ${nodes.length} nodes`); - const sortedNodes = nodes.sort((a, b) => a.id.localeCompare(b.id)); - const currentNodeId = harperCluster.currentNode.id; - const nodeIndex = sortedNodes.findIndex((n) => n.id === currentNodeId); + const currentNodeId = [server.hostname, server.workerIndex].join('-'); + logger.trace(`[ValidationService.discoverCluster] Current node ID: ${currentNodeId}`); + + let nodes = []; + const workerCount = server.workerCount || 1; + + // Check if running in a multi-node cluster + if (server.nodes && Array.isArray(server.nodes) && server.nodes.length > 0) { + // Multi-node cluster: enumerate all nodes and their workers + logger.trace( + `[ValidationService.discoverCluster] Multi-node cluster: ${server.nodes.length} other nodes, ${workerCount} workers per node` + ); + + // Add current node first + for (let i = 0; i < workerCount; i++) { + nodes.push(`${server.hostname}-${i}`); + } + + // Add other cluster nodes + for (const node of server.nodes) { + const hostname = node.name || node.hostname || node.host || node.id; + 
if (!hostname) { + logger.warn(`[ValidationService.discoverCluster] Node missing name property: ${JSON.stringify(node)}`); + continue; + } + for (let i = 0; i < workerCount; i++) { + nodes.push(`${hostname}-${i}`); + } + } + } else { + // Single node: generate workers for current node only + logger.trace(`[ValidationService.discoverCluster] Single node with ${workerCount} workers`); + for (let i = 0; i < workerCount; i++) { + nodes.push(`${server.hostname}-${i}`); + } + } + + // Sort deterministically + nodes.sort(); + const nodeIndex = nodes.findIndex((n) => n === currentNodeId); + + if (nodeIndex === -1) { + logger.error( + `[ValidationService.discoverCluster] Current node '${currentNodeId}' not found in cluster nodes: ${nodes.join(', ')}` + ); + throw new Error(`Current node ${currentNodeId} not found in cluster`); + } logger.trace( - `[ValidationService.discoverCluster] Current node: ${currentNodeId}, index: ${nodeIndex}, clusterSize: ${sortedNodes.length}` + `[ValidationService.discoverCluster] Current node: ${currentNodeId}, index: ${nodeIndex}, clusterSize: ${nodes.length}` ); return { nodeId: nodeIndex, - clusterSize: sortedNodes.length, + clusterSize: nodes.length, }; } } diff --git a/tools/maritime-data-synthesizer/cli.js b/tools/maritime-data-synthesizer/cli.js index 7d4167d..00f1e11 100755 --- a/tools/maritime-data-synthesizer/cli.js +++ b/tools/maritime-data-synthesizer/cli.js @@ -33,12 +33,13 @@ function showHelp() { } console.log('\nExamples:'); - console.log(' maritime-data-synthesizer initialize 30 # Load 30 days of historical data'); + console.log(' maritime-data-synthesizer initialize realistic # Load realistic test data (24 hrs)'); console.log(' maritime-data-synthesizer start # Start with auto-backfill (rolling window)'); console.log(' maritime-data-synthesizer start --no-backfill # Start without backfill'); console.log(' maritime-data-synthesizer stats # View statistics'); - console.log(' maritime-data-synthesizer clear # Clear all data (keeps 
table)'); - console.log(' maritime-data-synthesizer reset 60 # Reset with 60 days of data'); + console.log(' maritime-data-synthesizer clear # Clear all data (keeps tables)'); + console.log(' maritime-data-synthesizer clean # Delete all tables and data'); + console.log(' maritime-data-synthesizer reset realistic # Delete and reinitialize with data'); console.log('\nConfiguration:'); console.log(' All settings are loaded from config.yaml'); console.log(' - Uses same BigQuery connection as the plugin (bigquery section)'); @@ -266,6 +267,10 @@ async function runMultiTableMode(command, arg, config) { console.log(`Generating data for scenario: ${scenario}`); + // Generate historical data by defaulting to 30 days ago + // This ensures the sync engine will find data when it starts + const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); + const orchestrator = new MultiTableOrchestrator({ bigquery: { projectId: config.projectId, @@ -273,7 +278,7 @@ async function runMultiTableMode(command, arg, config) { location: config.location, }, scenario, - startTime: new Date(), + startTime: thirtyDaysAgo, }); await orchestrator.generateAll({ @@ -348,15 +353,53 @@ async function runMultiTableMode(command, arg, config) { break; } + case 'clear': { + console.log('Clearing all data from tables (schema will be preserved)...'); + + const orchestrator = new MultiTableOrchestrator({ + bigquery: { + projectId: config.projectId, + keyFilename: config.credentials, + location: config.location, + }, + scenario: 'small', // Scenario doesn't matter for clear + }); + + await orchestrator.clearAllTables(config.datasetId); + + console.log('\nClear complete!'); + setTimeout(() => process.exit(0), 100); + break; + } + + case 'clean': { + console.log('Deleting all tables (schema and data will be removed)...'); + + const orchestrator = new MultiTableOrchestrator({ + bigquery: { + projectId: config.projectId, + keyFilename: config.credentials, + location: config.location, + }, + scenario: 
'small', // Scenario doesn't matter for clean + }); + + await orchestrator.deleteAllTables(config.datasetId); + + console.log('\nClean complete!'); + setTimeout(() => process.exit(0), 100); + break; + } + case 'stats': - case 'clear': - case 'clean': case 'reset': console.error(`\nCommand "${command}" is not yet implemented in multi-table mode.`); console.error('Available commands for multi-table mode:'); console.error(' initialize - Generate test data (scenarios: small, realistic, stress)'); console.error(' start - Start continuous generation with rolling window'); - console.error('\nOther commands (stats, clear, clean, reset) are coming soon!'); + console.error(' clear - Clear all data from tables (keeps schema)'); + console.error(' clean - Delete all tables (removes schema and data)'); + console.error('\nOther commands (stats, reset) are coming soon!'); process.exit(1); break; diff --git a/tools/maritime-data-synthesizer/generators/port-events-generator.js b/tools/maritime-data-synthesizer/generators/port-events-generator.js index 007b701..52ae8d7 100644 --- a/tools/maritime-data-synthesizer/generators/port-events-generator.js +++ b/tools/maritime-data-synthesizer/generators/port-events-generator.js @@ -7,6 +7,26 @@ import { SAMPLE_PORTS, EVENT_TYPES, SAMPLE_VESSELS } from '../../../test/fixtures/multi-table-test-data.js'; +/** + * Converts timestamp with fractional milliseconds to ISO string with microsecond precision + * This ensures even distribution across MOD partitioning in BigQuery + * @param {number} timestampMs - Timestamp in milliseconds (can have fractional part) + * @returns {string} ISO 8601 timestamp with microsecond precision + */ +function toISOStringWithMicros(timestampMs) { + // Add random microseconds (0-999) to ensure even distribution + const randomMicros = Math.floor(Math.random() * 1000); + const totalMicros = Math.floor(timestampMs * 1000) + randomMicros; + + const date = new Date(Math.floor(timestampMs)); + const isoBase = 
date.toISOString(); // e.g., "2023-12-18T18:36:07.890Z" + + // Replace milliseconds with full microsecond precision + // ISO format: YYYY-MM-DDTHH:MM:SS.sssZ -> YYYY-MM-DDTHH:MM:SS.ssssssZ + const microsStr = String(totalMicros % 1000000).padStart(6, '0'); + return isoBase.replace(/\.\d{3}Z$/, `.${microsStr}Z`); +} + export class PortEventsGenerator { /** * Creates a new PortEventsGenerator @@ -70,7 +90,7 @@ export class PortEventsGenerator { // Generate event const event = { - event_time: new Date(currentTime).toISOString(), + event_time: toISOStringWithMicros(currentTime), port_id: port.port_id, port_name: port.name, vessel_mmsi: mmsi, @@ -161,7 +181,7 @@ export class PortEventsGenerator { // Arrival events.push({ - event_time: new Date(currentTime).toISOString(), + event_time: toISOStringWithMicros(currentTime), port_id: port.port_id, port_name: port.name, vessel_mmsi: mmsi, @@ -175,7 +195,7 @@ export class PortEventsGenerator { // Berthed events.push({ - event_time: new Date(currentTime).toISOString(), + event_time: toISOStringWithMicros(currentTime), port_id: port.port_id, port_name: port.name, vessel_mmsi: mmsi, @@ -189,7 +209,7 @@ export class PortEventsGenerator { // Departure events.push({ - event_time: new Date(currentTime).toISOString(), + event_time: toISOStringWithMicros(currentTime), port_id: port.port_id, port_name: port.name, vessel_mmsi: mmsi, @@ -242,7 +262,7 @@ export class PortEventsGenerator { const eventType = EVENT_TYPES[Math.floor(Math.random() * EVENT_TYPES.length)]; events.push({ - event_time: new Date(currentTime).toISOString(), + event_time: toISOStringWithMicros(currentTime), port_id: port.port_id, port_name: port.name, vessel_mmsi: mmsi, diff --git a/tools/maritime-data-synthesizer/generators/vessel-metadata-generator.js b/tools/maritime-data-synthesizer/generators/vessel-metadata-generator.js index fdc23ff..c2ebd79 100644 --- a/tools/maritime-data-synthesizer/generators/vessel-metadata-generator.js +++ 
b/tools/maritime-data-synthesizer/generators/vessel-metadata-generator.js @@ -8,6 +8,26 @@ import { SAMPLE_VESSELS, VESSEL_STATUSES } from '../../../test/fixtures/multi-table-test-data.js'; +/** + * Converts timestamp with fractional milliseconds to ISO string with microsecond precision + * This ensures even distribution across MOD partitioning in BigQuery + * @param {number} timestampMs - Timestamp in milliseconds (can have fractional part) + * @returns {string} ISO 8601 timestamp with microsecond precision + */ +function toISOStringWithMicros(timestampMs) { + // Add random microseconds (0-999) to ensure even distribution + const randomMicros = Math.floor(Math.random() * 1000); + const totalMicros = Math.floor(timestampMs * 1000) + randomMicros; + + const date = new Date(Math.floor(timestampMs)); + const isoBase = date.toISOString(); // e.g., "2023-12-18T18:36:07.890Z" + + // Replace milliseconds with full microsecond precision + // ISO format: YYYY-MM-DDTHH:MM:SS.sssZ -> YYYY-MM-DDTHH:MM:SS.ssssssZ + const microsStr = String(totalMicros % 1000000).padStart(6, '0'); + return isoBase.replace(/\.\d{3}Z$/, `.${microsStr}Z`); +} + // Additional data for realistic vessel generation const VESSEL_TYPES = [ 'Container Ship', @@ -158,10 +178,10 @@ export class VesselMetadataGenerator { const dimensions = this.generateDimensions(vesselType); // Random timestamp within the time range - const lastUpdated = new Date(this.startTime.getTime() + Math.random() * this.durationMs).toISOString(); + const lastUpdated = new Date(this.startTime.getTime() + Math.random() * this.durationMs); return { - last_updated: lastUpdated, + last_updated: toISOStringWithMicros(lastUpdated), mmsi: mmsi, imo: imo, vessel_name: vesselName, @@ -187,10 +207,10 @@ export class VesselMetadataGenerator { * @private */ enrichVesselMetadata(sampleVessel) { - const lastUpdated = new Date(this.startTime.getTime() + Math.random() * this.durationMs).toISOString(); + const lastUpdated = new 
Date(this.startTime.getTime() + Math.random() * this.durationMs); return { - last_updated: lastUpdated, + last_updated: toISOStringWithMicros(lastUpdated), mmsi: sampleVessel.mmsi, imo: sampleVessel.imo, vessel_name: sampleVessel.vessel_name, @@ -384,12 +404,12 @@ export class VesselMetadataGenerator { // Generate subsequent updates with minor changes for (let i = 1; i < updatesPerVessel; i++) { - const updateTime = new Date(this.startTime.getTime() + (this.durationMs / updatesPerVessel) * i).toISOString(); + const updateTime = this.startTime.getTime() + (this.durationMs / updatesPerVessel) * i; // Update with occasional changes vessel = { ...vessel, - last_updated: updateTime, + last_updated: toISOStringWithMicros(updateTime), status: VESSEL_STATUSES[Math.floor(Math.random() * VESSEL_STATUSES.length)], // Occasionally change owner (ownership transfer) owner: Math.random() < 0.1 ? this.generateOwner() : vessel.owner, diff --git a/tools/maritime-data-synthesizer/multi-table-orchestrator.js b/tools/maritime-data-synthesizer/multi-table-orchestrator.js index c42d660..a163915 100644 --- a/tools/maritime-data-synthesizer/multi-table-orchestrator.js +++ b/tools/maritime-data-synthesizer/multi-table-orchestrator.js @@ -283,20 +283,33 @@ export class MultiTableOrchestrator { } /** - * Truncates all tables + * Truncates all tables (free tier compatible) + * For free tier, this deletes and recreates tables to clear data * @param {string} dataset - Dataset name * @private */ async truncateTables(dataset) { - console.log(`\nTruncating tables...`); + console.log(`\nTruncating tables (free tier compatible - using delete/recreate)...`); const tables = ['vessel_positions', 'port_events', 'vessel_metadata']; for (const tableName of tables) { try { - await this.bigquery.query({ - query: `DELETE FROM \`${this.projectId}.${dataset}.${tableName}\` WHERE true`, - }); + const table = this.bigquery.dataset(dataset).table(tableName); + const [exists] = await table.exists(); + + if 
(!exists) { + console.log(` ⊘ Table does not exist: ${tableName}`); + continue; + } + + // Get current schema before deleting + const [metadata] = await table.getMetadata(); + const schema = metadata.schema.fields; + + // Delete and recreate + await table.delete(); + await this.bigquery.dataset(dataset).createTable(tableName, { schema }); console.log(` ✓ Truncated: ${tableName}`); } catch (error) { console.error(` Error truncating ${tableName}:`, error.message); @@ -304,6 +317,77 @@ export class MultiTableOrchestrator { } } + /** + * Clears all data from all tables (keeps schema) + * For free tier compatibility, this deletes and recreates tables + * @param {string} dataset - Dataset name + * @returns {Promise} + */ + async clearAllTables(dataset) { + console.log(`\nClearing all data from tables (schema will be preserved)...`); + console.log(`Note: BigQuery free tier requires table recreation to clear data`); + + const tables = ['vessel_positions', 'port_events', 'vessel_metadata']; + + for (const tableName of tables) { + try { + const table = this.bigquery.dataset(dataset).table(tableName); + const [exists] = await table.exists(); + + if (!exists) { + console.log(` ⊘ Table does not exist: ${tableName}`); + continue; + } + + // Get current schema before deleting + const [metadata] = await table.getMetadata(); + const schema = metadata.schema.fields; + + // Delete table + await table.delete(); + console.log(` ✓ Deleted: ${tableName}`); + + // Recreate with same schema + await this.bigquery.dataset(dataset).createTable(tableName, { schema }); + console.log(` ✓ Recreated: ${tableName} (schema preserved)`); + } catch (error) { + console.error(` ✗ Error clearing ${tableName}:`, error.message); + } + } + + console.log(`\n✓ All tables cleared successfully`); + } + + /** + * Deletes all tables entirely (removes schema and data) + * @param {string} dataset - Dataset name + * @returns {Promise} + */ + async deleteAllTables(dataset) { + console.log(`\nDeleting all tables 
(schema and data will be removed)...`); + + const tables = ['vessel_positions', 'port_events', 'vessel_metadata']; + + for (const tableName of tables) { + try { + const table = this.bigquery.dataset(dataset).table(tableName); + const [exists] = await table.exists(); + + if (!exists) { + console.log(` ⊘ Table does not exist: ${tableName}`); + continue; + } + + await table.delete(); + console.log(` ✓ Deleted: ${tableName}`); + } catch (error) { + console.error(` ✗ Error deleting ${tableName}:`, error.message); + } + } + + console.log(`\n✓ All tables deleted successfully`); + } + /** * Generates and inserts vessel_metadata * @param {string} dataset - Dataset name @@ -821,6 +905,7 @@ export class MultiTableOrchestrator { /** * Clean up old data beyond retention period + * Note: Requires BigQuery billing enabled (DML queries not available in free tier) * @param {string} dataset - Dataset name * @private */ @@ -852,7 +937,13 @@ export class MultiTableOrchestrator { console.log(` ${table.name}: No records to delete`); } } catch (error) { - console.error(` Error cleaning ${table.name}:`, error.message); + // Check if it's a free tier DML error + if (error.message && error.message.includes('DML queries are not allowed in the free tier')) { + console.log(` ${table.name}: Cleanup skipped (requires BigQuery billing)`); + console.log(` Note: Use 'clear' command to manually remove old data`); + } else { + console.error(` Error cleaning ${table.name}:`, error.message); + } } } }
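The microsecond-precision timestamp helper introduced in both generators is the subtlest change in this patch (the regex replacement must stay consistent with the millisecond part `toISOString()` already emitted). It can be sanity-checked standalone; this sketch copies the helper exactly as it appears in the diff, with an illustrative demo call (the sample timestamp is not from the codebase):

```javascript
// Copied verbatim from the toISOStringWithMicros helper added in this diff.
// It pads an ISO-8601 timestamp to microsecond precision so rows spread
// evenly across MOD partitioning in BigQuery.
function toISOStringWithMicros(timestampMs) {
	// Add random microseconds (0-999) to ensure even distribution
	const randomMicros = Math.floor(Math.random() * 1000);
	const totalMicros = Math.floor(timestampMs * 1000) + randomMicros;

	const date = new Date(Math.floor(timestampMs));
	const isoBase = date.toISOString(); // e.g., "2023-12-18T18:36:07.890Z"

	// Replace the 3-digit millisecond field with full microsecond precision:
	// YYYY-MM-DDTHH:MM:SS.sssZ -> YYYY-MM-DDTHH:MM:SS.ssssssZ
	const microsStr = String(totalMicros % 1000000).padStart(6, '0');
	return isoBase.replace(/\.\d{3}Z$/, `.${microsStr}Z`);
}

// Demo: for an integer millisecond input, the first three micro-digits
// always equal the original milliseconds; only the last three are random.
const out = toISOStringWithMicros(Date.UTC(2023, 11, 18, 18, 36, 7, 890));
console.log(out); // "2023-12-18T18:36:07.890xyzZ" — last three digits vary
```

Note the invariant being relied on: `totalMicros % 1000000` yields the microseconds-within-second, whose leading three digits match the milliseconds already printed by `toISOString()`, so the string replacement never contradicts the base timestamp.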