44 changes: 11 additions & 33 deletions hail/hail/src/is/hail/HailContext.scala
@@ -1,11 +1,9 @@
package is.hail

import is.hail.backend.Backend
import is.hail.backend.{Backend, ExecuteContext}
import is.hail.backend.spark.SparkBackend
import is.hail.expr.ir.functions.IRFunctionRegistry
import is.hail.io.fs.FS
import is.hail.io.vcf._
import is.hail.types.virtual._
import is.hail.utils._

import scala.reflect.ClassTag
@@ -17,33 +15,21 @@ import org.apache.log4j.{LogManager, PropertyConfigurator}
import org.apache.spark._
import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.json4s.Extraction
import org.json4s.jackson.JsonMethods
import sourcecode.Enclosing

case class FilePartition(index: Int, file: String) extends Partition

object HailContext {
val tera: Long = 1024L * 1024L * 1024L * 1024L

val logFormat: String = "%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1}: %p: %m%n"

private var theContext: HailContext = _

def isInitialized: Boolean = synchronized {
theContext != null
}

def get: HailContext = synchronized {
assert(TaskContext.get() == null, "HailContext not available on worker")
assert(theContext != null, "HailContext not initialized")
theContext
}

def backend: Backend = get.backend

def sparkBackend(implicit E: Enclosing): SparkBackend = get.backend.asSpark

def configureLogging(logFile: String, quiet: Boolean, append: Boolean): Unit = {
org.apache.log4j.helpers.LogLog.setInternalDebugging(true)
org.apache.log4j.helpers.LogLog.setQuietMode(false)
@@ -94,7 +80,7 @@ object HailContext {

def getOrCreate(backend: Backend): HailContext =
synchronized {
if (isInitialized) theContext
if (theContext != null) theContext
else HailContext(backend)
}

@@ -123,25 +109,25 @@ object HailContext {
theContext
}

def stop(): Unit = synchronized {
IRFunctionRegistry.clearUserFunctions()
backend.close()

theContext = null
}
def stop(): Unit =
synchronized {
IRFunctionRegistry.clearUserFunctions()
theContext.backend.close()
theContext = null
}

def readPartitions[T: ClassTag](
fs: FS,
ctx: ExecuteContext,
path: String,
partFiles: IndexedSeq[String],
read: (Int, InputStream, InputMetrics) => Iterator[T],
optPartitioner: Option[Partitioner] = None,
): RDD[T] = {
val nPartitions = partFiles.length

val fsBc = fs.broadcast
val fsBc = ctx.fsBc

new RDD[T](SparkBackend.sparkContext, Nil) {
new RDD[T](ctx.backend.asSpark.sc, Nil) {
def getPartitions: Array[Partition] =
Array.tabulate(nPartitions)(i => FilePartition(i, partFiles(i)))

@@ -192,12 +178,4 @@ class HailContext private (
: Array[(String, Array[String])] =
fileAndLineCounts(fs: FS, regex, files, maxLines).mapValues(_.map(_.value)).toArray

def parseVCFMetadata(fs: FS, file: String): Map[String, Map[String, Map[String, String]]] =
LoadVCF.parseHeaderMetadata(fs, Set.empty, TFloat64, file)

def pyParseVCFMetadataJSON(fs: FS, file: String): String = {
val metadata = LoadVCF.parseHeaderMetadata(fs, Set.empty, TFloat64, file)
implicit val formats = defaultJSONFormats
JsonMethods.compact(Extraction.decompose(metadata))
}
Comment on lines -195 to -202 (Member Author):

Unused/implemented in BackendRpc

}
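
A hedged sketch of the lifecycle as it reads after this change; myBackend stands in for any concrete Backend implementation and is not part of the diff:

import is.hail.HailContext
import is.hail.backend.Backend

val myBackend: Backend = ??? // hypothetical: any concrete Backend

// getOrCreate is idempotent: it returns the existing singleton when one is set.
val hc = HailContext.getOrCreate(myBackend)
assert(HailContext.get eq hc)

// stop() now reaches the backend through the singleton it is tearing down:
// it clears user IR functions, closes theContext.backend, then nulls the ref.
HailContext.stop()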
2 changes: 1 addition & 1 deletion hail/hail/src/is/hail/backend/BackendUtils.scala
@@ -52,7 +52,7 @@ class BackendUtils(
val remainingPartitions =
contexts.indices.filterNot(k => cachedResults.containsOrdered[Int](k, _ < _, _._2))

val backend = HailContext.backend
Member:

I see you get rid of this upstack, but I'm curious if there's a simple rule for when we get the backend via an ExecuteContext, vs when we still need to get a HailContext (for now, later using a different mechanism). Is it just a compile-time vs runtime distinction?

@ehigham (Member Author), Sep 24, 2025:

I've been proceeding on the basis of avoiding "global" mutable fields entirely, instead favouring dependency injection in code that we maintain (via ExecuteContext in this case). For generated code, using a constant pool or some such is probably the right thing to do, so long as it doesn't depend on non-generated code.

For Backend specifically, my intention is that the ref in the upcoming change should only be used by BackendUtils.collectDArray. My hope is to remove the mutable ref eventually, either by

  • code-generating parallelizeAndComputeWithIndex, or
  • initialising a "constant" field in the generated code with either
    • the backend, in a similar way to reference genomes etc., or
    • the BackendContext that's passed to collectDArray.

I'm not sure if that answers your question properly...

@ehigham (Member Author):

I might just do that last one now...

Member:

> I'm not sure if that answers your question properly...

It does, thanks!


val backend = HailContext.get.backend
val mod = getModule(modID)
val t = System.nanoTime()
val (failureOpt, successes) =
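
To make the rule from the thread above concrete, a minimal sketch under assumed shapes (not code from this PR): maintained code receives its Backend through an injected ExecuteContext, while generated code still reaches the mutable singleton at runtime.

import is.hail.HailContext
import is.hail.backend.{Backend, ExecuteContext}

// Maintained code: the Backend arrives by dependency injection.
def maintainedPath(ctx: ExecuteContext): Backend =
  ctx.backend // no global mutable state involved

// Generated-code path (e.g. via collectDArray): still a runtime lookup of
// the global singleton -- the ref the author hopes to remove eventually.
def generatedPath(): Backend =
  HailContext.get.backend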
2 changes: 1 addition & 1 deletion hail/hail/src/is/hail/backend/ExecuteContext.scala
@@ -138,7 +138,7 @@ class ExecuteContext(

val stateManager = HailStateManager(references)

def fsBc: BroadcastValue[FS] = fs.broadcast
lazy val fsBc: BroadcastValue[FS] = backend.broadcast(fs)

val memo: mutable.Map[Any, Any] = new mutable.HashMap[Any, Any]()

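
The switch from def to lazy val changes evaluation, not just wiring: a def re-broadcasts the filesystem on every access, whereas a lazy val broadcasts once on first use and caches the result. A toy stand-in (not Hail's types) showing the distinction:

class Ctx(fs: String, broadcast: String => String) {
  def fsBcDef: String = broadcast(fs)       // evaluated on every call
  lazy val fsBcLazy: String = broadcast(fs) // evaluated once, on first access
}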
5 changes: 3 additions & 2 deletions hail/hail/src/is/hail/backend/driver/Py4JQueryDriver.scala
@@ -138,7 +138,7 @@ final class Py4JQueryDriver(backend: Backend) extends Closeable {
): Unit = {
void {
withExecuteContext() { ctx =>
val rm = linalg.RowMatrix.readBlockMatrix(ctx.fs, pathIn, partitionSize)
val rm = linalg.RowMatrix.readBlockMatrix(ctx, pathIn, partitionSize)
entries match {
case "full" =>
rm.export(ctx, pathOut, delimiter, Option(header), addIndex, exportType)
@@ -225,7 +225,8 @@
def pyToDF(s: String): DataFrame =
withExecuteContext(selfContainedExecution = false) { ctx =>
val tir = IRParser.parse_table_ir(ctx, s)
Interpret(tir, ctx).toDF()
val tv = Interpret(tir, ctx)
tv.toDF(ctx)
}._1

def pyReadMultipleMatrixTables(jsonQuery: String): util.List[MatrixIR] =
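
Both hunks follow the same pattern as the rest of this PR: call sites hand the whole ExecuteContext to the callee instead of plucking pieces out of it (readBlockMatrix now takes ctx rather than ctx.fs, and toDF takes ctx explicitly). Restated as a hedged sketch, names as in the diff:

val tir = IRParser.parse_table_ir(ctx, s)
val tv = Interpret(tir, ctx)
val df = tv.toDF(ctx) // the context is passed explicitly rather than found globally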
6 changes: 5 additions & 1 deletion hail/hail/src/is/hail/backend/spark/SparkBackend.scala
@@ -83,7 +83,11 @@ object SparkBackend {

private var theSparkBackend: SparkBackend = _

def sparkContext(implicit E: Enclosing): SparkContext = HailContext.sparkBackend.sc
def sparkContext(implicit E: Enclosing): SparkContext =
synchronized {
if (theSparkBackend == null) throw new IllegalStateException(E.value)
else theSparkBackend.sc
}

def checkSparkCompatibility(jarVersion: String, sparkVersion: String): Unit = {
def majorMinor(version: String): String = version.split("\\.", 3).take(2).mkString(".")
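
Two notes on this hunk. The rewritten sparkContext accessor fails fast with the caller's enclosing name (supplied implicitly by sourcecode.Enclosing) instead of routing through HailContext.sparkBackend. Separately, the behavior of the majorMinor helper shown above can be pinned down with a worked example:

majorMinor("3.5.1")   // "3.5"
majorMinor("3.5")     // "3.5"
majorMinor("3.5.1.2") // "3.5" -- the split limit of 3 leaves "1.2" as one piece, which take(2) drops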