Skip to content

HIVE-28819: [HiveAcidReplication] copy gets stuck for hive.repl.retry.total.duration for FileNotFoundException #5990

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion common/src/java/org/apache/hadoop/hive/common/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,14 @@ public static boolean copy(FileSystem srcFS, Path[] srcs, FileSystem dstFS, Path
throw new IOException("copying multiple files, but last argument `" + dst + "' is not a directory");
}
} catch (FileNotFoundException var16) {
throw new IOException("`" + dst + "': specified destination directory does not exist", var16);
// Create a new FileNotFoundException with the custom message and the original message
FileNotFoundException e = new FileNotFoundException("'" + dst + "': specified destination directory does not exist");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's common code; I don't think it's safe to change it just to handle a replication edge case

e.initCause(var16); // Attach the original exception as the cause
// Re-throw the new FileNotFoundException
// This is important because this copy operation is called under Retryable and if it hits the FileNotFound exception then it comes out immediately
// and in case of IOException it waits for hive.repl.retry.total.duration which is 24 hours before giving up
throw e;

}

Path[] var17 = srcs;
Expand All @@ -843,6 +850,14 @@ public static boolean copy(FileSystem srcFS, Path[] srcs, FileSystem dstFS, Path
if (!doIOUtilsCopyBytes(srcFS, srcFS.getFileStatus(src), dstFS, dst, deleteSource, overwrite, preserveXAttr, conf, copyStatistics)) {
returnVal = false;
}
} catch (FileNotFoundException var16) {
// Create a new FileNotFoundException with the custom message and the original message
FileNotFoundException e = new FileNotFoundException("Copy operation failed");
e.initCause(var16); // Attach the original exception as the cause
// Re-throw the new FileNotFoundException
// This is important because this copy operation is called under retryable and if it hits the FileNotFound exception then it comes out immediately
// and in case of IOException it waits for hive.repl.retry.total.duration which is 24 hours before giving up
throw e;
} catch (IOException var15) {
gotException = true;
exceptions.append(var15.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
Expand Down
105 changes: 104 additions & 1 deletion ql/src/test/org/apache/hadoop/hive/ql/parse/repl/TestCopyUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,33 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.common.DataCopyStatistics;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.TestFileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
Expand All @@ -59,9 +66,19 @@
*/
@RunWith(MockitoJUnitRunner.class)
public class TestCopyUtils {
private static final Path basePath = new Path("/tmp/");

private static HiveConf hiveConf;
private static FileSystem fileSystem;

@BeforeClass
public static void setup() throws Exception {
  // Initialize shared conf/fs once for all tests. Reference this test class
  // (the original passed TestFileUtils.class — a copy-paste slip from the
  // sibling test; HiveConf uses the class only to locate config resources,
  // so behavior for these tests is unchanged).
  hiveConf = new HiveConf(TestCopyUtils.class);
  fileSystem = FileSystem.get(hiveConf);
}
/*
Distcp currently does not copy a single file in a distributed manner hence we dont care about
the size of file, if there is only file, we dont want to launch distcp.
the size of file, if there is only file, we don't want to launch distcp.
*/
@Test
public void distcpShouldNotBeCalledOnlyForOneFile() throws Exception {
Expand Down Expand Up @@ -244,4 +261,90 @@ public void testParallelCopySuccess() throws Exception {
Mockito.times(1)).invokeAll(callableCapture.capture());
}
}

/**
 * Destination directory does not exist: copyFilesBetweenFS must fail with an
 * IOException whose cause is FileNotFoundException, so the Retryable wrapper
 * fails fast instead of retrying for hive.repl.retry.total.duration (24h).
 */
@Test
public void testCopyFilesBetweenFSWithDestDirNotExistFailure() throws IOException {
  Path srcPath1 = new Path(basePath, "file1.txt");
  Path srcPath2 = new Path(basePath, "file2.txt");
  Path dstPath = new Path(basePath, "copyDst");

  try {
    // Create source files; try-with-resources closes the output streams
    // (the original leaked the streams returned by create()).
    try (java.io.OutputStream out1 = fileSystem.create(srcPath1);
         java.io.OutputStream out2 = fileSystem.create(srcPath2)) {
      out1.write("Content of file1".getBytes(java.nio.charset.StandardCharsets.UTF_8));
      out2.write("Content of file2".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    // Prepare source paths array
    Path[] srcPaths = {srcPath1, srcPath2};

    CopyUtils copyUtils = new CopyUtils("hive", hiveConf, fileSystem);
    DataCopyStatistics copyStatistics = new DataCopyStatistics();
    IOException thrown = Assert.assertThrows(IOException.class, () ->
        copyUtils.copyFilesBetweenFS(fileSystem, srcPaths, fileSystem, dstPath, false, true, copyStatistics));
    // FileNotFoundException is not an auto-recoverable error, so Retryable is
    // expected to surface it immediately rather than looping until the
    // hive.repl.retry.total.duration budget is exhausted.
    Assert.assertEquals(FileNotFoundException.class, thrown.getCause().getClass());
    Assert.assertEquals(
        "java.io.FileNotFoundException: '/tmp/copyDst': specified destination directory does not exist",
        thrown.getMessage());
  } finally {
    // Clean up
    fileSystem.delete(srcPath1, false);
    fileSystem.delete(srcPath2, false);
    fileSystem.delete(dstPath, true);
  }
}

/**
 * A source file is deleted concurrently with the copy. Both race outcomes are
 * legal: either every file lands at the destination, or the copy fails with an
 * IOException caused by FileNotFoundException — which must propagate as FNF so
 * Retryable fails fast instead of waiting out hive.repl.retry.total.duration.
 */
@Test
public void testCopyFilesBetweenFSWithSourceFileGettingDeletedFailure() throws IOException {
  CountDownLatch copyStartedLatch = new CountDownLatch(1);
  Path srcPath1 = new Path(basePath, "file3.txt");
  Path srcPath2 = new Path(basePath, "file4.txt");
  Path dstPath = new Path(basePath, "copyDst");
  fileSystem.mkdirs(dstPath);

  try {
    // Create source files; try-with-resources closes the output streams
    // (the original leaked the streams returned by create()).
    try (java.io.OutputStream out1 = fileSystem.create(srcPath1);
         java.io.OutputStream out2 = fileSystem.create(srcPath2)) {
      out1.write("Content of file3".getBytes(java.nio.charset.StandardCharsets.UTF_8));
      out2.write("Content of file4".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    // Prepare source paths array
    Path[] srcPaths = {srcPath1, srcPath2};

    // Deleter thread: waits (bounded) for the latch released just before the
    // copy is kicked off, then removes srcPath1 to race with the copy.
    new Thread(() -> {
      try {
        copyStartedLatch.await(100, TimeUnit.MILLISECONDS);
        fileSystem.delete(srcPath1, false); // Delete source file mid-operation
      } catch (Exception ignored) {
        // Best-effort: the test tolerates either ordering of the race.
      }
    }).start();

    CopyUtils copyUtils = new CopyUtils("hive", hiveConf, fileSystem);
    DataCopyStatistics copyStatistics = new DataCopyStatistics();
    // Release the deleter. The original never counted the latch down, so
    // await(100ms) silently degenerated into a plain sleep.
    copyStartedLatch.countDown();
    try {
      copyUtils.copyFilesBetweenFS(fileSystem, srcPaths, fileSystem, dstPath, false, true, copyStatistics);
      // Delete lost the race (happened after the copy): all files must exist.
      for (Path srcPath : srcPaths) {
        Path dstFilePath = new Path(dstPath, srcPath.getName());
        Assert.assertTrue("File " + dstFilePath + " should exist", fileSystem.exists(dstFilePath));
      }
    } catch (IOException e) {
      // Delete won the race: FileNotFoundException must be the cause so the
      // Retryable wrapper gives up immediately (previously a plain IOException
      // kept replication retrying for 24 hours).
      Assert.assertEquals(FileNotFoundException.class, e.getCause().getClass());
      Assert.assertEquals("java.io.FileNotFoundException: Copy operation failed", e.getMessage());
      // Bug fix: the original asserted "file1.txt", but this test creates and
      // deletes file3.txt — the branch could never pass as written.
      Assert.assertTrue(e.getCause().getCause().getMessage().contains("File file:/tmp/file3.txt does not exist"));
    }
  } finally {
    // Clean up
    fileSystem.delete(srcPath1, false);
    fileSystem.delete(srcPath2, false);
    fileSystem.delete(dstPath, true);
  }
}
}
Loading