Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(java): Chunk by chunk predictive map serialization protocol (WIP) #1722

Open
wants to merge 18 commits into
base: main
Choose a base branch
from

Conversation

Hen1ng
Copy link

@Hen1ng Hen1ng commented Jul 8, 2024

What does this PR do?

Implement chunk based map serialization in #925. This pr doesn't provide JIT support, it will be implemented in later PR.

Related issues

Does this PR introduce any user-facing change?

  • Does this PR introduce any public API change?
  • Does this PR introduce any binary protocol compatibility change?

Benchmark

@Hen1ng Hen1ng requested a review from chaokunyang as a code owner July 8, 2024 12:12
}
}
}
protected MethodHandle constructor;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you format code by mvn spotless:apply? Those changes can then be removed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!
Spotless version 2.41.1 does not support JDK 8, at least JDK version 11 is required

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, spotless needs jdk11 now

@chaokunyang chaokunyang changed the title feat(java):Chunk by chunk predictive map serialization protocol (WIP) feat(java): Chunk by chunk predictive map serialization protocol (WIP) Jul 31, 2024
@chaokunyang
Copy link
Collaborator

Could we add license header for MapChunkWriter?

image

@chaokunyang
Copy link
Collaborator

Hi @Hen1ng , I noticed we created a MapChunkWriter for serialization/deserialization everytime when we serialize a map. This will introduce extra object allocation cose.

And I found that we use fields in MapChunkWriter for maintain state across different elements serialization. This is good for code readability, but not efficient for execution, we will read and update fields frequently, this will introduce extra indirection and read/write cost. It would be faster if we keep state loca variables. In such ways, all state updates happens on stack, which will be much faster. And we used many small methods, we may need to inline it in the caller to reduce method cost of the method body is small.

@chaokunyang
Copy link
Collaborator

Another thing I found is that we seems not support predict same type in this PR. For example:

  public static void main(String[] args) {
    Map<String, Integer> map =new HashMap<>(20);
    for (int i = 0; i < 20; i++) {
      map.put("Key"+i, i);
    }
    Fury fury = Fury.builder().withChunkSerializeMapEnable(true).build();
    byte[] result = null;
    for (int i = 0; i < 1000000000; i++) {
      result = fury.serialize(map);
    }
    fury.deserialize(result);
  }

With this PR, we still needs to write key type and value type for every element, could we optimize this in current PR?

@Hen1ng
Copy link
Author

Hen1ng commented Sep 10, 2024

Benchmark (size) (tracking) Mode Cnt Score Error Units
HnBenchmark.testGeneralChunkWrite 64 true avgt 15 986.457 ± 64.592 ns/op
HnBenchmark.testGeneralChunkWrite 64 false avgt 15 924.420 ± 201.541 ns/op
HnBenchmark.testGeneralChunkWrite 128 true avgt 15 2029.786 ± 95.281 ns/op
HnBenchmark.testGeneralChunkWrite 128 false avgt 15 1992.186 ± 173.750 ns/op
HnBenchmark.testGeneralChunkWrite 256 true avgt 15 4305.362 ± 326.161 ns/op
HnBenchmark.testGeneralChunkWrite 256 false avgt 15 3808.830 ± 120.738 ns/op
HnBenchmark.testGeneralChunkWrite 512 true avgt 15 10309.996 ± 264.954 ns/op
HnBenchmark.testGeneralChunkWrite 512 false avgt 15 9160.058 ± 1075.136 ns/op
HnBenchmark.testGeneralWrite 64 true avgt 15 1384.715 ± 388.240 ns/op
HnBenchmark.testGeneralWrite 64 false avgt 15 1522.562 ± 617.019 ns/op
HnBenchmark.testGeneralWrite 128 true avgt 15 2718.254 ± 666.551 ns/op
HnBenchmark.testGeneralWrite 128 false avgt 15 2916.637 ± 838.792 ns/op
HnBenchmark.testGeneralWrite 256 true avgt 15 6086.414 ± 349.734 ns/op
HnBenchmark.testGeneralWrite 256 false avgt 15 5871.677 ± 672.658 ns/op
HnBenchmark.testGeneralWrite 512 true avgt 15 11769.064 ± 995.204 ns/op
HnBenchmark.testGeneralWrite 512 false avgt 15 12268.517 ± 1140.401 ns/op

