Skip to content

Commit 09d292c

Browse files
jheerdomoritz
andauthored
feat: Create data cube index tables in a schema. (uwdata#519)
* feat!: Create data cube index tables in a schema. * rename to data_cube_schema and implement corresponding python part * docs: Fix docs typo. --------- Co-authored-by: Dominik Moritz <[email protected]>
1 parent cf19ab0 commit 09d292c

File tree

6 files changed

+127
-67
lines changed

6 files changed

+127
-67
lines changed

dev/index.html

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@
160160
}
161161

162162
function setIndex() {
163-
vg.coordinator().dataCubeIndexer.enabled(indexToggle.checked);
163+
vg.coordinator().dataCubeIndexer.enabled = indexToggle.checked;
164164
}
165165

166166
function reload() {

docs/api/core/coordinator.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Create a new Mosaic Coordinator to manage all database communication for clients
2020
* _logger_: The logger to use, defaults to `console`.
2121
* _cache_: Boolean flag to enable/disable query caching (default `true`).
2222
* _consolidate_: Boolean flag to enable/disable query consolidation (default `true`).
23-
* _indexes_: Data cube indexer options object. The _enabled_ flag (default `true`) determines if data cube indexes should be used when possible. The _temp_ flag (default `true`) controls if temporary tables should be created for data cube indexes.
23+
* _indexes_: Data cube indexer options object. The _enabled_ flag (default `true`) determines if data cube indexes should be used when possible. The _schema_ option (default `'mosaic'`) indicates the database schema in which data cube index tables should be created.
2424

2525
## databaseConnector
2626

packages/core/src/Coordinator.js

+31-27
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import { socketConnector } from './connectors/socket.js';
22
import { DataCubeIndexer } from './DataCubeIndexer.js';
3+
import { MosaicClient } from './MosaicClient.js';
34
import { QueryManager, Priority } from './QueryManager.js';
45
import { queryFieldInfo } from './util/field-info.js';
6+
import { QueryResult } from './util/query-result.js';
57
import { voidLogger } from './util/void-logger.js';
68

79
/**
@@ -24,6 +26,10 @@ export function coordinator(instance) {
2426
return _instance;
2527
}
2628

29+
/**
30+
* @typedef {import('@uwdata/mosaic-sql').Query | string} QueryType
31+
*/
32+
2733
/**
2834
* A Mosaic Coordinator manages all database communication for clients and
2935
* handles selection updates. The Coordinator also performs optimizations
@@ -34,7 +40,8 @@ export function coordinator(instance) {
3440
* @param {*} [options.manager] The query manager to use.
3541
* @param {boolean} [options.cache=true] Boolean flag to enable/disable query caching.
3642
* @param {boolean} [options.consolidate=true] Boolean flag to enable/disable query consolidation.
37-
* @param {object} [options.indexes] Data cube indexer options.
43+
* @param {import('./DataCubeIndexer.js').DataCubeIndexerOptions} [options.indexes]
44+
* Data cube indexer options.
3845
*/
3946
export class Coordinator {
4047
constructor(db = socketConnector(), {
@@ -48,10 +55,10 @@ export class Coordinator {
4855
this.manager = manager;
4956
this.manager.cache(cache);
5057
this.manager.consolidate(consolidate);
51-
this.dataCubeIndexer = new DataCubeIndexer(this, indexes);
52-
this.logger(logger);
5358
this.databaseConnector(db);
59+
this.logger(logger);
5460
this.clear();
61+
this.dataCubeIndexer = new DataCubeIndexer(this, indexes);
5562
}
5663

5764
/**
@@ -98,7 +105,7 @@ export class Coordinator {
98105
/**
99106
* Cancel previously submitted query requests. These queries will be
100107
* canceled if they are queued but have not yet been submitted.
101-
* @param {import('./util/query-result.js').QueryResult[]} requests An array
108+
* @param {QueryResult[]} requests An array
102109
* of query result objects, such as those returned by the `query` method.
103110
*/
104111
cancel(requests) {
@@ -107,29 +114,30 @@ export class Coordinator {
107114

108115
/**
109116
* Issue a query for which no result (return value) is needed.
110-
* @param {import('@uwdata/mosaic-sql').Query | string} query The query.
117+
* @param {QueryType | QueryType[]} query The query or an array of queries.
118+
* Each query should be either a Query builder object or a SQL string.
111119
* @param {object} [options] An options object.
112120
* @param {number} [options.priority] The query priority, defaults to
113121
* `Priority.Normal`.
114-
* @returns {import('./util/query-result.js').QueryResult} A query result
122+
* @returns {QueryResult} A query result
115123
* promise.
116124
*/
117125
exec(query, { priority = Priority.Normal } = {}) {
118-
query = Array.isArray(query) ? query.join(';\n') : query;
126+
query = Array.isArray(query) ? query.filter(x => x).join(';\n') : query;
119127
return this.manager.request({ type: 'exec', query }, priority);
120128
}
121129

122130
/**
123131
* Issue a query to the backing database. The submitted query may be
124132
* consolidated with other queries and its results may be cached.
125-
* @param {import('@uwdata/mosaic-sql').Query | string} query The query.
133+
* @param {QueryType} query The query as either a Query builder object
134+
* or a SQL string.
126135
* @param {object} [options] An options object.
127136
* @param {'arrow' | 'json'} [options.type] The query result format type.
128137
* @param {boolean} [options.cache=true] If true, cache the query result.
129138
* @param {number} [options.priority] The query priority, defaults to
130139
* `Priority.Normal`.
131-
* @returns {import('./util/query-result.js').QueryResult} A query result
132-
* promise.
140+
* @returns {QueryResult} A query result promise.
133141
*/
134142
query(query, {
135143
type = 'arrow',
@@ -143,11 +151,11 @@ export class Coordinator {
143151
/**
144152
* Issue a query to prefetch data for later use. The query result is cached
145153
* for efficient future access.
146-
* @param {import('@uwdata/mosaic-sql').Query | string} query The query.
154+
* @param {QueryType} query The query as either a Query builder object
155+
* or a SQL string.
147156
* @param {object} [options] An options object.
148157
* @param {'arrow' | 'json'} [options.type] The query result format type.
149-
* @returns {import('./util/query-result.js').QueryResult} A query result
150-
* promise.
158+
* @returns {QueryResult} A query result promise.
151159
*/
152160
prefetch(query, options = {}) {
153161
return this.query(query, { ...options, cache: true, priority: Priority.Low });
@@ -159,7 +167,7 @@ export class Coordinator {
159167
* @param {string} name The name of the bundle.
160168
* @param {[string | {sql: string}, {alias: string}]} queries The queries to save into the bundle.
161169
* @param {number} priority Request priority.
162-
* @returns
170+
* @returns {QueryResult} A query result promise.
163171
*/
164172
createBundle(name, queries, priority = Priority.Low) {
165173
const options = { name, queries: queries.map(q => typeof q == 'string' ? {sql: q} : q) };
@@ -170,7 +178,7 @@ export class Coordinator {
170178
* Load a bundle into the cache.
171179
* @param {string} name The name of the bundle.
172180
* @param {number} priority Request priority.
173-
* @returns
181+
* @returns {QueryResult} A query result promise.
174182
*/
175183
loadBundle(name, priority = Priority.High) {
176184
const options = { name };
@@ -182,8 +190,8 @@ export class Coordinator {
182190
/**
183191
* Update client data by submitting the given query and returning the
184192
* data (or error) to the client.
185-
* @param {import('./MosaicClient.js').MosaicClient} client A Mosaic client.
186-
* @param {import('@uwdata/mosaic-sql').Query | string} query The data query.
193+
* @param {MosaicClient} client A Mosaic client.
194+
* @param {QueryType} query The data query.
187195
* @param {number} [priority] The query priority.
188196
* @returns {Promise} A Promise that resolves upon completion of the update.
189197
*/
@@ -201,10 +209,8 @@ export class Coordinator {
201209
* Issue a query request for a client. If the query is null or undefined,
202210
* the client is simply updated. Otherwise `updateClient` is called. As a
203211
* side effect, this method clears the current data cube indexer state.
204-
* @param {import('./MosaicClient.js').MosaicClient} client The client
205-
* to update.
206-
* @param {import('@uwdata/mosaic-sql').Query | string | null} [query]
207-
* The query to issue.
212+
* @param {MosaicClient} client The client to update.
213+
* @param {QueryType | null} [query] The query to issue.
208214
*/
209215
requestQuery(client, query) {
210216
this.dataCubeIndexer.clear();
@@ -215,8 +221,7 @@ export class Coordinator {
215221

216222
/**
217223
* Connect a client to the coordinator.
218-
* @param {import('./MosaicClient.js').MosaicClient} client The Mosaic
219-
* client to connect.
224+
* @param {MosaicClient} client The Mosaic client to connect.
220225
*/
221226
async connect(client) {
222227
const { clients } = this;
@@ -247,8 +252,7 @@ export class Coordinator {
247252

248253
/**
249254
* Disconnect a client from the coordinator.
250-
* @param {import('./MosaicClient.js').MosaicClient} client The Mosaic
251-
* client to disconnect.
255+
* @param {MosaicClient} client The Mosaic client to disconnect.
252256
*/
253257
disconnect(client) {
254258
const { clients, filterGroups } = this;
@@ -267,8 +271,8 @@ export class Coordinator {
267271
* Connect a selection-client pair to the coordinator to process updates.
268272
* @param {Coordinator} mc The Mosaic coordinator.
269273
* @param {import('./Selection.js').Selection} selection A selection.
270-
* @param {import('./MosaicClient.js').MosaicClient} client A Mosiac
271-
* client that is filtered by the given selection.
274+
* @param {MosaicClient} client A Mosaic client that is filtered by the
275+
* given selection.
272276
*/
273277
function connectSelection(mc, selection, client) {
274278
if (!selection) return;

packages/core/src/DataCubeIndexer.js

+81-21
Original file line numberDiff line numberDiff line change
@@ -6,55 +6,112 @@ import { fnv_hash } from './util/hash.js';
66

77
const Skip = { skip: true, result: null };
88

9+
/**
10+
* @typedef {object} DataCubeIndexerOptions
11+
* @property {string} [schema] Database schema (namespace) in which to write
12+
* data cube index tables (default 'mosaic').
13+
* @property {boolean} [enabled=true] Flag to enable or disable the
14+
* indexer. This setting can later be updated via the `enabled` property.
15+
*/
16+
917
/**
1018
* Build and query optimized indices ("data cubes") for fast computation of
1119
* groupby aggregate queries over compatible client queries and selections.
1220
* A data cube contains pre-aggregated data for a Mosaic client, subdivided
1321
* by possible query values from an active selection clause. These cubes are
1422
* realized as database tables that can be queried for rapid updates.
23+
*
1524
* Compatible client queries must consist of only groupby dimensions and
1625
* supported aggregate functions. Compatible selections must contain an active
1726
* clause that exposes metadata for an interval or point value predicate.
27+
*
28+
* Data cube index tables are written to a dedicated schema (namespace) that
29+
* can be set using the *schema* constructor option. This schema acts as a
30+
* persistent cache, and index tables may be used across sessions. The
31+
* `dropIndexTables` method issues a query to remove *all* tables within
32+
* this schema. This may be needed if the original tables have updated data,
33+
* but should be used with care.
1834
*/
1935
export class DataCubeIndexer {
2036
/**
2137
* Create a new data cube index table manager.
2238
* @param {import('./Coordinator.js').Coordinator} coordinator A Mosaic coordinator.
23-
* @param {object} [options] Indexer options.
24-
* @param {boolean} [options.enabled=true] Flag to enable/disable indexer.
25-
* @param {boolean} [options.temp=true] Flag to indicate if generated data
26-
* cube index tables should be temporary tables.
39+
* @param {DataCubeIndexerOptions} [options] Data cube indexer options.
2740
*/
2841
constructor(coordinator, {
29-
enabled = true,
30-
temp = true
42+
schema = 'mosaic',
43+
enabled = true
3144
} = {}) {
3245
/** @type {Map<import('./MosaicClient.js').MosaicClient, DataCubeInfo | Skip | null>} */
3346
this.indexes = new Map();
3447
this.active = null;
35-
this.temp = temp;
3648
this.mc = coordinator;
49+
this._schema = schema;
3750
this._enabled = enabled;
3851
}
3952

4053
/**
41-
* Set the enabled state of this indexer. If false, any cached state is
54+
* Set the enabled state of this indexer. If false, any local state is
4255
* cleared and subsequent index calls will return null until re-enabled.
43-
* @param {boolean} state The enabled state.
56+
* This method has no effect on any index tables already in the database.
57+
* @param {boolean} [state] The enabled state to set.
4458
*/
45-
enabled(state) {
46-
if (state === undefined) {
47-
return this._enabled;
48-
} else if (this._enabled !== state) {
59+
set enabled(state) {
60+
if (this._enabled !== state) {
4961
if (!state) this.clear();
5062
this._enabled = state;
5163
}
5264
}
5365

66+
/**
67+
* Get the enabled state of this indexer.
68+
* @returns {boolean} The current enabled state.
69+
*/
70+
get enabled() {
71+
return this._enabled;
72+
}
73+
74+
/**
75+
* Set the database schema used by this indexer. Upon changes, any local
76+
* state is cleared. This method does _not_ drop any existing data cube
77+
* tables; use `dropIndexTables` before changing the schema to also remove
78+
* existing index tables in the database.
79+
* @param {string} [schema] The schema name to set.
80+
*/
81+
set schema(schema) {
82+
if (this._schema !== schema) {
83+
this.clear();
84+
this._schema = schema;
85+
}
86+
}
87+
88+
/**
89+
* Get the database schema used by this indexer.
90+
* @returns {string} The current schema name.
91+
*/
92+
get schema() {
93+
return this._schema;
94+
}
95+
96+
/**
97+
* Issues a query through the coordinator to drop the current index table
98+
* schema. *All* tables in the schema will be removed and local state is
99+
* cleared. Call this method if the underlying base tables have been updated,
100+
* causing derived index tables to become stale and inaccurate. Use this
101+
* method with care! Once dropped, the schema will be repopulated by future
102+
* data cube indexer requests.
103+
* @returns A query result promise.
104+
*/
105+
dropIndexTables() {
106+
this.clear();
107+
return this.mc.exec(`DROP SCHEMA IF EXISTS "${this.schema}" CASCADE`);
108+
}
109+
54110
/**
55111
* Clear the cache of data cube index table entries for the current active
56112
* selection clause. This method does _not_ drop any existing data cube
57-
* tables.
113+
* tables. Use `dropIndexTables` to remove existing index tables from the
114+
* database.
58115
*/
59116
clear() {
60117
this.indexes.clear();
@@ -77,9 +134,9 @@ export class DataCubeIndexer {
77134
*/
78135
index(client, selection, activeClause) {
79136
// if not enabled, do nothing
80-
if (!this._enabled) return null;
137+
if (!this.enabled) return null;
81138

82-
const { indexes, mc, temp } = this;
139+
const { indexes, mc, schema } = this;
83140
const { source } = activeClause;
84141

85142
// if there is no clause source to track, do nothing
@@ -125,8 +182,11 @@ export class DataCubeIndexer {
125182
} else {
126183
// generate data cube index table
127184
const filter = selection.remove(source).predicate(client);
128-
info = dataCubeInfo(client.query(filter), active, indexCols);
129-
info.result = mc.exec(create(info.table, info.create, { temp }));
185+
info = dataCubeInfo(client.query(filter), active, indexCols, schema);
186+
info.result = mc.exec([
187+
`CREATE SCHEMA IF NOT EXISTS ${schema}`,
188+
create(info.table, info.create, { temp: false })
189+
]);
130190
info.result.catch(e => mc.logger().error(e));
131191
}
132192

@@ -223,7 +283,7 @@ function binInterval(scale, pixelSize, bin) {
223283
* @param {*} indexCols Data cube index column definitions.
224284
* @returns {DataCubeInfo}
225285
*/
226-
function dataCubeInfo(clientQuery, active, indexCols) {
286+
function dataCubeInfo(clientQuery, active, indexCols, schema) {
227287
const { dims, aggr, aux } = indexCols;
228288
const { columns } = active;
229289

@@ -246,7 +306,7 @@ function dataCubeInfo(clientQuery, active, indexCols) {
246306
// generate creation query string and hash id
247307
const create = query.toString();
248308
const id = (fnv_hash(create) >>> 0).toString(16);
249-
const table = `cube_index_${id}`;
309+
const table = `${schema}.cube_${id}`;
250310

251311
// generate data cube select query
252312
const select = Query
@@ -255,7 +315,7 @@ function dataCubeInfo(clientQuery, active, indexCols) {
255315
.groupby(dims)
256316
.orderby(order);
257317

258-
return new DataCubeInfo({ table, create, active, select });
318+
return new DataCubeInfo({ id, table, create, active, select });
259319
}
260320

261321
/**

0 commit comments

Comments
 (0)