Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
Signed-off-by: LiangliangSui <[email protected]>
  • Loading branch information
LiangliangSui committed Apr 17, 2024
1 parent bb419dd commit 8b2f5c5
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 67 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ on:
push:
branches:
- main
- refactor_mem
- 'releases/**'
- 'deploy/**'
- 'test*'
Expand Down
113 changes: 47 additions & 66 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.fury.logging.Logger;
import org.apache.fury.logging.LoggerFactory;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.memory.MemoryBufferManager;
import org.apache.fury.memory.MemoryUtils;
import org.apache.fury.resolver.ClassInfo;
import org.apache.fury.resolver.ClassInfoHolder;
Expand Down Expand Up @@ -95,7 +96,6 @@ public final class Fury implements BaseFury {
private static final byte isOutOfBandFlag = 1 << 3;
private static final boolean isLittleEndian = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private static final byte BITMAP = isLittleEndian ? isLittleEndianFlag : 0;
private static final int BUFFER_SIZE_LIMIT = 128 * 1024;

private final Config config;
private final boolean refTracking;
Expand All @@ -105,7 +105,6 @@ public final class Fury implements BaseFury {
private final SerializationContext serializationContext;
private final ClassLoader classLoader;
private final JITContext jitContext;
private MemoryBuffer buffer;
private final List<Object> nativeObjects;
private final StringSerializer stringSerializer;
private final Language language;
Expand All @@ -117,6 +116,7 @@ public final class Fury implements BaseFury {
private Iterator<MemoryBuffer> outOfBandBuffers;
private boolean peerOutOfBandEnabled;
private int depth;
private final MemoryBufferManager bufferManager;

public Fury(FuryBuilder builder, ClassLoader classLoader) {
// Avoid set classLoader in `FuryBuilder`, which won't be clear when
Expand All @@ -140,6 +140,7 @@ public Fury(FuryBuilder builder, ClassLoader classLoader) {
nativeObjects = new ArrayList<>();
generics = new Generics(this);
stringSerializer = new StringSerializer(this);
bufferManager = new MemoryBufferManager();
LOG.info("Created new fury {}", this);
}

Expand Down Expand Up @@ -196,22 +197,20 @@ public MemoryBuffer serialize(Object obj, long address, int size) {

@Override
public byte[] serialize(Object obj) {
MemoryBuffer buf = getBuffer();
buf.writerIndex(0);
serialize(buf, obj, null);
byte[] bytes = buf.getBytes(0, buf.writerIndex());
resetBuffer();
return bytes;
return bufferManager.execute(
buf -> {
serialize(buf, obj, null);
return buf.getBytes(0, buf.writerIndex());
});
}

@Override
public byte[] serialize(Object obj, BufferCallback callback) {
MemoryBuffer buf = getBuffer();
buf.writerIndex(0);
serialize(buf, obj, callback);
byte[] bytes = buf.getBytes(0, buf.writerIndex());
resetBuffer();
return bytes;
return bufferManager.execute(
buf -> {
serialize(buf, obj, callback);
return buf.getBytes(0, buf.writerIndex());
});
}

@Override
Expand Down Expand Up @@ -282,21 +281,6 @@ private StackOverflowError processStackOverflowError(StackOverflowError e) {
throw e;
}

private MemoryBuffer getBuffer() {
MemoryBuffer buf = buffer;
if (buf == null) {
buf = buffer = MemoryBuffer.newHeapBuffer(64);
}
return buf;
}

private void resetBuffer() {
MemoryBuffer buf = buffer;
if (buf != null && buf.size() > BUFFER_SIZE_LIMIT) {
buffer = MemoryBuffer.newHeapBuffer(BUFFER_SIZE_LIMIT);
}
}

private void write(MemoryBuffer buffer, Object obj) {
int startOffset = buffer.writerIndex();
boolean shareMetaContext = config.shareMetaContext();
Expand Down Expand Up @@ -1008,12 +992,11 @@ public Object xreadNonRef(MemoryBuffer buffer, Serializer<?> serializer) {

@Override
public byte[] serializeJavaObject(Object obj) {
MemoryBuffer buf = getBuffer();
buf.writerIndex(0);
serializeJavaObject(buf, obj);
byte[] bytes = buf.getBytes(0, buf.writerIndex());
resetBuffer();
return bytes;
return bufferManager.execute(
buf -> {
serializeJavaObject(buf, obj);
return buf.getBytes(0, buf.writerIndex());
});
}

@Override
Expand Down Expand Up @@ -1117,12 +1100,11 @@ public <T> T deserializeJavaObject(FuryReadableChannel channel, Class<T> cls) {
*/
@Override
public byte[] serializeJavaObjectAndClass(Object obj) {
MemoryBuffer buf = getBuffer();
buf.writerIndex(0);
serializeJavaObjectAndClass(buf, obj);
byte[] bytes = buf.getBytes(0, buf.writerIndex());
resetBuffer();
return bytes;
return bufferManager.execute(
buf -> {
serializeJavaObjectAndClass(buf, obj);
return buf.getBytes(0, buf.writerIndex());
});
}

/**
Expand Down Expand Up @@ -1211,31 +1193,30 @@ public Object deserializeJavaObjectAndClass(FuryReadableChannel channel) {
}

private void serializeToStream(OutputStream outputStream, Consumer<MemoryBuffer> function) {
MemoryBuffer buf = getBuffer();
if (outputStream.getClass() == ByteArrayOutputStream.class) {
byte[] oldBytes = buf.getHeapMemory(); // Note: This should not be null.
assert oldBytes != null;
MemoryUtils.wrap((ByteArrayOutputStream) outputStream, buf);
function.accept(buf);
MemoryUtils.wrap(buf, (ByteArrayOutputStream) outputStream);
buf.pointTo(oldBytes, 0, oldBytes.length);
} else {
buf.writerIndex(0);
function.accept(buf);
try {
byte[] bytes = buf.getHeapMemory();
if (bytes != null) {
outputStream.write(bytes, 0, buf.writerIndex());
} else {
outputStream.write(buf.getBytes(0, buf.writerIndex()));
}
outputStream.flush();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
resetBuffer();
}
}
bufferManager.execute(
buf -> {
if (outputStream.getClass() == ByteArrayOutputStream.class) {
byte[] oldBytes = buf.getHeapMemory(); // Note: This should not be null.
assert oldBytes != null;
MemoryUtils.wrap((ByteArrayOutputStream) outputStream, buf);
function.accept(buf);
MemoryUtils.wrap(buf, (ByteArrayOutputStream) outputStream);
buf.pointTo(oldBytes, 0, oldBytes.length);
} else {
function.accept(buf);
try {
if (!buf.isOffHeap()) {
outputStream.write(buf.getHeapMemory(), 0, buf.writerIndex());
} else {
outputStream.write(buf.getBytes(0, buf.writerIndex()));
}
outputStream.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return null;
});
}

public void reset() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.fury.memory;

import java.util.function.Function;

/** A class that manages MemoryBuffer for Fury. */
public class MemoryBufferManager {
private static final int BUFFER_SIZE_LIMIT = 128 * 1024;
private static final int INITIAL_SIZE = 64;
private MemoryBuffer buffer;

public <R> R execute(Function<MemoryBuffer, R> action) {
try {
MemoryBuffer buf = getBuffer();
return action.apply(buf);
} finally {
resetBuffer();
}
}

private MemoryBuffer getBuffer() {
MemoryBuffer buf = buffer;
if (buf == null) {
buf = buffer = MemoryBuffer.newHeapBuffer(INITIAL_SIZE);
}
return buf;
}

private void resetBuffer() {
MemoryBuffer buf = buffer;
if (buf != null && buf.size() > BUFFER_SIZE_LIMIT) {
buffer = MemoryBuffer.newHeapBuffer(BUFFER_SIZE_LIMIT);
}
if (buf != null) {
buf.writerIndex(0);
}
}
}
6 changes: 5 additions & 1 deletion java/fury-core/src/test/java/org/apache/fury/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.fury.io.FuryReadableChannel;
import org.apache.fury.io.FuryStreamReader;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.memory.MemoryBufferManager;
import org.apache.fury.test.bean.BeanA;
import org.apache.fury.util.ReflectionUtils;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -140,7 +141,10 @@ public void testBufferReset() {
}

private void checkBuffer(Fury fury) {
Object buf = ReflectionUtils.getObjectFieldValue(fury, "buffer");
Object bufManager = ReflectionUtils.getObjectFieldValue(fury, "bufferManager");
MemoryBufferManager bufferManager = (MemoryBufferManager) bufManager;
assert bufferManager != null;
Object buf = ReflectionUtils.getObjectFieldValue(bufferManager, "buffer");
MemoryBuffer buffer = (MemoryBuffer) buf;
assert buffer != null;
assertTrue(buffer.size() < 1000 * 1000);
Expand Down

0 comments on commit 8b2f5c5

Please sign in to comment.