Skip to content

Commit 929344e

Browse files
committed
[NO ISSUE][STO] Introduce Index Checkpoints
- user model changes: no - storage format changes: yes - Add index checkpoints. - Use index checkpoint to determine low watermark during recovery. - interface changes: yes - Introduce IIndexCheckpointManager for managing index checkpoints. - Introduce IIndexCheckpointManagerProvider for tracking IIndexCheckpointManager references. Details: - Unify LSM flush/merge operations completion order. - Introduce index checkpoints which contain: - Index low watermark. - Latest valid LSM component. - Mapping between master replica and local replica. - Use index checkpoints instead of LSM component metadata for identifying low watermark in recovery. - Use index checkpoints in replication instead of overwriting LSN byte offset in replica component metadata. - Replace LSN_MAP used in replication by index checkpoints. - Replace NIO Files.find by Commons FileUtils.listFiles to avoid NoSuchFileException on any file deletion. Change-Id: Ib22800002bf8ea3660242e599b3f5f20678301a8 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2200 Sonar-Qube: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]>
1 parent 3180d87 commit 929344e

File tree

48 files changed

+1043
-658
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1043
-658
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.asterix.app.nc;
20+
21+
import java.io.BufferedWriter;
22+
import java.io.File;
23+
import java.io.FilenameFilter;
24+
import java.io.IOException;
25+
import java.nio.file.Files;
26+
import java.nio.file.Path;
27+
import java.nio.file.Paths;
28+
import java.util.ArrayList;
29+
import java.util.Comparator;
30+
import java.util.List;
31+
import java.util.logging.Level;
32+
import java.util.logging.Logger;
33+
34+
import org.apache.asterix.common.storage.IIndexCheckpointManager;
35+
import org.apache.asterix.common.storage.IndexCheckpoint;
36+
import org.apache.asterix.common.utils.StorageConstants;
37+
import org.apache.hyracks.api.exceptions.HyracksDataException;
38+
import org.apache.hyracks.util.annotations.ThreadSafe;
39+
40+
@ThreadSafe
41+
public class IndexCheckpointManager implements IIndexCheckpointManager {
42+
43+
private static final Logger LOGGER = Logger.getLogger(IndexCheckpointManager.class.getName());
44+
private static final int HISTORY_CHECKPOINTS = 1;
45+
private static final int MAX_CHECKPOINT_WRITE_ATTEMPTS = 5;
46+
private static final FilenameFilter CHECKPOINT_FILE_FILTER =
47+
(file, name) -> name.startsWith(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX);
48+
private static final long BULKLOAD_LSN = 0;
49+
private final Path indexPath;
50+
51+
public IndexCheckpointManager(Path indexPath) {
52+
this.indexPath = indexPath;
53+
}
54+
55+
@Override
56+
public synchronized void init(long lsn) throws HyracksDataException {
57+
final List<IndexCheckpoint> checkpoints = getCheckpoints();
58+
if (!checkpoints.isEmpty()) {
59+
LOGGER.warning(() -> "Checkpoints found on initializing: " + indexPath);
60+
delete();
61+
}
62+
IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(lsn);
63+
persist(firstCheckpoint);
64+
}
65+
66+
@Override
67+
public synchronized void replicated(String componentTimestamp, long masterLsn) throws HyracksDataException {
68+
final Long localLsn = getLatest().getMasterNodeFlushMap().get(masterLsn);
69+
if (localLsn == null) {
70+
throw new IllegalStateException("Component flushed before lsn mapping was received");
71+
}
72+
flushed(componentTimestamp, localLsn);
73+
}
74+
75+
@Override
76+
public synchronized void flushed(String componentTimestamp, long lsn) throws HyracksDataException {
77+
final IndexCheckpoint latest = getLatest();
78+
IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentTimestamp);
79+
persist(nextCheckpoint);
80+
deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
81+
}
82+
83+
@Override
84+
public synchronized void masterFlush(long masterLsn, long localLsn) throws HyracksDataException {
85+
final IndexCheckpoint latest = getLatest();
86+
latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
87+
final IndexCheckpoint next =
88+
IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentTimestamp());
89+
persist(next);
90+
notifyAll();
91+
}
92+
93+
@Override
94+
public synchronized long getLowWatermark() throws HyracksDataException {
95+
return getLatest().getLowWatermark();
96+
}
97+
98+
@Override
99+
public synchronized boolean isFlushed(long masterLsn) throws HyracksDataException {
100+
if (masterLsn == BULKLOAD_LSN) {
101+
return true;
102+
}
103+
return getLatest().getMasterNodeFlushMap().containsKey(masterLsn);
104+
}
105+
106+
@Override
107+
public synchronized void advanceLowWatermark(long lsn) throws HyracksDataException {
108+
flushed(getLatest().getValidComponentTimestamp(), lsn);
109+
}
110+
111+
@Override
112+
public synchronized void delete() {
113+
deleteHistory(Long.MAX_VALUE, 0);
114+
}
115+
116+
private IndexCheckpoint getLatest() {
117+
final List<IndexCheckpoint> checkpoints = getCheckpoints();
118+
if (checkpoints.isEmpty()) {
119+
throw new IllegalStateException("Couldn't find any checkpoints for resource: " + indexPath);
120+
}
121+
checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
122+
return checkpoints.get(0);
123+
}
124+
125+
private List<IndexCheckpoint> getCheckpoints() {
126+
List<IndexCheckpoint> checkpoints = new ArrayList<>();
127+
final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
128+
if (checkpointFiles != null) {
129+
for (File checkpointFile : checkpointFiles) {
130+
try {
131+
checkpoints.add(read(checkpointFile.toPath()));
132+
} catch (IOException e) {
133+
LOGGER.log(Level.WARNING, e, () -> "Couldn't read index checkpoint file: " + e);
134+
}
135+
}
136+
}
137+
return checkpoints;
138+
}
139+
140+
private void persist(IndexCheckpoint checkpoint) throws HyracksDataException {
141+
final Path checkpointPath = getCheckpointPath(checkpoint);
142+
for (int i = 1; i <= MAX_CHECKPOINT_WRITE_ATTEMPTS; i++) {
143+
try {
144+
// clean up from previous write failure
145+
if (checkpointPath.toFile().exists()) {
146+
Files.delete(checkpointPath);
147+
}
148+
try (BufferedWriter writer = Files.newBufferedWriter(checkpointPath)) {
149+
writer.write(checkpoint.asJson());
150+
}
151+
// ensure it was written correctly by reading it
152+
read(checkpointPath);
153+
} catch (IOException e) {
154+
if (i == MAX_CHECKPOINT_WRITE_ATTEMPTS) {
155+
throw HyracksDataException.create(e);
156+
}
157+
LOGGER.log(Level.WARNING, e, () -> "Filed to write checkpoint at: " + indexPath);
158+
int nextAttempt = i + 1;
159+
LOGGER.info(() -> "Checkpoint write attempt " + nextAttempt + "/" + MAX_CHECKPOINT_WRITE_ATTEMPTS);
160+
}
161+
}
162+
}
163+
164+
private IndexCheckpoint read(Path checkpointPath) throws IOException {
165+
return IndexCheckpoint.fromJson(new String(Files.readAllBytes(checkpointPath)));
166+
}
167+
168+
private void deleteHistory(long latestId, int historyToKeep) {
169+
try {
170+
final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
171+
if (checkpointFiles != null) {
172+
for (File checkpointFile : checkpointFiles) {
173+
if (getCheckpointIdFromFileName(checkpointFile.toPath()) < (latestId - historyToKeep)) {
174+
Files.delete(checkpointFile.toPath());
175+
}
176+
}
177+
}
178+
} catch (Exception e) {
179+
LOGGER.log(Level.WARNING, e, () -> "Couldn't delete history checkpoints at " + indexPath);
180+
}
181+
}
182+
183+
private Path getCheckpointPath(IndexCheckpoint checkpoint) {
184+
return Paths.get(indexPath.toString(),
185+
StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX + String.valueOf(checkpoint.getId()));
186+
}
187+
188+
private long getCheckpointIdFromFileName(Path checkpointPath) {
189+
return Long.valueOf(checkpointPath.getFileName().toString()
190+
.substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
191+
}
192+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.asterix.app.nc;
20+
21+
import java.nio.file.Path;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
import org.apache.asterix.common.storage.IIndexCheckpointManager;
26+
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
27+
import org.apache.asterix.common.storage.ResourceReference;
28+
import org.apache.hyracks.api.exceptions.HyracksDataException;
29+
import org.apache.hyracks.api.io.IIOManager;
30+
31+
public class IndexCheckpointManagerProvider implements IIndexCheckpointManagerProvider {
32+
33+
private final Map<ResourceReference, IndexCheckpointManager> indexCheckpointManagerMap = new HashMap<>();
34+
private final IIOManager ioManager;
35+
36+
public IndexCheckpointManagerProvider(IIOManager ioManager) {
37+
this.ioManager = ioManager;
38+
}
39+
40+
@Override
41+
public IIndexCheckpointManager get(ResourceReference ref) throws HyracksDataException {
42+
synchronized (indexCheckpointManagerMap) {
43+
return indexCheckpointManagerMap.computeIfAbsent(ref, this::create);
44+
}
45+
}
46+
47+
@Override
48+
public void close(ResourceReference ref) {
49+
synchronized (indexCheckpointManagerMap) {
50+
indexCheckpointManagerMap.remove(ref);
51+
}
52+
}
53+
54+
private IndexCheckpointManager create(ResourceReference ref) {
55+
try {
56+
final Path indexPath = getIndexPath(ref);
57+
return new IndexCheckpointManager(indexPath);
58+
} catch (HyracksDataException e) {
59+
throw new IllegalStateException(e);
60+
}
61+
}
62+
63+
private Path getIndexPath(ResourceReference indexRef) throws HyracksDataException {
64+
return ioManager.resolve(indexRef.getRelativePath().toString()).getFile().toPath();
65+
}
66+
}

asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java

+14-7
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.asterix.common.replication.IReplicaResourcesManager;
6161
import org.apache.asterix.common.replication.IReplicationChannel;
6262
import org.apache.asterix.common.replication.IReplicationManager;
63+
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
6364
import org.apache.asterix.common.storage.IStorageSubsystem;
6465
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
6566
import org.apache.asterix.common.transactions.IRecoveryManager;
@@ -142,6 +143,7 @@ public class NCAppRuntimeContext implements INcApplicationContext {
142143
private final IStorageComponentProvider componentProvider;
143144
private IHyracksClientConnection hcc;
144145
private IStorageSubsystem storageSubsystem;
146+
private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
145147

146148
public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions)
147149
throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -182,11 +184,11 @@ public void initialize(boolean initialRun) throws IOException, ACIDException {
182184
lsmIOScheduler = AsynchronousScheduler.INSTANCE;
183185

184186
metadataMergePolicyFactory = new PrefixMergePolicyFactory();
187+
indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(ioManager);
185188

186189
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
187190
new PersistentLocalResourceRepositoryFactory(ioManager, getServiceContext().getNodeId(),
188-
metadataProperties);
189-
191+
metadataProperties, indexCheckpointManagerProvider);
190192
localResourceRepository =
191193
(PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
192194

@@ -203,11 +205,10 @@ public void initialize(boolean initialRun) throws IOException, ACIDException {
203205
}
204206
localResourceRepository.deleteStorageData();
205207
}
206-
207208
datasetMemoryManager = new DatasetMemoryManager(storageProperties);
208209
datasetLifecycleManager =
209210
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
210-
datasetMemoryManager, ioManager.getIODevices().size());
211+
datasetMemoryManager, indexCheckpointManagerProvider, ioManager.getIODevices().size());
211212
final String nodeId = getServiceContext().getNodeId();
212213
final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
213214
final Set<Integer> nodePartitionsIds = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
@@ -220,7 +221,8 @@ public void initialize(boolean initialRun) throws IOException, ACIDException {
220221

221222
if (replicationProperties.isParticipant(getServiceContext().getNodeId())) {
222223

223-
replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
224+
replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties,
225+
indexCheckpointManagerProvider);
224226

225227
replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
226228
txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -229,13 +231,13 @@ public void initialize(boolean initialRun) throws IOException, ACIDException {
229231
//LogManager to replicate logs
230232
txnSubsystem.getLogManager().setReplicationManager(replicationManager);
231233

232-
//PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index
234+
//PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index
233235
localResourceRepository.setReplicationManager(replicationManager);
234236

235237
/*
236238
* add the partitions that will be replicated in this node as inactive partitions
237239
*/
238-
//get nodes which replicate to this node
240+
//get nodes which replicate to this node
239241
Set<String> remotePrimaryReplicas = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
240242
for (String clientId : remotePrimaryReplicas) {
241243
//get the partitions of each client
@@ -529,4 +531,9 @@ public IHyracksClientConnection getHcc() throws HyracksDataException {
529531
public IStorageSubsystem getStorageSubsystem() {
530532
return storageSubsystem;
531533
}
534+
535+
@Override
536+
public IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
537+
return indexCheckpointManagerProvider;
538+
}
532539
}

asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,14 @@
4343
import java.util.logging.Logger;
4444

4545
import org.apache.asterix.common.api.IDatasetLifecycleManager;
46+
import org.apache.asterix.common.api.INcApplicationContext;
4647
import org.apache.asterix.common.config.ReplicationProperties;
4748
import org.apache.asterix.common.dataflow.DatasetLocalResource;
4849
import org.apache.asterix.common.exceptions.ACIDException;
4950
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
5051
import org.apache.asterix.common.replication.IReplicaResourcesManager;
52+
import org.apache.asterix.common.storage.DatasetResourceReference;
53+
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
5154
import org.apache.asterix.common.transactions.Checkpoint;
5255
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
5356
import org.apache.asterix.common.transactions.ICheckpointManager;
@@ -293,6 +296,8 @@ private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogRe
293296

294297
IAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
295298
IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
299+
final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
300+
((INcApplicationContext) (serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider();
296301

297302
Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
298303
Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
@@ -356,18 +361,15 @@ private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogRe
356361
index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
357362
datasetLifecycleManager.register(localResource.getPath(), index);
358363
datasetLifecycleManager.open(localResource.getPath());
359-
360-
//#. get maxDiskLastLSN
361-
ILSMIndex lsmIndex = index;
362364
try {
365+
final DatasetResourceReference resourceReference =
366+
DatasetResourceReference.of(localResource);
363367
maxDiskLastLsn =
364-
((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
365-
.getComponentLSN(lsmIndex.getDiskComponents());
368+
indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
366369
} catch (HyracksDataException e) {
367370
datasetLifecycleManager.close(localResource.getPath());
368371
throw e;
369372
}
370-
371373
//#. set resourceId and maxDiskLastLSN to the map
372374
resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
373375
} else {

0 commit comments

Comments
 (0)