/*
* Copyright (C) 2015-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "db/consistency_level.hh"
#include "db/consistency_level_validations.hh"
#include "exceptions/exceptions.hh"
#include <fmt/ranges.h>
#include <seastar/core/sstring.hh>
#include "locator/host_id.hh"
#include "schema/schema.hh"
#include "replica/database.hh"
#include "db/read_repair_decision.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/network_topology_strategy.hh"
#include "heat_load_balance.hh"
namespace db {
logging::logger cl_logger("consistency");
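// Simple majority quorum: floor(RF / 2) + 1, or 0 when there are no replicas
// (e.g. RF=1 -> 1, RF=3 -> 2, RF=5 -> 3).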
size_t quorum_for(const locator::effective_replication_map& erm) {
size_t replication_factor = erm.get_replication_factor();
return replication_factor ? (replication_factor / 2) + 1 : 0;
}
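// Quorum within a single datacenter. With NetworkTopologyStrategy the per-DC
// replication factor is used; for any other strategy this degenerates to the
// global quorum.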
size_t local_quorum_for(const locator::effective_replication_map& erm, const sstring& dc) {
using namespace locator;
const auto& rs = erm.get_replication_strategy();
if (rs.get_type() == replication_strategy_type::network_topology) {
const network_topology_strategy* nrs =
static_cast<const network_topology_strategy*>(&rs);
size_t replication_factor = nrs->get_replication_factor(dc);
return replication_factor ? (replication_factor / 2) + 1 : 0;
}
return quorum_for(erm);
}
size_t block_for_local_serial(const locator::effective_replication_map& erm) {
using namespace locator;
//
// TODO: Consider caching the final result in order to avoid all this
// useless dereferencing. Note however that this will introduce quite
// a lot of complications since both the snitch output for the local host
// and the snitch itself (and thus its output) may change dynamically.
//
const auto& topo = erm.get_topology();
return local_quorum_for(erm, topo.get_datacenter());
}
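// Number of replicas needed for EACH_QUORUM: the sum of the local quorums of
// every datacenter under NetworkTopologyStrategy (e.g. two DCs with RF=3 each
// -> 2 + 2 = 4), or a plain global quorum otherwise.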
size_t block_for_each_quorum(const locator::effective_replication_map& erm) {
using namespace locator;
const auto& rs = erm.get_replication_strategy();
if (rs.get_type() == replication_strategy_type::network_topology) {
const network_topology_strategy* nrs =
static_cast<const network_topology_strategy*>(&rs);
size_t n = 0;
for (auto& dc : nrs->get_datacenters()) {
n += local_quorum_for(erm, dc);
}
return n;
} else {
return quorum_for(erm);
}
}
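// How many replicas must acknowledge an operation for the given consistency
// level to be satisfied.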
size_t block_for(const locator::effective_replication_map& erm, consistency_level cl) {
switch (cl) {
case consistency_level::ONE:
[[fallthrough]];
case consistency_level::LOCAL_ONE:
return 1;
case consistency_level::ANY:
return 1;
case consistency_level::TWO:
return 2;
case consistency_level::THREE:
return 3;
case consistency_level::QUORUM:
[[fallthrough]];
case consistency_level::SERIAL:
return quorum_for(erm);
case consistency_level::ALL:
return erm.get_replication_factor();
case consistency_level::LOCAL_QUORUM:
[[fallthrough]];
case consistency_level::LOCAL_SERIAL:
return block_for_local_serial(erm);
case consistency_level::EACH_QUORUM:
return block_for_each_quorum(erm);
default:
abort();
}
}
bool is_datacenter_local(consistency_level l) {
return l == consistency_level::LOCAL_ONE || l == consistency_level::LOCAL_QUORUM;
}
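// Tally live and pending endpoints per datacenter. Only meaningful for
// NetworkTopologyStrategy, since the datacenter list comes from the strategy.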
std::unordered_map<sstring, dc_node_count> count_per_dc_endpoints(
const locator::effective_replication_map& erm,
const host_id_vector_replica_set& live_endpoints,
const host_id_vector_topology_change& pending_endpoints = host_id_vector_topology_change()) {
using namespace locator;
auto& rs = erm.get_replication_strategy();
const auto& topo = erm.get_topology();
const network_topology_strategy* nrs =
static_cast<const network_topology_strategy*>(&rs);
std::unordered_map<sstring, dc_node_count> dc_endpoints;
for (auto& dc : nrs->get_datacenters()) {
dc_endpoints.emplace(dc, dc_node_count());
}
//
// Since live_endpoints is a subset of the get_natural_endpoints() output we
// will never get any endpoints outside the datacenters returned by
// nrs->get_datacenters().
//
for (auto& endpoint : live_endpoints) {
++(dc_endpoints[topo.get_datacenter(endpoint)].live);
}
for (auto& endpoint : pending_endpoints) {
++(dc_endpoints[topo.get_datacenter(endpoint)].pending);
}
return dc_endpoints;
}
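// EACH_QUORUM availability check. Returns true if the check was performed
// (NetworkTopologyStrategy), throwing unavailable_exception for the first
// datacenter that cannot reach a local quorum; returns false so the caller
// can fall back to the SimpleStrategy path.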
bool assure_sufficient_live_nodes_each_quorum(
consistency_level cl,
const locator::effective_replication_map& erm,
const host_id_vector_replica_set& live_endpoints,
const host_id_vector_topology_change& pending_endpoints) {
using namespace locator;
auto& rs = erm.get_replication_strategy();
if (rs.get_type() == replication_strategy_type::network_topology) {
for (auto& entry : count_per_dc_endpoints(erm, live_endpoints, pending_endpoints)) {
auto dc_block_for = local_quorum_for(erm, entry.first);
auto dc_live = entry.second.live;
auto dc_pending = entry.second.pending;
if (dc_live < dc_block_for + dc_pending) {
throw exceptions::unavailable_exception(cl, dc_block_for, dc_live);
}
}
return true;
}
return false;
}
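// Throw unavailable_exception unless the live replicas (discounting pending
// ones, which raise the effective requirement) can satisfy the consistency
// level.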
void assure_sufficient_live_nodes(
consistency_level cl,
const locator::effective_replication_map& erm,
const host_id_vector_replica_set& live_endpoints,
const host_id_vector_topology_change& pending_endpoints) {
size_t need = block_for(erm, cl);
auto adjust_live_for_error = [] (size_t live, size_t pending) {
// DowngradingConsistencyRetryPolicy uses the alive-replica count from the
// Unavailable exception to adjust the CL for a retry. When a pending node is
// present the CL is increased by 1 internally, so the reported number of live
// nodes has to be adjusted to take this into account.
return pending <= live ? live - pending : 0;
};
const auto& topo = erm.get_topology();
switch (cl) {
case consistency_level::ANY:
// local hint is acceptable, and local node is always live
break;
case consistency_level::LOCAL_ONE:
if (topo.count_local_endpoints(live_endpoints) < topo.count_local_endpoints(pending_endpoints) + 1) {
throw exceptions::unavailable_exception(cl, 1, 0);
}
break;
case consistency_level::LOCAL_QUORUM: {
size_t local_live = topo.count_local_endpoints(live_endpoints);
size_t pending = topo.count_local_endpoints(pending_endpoints);
if (local_live < need + pending) {
cl_logger.debug("Local replicas {} are insufficient to satisfy LOCAL_QUORUM requirement of needed {} and pending {}", live_endpoints, local_live, pending);
throw exceptions::unavailable_exception(cl, need, adjust_live_for_error(local_live, pending));
}
break;
}
case consistency_level::EACH_QUORUM:
if (assure_sufficient_live_nodes_each_quorum(cl, erm, live_endpoints, pending_endpoints)) {
break;
}
// Fallthrough on purpose for SimpleStrategy
[[fallthrough]];
default:
size_t live = live_endpoints.size();
size_t pending = pending_endpoints.size();
if (live < need + pending) {
cl_logger.debug("Live nodes {} do not satisfy ConsistencyLevel ({} required, {} pending)", live, need, pending);
throw exceptions::unavailable_exception(cl, need, adjust_live_for_error(live, pending));
}
break;
}
}
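// Pick the replicas a read will actually be sent to. Reduces the live
// endpoint list to block_for() replicas (all of them for GLOBAL read repair),
// restricting to the local DC for DC-local consistency levels, preferring the
// client-provided endpoints, and otherwise weighting candidates by their
// cache-hit rate. When 'extra' is non-null it receives one additional replica
// to be used for speculative retry.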
host_id_vector_replica_set
filter_for_query(consistency_level cl,
const locator::effective_replication_map& erm,
host_id_vector_replica_set live_endpoints,
const host_id_vector_replica_set& preferred_endpoints,
read_repair_decision read_repair,
const gms::gossiper& g,
std::optional<locator::host_id>* extra,
replica::column_family* cf) {
size_t local_count;
if (read_repair == read_repair_decision::GLOBAL) { // take RRD.GLOBAL out of the way
return live_endpoints;
}
if (read_repair == read_repair_decision::DC_LOCAL || is_datacenter_local(cl)) {
const auto& topo = erm.get_topology();
auto non_local_endpoints = std::ranges::stable_partition(live_endpoints, topo.get_local_dc_filter());
local_count = std::distance(live_endpoints.begin(), non_local_endpoints.begin());
if (is_datacenter_local(cl)) {
live_endpoints.erase(non_local_endpoints.begin(), non_local_endpoints.end());
}
}
size_t bf = block_for(erm, cl);
if (read_repair == read_repair_decision::DC_LOCAL) {
bf = std::max(bf, local_count);
}
if (bf >= live_endpoints.size()) { // RRD.DC_LOCAL + CL.LOCAL or CL.ALL
return live_endpoints;
}
host_id_vector_replica_set selected_endpoints;
// Pre-select endpoints based on client preference. If the endpoints
// selected this way aren't enough to satisfy the CL requirements, select the
// remaining ones according to the load-balancing strategy as before.
if (!preferred_endpoints.empty()) {
const auto preferred = std::ranges::stable_partition(live_endpoints, [&preferred_endpoints] (const locator::host_id& a) {
return std::find(preferred_endpoints.cbegin(), preferred_endpoints.cend(), a) == preferred_endpoints.cend();
});
const size_t selected = std::ranges::distance(preferred);
if (selected >= bf) {
if (extra) {
*extra = selected == bf ? live_endpoints.front() : *(preferred.begin() + bf);
}
return host_id_vector_replica_set(preferred.begin(), preferred.begin() + bf);
} else if (selected) {
selected_endpoints.reserve(bf);
std::ranges::move(preferred, std::back_inserter(selected_endpoints));
live_endpoints.erase(preferred.begin(), preferred.end());
}
}
const auto remaining_bf = bf - selected_endpoints.size();
if (cf) {
auto get_hit_rate = [&g, cf] (locator::host_id ep) -> float {
// We cap each node's cache-hit ratio at max_hit_rate = 0.95
// for two reasons:
// 1. If two nodes have hit rates 0.99 and 0.98, the miss rates
// are 0.01 and 0.02, so equalizing the miss numbers would send
// the first node twice as many requests. But unless the disk is
// extremely slow, at such high hit ratios the disk work is
// negligible and we want these two nodes to get equal work.
// 2. Even if one node has a perfect cache-hit ratio (near 1.0)
// and the other is near 0, we want the near-0 node to get some
// of the work to warm up its cache. With max_hit_rate = 0.95
// its miss rate is 0.05, 1/20th of the worst miss rate 1.0,
// so the cold node will get 1/20th the work of the hot one.
constexpr float max_hit_rate = 0.95;
// FIXME: all functions here should work on host ids
auto ht = cf->get_hit_rate(g, ep);
if (float(ht.rate) < 0) {
return float(ht.rate);
} else if (lowres_clock::now() - ht.last_updated > std::chrono::milliseconds(1000)) {
// If a cache entry has not been updated for a while, try to send traffic there
// to get more up-to-date data; mark it as updated so we don't send too much traffic there.
cf->set_hit_rate(ep, ht.rate);
return max_hit_rate;
} else {
return std::min(float(ht.rate), max_hit_rate); // calculation below cannot work with hit rate 1
}
};
float ht_max = 0;
float ht_min = 1;
bool old_node = false;
auto epi = live_endpoints | std::views::transform([&] (locator::host_id ep) {
auto ht = get_hit_rate(ep);
old_node = old_node || ht < 0;
ht_max = std::max(ht_max, ht);
ht_min = std::min(ht_min, ht);
return std::make_pair(ep, ht);
}) | std::ranges::to<std::vector<std::pair<locator::host_id, float>>>();
if (!old_node && ht_max - ht_min > 0.01) { // skip the calculation if there is an old node or the hit rates are close
// local node is always first if present (see storage_proxy::get_endpoints_for_reading)
unsigned local_idx = erm.get_topology().is_me(epi[0].first) ? 0 : epi.size() + 1;
auto weighted = miss_equalizing_combination(epi, local_idx, remaining_bf, bool(extra)) | std::ranges::to<host_id_vector_replica_set>();
// Workaround for https://github.com/scylladb/scylladb/issues/9285
auto last = std::adjacent_find(weighted.begin(), weighted.end());
if (last == weighted.end()) {
// No duplicates, so use the result based on hit rates
live_endpoints = std::move(weighted);
}
}
}
if (extra) {
*extra = live_endpoints[remaining_bf]; // extra replica for speculation
}
std::move(live_endpoints.begin(), live_endpoints.begin() + remaining_bf, std::back_inserter(selected_endpoints));
return selected_endpoints;
}
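// Non-throwing counterpart of assure_sufficient_live_nodes(): report whether
// the given live endpoints are enough to satisfy the consistency level.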
bool
is_sufficient_live_nodes(consistency_level cl,
const locator::effective_replication_map& erm,
const host_id_vector_replica_set& live_endpoints) {
using namespace locator;
const auto& topo = erm.get_topology();
switch (cl) {
case consistency_level::ANY:
// local hint is acceptable, and local node is always live
return true;
case consistency_level::LOCAL_ONE:
return topo.count_local_endpoints(live_endpoints) >= 1;
case consistency_level::LOCAL_QUORUM:
return topo.count_local_endpoints(live_endpoints) >= block_for(erm, cl);
case consistency_level::EACH_QUORUM:
{
auto& rs = erm.get_replication_strategy();
if (rs.get_type() == replication_strategy_type::network_topology) {
for (auto& entry : count_per_dc_endpoints(erm, live_endpoints)) {
if (entry.second.live < local_quorum_for(erm, entry.first)) {
return false;
}
}
return true;
}
}
// Fallthrough on purpose for SimpleStrategy
[[fallthrough]];
default:
return live_endpoints.size() >= block_for(erm, cl);
}
}
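// ANY and EACH_QUORUM are write-only consistency levels; reject them for reads.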
void validate_for_read(consistency_level cl) {
switch (cl) {
case consistency_level::ANY:
throw exceptions::invalid_request_exception("ANY ConsistencyLevel is only supported for writes");
case consistency_level::EACH_QUORUM:
throw exceptions::invalid_request_exception("EACH_QUORUM ConsistencyLevel is only supported for writes");
default:
break;
}
}
void validate_for_write(consistency_level cl) {
switch (cl) {
case consistency_level::SERIAL:
[[fallthrough]];
case consistency_level::LOCAL_SERIAL:
throw exceptions::invalid_request_exception("You must use conditional updates for serializable writes");
default:
break;
}
}
// This is really the same as validate_for_write, but we include a slightly different error message for SERIAL/LOCAL_SERIAL
void validate_for_cas_learn(consistency_level cl, const sstring& keyspace) {
switch (cl) {
case consistency_level::SERIAL:
[[fallthrough]];
case consistency_level::LOCAL_SERIAL:
throw exceptions::invalid_request_exception(format("{} is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\"", cl));
default:
break;
}
}
bool is_serial_consistency(consistency_level cl) {
return cl == consistency_level::SERIAL || cl == consistency_level::LOCAL_SERIAL;
}
void validate_for_cas(consistency_level cl)
{
if (!is_serial_consistency(cl)) {
throw exceptions::invalid_request_exception("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL");
}
}
void validate_counter_for_write(const schema& s, consistency_level cl) {
if (cl == consistency_level::ANY) {
throw exceptions::invalid_request_exception(format("Consistency level ANY is not yet supported for counter table {}", s.cf_name()));
}
if (is_serial_consistency(cl)) {
throw exceptions::invalid_request_exception("Counter operations are inherently non-serializable");
}
}
}