Skip to content

Commit

Permalink
[GOBBLIN-2088] Add retry to OH replication final catalog commit (#3976)
Browse files Browse the repository at this point in the history
* Added retries to the OH replication final catalog commit
* Modified the code to build the retryer via RetryerFactory, driven by config
  • Loading branch information
Blazer-007 authored Jul 10, 2024
1 parent a1a5d4b commit b44a91e
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,33 @@
package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.Properties;

import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;

import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.TableIdentifier;

import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.RetryException;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.annotations.VisibleForTesting;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.util.retry.RetryerFactory;

import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
import static org.apache.gobblin.util.retry.RetryerFactory.RetryType;


/**
Expand All @@ -49,6 +66,12 @@ public class IcebergRegisterStep implements CommitStep {
// Source-table metadata that gets passed to the destination table's registerIcebergTable() call.
private final TableMetadata readTimeSrcTableMetadata;
// NOTE(review): presumably the destination metadata captured before the copy, compared against the current one in execute() - confirm.
private final TableMetadata justPriorDestTableMetadata;
// Job properties: used both to create the destination catalog and to look up retryer overrides.
private final Properties properties;
/** Config prefix under which retry settings for the final catalog registration may be overridden. */
public static final String RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".catalog.registration.retries";

/** Retry settings applied for any key not overridden under {@link #RETRYER_CONFIG_PREFIX}: 3 second interval, 5 retry times, fixed-attempt type. */
private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
RETRY_TIMES, 5,
RETRY_TYPE, RetryType.FIXED_ATTEMPT.name()));

public IcebergRegisterStep(TableIdentifier srcTableId, TableIdentifier destTableId,
TableMetadata readTimeSrcTableMetadata, TableMetadata justPriorDestTableMetadata,
Expand Down Expand Up @@ -85,11 +108,33 @@ public void execute() throws IOException {
if (!isJustPriorDestMetadataStillCurrent) {
throw new IOException("error: likely concurrent writing to destination: " + determinationMsg);
}
destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, currentDestMetadata);
Retryer<Void> registerRetryer = createRegisterRetryer();
registerRetryer.call(() -> {
destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, currentDestMetadata);
return null;
});
} catch (IcebergTable.TableNotFoundException tnfe) {
String msg = "Destination table (with TableMetadata) does not exist: " + tnfe.getMessage();
log.error(msg);
throw new IOException(msg, tnfe);
} catch (ExecutionException executionException) {
String msg = String.format("Failed to register iceberg table : (src: {%s}) - (dest: {%s})",
this.srcTableIdStr,
this.destTableIdStr);
log.error(msg, executionException);
throw new RuntimeException(msg, executionException.getCause());
} catch (RetryException retryException) {
String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
String msg = String.format("Failed to register iceberg table : (src: {%s}) - (dest: {%s}) : (retried %d times) %s ",
this.srcTableIdStr,
this.destTableIdStr,
retryException.getNumberOfFailedAttempts(),
interruptedNote);
Throwable informativeException = retryException.getLastFailedAttempt().hasException()
? retryException.getLastFailedAttempt().getExceptionCause()
: retryException;
log.error(msg, informativeException);
throw new RuntimeException(msg, informativeException);
}
}

