diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index cfd813289..172c6e4b1 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -25,6 +25,7 @@ import io.netty.channel.DefaultFileRegion; import org.apache.spark.storage.BlockId; +import org.apache.spark.storage.FileSegment; class FileServerHandler extends ChannelInboundMessageHandlerAdapter { @@ -37,40 +38,34 @@ public FileServerHandler(PathResolver pResolver){ @Override public void messageReceived(ChannelHandlerContext ctx, String blockIdString) { BlockId blockId = BlockId.apply(blockIdString); - String path = pResolver.getAbsolutePath(blockId.name()); - // if getFilePath returns null, close the channel - if (path == null) { + FileSegment fileSegment = pResolver.getBlockLocation(blockId); + // if getBlockLocation returns null, close the channel + if (fileSegment == null) { //ctx.close(); return; } - File file = new File(path); + File file = fileSegment.file(); if (file.exists()) { if (!file.isFile()) { - //logger.info("Not a file : " + file.getAbsolutePath()); ctx.write(new FileHeader(0, blockId).buffer()); ctx.flush(); return; } - long length = file.length(); + long length = fileSegment.length(); if (length > Integer.MAX_VALUE || length <= 0) { - //logger.info("too large file : " + file.getAbsolutePath() + " of size "+ length); ctx.write(new FileHeader(0, blockId).buffer()); ctx.flush(); return; } int len = new Long(length).intValue(); - //logger.info("Sending block "+blockId+" filelen = "+len); - //logger.info("header = "+ (new FileHeader(len, blockId)).buffer()); ctx.write((new FileHeader(len, blockId)).buffer()); try { ctx.sendFile(new DefaultFileRegion(new FileInputStream(file) - .getChannel(), 0, file.length())); + .getChannel(), fileSegment.offset(), fileSegment.length())); } catch (Exception e) { - //logger.warning("Exception when sending file : " + file.getAbsolutePath()); e.printStackTrace(); } } else { - //logger.warning("File not found: " + file.getAbsolutePath()); ctx.write(new FileHeader(0, blockId).buffer()); } ctx.flush(); diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java index 94c034cad..9f7ced44c 100755 --- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java +++ b/core/src/main/java/org/apache/spark/network/netty/PathResolver.java @@ -17,13 +17,10 @@ package org.apache.spark.network.netty; +import org.apache.spark.storage.BlockId; +import org.apache.spark.storage.FileSegment; public interface PathResolver { - /** - * Get the absolute path of the file - * - * @param fileId - * @return the absolute path of file - */ - public String getAbsolutePath(String fileId); + /** Get the file segment in which the given block resides. 
*/ + public FileSegment getBlockLocation(BlockId blockId); } diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala index 1586dff25..546d92106 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala @@ -21,7 +21,7 @@ import java.io.File import org.apache.spark.Logging import org.apache.spark.util.Utils -import org.apache.spark.storage.BlockId +import org.apache.spark.storage.{BlockId, FileSegment} private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging { @@ -54,8 +54,7 @@ private[spark] object ShuffleSender { val localDirs = args.drop(2).map(new File(_)) val pResovler = new PathResolver { - override def getAbsolutePath(blockIdString: String): String = { - val blockId = BlockId(blockIdString) + override def getBlockLocation(blockId: BlockId): FileSegment = { if (!blockId.isShuffle) { throw new Exception("Block " + blockId + " is not a shuffle block") } @@ -65,7 +64,7 @@ private[spark] object ShuffleSender { val subDirId = (hash / localDirs.length) % subDirsPerLocalDir val subDir = new File(localDirs(dirId), "%02x".format(subDirId)) val file = new File(subDir, blockId.name) - return file.getAbsolutePath + return new FileSegment(file, 0, file.length()) } } val sender = new ShuffleSender(port, pResovler) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 40baea69e..24d97da6e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -167,8 +167,7 @@ private[spark] class ShuffleMapTask( var totalTime = 0L val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter => writer.commit() - writer.close() - val size = writer.size() + val size = writer.fileSegment().length totalBytes += size totalTime += writer.timeWriting() MapOutputTracker.compressSize(size) @@ -191,6 +190,7 @@ private[spark] class ShuffleMapTask( } finally { // Release the writers back to the shuffle block manager. if (shuffle != null && buckets != null) { + buckets.writers.foreach(_.close()) shuffle.releaseWriters(buckets) } // Execute the callbacks on task completion. 
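Note on the resolver change above: PathResolver now resolves a BlockId to a FileSegment (file, offset, length) rather than an absolute path string, so a block may occupy only part of a larger file. Below is a minimal sketch of an implementation under the new contract, mirroring the whole-file default used by ShuffleSender above; the class name and base directory are illustrative, not part of this patch, and the code is assumed to live under the org.apache.spark package since FileSegment is private[spark].

import java.io.File

import org.apache.spark.network.netty.PathResolver
import org.apache.spark.storage.{BlockId, FileSegment}

// Hypothetical resolver: every block lives in its own file named after the BlockId.
class WholeFileResolver(baseDir: File) extends PathResolver {
  override def getBlockLocation(blockId: BlockId): FileSegment = {
    val file = new File(baseDir, blockId.name)
    // Offset 0 with the full file length denotes "the entire file", matching the
    // FileSegment(file, 0, file.length()) default returned by ShuffleSender's resolver.
    new FileSegment(file, 0, file.length())
  }
}

ShuffleSender's standalone resolver does exactly this, while DiskBlockManager (introduced later in this patch) additionally consults its block-to-segment map so that consolidated shuffle blocks can point into the middle of a combined file.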
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 1fe0d0e4e..69b42e86e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -45,7 +45,7 @@ import org.apache.spark.util.ByteBufferInputStream */ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable { - def run(attemptId: Long): T = { + final def run(attemptId: Long): T = { context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false) if (_killed) { kill() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 768e5a647..e6329cbd4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -28,7 +28,7 @@ import akka.dispatch.{Await, Future} import akka.util.Duration import akka.util.duration._ -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream +import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream} import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.io.CompressionCodec @@ -102,18 +102,19 @@ private[spark] class BlockManager( } val shuffleBlockManager = new ShuffleBlockManager(this) + val diskBlockManager = new DiskBlockManager( + System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) - private[storage] val diskStore: DiskStore = - new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) + private[storage] val diskStore = new DiskStore(this, diskBlockManager) // If we use Netty for shuffle, start a new Netty-based shuffle sender service. private val nettyPort: Int = { val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt - if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0 + if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0 } val connectionManager = new ConnectionManager(0) @@ -512,16 +513,20 @@ private[spark] class BlockManager( /** * A short circuited method to get a block writer that can write data directly to disk. + * The Block will be appended to the File specified by filename. * This is currently used for writing shuffle files out. Callers should handle error * cases. 
*/ - def getDiskBlockWriter(blockId: BlockId, serializer: Serializer, bufferSize: Int) + def getDiskWriter(blockId: BlockId, filename: String, serializer: Serializer, bufferSize: Int) : BlockObjectWriter = { - val writer = diskStore.getBlockWriter(blockId, serializer, bufferSize) + val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) + val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true) + val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream) writer.registerCloseEventHandler(() => { + diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment()) val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false) blockInfo.put(blockId, myInfo) - myInfo.markReady(writer.size()) + myInfo.markReady(writer.fileSegment().length) }) writer } @@ -862,13 +867,24 @@ private[spark] class BlockManager( if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s } + /** Serializes into a stream. */ + def dataSerializeStream( + blockId: BlockId, + outputStream: OutputStream, + values: Iterator[Any], + serializer: Serializer = defaultSerializer) { + val byteStream = new FastBufferedOutputStream(outputStream) + val ser = serializer.newInstance() + ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() + } + + /** Serializes into a byte buffer. */ def dataSerialize( blockId: BlockId, values: Iterator[Any], serializer: Serializer = defaultSerializer): ByteBuffer = { val byteStream = new FastByteArrayOutputStream(4096) - val ser = serializer.newInstance() - ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() + dataSerializeStream(blockId, byteStream, values, serializer) byteStream.trim() ByteBuffer.wrap(byteStream.array) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 76c92cefd..32d2dd069 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -17,6 +17,13 @@ package org.apache.spark.storage +import java.io.{FileOutputStream, File, OutputStream} +import java.nio.channels.FileChannel + +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream + +import org.apache.spark.Logging +import org.apache.spark.serializer.{SerializationStream, Serializer} /** * An interface for writing JVM objects to some underlying storage. This interface allows @@ -59,12 +66,129 @@ abstract class BlockObjectWriter(val blockId: BlockId) { def write(value: Any) /** - * Size of the valid writes, in bytes. + * Returns the file segment of committed data that this Writer has written. */ - def size(): Long + def fileSegment(): FileSegment /** * Cumulative time spent performing blocking writes, in ns. */ def timeWriting(): Long } + +/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */ +class DiskBlockObjectWriter( + blockId: BlockId, + file: File, + serializer: Serializer, + bufferSize: Int, + compressStream: OutputStream => OutputStream) + extends BlockObjectWriter(blockId) + with Logging +{ + + /** Intercepts write calls and tracks total time spent writing. Not thread safe. 
*/ + private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream { + def timeWriting = _timeWriting + private var _timeWriting = 0L + + private def callWithTiming(f: => Unit) = { + val start = System.nanoTime() + f + _timeWriting += (System.nanoTime() - start) + } + + def write(i: Int): Unit = callWithTiming(out.write(i)) + override def write(b: Array[Byte]) = callWithTiming(out.write(b)) + override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len)) + } + + private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean + + /** The file channel, used for repositioning / truncating the file. */ + private var channel: FileChannel = null + private var bs: OutputStream = null + private var fos: FileOutputStream = null + private var ts: TimeTrackingOutputStream = null + private var objOut: SerializationStream = null + private var initialPosition = 0L + private var lastValidPosition = 0L + private var initialized = false + private var _timeWriting = 0L + + override def open(): BlockObjectWriter = { + fos = new FileOutputStream(file, true) + ts = new TimeTrackingOutputStream(fos) + channel = fos.getChannel() + initialPosition = channel.position + lastValidPosition = initialPosition + bs = compressStream(new FastBufferedOutputStream(ts, bufferSize)) + objOut = serializer.newInstance().serializeStream(bs) + initialized = true + this + } + + override def close() { + if (initialized) { + if (syncWrites) { + // Force outstanding writes to disk and track how long it takes + objOut.flush() + val start = System.nanoTime() + fos.getFD.sync() + _timeWriting += System.nanoTime() - start + } + objOut.close() + + _timeWriting += ts.timeWriting + + channel = null + bs = null + fos = null + ts = null + objOut = null + } + // Invoke the close callback handler. + super.close() + } + + override def isOpen: Boolean = objOut != null + + override def commit(): Long = { + if (initialized) { + // NOTE: Flush the serializer first and then the compressed/buffered output stream + objOut.flush() + bs.flush() + val prevPos = lastValidPosition + lastValidPosition = channel.position() + lastValidPosition - prevPos + } else { + // lastValidPosition is zero if stream is uninitialized + lastValidPosition + } + } + + override def revertPartialWrites() { + if (initialized) { + // Discard current writes. We do this by flushing the outstanding writes and + // truncate the file to the last valid position. + objOut.flush() + bs.flush() + channel.truncate(lastValidPosition) + } + } + + override def write(value: Any) { + if (!initialized) { + open() + } + objOut.writeObject(value) + } + + override def fileSegment(): FileSegment = { + val bytesWritten = lastValidPosition - initialPosition + new FileSegment(file, initialPosition, bytesWritten) + } + + // Only valid if called after close() + override def timeWriting() = _timeWriting +} diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala new file mode 100644 index 000000000..bcb58ad94 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.{Date, Random}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.spark.Logging
+import org.apache.spark.executor.ExecutorExitCode
+import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+
+/**
+ * Creates and maintains the logical mapping between logical blocks and physical on-disk
+ * locations. By default, one block is mapped to one file with a name given by its BlockId.
+ * However, it is also possible to have a block map to only a segment of a file, by calling
+ * mapBlockToFileSegment().
+ *
+ * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
+ */
+private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver with Logging {
+
+  private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+  private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
+
+  // Create one local directory for each path mentioned in spark.local.dir; then, inside this
+  // directory, create multiple subdirectories that we will hash files into, in order to avoid
+  // having really large inodes at the top level.
+  private val localDirs: Array[File] = createLocalDirs()
+  private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
+  private var shuffleSender : ShuffleSender = null
+
+  // Stores only Blocks which have been specifically mapped to segments of files
+  // (rather than the default, which maps a Block to a whole file).
+  // This keeps our bookkeeping down, since the file system itself tracks the standalone Blocks.
+  private val blockToFileSegmentMap = new TimeStampedHashMap[BlockId, FileSegment]
+
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DISK_BLOCK_MANAGER, this.cleanup)
+
+  addShutdownHook()
+
+  /**
+   * Creates a logical mapping from the given BlockId to a segment of a file.
+   * This will cause any accesses of the logical BlockId to be directed to the specified
+   * physical location.
+   */
+  def mapBlockToFileSegment(blockId: BlockId, fileSegment: FileSegment) {
+    blockToFileSegmentMap.put(blockId, fileSegment)
+  }
+
+  /**
+   * Returns the physical file segment in which the given BlockId is located.
+   * If the BlockId has been mapped to a specific FileSegment, that will be returned.
+   * Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly.
+   */
+  def getBlockLocation(blockId: BlockId): FileSegment = {
+    if (blockToFileSegmentMap.internalMap.containsKey(blockId)) {
+      blockToFileSegmentMap.get(blockId).get
+    } else {
+      val file = getFile(blockId.name)
+      new FileSegment(file, 0, file.length())
+    }
+  }
+
+  /**
+   * Simply returns a File to place the given Block into. This does not physically create the file.
+ * If filename is given, that file will be used. Otherwise, we will use the BlockId to get + * a unique filename. + */ + def createBlockFile(blockId: BlockId, filename: String = "", allowAppending: Boolean): File = { + val actualFilename = if (filename == "") blockId.name else filename + val file = getFile(actualFilename) + if (!allowAppending && file.exists()) { + throw new IllegalStateException( + "Attempted to create file that already exists: " + actualFilename) + } + file + } + + private def getFile(filename: String): File = { + // Figure out which local directory it hashes to, and which subdirectory in that + val hash = Utils.nonNegativeHash(filename) + val dirId = hash % localDirs.length + val subDirId = (hash / localDirs.length) % subDirsPerLocalDir + + // Create the subdirectory if it doesn't already exist + var subDir = subDirs(dirId)(subDirId) + if (subDir == null) { + subDir = subDirs(dirId).synchronized { + val old = subDirs(dirId)(subDirId) + if (old != null) { + old + } else { + val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) + newDir.mkdir() + subDirs(dirId)(subDirId) = newDir + newDir + } + } + } + + new File(subDir, filename) + } + + private def createLocalDirs(): Array[File] = { + logDebug("Creating local directories at root dirs '" + rootDirs + "'") + val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") + rootDirs.split(",").map { rootDir => + var foundLocalDir = false + var localDir: File = null + var localDirId: String = null + var tries = 0 + val rand = new Random() + while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { + tries += 1 + try { + localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) + localDir = new File(rootDir, "spark-local-" + localDirId) + if (!localDir.exists) { + foundLocalDir = localDir.mkdirs() + } + } catch { + case e: Exception => + logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e) + } + } + if (!foundLocalDir) { + logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + + " attempts to create local dir in " + rootDir) + System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) + } + logInfo("Created local directory at " + localDir) + localDir + } + } + + private def cleanup(cleanupTime: Long) { + blockToFileSegmentMap.clearOldValues(cleanupTime) + } + + private def addShutdownHook() { + localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) + Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { + override def run() { + logDebug("Shutdown hook called") + localDirs.foreach { localDir => + try { + if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) + } catch { + case t: Throwable => + logError("Exception while deleting local spark dir: " + localDir, t) + } + } + + if (shuffleSender != null) { + shuffleSender.stop() + } + } + }) + } + + private[storage] def startShuffleBlockSender(port: Int): Int = { + shuffleSender = new ShuffleSender(port, this) + logInfo("Created ShuffleSender binding to port : " + shuffleSender.port) + shuffleSender.port + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 2a9a3f61b..a3c496f9e 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -17,158 +17,25 @@ package org.apache.spark.storage -import java.io.{File, FileOutputStream, OutputStream, RandomAccessFile} +import 
java.io.{FileOutputStream, RandomAccessFile} import java.nio.ByteBuffer -import java.nio.channels.FileChannel import java.nio.channels.FileChannel.MapMode -import java.util.{Random, Date} -import java.text.SimpleDateFormat import scala.collection.mutable.ArrayBuffer -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream - -import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.serializer.{Serializer, SerializationStream} import org.apache.spark.Logging -import org.apache.spark.network.netty.ShuffleSender -import org.apache.spark.network.netty.PathResolver +import org.apache.spark.serializer.Serializer import org.apache.spark.util.Utils /** * Stores BlockManager blocks on disk. */ -private class DiskStore(blockManager: BlockManager, rootDirs: String) +private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager) extends BlockStore(blockManager) with Logging { - class DiskBlockObjectWriter(blockId: BlockId, serializer: Serializer, bufferSize: Int) - extends BlockObjectWriter(blockId) { - - /** Intercepts write calls and tracks total time spent writing. Not thread safe. */ - private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream { - def timeWriting = _timeWriting - private var _timeWriting = 0L - - private def callWithTiming(f: => Unit) = { - val start = System.nanoTime() - f - _timeWriting += (System.nanoTime() - start) - } - - def write(i: Int): Unit = callWithTiming(out.write(i)) - override def write(b: Array[Byte]) = callWithTiming(out.write(b)) - override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len)) - } - - private val f: File = createFile(blockId /*, allowAppendExisting */) - private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean - - // The file channel, used for repositioning / truncating the file. - private var channel: FileChannel = null - private var bs: OutputStream = null - private var fos: FileOutputStream = null - private var ts: TimeTrackingOutputStream = null - private var objOut: SerializationStream = null - private var lastValidPosition = 0L - private var initialized = false - private var _timeWriting = 0L - - override def open(): DiskBlockObjectWriter = { - fos = new FileOutputStream(f, true) - ts = new TimeTrackingOutputStream(fos) - channel = fos.getChannel() - bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(ts, bufferSize)) - objOut = serializer.newInstance().serializeStream(bs) - initialized = true - this - } - - override def close() { - if (initialized) { - if (syncWrites) { - // Force outstanding writes to disk and track how long it takes - objOut.flush() - val start = System.nanoTime() - fos.getFD.sync() - _timeWriting += System.nanoTime() - start - objOut.close() - } else { - objOut.close() - } - - _timeWriting += ts.timeWriting - - channel = null - bs = null - fos = null - ts = null - objOut = null - } - // Invoke the close callback handler. - super.close() - } - - override def isOpen: Boolean = objOut != null - - // Flush the partial writes, and set valid length to be the length of the entire file. - // Return the number of bytes written for this commit. 
- override def commit(): Long = { - if (initialized) { - // NOTE: Flush the serializer first and then the compressed/buffered output stream - objOut.flush() - bs.flush() - val prevPos = lastValidPosition - lastValidPosition = channel.position() - lastValidPosition - prevPos - } else { - // lastValidPosition is zero if stream is uninitialized - lastValidPosition - } - } - - override def revertPartialWrites() { - if (initialized) { - // Discard current writes. We do this by flushing the outstanding writes and - // truncate the file to the last valid position. - objOut.flush() - bs.flush() - channel.truncate(lastValidPosition) - } - } - - override def write(value: Any) { - if (!initialized) { - open() - } - objOut.writeObject(value) - } - - override def size(): Long = lastValidPosition - - // Only valid if called after close() - override def timeWriting = _timeWriting - } - - private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 - private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt - - private var shuffleSender : ShuffleSender = null - // Create one local directory for each path mentioned in spark.local.dir; then, inside this - // directory, create multiple subdirectories that we will hash files into, in order to avoid - // having really large inodes at the top level. - private val localDirs: Array[File] = createLocalDirs() - private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) - - addShutdownHook() - - def getBlockWriter(blockId: BlockId, serializer: Serializer, bufferSize: Int) - : BlockObjectWriter = { - new DiskBlockObjectWriter(blockId, serializer, bufferSize) - } - override def getSize(blockId: BlockId): Long = { - getFile(blockId).length() + diskManager.getBlockLocation(blockId).length } override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) { @@ -177,27 +44,15 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) val bytes = _bytes.duplicate() logDebug("Attempting to put block " + blockId) val startTime = System.currentTimeMillis - val file = createFile(blockId) - val channel = new RandomAccessFile(file, "rw").getChannel() + val file = diskManager.createBlockFile(blockId, allowAppending = false) + val channel = new FileOutputStream(file).getChannel() while (bytes.remaining > 0) { channel.write(bytes) } channel.close() val finishTime = System.currentTimeMillis logDebug("Block %s stored as %s file on disk in %d ms".format( - blockId, Utils.bytesToString(bytes.limit), (finishTime - startTime))) - } - - private def getFileBytes(file: File): ByteBuffer = { - val length = file.length() - val channel = new RandomAccessFile(file, "r").getChannel() - val buffer = try { - channel.map(MapMode.READ_ONLY, 0, length) - } finally { - channel.close() - } - - buffer + file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime))) } override def putValues( @@ -209,21 +64,18 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) logDebug("Attempting to write values for block " + blockId) val startTime = System.currentTimeMillis - val file = createFile(blockId) - val fileOut = blockManager.wrapForCompression(blockId, - new FastBufferedOutputStream(new FileOutputStream(file))) - val objOut = blockManager.defaultSerializer.newInstance().serializeStream(fileOut) - objOut.writeAll(values.iterator) - objOut.close() - val length = file.length() + val file = diskManager.createBlockFile(blockId, allowAppending = false) + val outputStream = new 
FileOutputStream(file) + blockManager.dataSerializeStream(blockId, outputStream, values.iterator) + val length = file.length val timeTaken = System.currentTimeMillis - startTime logDebug("Block %s stored as %s file on disk in %d ms".format( - blockId, Utils.bytesToString(length), timeTaken)) + file.getName, Utils.bytesToString(length), timeTaken)) if (returnValues) { // Return a byte buffer for the contents of the file - val buffer = getFileBytes(file) + val buffer = getBytes(blockId).get PutResult(length, Right(buffer)) } else { PutResult(length, null) @@ -231,13 +83,18 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } override def getBytes(blockId: BlockId): Option[ByteBuffer] = { - val file = getFile(blockId) - val bytes = getFileBytes(file) - Some(bytes) + val segment = diskManager.getBlockLocation(blockId) + val channel = new RandomAccessFile(segment.file, "r").getChannel() + val buffer = try { + channel.map(MapMode.READ_ONLY, segment.offset, segment.length) + } finally { + channel.close() + } + Some(buffer) } override def getValues(blockId: BlockId): Option[Iterator[Any]] = { - getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes)) + getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer)) } /** @@ -249,118 +106,20 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } override def remove(blockId: BlockId): Boolean = { - val file = getFile(blockId) - if (file.exists()) { + val fileSegment = diskManager.getBlockLocation(blockId) + val file = fileSegment.file + if (file.exists() && file.length() == fileSegment.length) { file.delete() } else { + if (fileSegment.length < file.length()) { + logWarning("Could not delete block associated with only a part of a file: " + blockId) + } false } } override def contains(blockId: BlockId): Boolean = { - getFile(blockId).exists() - } - - private def createFile(blockId: BlockId, allowAppendExisting: Boolean = false): File = { - val file = getFile(blockId) - if (!allowAppendExisting && file.exists()) { - // NOTE(shivaram): Delete the file if it exists. This might happen if a ShuffleMap task - // was rescheduled on the same machine as the old task. - logWarning("File for block " + blockId + " already exists on disk: " + file + ". 
Deleting") - file.delete() - } - file - } - - private def getFile(blockId: BlockId): File = { - logDebug("Getting file for block " + blockId) - - // Figure out which local directory it hashes to, and which subdirectory in that - val hash = Utils.nonNegativeHash(blockId) - val dirId = hash % localDirs.length - val subDirId = (hash / localDirs.length) % subDirsPerLocalDir - - // Create the subdirectory if it doesn't already exist - var subDir = subDirs(dirId)(subDirId) - if (subDir == null) { - subDir = subDirs(dirId).synchronized { - val old = subDirs(dirId)(subDirId) - if (old != null) { - old - } else { - val newDir = new File(localDirs(dirId), "%02x".format(subDirId)) - newDir.mkdir() - subDirs(dirId)(subDirId) = newDir - newDir - } - } - } - - new File(subDir, blockId.name) - } - - private def createLocalDirs(): Array[File] = { - logDebug("Creating local directories at root dirs '" + rootDirs + "'") - val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") - rootDirs.split(",").map { rootDir => - var foundLocalDir = false - var localDir: File = null - var localDirId: String = null - var tries = 0 - val rand = new Random() - while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { - tries += 1 - try { - localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) - localDir = new File(rootDir, "spark-local-" + localDirId) - if (!localDir.exists) { - foundLocalDir = localDir.mkdirs() - } - } catch { - case e: Exception => - logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e) - } - } - if (!foundLocalDir) { - logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + - " attempts to create local dir in " + rootDir) - System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) - } - logInfo("Created local directory at " + localDir) - localDir - } - } - - private def addShutdownHook() { - localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir)) - Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") { - override def run() { - logDebug("Shutdown hook called") - localDirs.foreach { localDir => - try { - if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir) - } catch { - case t: Throwable => - logError("Exception while deleting local spark dir: " + localDir, t) - } - } - if (shuffleSender != null) { - shuffleSender.stop() - } - } - }) - } - - private[storage] def startShuffleBlockSender(port: Int): Int = { - val pResolver = new PathResolver { - override def getAbsolutePath(blockIdString: String): String = { - val blockId = BlockId(blockIdString) - if (!blockId.isShuffle) null - else DiskStore.this.getFile(blockId).getAbsolutePath - } - } - shuffleSender = new ShuffleSender(port, pResolver) - logInfo("Created ShuffleSender binding to port : "+ shuffleSender.port) - shuffleSender.port + val file = diskManager.getBlockLocation(blockId).file + file.exists() } } diff --git a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala new file mode 100644 index 000000000..555486830 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.io.File + +/** + * References a particular segment of a file (potentially the entire file), + * based off an offset and a length. + */ +private[spark] class FileSegment(val file: File, val offset: Long, val length : Long) { + override def toString = "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length) +} diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index f39fcd87f..229178c09 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -17,12 +17,13 @@ package org.apache.spark.storage -import org.apache.spark.serializer.Serializer +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger +import org.apache.spark.serializer.Serializer private[spark] -class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter]) - +class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter]) private[spark] trait ShuffleBlocks { @@ -30,24 +31,61 @@ trait ShuffleBlocks { def releaseWriters(group: ShuffleWriterGroup) } +/** + * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer + * per reducer. + * + * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle + * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer + * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files, + * it releases them for another task. + * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple: + * - shuffleId: The unique id given to the entire shuffle stage. + * - bucketId: The id of the output partition (i.e., reducer id) + * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a + * time owns a particular fileId, and this id is returned to a pool when the task finishes. + */ private[spark] class ShuffleBlockManager(blockManager: BlockManager) { + // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. + // TODO: Remove this once the shuffle file consolidation feature is stable. + val consolidateShuffleFiles = + System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean + + var nextFileId = new AtomicInteger(0) + val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]() - def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = { + def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = { new ShuffleBlocks { // Get a group of writers for a map task. 
override def acquireWriters(mapId: Int): ShuffleWriterGroup = { val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024 + val fileId = getUnusedFileId() val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId => val blockId = ShuffleBlockId(shuffleId, mapId, bucketId) - blockManager.getDiskBlockWriter(blockId, serializer, bufferSize) + val filename = physicalFileName(shuffleId, bucketId, fileId) + blockManager.getDiskWriter(blockId, filename, serializer, bufferSize) } - new ShuffleWriterGroup(mapId, writers) + new ShuffleWriterGroup(mapId, fileId, writers) } - override def releaseWriters(group: ShuffleWriterGroup) = { - // Nothing really to release here. + override def releaseWriters(group: ShuffleWriterGroup) { + recycleFileId(group.fileId) } } } + + private def getUnusedFileId(): Int = { + val fileId = unusedFileIds.poll() + if (fileId == null) nextFileId.getAndIncrement() else fileId + } + + private def recycleFileId(fileId: Int) { + if (!consolidateShuffleFiles) { return } // ensures we always generate new file id + unusedFileIds.add(fileId) + } + + private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = { + "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId) + } } diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala index 5f30383fd..1b074e5ec 100644 --- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala +++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala @@ -44,7 +44,7 @@ object StoragePerfTester { } buckets.writers.map {w => w.commit() - total.addAndGet(w.size()) + total.addAndGet(w.fileSegment().length) w.close() } diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index 0ce1394c7..3f963727d 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -56,9 +56,10 @@ class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, clea } object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext", "HttpBroadcast", "DagScheduler", "ResultTask", - "ShuffleMapTask", "BlockManager", "BroadcastVars") { + "ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") { - val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, SHUFFLE_MAP_TASK, BLOCK_MANAGER, BROADCAST_VARS = Value + val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK, + SHUFFLE_MAP_TASK, BLOCK_MANAGER, DISK_BLOCK_MANAGER, BROADCAST_VARS = Value type MetadataCleanerType = Value
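Putting the pieces together: with consolidation enabled, each map task acquires a ShuffleWriterGroup (one DiskBlockObjectWriter per reducer, all sharing a recycled fileId), appends its output to per-reducer combined files, records only the FileSegment it wrote, and then returns the writers and fileId for reuse. Below is a condensed sketch of that lifecycle, modeled on ShuffleMapTask and StoragePerfTester above; the helper name and the (element, bucketId) iterator are illustrative, and spark-internal access is assumed since these classes are private[spark].

import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.BlockManager

// Hypothetical helper showing the consolidated shuffle-writer lifecycle for one map task.
// (Assumed to live under the org.apache.spark package for access to private[spark] classes.)
def writeMapOutput(
    blockManager: BlockManager,
    serializer: Serializer,
    shuffleId: Int,
    mapId: Int,
    numBuckets: Int,
    elements: Iterator[(Any, Int)]): Array[Long] = {   // (element, bucketId) pairs
  val shuffle = blockManager.shuffleBlockManager.forShuffle(shuffleId, numBuckets, serializer)
  val group = shuffle.acquireWriters(mapId)            // one writer per reducer, sharing a fileId
  try {
    elements.foreach { case (elem, bucketId) => group.writers(bucketId).write(elem) }
    group.writers.map { writer =>
      writer.commit()                                  // flush and advance the last valid position
      writer.fileSegment().length                      // bytes this task wrote for that reducer
    }
  } finally {
    group.writers.foreach(_.close())                   // close handler maps each block to its segment
    shuffle.releaseWriters(group)                      // returns the fileId to the pool (when consolidation is on)
  }
}

Note that the per-reducer sizes are now read via fileSegment() before close(), which is why this patch moves ShuffleMapTask's writer.close() calls into the finally block instead of closing inline while computing compressed sizes.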