HDFS-17556. Avoid adding block to neededReconstruction repeatedly in decommission #6896

Open · wants to merge 3 commits into trunk (changes shown from 2 commits)
@@ -766,6 +766,7 @@ private boolean isBlockReplicatedOk(DatanodeDescriptor datanode,
if (neededReconstruction && scheduleReconStruction) {
if (!blockManager.neededReconstruction.contains(block) &&
blockManager.pendingReconstruction.getNumReplicas(block) == 0 &&
!blockManager.pendingReconstruction.hasTimeOutBlock(block) &&
blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReconstruction.add(block,
@@ -470,6 +470,7 @@ private void processBlocksInternal(
if (neededReconstruction) {
if (!blockManager.neededReconstruction.contains(block) &&
blockManager.pendingReconstruction.getNumReplicas(block) == 0 &&
!blockManager.pendingReconstruction.hasTimeOutBlock(block) &&
blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReconstruction.add(block,
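
Both hunks above add the same guard to the two decommission monitors. The window being closed: when a reconstruction scheduled for a decommissioning replica times out, the timer thread in PendingReconstructionBlocks moves the block out of pendingReconstructions and parks it in timedOutItems, so for a while the block is in neither neededReconstruction nor pendingReconstruction. Without the new hasTimeOutBlock check, the monitor re-queues the block even though the redundancy monitor will queue it again when it drains timedOutItems. A minimal, self-contained sketch of that window, with plain Java collections standing in for the HDFS queues (names here are illustrative, not HDFS APIs; a List is used for "needed" purely so the duplicate add is visible, whereas the real structure is more involved):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Stand-ins for neededReconstruction, pendingReconstruction and timedOutItems.
class RequeueRaceSketch {
  static final List<String> needed = new ArrayList<>();
  static final Set<String> pending = new HashSet<>();
  static final Set<String> timedOut = new HashSet<>();

  // Old guard: a block parked in timedOut looks "forgotten" and is re-added.
  static void requeueOld(String block) {
    if (!needed.contains(block) && !pending.contains(block)) {
      needed.add(block);
    }
  }

  // New guard from this patch: also skip blocks that the timeout processor
  // is already going to re-queue.
  static void requeueNew(String block) {
    if (!needed.contains(block) && !pending.contains(block)
        && !timedOut.contains(block)) {
      needed.add(block);
    }
  }

  public static void main(String[] args) {
    String blk = "blk_1";
    pending.add(blk);
    pending.remove(blk);   // reconstruction timed out:
    timedOut.add(blk);     // the timer thread parks the block

    requeueOld(blk);            // decommission monitor re-queues it...
    needed.addAll(timedOut);    // ...and the timeout drain queues it again
    System.out.println(needed); // [blk_1, blk_1] -> queued twice

    needed.clear();
    requeueNew(blk);            // no-op: the block is parked in timedOut
    needed.addAll(timedOut);
    System.out.println(needed); // [blk_1] -> queued exactly once
  }
}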
@@ -25,9 +25,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -50,7 +52,7 @@ class PendingReconstructionBlocks {
private static final Logger LOG = BlockManager.LOG;

private final Map<BlockInfo, PendingBlockInfo> pendingReconstructions;
private final ArrayList<BlockInfo> timedOutItems;
private final Set<BlockInfo> timedOutItems;
Daemon timerThread = null;
private volatile boolean fsRunning = true;
private long timedOutCount = 0L;
@@ -68,7 +70,7 @@ class PendingReconstructionBlocks {
this.timeout = timeoutPeriod;
}
pendingReconstructions = new HashMap<>();
timedOutItems = new ArrayList<>();
timedOutItems = new HashSet<>();
}

void start() {
@@ -170,6 +172,16 @@ int getNumReplicas(BlockInfo block) {
return 0;
}

/**
* Check whether the given block is currently in timedOutItems.
* @param block the block to look up.
* @return true if the block is in timedOutItems.
*/
boolean hasTimeOutBlock(BlockInfo block) {
synchronized (timedOutItems) {
return timedOutItems.contains(block);
}
}

/**
* Used for metrics.
* @return The number of timeouts
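
For context on the data-structure change: the pending-reconstruction timer parks expired blocks in timedOutItems, and the redundancy monitor later drains them (via getTimedOutBlocks) back into neededReconstruction. Switching timedOutItems from ArrayList to HashSet makes the new membership probe O(1) instead of a linear scan, and as a side effect de-duplicates a block that times out more than once between drains. A reduced sketch of the handoff, with String keys standing in for BlockInfo and only the operations touched by this patch shown:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class TimedOutHandoffSketch {
  private final Set<String> timedOutItems = new HashSet<>();

  // Timer thread: park a block whose reconstruction never completed.
  void markTimedOut(String block) {
    synchronized (timedOutItems) {
      timedOutItems.add(block);
    }
  }

  // The probe added by this patch: O(1) contains on the HashSet.
  boolean hasTimeOutBlock(String block) {
    synchronized (timedOutItems) {
      return timedOutItems.contains(block);
    }
  }

  // Redundancy monitor: drain everything parked so far for re-queueing.
  List<String> getTimedOutBlocks() {
    synchronized (timedOutItems) {
      List<String> drained = new ArrayList<>(timedOutItems);
      timedOutItems.clear();
      return drained;
    }
  }

  public static void main(String[] args) {
    TimedOutHandoffSketch q = new TimedOutHandoffSketch();
    q.markTimedOut("blk_1");
    System.out.println(q.hasTimeOutBlock("blk_1")); // true: monitor skips it
    System.out.println(q.getTimedOutBlocks());      // [blk_1]; set is drained
    System.out.println(q.hasTimeOutBlock("blk_1")); // false
  }
}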
@@ -20,6 +20,7 @@
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.AdminStatesBaseTest;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
@@ -28,6 +29,7 @@
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.util.Lists;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -2329,4 +2331,81 @@ public void delayDeleteReplica() {
DataNodeFaultInjector.set(oldInjector);
}
}

private void waitForDecommissionedNodes(final DatanodeAdminManager dnAdminMgr,
final int trackedNumber)
throws TimeoutException, InterruptedException {
GenericTestUtils
.waitFor(() -> dnAdminMgr.getNumTrackedNodes() == trackedNumber,
100, 2000);
}

@Test(timeout = 360000)
public void testDecommissionWithPendingTimeout() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, false);

HostsFileWriter hostsFileWriter = new HostsFileWriter();
hostsFileWriter.initialize(conf, "work-dir/decommission");
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);

MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
cluster.getNamesystem().getBlockManager().getDatanodeManager()
.setHeartbeatExpireInterval(3000);
try {
DistributedFileSystem dfs = cluster.getFileSystem();
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
DatanodeManager dm = blockManager.getDatanodeManager();

// Create a single-replica file and leave its last block incomplete.
Path filePath = new Path("/tmp.txt");
DFSTestUtil.createFile(dfs, filePath, 1024, (short) 1, 0L);
FSDataOutputStream st1 =
AdminStatesBaseTest.writeIncompleteFile(dfs, filePath, (short) 1, (short) 1);
st1.close();

LocatedBlocks blocks =
NameNodeAdapter.getBlockLocations(cluster.getNameNode(), filePath.toString(), 0, 1);

BlockInfo bi = blockManager.getStoredBlock(blocks.get(0).getBlock().getLocalBlock());
String dnName = blocks.get(0).getLocations()[0].getXferAddr();

ArrayList<String> dns = new ArrayList<>(2);
dns.add(dnName);
hostsFileWriter.initExcludeHosts(dns);
dm.refreshNodes(conf);
// Force DatanodeManager to check decommission state.
BlockManagerTestUtil.recheckDecommissionState(dm);
// Block until the admin's monitor updates the number of tracked dns.
waitForDecommissionedNodes(dm.getDatanodeAdminManager(), 1);
// Verify the node is decommissioning, even if it is dead.
List<DatanodeDescriptor> decomlist = dm.getDecommissioningNodes();
assertTrue("The node should be decommissioning", decomlist.size() == 1);

// Pause incremental block reports (IBRs) on all DataNodes so the
// reconstruction scheduled for the decommissioning replica can never
// complete and must time out.
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.pauseIBR(dn);
}
// Wait until the timed-out block has left both queues.
GenericTestUtils.waitFor(
() -> blockManager.numOfUnderReplicatedBlocks() == 0
&& blockManager.pendingReconstruction.size() == 0,
500, 30000);

assertFalse(blockManager.neededReconstruction.contains(bi));
assertEquals(0, blockManager.pendingReconstruction.getNumReplicas(bi));
assertTrue(blockManager.pendingReconstruction.hasTimeOutBlock(bi));
} finally {
hostsFileWriter.cleanup();
cluster.shutdown();
}
}
}
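
A note on the test's mechanics: it writes a single-replica file, excludes the one DataNode holding that replica so the node enters decommissioning, and pauses incremental block reports on all DataNodes so the reconstruction scheduled for the decommissioning replica can never be confirmed and must hit the 2-second pending timeout configured above. The closing assertions capture exactly the state the new guard tests for: the block is in neither neededReconstruction nor pendingReconstruction, but is parked in timedOutItems. The test should be runnable on its own with something like mvn -Dtest=TestDecommission#testDecommissionWithPendingTimeout test from hadoop-hdfs-project/hadoop-hdfs (assuming the usual surefire setup).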
@@ -175,6 +175,7 @@ public void testPendingReconstruction() {
//
assertEquals("Size of pendingReconstructions ", 0, pendingReconstructions.size());
assertEquals(15L, pendingReconstructions.getNumTimedOuts());
assertTrue(pendingReconstructions.hasTimeOutBlock(blk));
Block[] timedOut = pendingReconstructions.getTimedOutBlocks();
assertNotNull(timedOut);
assertEquals(15, timedOut.length);