Commit a86f260

sync up with compact-table branch (#1)
apache#6695
1 parent ebe7e92 commit a86f260

32 files changed: +2289 -1 lines changed
Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->

# Compact Table Command Support

This is a new Spark SQL command that compacts the small files in a table into larger files, such as 128 MB files.
After compaction is done, it creates a temporary view for querying the compacted file details.

Instead of reading and rewriting all the data in the table, it merges data only at the binary file level,
which is more efficient.

## syntax

### compact table

```sparksql
compact table table_name [INTO ${targetFileSize} ${targetFileSizeUnit}] [ cleanup | retain | list ]
-- targetFileSizeUnit can be 'b', 'k', 'm', 'g', 't', 'p'
-- cleanup: clean up the compact staging folders, which contain the original small files (default behavior)
-- retain: retain the compact staging folders, e.g. for testing; the table can be recovered from the staging data
-- list: only list the merge result without actually compacting (dry run)
```
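
For example, the following statement asks for roughly 256 MB output files and keeps the staging folders so the
operation can be rolled back later (the table name is illustrative):

```sparksql
compact table db.events_small INTO 256 m retain;
```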
### recover table

```sparksql
recover compact table table_name
-- recover the compacted table by restoring the small files from the staging folder to their original location
```

## example

The following commands compact the small files in the table `default.small_files_table` into 128 MB files and create
a temporary view `v_merged_files` for querying the compacted file details.

```sparksql
set spark.sql.shuffle.partitions=32;

compact table default.small_files_table;

select * from v_merged_files;
```
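
If the staging folders were retained, the compaction can be rolled back afterwards, for example:

```sparksql
compact table default.small_files_table retain;

-- restore the original small files from the staging folders
recover compact table default.small_files_table;
```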

extensions/spark/kyuubi-extension-spark-3-5/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -99,6 +99,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-avro_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client-runtime</artifactId>

extensions/spark/kyuubi-extension-spark-3-5/src/main/antlr4/org/apache/kyuubi/sql/KyuubiSparkSQL.g4

Lines changed: 16 additions & 1 deletion
@@ -51,6 +51,10 @@ singleStatement
 
 statement
     : OPTIMIZE multipartIdentifier whereClause? zorderClause    #optimizeZorder
+    | COMPACT TABLE multipartIdentifier
+        (INTO targetFileSize=INTEGER_VALUE FILE_SIZE_UNIT_LITERAL)?
+        (action=compactAction)?                                 #compactTable
+    | RECOVER COMPACT TABLE multipartIdentifier                 #recoverCompactTable
     | .*?                                                       #passThrough
     ;
 
@@ -62,6 +66,9 @@ zorderClause
     : ZORDER BY order+=multipartIdentifier (',' order+=multipartIdentifier)*
     ;
 
+compactAction
+    : CLEANUP | RETAIN | LIST
+    ;
 // We don't have an expression rule in our grammar here, so we just grab the tokens and defer
 // parsing them to later.
 predicateToken
@@ -101,6 +108,12 @@ nonReserved
     | ZORDER
     ;
 
+COMPACT: 'COMPACT';
+INTO: 'INTO';
+RECOVER: 'RECOVER';
+CLEANUP: 'CLEANUP';
+RETAIN: 'RETAIN';
+LIST: 'LIST';
 AND: 'AND';
 BY: 'BY';
 FALSE: 'FALSE';
@@ -115,7 +128,9 @@ WHERE: 'WHERE';
 ZORDER: 'ZORDER';
 
 MINUS: '-';
-
+FILE_SIZE_UNIT_LITERAL:
+    'M' | 'MB'
+    ;
 BIGINT_LITERAL
     : DIGIT+ 'L'
     ;

extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLAstBuilder.scala

Lines changed: 15 additions & 0 deletions
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Sort}
 
 import org.apache.kyuubi.sql.KyuubiSparkSQLParser._
+import org.apache.kyuubi.sql.compact.{CompactTableOptions, CompactTableStatement, RecoverCompactTableStatement}
 import org.apache.kyuubi.sql.zorder.{OptimizeZorderStatement, Zorder}
 
 class KyuubiSparkSQLAstBuilder extends KyuubiSparkSQLBaseVisitor[AnyRef] with SQLConfHelper {
@@ -127,6 +128,20 @@ class KyuubiSparkSQLAstBuilder extends KyuubiSparkSQLBaseVisitor[AnyRef] with SQ
     UnparsedPredicateOptimize(tableIdent, predicate, orderExpr)
   }
 
+  override def visitCompactTable(ctx: CompactTableContext): CompactTableStatement =
+    withOrigin(ctx) {
+      val tableParts = visitMultipartIdentifier(ctx.multipartIdentifier())
+      val targetFileSize = Option(ctx.targetFileSize).map(_.getText.toLong)
+      val action = Option(ctx.action).map(_.getText)
+      CompactTableStatement(tableParts, targetFileSize, CompactTableOptions(action))
+    }
+
+  override def visitRecoverCompactTable(ctx: RecoverCompactTableContext)
+      : RecoverCompactTableStatement = withOrigin(ctx) {
+    val tableParts = visitMultipartIdentifier(ctx.multipartIdentifier())
+    RecoverCompactTableStatement(tableParts)
+  }
+
   override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null
 
   override def visitMultipartIdentifier(ctx: MultipartIdentifierContext): Seq[String] =
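
As a rough sketch of what the new visitor methods return, the statement `COMPACT TABLE db.t INTO 128 M RETAIN` should map to a `CompactTableStatement` like the hand-built one below; the table name, size and wrapper object are only for illustration, while the classes come from this commit:

```scala
import org.apache.kyuubi.sql.compact.{CompactTableOptions, CompactTableStatement}

object CompactTableAstExample extends App {
  // Hand-built equivalent of visitCompactTable's result for
  //   COMPACT TABLE db.t INTO 128 M RETAIN
  val stmt = CompactTableStatement(
    tableParts = Seq("db", "t"),
    targetSizeInMB = Some(128L),
    options = CompactTableOptions(Some("RETAIN"))) // lower-cased, resolves to RetainStagingFolder
  println(stmt)
}
```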

extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala

Lines changed: 2 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.SparkSessionExtensions
 
+import org.apache.kyuubi.sql.compact.CompactTableResolver
 import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder}
 
 class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
@@ -32,6 +33,7 @@ object KyuubiSparkSQLCommonExtension {
     // inject zorder parser and related rules
     extensions.injectParser { case (_, parser) => new SparkKyuubiSparkSQLParser(parser) }
     extensions.injectResolutionRule(ResolveZorder)
+    extensions.injectResolutionRule(CompactTableResolver)
 
     // Note that:
     // InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive

extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala

Lines changed: 3 additions & 0 deletions
@@ -19,9 +19,11 @@ package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.{FinalStageResourceManager, InjectCustomResourceProfile, SparkSessionExtensions}
 
+import org.apache.kyuubi.sql.compact.CompactTableSparkStrategy
 import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
 
 // scalastyle:off line.size.limit
+
 /**
  * Depend on Spark SQL Extension framework, we can use this extension follow steps
  * 1. move this jar into $SPARK_HOME/jars
@@ -40,6 +42,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
     extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
     extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
     extensions.injectPlannerStrategy(MaxScanStrategy)
+    extensions.injectPlannerStrategy(CompactTableSparkStrategy)
 
     extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
     extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
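
A minimal way to exercise the injected parser, resolver and planner strategy end to end might look like the sketch below; it assumes the extension jar is on the classpath and that `spark.sql.extensions` points at this class, and the table name is illustrative:

```scala
import org.apache.spark.sql.SparkSession

object CompactTableSmokeTest extends App {
  val spark = SparkSession.builder()
    .master("local[2]")
    .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
    .getOrCreate()

  // Compact but keep the staging data so the operation can be undone.
  spark.sql("COMPACT TABLE default.small_files_table INTO 128 M RETAIN")

  // Inspect the merge result through the cached view created by the command.
  spark.sql("SELECT * FROM v_merged_files").show()

  // Roll back using the retained staging folders.
  spark.sql("RECOVER COMPACT TABLE default.small_files_table")

  spark.stop()
}
```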
@@ -0,0 +1,46 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.sql

import java.lang.reflect.Method

import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.{FileMetaData, GlobalMetaData}

object ParquetFileWriterWrapper {

  val mergeInfoField: Method = classOf[ParquetFileWriter]
    .getDeclaredMethod(
      "mergeInto",
      classOf[FileMetaData],
      classOf[GlobalMetaData],
      classOf[Boolean])

  mergeInfoField.setAccessible(true)

  def mergeInto(
      toMerge: FileMetaData,
      mergedMetadata: GlobalMetaData,
      strict: Boolean): GlobalMetaData = {
    mergeInfoField.invoke(
      null,
      toMerge.asInstanceOf[AnyRef],
      mergedMetadata.asInstanceOf[AnyRef],
      strict.asInstanceOf[AnyRef]).asInstanceOf[GlobalMetaData]
  }
}
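
As a sketch of how this reflective shim could be used, the snippet below folds the footers of several small parquet files into one GlobalMetaData, the way a binary footer merge would; the helper object and paths are illustrative, and it assumes the static `ParquetFileWriter.mergeInto` accepts a null accumulator on the first call (as it does when parquet builds global metadata):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.GlobalMetaData
import org.apache.parquet.hadoop.util.HadoopInputFile

import org.apache.kyuubi.sql.ParquetFileWriterWrapper

object FooterMergeSketch {
  // Merge each file's FileMetaData into a single GlobalMetaData, starting from null.
  def mergeFooters(conf: Configuration, files: Seq[Path]): GlobalMetaData =
    files.foldLeft(null: GlobalMetaData) { (merged, file) =>
      val reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))
      try ParquetFileWriterWrapper.mergeInto(reader.getFooter.getFileMetaData, merged, true)
      finally reader.close()
    }
}
```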
@@ -0,0 +1,78 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.sql.compact

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.{Row, SparkInternalExplorer, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{DropTableCommand, LeafRunnableCommand}

case class CachePerformanceViewCommand(
    tableIdentifier: Seq[String],
    performancePlan: LogicalPlan,
    originalFileLocations: Seq[String],
    options: CompactTableOption) extends LeafRunnableCommand {

  override def innerChildren: Seq[QueryPlan[_]] = Seq(performancePlan)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val dropViewCommand = DropTableCommand(
      CompactTableUtils.getTableIdentifier(tableIdentifier),
      ifExists = true,
      isView = true,
      purge = true)
    dropViewCommand.run(sparkSession)

    val speculation =
      sparkSession.sparkContext.getConf.getBoolean(
        SparkInternalExplorer.SPECULATION_ENABLED_SYNONYM.key,
        defaultValue = false)
    if (speculation) {
      sparkSession.sparkContext.getConf.set(
        SparkInternalExplorer.SPECULATION_ENABLED_SYNONYM.key,
        "false")
      log.warn("set spark.speculation to false")
    }
    try {
      val cacheTableCommand =
        SparkInternalExplorer.CacheTableAsSelectExec(tableIdentifier.head, performancePlan)

      // this result always empty
      cacheTableCommand.run()

      if (options == CompactTableOptions.CleanupStagingFolder) {
        val fileSystem = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
        originalFileLocations.foreach { originalFileLocation =>
          val compactStagingDir = CompactTableUtils.getCompactStagingDir(originalFileLocation)
          fileSystem.delete(compactStagingDir, true)
        }

      }
    } finally {
      if (speculation) {
        sparkSession.sparkContext.getConf.set(
          SparkInternalExplorer.SPECULATION_ENABLED_SYNONYM.key,
          "true")
        log.warn("rollback spark.speculation to true")
      }
    }
    Seq.empty[Row]
  }

}
@@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.sql.compact

import org.apache.spark.sql.catalyst.analysis.UnresolvedUnaryNode
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LeafParsedStatement, LogicalPlan}
import org.apache.spark.sql.types._

object CompactTable {
  private val fileLocAndSizeStructArrayType: ArrayType =
    DataTypes.createArrayType(DataTypes.createStructType(Array(
      DataTypes.createStructField("sub_group_id", IntegerType, false),
      DataTypes.createStructField("name", StringType, false),
      DataTypes.createStructField("length", LongType, false))))

  val smallFileCollectOutput: StructType = DataTypes.createStructType(Array(
    DataTypes.createStructField("group_id", IntegerType, false),
    DataTypes.createStructField("location", StringType, false),
    DataTypes.createStructField("data_source", StringType, false),
    DataTypes.createStructField("codec", StringType, true),
    DataTypes.createStructField("smallFiles", fileLocAndSizeStructArrayType, false)))

  val smallFileCollectOutputAttribute: Seq[AttributeReference] = smallFileCollectOutput
    .map(field => AttributeReference(field.name, field.dataType, field.nullable)())

  val mergedFilesCachedTableName = "v_merged_files"
  val mergeMetadataKey = "spark.sql.compact.parquet.metadata.merge"
}

trait CompactTableOption

object CompactTableOptions {
  def apply(options: Option[String]): CompactTableOption = options.map(_.toLowerCase) match {
    case Some("retain") => RetainStagingFolder
    case Some("list") => DryRun
    case _ => CleanupStagingFolder
  }

  case object CleanupStagingFolder extends CompactTableOption

  case object RetainStagingFolder extends CompactTableOption

  case object DryRun extends CompactTableOption
}

case class CompactTable(
    child: LogicalPlan,
    targetSizeInBytes: Option[Long],
    options: CompactTableOption) extends UnresolvedUnaryNode {
  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
    CompactTable(newChild, targetSizeInBytes, options)
  }
}

case class CompactTableStatement(
    tableParts: Seq[String],
    targetSizeInMB: Option[Long],
    options: CompactTableOption) extends LeafParsedStatement

case class RecoverCompactTableStatement(tableParts: Seq[String])
  extends LeafParsedStatement

case class RecoverCompactTable(child: LogicalPlan) extends UnresolvedUnaryNode {
  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
    RecoverCompactTable(newChild)
  }
}
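
A small sketch of how the pieces above fit together: the parser's action keyword maps onto the option case objects, and `smallFileCollectOutput` appears to be the schema behind the cached `v_merged_files` view (the wrapper object name below is made up for illustration):

```scala
import org.apache.kyuubi.sql.compact.{CompactTable, CompactTableOptions}

object CompactTableModelExample extends App {
  // Action keyword -> option object; anything unrecognised (or absent) means cleanup.
  assert(CompactTableOptions(Some("retain")) == CompactTableOptions.RetainStagingFolder)
  assert(CompactTableOptions(Some("LIST")) == CompactTableOptions.DryRun)
  assert(CompactTableOptions(None) == CompactTableOptions.CleanupStagingFolder)

  // Name and column layout of the cached small-file listing.
  println(CompactTable.mergedFilesCachedTableName) // v_merged_files
  CompactTable.smallFileCollectOutput.printTreeString()
}
```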
