Skip to content

Commit

Permalink
[FLINK-34239][core,table] Add copy() in SerializerConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
kumar-mallikarjuna authored and reswqa committed Jun 7, 2024
1 parent 20816c9 commit 57869c1
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,6 @@ <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(
* @param classLoader a class loader to use when loading classes
*/
void configure(ReadableConfig configuration, ClassLoader classLoader);

SerializerConfig copy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -574,4 +574,29 @@ private void registerTypeWithTypeInfoFactory(
public ExecutionConfig getExecutionConfig() {
return executionConfig;
}

@Override
public SerializerConfigImpl copy() {
final SerializerConfigImpl newSerializerConfig = new SerializerConfigImpl();
newSerializerConfig.configure(configuration, this.getClass().getClassLoader());

getRegisteredTypesWithKryoSerializers()
.forEach(
(c, s) ->
newSerializerConfig.registerTypeWithKryoSerializer(
c, s.getSerializer()));
getRegisteredTypesWithKryoSerializerClasses()
.forEach(newSerializerConfig::registerTypeWithKryoSerializer);
getDefaultKryoSerializers()
.forEach(
(c, s) ->
newSerializerConfig.addDefaultKryoSerializer(c, s.getSerializer()));
getDefaultKryoSerializerClasses().forEach(newSerializerConfig::addDefaultKryoSerializer);
getRegisteredKryoTypes().forEach(newSerializerConfig::registerKryoType);
getRegisteredPojoTypes().forEach(newSerializerConfig::registerPojoType);
getRegisteredTypeInfoFactories()
.forEach(newSerializerConfig::registerTypeWithTypeInfoFactory);

return newSerializerConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
Expand All @@ -34,15 +34,38 @@
import java.lang.reflect.Type;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

import static org.apache.flink.configuration.PipelineOptions.KRYO_DEFAULT_SERIALIZERS;
import static org.apache.flink.configuration.PipelineOptions.KRYO_REGISTERED_CLASSES;
import static org.apache.flink.configuration.PipelineOptions.POJO_REGISTERED_CLASSES;
import static org.apache.flink.configuration.PipelineOptions.SERIALIZATION_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class SerializerConfigImplTest {
private static final Map<ConfigOption<List<String>>, String> configs = new HashMap<>();

static {
configs.put(
KRYO_DEFAULT_SERIALIZERS,
"class:org.apache.flink.api.common.serialization.SerializerConfigImplTest,"
+ "serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1;"
+ "class:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1,"
+ "serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer2");
configs.put(
KRYO_REGISTERED_CLASSES,
"org.apache.flink.api.common.serialization.SerializerConfigImplTest;"
+ "org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1");
configs.put(
POJO_REGISTERED_CLASSES,
"org.apache.flink.api.common.serialization.SerializerConfigImplTest;"
+ "org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1");
}

@Test
void testReadingDefaultConfig() {
Expand Down Expand Up @@ -84,9 +107,7 @@ void testLoadingRegisteredKryoTypesFromConfiguration() {

Configuration configuration = new Configuration();
configuration.setString(
"pipeline.registered-kryo-types",
"org.apache.flink.api.common.serialization.SerializerConfigImplTest;"
+ "org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1");
KRYO_REGISTERED_CLASSES.key(), configs.get(KRYO_REGISTERED_CLASSES));

// mutate config according to configuration
configFromConfiguration.configure(
Expand All @@ -106,9 +127,7 @@ void testLoadingRegisteredPojoTypesFromConfiguration() {

Configuration configuration = new Configuration();
configuration.setString(
"pipeline.registered-pojo-types",
"org.apache.flink.api.common.serialization.SerializerConfigImplTest;"
+ "org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1");
POJO_REGISTERED_CLASSES.key(), configs.get(POJO_REGISTERED_CLASSES));

// mutate config according to configuration
configFromConfiguration.configure(
Expand All @@ -131,11 +150,7 @@ void testLoadingDefaultKryoSerializersFromConfiguration() {

Configuration configuration = new Configuration();
configuration.setString(
"pipeline.default-kryo-serializers",
"class:org.apache.flink.api.common.serialization.SerializerConfigImplTest,"
+ "serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1;"
+ "class:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1,"
+ "serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer2");
KRYO_DEFAULT_SERIALIZERS.key(), configs.get(KRYO_DEFAULT_SERIALIZERS));

// mutate config according to configuration
configFromConfiguration.configure(
Expand Down Expand Up @@ -301,9 +316,32 @@ void testLoadingIllegalSerializationConfig() {
+ " class org.apache.flink.api.common.serialization.SerializerConfigImplTest");
}

@Test
void testCopyDefaultSerializationConfig() {
SerializerConfig config = new SerializerConfigImpl();
Configuration configuration = new Configuration();
config.configure(configuration, SerializerConfigImplTest.class.getClassLoader());

assertThat(config.copy()).isEqualTo(config);
}

@Test
void testCopySerializerConfig() {
SerializerConfig serializerConfig = new SerializerConfigImpl();
Configuration configuration = new Configuration();
configs.forEach((k, v) -> configuration.setString(k.key(), v));

serializerConfig.configure(configuration, SerializerConfigImplTest.class.getClassLoader());
serializerConfig
.getDefaultKryoSerializerClasses()
.forEach(serializerConfig::registerTypeWithKryoSerializer);

assertThat(serializerConfig.copy()).isEqualTo(serializerConfig);
}

private SerializerConfig getConfiguredSerializerConfig(String serializationConfigStr) {
Configuration configuration = new Configuration();
configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), serializationConfigStr);
configuration.setString(SERIALIZATION_CONFIG.key(), serializationConfigStr);

SerializerConfig serializerConfig = new SerializerConfigImpl();
serializerConfig.configure(configuration, Thread.currentThread().getContextClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@

import javax.annotation.Nullable;

import java.util.Optional;
import java.util.function.Supplier;

import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
Expand Down Expand Up @@ -132,49 +131,13 @@ public LogicalType createLogicalType(UnresolvedIdentifier identifier) {
private static Supplier<SerializerConfig> createSerializerConfig(
ClassLoader classLoader, ReadableConfig config, SerializerConfig serializerConfig) {
return () -> {
final SerializerConfig newSerializerConfig = new SerializerConfigImpl();

SerializerConfig newSerializerConfig;
if (serializerConfig != null) {
if (serializerConfig.isForceKryoEnabled()) {
newSerializerConfig.setForceKryo(true);
}

if (serializerConfig.isForceAvroEnabled()) {
newSerializerConfig.setForceAvro(true);
}

serializerConfig
.getDefaultKryoSerializers()
.forEach(
(c, s) ->
newSerializerConfig.addDefaultKryoSerializer(
c, s.getSerializer()));

Optional.ofNullable(serializerConfig.isForceKryoAvroEnabled().getAsBoolean())
.ifPresent(serializerConfig::setForceKryoAvro);

serializerConfig
.getDefaultKryoSerializerClasses()
.forEach(newSerializerConfig::addDefaultKryoSerializer);

serializerConfig
.getRegisteredKryoTypes()
.forEach(newSerializerConfig::registerKryoType);

serializerConfig
.getRegisteredTypesWithKryoSerializerClasses()
.forEach(newSerializerConfig::registerTypeWithKryoSerializer);

serializerConfig
.getRegisteredTypesWithKryoSerializers()
.forEach(
(c, s) ->
newSerializerConfig.registerTypeWithKryoSerializer(
c, s.getSerializer()));
newSerializerConfig = serializerConfig.copy();
} else {
newSerializerConfig = new SerializerConfigImpl();
newSerializerConfig.configure(config, classLoader);
}

newSerializerConfig.configure(config, classLoader);

return newSerializerConfig;
};
}
Expand Down

0 comments on commit 57869c1

Please sign in to comment.