
Commit 2ac6163

steveloughran authored and Marcelo Vanzin committed
[SPARK-23977][SQL] Support High Performance S3A committers [test-hadoop3.2]
This patch adds the binding classes that enable Spark to switch DataFrame output to the S3A zero-rename committers shipping in Hadoop 3.1+. It adds a source tree to the hadoop-cloud-storage module which only compiles with the hadoop-3.2 profile, and contains a binding for normal output plus a specific bridge class for Parquet (as the Parquet output format requires a subclass of `ParquetOutputCommitter`).

Commit algorithms are a critical topic. There is no formal proof of correctness, but the algorithms are documented and analysed in [A Zero Rename Committer](https://github.com/steveloughran/zero-rename-committer/releases), which also reviews the classic v1 and v2 algorithms, IBM's Swift committer, and the one from EMRFS, which they admit was based on the concepts implemented here.

Test-wise:

* There's a public set of Scala test suites [on GitHub](https://github.com/hortonworks-spark/cloud-integration).
* We have run integration tests against Spark on YARN clusters.
* This code has been shipping for ~12 months in HDP-3.x.

Closes apache#24970 from steveloughran/cloud/SPARK-23977-s3a-committer.

Authored-by: Steve Loughran <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent ba5ee27 commit 2ac6163
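
As an illustrative aside (not part of the commit): the settings introduced in docs/cloud-integration.md below can also be applied from application code. A minimal Scala sketch, assuming a Spark build with the hadoop-3.2 profile and the cloud committer bindings on the classpath; the bucket name is a placeholder:

```scala
import org.apache.spark.sql.SparkSession

object S3ACommitterExample {
  def main(args: Array[String]): Unit = {
    // Settings taken from the docs/cloud-integration.md changes in this commit;
    // they require a Spark build with Hadoop 3.1+ and the cloud committer bindings.
    val spark = SparkSession.builder()
      .appName("s3a-committer-example")
      .config("spark.hadoop.fs.s3a.committer.name", "directory")
      .config("spark.sql.sources.commitProtocolClass",
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
      .config("spark.sql.parquet.output.committer.class",
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
      .getOrCreate()

    // Write a small DataFrame; "s3a://example-bucket/output" is a placeholder destination.
    spark.range(100).write.format("parquet").save("s3a://example-bucket/output")

    spark.stop()
  }
}
```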

File tree

9 files changed: +741, -18 lines


core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala

+4 -2

```diff
@@ -42,7 +42,7 @@ import org.apache.spark.util.Utils
  * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job
  *    failed to execute (e.g. too many failed tasks), the job should call abortJob.
  */
-abstract class FileCommitProtocol {
+abstract class FileCommitProtocol extends Logging {
   import FileCommitProtocol._
 
   /**
@@ -129,7 +129,9 @@ abstract class FileCommitProtocol {
    * before the job has finished. These same task commit messages will be passed to commitJob()
    * if the entire job succeeds.
    */
-  def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {}
+  def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
+    logDebug(s"onTaskCommit($taskCommit)")
+  }
 }
 
```
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

+45 -9

```diff
@@ -17,6 +17,7 @@
 
 package org.apache.spark.internal.io
 
+import java.io.IOException
 import java.util.{Date, UUID}
 
 import scala.collection.mutable
@@ -136,7 +137,7 @@ class HadoopMapReduceCommitProtocol(
     tmpOutputPath
   }
 
-  private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
+  protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
     // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
@@ -205,11 +206,28 @@ class HadoopMapReduceCommitProtocol(
     }
   }
 
+  /**
+   * Abort the job; log and ignore any IO exception thrown.
+   * This is invariably invoked in an exception handler; raising
+   * an exception here will lose the root cause of the failure.
+   *
+   * @param jobContext job context
+   */
   override def abortJob(jobContext: JobContext): Unit = {
-    committer.abortJob(jobContext, JobStatus.State.FAILED)
-    if (hasValidPath) {
-      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
-      fs.delete(stagingDir, true)
+    try {
+      committer.abortJob(jobContext, JobStatus.State.FAILED)
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
+    }
+    try {
+      if (hasValidPath) {
+        val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+        fs.delete(stagingDir, true)
+      }
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
     }
   }
 
@@ -222,17 +240,35 @@ class HadoopMapReduceCommitProtocol(
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
+    logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
     new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
   }
 
+  /**
+   * Abort the task; log and ignore any failure thrown.
+   * This is invariably invoked in an exception handler; raising
+   * an exception here will lose the root cause of the failure.
+   *
+   * @param taskContext context
+   */
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
-    committer.abortTask(taskContext)
+    try {
+      committer.abortTask(taskContext)
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
+    }
     // best effort cleanup of other staged files
-    for ((src, _) <- addedAbsPathFiles) {
-      val tmp = new Path(src)
-      tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
+    try {
+      for ((src, _) <- addedAbsPathFiles) {
+        val tmp = new Path(src)
+        tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
+      }
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
     }
   }
 }
```
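
The change above applies one pattern in both `abortJob` and `abortTask`: each cleanup step is wrapped in its own try/catch so that an `IOException` raised during abort is logged rather than rethrown, preserving the root-cause exception that triggered the abort in the first place. A standalone sketch of that pattern, with a hypothetical `bestEffort` helper that is not part of the patch:

```scala
import java.io.IOException

object BestEffortCleanup {

  /** Run a cleanup action; log and swallow any IOException so that callers
   *  sitting inside exception handlers never lose the original failure. */
  def bestEffort(description: String)(action: => Unit): Unit = {
    try {
      action
    } catch {
      case e: IOException =>
        // The real protocol uses logWarning(); println keeps this sketch dependency-free.
        println(s"WARN: exception while $description: $e")
    }
  }

  def main(args: Array[String]): Unit = {
    // Both steps run even if the first one fails with an IOException.
    bestEffort("aborting the job") {
      throw new IOException("simulated abort failure")
    }
    bestEffort("deleting the staging directory") {
      println("staging directory deleted")
    }
  }
}
```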

docs/cloud-integration.md

+63 -7

````diff
@@ -125,7 +125,7 @@ consult the relevant documentation.
 ### Recommended settings for writing to object stores
 
 For object stores whose consistency model means that rename-based commits are safe
-use the `FileOutputCommitter` v2 algorithm for performance:
+use the `FileOutputCommitter` v2 algorithm for performance; v1 for safety.
 
 ```
 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
@@ -143,8 +143,30 @@ job failure:
 spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true
 ```
 
+The original v1 commit algorithm renames the output of successful tasks
+to a job attempt directory, and then renames all the files in that directory
+into the final destination during the job commit phase:
+
+```
+spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1
+```
+
+The slow performance of mimicked renames on Amazon S3 makes this algorithm
+very, very slow. The recommended solution to this is to switch to an S3 "Zero Rename"
+committer (see below).
+
+For reference, here are the performance and safety characteristics of
+different stores and connectors when renaming directories:
+
+| Store | Connector | Directory Rename Safety | Rename Performance |
+|---------------|-----------|-------------------------|--------------------|
+| Amazon S3 | s3a | Unsafe | O(data) |
+| Azure Storage | wasb | Safe | O(files) |
+| Azure Datalake Gen 2 | abfs | Safe | O(1) |
+| Google GCS | gs | Safe | O(1) |
+
 As storing temporary files can run up charges; delete
-directories called `"_temporary"` on a regular basis to avoid this.
+directories called `"_temporary"` on a regular basis.
 
 ### Parquet I/O Settings
 
@@ -190,15 +212,49 @@ while they are still being written. Applications can write straight to the monit
 atomic `rename()` operation.
 Otherwise the checkpointing may be slow and potentially unreliable.
 
+## Committing work into cloud storage safely and fast
+
+As covered earlier, commit-by-rename is dangerous on any object store which
+exhibits eventual consistency (example: S3), and often slower than classic
+filesystem renames.
+
+Some object store connectors provide custom committers to commit tasks and
+jobs without using rename. In versions of Spark built with Hadoop 3.1 or later,
+the S3A connector for AWS S3 is such a committer.
+
+Instead of writing data to a temporary directory on the store for renaming,
+these committers write the files to the final destination, but do not issue
+the final POST command to make a large "multi-part" upload visible. Those
+operations are postponed until the job commit itself. As a result, task and
+job commit are much faster, and task failures do not affect the result.
+
+To switch to the S3A committers, use a version of Spark that was built with Hadoop
+3.1 or later, and switch the committers through the following options.
+
+```
+spark.hadoop.fs.s3a.committer.name directory
+spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
+spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
+```
+
+It has been tested with the most common formats supported by Spark.
+
+```python
+mydataframe.write.format("parquet").save("s3a://bucket/destination")
+```
+
+More details on these committers can be found in the latest Hadoop documentation.
+
 ## Further Reading
 
 Here is the documentation on the standard connectors both from Apache and the cloud providers.
 
-* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html). Hadoop 2.6+
-* [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). Since Hadoop 2.7
-* [Azure Data Lake](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html). Since Hadoop 2.8
-* [Amazon S3 via S3A and S3N](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). Hadoop 2.6+
+* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
+* [Azure Blob Storage and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html).
+* [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
+* [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
+* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html).
 * [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon
 * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector). From Google
-
+* [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver)
 
````
hadoop-cloud/pom.xml

+39

```diff
@@ -198,6 +198,45 @@
     -->
     <profile>
       <id>hadoop-3.2</id>
+      <properties>
+        <extra.source.dir>src/hadoop-3/main/scala</extra.source.dir>
+        <extra.testsource.dir>src/hadoop-3/test/scala</extra.testsource.dir>
+      </properties>
+
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${extra.source.dir}</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${extra.testsource.dir}</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
       <dependencies>
         <!--
          There's now a hadoop-cloud-storage which transitively pulls in the store JARs,
```
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.internal.io.cloud
19+
20+
import java.io.IOException
21+
22+
import org.apache.hadoop.fs.Path
23+
import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
24+
import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter}
25+
import org.apache.parquet.hadoop.ParquetOutputCommitter
26+
27+
import org.apache.spark.internal.Logging
28+
29+
/**
30+
* This Parquet Committer subclass dynamically binds to the factory-configured
31+
* output committer, and is intended to allow callers to use any 'PathOutputCommitter',
32+
* even if not a subclass of 'ParquetOutputCommitter'.
33+
*
34+
* The Parquet `parquet.enable.summary-metadata` option will only be supported
35+
* if the instantiated committer itself supports it.
36+
*/
37+
class BindingParquetOutputCommitter(
38+
path: Path,
39+
context: TaskAttemptContext)
40+
extends ParquetOutputCommitter(path, context) with Logging {
41+
42+
logTrace(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path")
43+
44+
private val committer = new BindingPathOutputCommitter(path, context)
45+
46+
/**
47+
* This is the committer ultimately bound to.
48+
* @return the committer instantiated by the factory.
49+
*/
50+
private[cloud] def boundCommitter(): PathOutputCommitter = {
51+
committer.getCommitter
52+
}
53+
54+
override def getWorkPath(): Path = {
55+
committer.getWorkPath()
56+
}
57+
58+
override def setupTask(taskAttemptContext: TaskAttemptContext): Unit = {
59+
committer.setupTask(taskAttemptContext)
60+
}
61+
62+
override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = {
63+
committer.commitTask(taskAttemptContext)
64+
}
65+
66+
override def abortTask(taskAttemptContext: TaskAttemptContext): Unit = {
67+
committer.abortTask(taskAttemptContext)
68+
}
69+
70+
override def setupJob(jobContext: JobContext): Unit = {
71+
committer.setupJob(jobContext)
72+
}
73+
74+
override def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean = {
75+
committer.needsTaskCommit(taskAttemptContext)
76+
}
77+
78+
override def cleanupJob(jobContext: JobContext): Unit = {
79+
committer.cleanupJob(jobContext)
80+
}
81+
82+
override def isCommitJobRepeatable(jobContext: JobContext): Boolean = {
83+
committer.isCommitJobRepeatable(jobContext)
84+
}
85+
86+
override def commitJob(jobContext: JobContext): Unit = {
87+
committer.commitJob(jobContext)
88+
}
89+
90+
override def recoverTask(taskAttemptContext: TaskAttemptContext): Unit = {
91+
committer.recoverTask(taskAttemptContext)
92+
}
93+
94+
/**
95+
* Abort the job; log and ignore any IO exception thrown.
96+
* This is invariably invoked in an exception handler; raising
97+
* an exception here will lose the root cause of the failure.
98+
*
99+
* @param jobContext job context
100+
* @param state final state of the job
101+
*/
102+
override def abortJob(jobContext: JobContext, state: JobStatus.State): Unit = {
103+
try {
104+
committer.abortJob(jobContext, state)
105+
} catch {
106+
case e: IOException =>
107+
// swallow exception to avoid problems when called within exception
108+
// handlers
109+
logWarning("Abort job failed", e)
110+
}
111+
}
112+
113+
override def isRecoverySupported: Boolean = {
114+
committer.isRecoverySupported()
115+
}
116+
117+
override def isRecoverySupported(jobContext: JobContext): Boolean = {
118+
committer.isRecoverySupported(jobContext)
119+
}
120+
121+
override def toString: String = s"BindingParquetOutputCommitter($committer)"
122+
}
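
The class above exists to satisfy Parquet's requirement that the configured committer be a `ParquetOutputCommitter` subclass, while forwarding every call to the committer that the Hadoop `PathOutputCommitter` binding selects for the destination. A rough, hedged probe of that delegation, assuming a Hadoop 3.x classpath with this class available; the destination path and attempt ID are placeholders, and with no committer factory configured for `file://` the default file output committer is expected to be bound:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

object BindingCommitterProbe {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Placeholder destination; on an s3a:// path with an S3A committer factory
    // configured, the factory-selected committer would be bound instead.
    val dest = new Path("file:///tmp/binding-committer-probe")
    val context = new TaskAttemptContextImpl(
      conf, TaskAttemptID.forName("attempt_200707121733_0001_m_000000_0"))

    // The Parquet-typed wrapper; all work is delegated to the bound committer.
    val committer = new BindingParquetOutputCommitter(dest, context)

    // toString reports the delegate (boundCommitter() itself is private[cloud]).
    println(committer)
  }
}
```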
