Add compression codec support for PageSerde #24670

Open · wants to merge 5 commits into base: master
Changes from 4 commits
9 changes: 5 additions & 4 deletions presto-docs/src/main/sphinx/admin/properties.rst
@@ -388,13 +388,14 @@ Limit for memory used for unspilling a single aggregation operator instance.

The corresponding session property is :ref:`admin/properties-session:\`\`aggregation_operator_unspill_memory_limit\`\``.

``experimental.spill-compression-enabled``
``experimental.spill-compression-codec``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``
* **Type:** ``string``
* **Allowed values:** ``SNAPPY``, ``NONE``, ``GZIP``, ``LZ4``, ``LZO``, ``ZLIB``, ``ZSTD``
* **Default value:** ``NONE``

Enables data compression for pages spilled to disk.
The data compression codec to be used for pages spilled to disk.

``experimental.spill-encryption-enabled``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
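As a point of reference, a minimal sketch of how the new ``experimental.spill-compression-codec`` property might be set, assuming the usual ``etc/config.properties`` file; the property name and the ``ZSTD`` value come from the documented allowed values above, and any other listed codec would be set the same way:

    experimental.spill-compression-codec=ZSTD

Setting the value to ``NONE`` (the default) keeps the previous behavior of spilling uncompressed pages.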
8 changes: 4 additions & 4 deletions presto-docs/src/main/sphinx/admin/spill.rst
@@ -93,10 +93,10 @@ there is no need to use RAID for spill.
Spill Compression
-----------------

When spill compression is enabled (``spill-compression-enabled`` property in
:ref:`tuning-spilling`), spilled pages will be compressed using the same
implementation as exchange compression when they are sufficiently compressible.
Enabling this feature can reduce the amount of disk IO at the cost
When a spill compression codec is configured (``spill-compression-codec``
property in :ref:`tuning-spilling`), spilled pages will be compressed using
the configured codec implementation when they are sufficiently compressible.
This feature can reduce the amount of disk IO at the cost
of extra CPU load to compress and decompress spilled pages.

Spill Encryption
@@ -302,7 +302,7 @@ public void testSession()
try (Connection connection = createConnection("sessionProperties=query_max_run_time:2d;max_failed_task_percentage:0.6")) {
assertThat(listSession(connection))
.contains("join_distribution_type|AUTOMATIC|AUTOMATIC")
.contains("exchange_compression|false|false")
.contains("exchange_compression_codec|NONE|NONE")
.contains("query_max_run_time|2d|100.00d")
.contains("max_failed_task_percentage|0.6|0.3");

@@ -312,15 +312,15 @@ public void testSession()

assertThat(listSession(connection))
.contains("join_distribution_type|BROADCAST|AUTOMATIC")
.contains("exchange_compression|false|false");
.contains("exchange_compression_codec|NONE|NONE");

try (Statement statement = connection.createStatement()) {
statement.execute("SET SESSION exchange_compression = true");
statement.execute("SET SESSION exchange_compression_codec = 'LZ4'");
}

assertThat(listSession(connection))
.contains("join_distribution_type|BROADCAST|AUTOMATIC")
.contains("exchange_compression|true|false");
.contains("exchange_compression_codec|LZ4|NONE");
}
}

@@ -0,0 +1,18 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto;

public enum CompressionCodec
{
    GZIP, LZ4, LZO, SNAPPY, ZLIB, ZSTD, NONE
}
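A small illustration (not part of the diff) of how string values from configuration or session properties are expected to map onto this enum; it mirrors the decoder used for the ``exchange_compression_codec`` session property later in this PR:

    // Hypothetical snippet: case-insensitive lookup of a codec name, matching the
    // session property decoder CompressionCodec.valueOf(value.toUpperCase()).
    CompressionCodec codec = CompressionCodec.valueOf("lz4".toUpperCase());
    assert codec == CompressionCodec.LZ4;
    // name() round-trips back to the canonical spelling shown in docs and session listings
    assert "LZ4".equals(codec.name());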
@@ -160,7 +160,7 @@ public final class SystemSessionProperties
public static final String ITERATIVE_OPTIMIZER_TIMEOUT = "iterative_optimizer_timeout";
public static final String QUERY_ANALYZER_TIMEOUT = "query_analyzer_timeout";
public static final String RUNTIME_OPTIMIZER_ENABLED = "runtime_optimizer_enabled";
public static final String EXCHANGE_COMPRESSION = "exchange_compression";
public static final String EXCHANGE_COMPRESSION_CODEC = "exchange_compression_codec";
public static final String EXCHANGE_CHECKSUM = "exchange_checksum";
public static final String LEGACY_TIMESTAMP = "legacy_timestamp";
public static final String ENABLE_INTERMEDIATE_AGGREGATIONS = "enable_intermediate_aggregations";
@@ -839,11 +839,15 @@ public SystemSessionProperties(
"Experimental: enable runtime optimizer",
featuresConfig.isRuntimeOptimizerEnabled(),
false),
booleanProperty(
EXCHANGE_COMPRESSION,
"Enable compression in exchanges",
featuresConfig.isExchangeCompressionEnabled(),
false),
new PropertyMetadata<>(
EXCHANGE_COMPRESSION_CODEC,
"Exchange compression codec",
VARCHAR,
CompressionCodec.class,
featuresConfig.getExchangeCompressionCodec(),
false,
value -> CompressionCodec.valueOf(((String) value).toUpperCase()),
CompressionCodec::name),
booleanProperty(
EXCHANGE_CHECKSUM,
"Enable checksum in exchanges",
@@ -2285,9 +2289,9 @@ public static Duration getQueryAnalyzerTimeout(Session session)
return session.getSystemProperty(QUERY_ANALYZER_TIMEOUT, Duration.class);
}

public static boolean isExchangeCompressionEnabled(Session session)
public static CompressionCodec getExchangeCompressionCodec(Session session)
{
return session.getSystemProperty(EXCHANGE_COMPRESSION, Boolean.class);
return session.getSystemProperty(EXCHANGE_COMPRESSION_CODEC, CompressionCodec.class);
}

public static boolean isExchangeChecksumEnabled(Session session)
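For orientation, a hedged sketch of an assumed call site (not shown in this diff) illustrating how the new getter might replace the old boolean check:

    // Hypothetical caller: previously isExchangeCompressionEnabled(session) gated
    // compression; with this change a caller would branch on the codec instead.
    CompressionCodec codec = SystemSessionProperties.getExchangeCompressionCodec(session);
    if (codec != CompressionCodec.NONE) {
        // build a compressing page serde for the selected codec here
    }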
@@ -0,0 +1,56 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.buffer;

import com.facebook.presto.spi.page.PageCompressor;
import io.airlift.compress.Compressor;

import java.nio.ByteBuffer;

import static java.util.Objects.requireNonNull;

public class AirliftCompressorAdapter
implements PageCompressor
{
private final Compressor compressor;

public AirliftCompressorAdapter(Compressor compressor)
{
this.compressor = requireNonNull(compressor, "compressor is null");
}

@Override
public int maxCompressedLength(int uncompressedSize)
{
return compressor.maxCompressedLength(uncompressedSize);
}

@Override
public int compress(
byte[] input,
int inputOffset,
int inputLength,
byte[] output,
int outputOffset,
int maxOutputLength)
{
return compressor.compress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength);
}

@Override
public void compress(ByteBuffer input, ByteBuffer output)
{
compressor.compress(input, output);
}
}
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.buffer;

import com.facebook.presto.spi.page.PageDecompressor;
import io.airlift.compress.Decompressor;

import java.nio.ByteBuffer;

import static java.util.Objects.requireNonNull;

public class AirliftDecompressorAdapter
implements PageDecompressor
{
private final Decompressor decompressor;

public AirliftDecompressorAdapter(Decompressor decompressor)
{
this.decompressor = requireNonNull(decompressor, "decompressor is null");
}

@Override
public int decompress(
byte[] input,
int inputOffset,
int inputLength,
byte[] output,
int outputOffset,
int maxOutputLength)
{
return decompressor.decompress(input, inputOffset, inputLength, output, outputOffset, maxOutputLength);
}

@Override
public void decompress(ByteBuffer input, ByteBuffer output)
{
decompressor.decompress(input, output);
}
}
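Taken together, the two adapters let existing aircompressor implementations plug into the SPI page compression interfaces. The following wiring sketch is hypothetical, not code from this PR: the class and method names are invented for illustration, only codecs with well-known aircompressor implementations are shown, and GZIP/ZLIB would instead go through the stream- and Deflater-based compressors added further down in this diff.

    package com.facebook.presto.execution.buffer;

    import com.facebook.presto.CompressionCodec;
    import com.facebook.presto.spi.page.PageCompressor;
    import com.facebook.presto.spi.page.PageDecompressor;
    import io.airlift.compress.lz4.Lz4Compressor;
    import io.airlift.compress.lz4.Lz4Decompressor;
    import io.airlift.compress.lzo.LzoCompressor;
    import io.airlift.compress.lzo.LzoDecompressor;
    import io.airlift.compress.snappy.SnappyCompressor;
    import io.airlift.compress.snappy.SnappyDecompressor;
    import io.airlift.compress.zstd.ZstdCompressor;
    import io.airlift.compress.zstd.ZstdDecompressor;

    // Hypothetical helper mapping a CompressionCodec onto adapter-wrapped
    // aircompressor implementations; NONE and the Deflater/GZIP-based codecs
    // are intentionally left out of this sketch.
    public final class CompressionCodecWiringSketch
    {
        private CompressionCodecWiringSketch() {}

        public static PageCompressor pageCompressorFor(CompressionCodec codec)
        {
            switch (codec) {
                case LZ4:
                    return new AirliftCompressorAdapter(new Lz4Compressor());
                case LZO:
                    return new AirliftCompressorAdapter(new LzoCompressor());
                case SNAPPY:
                    return new AirliftCompressorAdapter(new SnappyCompressor());
                case ZSTD:
                    return new AirliftCompressorAdapter(new ZstdCompressor());
                default:
                    throw new IllegalArgumentException("not covered by this sketch: " + codec);
            }
        }

        public static PageDecompressor pageDecompressorFor(CompressionCodec codec)
        {
            switch (codec) {
                case LZ4:
                    return new AirliftDecompressorAdapter(new Lz4Decompressor());
                case LZO:
                    return new AirliftDecompressorAdapter(new LzoDecompressor());
                case SNAPPY:
                    return new AirliftDecompressorAdapter(new SnappyDecompressor());
                case ZSTD:
                    return new AirliftDecompressorAdapter(new ZstdDecompressor());
                default:
                    throw new IllegalArgumentException("not covered by this sketch: " + codec);
            }
        }
    }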
@@ -0,0 +1,83 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.buffer;

import io.airlift.compress.Compressor;

import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.OptionalInt;
import java.util.zip.Deflater;

import static java.util.Objects.requireNonNull;
import static java.util.zip.Deflater.FULL_FLUSH;

public class DeflateCompressor
implements Compressor
{
private static final int EXTRA_COMPRESSION_SPACE = 16;
private static final int DEFAULT_COMPRESSION_LEVEL = 4;

private final int compressionLevel;

public DeflateCompressor(OptionalInt compressionLevel)
{
requireNonNull(compressionLevel, "compressionLevel is null");
this.compressionLevel = compressionLevel.orElse(DEFAULT_COMPRESSION_LEVEL);
}

@Override
public int maxCompressedLength(int uncompressedSize)
{
// From Mark Adler's post http://stackoverflow.com/questions/1207877/java-size-of-compression-output-bytearray
return uncompressedSize + ((uncompressedSize + 7) >> 3) + ((uncompressedSize + 63) >> 6) + 5 + EXTRA_COMPRESSION_SPACE;
}

@Override
public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
{
int maxCompressedLength = maxCompressedLength(inputLength);
if (maxOutputLength < maxCompressedLength) {
throw new IllegalArgumentException("Output buffer must be at least " + maxCompressedLength + " bytes");
}

Deflater deflater = new Deflater(compressionLevel, false);
try {
deflater.setInput(input, inputOffset, inputLength);
deflater.finish();

int compressedDataLength = deflater.deflate(output, outputOffset, maxOutputLength, FULL_FLUSH);
if (!deflater.finished()) {
throw new IllegalArgumentException("maxCompressedLength formula is incorrect, because deflate produced more data");
}
return compressedDataLength;
}
finally {
deflater.end();
}
}

@Override
public void compress(ByteBuffer input, ByteBuffer output)
{
if (input.isDirect() || output.isDirect() || !input.hasArray() || !output.hasArray()) {
throw new IllegalArgumentException("Non-direct byte buffer backed by byte array required");
}
int inputOffset = input.arrayOffset() + input.position();
int outputOffset = output.arrayOffset() + output.position();

int written = compress(input.array(), inputOffset, input.remaining(), output.array(), outputOffset, output.remaining());
((Buffer) output).position(output.position() + written);
}
}
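A quick sanity check of the bound: for a 1 MiB input, maxCompressedLength works out to 1,048,576 + 131,072 + 16,384 + 5 + 16 = 1,196,053 bytes, i.e. roughly 14% above the input size. Because Deflater(level, false) emits standard zlib framing, the output can be read back with the JDK Inflater; the following round-trip sketch is an assumed test, not part of the PR:

    // Assumed round-trip check using only JDK classes plus the new compressor.
    byte[] input = "hello presto spill compression".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    DeflateCompressor compressor = new DeflateCompressor(java.util.OptionalInt.empty());
    byte[] compressed = new byte[compressor.maxCompressedLength(input.length)];
    int compressedLength = compressor.compress(input, 0, input.length, compressed, 0, compressed.length);

    java.util.zip.Inflater inflater = new java.util.zip.Inflater();
    inflater.setInput(compressed, 0, compressedLength);
    byte[] decompressed = new byte[input.length];
    int decompressedLength = inflater.inflate(decompressed); // throws DataFormatException (checked)
    inflater.end();
    // expected: decompressedLength == input.length and the bytes match the original input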
@@ -0,0 +1,68 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.buffer;

import io.airlift.compress.Compressor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.zip.GZIPOutputStream;

public class GzipCompressor
implements Compressor
{
private static final int EXTRA_COMPRESSION_SPACE = 16;

@Override
public int maxCompressedLength(int uncompressedSize)
{
// From Mark Adler's post http://stackoverflow.com/questions/1207877/java-size-of-compression-output-bytearray
return uncompressedSize + ((uncompressedSize + 7) >> 3) + ((uncompressedSize + 63) >> 6) + 5 + EXTRA_COMPRESSION_SPACE;
}

@Override
public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
{
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
gzipOutputStream.write(input, inputOffset, inputLength);
gzipOutputStream.finish();
byte[] compressed = byteArrayOutputStream.toByteArray();
if (compressed.length > maxOutputLength) {
throw new IllegalArgumentException("maxCompressedLength formula is incorrect, because gzip produced more data");
}
System.arraycopy(compressed, 0, output, outputOffset, compressed.length);
return compressed.length;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void compress(ByteBuffer input, ByteBuffer output)
{
if (input.isDirect() || output.isDirect() || !input.hasArray() || !output.hasArray()) {
throw new IllegalArgumentException("Non-direct byte buffer backed by byte array required");
}
int inputOffset = input.arrayOffset() + input.position();
int outputOffset = output.arrayOffset() + output.position();

int written = compress(input.array(), inputOffset, input.remaining(), output.array(), outputOffset, output.remaining());
((Buffer) output).position(output.position() + written);
}
}
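Likewise for the GZIP path: since the JDK exposes gzip only as streams, the compressor buffers its output through a ByteArrayOutputStream, and the compressed bytes should be readable back with java.util.zip.GZIPInputStream. A hedged verification sketch (assumed, not from this PR):

    // Assumed round-trip check for GzipCompressor output using the JDK stream API.
    byte[] input = "hello presto exchange compression".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    GzipCompressor compressor = new GzipCompressor();
    byte[] compressed = new byte[compressor.maxCompressedLength(input.length)];
    int compressedLength = compressor.compress(input, 0, input.length, compressed, 0, compressed.length);

    try (java.util.zip.GZIPInputStream in = new java.util.zip.GZIPInputStream(
            new java.io.ByteArrayInputStream(compressed, 0, compressedLength))) {
        byte[] decompressed = in.readAllBytes(); // Java 9+; expected to equal the original input
    }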