generalWrite benchmark compare, benchmark code

@State(Scope.Benchmark)
@BenchmarkMode({Mode.AverageTime})
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 3, time = 5)
@Threads(1)
@Fork(5)
public class HnBenchmark {

  private Fury furyMapChunk;
  private Fury fury;
  Map<Integer, Integer> map;
  MapBean mapBean = new MapBean();
  Map<BeanB, BeanB> beanBBeanBMap = new HashMap<>();
  Bean bean;

  @Param({"64", "128", "256", "512"})
  int size;

    @Param({"true", "false"})
  boolean tracking = false;


  @Setup(Level.Trial)
  public void init() {
    furyMapChunk =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(true)
            .requireClassRegistration(false)
            .build();
    fury =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(false)
            .requireClassRegistration(false)
            .build();
        map = new HashMap<>(size);
    for (int i = 0; i < size; i++) {
      map.put(i, i);
      beanBBeanBMap.put(new BeanB(), new BeanB());
    }
    bean = new Bean();
    bean.setMap(map);
    mapBean.setMap(beanBBeanBMap);
  }

  @Benchmark
  public void testGeneralChunkWrite() {
    final byte[] serialize = furyMapChunk.serialize(map);
  }

  @Benchmark
  public void testGeneralWrite() {
    final byte[] serialize = fury.serialize(map);
  }
}

@Hen1ng
Copy link
Author

Hen1ng commented Sep 10, 2024

Benchmark (size) (tracking) Mode Cnt Score Error Units
HnBenchmark.testFinalChunkWrite 64 true avgt 15 1513.621 ± 266.821 ns/op
HnBenchmark.testFinalChunkWrite 64 false avgt 15 1548.258 ± 374.848 ns/op
HnBenchmark.testFinalChunkWrite 128 true avgt 15 2587.836 ± 263.128 ns/op
HnBenchmark.testFinalChunkWrite 128 false avgt 15 2411.845 ± 51.029 ns/op
HnBenchmark.testFinalChunkWrite 256 true avgt 15 5246.138 ± 444.736 ns/op
HnBenchmark.testFinalChunkWrite 256 false avgt 15 5413.328 ± 1192.068 ns/op
HnBenchmark.testFinalChunkWrite 512 true avgt 15 12151.494 ± 954.000 ns/op
HnBenchmark.testFinalChunkWrite 512 false avgt 15 11942.548 ± 775.937 ns/op
HnBenchmark.testFinalWrite 64 true avgt 15 1814.819 ± 491.802 ns/op
HnBenchmark.testFinalWrite 64 false avgt 15 1878.988 ± 795.618 ns/op
HnBenchmark.testFinalWrite 128 true avgt 15 3626.284 ± 1315.089 ns/op
HnBenchmark.testFinalWrite 128 false avgt 15 3224.561 ± 402.316 ns/op
HnBenchmark.testFinalWrite 256 true avgt 15 7173.662 ± 671.952 ns/op
HnBenchmark.testFinalWrite 256 false avgt 15 6983.448 ± 663.652 ns/op
HnBenchmark.testFinalWrite 512 true avgt 15 13694.154 ± 940.861 ns/op
HnBenchmark.testFinalWrite 512 false avgt 15 13999.483 ± 1102.000 ns/op

finalWrite benchmark compare, benchmark code is below

@State(Scope.Benchmark)
@BenchmarkMode({Mode.AverageTime})
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 3, time = 5)
@Threads(1)
@Fork(5)
public class HnBenchmark {

  private Fury furyMapChunk;
  private Fury fury;
  Map<Integer, Integer> map;
  MapBean mapBean = new MapBean();
  Map<BeanB, BeanB> beanBBeanBMap = new HashMap<>();
  Bean bean;

  @Param({"64", "128", "256", "512"})
  int size;

    @Param({"true", "false"})
  boolean tracking = false;


