forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathschema_tables.hh
343 lines (249 loc) · 14.2 KB
/
schema_tables.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
/*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "cql3/functions/functions.hh"
#include "mutation/mutation.hh"
#include "schema/schema_fwd.hh"
#include "schema_features.hh"
#include "utils/hashing.hh"
#include "schema_mutations.hh"
#include "types/map.hh"
#include "query-result-set.hh"
#include <seastar/core/distributed.hh>
#include <vector>
#include <map>
namespace data_dictionary {
class keyspace_metadata;
class user_types_storage;
}
using keyspace_metadata = data_dictionary::keyspace_metadata;
namespace replica {
class database;
}
namespace query {
class result_set;
}
namespace service {
class storage_service;
class storage_proxy;
}
namespace gms {
class feature_service;
}
namespace cql3::functions {
class user_function;
class user_aggregate;
}
namespace db {
class system_keyspace;
class extensions;
class config;
class schema_ctxt {
public:
schema_ctxt(const config&, std::shared_ptr<data_dictionary::user_types_storage> uts, const gms::feature_service&,
replica::database* = nullptr);
schema_ctxt(replica::database&);
schema_ctxt(distributed<replica::database>&);
schema_ctxt(distributed<service::storage_proxy>&);
const db::extensions& extensions() const {
return _extensions;
}
unsigned murmur3_partitioner_ignore_msb_bits() const {
return _murmur3_partitioner_ignore_msb_bits;
}
uint32_t schema_registry_grace_period() const {
return _schema_registry_grace_period;
}
const data_dictionary::user_types_storage& user_types() const noexcept {
return *_user_types;
}
const gms::feature_service& features() const {
return _features;
}
replica::database* get_db() const {
return _db;
}
private:
replica::database* _db;
const gms::feature_service& _features;
const db::extensions& _extensions;
const unsigned _murmur3_partitioner_ignore_msb_bits;
const uint32_t _schema_registry_grace_period;
const std::shared_ptr<data_dictionary::user_types_storage> _user_types;
};
namespace schema_tables {
extern logging::logger slogger;
using schema_result = std::map<sstring, lw_shared_ptr<query::result_set>>;
using schema_result_value_type = std::pair<sstring, lw_shared_ptr<query::result_set>>;
const std::string COMMITLOG_FILENAME_PREFIX("SchemaLog-");
namespace v3 {
static constexpr auto NAME = "system_schema";
static constexpr auto KEYSPACES = "keyspaces";
static constexpr auto SCYLLA_KEYSPACES = "scylla_keyspaces";
static constexpr auto TABLES = "tables";
static constexpr auto SCYLLA_TABLES = "scylla_tables";
static constexpr auto COLUMNS = "columns";
static constexpr auto DROPPED_COLUMNS = "dropped_columns";
static constexpr auto TRIGGERS = "triggers";
static constexpr auto VIEWS = "views";
static constexpr auto TYPES = "types";
static constexpr auto FUNCTIONS = "functions";
static constexpr auto AGGREGATES = "aggregates";
static constexpr auto SCYLLA_AGGREGATES = "scylla_aggregates";
static constexpr auto INDEXES = "indexes";
static constexpr auto VIEW_VIRTUAL_COLUMNS = "view_virtual_columns"; // Scylla specific
static constexpr auto COMPUTED_COLUMNS = "computed_columns"; // Scylla specific
static constexpr auto SCYLLA_TABLE_SCHEMA_HISTORY = "scylla_table_schema_history"; // Scylla specific;
schema_ptr keyspaces();
schema_ptr columns();
schema_ptr view_virtual_columns();
schema_ptr dropped_columns();
schema_ptr indexes();
schema_ptr tables();
schema_ptr scylla_tables(schema_features features = schema_features::full());
schema_ptr views();
schema_ptr types();
schema_ptr functions();
schema_ptr aggregates();
schema_ptr computed_columns();
// Belongs to the "system" keyspace
schema_ptr scylla_table_schema_history();
// Returns table ids of all schema tables which contribute to schema_mutations,
// i.e. those which are used to define schema of a table or a view.
// All such tables have a clustering key whose first column is the table name.
const std::unordered_set<table_id>& schema_tables_holding_schema_mutations();
}
namespace legacy {
class schema_mutations {
mutation _columnfamilies;
mutation _columns;
public:
schema_mutations(mutation columnfamilies, mutation columns)
: _columnfamilies(std::move(columnfamilies))
, _columns(std::move(columns))
{ }
table_schema_version digest() const;
};
future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy,
sstring keyspace_name, sstring table_name, schema_ptr s);
}
struct qualified_name {
sstring keyspace_name;
sstring table_name;
qualified_name(sstring keyspace_name, sstring table_name)
: keyspace_name(std::move(keyspace_name))
, table_name(std::move(table_name))
{ }
qualified_name(const schema_ptr& s)
: keyspace_name(s->ks_name())
, table_name(s->cf_name())
{ }
auto operator<=>(const qualified_name&) const = default;
};
future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table, schema_ptr s);
using namespace v3;
// Change on non-backwards compatible changes of schema mutations.
// Replication of schema between nodes with different version is inhibited.
extern const sstring version;
// Returns schema_ptrs for all schema tables supported by given schema_features.
std::vector<schema_ptr> all_tables(schema_features);
// Like all_tables(), but returns schema::cf_name() of each table.
std::vector<sstring> all_table_names(schema_features);
// saves/creates all the system objects in the appropriate keyspaces;
// deletes them first, so they will be effectively overwritten.
future<> save_system_schema(cql3::query_processor& qp);
future<table_schema_version> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features, noncopyable_function<bool(std::string_view)> accept_keyspace);
// Calculates schema digest for all non-system keyspaces
future<table_schema_version> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features);
// Must be called on shard 0.
future<semaphore_units<>> hold_merge_lock() noexcept;
future<> with_merge_lock(noncopyable_function<future<> ()> func);
future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, schema_features features, std::optional<table_schema_version> version_from_group0);
future<std::optional<table_schema_version>> get_group0_schema_version(db::system_keyspace& sys_ks);
// Recalculates the local schema version.
//
// It is safe to call concurrently with recalculate_schema_version() and merge_schema() in which case it
// is guaranteed that the schema version we end up with after all calls will reflect the most recent state
// of feature_service and schema tables.
future<> recalculate_schema_version(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat);
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features);
std::vector<mutation> adjust_schema_for_schema_features(std::vector<mutation> schema, schema_features features);
future<schema_result_value_type>
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, sstring schema_table_name, sstring keyspace_name);
future<mutation> read_keyspace_mutation(distributed<service::storage_proxy>&, const sstring& keyspace_name);
std::vector<mutation> make_create_keyspace_mutations(schema_features features, lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp, bool with_tables_and_types_and_functions = true);
std::vector<mutation> make_drop_keyspace_mutations(schema_features features, lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp);
future<lw_shared_ptr<keyspace_metadata>> create_keyspace_from_schema_partition(distributed<service::storage_proxy>& proxy, const schema_result_value_type& partition, lw_shared_ptr<query::result_set> scylla_specific_rs = nullptr);
future<lw_shared_ptr<query::result_set>> extract_scylla_specific_keyspace_info(distributed<service::storage_proxy>& proxy, const schema_result_value_type& partition);
std::vector<mutation> make_create_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
// Given a set of rows that is sorted by keyspace, create types for each keyspace.
// The topological sort in each keyspace is necessary when creating types, since we can only create a type when the
// types it reference have already been created.
future<std::vector<user_type>> create_types(replica::database& db, const std::vector<const query::result_set_row*>& rows);
future<std::vector<user_type>> create_types_from_schema_partition(keyspace_metadata& ks, lw_shared_ptr<query::result_set> result);
std::vector<data_type> read_arg_types(replica::database& db, const query::result_set_row& row, const sstring& keyspace);
future<shared_ptr<cql3::functions::user_function>> create_func(replica::database& db, const query::result_set_row& row);
seastar::future<std::vector<shared_ptr<cql3::functions::user_function>>> create_functions_from_schema_partition(replica::database& db, lw_shared_ptr<query::result_set> result);
shared_ptr<cql3::functions::user_aggregate> create_aggregate(replica::database& db, const query::result_set_row& row, const query::result_set_row* scylla_row, cql3::functions::change_batch& batch);
std::vector<shared_ptr<cql3::functions::user_aggregate>> create_aggregates_from_schema_partition(replica::database& db, lw_shared_ptr<query::result_set> result, lw_shared_ptr<query::result_set> scylla_result, cql3::functions::change_batch& batch);
std::vector<mutation> make_create_function_mutations(shared_ptr<cql3::functions::user_function> func, api::timestamp_type timestamp);
std::vector<mutation> make_drop_function_mutations(shared_ptr<cql3::functions::user_function> func, api::timestamp_type timestamp);
std::vector<mutation> make_create_aggregate_mutations(schema_features features, shared_ptr<cql3::functions::user_aggregate> func, api::timestamp_type timestamp);
std::vector<mutation> make_drop_aggregate_mutations(schema_features features, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type timestamp);
std::vector<mutation> make_drop_type_mutations(lw_shared_ptr<keyspace_metadata> keyspace, user_type type, api::timestamp_type timestamp);
void add_type_to_schema_mutation(user_type type, api::timestamp_type timestamp, std::vector<mutation>& mutations);
std::vector<mutation> make_create_table_mutations(schema_ptr table, api::timestamp_type timestamp);
std::vector<mutation> make_update_table_mutations(
replica::database& db,
lw_shared_ptr<keyspace_metadata> keyspace,
schema_ptr old_table,
schema_ptr new_table,
api::timestamp_type timestamp);
future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result);
std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
schema_ptr create_table_from_mutations(const schema_ctxt&, schema_mutations, std::optional<table_schema_version> version = {});
view_ptr create_view_from_mutations(const schema_ctxt&, schema_mutations, std::optional<table_schema_version> version = {});
future<std::vector<view_ptr>> create_views_from_schema_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result);
schema_mutations make_schema_mutations(schema_ptr s, api::timestamp_type timestamp, bool with_columns);
mutation make_scylla_tables_mutation(schema_ptr, api::timestamp_type timestamp);
void add_table_or_view_to_schema_mutation(schema_ptr view, api::timestamp_type timestamp, bool with_columns, std::vector<mutation>& mutations);
std::vector<mutation> make_create_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);
std::vector<mutation> make_update_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr old_view, view_ptr new_view, api::timestamp_type timestamp, bool include_base);
std::vector<mutation> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);
void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view_ptr& v, schema_ptr base_schema);
sstring serialize_kind(column_kind kind);
column_kind deserialize_kind(sstring kind);
data_type parse_type(sstring str);
sstring serialize_index_kind(index_metadata_kind kind);
index_metadata_kind deserialize_index_kind(sstring kind);
mutation compact_for_schema_digest(const mutation& m);
void feed_hash_for_schema_digest(hasher&, const mutation&, schema_features);
template<typename K, typename V>
std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const sstring& name) {
if (auto values = row.get<map_type_impl::native_type>(name)) {
std::map<K, V> map;
for (auto&& entry : *values) {
map.emplace(value_cast<K>(entry.first), value_cast<V>(entry.second));
};
return map;
}
return std::nullopt;
}
/// Stores the column mapping for the table being created or altered in the system table
/// which holds a history of schema versions alongside with their column mappings.
/// Can be used to insert entries with TTL (equal to DEFAULT_GC_GRACE_SECONDS) in case we are
/// overwriting an existing column mapping to garbage collect obsolete entries.
future<> store_column_mapping(distributed<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl);
/// Query column mapping for a given version of the table locally.
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
/// Check that column mapping exists for a given version of the table
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
/// Delete matching column mapping entries from the `system.scylla_table_schema_history` table
future<> drop_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
} // namespace schema_tables
} // namespace db