Flink: Maintenance - TableManager + ExpireSnapshots #11144

Merged · 12 commits · Nov 4, 2024
Changes from 1 commit
AsyncDeleteFiles.java
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.operator;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Deletes files asynchronously using the table's {@link FileIO}. */
@Internal
public class AsyncDeleteFiles extends RichAsyncFunction<String, Boolean> {
  private static final Logger LOG = LoggerFactory.getLogger(AsyncDeleteFiles.class);
  public static final Predicate<Collection<Boolean>> FAILED_PREDICATE = new FailedPredicate();

  private final String name;
  private final FileIO io;
  private final int workerPoolSize;
  private final String tableName;

  private transient ExecutorService workerPool;
  private transient Counter failedCounter;
  private transient Counter succeededCounter;

  public AsyncDeleteFiles(String name, TableLoader tableLoader, int workerPoolSize) {
    Preconditions.checkNotNull(name, "Name should not be null");
    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");

    this.name = name;
    tableLoader.open();
    Table table = tableLoader.loadTable();
    this.io = table.io();
    this.workerPoolSize = workerPoolSize;
    this.tableName = table.name();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    this.failedCounter =
        getRuntimeContext()
            .getMetricGroup()
            .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
            .counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER);
    this.succeededCounter =
        getRuntimeContext()
            .getMetricGroup()
            .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
            .counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER);

    this.workerPool =
        ThreadPools.newWorkerPool(tableName + "-" + name + "-async-delete-files", workerPoolSize);
  }

  @Override
  public void asyncInvoke(String fileName, ResultFuture<Boolean> resultFuture) {
    workerPool.execute(
        () -> {
          try {
            LOG.info("Deleting file: {} with {}", fileName, name);
            io.deleteFile(fileName);
            // Complete with a result rather than exceptionally, so a single failed
            // delete does not fail the whole job; failures surface via the counter
            // and the FAILED_PREDICATE check on the result.
            resultFuture.complete(Collections.singletonList(true));
            succeededCounter.inc();
          } catch (Throwable e) {
            LOG.warn("Failed to delete file {} with {}", fileName, name, e);
            resultFuture.complete(Collections.singletonList(false));
            failedCounter.inc();
          }
        });
  }

  // A result is considered failed unless it is exactly one successful Boolean.
  private static class FailedPredicate implements Predicate<Collection<Boolean>>, Serializable {
    @Override
    public boolean test(Collection<Boolean> collection) {
      return collection.size() != 1 || !collection.iterator().next();
    }
  }
}
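For orientation, a minimal sketch of how this function could be attached to a stream of file names through Flink's AsyncDataStream; the task name, timeout, and capacity values are illustrative assumptions, not values from this PR:

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;

class AsyncDeleteExample {
  // Deletes each incoming file name asynchronously; results arrive as Booleans.
  static DataStream<Boolean> deleteAsync(DataStream<String> filesToDelete, TableLoader loader) {
    return AsyncDataStream.unorderedWait(
        filesToDelete,
        new AsyncDeleteFiles("expire-snapshots", loader, 10), // name, loader, worker pool size
        60,
        TimeUnit.SECONDS, // per-element timeout
        10); // max in-flight deletes
  }
}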
ExpireSnapshotsProcessor.java
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.operator;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Calls {@link ExpireSnapshots} to remove old snapshots and emits the names of the files that
 * could be removed to the {@link #DELETE_STREAM} side output.
 */
public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> {
  private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class);
  public static final OutputTag<String> DELETE_STREAM =
      new OutputTag<>("delete-stream", Types.STRING);

  private final TableLoader tableLoader;
  private final Long minAgeMs;
  private final Integer retainLast;
  private final int plannerPoolSize;
  private transient ExecutorService plannerPool;
  private transient Table table;

  public ExpireSnapshotsProcessor(
      TableLoader tableLoader, Long minAgeMs, Integer retainLast, int plannerPoolSize) {
    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");

    this.tableLoader = tableLoader;
    this.minAgeMs = minAgeMs;
    this.retainLast = retainLast;
    this.plannerPoolSize = plannerPoolSize;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    tableLoader.open();
    this.table = tableLoader.loadTable();
    this.plannerPool = ThreadPools.newWorkerPool(table.name() + "-table-planner", plannerPoolSize);
  }

  @Override
  public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> out)
      throws Exception {
    try {
      table.refresh();
      ExpireSnapshots expireSnapshots = table.expireSnapshots();
      if (minAgeMs != null) {
        expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - minAgeMs);
      }

      if (retainLast != null) {
        expireSnapshots = expireSnapshots.retainLast(retainLast);
      }

      // Plan on the dedicated pool and redirect every file to be deleted to the
      // side output instead of deleting it inline.
      expireSnapshots
          .planWith(plannerPool)
          .deleteWith(file -> ctx.output(DELETE_STREAM, file))
          .cleanExpiredFiles(true)
          .commit();

      LOG.info("Successfully finished expiring snapshots for {} at {}", table, ctx.timestamp());
      out.collect(
          new TaskResult(trigger.taskId(), trigger.timestamp(), true, Collections.emptyList()));
    } catch (Exception e) {
      LOG.warn("Exception expiring snapshots for {} at {}", table, ctx.timestamp(), e);
      out.collect(
          new TaskResult(trigger.taskId(), trigger.timestamp(), false, Lists.newArrayList(e)));
    }
  }
}
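A minimal sketch of wiring this processor to a trigger stream and consuming the side output; the expiration parameters and pool size are illustrative, and this is not job-builder code from the PR:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
import org.apache.iceberg.flink.maintenance.operator.TaskResult;
import org.apache.iceberg.flink.maintenance.operator.Trigger;

class ExpireSnapshotsExample {
  // Returns the names of the files that expireSnapshots() decided to delete.
  static DataStream<String> expiredFiles(DataStream<Trigger> triggers, TableLoader loader) {
    SingleOutputStreamOperator<TaskResult> results =
        triggers.process(
            new ExpireSnapshotsProcessor(loader, 3_600_000L /* 1h min age */, 5, 10));
    return results.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM);
  }
}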
MonitorSource.java
@@ -43,7 +43,7 @@

/** Monitors an Iceberg table for changes */
@Internal
-class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
+public class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
private static final Logger LOG = LoggerFactory.getLogger(MonitorSource.class);

private final TableLoader tableLoader;
@@ -58,7 +58,7 @@ class MonitorSource extends SingleThreadedIteratorSource<TableChange> {
* @param rateLimiterStrategy limits the frequency the table is checked
* @param maxReadBack sets the number of snapshots read before stopping change collection
*/
-MonitorSource(
+public MonitorSource(
TableLoader tableLoader, RateLimiterStrategy rateLimiterStrategy, long maxReadBack) {
Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
Preconditions.checkNotNull(rateLimiterStrategy, "Rate limiter strategy should not be null");
TableChange.java
@@ -28,7 +28,7 @@

/** Event describing changes in an Iceberg table */
@Internal
-class TableChange {
+public class TableChange {
private int dataFileCount;
private long dataFileSizeInBytes;
private int posDeleteFileCount;
@@ -87,7 +87,7 @@ static TableChange empty() {
return new TableChange(0, 0L, 0, 0L, 0, 0L, 0);
}

-static Builder builder() {
+public static Builder builder() {
return new Builder();
}

@@ -115,7 +115,7 @@ long eqDeleteRecordCount() {
return eqDeleteRecordCount;
}

-public int commitCount() {
+int commitCount() {
return commitCount;
}

@@ -183,7 +183,7 @@ public int hashCode() {
commitCount);
}

-static class Builder {
+public static class Builder {
private int dataFileCount = 0;
private long dataFileSizeInBytes = 0L;
private int posDeleteFileCount = 0;
TableMaintenanceMetrics.java
@@ -33,6 +33,10 @@ public class TableMaintenanceMetrics {
public static final String FAILED_TASK_COUNTER = "failedTasks";
public static final String LAST_RUN_DURATION_MS = "lastRunDurationMs";

+// DeleteFiles metrics
+public static final String DELETE_FILE_FAILED_COUNTER = "deleteFailed";
+public static final String DELETE_FILE_SUCCEEDED_COUNTER = "deleteSucceeded";

private TableMaintenanceMetrics() {
// do not instantiate
}
Trigger.java
@@ -23,7 +23,7 @@
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

@Internal
-class Trigger {
+public class Trigger {
private final long timestamp;
private final SerializableTable table;
private final Integer taskId;
@@ -36,23 +36,23 @@ private Trigger(long timestamp, SerializableTable table, Integer taskId, boolean
this.isRecovery = isRecovery;
}

-static Trigger create(long timestamp, SerializableTable table, int taskId) {
+public static Trigger create(long timestamp, SerializableTable table, int taskId) {
return new Trigger(timestamp, table, taskId, false);
}

static Trigger recovery(long timestamp) {
return new Trigger(timestamp, null, null, true);
}

-long timestamp() {
+public long timestamp() {
return timestamp;
}

SerializableTable table() {
return table;
}

-Integer taskId() {
+public Integer taskId() {
return taskId;
}

TriggerEvaluator.java
@@ -28,7 +28,7 @@
import org.slf4j.LoggerFactory;

@Internal
-class TriggerEvaluator implements Serializable {
+public class TriggerEvaluator implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(TriggerEvaluator.class);
private final List<Predicate> predicates;

@@ -50,7 +50,7 @@ boolean check(TableChange event, long lastTimeMs, long currentTimeMs) {
return result;
}

-static class Builder implements Serializable {
+public static class Builder implements Serializable {
private Integer dataFileCount;
private Long dataFileSizeInBytes;
private Integer posDeleteFileCount;
@@ -95,12 +95,12 @@ public Builder commitCount(int newCommitCount) {
return this;
}

-Builder timeout(Duration newTimeout) {
+public Builder timeout(Duration newTimeout) {
this.timeout = newTimeout;
return this;
}

-TriggerEvaluator build() {
+public TriggerEvaluator build() {
List<Predicate> predicates = Lists.newArrayList();
if (dataFileCount != null) {
predicates.add((change, unused, unused2) -> change.dataFileCount() >= dataFileCount);
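As a quick usage sketch of the now-public builder, only the methods visible in this diff are used and the values are illustrative:

import java.time.Duration;
import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;

class TriggerEvaluatorExample {
  // Fire a maintenance task after 10 commits, or after an hour regardless of activity.
  static TriggerEvaluator tenCommitsOrHourly() {
    return new TriggerEvaluator.Builder()
        .commitCount(10)
        .timeout(Duration.ofHours(1))
        .build();
  }
}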
TriggerLockFactory.java
@@ -51,12 +51,11 @@ interface Lock {
*/
boolean isHeld();

-// TODO: Fix the link to the LockRemover when we have a final name and implementation
/**
* Releases the lock. Should not fail if the lock is not held by anyone.
*
-* <p>Called by LockRemover. Implementations could assume that are no concurrent calls for this
-* method.
+* <p>Called by {@link LockRemover}. Implementations can assume that there are no concurrent
+* calls to this method.
*/
void unlock();
}
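To make the unlock() contract concrete, here is a single-JVM sketch of a Lock implementation. It is illustrative only: it assumes the enclosing interface is TriggerLockFactory.Lock and that it also declares tryLock(), and a real implementation would coordinate through external storage so the lock survives across processes.

import java.util.concurrent.atomic.AtomicBoolean;

class InMemoryLock implements TriggerLockFactory.Lock {
  private final AtomicBoolean held = new AtomicBoolean(false);

  @Override
  public boolean tryLock() {
    return held.compareAndSet(false, true);
  }

  @Override
  public boolean isHeld() {
    return held.get();
  }

  @Override
  public void unlock() {
    // Per the contract above: releasing an unheld lock must not fail.
    held.set(false);
  }
}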
TriggerManager.java
@@ -57,7 +57,7 @@
* the timer functions are available, but the key is not used.
*/
@Internal
-class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger>
+public class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger>
implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class);

@@ -89,7 +89,7 @@ class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger>
private transient int startsFrom = 0;
private transient boolean triggered = false;

-TriggerManager(
+public TriggerManager(
TableLoader tableLoader,
TriggerLockFactory lockFactory,
List<String> maintenanceTaskNames,