Skip to content

Commit dde9456

Browse files
committed
[query] HailContext is vanquished
1 parent 78641f1 commit dde9456

File tree

10 files changed

+55
-108
lines changed

10 files changed

+55
-108
lines changed

hail/hail/src/is/hail/HailContext.scala

Lines changed: 0 additions & 83 deletions
This file was deleted.

hail/hail/src/is/hail/backend/driver/BatchQueryDriver.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ object BatchQueryDriver extends HttpLikeRpc with Logging {
194194
jobConfig,
195195
)
196196

197+
Logging.preamble()
198+
197199
// FIXME: when can the classloader be shared? (optimizer benefits!)
198200
try runRpc(
199201
Env(

hail/hail/src/is/hail/backend/driver/Py4JQueryDriver.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,34 @@ final class Py4JQueryDriver(backend: Backend) extends Closeable {
275275
IRParser.parse_blockmatrix_ir(ctx, s)
276276
}._1
277277

278+
private[this] def fileAndLineCounts(
279+
regex: String,
280+
files: Seq[String],
281+
maxLines: Int,
282+
): Map[String, Array[WithContext[String]]] =
283+
synchronized {
284+
val regexp = regex.r
285+
backend.asSpark.sc
286+
.textFilesLines(tmpFileManager.fs.globAll(files).map(_.getPath))
287+
.filter(line => regexp.findFirstIn(line.value).isDefined)
288+
.take(maxLines)
289+
.groupBy(_.source.file)
290+
}
291+
292+
def pyGrepPrint(regex: String, files: Seq[String], maxLines: Int): Unit =
293+
fileAndLineCounts(regex, files, maxLines).foreach { case (file, lines) =>
294+
info(s"$file: ${lines.length} ${plural(lines.length, "match", "matches")}:")
295+
lines.map(_.value).foreach { line =>
296+
val (screen, logged) = line.truncatable().strings
297+
log.info("\t" + logged)
298+
println(s"\t$screen")
299+
}
300+
}
301+
302+
def pyGrepReturn(regex: String, files: Seq[String], maxLines: Int)
303+
: Array[(String, Array[String])] =
304+
fileAndLineCounts(regex, files, maxLines).mapValues(_.map(_.value)).toArray
305+
278306
private[this] def addReference(rg: ReferenceGenome): Unit =
279307
ReferenceGenome.addFatalOnCollision(references, FastSeq(rg))
280308

hail/hail/src/is/hail/backend/local/LocalBackend.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ object LocalBackend extends Backend {
4949
): LocalBackend.type =
5050
synchronized {
5151
if (!skipLoggingConfiguration) Logging.configureLogging(logFile, quiet, append)
52+
Logging.preamble()
5253
this
5354
}
5455

hail/hail/src/is/hail/backend/spark/SparkBackend.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ object SparkBackend {
108108
def createSparkConf(appName: String, master: String, local: String, blockSize: Long)
109109
: SparkConf = {
110110
require(blockSize >= 0)
111+
111112
checkSparkCompatibility(is.hail.HAIL_SPARK_VERSION, org.apache.spark.SPARK_VERSION)
112113

113114
val conf = new SparkConf().setAppName(appName)
@@ -219,6 +220,7 @@ object SparkBackend {
219220

220221
// there should be only one SparkContext
221222
assert(sc == null || (sc eq theSparkBackend.sc))
223+
Logging.preamble()
222224

223225
val initializedMinBlockSize =
224226
theSparkBackend.sc.getConf.getLong(
@@ -258,6 +260,7 @@ object SparkBackend {
258260
if (!skipLoggingConfiguration)
259261
Logging.configureLogging(logFile, quiet, append)
260262

263+
Logging.preamble()
261264
var sc1 = sc
262265
if (sc1 == null)
263266
sc1 = configureAndCreateSparkContext(appName, master, local, minBlockSize)

hail/hail/src/is/hail/utils/Logging.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package is.hail.utils
22

3+
import is.hail.HAIL_PRETTY_VERSION
4+
35
import java.util.Properties
46

57
import org.apache.log4j.{LogManager, Logger, PropertyConfigurator}
@@ -94,4 +96,10 @@ object Logging {
9496
PropertyConfigurator.configure(logProps)
9597
}
9698

99+
def preamble(): Unit = {
100+
info(s"Running Hail version $HAIL_PRETTY_VERSION")
101+
val jreVersion = Runtime.version().feature()
102+
if (jreVersion != 11) warn(s"Hail is tested against Java 11, found $jreVersion")
103+
}
104+
97105
}

hail/python/hail/backend/local_backend.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,8 @@ def __init__(
7878
append,
7979
skip_logging_configuration,
8080
)
81-
jhc = hail_package.HailContext.apply()
8281

83-
super().__init__(self._gateway.jvm, jbackend, jhc, tmpdir, tmpdir)
82+
super().__init__(self._gateway.jvm, jbackend, tmpdir, tmpdir)
8483
self.gcs_requester_pays_configuration = gcs_requester_pays_configuration
8584
self._fs = self._exit_stack.enter_context(
8685
RouterFS(gcs_kwargs={'gcs_requester_pays_configuration': gcs_requester_pays_configuration})

hail/python/hail/backend/py4j_backend.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,6 @@ def __init__(
178178
self,
179179
jvm: JVMView,
180180
jbackend: JavaObject,
181-
jhc: JavaObject,
182181
tmpdir: str,
183182
remote_tmpdir: str,
184183
):
@@ -195,7 +194,6 @@ def decode_bytearray(encoded):
195194
self._jvm = jvm
196195
self._hail_package = getattr(self._jvm, 'is').hail
197196
self._utils_package_object = scala_package_object(self._hail_package.utils)
198-
self._jhc = jhc
199197

200198
self._jbackend = self._hail_package.backend.driver.Py4JQueryDriver(jbackend)
201199
self.local_tmpdir = tmpdir
@@ -337,8 +335,6 @@ def _stop_jhttp_server(self):
337335
def stop(self):
338336
self._stop_jhttp_server()
339337
self._jbackend.close()
340-
self._jhc.stop()
341-
self._jhc = None
342338
uninstall_exception_handler()
343339
super().stop()
344340

hail/python/hail/backend/spark_backend.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ def __init__(
117117
skip_logging_configuration,
118118
min_block_size,
119119
)
120-
jhc = hail_package.HailContext.getOrCreate()
121120
else:
122121
jbackend = hail_package.backend.spark.SparkBackend.apply(
123122
jsc,
@@ -130,17 +129,13 @@ def __init__(
130129
skip_logging_configuration,
131130
min_block_size,
132131
)
133-
jhc = hail_package.HailContext.apply()
134132

135133
self._jsc = jbackend.sc()
136-
if sc:
137-
self.sc = sc
138-
else:
139-
self.sc = pyspark.SparkContext(gateway=self._gateway, jsc=jvm.JavaSparkContext(self._jsc))
134+
self.sc = sc if sc else pyspark.SparkContext(gateway=self._gateway, jsc=jvm.JavaSparkContext(self._jsc))
140135
self._jspark_session = jbackend.sparkSession().apply()
141136
self._spark_session = pyspark.sql.SparkSession(self.sc, self._jspark_session)
142137

143-
super().__init__(jvm, jbackend, jhc, local_tmpdir, tmpdir)
138+
super().__init__(jvm, jbackend, local_tmpdir, tmpdir)
144139
self.gcs_requester_pays_configuration = gcs_requester_pays_config
145140

146141
self._logger = None

hail/python/hail/methods/impex.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import hail as hl
1010
from hail import ir
11+
from hail.backend.py4j_backend import Py4JBackend
1112
from hail.expr import (
1213
LocusExpression,
1314
StructExpression,
@@ -1062,24 +1063,21 @@ def grep(regex, path, max_count=100, *, show: bool = True, force: bool = False,
10621063
---
10631064
:obj:`dict` of :class:`str` to :obj:`list` of :obj:`str`
10641065
"""
1065-
from hail.backend.spark_backend import SparkBackend
1066-
1067-
if isinstance(hl.current_backend(), SparkBackend):
1068-
jfs = Env.spark_backend('grep').fs._jfs
1069-
if show:
1070-
Env.backend()._jhc.grepPrint(jfs, regex, jindexed_seq_args(path), max_count)
1071-
return
1072-
else:
1073-
jarr = Env.backend()._jhc.grepReturn(jfs, regex, jindexed_seq_args(path), max_count)
1074-
return {x._1(): list(x._2()) for x in jarr}
1066+
backend = hl.current_backend()
1067+
if isinstance(backend, Py4JBackend):
1068+
jb = backend._jbackend
1069+
return (
1070+
jb.pyGrepPrint(regex, jindexed_seq_args(path), max_count)
1071+
if show
1072+
else {x._1(): list(x._2()) for x in jb.pyGrepReturn(regex, jindexed_seq_args(path), max_count)}
1073+
)
10751074

10761075
ht = hl.import_lines(path, force=force, force_bgz=force_bgz)
10771076
ht = ht.filter(ht.text.matches(regex))
10781077
ht = ht.head(max_count)
10791078
lines = ht.collect()
10801079
if show:
1081-
print('\n'.join(line.file + ': ' + line.text for line in lines))
1082-
return
1080+
return print('\n'.join(line.file + ': ' + line.text for line in lines))
10831081

10841082
results = defaultdict(list)
10851083
for line in lines:

0 commit comments

Comments
 (0)