Skip to content

Commit

Permalink
Support to serialize and de-serialize Instant, OffsetDateTime and Zon…
Browse files Browse the repository at this point in the history
…edDateTime to Avro long type and logicalType.
  • Loading branch information
MichalFoksa committed Jun 6, 2021
1 parent 8f8bcb6 commit e23d0b4
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.fasterxml.jackson.dataformat.avro.jsr310;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.deser.ContextualDeserializer;
import com.fasterxml.jackson.databind.deser.std.StdScalarDeserializer;

import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.function.BiFunction;

/**
* A deserializer for variants of java.time classes that represent a specific instant on the timeline
* (Instant, OffsetDateTime, ZonedDateTime) which supports de-serialization from Avro long.
*
* See: http://avro.apache.org/docs/current/spec.html#Logical+Types
*
* Note: {@link AvroInstantDeserializer} does not support deserialization from string.
*
* @param <T> The type of a instant class that can be deserialized.
*/
public class AvroInstantDeserializer<T extends Temporal> extends StdScalarDeserializer<T>
implements ContextualDeserializer {

private static final long serialVersionUID = 1L;

public static final AvroInstantDeserializer<Instant> INSTANT =
new AvroInstantDeserializer<>(Instant.class, (instant, zoneID) -> instant);

public static final AvroInstantDeserializer<OffsetDateTime> OFFSET_DATE_TIME =
new AvroInstantDeserializer<>(OffsetDateTime.class, OffsetDateTime::ofInstant);

public static final AvroInstantDeserializer<ZonedDateTime> ZONED_DATE_TIME =
new AvroInstantDeserializer<>(ZonedDateTime.class, ZonedDateTime::ofInstant);

protected final BiFunction<Instant, ZoneId, T> fromInstant;

protected AvroInstantDeserializer(Class<T> t, BiFunction<Instant, ZoneId, T> fromInstant) {
super(t);
this.fromInstant = fromInstant;
}

@SuppressWarnings("unchecked")
@Override
public T deserialize(JsonParser p, DeserializationContext context) throws IOException, JsonProcessingException {
final ZoneId defaultZoneId = context.getTimeZone().toZoneId().normalized();
switch (p.getCurrentToken()) {
case VALUE_NUMBER_INT:
return fromLong(p.getLongValue(), defaultZoneId);
default:
try {
return (T) context.handleUnexpectedToken(_valueClass, p);
} catch (JsonMappingException e) {
throw e;
} catch (IOException e) {
throw JsonMappingException.fromUnexpectedIOE(e);
}
}
}

@Override
public JsonDeserializer<T> createContextual(DeserializationContext ctxt, BeanProperty property) {
return this;
}

private T fromLong(long longValue, ZoneId defaultZoneId) {
return fromInstant.apply(Instant.ofEpochMilli(longValue), defaultZoneId);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.fasterxml.jackson.dataformat.avro.jsr310;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.jsonFormatVisitors.JsonFormatVisitorWrapper;
import com.fasterxml.jackson.databind.jsonFormatVisitors.JsonIntegerFormatVisitor;
import com.fasterxml.jackson.databind.ser.ContextualSerializer;
import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer;

import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZonedDateTime;
import java.time.temporal.Temporal;
import java.util.function.Function;

/**
* A serializer for variants of java.time classes that represent a specific instant on the timeline
* (Instant, OffsetDateTime, ZonedDateTime) which supports serialization to Avro long type and logicalType.
*
* See: http://avro.apache.org/docs/current/spec.html#Logical+Types
*
* Note: {@link AvroInstantSerializer} does not support serialization to string.
*
* @param <T> The type of a instant class that can be serialized.
*/
public class AvroInstantSerializer<T extends Temporal> extends StdScalarSerializer<T>
implements ContextualSerializer {

private static final long serialVersionUID = 1L;

public static final AvroInstantSerializer<Instant> INSTANT =
new AvroInstantSerializer<>(Instant.class, Function.identity());

public static final AvroInstantSerializer<OffsetDateTime> OFFSET_DATE_TIME =
new AvroInstantSerializer<>(OffsetDateTime.class, OffsetDateTime::toInstant);

public static final AvroInstantSerializer<ZonedDateTime> ZONED_DATE_TIME =
new AvroInstantSerializer<>(ZonedDateTime.class, ZonedDateTime::toInstant);

private final Function<T, Instant> getInstant;

protected AvroInstantSerializer(Class<T> t, Function<T, Instant> getInstant) {
super(t);
this.getInstant = getInstant;
}

@Override
public void serialize(T value, JsonGenerator gen, SerializerProvider provider) throws IOException {
final Instant instant = getInstant.apply(value);
gen.writeNumber(instant.toEpochMilli());
}

@Override
public JsonSerializer<?> createContextual(SerializerProvider prov, BeanProperty property) {
return this;
}

@Override
public void acceptJsonFormatVisitor(JsonFormatVisitorWrapper visitor, JavaType typeHint) throws JsonMappingException {
JsonIntegerFormatVisitor v2 = visitor.expectIntegerFormat(typeHint);
if (v2 != null) {
v2.numberType(JsonParser.NumberType.LONG);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.fasterxml.jackson.dataformat.avro.jsr310;

import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.core.json.PackageVersion;
import com.fasterxml.jackson.databind.module.SimpleModule;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZonedDateTime;

/**
* A module that installs a collection of serializers and deserializers for java.time classes.
*/
public class AvroJavaTimeModule extends SimpleModule {

private static final long serialVersionUID = 1L;

public AvroJavaTimeModule() {
super(PackageVersion.VERSION);
addSerializer(Instant.class, AvroInstantSerializer.INSTANT);
addSerializer(OffsetDateTime.class, AvroInstantSerializer.OFFSET_DATE_TIME);
addSerializer(ZonedDateTime.class, AvroInstantSerializer.ZONED_DATE_TIME);

addDeserializer(Instant.class, AvroInstantDeserializer.INSTANT);
addDeserializer(OffsetDateTime.class, AvroInstantDeserializer.OFFSET_DATE_TIME);
addDeserializer(ZonedDateTime.class, AvroInstantDeserializer.ZONED_DATE_TIME);
}

@Override
public String getModuleName() {
return getClass().getName();
}

@Override
public Version version() {
return PackageVersion.VERSION;
}

@Override
public void setupModule(SetupContext context) {
super.setupModule(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.fasterxml.jackson.dataformat.avro.jsr310;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificData;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collection;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(Parameterized.class)
public class AvroInstantSerializer_schemaCreationTest {

@Parameter
public Class testClass;

@Parameters(name = "With {0}")
public static Collection<Class> testData() {
return Arrays.asList(
Instant.class,
OffsetDateTime.class,
ZonedDateTime.class);
}

@Test
public void testSchemaCreation() throws JsonMappingException {
// GIVEN
AvroMapper mapper = AvroMapper.builder()
.addModules(new AvroJavaTimeModule())
.build();
AvroSchemaGenerator gen = new AvroSchemaGenerator();

// WHEN
mapper.acceptJsonFormatVisitor(testClass, gen);
Schema actualSchema = gen.getGeneratedSchema().getAvroSchema();

System.out.println(actualSchema.toString(true));

// THEN
assertThat(actualSchema.getType()).isEqualTo(Schema.Type.LONG);
assertThat(actualSchema.getProp(LogicalType.LOGICAL_TYPE_PROP)).isEqualTo("timestamp-millis");
/**
* Having logicalType and java-class is not valid according to
* {@link org.apache.avro.LogicalType#validate(Schema)}
*/
assertThat(actualSchema.getProp(SpecificData.CLASS_PROP)).isNull();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.fasterxml.jackson.dataformat.avro.jsr310;

import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;
import org.junit.Test;

import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

import static org.assertj.core.api.Assertions.assertThat;

public class AvroJavaTimeModule_serialization_and_deserializationTest {

private static AvroMapper newAvroMapper() {
return AvroMapper.builder()
.addModules(new AvroJavaTimeModule())
.build();
}

@Test
public void testWithInstant() throws IOException {
// GIVEN
AvroMapper mapper = newAvroMapper();
AvroSchemaGenerator gen = new AvroSchemaGenerator();
mapper.acceptJsonFormatVisitor(Instant.class, gen);
AvroSchema schema = gen.getGeneratedSchema();

Instant expectedInstant = Instant.ofEpochMilli(0L);

// WHEN
byte[] serialized = mapper.writer(schema).writeValueAsBytes(expectedInstant);
Instant deserInstant = mapper.readerFor(Instant.class).with(schema).readValue(serialized);

// THEN
assertThat(deserInstant).isEqualTo(expectedInstant);
}

@Test
public void testWithOffsetDateTime() throws IOException {
// GIVEN
AvroMapper mapper = newAvroMapper();
AvroSchemaGenerator gen = new AvroSchemaGenerator();
mapper.acceptJsonFormatVisitor(OffsetDateTime.class, gen);
AvroSchema schema = gen.getGeneratedSchema();

OffsetDateTime expectedOffsetDateTime = OffsetDateTime.of(2021, 6, 6, 12, 00, 30, 00, ZoneOffset.ofHours(2));

// WHEN
byte[] serialized = mapper.writer(schema).writeValueAsBytes(expectedOffsetDateTime);
OffsetDateTime deserOffsetDateTime = mapper.readerFor(OffsetDateTime.class).with(schema).readValue(serialized);

// THEN
assertThat(deserOffsetDateTime.toInstant()).isEqualTo(expectedOffsetDateTime.toInstant());
}

@Test
public void testWithZonedDateTime() throws IOException {
// GIVEN
AvroMapper mapper = newAvroMapper();
AvroSchemaGenerator gen = new AvroSchemaGenerator();
mapper.acceptJsonFormatVisitor(ZonedDateTime.class, gen);
AvroSchema schema = gen.getGeneratedSchema();

ZonedDateTime expectedZonedDateTime = ZonedDateTime.of(2021, 6, 6, 12, 00, 30, 00, ZoneOffset.ofHours(2));

// WHEN
byte[] serialized = mapper.writer(schema).writeValueAsBytes(expectedZonedDateTime);
ZonedDateTime deserZonedDateTime = mapper.readerFor(ZonedDateTime.class).with(schema).readValue(serialized);

// THEN
assertThat(deserZonedDateTime.toInstant()).isEqualTo(expectedZonedDateTime.toInstant());
}

}

0 comments on commit e23d0b4

Please sign in to comment.