Expand All @@ -103,4 +148,25 @@ public String toString() {
protected IcebergCatalog createDestinationCatalog() throws IOException {
  // Kept as an overridable method: subclasses (e.g. in tests) can substitute their own catalog here.
  IcebergDatasetFinder.CatalogLocation destination = IcebergDatasetFinder.CatalogLocation.DESTINATION;
  return IcebergDatasetFinder.createIcebergCatalog(this.properties, destination);
}

/**
 * Builds the {@link Retryer} that wraps the destination-table registration call.
 *
 * <p>Settings found under {@link #RETRYER_CONFIG_PREFIX} in this step's properties take precedence;
 * any key not specified there falls back to {@link #RETRYER_FALLBACK_CONFIG}. The attached listener
 * logs a warning for every attempt that ended with an exception.
 *
 * @return a retryer configured from the merged override + fallback config
 */
private Retryer<Void> createRegisterRetryer() {
  Config allProps = ConfigFactory.parseProperties(this.properties);

  // Start from "no overrides" and only narrow to the retryer sub-config when one is actually present.
  Config overrides = ConfigFactory.empty();
  if (allProps.hasPath(IcebergRegisterStep.RETRYER_CONFIG_PREFIX)) {
    overrides = allProps.getConfig(IcebergRegisterStep.RETRYER_CONFIG_PREFIX);
  }
  Config effectiveRetryerConfig = overrides.withFallback(RETRYER_FALLBACK_CONFIG);

  // Must stay an anonymous class: `onRetry` is a generic method, which a lambda cannot implement.
  RetryListener failureLogger = new RetryListener() {
    @Override
    public <V> void onRetry(Attempt<V> attempt) {
      if (!attempt.hasException()) {
        return; // nothing to report for attempts that did not throw
      }
      String elapsedSinceStart = Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString();
      String msg = String.format("Exception caught while registering iceberg table : (src: {%s}) - (dest: {%s}) : [attempt: %d; %s after start]",
          srcTableIdStr,
          destTableIdStr,
          attempt.getAttemptNumber(),
          elapsedSinceStart);
      log.warn(msg, attempt.getExceptionCause());
    }
  };

  return RetryerFactory.newInstance(effectiveRetryerConfig, Optional.of(failureLogger));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import static org.mockito.Mockito.*;

import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;


/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep} */
public class IcebergRegisterStepTest {
Expand All @@ -41,13 +43,7 @@ public void testDestSideMetadataMatchSucceeds() throws IOException {
TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class); // (no mocked behavior)
IcebergTable mockTable = mockIcebergTable("foo", "bar"); // matches!
IcebergRegisterStep regStep =
new IcebergRegisterStep(srcTableId, destTableId, readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
@Override
protected IcebergCatalog createDestinationCatalog() throws IOException {
return mockSingleTableIcebergCatalog(mockTable);
}
};
IcebergRegisterStep regStep = createIcebergRegisterStepInstance(readTimeSrcTableMetadata, justPriorDestTableMetadata, mockTable, new Properties());
try {
regStep.execute();
Mockito.verify(mockTable, Mockito.times(1)).registerIcebergTable(any(), any());
Expand Down Expand Up @@ -75,6 +71,64 @@ protected IcebergCatalog createDestinationCatalog() throws IOException {
}
}

/**
 * Verifies that a registration which fails transiently is retried until it succeeds:
 * two failing attempts followed by a successful one must yield exactly three invocations.
 */
@Test
public void testRegisterIcebergTableWithRetryer() throws IOException {
  TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
  TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
  IcebergTable mockTable = mockIcebergTable("foo", "bar");
  // Fail the first two calls to registerIcebergTable(), then succeed,
  // so the total number of invocations should be exactly 3
  Mockito.doThrow(new RuntimeException())
      .doThrow(new RuntimeException())
      .doNothing()
      .when(mockTable).registerIcebergTable(any(), any());
  // This test is about retry-then-succeed, not about the default timings: shrink the wait between
  // attempts so the two forced failures do not make the test sleep for seconds at the fallback interval.
  Properties properties = new Properties();
  properties.setProperty(
      IcebergRegisterStep.RETRYER_CONFIG_PREFIX + "." + org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS,
      "10");
  IcebergRegisterStep regStep = createIcebergRegisterStepInstance(readTimeSrcTableMetadata, justPriorDestTableMetadata, mockTable, properties);
  try {
    regStep.execute();
    Mockito.verify(mockTable, Mockito.times(3)).registerIcebergTable(any(), any());
  } catch (RuntimeException re) {
    Assert.fail("Got Unexpected Runtime Exception", re);
  }
}

/**
 * With no retryer overrides supplied, a registration that always fails must give up after the
 * default number of attempts and surface that count in the exception message.
 */
@Test
public void testRegisterIcebergTableWithDefaultRetryerConfig() throws IOException {
  TableMetadata priorDestMetadata = mockTableMetadata("foo", "bar");
  TableMetadata srcMetadata = Mockito.mock(TableMetadata.class);
  IcebergTable alwaysFailingTable = mockIcebergTable("foo", "bar");
  // Every call to registerIcebergTable() throws
  Mockito.doThrow(new RuntimeException()).when(alwaysFailingTable).registerIcebergTable(any(), any());
  IcebergRegisterStep step = createIcebergRegisterStepInstance(srcMetadata, priorDestMetadata, alwaysFailingTable, new Properties());

  RuntimeException thrown = null;
  try {
    step.execute();
  } catch (RuntimeException re) {
    thrown = re;
  }
  if (thrown == null) {
    Assert.fail("Expected Runtime Exception");
  }
  // The default number of retries is 5, so registration should fail after 5 attempts
  assertRetryTimes(thrown, 5);
}

/**
 * Verifies that retry settings supplied under {@link IcebergRegisterStep#RETRYER_CONFIG_PREFIX}
 * override the defaults: with the retry count raised to 10, an always-failing registration
 * must report exactly 10 failed attempts.
 */
@Test
public void testRegisterIcebergTableWithOverrideRetryerConfig() throws IOException {
  TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
  TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
  IcebergTable mockTable = mockIcebergTable("foo", "bar");
  // Mocking registerIcebergTable() call to always throw exception
  Mockito.doThrow(new RuntimeException()).when(mockTable).registerIcebergTable(any(), any());
  Properties properties = new Properties();
  String retryCount = "10";
  // Changing the number of retries to 10
  properties.setProperty(IcebergRegisterStep.RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES, retryCount);
  // Also shrink the wait between attempts: at the fallback interval, ten attempts would keep this
  // test sleeping for tens of seconds without adding any coverage.
  properties.setProperty(
      IcebergRegisterStep.RETRYER_CONFIG_PREFIX + "." + org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS,
      "10");
  IcebergRegisterStep regStep = createIcebergRegisterStepInstance(readTimeSrcTableMetadata, justPriorDestTableMetadata, mockTable, properties);
  try {
    regStep.execute();
    Assert.fail("Expected Runtime Exception");
  } catch (RuntimeException re) {
    // register iceberg table should fail after retrying for retryCount times mentioned above
    assertRetryTimes(re, Integer.parseInt(retryCount));
  }
}

protected TableMetadata mockTableMetadata(String uuid, String metadataFileLocation) throws IOException {
TableMetadata mockMetadata = Mockito.mock(TableMetadata.class);
Mockito.when(mockMetadata.uuid()).thenReturn(uuid);
Expand All @@ -100,4 +154,21 @@ protected IcebergCatalog mockSingleTableIcebergCatalog(IcebergTable mockTable) t
Mockito.when(catalog.openTable(any())).thenReturn(mockTable);
return catalog;
}

/**
 * Creates an {@link IcebergRegisterStep} whose destination catalog is replaced by a mock catalog
 * that serves only {@code mockTable}.
 */
private IcebergRegisterStep createIcebergRegisterStepInstance(TableMetadata readTimeSrcTableMetadata,
    TableMetadata justPriorDestTableMetadata,
    IcebergTable mockTable,
    Properties properties) {
  IcebergRegisterStep stepWithMockedCatalog =
      new IcebergRegisterStep(srcTableId, destTableId, readTimeSrcTableMetadata, justPriorDestTableMetadata, properties) {
        @Override
        protected IcebergCatalog createDestinationCatalog() throws IOException {
          // Bypass real catalog creation; every table lookup resolves to the supplied mock
          return mockSingleTableIcebergCatalog(mockTable);
        }
      };
  return stepWithMockedCatalog;
}

/**
 * Asserts that the message of {@code re} begins with the registration-failure text reporting
 * {@code retryTimes} retries for this test's source and destination table ids.
 */
private void assertRetryTimes(RuntimeException re, Integer retryTimes) {
  String actualMessage = re.getMessage();
  String expectedPrefix = String.format(
      "Failed to register iceberg table : (src: {%s}) - (dest: {%s}) : (retried %d times)",
      srcTableId, destTableId, retryTimes);
  boolean hasExpectedPrefix = actualMessage.startsWith(expectedPrefix);
  // On failure, show the actual message (fetched afresh, as the assertion text)
  Assert.assertTrue(hasExpectedPrefix, re.getMessage());
}
}

0 comments on commit b44a91e

Please sign in to comment.