Commit 00a2f37

[query] move block matrix read out of HailContext
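
BlockMatrix.read was the only caller of HailContext.readPartitions, so the read is inlined there as an anonymous RDD. This drops HailContext's dependency on ExecuteContext and Spark's RDD machinery, and the inlined compute wraps the part-file stream in `using`, so the handle is closed even if decoding a block fails.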
1 parent 9883ce4

File tree

2 files changed: +19 -43

  hail/hail/src/is/hail/HailContext.scala
  hail/hail/src/is/hail/linalg/BlockMatrix.scala

hail/hail/src/is/hail/HailContext.scala

Lines changed: 1 addition & 34 deletions
@@ -1,22 +1,15 @@
 package is.hail

-import is.hail.backend.{Backend, ExecuteContext}
+import is.hail.backend.Backend
 import is.hail.backend.spark.SparkBackend
 import is.hail.expr.ir.functions.IRFunctionRegistry
 import is.hail.io.fs.FS
 import is.hail.utils._

-import scala.reflect.ClassTag
-
-import java.io.InputStream
 import java.util.Properties

 import org.apache.log4j.{LogManager, PropertyConfigurator}
 import org.apache.spark._
-import org.apache.spark.executor.InputMetrics
-import org.apache.spark.rdd.RDD
-
-case class FilePartition(index: Int, file: String) extends Partition

 object HailContext {

@@ -115,32 +108,6 @@ object HailContext {
     theContext.backend.close()
     theContext = null
   }
-
-  def readPartitions[T: ClassTag](
-    ctx: ExecuteContext,
-    path: String,
-    partFiles: IndexedSeq[String],
-    read: (Int, InputStream, InputMetrics) => Iterator[T],
-    optPartitioner: Option[Partitioner] = None,
-  ): RDD[T] = {
-    val nPartitions = partFiles.length
-
-    val fsBc = ctx.fsBc
-
-    new RDD[T](ctx.backend.asSpark.sc, Nil) {
-      def getPartitions: Array[Partition] =
-        Array.tabulate(nPartitions)(i => FilePartition(i, partFiles(i)))
-
-      override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-        val p = split.asInstanceOf[FilePartition]
-        val filename = path + "/parts/" + p.file
-        val in = fsBc.value.open(filename)
-        read(p.index, in, context.taskMetrics().inputMetrics)
-      }
-
-      @transient override val partitioner: Option[Partitioner] = optPartitioner
-    }
-  }
 }

 class HailContext private (

hail/hail/src/is/hail/linalg/BlockMatrix.scala

Lines changed: 18 additions & 9 deletions
@@ -1,6 +1,5 @@
 package is.hail.linalg

-import is.hail._
 import is.hail.annotations._
 import is.hail.backend.{BroadcastValue, ExecuteContext, HailStateManager}
 import is.hail.backend.spark.{SparkBackend, SparkTaskContext}

@@ -28,7 +27,6 @@ import breeze.numerics.{abs => breezeAbs, log => breezeLog, pow => breezePow, sq
 import breeze.stats.distributions.RandBasis
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark._
-import org.apache.spark.executor.InputMetrics
 import org.apache.spark.mllib.linalg.distributed.{GridPartitioner => _, _}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel

@@ -216,16 +214,27 @@ object BlockMatrix {
       readMetadata(ctx.fs, uri)

     val gp = GridPartitioner(blockSize, nRows, nCols, maybeFiltered)
+    val nPartitions = partFiles.length
+    val fsBc = ctx.fsBc

-    def readBlock(pi: Int, is: InputStream, metrics: InputMetrics)
-      : Iterator[((Int, Int), BDM[Double])] = {
-      val block = RichDenseMatrixDouble.read(is, bufferSpec)
-      is.close()
+    val blocks =
+      new RDD[((Int, Int), BDM[Double])](ctx.backend.asSpark.sc, Nil) {

-      Iterator.single(gp.partCoordinates(pi) -> block)
-    }
+        case class FilePartition(index: Int, file: String) extends Partition
+
+        override lazy val getPartitions: Array[Partition] =
+          Array.tabulate(nPartitions)(i => FilePartition(i, partFiles(i)))

-    val blocks = HailContext.readPartitions(ctx, uri, partFiles, readBlock, Some(gp))
+        override def compute(split: Partition, context: TaskContext)
+          : Iterator[((Int, Int), BDM[Double])] =
+          using(fsBc.value.open(uri + "/parts/" + split.asInstanceOf[FilePartition].file)) { in =>
+            val block = RichDenseMatrixDouble.read(in, bufferSpec)
+            Iterator.single(gp.partCoordinates(split.index) -> block)
+          }
+
+        @transient override val partitioner: Option[Partitioner] =
+          Some(gp)
+      }

     new BlockMatrix(blocks, blockSize, nRows, nCols)
   }
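
The inlined reader follows a standard Spark pattern: subclass RDD, expose one Partition per part file, and open the file inside compute so reads happen on executors rather than the driver. A minimal standalone sketch of that pattern, with hypothetical names (FilePartition, LinesByFileRDD, the sample paths) and plain scala.io in place of Hail's broadcast FS:

import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

import scala.io.Source

// One RDD partition per file on disk (illustrative, not a Hail API).
case class FilePartition(index: Int, file: String) extends Partition

class LinesByFileRDD(sc: SparkContext, files: IndexedSeq[String])
    extends RDD[String](sc, Nil) {

  // One Spark partition per input file.
  override def getPartitions: Array[Partition] =
    Array.tabulate(files.length)(i => FilePartition(i, files(i)))

  // Runs on executors: each task reads only the file backing its partition.
  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val source = Source.fromFile(split.asInstanceOf[FilePartition].file)
    try source.getLines().toArray.iterator // materialize before closing the handle
    finally source.close()
  }
}

object LinesByFileExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("sketch"))
    try {
      val rdd = new LinesByFileRDD(sc, IndexedSeq("/tmp/part-0.txt", "/tmp/part-1.txt"))
      println(rdd.count()) // total lines across both files
    } finally sc.stop()
  }
}

As in the inlined BlockMatrix code, making getPartitions lazy defers building the partition array until Spark first asks for it, and marking the partitioner @transient keeps it from being serialized with the RDD into task closures.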