  @Setup(Level.Trial)
  public void init() {
    furyMapChunk =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(true)
            .requireClassRegistration(false)
            .build();
    fury =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(false)
            .requireClassRegistration(false)
            .build();
    
    map = new HashMap<>(size);
    for (int i = 0; i < size; i++) {
      map.put(i, i);
    }
    bean = new Bean();
    bean.setMap(map);
    
  }



    @Benchmark
    public void testFinalChunkWrite() {
        final byte[] serialize = furyMapChunk.serialize(bean);
    }

  @Benchmark
  public void testFinalWrite() {
    final byte[] serialize = fury.serialize(bean);
  }
}

@Hen1ng
Copy link
Author

Hen1ng commented Sep 11, 2024

HnBenchmark.testNoFinalGenericChunkWrite 64 true avgt 15 5173.244 ± 551.374 ns/op
HnBenchmark.testNoFinalGenericChunkWrite 64 false avgt 15 2549.857 ± 213.828 ns/op
HnBenchmark.testNoFinalGenericChunkWrite 128 true avgt 15 6468.278 ± 309.073 ns/op
HnBenchmark.testNoFinalGenericChunkWrite 128 false avgt 15 4765.756 ± 192.533 ns/op
HnBenchmark.testNoFinalGenericChunkWrite 256 true avgt 15 12581.604 ± 460.256 ns/op
HnBenchmark.testNoFinalGenericChunkWrite 256 false avgt 15 10055.729 ± 586.683 ns/op
HnBenchmark.testNoFinalGenericWrite 64 true avgt 15 5140.018 ± 158.204 ns/op
HnBenchmark.testNoFinalGenericWrite 64 false avgt 15 3775.049 ± 190.314 ns/op
HnBenchmark.testNoFinalGenericWrite 128 true avgt 15 9481.691 ± 310.432 ns/op
HnBenchmark.testNoFinalGenericWrite 128 false avgt 15 7771.257 ± 442.158 ns/op
HnBenchmark.testNoFinalGenericWrite 256 true avgt 15 17014.393 ± 995.254 ns/op
HnBenchmark.testNoFinalGenericWrite 256 false avgt 15 15370.666 ± 809.569 ns/op

testNoFinalGenericWrite benchmark compare, benchmark code is below

@State(Scope.Benchmark)
@BenchmarkMode({Mode.AverageTime})
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 3, time = 5)
@Threads(1)
@Fork(5)
public class HnBenchmark {

  private Fury furyMapChunk;
  private Fury fury;
  Map<Integer, Integer> map;
  MapBean mapBean = new MapBean();
  Map<BeanB, BeanB> beanBBeanBMap = new HashMap<>();
  Bean bean;

  @Param({"64", "128", "256"})
  int size;

    @Param({"true", "false"})
  boolean tracking = false;

  @Setup(Level.Trial)
  public void init() {
    furyMapChunk =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(true)
            .requireClassRegistration(false)
            .build();
    fury =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(false)
            .requireClassRegistration(false)
            .build();
    map = new HashMap<>(size);
    for (int i = 0; i < size; i++) {
      map.put(i, i);
      beanBBeanBMap.put(new BeanB(), new BeanB());
    }
    bean = new Bean();
    bean.setMap(map);
    mapBean.setMap(beanBBeanBMap);
  }


  @Benchmark
    public void testNoFinalGenericChunkWrite() {
        final byte[] serialize = furyMapChunk.serialize(mapBean);
    }

  @Benchmark
  public void testNoFinalGenericWrite() {
    final byte[] serialize = fury.serialize(mapBean);
  }
}

@Hen1ng
Copy link
Author

Hen1ng commented Sep 11, 2024

