Skip to content

Commit 9e0316f

Browse files
authored
Fix concurrent last cache query bug because of one device in multi regions
1 parent c7b459c commit 9e0316f

8 files changed

Lines changed: 290 additions & 46 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregation2IT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public static void setUp() {
3939
EnvFactory.getEnv().getConfig().getCommonConfig().setMaxTsBlockSizeInByte(4 * 1024);
4040
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(5_000);
4141
EnvFactory.getEnv().getConfig().getCommonConfig().setDataPartitionAllocationStrategy(SHUFFLE);
42-
EnvFactory.getEnv().getConfig().getCommonConfig().setEnableLastCache(false);
4342
EnvFactory.getEnv().initClusterEnvironment();
4443
prepareTableData(createSqls);
4544
}

integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationNonStream2IT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public static void setUp() {
3939
EnvFactory.getEnv().getConfig().getCommonConfig().setMaxTsBlockSizeInByte(4 * 1024);
4040
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(5_000);
4141
EnvFactory.getEnv().getConfig().getCommonConfig().setDataPartitionAllocationStrategy(SHUFFLE);
42-
EnvFactory.getEnv().getConfig().getCommonConfig().setEnableLastCache(false);
4342
EnvFactory.getEnv().initClusterEnvironment();
4443
String original = createSqls[2];
4544
// make 'province', 'city', 'region' be FIELD to cover cases using GroupedAccumulator

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,43 @@
2020
package org.apache.iotdb.db.queryengine.execution.fragment;
2121

2222
import org.apache.iotdb.commons.path.PartialPath;
23+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
24+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
25+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
2326

2427
import org.apache.tsfile.read.TimeValuePair;
2528
import org.apache.tsfile.utils.Pair;
2629

2730
import javax.annotation.concurrent.GuardedBy;
2831

32+
import java.util.HashMap;
2933
import java.util.Map;
3034
import java.util.concurrent.ConcurrentHashMap;
3135
import java.util.concurrent.atomic.AtomicInteger;
3236
import java.util.concurrent.locks.ReentrantLock;
3337

38+
import static com.google.common.base.Preconditions.checkArgument;
39+
3440
public class DataNodeQueryContext {
35-
// left of Pair is DataNodeSeriesScanNum, right of Pair is the last value waiting to be updated
41+
// Used for TreeModel, left of Pair is DataNodeSeriesScanNum, right of Pair is the last value
42+
// waiting to be updated
3643
@GuardedBy("lock")
3744
private final Map<PartialPath, Pair<AtomicInteger, TimeValuePair>> uncachedPathToSeriesScanInfo;
3845

46+
// Used for TableModel
47+
// 1. Outer Map: record the info for each Table to make sure DeviceEntry is unique in the value
48+
// Scope.
49+
// 2. Inner Map: record DeviceEntry to last cache for each measurement, left of Pair is the
50+
// count of device regions, right is the measurement values waiting to be updated for last cache.
51+
// Note: only devices whose count is greater than one are recorded.
52+
@GuardedBy("lock")
53+
private final Map<
54+
QualifiedObjectName, Map<DeviceEntry, Pair<Integer, Map<String, TimeValuePair>>>>
55+
deviceCountAndMeasurementValues;
56+
57+
private static final TableDeviceSchemaCache TABLE_DEVICE_SCHEMA_CACHE =
58+
TableDeviceSchemaCache.getInstance();
59+
3960
private final AtomicInteger dataNodeFINum;
4061

4162
// TODO consider more fine-grained locks, now the AtomicInteger in uncachedPathToSeriesScanInfo is
@@ -45,6 +66,7 @@ public class DataNodeQueryContext {
4566
/**
 * Creates a query context whose bookkeeping maps start out empty.
 *
 * @param dataNodeFINum initial value stored in the {@code dataNodeFINum} counter
 */
public DataNodeQueryContext(int dataNodeFINum) {
  this.dataNodeFINum = new AtomicInteger(dataNodeFINum);
  // Per-path scan info; backed by a concurrent map.
  this.uncachedPathToSeriesScanInfo = new ConcurrentHashMap<>();
  // Per-table, per-device pending last values; a plain HashMap, so it offers no thread-safety
  // of its own.
  this.deviceCountAndMeasurementValues = new HashMap<>();
}
4971

5072
public boolean unCached(PartialPath path) {
@@ -55,6 +77,50 @@ public void addUnCachePath(PartialPath path, AtomicInteger dataNodeSeriesScanNum
5577
uncachedPathToSeriesScanInfo.put(path, new Pair<>(dataNodeSeriesScanNum, null));
5678
}
5779

80+
public void decreaseDeviceAndMayUpdateLastCache(
81+
QualifiedObjectName tableName, DeviceEntry deviceEntry, Integer initialCount) {
82+
checkArgument(initialCount != null, "initialCount shouldn't be null here");
83+
84+
Map<DeviceEntry, Pair<Integer, Map<String, TimeValuePair>>> deviceInfo =
85+
deviceCountAndMeasurementValues.computeIfAbsent(tableName, t -> new HashMap<>());
86+
87+
Pair<Integer, Map<String, TimeValuePair>> info =
88+
deviceInfo.computeIfAbsent(deviceEntry, d -> new Pair<>(initialCount, new HashMap<>()));
89+
info.left--;
90+
if (info.left == 0) {
91+
updateLastCache(tableName, deviceEntry);
92+
}
93+
}
94+
95+
public void addUnCachedDeviceIfAbsent(
96+
QualifiedObjectName tableName, DeviceEntry deviceEntry, Integer count) {
97+
checkArgument(count != null, "count shouldn't be null here");
98+
99+
Map<DeviceEntry, Pair<Integer, Map<String, TimeValuePair>>> deviceInfo =
100+
deviceCountAndMeasurementValues.computeIfAbsent(tableName, t -> new HashMap<>());
101+
102+
deviceInfo.putIfAbsent(deviceEntry, new Pair<>(count, new HashMap<>()));
103+
}
104+
105+
public Pair<Integer, Map<String, TimeValuePair>> getDeviceInfo(
106+
QualifiedObjectName tableName, DeviceEntry deviceEntry) {
107+
return deviceCountAndMeasurementValues.get(tableName).get(deviceEntry);
108+
}
109+
110+
/** Update the last cache when device count decrease to zero. */
111+
public void updateLastCache(QualifiedObjectName tableName, DeviceEntry deviceEntry) {
112+
Map<String, TimeValuePair> values =
113+
deviceCountAndMeasurementValues.get(tableName).get(deviceEntry).getRight();
114+
// if a device hits cache each time, the values recorded in context will be null
115+
if (values != null) {
116+
TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
117+
tableName.getDatabaseName(),
118+
deviceEntry.getDeviceID(),
119+
values.keySet().toArray(new String[0]),
120+
values.values().toArray(new TimeValuePair[0]));
121+
}
122+
}
123+
58124
public Pair<AtomicInteger, TimeValuePair> getSeriesScanInfo(PartialPath path) {
59125
return uncachedPathToSeriesScanInfo.get(path);
60126
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java

Lines changed: 64 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
2323
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
24+
import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext;
2425
import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
2526
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastAccumulator;
2627
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator;
@@ -44,6 +45,7 @@
4445

4546
import java.util.ArrayList;
4647
import java.util.List;
48+
import java.util.Map;
4749
import java.util.Optional;
4850
import java.util.OptionalLong;
4951
import java.util.concurrent.TimeUnit;
@@ -65,6 +67,7 @@ public class LastQueryAggTableScanOperator extends AbstractAggTableScanOperator
6567
private static final TableDeviceSchemaCache TABLE_DEVICE_SCHEMA_CACHE =
6668
TableDeviceSchemaCache.getInstance();
6769

70+
private final QualifiedObjectName tableCompleteName;
6871
private final String dbName;
6972
private int outputDeviceIndex;
7073
private DeviceEntry currentDeviceEntry;
@@ -81,13 +84,18 @@ public class LastQueryAggTableScanOperator extends AbstractAggTableScanOperator
8184
// indicates the index of last(time) aggregation
8285
private int lastTimeAggregationIdx = -1;
8386

87+
private final Map<DeviceEntry, Integer> deviceCountMap;
88+
private final DataNodeQueryContext dataNodeQueryContext;
89+
8490
public LastQueryAggTableScanOperator(
8591
AbstractAggTableScanOperatorParameter parameter,
8692
List<DeviceEntry> cachedDeviceEntries,
8793
QualifiedObjectName qualifiedObjectName,
8894
List<Integer> hitCachesIndexes,
8995
List<Pair<OptionalLong, TsPrimitiveType[]>> lastRowCacheResults,
90-
List<TimeValuePair[]> lastValuesCacheResults) {
96+
List<TimeValuePair[]> lastValuesCacheResults,
97+
Map<DeviceEntry, Integer> deviceCountMap,
98+
DataNodeQueryContext dataNodeQueryContext) {
9199

92100
super(parameter);
93101

@@ -101,6 +109,7 @@ public LastQueryAggTableScanOperator(
101109
this.hitCachesIndexes = hitCachesIndexes;
102110
this.lastRowCacheResults = lastRowCacheResults;
103111
this.lastValuesCacheResults = lastValuesCacheResults;
112+
this.tableCompleteName = qualifiedObjectName;
104113
this.dbName = qualifiedObjectName.getDatabaseName();
105114

106115
this.operatorContext.recordSpecifiedInfo(
@@ -110,6 +119,8 @@ public LastQueryAggTableScanOperator(
110119
lastTimeAggregationIdx = i;
111120
}
112121
}
122+
this.deviceCountMap = deviceCountMap;
123+
this.dataNodeQueryContext = dataNodeQueryContext;
113124
}
114125

115126
@Override
@@ -518,28 +529,23 @@ private void updateLastCacheUseLastRowIfPossible() {
518529
case TIME:
519530
if (!hasSetLastTime) {
520531
hasSetLastTime = true;
532+
updateMeasurementList.add("");
521533
if (i == lastTimeAggregationIdx) {
522534
LastDescAccumulator lastAccumulator =
523535
(LastDescAccumulator) tableAggregator.getAccumulator();
524536
if (lastAccumulator.hasInitResult()) {
525-
updateMeasurementList.add("");
526537
updateTimeValuePairList.add(
527538
new TimeValuePair(
528539
lastAccumulator.getMaxTime(),
529540
new TsPrimitiveType.TsLong(lastAccumulator.getMaxTime())));
530541
} else {
531542
currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
532-
TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
533-
dbName,
534-
currentDeviceEntry.getDeviceID(),
535-
new String[] {""},
536-
new TimeValuePair[] {EMPTY_TIME_VALUE_PAIR});
543+
updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
537544
}
538545
} else {
539546
LastByDescAccumulator lastByAccumulator =
540547
(LastByDescAccumulator) tableAggregator.getAccumulator();
541548
if (lastByAccumulator.hasInitResult() && !lastByAccumulator.isXNull()) {
542-
updateMeasurementList.add("");
543549
updateTimeValuePairList.add(
544550
new TimeValuePair(
545551
lastByAccumulator.getLastTimeOfY(),
@@ -551,7 +557,7 @@ private void updateLastCacheUseLastRowIfPossible() {
551557
case FIELD:
552558
LastByDescAccumulator lastByAccumulator =
553559
(LastByDescAccumulator) tableAggregator.getAccumulator();
554-
// only can update LastCache when last_by return non-null value
560+
updateMeasurementList.add(schema.getName());
555561
if (lastByAccumulator.hasInitResult() && !lastByAccumulator.isXNull()) {
556562
long lastByTime = lastByAccumulator.getLastTimeOfY();
557563

@@ -562,10 +568,11 @@ private void updateLastCacheUseLastRowIfPossible() {
562568
new TimeValuePair(lastByTime, new TsPrimitiveType.TsLong(lastByTime)));
563569
}
564570

565-
updateMeasurementList.add(schema.getName());
566571
updateTimeValuePairList.add(
567572
new TimeValuePair(
568573
lastByTime, cloneTsPrimitiveType(lastByAccumulator.getXResult())));
574+
} else {
575+
updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
569576
}
570577
break;
571578
default:
@@ -575,17 +582,7 @@ private void updateLastCacheUseLastRowIfPossible() {
575582
channel += tableAggregator.getChannelCount();
576583
}
577584

578-
if (!updateMeasurementList.isEmpty()) {
579-
String[] updateMeasurementArray = updateMeasurementList.toArray(new String[0]);
580-
TimeValuePair[] updateTimeValuePairArray =
581-
updateTimeValuePairList.toArray(new TimeValuePair[0]);
582-
currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
583-
TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
584-
dbName,
585-
currentDeviceEntry.getDeviceID(),
586-
updateMeasurementArray,
587-
updateTimeValuePairArray);
588-
}
585+
checkIfUpdated(updateMeasurementList, updateTimeValuePairList);
589586
}
590587

591588
private void updateLastCacheUseLastValuesIfPossible() {
@@ -604,19 +601,15 @@ private void updateLastCacheUseLastValuesIfPossible() {
604601
hasSetLastTime = true;
605602
LastDescAccumulator lastAccumulator =
606603
(LastDescAccumulator) tableAggregator.getAccumulator();
604+
updateMeasurementList.add("");
607605
if (lastAccumulator.hasInitResult()) {
608-
updateMeasurementList.add("");
609606
updateTimeValuePairList.add(
610607
new TimeValuePair(
611608
lastAccumulator.getMaxTime(),
612609
new TsPrimitiveType.TsLong(lastAccumulator.getMaxTime())));
613610
} else {
614611
currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
615-
TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
616-
dbName,
617-
currentDeviceEntry.getDeviceID(),
618-
new String[] {""},
619-
new TimeValuePair[] {EMPTY_TIME_VALUE_PAIR});
612+
updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
620613
}
621614
}
622615
break;
@@ -643,16 +636,52 @@ private void updateLastCacheUseLastValuesIfPossible() {
643636
channel += tableAggregator.getChannelCount();
644637
}
645638

639+
checkIfUpdated(updateMeasurementList, updateTimeValuePairList);
640+
}
641+
642+
private void checkIfUpdated(
643+
List<String> updateMeasurementList, List<TimeValuePair> updateTimeValuePairList) {
646644
if (!updateMeasurementList.isEmpty()) {
647-
String[] updateMeasurementArray = updateMeasurementList.toArray(new String[0]);
648-
TimeValuePair[] updateTimeValuePairArray =
649-
updateTimeValuePairList.toArray(new TimeValuePair[0]);
650645
currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
651-
TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
652-
dbName,
653-
currentDeviceEntry.getDeviceID(),
654-
updateMeasurementArray,
655-
updateTimeValuePairArray);
646+
647+
boolean deviceInMultiRegion =
648+
deviceCountMap != null && deviceCountMap.containsKey(currentDeviceEntry);
649+
if (!deviceInMultiRegion) {
650+
TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
651+
dbName,
652+
currentDeviceEntry.getDeviceID(),
653+
updateMeasurementList.toArray(new String[0]),
654+
updateTimeValuePairList.toArray(new TimeValuePair[0]));
655+
return;
656+
}
657+
658+
dataNodeQueryContext.lock(true);
659+
try {
660+
Pair<Integer, Map<String, TimeValuePair>> deviceInfo =
661+
dataNodeQueryContext.getDeviceInfo(tableCompleteName, currentDeviceEntry);
662+
Map<String, TimeValuePair> values = deviceInfo.getRight();
663+
664+
int size = updateMeasurementList.size();
665+
for (int i = 0; i < size; i++) {
666+
String measurementName = updateMeasurementList.get(i);
667+
TimeValuePair timeValuePair = updateTimeValuePairList.get(i);
668+
if (values.containsKey(measurementName)) {
669+
TimeValuePair oldValue = values.get(measurementName);
670+
if (timeValuePair.getTimestamp() > oldValue.getTimestamp()) {
671+
values.put(measurementName, timeValuePair);
672+
}
673+
} else {
674+
values.put(measurementName, timeValuePair);
675+
}
676+
}
677+
678+
deviceInfo.left--;
679+
if (deviceInfo.left == 0) {
680+
dataNodeQueryContext.updateLastCache(tableCompleteName, currentDeviceEntry);
681+
}
682+
} finally {
683+
dataNodeQueryContext.unLock(true);
684+
}
656685
}
657686
}
658687

0 commit comments

Comments
 (0)