Skip to content

Commit

Permalink
[FLINK-35510][state/forst] Implement basic incremental checkpoint for…
Browse files Browse the repository at this point in the history
… ForStStateBackend (apache#24879)
  • Loading branch information
zoltar9264 authored Sep 19, 2024
1 parent df412ad commit ffd0522
Show file tree
Hide file tree
Showing 22 changed files with 2,387 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
Expand All @@ -35,7 +36,11 @@
* in batch.
*/
@Internal
public interface AsyncKeyedStateBackend extends Disposable, Closeable {
public interface AsyncKeyedStateBackend
extends Snapshotable<SnapshotResult<KeyedStateHandle>>,
InternalCheckpointListener,
Disposable,
Closeable {

/**
* Initializes with some contexts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public static RegisteredStateMetaInfoBase fromMetaInfoSnapshot(
return new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
case PRIORITY_QUEUE:
return new RegisteredPriorityQueueStateBackendMetaInfo<>(snapshot);
case KEY_VALUE_V2:
return new org.apache.flink.runtime.state.v2
.RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
default:
throw new IllegalArgumentException(
"Unknown backend state type: " + backendStateType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ public enum BackendStateType {
KEY_VALUE(0),
OPERATOR(1),
BROADCAST(2),
PRIORITY_QUEUE(3);
PRIORITY_QUEUE(3),

KEY_VALUE_V2(10),
;
private final byte code;

BackendStateType(int code) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;

import java.util.Collections;
import java.util.Map;

/**
 * Compound meta information for a registered state in a keyed state backend. This combines the
 * state name, the state type and the serializers for namespace and state value.
 *
 * <p>Snapshots produced by this class are tagged with {@link
 * StateMetaInfoSnapshot.BackendStateType#KEY_VALUE_V2}.
 *
 * @param <N> Type of namespace
 * @param <S> Type of state value
 */
public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStateMetaInfoBase {

    @Nonnull private final StateDescriptor.Type stateType;
    @Nonnull private final StateSerializerProvider<N> namespaceSerializerProvider;
    @Nonnull private final StateSerializerProvider<S> stateSerializerProvider;

    // Assigned exactly once in the private constructor and never reassigned, hence final.
    @Nonnull private final StateSnapshotTransformFactory<S> stateSnapshotTransformFactory;

    /**
     * Creates meta information for a newly registered state without any snapshot transformation.
     *
     * @param name name of the state
     * @param stateType type of the state
     * @param namespaceSerializer serializer for the namespace
     * @param stateSerializer serializer for the state value
     */
    public RegisteredKeyValueStateBackendMetaInfo(
            @Nonnull String name,
            @Nonnull StateDescriptor.Type stateType,
            @Nonnull TypeSerializer<N> namespaceSerializer,
            @Nonnull TypeSerializer<S> stateSerializer) {

        this(
                name,
                stateType,
                StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
                StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
                StateSnapshotTransformFactory.noTransform());
    }

    /**
     * Creates meta information for a newly registered state with the given snapshot transform
     * factory.
     *
     * @param name name of the state
     * @param stateType type of the state
     * @param namespaceSerializer serializer for the namespace
     * @param stateSerializer serializer for the state value
     * @param stateSnapshotTransformFactory factory for the state snapshot transformer
     */
    public RegisteredKeyValueStateBackendMetaInfo(
            @Nonnull String name,
            @Nonnull StateDescriptor.Type stateType,
            @Nonnull TypeSerializer<N> namespaceSerializer,
            @Nonnull TypeSerializer<S> stateSerializer,
            @Nonnull StateSnapshotTransformFactory<S> stateSnapshotTransformFactory) {

        this(
                name,
                stateType,
                StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer),
                StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer),
                stateSnapshotTransformFactory);
    }

    /**
     * Rebuilds meta information from a previously written snapshot.
     *
     * <p>The state type is read from the {@code KEYED_STATE_TYPE} option, both serializer
     * providers are created from the serializer snapshots stored under the {@code
     * NAMESPACE_SERIALIZER} and {@code VALUE_SERIALIZER} keys (each must be present), and the
     * snapshot transform factory is always {@link StateSnapshotTransformFactory#noTransform()}.
     *
     * @param snapshot the snapshot to restore from; its backend state type must be {@code
     *     KEY_VALUE_V2}
     */
    @SuppressWarnings("unchecked")
    public RegisteredKeyValueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
        this(
                snapshot.getName(),
                StateDescriptor.Type.valueOf(
                        snapshot.getOption(
                                StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
                StateSerializerProvider.fromPreviousSerializerSnapshot(
                        (TypeSerializerSnapshot<N>)
                                Preconditions.checkNotNull(
                                        snapshot.getTypeSerializerSnapshot(
                                                StateMetaInfoSnapshot.CommonSerializerKeys
                                                        .NAMESPACE_SERIALIZER))),
                StateSerializerProvider.fromPreviousSerializerSnapshot(
                        (TypeSerializerSnapshot<S>)
                                Preconditions.checkNotNull(
                                        snapshot.getTypeSerializerSnapshot(
                                                StateMetaInfoSnapshot.CommonSerializerKeys
                                                        .VALUE_SERIALIZER))),
                StateSnapshotTransformFactory.noTransform());

        // The delegating constructor call must come first, so the type check runs afterwards.
        Preconditions.checkState(
                StateMetaInfoSnapshot.BackendStateType.KEY_VALUE_V2
                        == snapshot.getBackendStateType());
    }

    /** Canonical constructor; all public constructors delegate to it. */
    private RegisteredKeyValueStateBackendMetaInfo(
            @Nonnull String name,
            @Nonnull StateDescriptor.Type stateType,
            @Nonnull StateSerializerProvider<N> namespaceSerializerProvider,
            @Nonnull StateSerializerProvider<S> stateSerializerProvider,
            @Nonnull StateSnapshotTransformFactory<S> stateSnapshotTransformFactory) {

        super(name);
        this.stateType = stateType;
        this.namespaceSerializerProvider = namespaceSerializerProvider;
        this.stateSerializerProvider = stateSerializerProvider;
        this.stateSnapshotTransformFactory = stateSnapshotTransformFactory;
    }

    /** Returns the type of the registered state. */
    @Nonnull
    public StateDescriptor.Type getStateType() {
        return stateType;
    }

    /** Returns the current schema serializer of the namespace serializer provider. */
    @Nonnull
    public TypeSerializer<N> getNamespaceSerializer() {
        return namespaceSerializerProvider.currentSchemaSerializer();
    }

    /** Returns the current schema serializer of the state serializer provider. */
    @Nonnull
    public TypeSerializer<S> getStateSerializer() {
        return stateSerializerProvider.currentSchemaSerializer();
    }

    /**
     * Two instances are equal when they are of the same class and agree on state type, name,
     * state serializer and namespace serializer. The snapshot transform factory is not compared.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }

        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        RegisteredKeyValueStateBackendMetaInfo<?, ?> that =
                (RegisteredKeyValueStateBackendMetaInfo<?, ?>) o;

        if (!stateType.equals(that.stateType)) {
            return false;
        }

        if (!getName().equals(that.getName())) {
            return false;
        }

        return getStateSerializer().equals(that.getStateSerializer())
                && getNamespaceSerializer().equals(that.getNamespaceSerializer());
    }

    @Override
    public String toString() {
        // The label matches the actual class name so log output identifies this class correctly.
        return "RegisteredKeyValueStateBackendMetaInfo{"
                + "stateType="
                + stateType
                + ", name='"
                + name
                + '\''
                + ", namespaceSerializer="
                + getNamespaceSerializer()
                + ", stateSerializer="
                + getStateSerializer()
                + '}';
    }

    /** Hashes the same four components that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        int result = getName().hashCode();
        result = 31 * result + getStateType().hashCode();
        result = 31 * result + getNamespaceSerializer().hashCode();
        result = 31 * result + getStateSerializer().hashCode();
        return result;
    }

    /** Creates a {@code KEY_VALUE_V2} snapshot of this meta information. */
    @Nonnull
    @Override
    public StateMetaInfoSnapshot snapshot() {
        return computeSnapshot();
    }

    /**
     * Returns a new instance rebuilt from {@link #snapshot()}. Because it goes through the
     * snapshot-restoring constructor, the returned instance uses {@link
     * StateSnapshotTransformFactory#noTransform()} regardless of this instance's factory.
     */
    @Nonnull
    @Override
    public RegisteredKeyValueStateBackendMetaInfo<N, S> withSerializerUpgradesAllowed() {
        return new RegisteredKeyValueStateBackendMetaInfo<>(snapshot());
    }

    /**
     * Builds the snapshot: the state type goes into the options map, and for both the namespace
     * and the value serializer a duplicate of the serializer plus its configuration snapshot are
     * stored under the corresponding common serializer key.
     */
    @Nonnull
    private StateMetaInfoSnapshot computeSnapshot() {
        Map<String, String> optionsMap =
                Collections.singletonMap(
                        StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
                        stateType.toString());

        Map<String, TypeSerializer<?>> serializerMap = CollectionUtil.newHashMapWithExpectedSize(2);
        Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap =
                CollectionUtil.newHashMapWithExpectedSize(2);

        String namespaceSerializerKey =
                StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
        String valueSerializerKey =
                StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();

        TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
        serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate());
        serializerConfigSnapshotsMap.put(
                namespaceSerializerKey, namespaceSerializer.snapshotConfiguration());

        TypeSerializer<S> stateSerializer = getStateSerializer();
        serializerMap.put(valueSerializerKey, stateSerializer.duplicate());
        serializerConfigSnapshotsMap.put(
                valueSerializerKey, stateSerializer.snapshotConfiguration());

        return new StateMetaInfoSnapshot(
                name,
                StateMetaInfoSnapshot.BackendStateType.KEY_VALUE_V2,
                optionsMap,
                serializerConfigSnapshotsMap,
                serializerMap);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,37 @@

package org.apache.flink.runtime.state.v2.adaptor;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.StateDescriptorUtils;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.concurrent.RunnableFuture;

/**
* An adaptor that transforms {@link KeyedStateBackend} into {@link AsyncKeyedStateBackend}.
*
* @param <K> The key by which state is keyed.
*/
public class AsyncKeyedStateBackendAdaptor<K> implements AsyncKeyedStateBackend {
private final KeyedStateBackend<K> keyedStateBackend;
private final CheckpointableKeyedStateBackend<K> keyedStateBackend;

public AsyncKeyedStateBackendAdaptor(KeyedStateBackend<K> keyedStateBackend) {
public AsyncKeyedStateBackendAdaptor(CheckpointableKeyedStateBackend<K> keyedStateBackend) {
this.keyedStateBackend = keyedStateBackend;
}

Expand Down Expand Up @@ -78,4 +86,37 @@ public void dispose() {}

@Override
public void close() throws IOException {}

/**
 * Forwards the checkpoint-complete notification to the wrapped backend, provided that backend
 * is a {@link CheckpointListener}; otherwise does nothing.
 *
 * @param checkpointId id of the completed checkpoint
 */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    if (!(keyedStateBackend instanceof CheckpointListener)) {
        return;
    }
    final CheckpointListener listener = (CheckpointListener) keyedStateBackend;
    listener.notifyCheckpointComplete(checkpointId);
}

/**
 * Forwards the checkpoint-aborted notification to the wrapped backend, provided that backend
 * is a {@link CheckpointListener}; otherwise does nothing.
 *
 * @param checkpointId id of the aborted checkpoint
 */
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
    if (!(keyedStateBackend instanceof CheckpointListener)) {
        return;
    }
    final CheckpointListener listener = (CheckpointListener) keyedStateBackend;
    listener.notifyCheckpointAborted(checkpointId);
}

/**
 * Forwards the checkpoint-subsumed notification to the wrapped backend, provided that backend
 * is an {@link InternalCheckpointListener}; otherwise does nothing.
 *
 * @param checkpointId id of the subsumed checkpoint
 */
@Override
public void notifyCheckpointSubsumed(long checkpointId) throws Exception {
    if (!(keyedStateBackend instanceof InternalCheckpointListener)) {
        return;
    }
    final InternalCheckpointListener listener = (InternalCheckpointListener) keyedStateBackend;
    listener.notifyCheckpointSubsumed(checkpointId);
}

/**
 * Delegates the snapshot request unchanged to the wrapped keyed state backend.
 *
 * @param checkpointId id of the checkpoint
 * @param timestamp timestamp of the checkpoint
 * @param streamFactory factory passed through to the wrapped backend
 * @param checkpointOptions options passed through to the wrapped backend
 * @return the future returned by the wrapped backend's {@code snapshot} call
 */
@Nonnull
@Override
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions)
        throws Exception {
    final RunnableFuture<SnapshotResult<KeyedStateHandle>> delegatedSnapshot =
            keyedStateBackend.snapshot(
                    checkpointId, timestamp, streamFactory, checkpointOptions);
    return delegatedSnapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,15 @@ void snapshotState(
keyedStateBackend.snapshot(
checkpointId, timestamp, factory, checkpointOptions));
}

} else if (null != asyncKeyedStateBackend) {
if (isCanonicalSavepoint(checkpointOptions.getCheckpointType())) {
throw new UnsupportedOperationException("Not supported yet.");
} else {
snapshotInProgress.setKeyedStateManagedFuture(
asyncKeyedStateBackend.snapshot(
checkpointId, timestamp, factory, checkpointOptions));
}
}
} catch (Exception snapshotException) {
try {
Expand Down Expand Up @@ -331,12 +340,20 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (keyedStateBackend instanceof CheckpointListener) {
((CheckpointListener) keyedStateBackend).notifyCheckpointComplete(checkpointId);
}

if (asyncKeyedStateBackend != null) {
asyncKeyedStateBackend.notifyCheckpointComplete(checkpointId);
}
}

/**
 * Notifies the state backends that the given checkpoint was aborted: first the keyed state
 * backend (only if it is a {@link CheckpointListener}), then the async keyed state backend (only
 * if it is set).
 *
 * <p>NOTE(review): if the async backend is an adaptor around the very same keyed state backend,
 * that backend presumably receives this notification twice — confirm this is intended.
 *
 * @param checkpointId id of the aborted checkpoint
 */
public void notifyCheckpointAborted(long checkpointId) throws Exception {
    if (keyedStateBackend instanceof CheckpointListener) {
        final CheckpointListener syncListener = (CheckpointListener) keyedStateBackend;
        syncListener.notifyCheckpointAborted(checkpointId);
    }

    if (null != asyncKeyedStateBackend) {
        asyncKeyedStateBackend.notifyCheckpointAborted(checkpointId);
    }
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,10 @@ public StreamOperatorStateContext streamOperatorStateContext(
managedMemoryFraction,
statsCollector,
StateBackend::createKeyedStateBackend);
asyncKeyedStateBackend = new AsyncKeyedStateBackendAdaptor<>(keyedStatedBackend);
if (keyedStatedBackend != null) {
asyncKeyedStateBackend =
new AsyncKeyedStateBackendAdaptor<>(keyedStatedBackend);
}
}

// -------------- Operator State Backend --------------
Expand Down
Loading

0 comments on commit ffd0522

Please sign in to comment.