From 861dc409d7209c3a8d4518708016d1b843f5c52b Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sat, 19 Oct 2013 16:25:08 -0700 Subject: [PATCH 1/8] Refactor of DiskStore for shuffle file consolidation The main goal of this refactor was to allow the interposition of a new layer which maps logical BlockIds to physical locations other than a file with the same name as the BlockId. In particular, BlockIds will need to be mappable to chunks of files, as multiple will be stored in the same file. In order to accomplish this, the following changes have been made: - Creation of DiskBlockManager, which manages the association of logical BlockIds to physical disk locations (called FileSegments). By default, Blocks are simply mapped to physical files of the same name, as before. - The DiskStore now indirects all requests for a given BlockId through the DiskBlockManager in order to resolve the actual File location. - DiskBlockObjectWriter has been merged into BlockObjectWriter. - The Netty PathResolver has been changed to map BlockIds into FileSegments, as this codepath is the only one that uses Netty, and that is likely to remain the case. Overall, I think this refactor produces a clearer division between the logical Block paradigm and their physical on-disk location. There is now an explicit (and documented) mapping from one to the other. --- .../network/netty/FileServerHandler.java | 17 +- .../spark/network/netty/PathResolver.java | 11 +- .../spark/network/netty/ShuffleSender.scala | 7 +- .../spark/scheduler/ShuffleMapTask.scala | 2 +- .../apache/spark/storage/BlockManager.scala | 33 ++- .../spark/storage/BlockObjectWriter.scala | 90 +++++- .../spark/storage/DiskBlockManager.scala | 179 ++++++++++++ .../org/apache/spark/storage/DiskStore.scala | 266 +++--------------- .../apache/spark/storage/FileSegment.scala | 28 ++ .../spark/storage/ShuffleBlockManager.scala | 2 +- 10 files changed, 366 insertions(+), 269 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala create mode 100644 core/src/main/scala/org/apache/spark/storage/FileSegment.scala diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index cfd813289..ab790b785 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -25,6 +25,7 @@ import io.netty.channel.DefaultFileRegion; import org.apache.spark.storage.BlockId; +import org.apache.spark.storage.FileSegment; class FileServerHandler extends ChannelInboundMessageHandlerAdapter { @@ -37,40 +38,34 @@ public FileServerHandler(PathResolver pResolver){ @Override public void messageReceived(ChannelHandlerContext ctx, String blockIdString) { BlockId blockId = BlockId.apply(blockIdString); - String path = pResolver.getAbsolutePath(blockId.name()); - // if getFilePath returns null, close the channel - if (path == null) { + FileSegment fileSegment = pResolver.getBlockLocation(blockId); + // if getBlockLocation returns null, close the channel + if (fileSegment == null) { //ctx.close(); return; } - File file = new File(path); + File file = fileSegment.file(); if (file.exists()) { if (!file.isFile()) { - //logger.info("Not a file : " + file.getAbsolutePath()); ctx.write(new FileHeader(0, blockId).buffer()); ctx.flush(); return; } long length = file.length(); if (length > Integer.MAX_VALUE || length <= 0) { - //logger.info("too large file : " 
+ file.getAbsolutePath() + " of size "+ length); ctx.write(new FileHeader(0, blockId).buffer()); ctx.flush(); return; } int len = new Long(length).intValue(); - //logger.info("Sending block "+blockId+" filelen = "+len); - //logger.info("header = "+ (new FileHeader(len, blockId)).buffer()); ctx.write((new FileHeader(len, blockId)).buffer()); try { ctx.sendFile(new DefaultFileRegion(new FileInputStream(file) - .getChannel(), 0, file.length())); + .getChannel(), fileSegment.offset(), fileSegment.length())); } catch (Exception e) { - //logger.warning("Exception when sending file : " + file.getAbsolutePath()); e.printStackTrace(); } } else { - //logger.warning("File not found: " + file.getAbsolutePath()); ctx.write(new FileHeader(0, blockId).buffer()); } ctx.flush(); diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java index 94c034cad..370fcdeea 100755 --- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java +++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java @@ -17,13 +17,10 @@ package org.apache.spark.network.netty; +import org.apache.spark.storage.BlockId; +import org.apache.spark.storage.FileSegment; public interface PathResolver { - /** - * Get the absolute path of the file - * - * @param fileId - * @return the absolute path of file - */ - public String getAbsolutePath(String fileId); + /** Get the file segment in which the given Block resides. */ + public FileSegment getBlockLocation(BlockId blockId); } diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala index 1586dff25..546d92106 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala @@ -21,7 +21,7 @@ import java.io.File import org.apache.spark.Logging import org.apache.spark.util.Utils -import org.apache.spark.storage.BlockId +import org.apache.spark.storage.{BlockId, FileSegment} private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging { @@ -54,8 +54,7 @@ private[spark] object ShuffleSender { val localDirs = args.drop(2).map(new File(_)) val pResovler = new PathResolver { - override def getAbsolutePath(blockIdString: String): String = { - val blockId = BlockId(blockIdString) + override def getBlockLocation(blockId: BlockId): FileSegment = { if (!blockId.isShuffle) { throw new Exception("Block " + blockId + " is not a shuffle block") } @@ -65,7 +64,7 @@ private[spark] object ShuffleSender { val subDirId = (hash / localDirs.length) % subDirsPerLocalDir val subDir = new File(localDirs(dirId), "%02x".format(subDirId)) val file = new File(subDir, blockId.name) - return file.getAbsolutePath + return new FileSegment(file, 0, file.length()) } } val sender = new ShuffleSender(port, pResovler) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 802791797..ed1b36d18 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -167,7 +167,7 @@ private[spark] class ShuffleMapTask( val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter => writer.commit() writer.close() - val size = writer.size() + val size = writer.fileSegment().length totalBytes 
+= size MapOutputTracker.compressSize(size) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c67a61515..2f96590c5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -28,7 +28,7 @@ import akka.dispatch.{Await, Future} import akka.util.Duration import akka.util.duration._ -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream +import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream} import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.io.CompressionCodec @@ -102,18 +102,19 @@ private[spark] class BlockManager( } val shuffleBlockManager = new ShuffleBlockManager(this) + val diskBlockManager = new DiskBlockManager( + System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) - private[storage] val diskStore: DiskStore = - new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) + private[storage] val diskStore = new DiskStore(this, diskBlockManager) // If we use Netty for shuffle, start a new Netty-based shuffle sender service. private val nettyPort: Int = { val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt - if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0 + if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0 } val connectionManager = new ConnectionManager(0) @@ -567,16 +568,19 @@ private[spark] class BlockManager( /** * A short circuited method to get a block writer that can write data directly to disk. + * The Block will be appended to the File specified by filename. * This is currently used for writing shuffle files out. Callers should handle error * cases. */ - def getDiskBlockWriter(blockId: BlockId, serializer: Serializer, bufferSize: Int) + def getDiskWriter(blockId: BlockId, filename: String, serializer: Serializer, bufferSize: Int) : BlockObjectWriter = { - val writer = diskStore.getBlockWriter(blockId, serializer, bufferSize) + val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) + val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true) + val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream) writer.registerCloseEventHandler(() => { val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false) blockInfo.put(blockId, myInfo) - myInfo.markReady(writer.size()) + myInfo.markReady(writer.fileSegment().length) }) writer } @@ -988,13 +992,24 @@ private[spark] class BlockManager( if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s } + /** Serializes into a stream. */ + def dataSerializeStream( + blockId: BlockId, + outputStream: OutputStream, + values: Iterator[Any], + serializer: Serializer = defaultSerializer) { + val byteStream = new FastBufferedOutputStream(outputStream) + val ser = serializer.newInstance() + ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() + } + + /** Serializes into a byte buffer. 
*/ def dataSerialize( blockId: BlockId, values: Iterator[Any], serializer: Serializer = defaultSerializer): ByteBuffer = { val byteStream = new FastByteArrayOutputStream(4096) - val ser = serializer.newInstance() - ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() + dataSerializeStream(blockId, byteStream, values, serializer) byteStream.trim() ByteBuffer.wrap(byteStream.array) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 2a67800c4..0b5a47299 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -17,6 +17,13 @@ package org.apache.spark.storage +import java.io.{FileOutputStream, File, OutputStream} +import java.nio.channels.FileChannel + +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream + +import org.apache.spark.Logging +import org.apache.spark.serializer.{SerializationStream, Serializer} /** * An interface for writing JVM objects to some underlying storage. This interface allows @@ -59,7 +66,86 @@ abstract class BlockObjectWriter(val blockId: BlockId) { def write(value: Any) /** - * Size of the valid writes, in bytes. + * Returns the file segment of committed data that this Writer has written. */ - def size(): Long + def fileSegment(): FileSegment +} + +/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */ +class DiskBlockObjectWriter( + blockId: BlockId, + file: File, + serializer: Serializer, + bufferSize: Int, + compressStream: OutputStream => OutputStream) + extends BlockObjectWriter(blockId) + with Logging +{ + + /** The file channel, used for repositioning / truncating the file. */ + private var channel: FileChannel = null + private var bs: OutputStream = null + private var objOut: SerializationStream = null + private var initialPosition = 0L + private var lastValidPosition = 0L + private var initialized = false + + override def open(): BlockObjectWriter = { + val fos = new FileOutputStream(file, true) + channel = fos.getChannel() + initialPosition = channel.position + lastValidPosition = initialPosition + bs = compressStream(new FastBufferedOutputStream(fos, bufferSize)) + objOut = serializer.newInstance().serializeStream(bs) + initialized = true + this + } + + override def close() { + if (initialized) { + objOut.close() + channel = null + bs = null + objOut = null + } + super.close() + } + + override def isOpen: Boolean = objOut != null + + override def commit(): Long = { + if (initialized) { + // NOTE: Flush the serializer first and then the compressed/buffered output stream + objOut.flush() + bs.flush() + val prevPos = lastValidPosition + lastValidPosition = channel.position() + lastValidPosition - prevPos + } else { + // lastValidPosition is zero if stream is uninitialized + lastValidPosition + } + } + + override def revertPartialWrites() { + if (initialized) { + // Discard current writes. We do this by flushing the outstanding writes and + // truncate the file to the last valid position. 
+ objOut.flush() + bs.flush() + channel.truncate(lastValidPosition) + } + } + + override def write(value: Any) { + if (!initialized) { + open() + } + objOut.writeObject(value) + } + + override def fileSegment(): FileSegment = { + val bytesWritten = lastValidPosition - initialPosition + new FileSegment(file, initialPosition, bytesWritten) + } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala new file mode 100644 index 000000000..6ace4eb52 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.io.File +import java.text.SimpleDateFormat +import java.util.{Date, Random} +import java.util.concurrent.ConcurrentHashMap + +import org.apache.spark.executor.ExecutorExitCode +import org.apache.spark.Logging +import org.apache.spark.network.netty.{PathResolver, ShuffleSender} +import org.apache.spark.util.Utils + +/** + * Creates an maintains the logical mapping between logical Blocks and physical on-disk + * locations. By default, one Block is mapped to one file with a name given by its BlockId. + * However, it is also possible to have a Block map to only a segment of a file, by calling + * mapBlockToFileSegment(). + * + * @param rootDirs The directories to use for storing Block files. Data will be hashed among these. + */ +class DiskBlockManager(rootDirs: String) extends PathResolver with Logging { + + private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 + private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt + + // Create one local directory for each path mentioned in spark.local.dir; then, inside this + // directory, create multiple subdirectories that we will hash files into, in order to avoid + // having really large inodes at the top level. + private val localDirs: Array[File] = createLocalDirs() + private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) + private var shuffleSender : ShuffleSender = null + + // Stores only Blocks which have been specifically mapped to segments of files + // (rather than the default, which maps a Block to a whole file). + // This keeps our bookkeeping down, since the file system itself tracks the standalone Blocks. + // ConcurrentHashMap does not take a lock on read operations, which makes it very efficient here. + private val blockToFileSegmentMap = new ConcurrentHashMap[BlockId, FileSegment] + + addShutdownHook() + + /** + * Creates a logical mapping from the given BlockId to a segment of a file. 
+ * This will cause any accesses of the logical BlockId to be directed to the specified + * physical location. + */ + def mapBlockToFileSegment(blockId: BlockId, fileSegment: FileSegment) { + blockToFileSegmentMap.put(blockId, fileSegment) + } + + /** + * Returns the phyiscal file segment in which the given BlockId is located. + * If the BlockId has been mapped to a specific FileSegment, that will be returned. + * Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly. + */ + def getBlockLocation(blockId: BlockId): FileSegment = { + if (blockToFileSegmentMap.containsKey(blockId)) { + blockToFileSegmentMap.get(blockId) + } else { + val file = getFile(blockId.name) + new FileSegment(file, 0, file.length()) + } + } + + /** + * Simply returns a File to place the given Block into. This does not physically create the file. + * If filename is given, that file will be used. Otherwise, we will use the BlockId to get + * a unique filename. + */ + def createBlockFile(blockId: BlockId, filename: String = "", allowAppending: Boolean): File = { + val actualFilename = if (filename == "") blockId.name else filename + val file = getFile(actualFilename) + if (!allowAppending && file.exists()) { + throw new IllegalStateException( + "Attempted to create file that already exists: " + actualFilename) + } + file + } + + private def getFile(filename: String): File = { + // Figure out which local directory it hashes to, and which subdirectory in that + val hash = Utils.nonNegativeHash(filename) + val dirId = hash % localDirs.length + val subDirId = (hash / localDirs.length) % subDirsPerLocalDir + + // Create the subdirectory if it doesn't already exist + var subDir = subDirs(dirId)(subDirId) + if (subDir == null) { + subDir = subDirs(dirId).synchronized { + val old = subDirs(dirId)(subDirId) + if (old != null) { + old + } else { + val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) + newDir.mkdir() + subDirs(dirId)(subDirId) = newDir + newDir + } + } + } + + new File(subDir, filename) + } + + private def createLocalDirs(): Array[File] = { + logDebug("Creating local directories at root dirs '" + rootDirs + "'") + val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") + rootDirs.split(",").map { rootDir => + var foundLocalDir = false + var localDir: File = null + var localDirId: String = null + var tries = 0 + val rand = new Random() + while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { + tries += 1 + try { + localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) + localDir = new File(rootDir, "spark-local-" + localDirId) + if (!localDir.exists) { + foundLocalDir = localDir.mkdirs() + } + } catch { + case e: Exception => + logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e) + } + } + if (!foundLocalDir) { + logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + + " attempts to create local dir in " + rootDir) + System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) + } + logInfo("Created local directory at " + localDir) + localDir + } + } + + private def addShutdownHook() { + localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) + Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { + override def run() { + logDebug("Shutdown hook called") + localDirs.foreach { localDir => + try { + if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) + } catch { + case t: Throwable => + logError("Exception while deleting local spark dir: " + 
localDir, t) + } + } + + if (shuffleSender != null) { + shuffleSender.stop() + } + } + }) + } + + private[storage] def startShuffleBlockSender(port: Int): Int = { + shuffleSender = new ShuffleSender(port, this) + logInfo("Created ShuffleSender binding to port : " + shuffleSender.port) + shuffleSender.port + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index b7ca61e93..e703a3329 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -17,120 +17,25 @@ package org.apache.spark.storage -import java.io.{File, FileOutputStream, OutputStream, RandomAccessFile} +import java.io.{FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer -import java.nio.channels.FileChannel import java.nio.channels.FileChannel.MapMode -import java.util.{Random, Date} -import java.text.SimpleDateFormat import scala.collection.mutable.ArrayBuffer -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream - -import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.serializer.{Serializer, SerializationStream} import org.apache.spark.Logging -import org.apache.spark.network.netty.ShuffleSender -import org.apache.spark.network.netty.PathResolver +import org.apache.spark.serializer.Serializer import org.apache.spark.util.Utils /** * Stores BlockManager blocks on disk. */ -private class DiskStore(blockManager: BlockManager, rootDirs: String) +private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager) extends BlockStore(blockManager) with Logging { - class DiskBlockObjectWriter(blockId: BlockId, serializer: Serializer, bufferSize: Int) - extends BlockObjectWriter(blockId) { - - private val f: File = createFile(blockId /*, allowAppendExisting */) - - // The file channel, used for repositioning / truncating the file. - private var channel: FileChannel = null - private var bs: OutputStream = null - private var objOut: SerializationStream = null - private var lastValidPosition = 0L - private var initialized = false - - override def open(): DiskBlockObjectWriter = { - val fos = new FileOutputStream(f, true) - channel = fos.getChannel() - bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(fos, bufferSize)) - objOut = serializer.newInstance().serializeStream(bs) - initialized = true - this - } - - override def close() { - if (initialized) { - objOut.close() - channel = null - bs = null - objOut = null - } - // Invoke the close callback handler. - super.close() - } - - override def isOpen: Boolean = objOut != null - - // Flush the partial writes, and set valid length to be the length of the entire file. - // Return the number of bytes written for this commit. - override def commit(): Long = { - if (initialized) { - // NOTE: Flush the serializer first and then the compressed/buffered output stream - objOut.flush() - bs.flush() - val prevPos = lastValidPosition - lastValidPosition = channel.position() - lastValidPosition - prevPos - } else { - // lastValidPosition is zero if stream is uninitialized - lastValidPosition - } - } - - override def revertPartialWrites() { - if (initialized) { - // Discard current writes. We do this by flushing the outstanding writes and - // truncate the file to the last valid position. 
- objOut.flush() - bs.flush() - channel.truncate(lastValidPosition) - } - } - - override def write(value: Any) { - if (!initialized) { - open() - } - objOut.writeObject(value) - } - - override def size(): Long = lastValidPosition - } - - private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 - private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt - - private var shuffleSender : ShuffleSender = null - // Create one local directory for each path mentioned in spark.local.dir; then, inside this - // directory, create multiple subdirectories that we will hash files into, in order to avoid - // having really large inodes at the top level. - private val localDirs: Array[File] = createLocalDirs() - private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) - - addShutdownHook() - - def getBlockWriter(blockId: BlockId, serializer: Serializer, bufferSize: Int) - : BlockObjectWriter = { - new DiskBlockObjectWriter(blockId, serializer, bufferSize) - } - override def getSize(blockId: BlockId): Long = { - getFile(blockId).length() + diskManager.getBlockLocation(blockId).length } override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) { @@ -139,27 +44,15 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) val bytes = _bytes.duplicate() logDebug("Attempting to put block " + blockId) val startTime = System.currentTimeMillis - val file = createFile(blockId) - val channel = new RandomAccessFile(file, "rw").getChannel() + val file = diskManager.createBlockFile(blockId, allowAppending = false) + val channel = new FileOutputStream(file).getChannel() while (bytes.remaining > 0) { channel.write(bytes) } channel.close() val finishTime = System.currentTimeMillis logDebug("Block %s stored as %s file on disk in %d ms".format( - blockId, Utils.bytesToString(bytes.limit), (finishTime - startTime))) - } - - private def getFileBytes(file: File): ByteBuffer = { - val length = file.length() - val channel = new RandomAccessFile(file, "r").getChannel() - val buffer = try { - channel.map(MapMode.READ_ONLY, 0, length) - } finally { - channel.close() - } - - buffer + file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime))) } override def putValues( @@ -171,21 +64,18 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) logDebug("Attempting to write values for block " + blockId) val startTime = System.currentTimeMillis - val file = createFile(blockId) - val fileOut = blockManager.wrapForCompression(blockId, - new FastBufferedOutputStream(new FileOutputStream(file))) - val objOut = blockManager.defaultSerializer.newInstance().serializeStream(fileOut) - objOut.writeAll(values.iterator) - objOut.close() - val length = file.length() + val file = diskManager.createBlockFile(blockId, allowAppending = false) + val outputStream = new FileOutputStream(file) + blockManager.dataSerializeStream(blockId, outputStream, values.iterator) + val length = file.length val timeTaken = System.currentTimeMillis - startTime logDebug("Block %s stored as %s file on disk in %d ms".format( - blockId, Utils.bytesToString(length), timeTaken)) + file.getName, Utils.bytesToString(length), timeTaken)) if (returnValues) { // Return a byte buffer for the contents of the file - val buffer = getFileBytes(file) + val buffer = getBytes(blockId).get PutResult(length, Right(buffer)) } else { PutResult(length, null) @@ -193,13 +83,19 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } override def 
getBytes(blockId: BlockId): Option[ByteBuffer] = { - val file = getFile(blockId) - val bytes = getFileBytes(file) - Some(bytes) + val segment = diskManager.getBlockLocation(blockId) + val channel = new RandomAccessFile(segment.file, "r").getChannel() + val buffer = try { + logWarning("") + channel.map(MapMode.READ_ONLY, segment.offset, segment.length) + } finally { + channel.close() + } + Some(buffer) } override def getValues(blockId: BlockId): Option[Iterator[Any]] = { - getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes)) + getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) } /** @@ -210,119 +106,21 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer)) } - override def remove(blockId: BlockId): Boolean = { - val file = getFile(blockId) - if (file.exists()) { + override def remove(blockId: BlockId) = { + val fileSegment = diskManager.getBlockLocation(blockId) + val file = fileSegment.file + if (file.exists() && file.length() == fileSegment.length) { file.delete() } else { + if (file.length() < fileSegment.length) { + logWarning("Could not delete block associated with only a part of a file: " + blockId) + } false } } override def contains(blockId: BlockId): Boolean = { - getFile(blockId).exists() - } - - private def createFile(blockId: BlockId, allowAppendExisting: Boolean = false): File = { - val file = getFile(blockId) - if (!allowAppendExisting && file.exists()) { - // NOTE(shivaram): Delete the file if it exists. This might happen if a ShuffleMap task - // was rescheduled on the same machine as the old task. - logWarning("File for block " + blockId + " already exists on disk: " + file + ". 
Deleting") - file.delete() - } - file - } - - private def getFile(blockId: BlockId): File = { - logDebug("Getting file for block " + blockId) - - // Figure out which local directory it hashes to, and which subdirectory in that - val hash = Utils.nonNegativeHash(blockId) - val dirId = hash % localDirs.length - val subDirId = (hash / localDirs.length) % subDirsPerLocalDir - - // Create the subdirectory if it doesn't already exist - var subDir = subDirs(dirId)(subDirId) - if (subDir == null) { - subDir = subDirs(dirId).synchronized { - val old = subDirs(dirId)(subDirId) - if (old != null) { - old - } else { - val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) - newDir.mkdir() - subDirs(dirId)(subDirId) = newDir - newDir - } - } - } - - new File(subDir, blockId.name) - } - - private def createLocalDirs(): Array[File] = { - logDebug("Creating local directories at root dirs '" + rootDirs + "'") - val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") - rootDirs.split(",").map { rootDir => - var foundLocalDir = false - var localDir: File = null - var localDirId: String = null - var tries = 0 - val rand = new Random() - while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { - tries += 1 - try { - localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) - localDir = new File(rootDir, "spark-local-" + localDirId) - if (!localDir.exists) { - foundLocalDir = localDir.mkdirs() - } - } catch { - case e: Exception => - logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e) - } - } - if (!foundLocalDir) { - logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + - " attempts to create local dir in " + rootDir) - System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) - } - logInfo("Created local directory at " + localDir) - localDir - } - } - - private def addShutdownHook() { - localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) - Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { - override def run() { - logDebug("Shutdown hook called") - localDirs.foreach { localDir => - try { - if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) - } catch { - case t: Throwable => - logError("Exception while deleting local spark dir: " + localDir, t) - } - } - if (shuffleSender != null) { - shuffleSender.stop() - } - } - }) - } - - private[storage] def startShuffleBlockSender(port: Int): Int = { - val pResolver = new PathResolver { - override def getAbsolutePath(blockIdString: String): String = { - val blockId = BlockId(blockIdString) - if (!blockId.isShuffle) null - else DiskStore.this.getFile(blockId).getAbsolutePath - } - } - shuffleSender = new ShuffleSender(port, pResolver) - logInfo("Created ShuffleSender binding to port : "+ shuffleSender.port) - shuffleSender.port + val file = diskManager.getBlockLocation(blockId).file + file.exists() } } diff --git a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala new file mode 100644 index 000000000..9947053e3 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
* The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.io.File + +/** + * References a particular segment of a file (potentially the entire file), + * based off an offset and a length. + */ +class FileSegment(val file: File, val offset: Long, val length : Long) { + override def toString = "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length) +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index f39fcd87f..05a14c909 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -40,7 +40,7 @@ class ShuffleBlockManager(blockManager: BlockManager) { val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - blockManager.getDiskBlockWriter(blockId, serializer, bufferSize) + blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize) } new ShuffleWriterGroup(mapId, writers) } From 136b9b3a3ed358bc04b28e8d62657d56d55c2c3e Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 20 Oct 2013 02:30:23 -0700 Subject: [PATCH 2/8] Basic shuffle file consolidation The Spark shuffle phase can produce a large number of files, as one file is created per mapper per reducer. For large or repeated jobs, this often produces millions of shuffle files, which results in extremely degraded performance from the OS file system. This patch seeks to reduce that burden by combining multiple shuffle files into one. This PR draws upon the work of Jason Dai in https://github.com/mesos/spark/pull/669. However, it simplifies the design in order to get the majority of the gain with less overall intellectual and code burden. The vast majority of code in this pull request is a refactor to allow the insertion of a clean layer of indirection between logical block ids and physical files. This, I feel, provides some design clarity in addition to enabling shuffle file consolidation. The main goal is to produce one shuffle file per reducer per active mapper thread. This allows us to isolate the mappers (simplifying the failure modes), while still allowing us to reduce the number of files tremendously for large tasks. In order to accomplish this, we simply create a new set of shuffle files for every parallel task, and return the files to a pool, from which they will be given out to the next task that runs.
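To make the pooling scheme concrete, here is a minimal sketch of the file-id recycling idea described above. The class name FileIdPool is illustrative only; the real logic lives in ShuffleBlockManager's getUnusedFileId/recycleFileId further down in this patch.

    import java.util.concurrent.ConcurrentLinkedQueue
    import java.util.concurrent.atomic.AtomicInteger

    // Sketch only: a task borrows a fileId before writing its shuffle output and
    // returns it when done, so the next task appends to the same physical files.
    class FileIdPool {
      private val nextFileId = new AtomicInteger(0)
      private val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()

      /** Reuse a previously released fileId if one exists, otherwise mint a new one. */
      def acquire(): Int = {
        val fileId = unusedFileIds.poll()
        if (fileId == null) nextFileId.getAndIncrement() else fileId
      }

      /** Return a fileId to the pool so a later task can write to the same files. */
      def release(fileId: Int) {
        unusedFileIds.add(fileId)
      }
    }

In the patch itself, a fileId is combined with the shuffleId, executorId, and bucketId to name the merged shuffle file (see physicalFileName in ShuffleBlockManager below).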
--- .../network/netty/FileServerHandler.java | 2 +- .../scala/org/apache/spark/TaskContext.scala | 1 + .../org/apache/spark/executor/Executor.scala | 2 +- .../spark/scheduler/ShuffleMapTask.scala | 3 +- .../org/apache/spark/scheduler/Task.scala | 4 +- .../apache/spark/storage/BlockManager.scala | 1 + .../spark/storage/ShuffleBlockManager.scala | 56 ++++++++++++++++--- .../spark/scheduler/TaskContextSuite.scala | 2 +- 8 files changed, 57 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index ab790b785..172c6e4b1 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -51,7 +51,7 @@ public void messageReceived(ChannelHandlerContext ctx, String blockIdString) { ctx.flush(); return; } - long length = file.length(); + long length = fileSegment.length(); if (length > Integer.MAX_VALUE || length <= 0) { ctx.write(new FileHeader(0, blockId).buffer()); ctx.flush(); diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index cae983ed4..7601ffe41 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -25,6 +25,7 @@ class TaskContext( val stageId: Int, val partitionId: Int, val attemptId: Long, + val executorId: String, val runningLocally: Boolean = false, @volatile var interrupted: Boolean = false, private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty() diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 032eb04f4..eb12c26d2 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -206,7 +206,7 @@ private[spark] class Executor( // Run the actual task and measure its runtime. taskStart = System.currentTimeMillis() - val value = task.run(taskId.toInt) + val value = task.run(taskId.toInt, executorId) val taskFinish = System.currentTimeMillis() // If the task has been killed, let's fail it. diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index ed1b36d18..29c610825 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -152,7 +152,8 @@ private[spark] class ShuffleMapTask( try { // Obtain all the block writers for shuffle blocks. val ser = SparkEnv.get.serializerManager.get(dep.serializerClass) - shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser) + shuffle = blockManager.shuffleBlockManager.forShuffle( + dep.shuffleId, context.executorId, numOutputSplits, ser) buckets = shuffle.acquireWriters(partitionId) // Write the map output to its associated buckets. 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 1fe0d0e4e..64fe5b196 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -45,8 +45,8 @@ import org.apache.spark.util.ByteBufferInputStream */ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable { - def run(attemptId: Long): T = { - context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false) + final def run(attemptId: Long, executorId: String): T = { + context = new TaskContext(stageId, partitionId, attemptId, executorId, runningLocally = false) if (_killed) { kill() } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2f96590c5..1f173c772 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -578,6 +578,7 @@ private[spark] class BlockManager( val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true) val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream) writer.registerCloseEventHandler(() => { + diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment()) val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false) blockInfo.put(blockId, myInfo) myInfo.markReady(writer.fileSegment().length) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 05a14c909..6208856e5 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -17,12 +17,13 @@ package org.apache.spark.storage -import org.apache.spark.serializer.Serializer +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger +import org.apache.spark.serializer.Serializer private[spark] -class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter]) - +class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter]) private[spark] trait ShuffleBlocks { @@ -30,24 +31,63 @@ trait ShuffleBlocks { def releaseWriters(group: ShuffleWriterGroup) } +/** + * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer + * per reducer. + * + * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle + * Blocks are aggregated into the same file. There is one "combined shuffle file" per reducer + * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files, + * it releases them for another task. + * Regarding the implementation of this feature, shuffle files are identified by a 4-tuple: + * - shuffleId: The unique id given to the entire shuffle stage. + * - executorId: The id of the executor running the task. Required in order to ensure that + * multiple executors running on the same node do not collide. + * - bucketId: The id of the output partition (i.e., reducer id) + * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a + * time owns a particular fileId, and this id is returned to a pool when the task finishes. 
+ */ private[spark] class ShuffleBlockManager(blockManager: BlockManager) { + /** Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. */ + val consolidateShuffleFiles = + System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean + + var nextFileId = new AtomicInteger(0) + val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]() - def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = { + def forShuffle(shuffleId: Int, executorId: String, numBuckets: Int, serializer: Serializer) = { new ShuffleBlocks { // Get a group of writers for a map task. override def acquireWriters(mapId: Int): ShuffleWriterGroup = { val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 + val fileId = getUnusedFileId() val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize) + val filename = physicalFileName(shuffleId, executorId, bucketId, fileId) + blockManager.getDiskWriter(blockId, filename, serializer, bufferSize) } - new ShuffleWriterGroup(mapId, writers) + new ShuffleWriterGroup(mapId, fileId, writers) } - override def releaseWriters(group: ShuffleWriterGroup) = { - // Nothing really to release here. + override def releaseWriters(group: ShuffleWriterGroup) { + recycleFileId(group.fileId) } } } + + private def getUnusedFileId(): Int = { + val fileId = unusedFileIds.poll() + if (fileId == null) nextFileId.getAndIncrement() + else fileId + } + + private def recycleFileId(fileId: Int) { + if (!consolidateShuffleFiles) { return } // ensures we always generate new file id + unusedFileIds.add(fileId) + } + + private def physicalFileName(shuffleId: Int, executorId: String, bucketId: Int, fileId: Int) = { + "merged_shuffle_%d_%s_%d_%d".format(shuffleId, executorId, bucketId, fileId) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index e31a116a7..668cd5d48 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -40,7 +40,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte val func = (c: TaskContext, i: Iterator[String]) => i.next val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0) intercept[RuntimeException] { - task.run(0) + task.run(0, "test") } assert(completed === true) } From 38b8048f291dd42ee996e75bd1b6d33aa24b1a5e Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 20 Oct 2013 11:03:36 -0700 Subject: [PATCH 3/8] Fix compiler errors Whoops. Last-second changes require testing too, it seems. 
--- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../src/test/scala/org/apache/spark/CacheManagerSuite.scala | 6 +++--- core/src/test/scala/org/apache/spark/JavaAPISuite.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d84f5968d..c890a1f04 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -521,7 +521,7 @@ class DAGScheduler( val rdd = job.finalStage.rdd val split = rdd.partitions(job.partitions(0)) val taskContext = - new TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true) + new TaskContext(job.finalStage.id, job.partitions(0), 0, "local", runningLocally = true) try { val result = job.func(taskContext, rdd.iterator(split, taskContext)) job.listener.taskSucceeded(0, result) diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index ea936e815..5db7aa53b 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -58,7 +58,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } whenExecuting(blockManager) { - val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false, + val context = new TaskContext(0, 0, 0, "test", interrupted = false, runningLocally = false, taskMetrics = null) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) assert(value.toList === List(1, 2, 3, 4)) @@ -71,7 +71,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } whenExecuting(blockManager) { - val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false, + val context = new TaskContext(0, 0, 0, "test", interrupted = false, runningLocally = false, taskMetrics = null) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) assert(value.toList === List(5, 6, 7)) @@ -85,7 +85,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } whenExecuting(blockManager) { - val context = new TaskContext(0, 0, 0, runningLocally = true, interrupted = false, + val context = new TaskContext(0, 0, 0, "test", runningLocally = true, interrupted = false, taskMetrics = null) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) assert(value.toList === List(1, 2, 3, 4)) diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 7b0bb89ab..324183e15 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -495,7 +495,7 @@ public void persist() { @Test public void iterator() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); - TaskContext context = new TaskContext(0, 0, 0, false, false, null); + TaskContext context = new TaskContext(0, 0, 0, "test", false, false, null); Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue()); } From 42a049723d92d8a7f87fae0a305f8933cb0f7374 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 20 Oct 2013 16:10:40 -0700 Subject: [PATCH 4/8] Address Josh and Reynold's comments --- 
.../org/apache/spark/network/netty/PathResolver.java | 2 +- .../org/apache/spark/storage/BlockObjectWriter.scala | 4 ++-- .../org/apache/spark/storage/DiskBlockManager.scala | 12 ++++++------ .../scala/org/apache/spark/storage/DiskStore.scala | 5 ++--- .../scala/org/apache/spark/storage/FileSegment.scala | 4 ++-- .../apache/spark/storage/ShuffleBlockManager.scala | 8 ++++---- 6 files changed, 17 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java index 370fcdeea..9f7ced44c 100755 --- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java +++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java @@ -21,6 +21,6 @@ import org.apache.spark.storage.FileSegment; public interface PathResolver { - /** Get the file segment in which the given Block resides. */ + /** Get the file segment in which the given block resides. */ public FileSegment getBlockLocation(BlockId blockId); } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 0b5a47299..6e4382d71 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -32,7 +32,7 @@ import org.apache.spark.serializer.{SerializationStream, Serializer} * * This interface does not support concurrent writes. */ -abstract class BlockObjectWriter(val blockId: BlockId) { +private[spark] abstract class BlockObjectWriter(val blockId: BlockId) { var closeEventHandler: () => Unit = _ @@ -72,7 +72,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) { } /** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */ -class DiskBlockObjectWriter( +private[spark] class DiskBlockObjectWriter( blockId: BlockId, file: File, serializer: Serializer, diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 6ace4eb52..ecbd9c2ff 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -22,20 +22,20 @@ import java.text.SimpleDateFormat import java.util.{Date, Random} import java.util.concurrent.ConcurrentHashMap -import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.Logging +import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.network.netty.{PathResolver, ShuffleSender} import org.apache.spark.util.Utils /** - * Creates an maintains the logical mapping between logical Blocks and physical on-disk - * locations. By default, one Block is mapped to one file with a name given by its BlockId. - * However, it is also possible to have a Block map to only a segment of a file, by calling + * Creates and maintains the logical mapping between logical blocks and physical on-disk + * locations. By default, one block is mapped to one file with a name given by its BlockId. + * However, it is also possible to have a block map to only a segment of a file, by calling * mapBlockToFileSegment(). * - * @param rootDirs The directories to use for storing Block files. Data will be hashed among these. + * @param rootDirs The directories to use for storing block files. Data will be hashed among these. 
*/ -class DiskBlockManager(rootDirs: String) extends PathResolver with Logging { +private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver with Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index e703a3329..a3c496f9e 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -86,7 +86,6 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage val segment = diskManager.getBlockLocation(blockId) val channel = new RandomAccessFile(segment.file, "r").getChannel() val buffer = try { - logWarning("") channel.map(MapMode.READ_ONLY, segment.offset, segment.length) } finally { channel.close() @@ -106,13 +105,13 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer)) } - override def remove(blockId: BlockId) = { + override def remove(blockId: BlockId): Boolean = { val fileSegment = diskManager.getBlockLocation(blockId) val file = fileSegment.file if (file.exists() && file.length() == fileSegment.length) { file.delete() } else { - if (file.length() < fileSegment.length) { + if (fileSegment.length < file.length()) { logWarning("Could not delete block associated with only a part of a file: " + blockId) } false diff --git a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala index 9947053e3..555486830 100644 --- a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala +++ b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala @@ -23,6 +23,6 @@ import java.io.File * References a particular segment of a file (potentially the entire file), * based off an offset and a length. */ -class FileSegment(val file: File, val offset: Long, val length : Long) { +private[spark] class FileSegment(val file: File, val offset: Long, val length : Long) { override def toString = "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length) -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 6208856e5..31849eb58 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -36,7 +36,7 @@ trait ShuffleBlocks { * per reducer. * * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle - * Blocks are aggregated into the same file. There is one "combined shuffle file" per reducer + * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files, * it releases them for another task. * Regarding the implementation of this feature, shuffle files are identified by a 4-tuple: @@ -49,7 +49,8 @@ trait ShuffleBlocks { */ private[spark] class ShuffleBlockManager(blockManager: BlockManager) { - /** Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. 
*/ + // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. + // TODO: Remove this once the shuffle file consolidation feature is stable. val consolidateShuffleFiles = System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean @@ -78,8 +79,7 @@ class ShuffleBlockManager(blockManager: BlockManager) { private def getUnusedFileId(): Int = { val fileId = unusedFileIds.poll() - if (fileId == null) nextFileId.getAndIncrement() - else fileId + if (fileId == null) nextFileId.getAndIncrement() else fileId } private def recycleFileId(fileId: Int) { From 4b68ddf3d02e425120eece9b8abf1fad4e0fd7ff Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 20 Oct 2013 17:56:41 -0700 Subject: [PATCH 5/8] Cleanup old shuffle file metadata from memory --- .../apache/spark/storage/DiskBlockManager.scala | 15 ++++++++++----- .../org/apache/spark/util/MetadataCleaner.scala | 5 +++-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index ecbd9c2ff..bcb58ad94 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.network.netty.{PathResolver, ShuffleSender} -import org.apache.spark.util.Utils +import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} /** * Creates and maintains the logical mapping between logical blocks and physical on-disk @@ -50,8 +50,9 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit // Stores only Blocks which have been specifically mapped to segments of files // (rather than the default, which maps a Block to a whole file). // This keeps our bookkeeping down, since the file system itself tracks the standalone Blocks. - // ConcurrentHashMap does not take a lock on read operations, which makes it very efficient here. - private val blockToFileSegmentMap = new ConcurrentHashMap[BlockId, FileSegment] + private val blockToFileSegmentMap = new TimeStampedHashMap[BlockId, FileSegment] + + val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DISK_BLOCK_MANAGER, this.cleanup) addShutdownHook() @@ -70,8 +71,8 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit * Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly. 
*/ def getBlockLocation(blockId: BlockId): FileSegment = { - if (blockToFileSegmentMap.containsKey(blockId)) { - blockToFileSegmentMap.get(blockId) + if (blockToFileSegmentMap.internalMap.containsKey(blockId)) { + blockToFileSegmentMap.get(blockId).get } else { val file = getFile(blockId.name) new FileSegment(file, 0, file.length()) @@ -150,6 +151,10 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit } } + private def cleanup(cleanupTime: Long) { + blockToFileSegmentMap.clearOldValues(cleanupTime) + } + private def addShutdownHook() { localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index 0ce1394c7..3f963727d 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -56,9 +56,10 @@ class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, clea } object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext", "HttpBroadcast", "DagScheduler", "ResultTask", - "ShuffleMapTask", "BlockManager", "BroadcastVars") { + "ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") { - val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, SHUFFLE_MAP_TASK, BLOCK_MANAGER, BROADCAST_VARS = Value + val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, + SHUFFLE_MAP_TASK, BLOCK_MANAGER, DISK_BLOCK_MANAGER, BROADCAST_VARS = Value type MetadataCleanerType = Value From 947fceaa73a21ddc4263b98913ebf11aa71f5ba1 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 20 Oct 2013 22:46:26 -0700 Subject: [PATCH 6/8] Close shuffle writers during failure & remove executorId from TaskContext --- core/src/main/scala/org/apache/spark/TaskContext.scala | 1 - .../org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../org/apache/spark/scheduler/ShuffleMapTask.scala | 5 ++--- .../main/scala/org/apache/spark/scheduler/Task.scala | 2 +- .../org/apache/spark/storage/ShuffleBlockManager.scala | 10 +++++----- .../scala/org/apache/spark/CacheManagerSuite.scala | 6 +++--- core/src/test/scala/org/apache/spark/JavaAPISuite.java | 2 +- 7 files changed, 13 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 7601ffe41..cae983ed4 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -25,7 +25,6 @@ class TaskContext( val stageId: Int, val partitionId: Int, val attemptId: Long, - val executorId: String, val runningLocally: Boolean = false, @volatile var interrupted: Boolean = false, private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty() diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c890a1f04..d84f5968d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -521,7 +521,7 @@ class DAGScheduler( val rdd = job.finalStage.rdd val split = rdd.partitions(job.partitions(0)) val taskContext = - new TaskContext(job.finalStage.id, job.partitions(0), 0, "local", runningLocally = true) 
+ new TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true) try { val result = job.func(taskContext, rdd.iterator(split, taskContext)) job.listener.taskSucceeded(0, result) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 29c610825..e86852311 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -152,8 +152,7 @@ private[spark] class ShuffleMapTask( try { // Obtain all the block writers for shuffle blocks. val ser = SparkEnv.get.serializerManager.get(dep.serializerClass) - shuffle = blockManager.shuffleBlockManager.forShuffle( - dep.shuffleId, context.executorId, numOutputSplits, ser) + shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser) buckets = shuffle.acquireWriters(partitionId) // Write the map output to its associated buckets. @@ -167,7 +166,6 @@ private[spark] class ShuffleMapTask( var totalBytes = 0L val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter => writer.commit() - writer.close() val size = writer.fileSegment().length totalBytes += size MapOutputTracker.compressSize(size) @@ -189,6 +187,7 @@ private[spark] class ShuffleMapTask( } finally { // Release the writers back to the shuffle block manager. if (shuffle != null && buckets != null) { + buckets.writers.foreach(_.close()) shuffle.releaseWriters(buckets) } // Execute the callbacks on task completion. diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 64fe5b196..1b66e7268 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -46,7 +46,7 @@ import org.apache.spark.util.ByteBufferInputStream private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable { final def run(attemptId: Long, executorId: String): T = { - context = new TaskContext(stageId, partitionId, attemptId, executorId, runningLocally = false) + context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false) if (_killed) { kill() } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 31849eb58..bd1dc62a1 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -52,12 +52,12 @@ class ShuffleBlockManager(blockManager: BlockManager) { // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. // TODO: Remove this once the shuffle file consolidation feature is stable. val consolidateShuffleFiles = - System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean + System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean var nextFileId = new AtomicInteger(0) val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]() - def forShuffle(shuffleId: Int, executorId: String, numBuckets: Int, serializer: Serializer) = { + def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleBlocks { // Get a group of writers for a map task. 
override def acquireWriters(mapId: Int): ShuffleWriterGroup = { @@ -65,7 +65,7 @@ class ShuffleBlockManager(blockManager: BlockManager) { val fileId = getUnusedFileId() val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - val filename = physicalFileName(shuffleId, executorId, bucketId, fileId) + val filename = physicalFileName(shuffleId, bucketId, fileId) blockManager.getDiskWriter(blockId, filename, serializer, bufferSize) } new ShuffleWriterGroup(mapId, fileId, writers) @@ -87,7 +87,7 @@ class ShuffleBlockManager(blockManager: BlockManager) { unusedFileIds.add(fileId) } - private def physicalFileName(shuffleId: Int, executorId: String, bucketId: Int, fileId: Int) = { - "merged_shuffle_%d_%s_%d_%d".format(shuffleId, executorId, bucketId, fileId) + private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = { + "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId) } } diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 5db7aa53b..ea936e815 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -58,7 +58,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } whenExecuting(blockManager) { - val context = new TaskContext(0, 0, 0, "test", interrupted = false, runningLocally = false, + val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false, taskMetrics = null) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) assert(value.toList === List(1, 2, 3, 4)) @@ -71,7 +71,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } whenExecuting(blockManager) { - val context = new TaskContext(0, 0, 0, "test", interrupted = false, runningLocally = false, + val context = new TaskContext(0, 0, 0, interrupted = false, runningLocally = false, taskMetrics = null) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) assert(value.toList === List(5, 6, 7)) @@ -85,7 +85,7 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar } whenExecuting(blockManager) { - val context = new TaskContext(0, 0, 0, "test", runningLocally = true, interrupted = false, + val context = new TaskContext(0, 0, 0, runningLocally = true, interrupted = false, taskMetrics = null) val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) assert(value.toList === List(1, 2, 3, 4)) diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 324183e15..7b0bb89ab 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -495,7 +495,7 @@ public void persist() { @Test public void iterator() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); - TaskContext context = new TaskContext(0, 0, 0, "test", false, false, null); + TaskContext context = new TaskContext(0, 0, 0, false, false, null); Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue()); } From 444162afe7c1bd48ad63ae51d99f75c72c9e29ff Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 20 Oct 2013 22:59:45 -0700 Subject: [PATCH 7/8] Documentation update --- .../scala/org/apache/spark/storage/ShuffleBlockManager.scala | 4 
+--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index bd1dc62a1..229178c09 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -39,10 +39,8 @@ trait ShuffleBlocks { * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files, * it releases them for another task. - * Regarding the implementation of this feature, shuffle files are identified by a 4-tuple: + * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple: * - shuffleId: The unique id given to the entire shuffle stage. - * - executorId: The id of the executor running the task. Required in order to ensure that - * multiple executors running on the same node do not collide. * - bucketId: The id of the output partition (i.e., reducer id) * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a * time owns a particular fileId, and this id is returned to a pool when the task finishes. From 4aa0ba1df7336ab5066be58c208e8b1eb69864df Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Mon, 21 Oct 2013 12:19:15 -0700 Subject: [PATCH 8/8] Remove executorId from Task.run() --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +- core/src/main/scala/org/apache/spark/scheduler/Task.scala | 2 +- .../scala/org/apache/spark/scheduler/TaskContextSuite.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index eb12c26d2..032eb04f4 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -206,7 +206,7 @@ private[spark] class Executor( // Run the actual task and measure its runtime. taskStart = System.currentTimeMillis() - val value = task.run(taskId.toInt, executorId) + val value = task.run(taskId.toInt) val taskFinish = System.currentTimeMillis() // If the task has been killed, let's fail it. 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 1b66e7268..69b42e86e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -45,7 +45,7 @@ import org.apache.spark.util.ByteBufferInputStream */ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable { - final def run(attemptId: Long, executorId: String): T = { + final def run(attemptId: Long): T = { context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false) if (_killed) { kill() diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index 668cd5d48..e31a116a7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -40,7 +40,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte val func = (c: TaskContext, i: Iterator[String]) => i.next val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0) intercept[RuntimeException] { - task.run(0, "test") + task.run(0) } assert(completed === true) }
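
A note for readers following the metadata path these patches touch: the core behavior is a lookup with a whole-file fallback. getBlockLocation consults the block-to-segment map only for consolidated shuffle blocks and otherwise treats the BlockId as a standalone file of the same name, and PATCH 5/8 swaps the plain ConcurrentHashMap for a TimeStampedHashMap so a MetadataCleaner can age stale entries out. The Scala sketch below is a simplified, self-contained approximation of that pattern only; BlockId, FileSegment, SketchDiskBlockManager, and the timestamped TrieMap are illustrative stand-ins, not the actual Spark classes or APIs introduced by these patches.

import java.io.File
import scala.collection.concurrent.TrieMap

// Simplified stand-ins for the storage types touched by these patches.
case class BlockId(name: String)
case class FileSegment(file: File, offset: Long, length: Long)

// Sketch of BlockId -> FileSegment resolution plus age-based cleanup.
// Entries carry an insertion timestamp so stale shuffle metadata can be
// dropped, standing in for TimeStampedHashMap + MetadataCleaner in the patch.
class SketchDiskBlockManager(rootDir: File) {
  // Only blocks explicitly mapped to a slice of a shared file are tracked here;
  // everything else falls back to a standalone file named after the block.
  private val blockToSegment = TrieMap.empty[BlockId, (Long, FileSegment)]

  def mapBlockToFileSegment(blockId: BlockId, segment: FileSegment): Unit =
    blockToSegment.put(blockId, (System.currentTimeMillis(), segment))

  def getBlockLocation(blockId: BlockId): FileSegment =
    blockToSegment.get(blockId) match {
      case Some((_, segment)) => segment                 // consolidated shuffle block
      case None =>
        val file = new File(rootDir, blockId.name)       // standalone block: whole file
        FileSegment(file, 0, file.length())
    }

  // Drop segment metadata inserted before cutoffTime (the cleanup() analogue).
  // TrieMap iteration works on a snapshot, so removing while iterating is safe.
  def clearOldValues(cutoffTime: Long): Unit =
    blockToSegment.foreach { case (id, (insertedAt, _)) =>
      if (insertedAt < cutoffTime) blockToSegment.remove(id)
    }
}

object SketchDemo extends App {
  val mgr = new SketchDiskBlockManager(new File("/tmp"))
  val shared = new File("/tmp/merged_shuffle_0_0_0")
  mgr.mapBlockToFileSegment(BlockId("shuffle_0_1_0"), FileSegment(shared, 128L, 64L))

  println(mgr.getBlockLocation(BlockId("shuffle_0_1_0")))  // mapped segment of the shared file
  println(mgr.getBlockLocation(BlockId("rdd_2_3")))        // whole-file fallback

  mgr.clearOldValues(System.currentTimeMillis() + 1)       // everything is now "old"
  println(mgr.getBlockLocation(BlockId("shuffle_0_1_0")))  // falls back to a whole file again
}

Keeping only explicitly mapped segments in the map, while the filesystem itself tracks standalone blocks, is what keeps this bookkeeping small: only consolidated shuffle metadata accumulates and therefore only it needs to be aged out by the cleaner.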