Flink: Maintenance - TableManager + ExpireSnapshots #11144
base: main
Conversation
@stevenzwu: This PR has become quite sizeable. I still think that it is better to keep it as one to provide context for some of the decisions made during the definition. If you have time we could discuss offline the review strategy, and whether to split this PR into smaller ones. Thanks,
Force-pushed from a1dabe5 to 96322c5
import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder> {
should this be marked as @Internal or even package private?
Do we want to allow the users to create their own maintenance tasks?
Let's keep this package private first so that it is easier to evolve the class, especially during the early stages.
When there is a real need in the future, we can always make it public.
Let's mark it experimental then. I definitely would like to provide a way for the users to extend the maintenance tasks.
I am not saying we shouldn't, but it is usually good to keep them private first so that we are free to evolve the class. Maybe wait until the need is clear.
Take Spark as an example: BaseSparkAction is package private.
This API is a single abstract method: DataStream<TaskResult> append(DataStream<Trigger> sourceStream);
I don't expect big changes here.
Not just append. There are a lot of public methods in this class.
If we have any doubt whether users would implement extensions of this class, we can delay the decision until a real ask comes forward. It is trivial to make a private class public, but once a class is public, it is more difficult to change/evolve the contract.
The public methods are public API anyways...
These are the ones which will be called by the users when they are scheduling the MaintenanceTasks...
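The thread above centers on how much of MaintenanceTaskBuilder to expose. A minimal sketch of the shape being discussed, using hypothetical stand-in types (Trigger, TaskResult, and DataStream here are placeholders defined locally, not the real Flink/Iceberg classes):

```java
// Stand-ins for the real Flink/Iceberg types referenced in the thread.
class Trigger {}
class TaskResult {}
class DataStream<T> {}

// Sketch of the builder shape under discussion: fluent public setters
// plus a single abstract append() hook that wires the trigger stream
// into a result stream.
abstract class MaintenanceTaskBuilderSketch<T extends MaintenanceTaskBuilderSketch<T>> {
  private String uidSuffix;

  @SuppressWarnings("unchecked")
  public T uidSuffix(String suffix) { // called by users when scheduling tasks
    this.uidSuffix = suffix;
    return (T) this;
  }

  public String uidSuffix() {
    return uidSuffix;
  }

  // The one abstract method subclasses implement.
  abstract DataStream<TaskResult> append(DataStream<Trigger> sourceStream);
}

class ExpireSnapshotsSketch extends MaintenanceTaskBuilderSketch<ExpireSnapshotsSketch> {
  @Override
  DataStream<TaskResult> append(DataStream<Trigger> sourceStream) {
    // The real implementation would build the operator chain here.
    return new DataStream<>();
  }
}
```

With this shape, the abstract hook is small, but the fluent setters form the user-facing surface either way, which is the crux of the public-vs-package-private debate above.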
* @param newPlanningWorkerPoolSize for planning files to delete
* @return for chained calls
*/
public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) {
Should we design it like RemoveSnapshots? One benefit is to use the default ThreadPools.getWorkerPool()/getDeleteWorkerPool() to reuse thread pools in the JVM.
public ExpireSnapshots executeDeleteWith(ExecutorService executorService)
public ExpireSnapshots planWith(ExecutorService executorService)
Are you suggesting to do everything in the operator instead of separating out the delete to another operator?
The Spark implementation even splits the expired-file calculations across multiple operators for performance reasons. In the long run we might go down that road... WDYT?
Are you suggesting to do everything in the operator instead of separating out the delete to another operator?
Nope, that is not what I meant.
I was wondering if ExpireSnapshotsProcessor and AsyncDeleteFiles should use the default shared thread pools from ThreadPools.getWorkerPool()/getDeleteWorkerPool(), instead of creating new pools.
I don't like the idea of shared pools. If some users create multiple maintenance flows in a single job, they will end up using the same pool and will block each other.
Also, if you have multiple slots on a single TaskManager, then these pools are shared between the subtasks. Which is again not something we want
Sharing a thread pool is not necessarily a bad thing: it can limit the concurrent I/O. E.g., we may not want to have too many threads performing scan planning, which can be memory intensive.
Deletes have a low memory footprint, hence it is probably less of a concern to have separate pools. But it is probably good to keep an eye on the number of HTTP connections.
We create our own pool for the source planner too, and it is not configurable there.
Do you think this risk is high enough to merit a new configuration value for this?
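To make the trade-off in this thread concrete, here is a small sketch using plain java.util.concurrent (not the Iceberg ThreadPools API; class and method names are illustrative) contrasting one JVM-wide shared pool with a dedicated pool per maintenance flow:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the trade-off discussed above. A JVM-wide shared pool caps
// total concurrent I/O but lets flows block each other when saturated;
// per-flow pools isolate flows at the cost of more threads/connections.
public class PoolSharingSketch {
  // One bounded pool shared by every flow in the JVM (the
  // ThreadPools.getWorkerPool() style suggested in the review).
  static final ExecutorService SHARED = Executors.newFixedThreadPool(4);

  // A dedicated pool per maintenance flow (the PR's current approach):
  // no cross-flow blocking, but pool count grows with the number of flows.
  static ExecutorService dedicatedPoolFor(String flowName) {
    return Executors.newFixedThreadPool(
        2, r -> new Thread(r, "maintenance-" + flowName));
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService flowA = dedicatedPoolFor("expire-snapshots");
    flowA.submit(() -> System.out.println(Thread.currentThread().getName()));
    flowA.shutdown();
    flowA.awaitTermination(5, TimeUnit.SECONDS);
    SHARED.shutdown();
  }
}
```

The named thread factory also makes it easy to spot, in a thread dump, which flow owns which worker threads.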
int maintenanceTaskIndex,
String maintainanceTaskName,
TableLoader newTableLoader,
String mainUidSuffix,
what does main mean here?
There is a possibility to inherit the suffix and the slotSharingGroup from the TableMaintenance.Builder. It could be overwritten on a task-by-task basis.
The main here is the value inherited from the TableMaintenance.Builder.
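The inheritance described here boils down to a simple resolution rule: the per-task setting, if present, overrides the "main" value inherited from TableMaintenance.Builder. A hypothetical sketch (not the PR's actual code):

```java
// Hypothetical sketch of the override rule described above: a per-task
// setting (e.g. uidSuffix or slotSharingGroup) wins over the value
// inherited from TableMaintenance.Builder, which acts as the default.
class MaintenanceDefaultsSketch {
  static String resolve(String taskValue, String mainValue) {
    // Task-level value takes precedence; otherwise inherit the main one.
    return taskValue != null ? taskValue : mainValue;
  }
}
```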
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.stream;
I don't know if users would interpret the stream sub-package as public APIs. It is better to use proper Java class scope for that purpose: public classes are public, and non-public classes can be package private.
ctx.output(DELETE_STREAM, file);
deleteFileCounter.incrementAndGet();
})
.cleanExpiredFiles(true)
Maybe we should add Javadoc to the ExpireSnapshots class that expired files are always deleted.
Added this:
/** Deletes expired snapshots and the corresponding files. */
} catch (Exception e) {
  LOG.info("Exception expiring snapshots for {} at {}", table, ctx.timestamp(), e);
  out.collect(
      new TaskResult(trigger.taskId(), trigger.timestamp(), false, Lists.newArrayList(e)));
TaskResult has List<Exception> exceptions. Wondering in what scenario would we have a list of exceptions to propagate?
Other maintenance tasks might have multiple errors from multiple operators/subtasks
Hmm, I am still not quite following. Each operator subtask emits a TaskResult. Each TaskResult should only contain one exception, right?
I didn't see the exceptions used by downstream. If the success boolean flag is good enough for downstream, maybe we can remove the exceptions from TaskResult, as a stack trace can be non-trivial.
BTW, TaskResult is not marked as Serializable.
When we do compaction, then we have multiple subtasks running parallel. If more than one of them fails, then we will have multiple exception messages to report, but we don't want to fail the job (especially in PostCommitTopology).
Also, I think it is very rare that we have an exception, and it is good to have a single place where we can collect/handle those. So while serializing a stack trace is non-trivial, I think it is worth the cost in the long run.
Fixed the TaskResult serialization.
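A sketch of what such a serialization fix might look like: a Serializable result type carrying a list of exceptions (field names are assumptions inferred from the constructor call quoted in the diff above), plus a round-trip helper that proves Java serialization actually succeeds:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

// Sketch of a Serializable TaskResult (field names inferred from the
// constructor call in the diff: taskId, timestamp, success, exceptions).
class TaskResultSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  final int taskId;
  final long timestamp;
  final boolean success;
  // A list, because parallel subtasks (e.g. during compaction) can each
  // fail with their own exception that is reported without failing the job.
  final List<Exception> exceptions;

  TaskResultSketch(int taskId, long timestamp, boolean success, List<Exception> exceptions) {
    this.taskId = taskId;
    this.timestamp = timestamp;
    this.success = success;
    this.exceptions = exceptions;
  }

  // Java-serializes the result; throws NotSerializableException if any
  // field (including the exception list implementation) is not serializable.
  static byte[] roundTrip(TaskResultSketch result) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(result);
    }
    return bos.toByteArray();
  }
}
```

Throwable itself implements Serializable, so a list of exceptions survives the round trip as long as a serializable List implementation (e.g. ArrayList) is used.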
private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
private Duration rateLimit = Duration.ofMillis(1);
is 1 ms a good default?
Good catch - it remained from a different config.
Set it to 1 min. WDYT?
private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
private Duration rateLimit = Duration.ofMillis(1);
private Duration lockCheckDelay = Duration.ofSeconds(30);
is 30s a good default? Is that based on the estimated average of task run time?
I would not set it based on the estimated average task run time: if the actual run time is longer by only a bit, then we will wait for twice the required time.
30s seems reasonable for the JDBC lock manager.
}

@Test
void testMetrics() throws Exception {
similarly, can metrics assertion be added to one of earlier methods?
I prefer to separate out testing the different features
.add(
    new MaintenanceTaskBuilderForTest(true)
        .scheduleOnCommitCount(1)
        .uidSuffix(anotherUid)
do we really need the flexibility of overwriting uidSuffix?
If we want to keep the MonitorSource state but drop the state of one or more of the maintenance tasks, then we need a different uid.
Force-pushed from fa56618 to 2403f44
TableManager builder implementation along with the first maintenance task to provide context.
https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.yd2vbtnf7z6w