Skip to content

Commit

Permalink
test case fix
Browse files Browse the repository at this point in the history
  • Loading branch information
sumitagrawl committed Sep 17, 2024
1 parent 58c852d commit a238336
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ public void cleanupCache(List<Long> epochs) {
}

@VisibleForTesting
TableCache<KEY, VALUE> getCache() {
public TableCache<KEY, VALUE> getCache() {
return cache;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,23 @@ public void evictCache(List<Long> epochs) {
// As ConcurrentHashMap computeIfPresent is atomic, there is no race
// condition between cache cleanup and requests updating same cache entry.
// Also, if higher epoch is present indicates all previous epoch is received
for (Iterator<CacheKey<KEY>> iterator = currentCacheKeys.iterator();
iterator.hasNext();) {
cachekey = iterator.next();
cache.computeIfPresent(cachekey, ((k, v) -> {
// If cache epoch entry matches with current Epoch, remove entry
// from cache.
if (v.getEpoch() == currentEpoch) {
if (LOG.isDebugEnabled()) {
LOG.debug("CacheKey {} with epoch {} is removed from cache",
k.getCacheKey(), currentEpoch);
if (epochs.contains(currentEpoch)) {
for (Iterator<CacheKey<KEY>> iterator = currentCacheKeys.iterator();
iterator.hasNext();) {
cachekey = iterator.next();
cache.computeIfPresent(cachekey, ((k, v) -> {
// If cache epoch entry matches with current Epoch, remove entry
// from cache.
if (v.getEpoch() == currentEpoch) {
if (LOG.isDebugEnabled()) {
LOG.debug("CacheKey {} with epoch {} is removed from cache",
k.getCacheKey(), currentEpoch);
}
return null;
}
return null;
}
return v;
}));
return v;
}));
}
// Remove epoch entry, as the entry is there in epoch list.
epochEntries.remove(currentEpoch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,7 @@ private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {
BlockingService omInterService =
OzoneManagerInterService.newReflectiveBlockingService(
omInterServerProtocol);
this.omGateway = new OMGateway(omRatisServer);
this.omGateway = new OMGateway(this);

OMAdminProtocolServerSideImpl omMetadataServerProtocol =
new OMAdminProtocolServerSideImpl(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.helpers.OMAuditLogger;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
Expand All @@ -62,15 +61,13 @@ public class LeaderRequestExecutor {
private static final long DUMMY_TERM = -1;
private final AtomicLong cacheIndex = new AtomicLong(0);
private final int ratisByteLimit;
private final OzoneManagerRatisServer ratisServer;
private final OzoneManager ozoneManager;
private final PoolExecutor<RequestContext> leaderExecutor;
private final OzoneManagerRequestHandler handler;
private final AtomicBoolean isEnabled = new AtomicBoolean(true);

public LeaderRequestExecutor(OzoneManagerRatisServer ratisServer) {
this.ratisServer = ratisServer;
this.ozoneManager = ratisServer.getOzoneManager();
public LeaderRequestExecutor(OzoneManager om) {
this.ozoneManager = om;
this.handler = new OzoneManagerRequestHandler(ozoneManager);
PoolExecutor<RequestContext> ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE,
RATIS_TASK_QUEUE_SIZE, ozoneManager.getThreadNamePrefix(), this::ratisSubmitCommand, null);
Expand Down Expand Up @@ -106,9 +103,11 @@ public void submit(int idx, RequestContext ctx) throws InterruptedException {
}

private void rejectRequest(RequestContext ctx) {
if (!ratisServer.getOzoneManager().isLeaderReady()) {
OMLeaderNotReadyException leaderNotReadyException = new OMLeaderNotReadyException(
ratisServer.getRaftPeerId().toString() + " is not ready to process request yet.");
if (!ozoneManager.isLeaderReady()) {
String peerId = ozoneManager.isRatisEnabled() ? ozoneManager.getOmRatisServer().getRaftPeerId().toString()
: ozoneManager.getOMNodeId();
OMLeaderNotReadyException leaderNotReadyException = new OMLeaderNotReadyException(peerId
+ " is not ready to process request yet.");
ctx.getFuture().completeExceptionally(leaderNotReadyException);
} else {
ctx.getFuture().completeExceptionally(new OMException("Request processing is disabled due to error",
Expand Down Expand Up @@ -308,6 +307,9 @@ private void prepareAndSendRequest(

private OMResponse sendDbUpdateRequest(OMRequest nextRequest, TermIndex termIndex) throws Exception {
try {
if (ozoneManager.isRatisEnabled()) {
throw new IOException("Non-ratis call is not supported");
}
OMResponse response = ozoneManager.getOmRatisServer().submitRequest(nextRequest, ClientId.randomId(),
termIndex.getIndex());
if (!response.getSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
Expand All @@ -28,8 +28,10 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.hdds.utils.db.TypedTable;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
Expand All @@ -47,20 +49,22 @@
public class OMGateway {
private static final Logger LOG = LoggerFactory.getLogger(OMGateway.class);
private final LeaderRequestExecutor leaderExecutor;
private final OzoneManagerRatisServer ratisServer;
private final OzoneManager om;
private final AtomicLong requestInProgress = new AtomicLong(0);

public OMGateway(OzoneManagerRatisServer ratisServer) {
this.leaderExecutor = new LeaderRequestExecutor(ratisServer);
this.ratisServer = ratisServer;
if (ratisServer.getOzoneManager().isLeaderExecutorEnabled()) {
public OMGateway(OzoneManager om) {
this.om = om;
this.leaderExecutor = new LeaderRequestExecutor(om);
if (om.isLeaderExecutorEnabled() && om.isRatisEnabled()) {
OzoneManagerRatisServer ratisServer = om.getOmRatisServer();
ratisServer.getOmBasicStateMachine().registerLeaderNotifier(this::leaderChangeNotifier);
}
}
public OMResponse submit(OMRequest omRequest) throws ServiceException {
if (!ratisServer.getOzoneManager().isLeaderReady()) {
OMLeaderNotReadyException leaderNotReadyException = new OMLeaderNotReadyException(
ratisServer.getRaftPeerId().toString() + " is not ready to process request yet.");
if (!om.isLeaderReady()) {
String peerId = om.isRatisEnabled() ? om.getOmRatisServer().getRaftPeerId().toString() : om.getOMNodeId();
OMLeaderNotReadyException leaderNotReadyException = new OMLeaderNotReadyException(peerId
+ " is not ready to process request yet.");
throw new ServiceException(leaderNotReadyException);
}
executorEnable();
Expand All @@ -73,7 +77,7 @@ public OMResponse submit(OMRequest omRequest) throws ServiceException {
try {
// TODO gateway locking: take lock with OMLockDetails
// TODO scheduling of request to pool
ratisServer.getOzoneManager().checkLeaderStatus();
om.checkLeaderStatus();
leaderExecutor.submit(0, requestContext);
} catch (InterruptedException e) {
requestContext.getFuture().completeExceptionally(e);
Expand All @@ -97,17 +101,17 @@ private void handleAfterExecution(RequestContext ctx, Throwable th) {
}

public void leaderChangeNotifier(String newLeaderId) {
boolean isLeader = ratisServer.getOzoneManager().getOMNodeId().equals(newLeaderId);
boolean isLeader = om.getOMNodeId().equals(newLeaderId);
if (isLeader) {
cleanupCache(Long.MAX_VALUE);
cleanupCache();
} else {
leaderExecutor.disableProcessing();
}
}

private void rebuildBucketVolumeCache() throws IOException {
LOG.info("Rebuild of bucket and volume cache");
Table<String, OmBucketInfo> bucketTable = ratisServer.getOzoneManager().getMetadataManager().getBucketTable();
Table<String, OmBucketInfo> bucketTable = om.getMetadataManager().getBucketTable();
Set<String> cachedBucketKeySet = new HashSet<>();
Iterator<Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>>> cacheItr = bucketTable.cacheIterator();
while (cacheItr.hasNext()) {
Expand All @@ -126,7 +130,7 @@ private void rebuildBucketVolumeCache() throws IOException {
}

Set<String> cachedVolumeKeySet = new HashSet<>();
Table<String, OmVolumeArgs> volumeTable = ratisServer.getOzoneManager().getMetadataManager().getVolumeTable();
Table<String, OmVolumeArgs> volumeTable = om.getMetadataManager().getVolumeTable();
Iterator<Map.Entry<CacheKey<String>, CacheValue<OmVolumeArgs>>> volCacheItr = volumeTable.cacheIterator();
while (volCacheItr.hasNext()) {
cachedVolumeKeySet.add(volCacheItr.next().getKey().getCacheKey());
Expand All @@ -144,12 +148,15 @@ private void rebuildBucketVolumeCache() throws IOException {
}
}

public void cleanupCache(long lastIndex) {
public void cleanupCache() {
// TODO no-cache case, no need re-build bucket/volume cache and cleanup of cache
LOG.debug("clean all table cache and update bucket/volume with db");
for (String tbl : ratisServer.getOzoneManager().getMetadataManager().listTableNames()) {
ratisServer.getOzoneManager().getMetadataManager().getTable(tbl).cleanupCache(
Collections.singletonList(lastIndex));
for (String tbl : om.getMetadataManager().listTableNames()) {
Table table = om.getMetadataManager().getTable(tbl);
if (table instanceof TypedTable) {
ArrayList<Long> epochs = new ArrayList<>(((TypedTable<?, ?>) table).getCache().getEpochEntries().keySet());
table.cleanupCache(epochs);
}
}
try {
rebuildBucketVolumeCache();
Expand All @@ -168,7 +175,7 @@ public void executorEnable() throws ServiceException {
return;
}
if (requestInProgress.get() == 0) {
cleanupCache(Long.MAX_VALUE);
cleanupCache();
leaderExecutor.enableProcessing();
} else {
LOG.warn("Executor is not enabled, previous request {} is still not cleaned", requestInProgress.get());
Expand Down

0 comments on commit a238336

Please sign in to comment.