diff --git a/docs/extensions/engines/spark/compact-table.md b/docs/extensions/engines/spark/compact-table.md
new file mode 100644
index 00000000000..afab5f59651
--- /dev/null
+++ b/docs/extensions/engines/spark/compact-table.md
@@ -0,0 +1,57 @@
+
+
+# Compact Table Command Support
+
+This is a new Spark SQL command that compacts the small files of a table into larger files, for example 128MB.
+After the compaction finishes, it creates a temporary view for querying the details of the compacted files.
+
+Instead of reading and rewriting all the data in a table, it merges files at the binary/file level,
+which is more efficient.
+
+## Syntax
+
+### Compact table
+
+```sparksql
+compact table table_name [ INTO ${targetFileSize} ${targetFileSizeUnit} ] [ cleanup | retain | list ]
+-- targetFileSizeUnit can be 'm' or 'mb' (megabytes)
+-- cleanup (default): remove the compact staging folders, which contain the original small files
+-- retain: keep the compact staging folders (e.g. for testing), so the table can be recovered from the staging data
+-- list: only report the merge plan; no files are actually compacted
+```
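+
+For illustration, a target size and an action can be combined as follows; the table name `db1.events`
+is a placeholder, and the size unit follows the grammar added in this PR (`m`/`mb`):
+
+```sparksql
+-- dry run: only report how the small files would be grouped, without merging anything
+compact table db1.events into 128 m list;
+
+-- compact into ~128MB files, but keep the staging folders so the table can be recovered later
+compact table db1.events into 128 m retain;
+```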
+
+### Recover compact table
+
+```sparksql
+recover compact table table_name
+-- recovers a compacted table: restores the small files from the staging folders to their original location
+```
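+
+For example, if a compaction was run with the `retain` action, the original small files can be restored
+(and the merged files removed) with:
+
+```sparksql
+recover compact table default.small_files_table;
+```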
+
+## Example
+
+The following commands compact the small files in the table `default.small_files_table` into larger files and create
+a temporary view `v_merged_files` for querying the details of the compacted files. Setting
+`spark.sql.shuffle.partitions` caps how many merge tasks run in parallel.
+
+```sparksql
+set spark.sql.shuffle.partitions=32;
+
+compact table default.small_files_table;
+
+select * from v_merged_files;
+```
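+
+The view `v_merged_files` holds one row per merge group. Based on the output schema defined in this PR
+(`CompactTable.smallFileCollectOutput`), each row carries `group_id`, `location`, `data_source`, `codec`,
+and a `smallFiles` array of `(sub_group_id, name, length)` entries describing the files produced by each
+merge (or, with the `list` action, the small files that would be merged). A sketch of a summary query:
+
+```sparksql
+select group_id, location, size(smallFiles) as file_count
+from v_merged_files;
+```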
+
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/pom.xml b/extensions/spark/kyuubi-extension-spark-3-5/pom.xml
index 113a815d5fc..d1d48058a01 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/pom.xml
+++ b/extensions/spark/kyuubi-extension-spark-3-5/pom.xml
@@ -99,6 +99,13 @@
            <scope>test</scope>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-avro_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+        </dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client-runtime</artifactId>
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/antlr4/org/apache/kyuubi/sql/KyuubiSparkSQL.g4 b/extensions/spark/kyuubi-extension-spark-3-5/src/main/antlr4/org/apache/kyuubi/sql/KyuubiSparkSQL.g4
index e52b7f5cfeb..54627771e94 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/antlr4/org/apache/kyuubi/sql/KyuubiSparkSQL.g4
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/antlr4/org/apache/kyuubi/sql/KyuubiSparkSQL.g4
@@ -51,6 +51,10 @@ singleStatement
statement
: OPTIMIZE multipartIdentifier whereClause? zorderClause #optimizeZorder
+ | COMPACT TABLE multipartIdentifier
+ (INTO targetFileSize=INTEGER_VALUE FILE_SIZE_UNIT_LITERAL)?
+ (action=compactAction)? #compactTable
+ | RECOVER COMPACT TABLE multipartIdentifier #recoverCompactTable
| .*? #passThrough
;
@@ -62,6 +66,9 @@ zorderClause
: ZORDER BY order+=multipartIdentifier (',' order+=multipartIdentifier)*
;
+compactAction
+ : CLEANUP | RETAIN | LIST
+ ;
// We don't have an expression rule in our grammar here, so we just grab the tokens and defer
// parsing them to later.
predicateToken
@@ -101,6 +108,12 @@ nonReserved
| ZORDER
;
+COMPACT: 'COMPACT';
+INTO: 'INTO';
+RECOVER: 'RECOVER';
+CLEANUP: 'CLEANUP';
+RETAIN: 'RETAIN';
+LIST: 'LIST';
AND: 'AND';
BY: 'BY';
FALSE: 'FALSE';
@@ -115,7 +128,9 @@ WHERE: 'WHERE';
ZORDER: 'ZORDER';
MINUS: '-';
-
+FILE_SIZE_UNIT_LITERAL:
+ 'M' | 'MB'
+ ;
BIGINT_LITERAL
: DIGIT+ 'L'
;
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLAstBuilder.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLAstBuilder.scala
index 7ee439a4399..a2046087981 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLAstBuilder.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLAstBuilder.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Sort}
import org.apache.kyuubi.sql.KyuubiSparkSQLParser._
+import org.apache.kyuubi.sql.compact.{CompactTableOptions, CompactTableStatement, RecoverCompactTableStatement}
import org.apache.kyuubi.sql.zorder.{OptimizeZorderStatement, Zorder}
class KyuubiSparkSQLAstBuilder extends KyuubiSparkSQLBaseVisitor[AnyRef] with SQLConfHelper {
@@ -127,6 +128,20 @@ class KyuubiSparkSQLAstBuilder extends KyuubiSparkSQLBaseVisitor[AnyRef] with SQ
UnparsedPredicateOptimize(tableIdent, predicate, orderExpr)
}
+ override def visitCompactTable(ctx: CompactTableContext): CompactTableStatement =
+ withOrigin(ctx) {
+ val tableParts = visitMultipartIdentifier(ctx.multipartIdentifier())
+ val targetFileSize = Option(ctx.targetFileSize).map(_.getText.toLong)
+ val action = Option(ctx.action).map(_.getText)
+ CompactTableStatement(tableParts, targetFileSize, CompactTableOptions(action))
+ }
+
+ override def visitRecoverCompactTable(ctx: RecoverCompactTableContext)
+ : RecoverCompactTableStatement = withOrigin(ctx) {
+ val tableParts = visitMultipartIdentifier(ctx.multipartIdentifier())
+ RecoverCompactTableStatement(tableParts)
+ }
+
override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null
override def visitMultipartIdentifier(ctx: MultipartIdentifierContext): Seq[String] =
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
index 450a2c35e89..9990ff371b7 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.kyuubi.sql.compact.CompactTableResolver
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder}
class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
@@ -32,6 +33,7 @@ object KyuubiSparkSQLCommonExtension {
// inject zorder parser and related rules
extensions.injectParser { case (_, parser) => new SparkKyuubiSparkSQLParser(parser) }
extensions.injectResolutionRule(ResolveZorder)
+ extensions.injectResolutionRule(CompactTableResolver)
// Note that:
// InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
index fd11fb5f579..da05a72306a 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala
@@ -19,9 +19,11 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.{FinalStageResourceManager, InjectCustomResourceProfile, SparkSessionExtensions}
+import org.apache.kyuubi.sql.compact.CompactTableSparkStrategy
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
// scalastyle:off line.size.limit
+
/**
* Depend on Spark SQL Extension framework, we can use this extension follow steps
* 1. move this jar into $SPARK_HOME/jars
@@ -40,6 +42,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)
+ extensions.injectPlannerStrategy(CompactTableSparkStrategy)
extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/ParquetFileWriterWrapper.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/ParquetFileWriterWrapper.scala
new file mode 100644
index 00000000000..454751d2e23
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/ParquetFileWriterWrapper.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql
+
+import java.lang.reflect.Method
+
+import org.apache.parquet.hadoop.ParquetFileWriter
+import org.apache.parquet.hadoop.metadata.{FileMetaData, GlobalMetaData}
+
+object ParquetFileWriterWrapper {
+
+ val mergeInfoField: Method = classOf[ParquetFileWriter]
+ .getDeclaredMethod(
+ "mergeInto",
+ classOf[FileMetaData],
+ classOf[GlobalMetaData],
+ classOf[Boolean])
+
+ mergeInfoField.setAccessible(true)
+
+ def mergeInto(
+ toMerge: FileMetaData,
+ mergedMetadata: GlobalMetaData,
+ strict: Boolean): GlobalMetaData = {
+ mergeInfoField.invoke(
+ null,
+ toMerge.asInstanceOf[AnyRef],
+ mergedMetadata.asInstanceOf[AnyRef],
+ strict.asInstanceOf[AnyRef]).asInstanceOf[GlobalMetaData]
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CachePerformanceViewCommand.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CachePerformanceViewCommand.scala
new file mode 100644
index 00000000000..2c2aa92a497
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CachePerformanceViewCommand.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.sql.{Row, SparkInternalExplorer, SparkSession}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{DropTableCommand, LeafRunnableCommand}
+
+case class CachePerformanceViewCommand(
+ tableIdentifier: Seq[String],
+ performancePlan: LogicalPlan,
+ originalFileLocations: Seq[String],
+ options: CompactTableOption) extends LeafRunnableCommand {
+
+ override def innerChildren: Seq[QueryPlan[_]] = Seq(performancePlan)
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val dropViewCommand = DropTableCommand(
+ CompactTableUtils.getTableIdentifier(tableIdentifier),
+ ifExists = true,
+ isView = true,
+ purge = true)
+ dropViewCommand.run(sparkSession)
+
+ val speculation =
+ sparkSession.sparkContext.getConf.getBoolean(
+ SparkInternalExplorer.SPECULATION_ENABLED_SYNONYM.key,
+ defaultValue = false)
+ if (speculation) {
+ sparkSession.sparkContext.getConf.set(
+ SparkInternalExplorer.SPECULATION_ENABLED_SYNONYM.key,
+ "false")
+ log.warn("set spark.speculation to false")
+ }
+ try {
+ val cacheTableCommand =
+ SparkInternalExplorer.CacheTableAsSelectExec(tableIdentifier.head, performancePlan)
+
+      // the result of this command is always empty
+ cacheTableCommand.run()
+
+ if (options == CompactTableOptions.CleanupStagingFolder) {
+ val fileSystem = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
+ originalFileLocations.foreach { originalFileLocation =>
+ val compactStagingDir = CompactTableUtils.getCompactStagingDir(originalFileLocation)
+ fileSystem.delete(compactStagingDir, true)
+ }
+
+ }
+ } finally {
+ if (speculation) {
+ sparkSession.sparkContext.getConf.set(
+ SparkInternalExplorer.SPECULATION_ENABLED_SYNONYM.key,
+ "true")
+ log.warn("rollback spark.speculation to true")
+ }
+ }
+ Seq.empty[Row]
+ }
+
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTable.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTable.scala
new file mode 100644
index 00000000000..e5ee5fda636
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTable.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedUnaryNode
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafParsedStatement, LogicalPlan}
+import org.apache.spark.sql.types._
+
+object CompactTable {
+ private val fileLocAndSizeStructArrayType: ArrayType =
+ DataTypes.createArrayType(DataTypes.createStructType(Array(
+ DataTypes.createStructField("sub_group_id", IntegerType, false),
+ DataTypes.createStructField("name", StringType, false),
+ DataTypes.createStructField("length", LongType, false))))
+
+ val smallFileCollectOutput: StructType = DataTypes.createStructType(Array(
+ DataTypes.createStructField("group_id", IntegerType, false),
+ DataTypes.createStructField("location", StringType, false),
+ DataTypes.createStructField("data_source", StringType, false),
+ DataTypes.createStructField("codec", StringType, true),
+ DataTypes.createStructField("smallFiles", fileLocAndSizeStructArrayType, false)))
+
+ val smallFileCollectOutputAttribute: Seq[AttributeReference] = smallFileCollectOutput
+ .map(field => AttributeReference(field.name, field.dataType, field.nullable)())
+
+ val mergedFilesCachedTableName = "v_merged_files"
+ val mergeMetadataKey = "spark.sql.compact.parquet.metadata.merge"
+}
+
+trait CompactTableOption
+
+object CompactTableOptions {
+ def apply(options: Option[String]): CompactTableOption = options.map(_.toLowerCase) match {
+ case Some("retain") => RetainStagingFolder
+ case Some("list") => DryRun
+ case _ => CleanupStagingFolder
+ }
+
+ case object CleanupStagingFolder extends CompactTableOption
+
+ case object RetainStagingFolder extends CompactTableOption
+
+ case object DryRun extends CompactTableOption
+}
+
+case class CompactTable(
+ child: LogicalPlan,
+ targetSizeInBytes: Option[Long],
+ options: CompactTableOption) extends UnresolvedUnaryNode {
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+ CompactTable(newChild, targetSizeInBytes, options)
+ }
+}
+
+case class CompactTableStatement(
+ tableParts: Seq[String],
+ targetSizeInMB: Option[Long],
+ options: CompactTableOption) extends LeafParsedStatement
+
+case class RecoverCompactTableStatement(tableParts: Seq[String])
+ extends LeafParsedStatement
+
+case class RecoverCompactTable(child: LogicalPlan) extends UnresolvedUnaryNode {
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+ RecoverCompactTable(newChild)
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTableResolver.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTableResolver.scala
new file mode 100644
index 00000000000..146e74a5f2f
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTableResolver.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+
+case class CompactTableResolver(sparkSession: SparkSession) extends Rule[LogicalPlan] with Logging {
+ override def apply(plan: LogicalPlan): LogicalPlan =
+ plan match {
+ case RecoverCompactTableStatement(tableParts) =>
+ RecoverCompactTable(UnresolvedRelation(CompactTableUtils.getTableIdentifier(tableParts)))
+
+ case RecoverCompactTable(SubqueryAlias(
+ _,
+ LogicalRelation(
+ _: HadoopFsRelation,
+ _,
+ Some(catalogTable),
+ _))) =>
+ RecoverCompactTableCommand(catalogTable)
+
+ case CompactTableStatement(tableParts, targetSizeInMB, options) =>
+ CompactTable(
+ UnresolvedRelation(CompactTableUtils.getTableIdentifier(tableParts)),
+ targetSizeInMB.map(ByteUnit.MiB.toBytes),
+ options)
+ case CompactTable(
+ SubqueryAlias(
+ _,
+ logicalRelation @ LogicalRelation(
+ _: HadoopFsRelation,
+ _,
+ Some(catalogTable),
+ _)),
+ targetSizeInBytes,
+ options) =>
+ createCacheCommand(
+ logicalRelation,
+ catalogTable,
+ targetSizeInBytes,
+ options)
+
+ case CompactTable(
+ SubqueryAlias(_, hiveTableRelation: HiveTableRelation),
+ targetSizeInBytes,
+ options) =>
+ createCacheCommand(
+ hiveTableRelation,
+ hiveTableRelation.tableMeta,
+ targetSizeInBytes,
+ options)
+ case _ => plan
+ }
+
+ private def createCacheCommand(
+ relation: LeafNode,
+ catalogTable: CatalogTable,
+ targetSizeInBytes: Option[Long],
+ options: CompactTableOption): CachePerformanceViewCommand = {
+
+ val smallFileCollect = SmallFileCollect(relation, targetSizeInBytes)
+ val repartitionByExpression =
+ RepartitionByExpression(Seq(smallFileCollect.output.head), smallFileCollect, None)
+ val smallFileMerge =
+ SmallFileMerge(repartitionByExpression, options == CompactTableOptions.DryRun)
+ val originalFileLocation = CompactTableUtils.getCompactDataDir(catalogTable.storage)
+ CachePerformanceViewCommand(
+ Seq(CompactTable.mergedFilesCachedTableName),
+ smallFileMerge,
+ originalFileLocation,
+ options)
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTableSparkStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTableSparkStrategy.scala
new file mode 100644
index 00000000000..03e1b1b73db
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTableSparkStrategy.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+
+case class CompactTableSparkStrategy(sparkSession: SparkSession) extends SparkStrategy {
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+ plan match {
+ case smallFileCollect @ SmallFileCollect(
+ LogicalRelation(hadoopFsRelation: HadoopFsRelation, _, Some(catalogTable), _),
+ targetSizeInBytes) =>
+ SmallFileCollectExec(
+ hadoopFsRelation,
+ smallFileCollect.output,
+ catalogTable,
+ targetSizeInBytes) :: Nil
+ case SmallFileMerge(child, noMerge) if !noMerge =>
+ SmallFileMergeExec(planLater(child)) :: Nil
+ case SmallFileMerge(child, noMerge) if noMerge =>
+ SmallFileListExec(planLater(child)) :: Nil
+ case _ => Nil
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTableUtils.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTableUtils.scala
new file mode 100644
index 00000000000..8edb88ce3a0
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/CompactTableUtils.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path => HadoopPath}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+
+import org.apache.kyuubi.sql.KyuubiSQLExtensionException
+
+object CompactTableUtils {
+
+ def toJavaList[A](it: List[A]): java.util.List[A] = {
+ val javaList = new java.util.ArrayList[A](it.size)
+ it.indices foreach { i =>
+ javaList.add(it(i))
+ }
+ javaList
+ }
+
+ def getStagingDir(path: String, jobId: String): HadoopPath = {
+ new HadoopPath(getCompactStagingDir(path), s".spark-compact-staging-$jobId")
+ }
+
+ def getCompactStagingDir(tableLocation: String): HadoopPath = {
+ new HadoopPath(tableLocation, ".compact")
+ }
+
+ def getCompactDataDir(tableStorage: CatalogStorageFormat): Seq[String] =
+ getCompactDataDir(tableStorage, Seq.empty)
+
+ def getCompactDataDir(
+ tableStorage: CatalogStorageFormat,
+ partitionStorage: Seq[CatalogStorageFormat]): Seq[String] = {
+ (partitionStorage.flatMap(_.locationUri), tableStorage.locationUri) match {
+ case (partUri, _) if partUri.nonEmpty => partUri.map(_.toString)
+ case (partUri, Some(tableUri)) if partUri.isEmpty => Seq(tableUri.toString)
+ case _ => Seq.empty
+ }
+ }
+
+ def getTableIdentifier(tableIdent: Seq[String]): TableIdentifier = tableIdent match {
+ case Seq(tbl) => TableIdentifier(tbl)
+ case Seq(db, tbl) => TableIdentifier(tbl, Some(db))
+ case _ => throw new KyuubiSQLExtensionException(
+ "only support session catalog table, please use db.table instead")
+ }
+
+ def getCodecExtFromFilePath(filePath: HadoopPath, hadoopConf: Configuration): Option[String] =
+ filePath.getName.split("\\.", 3) match {
+ case Array(_, codecExt, "parquet") =>
+ Some(codecExt)
+ case Array(_, codecExt, "orc") =>
+ Some(codecExt)
+ case Array(_, "parquet") =>
+ None
+ case Array(_, "orc") =>
+ None
+ case Array(_, _, codecExt) =>
+ Some(codecExt)
+ case Array(_, _) =>
+ None
+ case _ => None
+ }
+
+ def getExtFromFilePath(filePath: String): String =
+ filePath.split("\\.", 3).tail.mkString(".")
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/FileMergingRDD.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/FileMergingRDD.scala
new file mode 100644
index 00000000000..b0eba814705
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/FileMergingRDD.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+class FileMergingRDD(
+ @transient private val sparkSession: SparkSession,
+ val dataSchema: StructType,
+ val filePartitions: Array[MergingFilePartition])
+ extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
+
+ override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+ val onePartition = split.asInstanceOf[MergingFilePartition]
+ new Iterator[InternalRow] with AutoCloseable {
+ private[this] var currentRow: Option[InternalRow] = None
+
+ override def hasNext: Boolean = currentRow.isEmpty
+
+ override def next(): InternalRow = {
+ currentRow = Some(onePartition.toInternalRow)
+ currentRow.get
+ }
+
+ override def close(): Unit = {
+ currentRow = None
+ }
+ }
+ }
+
+ /**
+ * Implemented by subclasses to return the set of partitions in this RDD. This method will only
+ * be called once, so it is safe to implement a time-consuming computation in it.
+ *
+ * The partitions in this array must satisfy the following property:
+ * `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
+ */
+
+ override protected def getPartitions: Array[Partition] =
+ filePartitions.map(_.asInstanceOf[Partition])
+
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/MergeFileException.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/MergeFileException.scala
new file mode 100644
index 00000000000..8ea21d16a6f
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/MergeFileException.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+case class MergeFileException(message: String) extends Exception(message)
+
+case class UnSupportedTableException(message: String) extends Exception(message)
+
+case class RecoverFileException(message: String) extends Exception(message)
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/MergingFile.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/MergingFile.scala
new file mode 100644
index 00000000000..65fead2a4b1
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/MergingFile.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.spark.Partition
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.unsafe.types.UTF8String
+
+case class MergingFile(subGroupId: Int, name: String, length: Long) {
+ override def toString: String =
+ s"MergingFile(sub group id $subGroupId, name $name, length $length)"
+}
+
+object MergingFilePartition {
+ private def toInternalRow(part: MergingFilePartition): InternalRow = {
+ val projection = UnsafeProjection.create(CompactTable.smallFileCollectOutput)
+ projection(InternalRow(
+ part.groupId,
+ UTF8String.fromString(part.location),
+ UTF8String.fromString(part.dataSource),
+ UTF8String.fromString(part.codecExt.orNull),
+ ArrayData.toArrayData(part.smallFiles.map(f =>
+ InternalRow(f.subGroupId, UTF8String.fromString(f.name), f.length)))))
+ }
+}
+
+case class MergingFilePartition(
+ groupId: Int,
+ location: String,
+ dataSource: String,
+ codecExt: Option[String],
+ smallFiles: Seq[MergingFile],
+ index: Int = -1) extends Partition {
+ override def toString: String = s"MergingFilePartition(index=$index,groupId=$groupId" +
+ s"location $location,data source $dataSource,codec ext $codecExt," +
+ s"small files ${smallFiles.mkString("[", ",", "]")})"
+
+ def toInternalRow: InternalRow = MergingFilePartition.toInternalRow(this)
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/RecoverCompactTableCommand.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/RecoverCompactTableCommand.scala
new file mode 100644
index 00000000000..ae096ec15f3
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/RecoverCompactTableCommand.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath, PathFilter}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+
+import org.apache.kyuubi.sql.compact.merge.AbstractFileMerger
+
+case class RecoverCompactTableCommand(
+ catalogTable: CatalogTable) extends LeafRunnableCommand {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val fileSystem = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
+
+ CompactTableUtils.getCompactDataDir(catalogTable.storage).foreach { originalFileLocation =>
+ val dataPath = new HadoopPath(originalFileLocation)
+ val compactStagingDir = CompactTableUtils.getCompactStagingDir(originalFileLocation)
+
+ if (fileSystem.exists(compactStagingDir)) {
+ fileSystem.listStatus(compactStagingDir).foreach { subFolder =>
+ log.debug(s"delete processing merged files under $subFolder")
+ fileSystem.listStatus(
+ subFolder.getPath,
+ new PathFilter {
+ override def accept(path: HadoopPath): Boolean =
+ path.getName.startsWith(AbstractFileMerger.mergedFilePrefix) &&
+ path.getName.endsWith(AbstractFileMerger.mergedFileProcessingSuffix)
+ }).foreach { f =>
+ if (!fileSystem.delete(f.getPath, false)) {
+ throw RecoverFileException(s"failed to delete processing merged file ${f.getPath}")
+ }
+ }
+
+ log.debug(s"recover merging files under $subFolder")
+
+ fileSystem.listStatus(
+ subFolder.getPath,
+ new PathFilter {
+ override def accept(path: HadoopPath): Boolean =
+ path.getName.startsWith(AbstractFileMerger.mergingFilePrefix)
+ }).foreach { smallFile =>
+ val fileName = smallFile.getPath.getName
+ val recoverFileName =
+ fileName.replaceFirst(AbstractFileMerger.mergingFilePrefix + "-\\d+-\\d+-", "")
+ if (!fileSystem.rename(smallFile.getPath, new HadoopPath(dataPath, recoverFileName))) {
+ throw RecoverFileException(
+ s"failed to recover file $fileName to $recoverFileName under $subFolder")
+ }
+ }
+
+ if (!fileSystem.delete(subFolder.getPath, false)) {
+ throw RecoverFileException(s"failed to delete sub folder $subFolder")
+ }
+ log.debug(s"delete sub folder $subFolder")
+ }
+ if (!fileSystem.delete(compactStagingDir, false)) {
+ throw RecoverFileException(s"failed to delete .compact folder $compactStagingDir")
+ }
+ log.debug(s"delete .compact folder $compactStagingDir")
+
+ log.debug(s"delete merged files under $originalFileLocation")
+ fileSystem.listStatus(
+ dataPath,
+ new PathFilter {
+ override def accept(path: HadoopPath): Boolean = {
+ path.getName.startsWith(AbstractFileMerger.mergedFilePrefix) &&
+ !path.getName.endsWith(AbstractFileMerger.mergedFileProcessingSuffix)
+ }
+ }).foreach { mergedFile =>
+ if (!fileSystem.delete(mergedFile.getPath, false)) {
+ throw RecoverFileException(s"can't delete merged file $mergedFile")
+ }
+ }
+ } else {
+ log.debug(s"no .compact folder found skip to recover $originalFileLocation")
+ }
+ }
+ log.debug("all files recovered")
+ Seq.empty
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileCollect.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileCollect.scala
new file mode 100644
index 00000000000..7dcafff9208
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileCollect.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+
+case class SmallFileCollect(child: LogicalPlan, targetSizeInBytes: Option[Long])
+ extends UnaryNode {
+
+ override def output: Seq[Attribute] = CompactTable.smallFileCollectOutputAttribute
+
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ SmallFileCollect(newChild, targetSizeInBytes)
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileCollectExec.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileCollectExec.scala
new file mode 100644
index 00000000000..692866ef2b9
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileCollectExec.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath, PathFilter}
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.sources.DataSourceRegister
+
+import org.apache.kyuubi.sql.compact.merge.AbstractFileMerger
+
+/**
+ * Aggregates small files into groups such that the total file size of each group
+ * is close to the target merge size (for example 256MB).
+ * Output per group:
+ *   the smallest file location & size in bytes
+ *   an array of file locations & sizes in bytes, which need to be appended to the smallest file
+ */
+case class SmallFileCollectExec(
+ baseRelation: HadoopFsRelation,
+ output: Seq[Attribute],
+ catalogTable: CatalogTable,
+ targetSizeInBytes: Option[Long]) extends LeafExecNode {
+
+ private val dataSource = baseRelation.fileFormat.asInstanceOf[DataSourceRegister].shortName()
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val fileSizeInBytesThreshold =
+ targetSizeInBytes.getOrElse(JavaUtils.byteStringAsBytes("256MB"))
+ log.debug(s"target merged file size in bytes $fileSizeInBytesThreshold")
+
+ val smallFileLocations =
+ CompactTableUtils.getCompactDataDir(catalogTable.storage)
+ log.debug(s"merge file data in ${smallFileLocations.mkString("[", ",", "]")}")
+ val fileSystem = FileSystem.get(sparkContext.hadoopConfiguration)
+
+ smallFileLocations.foreach { dataPath =>
+ val compactStaging = CompactTableUtils.getCompactStagingDir(dataPath)
+ if (fileSystem.exists(compactStaging)) {
+ throw MergeFileException(
+ s"compact staging $compactStaging already exists, " +
+ s"you should recover it before compacting")
+ }
+ }
+
+ val shuffleNum = session.sqlContext.getConf("spark.sql.shuffle.partitions").toInt
+ log.debug(s"target shuffle num $shuffleNum")
+ val smallFileAndLocs = smallFileLocations.flatMap { loc =>
+ val smallFiles = getSmallFiles(fileSystem, new HadoopPath(loc), fileSizeInBytesThreshold)
+ if (smallFiles.nonEmpty) {
+ val codecExt = getCodecExtFromFile(smallFiles.head.name)
+ val neededMergeFileGroups = aggregateSmallFile(smallFiles, fileSizeInBytesThreshold)
+ .map { group =>
+ MergingFilePartition(-1, loc, dataSource, codecExt, group)
+ }
+
+ val groupNum = neededMergeFileGroups.length
+ val shortNeededMergeFileGroups =
+ if (groupNum > 1 &&
+ neededMergeFileGroups.apply(groupNum - 1).smallFiles.map(
+ _.length).sum < fileSizeInBytesThreshold) {
+ val last2 = neededMergeFileGroups.apply(groupNum - 2)
+ val last1 = neededMergeFileGroups.apply(groupNum - 1)
+ val merged = last2.copy(smallFiles = last2.smallFiles ++ last1.smallFiles)
+ neededMergeFileGroups.dropRight(2) :+ merged
+ } else {
+ neededMergeFileGroups
+ }
+
+ val originalGroupSize = shortNeededMergeFileGroups.length
+ val groupEleNum = Math.max(Math.floorDiv(originalGroupSize, shuffleNum), 1)
+
+ val regroupSmallFileAndLocs =
+ shortNeededMergeFileGroups.sliding(groupEleNum, groupEleNum).map { groups =>
+ val newGroup = groups.zipWithIndex.map { case (group, subIndex) =>
+ group.smallFiles.map(_.copy(subGroupId = subIndex))
+ }
+ MergingFilePartition(-1, loc, dataSource, codecExt, newGroup.flatten)
+ }.toList
+ regroupSmallFileAndLocs
+ } else {
+ Iterator.empty
+ }
+ }.toArray.zipWithIndex.map {
+ case (part, globalIndex) => part.copy(index = globalIndex, groupId = globalIndex)
+ }
+ new FileMergingRDD(
+ session,
+ CompactTable.smallFileCollectOutput,
+ smallFileAndLocs)
+ }
+
+ private def getCodecExtFromFile(filePath: String): Option[String] =
+ CompactTableUtils.getCodecExtFromFilePath(
+ new HadoopPath(filePath),
+ sparkContext.hadoopConfiguration)
+
+ private def getSmallFiles(
+ fs: FileSystem,
+ location: HadoopPath,
+ fileSizeInBytes: Long): Array[MergingFile] = {
+ fs.listStatus(
+ location,
+ new PathFilter {
+ override def accept(path: HadoopPath): Boolean = {
+ val pathName = path.getName
+ !(pathName.startsWith(".") || pathName.startsWith("_") || pathName.startsWith(
+ AbstractFileMerger.mergingFilePrefix) || pathName.startsWith(
+ AbstractFileMerger.mergedFilePrefix))
+ }
+ }).filter(_.getLen < fileSizeInBytes)
+ .map(file => MergingFile(0, file.getPath.getName, file.getLen))
+ .sortBy(_.length)
+ }
+
+ private def aggregateSmallFile(
+ sortedSmallFiles: Array[MergingFile],
+ targetFileSizeInBytes: Long): List[Seq[MergingFile]] = {
+ var groupedFiles: List[Seq[MergingFile]] = List.empty
+ var start = 0
+ var end = sortedSmallFiles.length
+ var sizeSum = 0L
+ while (start < sortedSmallFiles.length) {
+ sizeSum = 0L
+ start until sortedSmallFiles.length takeWhile { i =>
+ sizeSum += sortedSmallFiles(i).length
+ end = i
+ sizeSum < targetFileSizeInBytes
+ }
+ groupedFiles = groupedFiles :+ sortedSmallFiles.slice(start, end + 1).toSeq
+ start = end + 1
+ }
+ groupedFiles
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileListExec.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileListExec.scala
new file mode 100644
index 00000000000..b09032943cc
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileListExec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.CatalystTypeConverters.createToScalaConverter
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+
+case class SmallFileListExec(child: SparkPlan) extends UnaryExecNode {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val structType = DataTypeUtils.fromAttributes(output)
+ child.execute().mapPartitionsWithIndex { (partIndex, iterator) =>
+ val converter = createToScalaConverter(structType)
+
+ iterator.map(converter).map {
+ case Row(
+ groupId: Int,
+ location: String,
+ dataSource: String,
+ codecExt,
+ smallFileNameAndLength: Iterable[_]) =>
+ val codecExtOption = Option(codecExt).map(_.toString)
+
+ MergingFilePartition(
+ groupId,
+ location,
+ dataSource,
+ codecExtOption,
+ smallFileNameAndLength.map {
+ case Row(subGroupId: Int, name: String, length: Long) =>
+ MergingFile(subGroupId, name, length)
+ }.toList)
+ .toInternalRow
+ }
+ }
+ }
+
+ override def output: Seq[Attribute] = child.output
+
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+ SmallFileListExec(newChild)
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileMerge.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileMerge.scala
new file mode 100644
index 00000000000..338699b726f
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileMerge.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+
+case class SmallFileMerge(child: LogicalPlan, noMerge: Boolean) extends UnaryNode {
+
+ override def output: Seq[Attribute] = child.output
+
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ SmallFileMerge(newChild, noMerge)
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileMergeExec.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileMergeExec.scala
new file mode 100644
index 00000000000..ba2969e75a9
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/SmallFileMergeExec.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact
+
+import java.text.SimpleDateFormat
+
+import scala.util.{Failure, Success}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.CatalystTypeConverters.createToScalaConverter
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.kyuubi.sql.compact.merge.{AbstractFileMerger, FileMergerFactory}
+
+case class SmallFileMergeExec(child: SparkPlan) extends UnaryExecNode {
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val structType = DataTypeUtils.fromAttributes(output)
+ val serializableHadoopConf = new SerializableConfiguration(sparkContext.hadoopConfiguration)
+ val mergeDataFlag =
+ sparkContext.getConf.getBoolean(CompactTable.mergeMetadataKey, defaultValue = true)
+ child.execute().mapPartitionsWithIndex { (partIndex, iterator) =>
+ val jobId = new SimpleDateFormat("yyyyMMdd-HHmmss").format(
+ System.currentTimeMillis()) + s"-${partIndex}"
+
+ val converter = createToScalaConverter(structType)
+ iterator.map(converter).map {
+ case Row(
+ groupId: Int,
+ location: String,
+ dataSource: String,
+ codecExt,
+ smallFileNameAndLength: Iterable[_]) =>
+ val smallFiles = smallFileNameAndLength.map {
+ case Row(subGroupId: Int, name: String, length: Long) =>
+ MergingFile(subGroupId, name, length)
+ }.toList
+
+ val codecExtOption = Option(codecExt).map(_.toString)
+ val merger: AbstractFileMerger = FileMergerFactory.create(dataSource, codecExtOption)
+
+ merger.initialize(
+ partIndex,
+ jobId,
+ groupId,
+ location,
+ serializableHadoopConf,
+ mergeDataFlag)
+ merger.merge(smallFiles) match {
+ case Failure(exception) =>
+ throw exception
+ case Success(mergedFile) =>
+ MergingFilePartition(groupId, location, dataSource, codecExtOption, mergedFile)
+ .toInternalRow
+ }
+ }
+ }
+ }
+
+ override def output: Seq[Attribute] = child.output
+
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+ SmallFileMergeExec(newChild)
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/AbstractFileMerger.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/AbstractFileMerger.scala
new file mode 100644
index 00000000000..2e36def5c86
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/AbstractFileMerger.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact.merge
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath}
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.kyuubi.sql.compact.{CompactTableUtils, MergeFileException, MergingFile}
+import org.apache.kyuubi.sql.compact.merge.AbstractFileMerger.{getMergedFilePrefix, getMergingFilePrefix}
+
+object AbstractFileMerger {
+ val mergedFilePrefix = "merged"
+ val mergedFileProcessingSuffix = ".processing"
+ val mergingFilePrefix = "merging"
+
+ def getMergingFilePrefix(groupId: Int, subGroupId: Int): String =
+ s"$mergingFilePrefix-$groupId-$subGroupId"
+
+ def getMergedFilePrefix(groupId: Int, subGroupId: Int): String =
+ s"$mergedFilePrefix-$groupId-$subGroupId"
+}
+
+abstract class AbstractFileMerger(
+ dataSource: String,
+ codec: Option[String],
+ fileLevelCodec: Boolean)
+ extends Logging
+ with Serializable {
+
+ protected var partitionIndex: Int = _
+ protected var jobId: String = _
+ protected var groupId: Int = _
+ protected var location: String = _
+ protected var serializableConfiguration: SerializableConfiguration = _
+ protected var isMergeMetadata: Boolean = _
+
+ def initialize(
+ partitionIndex: Int,
+ jobId: String,
+ groupId: Int,
+ location: String,
+ serializableConfiguration: SerializableConfiguration,
+ isMergeMetadata: Boolean): Unit = {
+ this.partitionIndex = partitionIndex
+ this.jobId = jobId
+ this.groupId = groupId
+ this.location = location
+ this.serializableConfiguration = serializableConfiguration
+ this.isMergeMetadata = isMergeMetadata
+ }
+
+ def merge(smallFiles: List[MergingFile]): Try[Array[MergingFile]] = Try {
+ val fileSystem: FileSystem = FileSystem.get(hadoopConf)
+
+ if (log.isDebugEnabled()) {
+ smallFiles.foreach { merging =>
+ log.debug(
+ s"merging files jobId $jobId, partition id $partitionIndex,group id $groupId,$merging")
+ }
+ }
+
+ smallFiles.groupBy(_.subGroupId).map { case (subGroupId, fileGroup) =>
+ mergeGroup(fileSystem, subGroupId, fileGroup).map(m =>
+ MergingFile(subGroupId, m._1.getName, m._2)).get
+ }.toArray
+
+ }
+
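+  /**
+   * Merges one sub-group of small files:
+   *  1. writes the merged file into the staging folder with a `.processing` suffix,
+   *  2. renames the original small files in place with the merging prefix,
+   *  3. moves the merged file from staging into the table location,
+   *  4. moves the prefixed small files into the staging folder.
+   * Returns the merged file path and its length.
+   */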
+ private def mergeGroup(
+ fileSystem: FileSystem,
+ subGroupId: Int,
+ smallFiles: List[MergingFile]): Try[(HadoopPath, Long)] = Try {
+ val stagingDir = CompactTableUtils.getStagingDir(location, jobId)
+ val locationPath = new HadoopPath(location)
+ val fileExt = CompactTableUtils.getExtFromFilePath(smallFiles.head.name)
+ val mergedFileName = s"${getMergedFilePrefix(groupId, subGroupId)}.$fileExt"
+ val mergedFileInStaging =
+ new HadoopPath(stagingDir, mergedFileName + AbstractFileMerger.mergedFileProcessingSuffix)
+ val targetMergedFile = new HadoopPath(locationPath, mergedFileName)
+ log.debug(s"prepare to merge $dataSource files to $mergedFileInStaging")
+ mergeFiles(fileSystem, smallFiles, mergedFileInStaging).get
+ val mergingFilePrefix = getMergingFilePrefix(groupId, subGroupId)
+ log.debug(s"prepare to add prefix ${mergingFilePrefix} to small files")
+ val stagingSmallFiles = smallFiles.map(_.name).map { fileName =>
+ val smallFile = new HadoopPath(location, fileName)
+ val newFileName = s"$mergingFilePrefix-$fileName"
+ val smallFileNewPath = new HadoopPath(location, newFileName)
+ if (!fileSystem.rename(smallFile, smallFileNewPath)) {
+ throw MergeFileException(s"failed to rename $smallFile to $smallFileNewPath")
+ }
+ smallFileNewPath
+ }
+ log.debug(s"move file $mergedFileInStaging to $targetMergedFile")
+ if (fileSystem.exists(targetMergedFile)) {
+ throw MergeFileException(s"file already exists $targetMergedFile")
+ }
+ if (!fileSystem.rename(mergedFileInStaging, targetMergedFile)) {
+ throw MergeFileException(s"failed to rename $mergedFileInStaging to $targetMergedFile")
+ }
+
+ log.debug(s"move small files to $stagingDir")
+ stagingSmallFiles.foreach { smallFilePath =>
+ val stagingFile = new HadoopPath(stagingDir, smallFilePath.getName)
+ if (!fileSystem.rename(smallFilePath, stagingFile)) {
+ throw MergeFileException(s"failed to rename $smallFilePath to $stagingFile")
+ }
+ }
+ (targetMergedFile, fileSystem.getFileStatus(targetMergedFile).getLen)
+ }
+
+ protected def hadoopConf: Configuration = serializableConfiguration.value
+
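+  /**
+   * Format-specific merge: writes the content of `smallFiles` into `mergedFileInStaging`
+   * and returns that path on success.
+   */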
+ protected def mergeFiles(
+ fileSystem: FileSystem,
+ smallFiles: List[MergingFile],
+ mergedFileInStaging: HadoopPath): Try[HadoopPath]
+
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/AvroFileMerger.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/AvroFileMerger.scala
new file mode 100644
index 00000000000..dc748211b00
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/AvroFileMerger.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact.merge
+
+import scala.util.Try
+
+import org.apache.avro.{Schema => AvroSchema}
+import org.apache.avro.file.{DataFileReader, DataFileStream, DataFileWriter}
+import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
+import org.apache.avro.mapred.FsInput
+import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath}
+import org.apache.hadoop.io.IOUtils
+
+import org.apache.kyuubi.sql.compact.MergingFile
+
+class AvroFileMerger(dataSource: String, codec: Option[String])
+ extends AbstractFileMerger(dataSource, codec, false) {
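+
+  /**
+   * Merges Avro container files by reading the schema from the first input file and
+   * appending each file's data blocks via `DataFileWriter.appendAllFrom` without recompressing.
+   */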
+ override protected def mergeFiles(
+ fileSystem: FileSystem,
+ smallFiles: List[MergingFile],
+ mergedFileInStaging: HadoopPath): Try[HadoopPath] = Try {
+ val schema = getAvroSchema(new HadoopPath(location, smallFiles.head.name))
+
+ val recordWriter = new GenericDatumWriter[GenericRecord]
+ val outputStream = fileSystem.create(mergedFileInStaging)
+ try {
+ val fileWriter = new DataFileWriter[GenericRecord](recordWriter)
+ .create(schema, outputStream)
+
+ smallFiles.map(f => new HadoopPath(location, f.name)).foreach { file =>
+ val fileInput = fileSystem.open(file)
+ fileWriter.appendAllFrom(
+ new DataFileStream[GenericRecord](fileInput, new GenericDatumReader),
+ false)
+ IOUtils.closeStream(fileInput)
+ }
+ IOUtils.closeStream(fileWriter)
+ } finally {
+ IOUtils.closeStream(outputStream)
+ }
+ mergedFileInStaging
+ }
+
+ private def getAvroSchema(filePath: HadoopPath): AvroSchema = {
+ val recordReader = new GenericDatumReader[GenericRecord]
+ val avroReader = new DataFileReader(new FsInput(filePath, hadoopConf), recordReader)
+ val schema = avroReader.getSchema
+ avroReader.close()
+ schema
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/FileMergerFactory.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/FileMergerFactory.scala
new file mode 100644
index 00000000000..33d2246254f
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/FileMergerFactory.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact.merge
+
+import org.apache.spark.internal.Logging
+
+import org.apache.kyuubi.sql.compact.UnSupportedTableException
+
+object FileMergerFactory extends Logging {
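+  /** Returns the merger implementation for the given data source format and codec. */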
+ def create(dataSource: String, codec: Option[String]): AbstractFileMerger = {
+ (dataSource, codec) match {
+ case ("parquet", _) =>
+ new ParquetFileMerger(dataSource, codec)
+ case ("avro", _) =>
+ new AvroFileMerger(dataSource, codec)
+ case ("orc", _) =>
+ new OrcFileMerger(dataSource, codec)
+ case ("text", _) =>
+ new PlainFileLikeMerger(dataSource, codec)
+ case ("csv", _) =>
+ new PlainFileLikeMerger(dataSource, codec)
+ case ("json", _) =>
+ new PlainFileLikeMerger(dataSource, codec)
+ case other =>
+ throw UnSupportedTableException(s"compact table doesn't support this format $other")
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/OrcFileMerger.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/OrcFileMerger.scala
new file mode 100644
index 00000000000..0c802922999
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/OrcFileMerger.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact.merge
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath}
+import org.apache.orc.OrcFile
+
+import org.apache.kyuubi.sql.compact.{CompactTableUtils, MergingFile}
+
+class OrcFileMerger(dataSource: String, codec: Option[String])
+ extends AbstractFileMerger(dataSource, codec, false) {
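+
+  /**
+   * Merges ORC files with `OrcFile.mergeFiles`, which copies the stripes of compatible files
+   * without decoding rows, and fails if any input file could not be merged.
+   */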
+ override protected def mergeFiles(
+ fileSystem: FileSystem,
+ smallFiles: List[MergingFile],
+ mergedFileInStaging: HadoopPath): Try[HadoopPath] = Try {
+ val smallFilePaths = smallFiles.map(r => new HadoopPath(location, r.name))
+ val writerOptions = OrcFile.writerOptions(hadoopConf)
+ val mergedFiles =
+ OrcFile.mergeFiles(
+ mergedFileInStaging,
+ writerOptions,
+ CompactTableUtils.toJavaList(smallFilePaths))
+
+ if (smallFilePaths.length != mergedFiles.size) {
+ val unMergedFiles = smallFilePaths.filterNot(mergedFiles.contains)
+ throw new IllegalStateException(
+ s"Failed to merge files: ${unMergedFiles.mkString}")
+ }
+ mergedFileInStaging
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/ParquetFileMerger.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/ParquetFileMerger.scala
new file mode 100644
index 00000000000..1867f839b5b
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/ParquetFileMerger.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact.merge
+
+import java.util
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath}
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetWriter}
+import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, GlobalMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.util.{HadoopInputFile, HadoopOutputFile}
+import org.apache.spark.sql.SparkInternalExplorer
+
+import org.apache.kyuubi.sql.ParquetFileWriterWrapper
+import org.apache.kyuubi.sql.compact.MergingFile
+
+class ParquetFileMerger(dataSource: String, codec: Option[String])
+ extends AbstractFileMerger(dataSource, codec, false) {
+
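+  /**
+   * Merges Parquet files by appending their row groups via `ParquetFileWriter.appendFile`
+   * and writing a footer built from the merged file metadata, without rewriting records.
+   */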
+ override protected def mergeFiles(
+ fileSystem: FileSystem,
+ smallFiles: List[MergingFile],
+ mergedFileInStaging: HadoopPath): Try[HadoopPath] = Try {
+ val smallFilePaths = smallFiles.map(r => new HadoopPath(location, r.name))
+
+ val metadataFiles = if (isMergeMetadata) smallFilePaths else smallFilePaths.take(1)
+ log.debug(s"merge metadata of files ${metadataFiles.length}")
+ val mergedMetadata = mergeMetadata(metadataFiles)
+ val writer = new ParquetFileWriter(
+ HadoopOutputFile.fromPath(mergedFileInStaging, hadoopConf),
+ mergedMetadata.getSchema,
+ ParquetFileWriter.Mode.CREATE,
+ ParquetWriter.DEFAULT_BLOCK_SIZE,
+ ParquetWriter.MAX_PADDING_SIZE_DEFAULT,
+ ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+ ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+ ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED)
+    log.debug(
+      s"beginning to merge ${smallFilePaths.length} parquet files into $mergedFileInStaging")
+
+ writer.start()
+ smallFilePaths.foreach { smallFile =>
+ writer.appendFile(HadoopInputFile.fromPath(smallFile, hadoopConf))
+ }
+ writer.end(mergedMetadata.getKeyValueMetaData)
+
+ log.debug(s"finish to merge parquet files to $mergedFileInStaging")
+
+ mergedFileInStaging
+ }
+
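+  /**
+   * Reads the footers of the given files in parallel and merges their metadata
+   * (schema and key/value metadata) into a single [[FileMetaData]], failing on conflicts.
+   */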
+ private def mergeMetadata(files: List[HadoopPath]): FileMetaData = {
+ var globalMetaData: GlobalMetaData = null
+ val blocks: util.List[BlockMetaData] = new util.ArrayList[BlockMetaData]()
+ SparkInternalExplorer.parmap(files, "readingParquetFooters", 8) {
+ currentFile =>
+ ParquetFileReader.readFooter(
+ hadoopConf,
+ currentFile,
+ ParquetMetadataConverter.NO_FILTER)
+ }.foreach { pmd =>
+ val fmd = pmd.getFileMetaData
+ globalMetaData = ParquetFileWriterWrapper.mergeInto(fmd, globalMetaData, strict = true)
+ blocks.addAll(pmd.getBlocks)
+ }
+ new ParquetMetadata(globalMetaData.merge(), blocks).getFileMetaData
+ }
+
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/PlainFileLikeMerger.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/PlainFileLikeMerger.scala
new file mode 100644
index 00000000000..b69782aece1
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/compact/merge/PlainFileLikeMerger.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.compact.merge
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path => HadoopPath}
+import org.apache.hadoop.io.IOUtils
+
+import org.apache.kyuubi.sql.compact.MergingFile
+
+class PlainFileLikeMerger(dataSource: String, codec: Option[String])
+ extends AbstractFileMerger(dataSource, codec, true) {
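+
+  /**
+   * Merges text-like files (text/csv/json) by concatenating their bytes into the staging file,
+   * treating any compression as a file-level codec.
+   */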
+ override protected def mergeFiles(
+ fileSystem: FileSystem,
+ smallFiles: List[MergingFile],
+ mergedFileInStaging: HadoopPath): Try[HadoopPath] = Try {
+ val smallFilePaths = smallFiles.map(r => new HadoopPath(location, r.name))
+ val fos = fileSystem.create(mergedFileInStaging, false)
+ try {
+ smallFilePaths.foreach { f =>
+ var is: FSDataInputStream = null
+ try {
+ is = fileSystem.open(f)
+ IOUtils.copyBytes(is, fos, hadoopConf, false)
+ } finally {
+ IOUtils.closeStream(is)
+ }
+ }
+ } finally {
+ IOUtils.closeStream(fos)
+ }
+ mergedFileInStaging
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/spark/sql/SparkInternalExplorer.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/spark/sql/SparkInternalExplorer.scala
new file mode 100644
index 00000000000..302aad6697d
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/spark/sql/SparkInternalExplorer.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.internal.config.{ConfigEntry, SPECULATION_ENABLED}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.LocalTempView
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.CreateViewCommand
+import org.apache.spark.sql.execution.datasources.v2.BaseCacheTableExec
+import org.apache.spark.util.ThreadUtils
+
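+/**
+ * Lives in the `org.apache.spark.sql` package to expose Spark-internal APIs
+ * (`ThreadUtils.parmap`, `Dataset.ofRows`, `BaseCacheTableExec`) to the compact table command.
+ */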
+object SparkInternalExplorer {
+
+ val SPECULATION_ENABLED_SYNONYM: ConfigEntry[Boolean] = SPECULATION_ENABLED
+ def parmap[I, O](in: Seq[I], prefix: String, maxThreads: Int)(f: I => O): Seq[O] =
+ ThreadUtils.parmap(in, prefix, maxThreads)(f)
+
+ def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =
+ Dataset.ofRows(sparkSession, logicalPlan)
+
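+  /**
+   * Registers `query` as a local temporary view named `tempViewName` and caches it
+   * eagerly (`isLazy` is false).
+   */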
+ case class CacheTableAsSelectExec(tempViewName: String, query: LogicalPlan)
+ extends BaseCacheTableExec {
+ override lazy val relationName: String = tempViewName
+ override lazy val planToCache: LogicalPlan = {
+ CreateViewCommand(
+ name = TableIdentifier(tempViewName),
+ userSpecifiedColumns = Nil,
+ comment = None,
+ properties = Map.empty,
+ originalText = None,
+ plan = query,
+ allowExisting = true,
+ replace = true,
+ viewType = LocalTempView,
+ isAnalyzed = true,
+ referredTempFunctions = Seq.empty).run(session)
+
+ dataFrameForCachedPlan.logicalPlan
+ }
+ override lazy val dataFrameForCachedPlan: DataFrame = {
+ session.table(tempViewName)
+ }
+
+ override def isLazy: Boolean = false
+
+ override def options: Map[String, String] = Map.empty
+ }
+
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTablSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTablSuiteBase.scala
new file mode 100644
index 00000000000..821359da329
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTablSuiteBase.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+
+import org.apache.kyuubi.sql.compact.CompactTable
+import org.apache.kyuubi.sql.compact.merge.AbstractFileMerger
+
+trait CompactTablSuiteBase extends KyuubiSparkSQLExtensionTest {
+ def getTableSource(): String
+
+ def getTableCodec(): Option[String]
+
+ def getDataFiles(tableMetadata: CatalogTable): Seq[File] =
+ getFiles(tableMetadata, "part-")
+
+ private def getFiles(tableMetadata: CatalogTable, prefix: String): Seq[File] = {
+ val location = tableMetadata.location
+ val files = new File(location).listFiles()
+ val suffix = getDataFileSuffix()
+ files.filter(f =>
+ f.getName.startsWith(prefix)
+ && f.getName.endsWith(suffix))
+ }
+
+ def getDataFileSuffix(): String
+
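+  /**
+   * Lists the original small files that were renamed with the merging prefix
+   * and moved into the `.compact` staging folder.
+   */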
+ def getMergingFiles(tableMetadata: CatalogTable): Seq[File] =
+ new File(tableMetadata.location.getPath + File.separator + ".compact").listFiles().flatMap(
+ _.listFiles()).filter(_.getName.startsWith(AbstractFileMerger.mergingFilePrefix + "-"))
+
+ def getMergedDataFiles(tableMetadata: CatalogTable): Seq[File] =
+ getFiles(tableMetadata, AbstractFileMerger.mergedFilePrefix + "-")
+
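+  /**
+   * Creates a table containing `fileCount` files of `messageCountPerFile` rows each,
+   * passes (tableName, messageCountPerFile, fileCount) to `f`, and drops the table afterwards.
+   */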
+ def withRandomTable(f: (String, Int, Int) => Unit)(implicit
+ messageCountPerFile: Int = Random.nextInt(1000) + 1000,
+ fileCount: Int = Random.nextInt(10) + 10): Unit = {
+ val tableName =
+ generateRandomTable(getTableSource(), messageCountPerFile, fileCount, getTableCodec())
+ withTable(tableName) {
+ f(tableName, messageCountPerFile, fileCount)
+ }
+ }
+
+ def generateRandomTable(
+ tableSource: String,
+ messageCountPerFile: Int,
+ fileCount: Int,
+ codec: Option[String]): String = {
+ val tableName = getRandomTableName()
+ sql(s"CREATE TABLE $tableName (key INT, value STRING) USING ${tableSource}" +
+ s" ${codec.map(c => s"OPTIONS('compression' '$c')").getOrElse("")}")
+ .show()
+
+ 0 until fileCount foreach { i =>
+ logInfo(s"inserting data into table ranges between " +
+ s"${i * messageCountPerFile} and $messageCountPerFile")
+
+ sql(s"""insert into $tableName
+ select /*+ COALESCE(1) */id, java_method('java.util.UUID', 'randomUUID')
+ from range(${i * messageCountPerFile}, ${i * messageCountPerFile + messageCountPerFile})""")
+ .show()
+ }
+
+ tableName
+ }
+
+ def getRandomTableName(): String = {
+ s"small_file_table_${Random.alphanumeric.take(10).mkString}"
+ }
+
+ private def getAllFiles(tableMetadata: CatalogTable): Seq[File] =
+ new File(tableMetadata.location).listFiles()
+
+ test("generate random table") {
+ withRandomTable { (tableName, messageCountPerFile, fileCount) =>
+ val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val files = getDataFiles(tableMetadata)
+ getAllFiles(tableMetadata).foreach(f => logInfo("all file: " + f.getAbsolutePath))
+ assert(files.length == fileCount)
+ val messageCount = sql(s"select count(1) from ${tableName}").collect().head.getLong(0)
+ assert(messageCount == messageCountPerFile * fileCount)
+ }
+ }
+
+ test(s"compact table") {
+ withRandomTable { (tableName, messageCountPerFile, fileCount) =>
+ val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val files = getDataFiles(tableMetadata)
+ assert(files.length == fileCount)
+ files.foreach(f => logInfo("merging file: " + f.getAbsolutePath))
+
+ sql(s"compact table $tableName").show()
+ val mergedTableMetadata =
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val mergedFiles = getMergedDataFiles(mergedTableMetadata)
+ mergedFiles.foreach(f => logInfo("merged file: " + f.getAbsolutePath))
+ assert(mergedFiles.length == 1)
+ sql(s"refresh table $tableName").show()
+ val messageCount = sql(s"select count(1) from $tableName").collect().head.getLong(0)
+ assert(messageCount == messageCountPerFile * fileCount)
+ }
+ }
+
+ test(s"validating records") {
+ withRandomTable { (tableName, messageCountPerFile, fileCount) =>
+ val records =
+ sql(s"select * from $tableName").collect().map(r => r.getInt(0) -> r.getString(1)).toMap
+
+ records.foreach { r =>
+ logInfo("records: " + r)
+ }
+
+ sql(s"compact table $tableName").show()
+ sql(s"refresh table $tableName").show()
+ val mergedRecords =
+ sql(s"select * from $tableName").collect().map(r => r.getInt(0) -> r.getString(1)).toMap
+
+ mergedRecords.foreach { r =>
+ logInfo("merged records: " + r)
+ }
+ assert(records.size == mergedRecords.size)
+ assert(records == mergedRecords)
+ }
+ }
+
+ test(s"result view") {
+ withRandomTable { (tableName, messageCountPerFile, fileCount) =>
+ sql(s"compact table $tableName").show()
+ val viewOpt = spark.sessionState.catalog.getTempView(
+ CompactTable.mergedFilesCachedTableName)
+ assert(viewOpt.isDefined)
+ val view = viewOpt.get
+ assert(view.isTempView)
+ val result = sql(s"select * from ${CompactTable.mergedFilesCachedTableName}").collect()
+ assert(result.length == 1)
+ result.foreach { r =>
+ logInfo("result: " + r)
+ }
+ assert(result.head.getString(2) == getTableSource())
+ val mergedFileName =
+ result.head.getList(4).get(0).asInstanceOf[GenericRowWithSchema].getString(1)
+ val mergedTableMetadata =
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val mergedFile = getMergedDataFiles(mergedTableMetadata).head
+ assert(mergedFileName == mergedFile.getName)
+
+ }(Random.nextInt(1000) + 100, 2)
+ }
+
+ test("compact table list") {
+ withRandomTable { (tableName, messageCountPerFile, fileCount) =>
+ val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val files = getDataFiles(tableMetadata)
+ assert(files.length == fileCount)
+
+ sql(s"compact table $tableName list").show()
+ val mergedTableMetadata =
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val mergedFiles = getMergedDataFiles(mergedTableMetadata)
+ assert(mergedFiles.isEmpty)
+ sql(s"select * from ${CompactTable.mergedFilesCachedTableName}").show(truncate = false)
+ val result = sql(s"select * from ${CompactTable.mergedFilesCachedTableName}").collect()
+ assert(result.length == 1)
+ assert(result.head.getString(2) == getTableSource())
+ }
+ }
+
+ test("compact table retain") {
+ withRandomTable { (tableName, messageCountPerFile, fileCount) =>
+ val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val files = getDataFiles(tableMetadata)
+ assert(files.length == fileCount)
+
+ sql(s"compact table $tableName retain").show()
+ val mergedTableMetadata =
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val mergedFiles = getMergedDataFiles(mergedTableMetadata)
+ assert(mergedFiles.length == 1)
+
+      val stagedSmallFiles = getMergingFiles(tableMetadata)
+      assert(stagedSmallFiles.length == fileCount)
+ }
+ }
+
+ test("recover compact table") {
+ withRandomTable { (tableName, messageCountPerFile, fileCount) =>
+ val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val files = getDataFiles(tableMetadata)
+ assert(files.length == fileCount)
+
+ sql(s"compact table $tableName retain").show()
+ val mergedTableMetadata =
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ val mergedFiles = getMergedDataFiles(mergedTableMetadata)
+ assert(mergedFiles.length == 1)
+ sql(s"recover compact table $tableName").show()
+ val recoveredFiles = getDataFiles(tableMetadata)
+ assert(recoveredFiles.length == files.length)
+ files.foreach { f =>
+ assert(recoveredFiles.exists(_.getName == f.getName))
+ }
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTableParserSuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTableParserSuite.scala
new file mode 100644
index 00000000000..0302b3ddd4a
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTableParserSuite.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.parser.ParseException
+
+import org.apache.kyuubi.sql.SparkKyuubiSparkSQLParser
+import org.apache.kyuubi.sql.compact.{CompactTableOptions, CompactTableStatement, RecoverCompactTableStatement}
+
+class CompactTableParserSuite extends KyuubiSparkSQLExtensionTest {
+
+ test("parse compact table statement without target size") {
+ val statement = s"COMPACT TABLE db1.t1"
+ val parser = new SparkKyuubiSparkSQLParser(spark.sessionState.sqlParser)
+ val parsed = parser.parsePlan(statement)
+ assert(parsed.isInstanceOf[CompactTableStatement])
+ val compactTableStatement = parsed.asInstanceOf[CompactTableStatement]
+ assert(compactTableStatement.tableParts === Seq("db1", "t1"))
+ assert(compactTableStatement.targetSizeInMB === None)
+ assert(CompactTableOptions.CleanupStagingFolder === compactTableStatement.options)
+ }
+
+ test("parse compact table statement with target size") {
+ val targetSize = new Random(1).nextInt(256) + 1
+ val statement = s"COMPACT TABLE db1.t1 INTO ${targetSize} MB"
+ val parser = new SparkKyuubiSparkSQLParser(spark.sessionState.sqlParser)
+ val parsed = parser.parsePlan(statement)
+ assert(parsed.isInstanceOf[CompactTableStatement])
+ val compactTableStatement = parsed.asInstanceOf[CompactTableStatement]
+ assert(compactTableStatement.tableParts === Seq("db1", "t1"))
+ assert(compactTableStatement.targetSizeInMB === Some(targetSize))
+ assert(CompactTableOptions.CleanupStagingFolder === compactTableStatement.options)
+ }
+
+ test("parse compact table statement with unsupported target size unit") {
+ val targetSize = new Random(1).nextInt(256) + 1
+ val parser = new SparkKyuubiSparkSQLParser(spark.sessionState.sqlParser)
+ parser.parsePlan(s"COMPACT TABLE db1.t1 INTO ${targetSize} M")
+ parser.parsePlan(s"COMPACT TABLE db1.t1 INTO ${targetSize} MB")
+
+ assertThrows[ParseException] {
+ parser.parsePlan(s"COMPACT TABLE db1.t1 INTO ${targetSize} B")
+ }
+
+ assertThrows[ParseException] {
+ parser.parsePlan(s"COMPACT TABLE db1.t1 INTO ${targetSize} K")
+ }
+
+ assertThrows[ParseException] {
+ parser.parsePlan(s"COMPACT TABLE db1.t1 INTO ${targetSize} KB")
+ }
+
+ assertThrows[ParseException] {
+ parser.parsePlan(s"COMPACT TABLE db1.t1 INTO ${targetSize} G")
+ }
+
+ assertThrows[ParseException] {
+ parser.parsePlan(s"COMPACT TABLE db1.t1 INTO ${targetSize} GB")
+ }
+
+ assertThrows[ParseException] {
+ parser.parsePlan(s"COMPACT TABLE db1.t1 INTO ${targetSize} T")
+ }
+
+ assertThrows[ParseException] {
+ parser.parsePlan(s"COMPACT TABLE db1.t1 INTO ${targetSize} TB")
+ }
+ }
+
+ test("parse compact table statement with unsupported action") {
+ val targetSize = new Random(1).nextInt(256) + 1
+ val parser = new SparkKyuubiSparkSQLParser(spark.sessionState.sqlParser)
+
+ assertThrows[ParseException] {
+ parser.parsePlan(s"COMPACT TABLE db1.t1 INTO ${targetSize} M ${Random.alphanumeric.take(10)}")
+ }
+ }
+
+ test("parse compact table statement with retain options") {
+ val targetSize = new Random(1).nextInt(256) + 1
+ val statement = s"COMPACT TABLE db1.t1 INTO ${targetSize} MB retain"
+ val parser = new SparkKyuubiSparkSQLParser(spark.sessionState.sqlParser)
+ val parsed = parser.parsePlan(statement)
+ assert(parsed.isInstanceOf[CompactTableStatement])
+ val compactTableStatement = parsed.asInstanceOf[CompactTableStatement]
+ assert(compactTableStatement.tableParts === Seq("db1", "t1"))
+ assert(compactTableStatement.targetSizeInMB === Some(targetSize))
+ assert(CompactTableOptions.RetainStagingFolder === compactTableStatement.options)
+ }
+
+ test("parse compact table statement with retain options, without target size") {
+ val statement = s"COMPACT TABLE db1.t1 retain"
+ val parser = new SparkKyuubiSparkSQLParser(spark.sessionState.sqlParser)
+ val parsed = parser.parsePlan(statement)
+ assert(parsed.isInstanceOf[CompactTableStatement])
+ val compactTableStatement = parsed.asInstanceOf[CompactTableStatement]
+ assert(compactTableStatement.tableParts === Seq("db1", "t1"))
+ assert(compactTableStatement.targetSizeInMB === None)
+ assert(CompactTableOptions.RetainStagingFolder === compactTableStatement.options)
+ }
+
+ test("parse compact table statement with list options") {
+ val targetSize = new Random(1).nextInt(256) + 1
+ val statement = s"COMPACT TABLE db1.t1 INTO ${targetSize} MB list"
+ val parser = new SparkKyuubiSparkSQLParser(spark.sessionState.sqlParser)
+ val parsed = parser.parsePlan(statement)
+ assert(parsed.isInstanceOf[CompactTableStatement])
+ val compactTableStatement = parsed.asInstanceOf[CompactTableStatement]
+ assert(compactTableStatement.tableParts === Seq("db1", "t1"))
+ assert(compactTableStatement.targetSizeInMB === Some(targetSize))
+ assert(CompactTableOptions.DryRun === compactTableStatement.options)
+ }
+
+ test("parse compact table statement with list options, without target size") {
+ val statement = s"COMPACT TABLE db1.t1 list"
+ val parser = new SparkKyuubiSparkSQLParser(spark.sessionState.sqlParser)
+ val parsed = parser.parsePlan(statement)
+ assert(parsed.isInstanceOf[CompactTableStatement])
+ val compactTableStatement = parsed.asInstanceOf[CompactTableStatement]
+ assert(compactTableStatement.tableParts === Seq("db1", "t1"))
+ assert(compactTableStatement.targetSizeInMB === None)
+ assert(CompactTableOptions.DryRun === compactTableStatement.options)
+ }
+
+ test("parse recover compact table statement") {
+ val statement = s"RECOVER COMPACT TABLE db1.t1"
+ val parser = new SparkKyuubiSparkSQLParser(spark.sessionState.sqlParser)
+ val parsed = parser.parsePlan(statement)
+ assert(parsed.isInstanceOf[RecoverCompactTableStatement])
+ val compactTableStatement = parsed.asInstanceOf[RecoverCompactTableStatement]
+ assert(compactTableStatement.tableParts === Seq("db1", "t1"))
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTableResolverStrategySuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTableResolverStrategySuite.scala
new file mode 100644
index 00000000000..d5a64e3a127
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTableResolverStrategySuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, RepartitionByExpression}
+import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.kyuubi.sql.compact._
+
+class CompactTableResolverStrategySuite extends KyuubiSparkSQLExtensionTest {
+
+ def createRandomTable(): String = {
+ val tableName = s"small_file_table_${Random.alphanumeric.take(10).mkString}"
+ spark.sql(s"CREATE TABLE ${tableName} (key INT, val_str STRING) USING csv").show()
+ tableName
+ }
+
+ test("compact table execution plan") {
+ val tableName = createRandomTable()
+ withTable(tableName) {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+ val result = spark.sql(s"compact table ${tableName}")
+ result.show()
+ val groupId = CompactTable.smallFileCollectOutputAttribute.head
+ result.queryExecution.analyzed match {
+ case CachePerformanceViewCommand(
+ Seq(CompactTable.mergedFilesCachedTableName),
+ SmallFileMerge(
+ RepartitionByExpression(
+ Seq(AttributeReference(groupId.name, groupId.dataType, groupId.nullable, _)),
+ SmallFileCollect(LogicalRelation(_, _, Some(table), _), None),
+ None,
+ None),
+ false),
+ Seq(originalFileLocation),
+ CompactTableOptions.CleanupStagingFolder)
+ if table.identifier.table == tableName
+ && table.location.toString == originalFileLocation => // ok
+ case CachePerformanceViewCommand(
+ Seq(CompactTable.mergedFilesCachedTableName),
+ SmallFileMerge(
+ RepartitionByExpression(
+ Seq(AttributeReference(groupId.name, groupId.dataType, groupId.nullable, _)),
+ SmallFileCollect(LogicalRelation(_, _, Some(table), _), None),
+ None,
+ None),
+ false),
+ Seq(originalFileLocation),
+ CompactTableOptions.CleanupStagingFolder
+            ) => // not ok: plan shape matches but table identifier or location differs
+            fail(s"Unexpected table or location in plan: ${result.queryExecution.analyzed}")
+ case other => fail(s"Unexpected plan: $other, should be CachePerformanceViewCommand")
+ }
+
+ result.queryExecution.optimizedPlan match {
+ case CommandResult(
+ _,
+ CachePerformanceViewCommand(
+ Seq(CompactTable.mergedFilesCachedTableName),
+ SmallFileMerge(
+ RepartitionByExpression(
+ Seq(AttributeReference(groupId.name, groupId.dataType, groupId.nullable, _)),
+ SmallFileCollect(LogicalRelation(_, _, Some(table), _), None),
+ None,
+ None),
+ false),
+ Seq(originalFileLocation),
+ CompactTableOptions.CleanupStagingFolder),
+ _,
+ Seq())
+ if table.identifier.table == tableName
+ && originalFileLocation == table.location.toString => // ok
+ case other => fail(s"Unexpected plan: $other, should be CachePerformanceViewCommand")
+ }
+
+ result.queryExecution.executedPlan match {
+ case CommandResultExec(
+ output,
+ ExecutedCommandExec(CachePerformanceViewCommand(
+ Seq(CompactTable.mergedFilesCachedTableName),
+ SmallFileMerge(
+ RepartitionByExpression(
+ Seq(AttributeReference(groupId.name, groupId.dataType, groupId.nullable, _)),
+ SmallFileCollect(LogicalRelation(_, _, Some(table), _), None),
+ None,
+ None),
+ false),
+ Seq(originalFileLocation),
+ CompactTableOptions.CleanupStagingFolder)),
+ Seq())
+ if table.identifier.table == tableName
+ && table.location.toString == originalFileLocation => // ok
+ case other => fail(s"Unexpected plan: $other, should be CachePerformanceViewCommand")
+ }
+ }
+ }
+ }
+
+ test("recover compact table execution plan") {
+ val tableName = createRandomTable()
+ withTable(tableName) {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+ val result = spark.sql(s"recover compact table ${tableName}")
+ result.show()
+ result.queryExecution.analyzed match {
+ case RecoverCompactTableCommand(catalogTable: CatalogTable)
+ if catalogTable.identifier.table == tableName => // ok
+ case other => fail(s"Unexpected plan: $other, should be RecoverCompactTableCommand")
+ }
+
+ result.queryExecution.optimizedPlan match {
+ case CommandResult(_, RecoverCompactTableCommand(catalogTable: CatalogTable), _, Seq())
+ if catalogTable.identifier.table == tableName => // ok
+ case other => fail(s"Unexpected plan: $other, should be RecoverCompactTableCommand")
+ }
+
+ result.queryExecution.executedPlan match {
+ case CommandResultExec(
+ _,
+ ExecutedCommandExec(RecoverCompactTableCommand(catalogTable: CatalogTable)),
+ Seq())
+ if catalogTable.identifier.table == tableName => // ok
+ case other => fail(s"Unexpected plan: $other, should be RecoverCompactTableCommand")
+ }
+
+ }
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTableSuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTableSuite.scala
new file mode 100644
index 00000000000..b000ac5b937
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/CompactTableSuite.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+class CompactJsonTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "json"
+ override def getTableCodec(): Option[String] = None
+ override def getDataFileSuffix(): String = ".json"
+}
+
+class CompactDeflateJsonTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "json"
+ override def getTableCodec(): Option[String] = Some("deflate")
+ override def getDataFileSuffix(): String = ".json.deflate"
+}
+
+class CompactLz4JsonTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "json"
+ override def getTableCodec(): Option[String] = Some("lz4")
+ override def getDataFileSuffix(): String = ".json.lz4"
+}
+
+class CompactSnappyJsonTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "json"
+ override def getTableCodec(): Option[String] = Some("snappy")
+ override def getDataFileSuffix(): String = ".json.snappy"
+}
+
+class CompactGzJsonTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "json"
+ override def getTableCodec(): Option[String] = Some("gzip")
+ override def getDataFileSuffix(): String = ".json.gz"
+}
+
+class CompactBzipJsonTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "json"
+ override def getTableCodec(): Option[String] = Some("bzip2")
+ override def getDataFileSuffix(): String = ".json.bz2"
+}
+
+class CompactCsvTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "csv"
+ override def getTableCodec(): Option[String] = None
+ override def getDataFileSuffix(): String = ".csv"
+}
+
+class CompactGzCsvTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "csv"
+ override def getTableCodec(): Option[String] = Some("gzip")
+ override def getDataFileSuffix(): String = ".csv.gz"
+}
+
+class CompactBzipCsvTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "csv"
+ override def getTableCodec(): Option[String] = Some("bzip2")
+ override def getDataFileSuffix(): String = ".csv.bz2"
+}
+
+class CompactParquetTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "parquet"
+ override def getTableCodec(): Option[String] = None
+ override def getDataFileSuffix(): String = ".parquet"
+}
+
+class CompactSnappyParquetTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "parquet"
+ override def getTableCodec(): Option[String] = Some("snappy")
+ override def getDataFileSuffix(): String = ".snappy.parquet"
+}
+
+class CompactZstdParquetTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "parquet"
+ override def getTableCodec(): Option[String] = Some("zstd")
+ override def getDataFileSuffix(): String = ".zstd.parquet"
+}
+
+class CompactAvroTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "avro"
+ override def getTableCodec(): Option[String] = None
+ override def getDataFileSuffix(): String = ".avro"
+}
+
+class CompactSnappyAvroTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "avro"
+ override def getTableCodec(): Option[String] = Some("snappy")
+ override def getDataFileSuffix(): String = ".avro"
+}
+
+class CompactOrcTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "orc"
+ override def getTableCodec(): Option[String] = None
+ override def getDataFileSuffix(): String = ".orc"
+}
+
+class CompactLz4OrcTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "orc"
+ override def getTableCodec(): Option[String] = Some("lz4")
+ override def getDataFileSuffix(): String = ".lz4.orc"
+}
+
+class CompactZlibOrcTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "orc"
+ override def getTableCodec(): Option[String] = Some("zlib")
+ override def getDataFileSuffix(): String = ".zlib.orc"
+}
+
+class CompactSnappyOrcTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "orc"
+ override def getTableCodec(): Option[String] = Some("snappy")
+ override def getDataFileSuffix(): String = ".snappy.orc"
+}
+
+class CompactZstdOrcTableSuiteBase extends CompactTablSuiteBase {
+ override def getTableSource(): String = "orc"
+ override def getTableCodec(): Option[String] = Some("zstd")
+ override def getDataFileSuffix(): String = ".zstd.orc"
+}