Benchmark (size) (tracking) Mode Cnt Score Error Units
HnBenchmark.testGeneralChunkWriteWithNull 64 true avgt 15 1228.736 ± 153.330 ns/op
HnBenchmark.testGeneralChunkWriteWithNull 64 false avgt 15 1150.012 ± 83.370 ns/op
HnBenchmark.testGeneralChunkWriteWithNull 128 true avgt 15 2393.119 ± 168.874 ns/op
HnBenchmark.testGeneralChunkWriteWithNull 128 false avgt 15 2607.314 ± 463.271 ns/op
HnBenchmark.testGeneralChunkWriteWithNull 256 true avgt 15 4554.671 ± 91.605 ns/op
HnBenchmark.testGeneralChunkWriteWithNull 256 false avgt 15 4977.650 ± 1019.474 ns/op
HnBenchmark.testGeneralWriteWithNull 64 true avgt 15 1935.727 ± 1269.534 ns/op
HnBenchmark.testGeneralWriteWithNull 64 false avgt 15 1704.501 ± 604.193 ns/op
HnBenchmark.testGeneralWriteWithNull 128 true avgt 15 2741.738 ± 664.690 ns/op
HnBenchmark.testGeneralWriteWithNull 128 false avgt 15 2431.270 ± 65.081 ns/op
HnBenchmark.testGeneralWriteWithNull 256 true avgt 15 5496.098 ± 117.704 ns/op
HnBenchmark.testGeneralWriteWithNull 256 false avgt 15 5845.516 ± 90.525 ns/op

@State(Scope.Benchmark)
@BenchmarkMode({Mode.AverageTime})
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 3, time = 5)
@Threads(1)
@Fork(5)
public class HnBenchmark {

  private Fury furyMapChunk;
  private Fury fury;
  Map<Integer, Integer> map;
  MapBean mapBean = new MapBean();
  Map<BeanB, BeanB> beanBBeanBMap = new HashMap<>();
  Bean bean;

  @Param({"64", "128", "256"})
  int size;

    @Param({"true", "false"})
  boolean tracking = false;


  @Setup(Level.Trial)
  public void init() {
    furyMapChunk =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(true)
            .requireClassRegistration(false)
            .build();
    fury =
        Fury.builder()
            .withLanguage(Language.JAVA)
            .withRefTracking(tracking)
            .withCodegen(false)
            .withChunkSerializeMapEnable(false)
            .requireClassRegistration(false)
            .build();
    map = new HashMap<>(size);
    for (int i = 0; i < size; i++) {
      if ( i == 0) {
        map.put(null, i);
        beanBBeanBMap.put(null, new BeanB());
        continue;
      }
      if (i == 10) {
        map.put(i, null);
        beanBBeanBMap.put(new BeanB(), null);
        continue;
      }
      map.put(i, i);
      beanBBeanBMap.put(new BeanB(), new BeanB());
    }
    bean = new Bean();
    bean.setMap(map);
    mapBean.setMap(beanBBeanBMap);
  }

    @Benchmark
  public void testGeneralChunkWriteWithNull() {
    final byte[] serialize = furyMapChunk.serialize(map);
  }

  @Benchmark
  public void testGeneralWriteWithNull() {
    final byte[] serialize = fury.serialize(map);
  }
}

@chaokunyang
Copy link
Collaborator

The performance are basically same, it's a little unexpected. Could you share some profiling data here?

@Hen1ng
Copy link
Author

Hen1ng commented Sep 11, 2024

The performance are basically same, it's a little unexpected. Could you share some profiling data here?

@Hen1ng Hen1ng closed this Sep 11, 2024
@Hen1ng Hen1ng reopened this Sep 11, 2024
@Hen1ng
Copy link
Author

Hen1ng commented Sep 11, 2024

generalJavaChunkWrite profiling data
image

@Hen1ng
Copy link
Author

Hen1ng commented Sep 11, 2024

image
current method generalJavaWrite profiling data

}
}

private void javaChunkWriteWithKeySerializers(
Copy link
Collaborator

@chaokunyang chaokunyang Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is too big, jvm jit can't inline such big method, could we split it into multiple smaller methods? especially for less frequent code path?

@@ -1380,6 +1389,10 @@ public Class<? extends Serializer> getDefaultJDKStreamSerializerType() {
return config.getDefaultJDKStreamSerializerType();
}

public boolean isChunkSerializeMapEnabled() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In future this will be the only way to serialize map, we'd better not make this as an option. We can add an option in AbstractMapSerializer directly, and remove that option when we support chunk based serialization in JIT mode

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants