[KYUUBI #6691] A new Spark SQL command to merge small files #6695

Status: Closed. Wants to merge 35 commits.

Commits (35):
532369d
1. involve a compact table command to merge small files
gabry-lab Sep 11, 2024
d2cbcc2
parser tests pass
gabry-lab Sep 11, 2024
1a92be0
reformat all codes
gabry-lab Sep 11, 2024
906b47e
SparkPlan resolved successfully
gabry-lab Sep 11, 2024
4e9d8ca
reformat
gabry-lab Sep 11, 2024
350326e
adding unit test to recover command
gabry-lab Sep 12, 2024
0d30887
compact table execution tests pass
gabry-lab Sep 12, 2024
a52d501
remove unnecessary comments
gabry-lab Sep 12, 2024
7b10692
recover compact table command tests pass
gabry-lab Sep 13, 2024
0ef55ff
more unit tests
gabry-lab Sep 13, 2024
f04170b
fix scala style issue
gabry-lab Sep 13, 2024
02f6303
reduce message count
gabry-lab Sep 13, 2024
cc0ecce
involve createToScalaConverter
gabry-lab Sep 13, 2024
32276b5
remove unused import
gabry-lab Sep 13, 2024
c48b167
remove unused import
gabry-lab Sep 13, 2024
b4fd2ad
remove unnecessary comment & reformat
gabry-lab Sep 14, 2024
79e4e94
involve SPECULATION_ENABLED_SYNONYM
gabry-lab Sep 14, 2024
257dfd6
involve createRandomTable
gabry-lab Sep 15, 2024
66faca4
try to catch unknown Row
gabry-lab Sep 15, 2024
fd5a3c4
use Seq instead of WrappedArray
gabry-lab Sep 15, 2024
3775c95
compile on scala-2.13 successfully
gabry-lab Sep 15, 2024
dccc23a
remove unused import
gabry-lab Sep 15, 2024
62c1044
ByteUnit.MiB.toBytes to fix default target size
gabry-lab Sep 15, 2024
42d1fc0
rename compact-table.md to docs
gabry-lab Sep 16, 2024
232407a
remove unused comments
gabry-lab Sep 16, 2024
5a34b97
support orc
gabry-lab Sep 16, 2024
c042dfa
involve toJavaList to compile on scala 2.13
gabry-lab Sep 16, 2024
d35f7d4
add bzip2 unit tests
gabry-lab Sep 16, 2024
8b637de
spotless:apply
gabry-lab Sep 16, 2024
7fbc805
fix getCodecFromFilePath for orc
gabry-lab Sep 16, 2024
59f0b99
reformat
gabry-lab Sep 16, 2024
ab9b674
support more codec
gabry-lab Sep 18, 2024
22f0b79
remove unused util class, close opened stream in finally block
gabry-lab Sep 18, 2024
8cc2390
rollback regardless of the success or failure of the command
gabry-lab Sep 18, 2024
fd39f66
reformat
gabry-lab Sep 18, 2024
57 changes: 57 additions & 0 deletions docs/extensions/engines/spark/compact-table.md
@@ -0,0 +1,57 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->

# Compact Table Command Support

This is a new Spark SQL command that compacts the small files in a table into larger files of a target size, such as 128MB.
After the compaction finishes, it creates a temporary view for querying the details of the compacted files.

Instead of reading and rewriting all the data in a table, it merges the files at the binary level,
which makes it more efficient.

## Syntax

### Compact table

```sparksql
compact table table_name [INTO ${targetFileSize} ${targetFileSizeUnit}] [ cleanup | retain | list ]
-- targetFileSizeUnit can be 'b', 'k', 'm', 'g', 't', 'p'
-- cleanup (default): delete the compact staging folders, which contain the original small files
-- retain: keep the compact staging folders (useful for testing); the table can be recovered from the staging data
-- list: only show the merge result without actually running the compaction
```

### Recover table

```sparksql
recover compact table table_name
-- recover the compacted table: restore the small files from the staging folder to their original location
```

## Example

The following commands compact the small files in the table `default.small_files_table` into 128MB files and create a
temporary view `v_merged_files` for querying the details of the compacted files.

```sparksql
set spark.sql.shuffle.partitions=32;

compact table default.small_files_table;

select * from v_merged_files;
```
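
The same statements can also be issued programmatically through a `SparkSession`. The sketch below is illustrative: it assumes the Kyuubi extension jar is on the classpath and that the extension is registered via `spark.sql.extensions`; the session settings shown are examples, not requirements.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: run the compact command from Scala and inspect the merged-file details.
// Assumes the Kyuubi Spark SQL extension jar is available to the application.
val spark = SparkSession.builder()
  .appName("compact-small-files")
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  .getOrCreate()

spark.sql("set spark.sql.shuffle.partitions=32")
spark.sql("compact table default.small_files_table")
spark.sql("select * from v_merged_files").show(truncate = false)
```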

7 changes: 7 additions & 0 deletions extensions/spark/kyuubi-extension-spark-3-5/pom.xml
@@ -99,6 +99,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
@@ -51,6 +51,10 @@ singleStatement

statement
: OPTIMIZE multipartIdentifier whereClause? zorderClause #optimizeZorder
| COMPACT TABLE multipartIdentifier
(INTO targetFileSize=INTEGER_VALUE FILE_SIZE_UNIT_LITERAL)?
(action=compactAction)? #compactTable
| RECOVER COMPACT TABLE multipartIdentifier #recoverCompactTable
| .*? #passThrough
;

@@ -62,6 +66,9 @@
: ZORDER BY order+=multipartIdentifier (',' order+=multipartIdentifier)*
;

compactAction
: CLEANUP | RETAIN | LIST
;
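
// For illustration, inputs matched by the new rules:
//   COMPACT TABLE db.tbl                      (default action: cleanup)
//   COMPACT TABLE db.tbl INTO 128 M RETAIN    (explicit target size, keep the staging folder)
//   RECOVER COMPACT TABLE db.tbl              (restore the original small files)
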
// We don't have an expression rule in our grammar here, so we just grab the tokens and defer
// parsing them to later.
predicateToken
@@ -101,6 +108,12 @@ nonReserved
| ZORDER
;

COMPACT: 'COMPACT';
INTO: 'INTO';
RECOVER: 'RECOVER';
CLEANUP: 'CLEANUP';
RETAIN: 'RETAIN';
LIST: 'LIST';
AND: 'AND';
BY: 'BY';
FALSE: 'FALSE';
@@ -115,7 +128,9 @@ WHERE: 'WHERE';
ZORDER: 'ZORDER';

MINUS: '-';

FILE_SIZE_UNIT_LITERAL:
'M' | 'MB'
;
BIGINT_LITERAL
: DIGIT+ 'L'
;
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Sort}

import org.apache.kyuubi.sql.KyuubiSparkSQLParser._
import org.apache.kyuubi.sql.compact.{CompactTableOptions, CompactTableStatement, RecoverCompactTableStatement}
import org.apache.kyuubi.sql.zorder.{OptimizeZorderStatement, Zorder}

class KyuubiSparkSQLAstBuilder extends KyuubiSparkSQLBaseVisitor[AnyRef] with SQLConfHelper {
@@ -127,6 +128,20 @@ class KyuubiSparkSQLAstBuilder extends KyuubiSparkSQLBaseVisitor[AnyRef] with SQLConfHelper {
UnparsedPredicateOptimize(tableIdent, predicate, orderExpr)
}

override def visitCompactTable(ctx: CompactTableContext): CompactTableStatement =
withOrigin(ctx) {
val tableParts = visitMultipartIdentifier(ctx.multipartIdentifier())
val targetFileSize = Option(ctx.targetFileSize).map(_.getText.toLong)
val action = Option(ctx.action).map(_.getText)
CompactTableStatement(tableParts, targetFileSize, CompactTableOptions(action))
}

override def visitRecoverCompactTable(ctx: RecoverCompactTableContext)
: RecoverCompactTableStatement = withOrigin(ctx) {
val tableParts = visitMultipartIdentifier(ctx.multipartIdentifier())
RecoverCompactTableStatement(tableParts)
}
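
// For illustration: parsing
//   COMPACT TABLE db.tbl INTO 128 M RETAIN
// is expected to produce
//   CompactTableStatement(Seq("db", "tbl"), Some(128L), CompactTableOptions.RetainStagingFolder)
// while RECOVER COMPACT TABLE db.tbl produces RecoverCompactTableStatement(Seq("db", "tbl")).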

override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null

override def visitMultipartIdentifier(ctx: MultipartIdentifierContext): Seq[String] =
@@ -19,6 +19,7 @@ package org.apache.kyuubi.sql

import org.apache.spark.sql.SparkSessionExtensions

import org.apache.kyuubi.sql.compact.CompactTableResolver
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder}

class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
@@ -32,6 +33,7 @@ object KyuubiSparkSQLCommonExtension {
// inject zorder parser and related rules
extensions.injectParser { case (_, parser) => new SparkKyuubiSparkSQLParser(parser) }
extensions.injectResolutionRule(ResolveZorder)
extensions.injectResolutionRule(CompactTableResolver)

// Note that:
// InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive
@@ -19,9 +19,11 @@ package org.apache.kyuubi.sql

import org.apache.spark.sql.{FinalStageResourceManager, InjectCustomResourceProfile, SparkSessionExtensions}

import org.apache.kyuubi.sql.compact.CompactTableSparkStrategy
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, KyuubiUnsupportedOperationsCheck, MaxScanStrategy}

// scalastyle:off line.size.limit

/**
* Depend on Spark SQL Extension framework, we can use this extension follow steps
* 1. move this jar into $SPARK_HOME/jars
@@ -40,6 +42,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)
extensions.injectPlannerStrategy(CompactTableSparkStrategy)

extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.sql

import java.lang.reflect.Method

import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.{FileMetaData, GlobalMetaData}

object ParquetFileWriterWrapper {

val mergeInfoField: Method = classOf[ParquetFileWriter]
.getDeclaredMethod(
"mergeInto",
classOf[FileMetaData],
classOf[GlobalMetaData],
classOf[Boolean])

mergeInfoField.setAccessible(true)

def mergeInto(
toMerge: FileMetaData,
mergedMetadata: GlobalMetaData,
strict: Boolean): GlobalMetaData = {
mergeInfoField.invoke(
null,
toMerge.asInstanceOf[AnyRef],
mergedMetadata.asInstanceOf[AnyRef],
strict.asInstanceOf[AnyRef]).asInstanceOf[GlobalMetaData]
}
}
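
A hedged usage sketch for this reflective wrapper: folding per-file Parquet footers into one `GlobalMetaData`, the way parquet-hadoop merges footers internally. The `mergeFooters` helper and the origin of the `footers` collection are assumptions for illustration, not code from this change.

```scala
import org.apache.parquet.hadoop.metadata.{FileMetaData, GlobalMetaData}

import org.apache.kyuubi.sql.ParquetFileWriterWrapper

// Illustrative helper: accumulate the FileMetaData of each small file into a single
// GlobalMetaData. A null accumulator serves as the empty starting state, mirroring
// how parquet-hadoop's own footer merging begins.
def mergeFooters(footers: Seq[FileMetaData], strict: Boolean = true): GlobalMetaData =
  footers.foldLeft(null: GlobalMetaData) { (merged, footer) =>
    ParquetFileWriterWrapper.mergeInto(footer, merged, strict)
  }
```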
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.sql.compact

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.{Row, SparkInternalExplorer, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{DropTableCommand, LeafRunnableCommand}

case class CachePerformanceViewCommand(
tableIdentifier: Seq[String],
performancePlan: LogicalPlan,
originalFileLocations: Seq[String],
options: CompactTableOption) extends LeafRunnableCommand {

override def innerChildren: Seq[QueryPlan[_]] = Seq(performancePlan)

override def run(sparkSession: SparkSession): Seq[Row] = {
val dropViewCommand = DropTableCommand(
CompactTableUtils.getTableIdentifier(tableIdentifier),
ifExists = true,
isView = true,
purge = true)
dropViewCommand.run(sparkSession)

val speculation =
sparkSession.sparkContext.getConf.getBoolean(
SparkInternalExplorer.SPECULATION_ENABLED_SYNONYM.key,
defaultValue = false)
if (speculation) {
sparkSession.sparkContext.getConf.set(
SparkInternalExplorer.SPECULATION_ENABLED_SYNONYM.key,
"false")
log.warn("set spark.speculation to false")
}
try {
val cacheTableCommand =
SparkInternalExplorer.CacheTableAsSelectExec(tableIdentifier.head, performancePlan)

// the result of this call is always empty
cacheTableCommand.run()

if (options == CompactTableOptions.CleanupStagingFolder) {
val fileSystem = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
originalFileLocations.foreach { originalFileLocation =>
val compactStagingDir = CompactTableUtils.getCompactStagingDir(originalFileLocation)
fileSystem.delete(compactStagingDir, true)
}

}
} finally {
if (speculation) {
sparkSession.sparkContext.getConf.set(
SparkInternalExplorer.SPECULATION_ENABLED_SYNONYM.key,
"true")
log.warn("rollback spark.speculation to true")
}
}
Seq.empty[Row]
}

}
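
The save-and-restore of `spark.speculation` above could be expressed as a small reusable pattern. A minimal sketch, assuming that toggling the configuration on the driver is sufficient for the cache step (hypothetical helper, not part of this change):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper mirroring the pattern above: temporarily disable speculative
// execution while the merged-files view is built, restoring the original value even
// if the body throws.
def withSpeculationDisabled[T](spark: SparkSession)(body: => T): T = {
  val conf = spark.sparkContext.getConf
  val wasEnabled = conf.getBoolean("spark.speculation", defaultValue = false)
  if (wasEnabled) conf.set("spark.speculation", "false")
  try body
  finally if (wasEnabled) conf.set("spark.speculation", "true")
}
```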
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.sql.compact

import org.apache.spark.sql.catalyst.analysis.UnresolvedUnaryNode
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LeafParsedStatement, LogicalPlan}
import org.apache.spark.sql.types._

object CompactTable {
private val fileLocAndSizeStructArrayType: ArrayType =
DataTypes.createArrayType(DataTypes.createStructType(Array(
DataTypes.createStructField("sub_group_id", IntegerType, false),
DataTypes.createStructField("name", StringType, false),
DataTypes.createStructField("length", LongType, false))))

val smallFileCollectOutput: StructType = DataTypes.createStructType(Array(
DataTypes.createStructField("group_id", IntegerType, false),
DataTypes.createStructField("location", StringType, false),
DataTypes.createStructField("data_source", StringType, false),
DataTypes.createStructField("codec", StringType, true),
DataTypes.createStructField("smallFiles", fileLocAndSizeStructArrayType, false)))

val smallFileCollectOutputAttribute: Seq[AttributeReference] = smallFileCollectOutput
.map(field => AttributeReference(field.name, field.dataType, field.nullable)())

val mergedFilesCachedTableName = "v_merged_files"
val mergeMetadataKey = "spark.sql.compact.parquet.metadata.merge"
}

trait CompactTableOption

object CompactTableOptions {
def apply(options: Option[String]): CompactTableOption = options.map(_.toLowerCase) match {
case Some("retain") => RetainStagingFolder
case Some("list") => DryRun
case _ => CleanupStagingFolder
}

case object CleanupStagingFolder extends CompactTableOption

case object RetainStagingFolder extends CompactTableOption

case object DryRun extends CompactTableOption
}

case class CompactTable(
child: LogicalPlan,
targetSizeInBytes: Option[Long],
options: CompactTableOption) extends UnresolvedUnaryNode {
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
CompactTable(newChild, targetSizeInBytes, options)
}
}

case class CompactTableStatement(
tableParts: Seq[String],
targetSizeInMB: Option[Long],
options: CompactTableOption) extends LeafParsedStatement

case class RecoverCompactTableStatement(tableParts: Seq[String])
extends LeafParsedStatement

case class RecoverCompactTable(child: LogicalPlan) extends UnresolvedUnaryNode {
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
RecoverCompactTable(newChild)
}
}
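
For reference, a small illustration of the keyword-to-option mapping implemented by `CompactTableOptions.apply` (the assertions are illustrative usage, not tests from this change):

```scala
import org.apache.kyuubi.sql.compact.CompactTableOptions

// The action keyword is matched case-insensitively; anything else falls back to cleanup.
assert(CompactTableOptions(None) == CompactTableOptions.CleanupStagingFolder)           // default
assert(CompactTableOptions(Some("RETAIN")) == CompactTableOptions.RetainStagingFolder)  // keep staging data
assert(CompactTableOptions(Some("list")) == CompactTableOptions.DryRun)                 // plan only
```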