Skip to content

Commit 65ae981

Browse files
committed
[query] run ServiceBackend in Py4jQueryDriver
1 parent 2e55e71 commit 65ae981

26 files changed

+716
-167
lines changed

build.yaml

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2560,6 +2560,158 @@ steps:
25602560
- dev
25612561
clouds:
25622562
- azure
2563+
- kind: runImage
2564+
name: test_hail_python_service_backend_gcp
2565+
numSplits: 16
2566+
image:
2567+
valueFrom: hail_run_image.image
2568+
resources:
2569+
cpu: '0.25'
2570+
preemptible: False
2571+
script: |
2572+
set -ex
2573+
python3 -m pip install --no-dependencies /io/wheel/hail-*-py3-none-any.whl
2574+
2575+
cd /io/repo/hail/python
2576+
2577+
export HAIL_CLOUD={{ global.cloud }}
2578+
export HAIL_DEFAULT_NAMESPACE={{ default_ns.name }}
2579+
export HAIL_TEST_STORAGE_URI={{ global.test_storage_uri }}/{{ token }}
2580+
export HAIL_TEST_RESOURCES_DIR="{{ global.test_storage_uri }}/{{ upload_test_resources_to_blob_storage.token }}/test/resources/"
2581+
export HAIL_DOCTEST_DATA_DIR="{{ global.test_storage_uri }}/{{ upload_test_resources_to_blob_storage.token }}/doctest/data/"
2582+
export HAIL_GENETICS_VEP_GRCH37_85_IMAGE={{ hailgenetics_vep_grch37_85_image.image }}
2583+
export HAIL_GENETICS_VEP_GRCH38_95_IMAGE={{ hailgenetics_vep_grch38_95_image.image }}
2584+
export GOOGLE_APPLICATION_CREDENTIALS=/test-gsa-key/key.json
2585+
export AZURE_APPLICATION_CREDENTIALS=/test-gsa-key/key.json
2586+
2587+
export GCS_REQUESTER_PAYS_PROJECT=broad-ctsa
2588+
2589+
export HAIL_SHUFFLE_MAX_BRANCH=4
2590+
export HAIL_SHUFFLE_CUTOFF=1000000
2591+
export HAIL_QUERY_BACKEND=batch
2592+
export HAIL_QUERY_USE_LOCAL_DRIVER=1
2593+
export HAIL_BATCH_REGIONS={{ global.gcp_region }}
2594+
export HAIL_BATCH_BILLING_PROJECT=test
2595+
export HAIL_BATCH_REMOTE_TMPDIR={{ global.test_storage_uri }}
2596+
2597+
python3 -m pytest \
2598+
-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \
2599+
--log-cli-level=INFO \
2600+
-s \
2601+
-r A \
2602+
-vv \
2603+
--instafail \
2604+
--durations=50 \
2605+
--ignore=test/hailtop/ \
2606+
--ignore=test/hail/matrixtable/test_file_formats.py \
2607+
-m backend \
2608+
--timeout=600 \
2609+
test
2610+
timeout: 5400
2611+
inputs:
2612+
- from: /derived/release/hail/build/deploy/dist
2613+
to: /io/wheel
2614+
- from: /repo/hail/python/pytest.ini
2615+
to: /io/repo/hail/python/pytest.ini
2616+
- from: /repo/hail/python/test
2617+
to: /io/repo/hail/python/test
2618+
secrets:
2619+
- name: test-gsa-key
2620+
namespace:
2621+
valueFrom: default_ns.name
2622+
mountPath: /test-gsa-key
2623+
dependsOn:
2624+
- default_ns
2625+
- merge_code
2626+
- deploy_batch
2627+
- create_deploy_config
2628+
- create_accounts
2629+
- hail_run_image
2630+
- upload_query_jar
2631+
- upload_test_resources_to_blob_storage
2632+
- build_hail_jar_and_wheel
2633+
- hailgenetics_vep_grch37_85_image
2634+
- hailgenetics_vep_grch38_95_image
2635+
clouds:
2636+
- gcp
2637+
- kind: runImage
2638+
name: test_hail_python_service_backend_azure
2639+
numSplits: 16
2640+
image:
2641+
valueFrom: hail_run_image.image
2642+
resources:
2643+
cpu: '0.25'
2644+
preemptible: False
2645+
script: |
2646+
set -ex
2647+
python3 -m pip install --no-dependencies /io/wheel/hail-*-py3-none-any.whl
2648+
2649+
cd /io/repo/hail/python
2650+
2651+
export HAIL_CLOUD={{ global.cloud }}
2652+
export HAIL_DEFAULT_NAMESPACE={{ default_ns.name }}
2653+
export HAIL_TEST_STORAGE_URI={{ global.test_storage_uri }}/{{ token }}
2654+
export HAIL_TEST_RESOURCES_DIR="{{ global.test_storage_uri }}/{{ upload_test_resources_to_blob_storage.token }}/test/resources/"
2655+
export HAIL_DOCTEST_DATA_DIR="{{ global.test_storage_uri }}/{{ upload_test_resources_to_blob_storage.token }}/doctest/data/"
2656+
export HAIL_GENETICS_VEP_GRCH37_85_IMAGE={{ hailgenetics_vep_grch37_85_image.image }}
2657+
export HAIL_GENETICS_VEP_GRCH38_95_IMAGE={{ hailgenetics_vep_grch38_95_image.image }}
2658+
export GOOGLE_APPLICATION_CREDENTIALS=/test-gsa-key/key.json
2659+
export AZURE_APPLICATION_CREDENTIALS=/test-gsa-key/key.json
2660+
2661+
export HAIL_AZURE_SUBSCRIPTION_ID={{ global.azure_subscription_id }}
2662+
export HAIL_AZURE_RESOURCE_GROUP={{ global.azure_resource_group }}
2663+
2664+
export HAIL_SHUFFLE_MAX_BRANCH=4
2665+
export HAIL_SHUFFLE_CUTOFF=1000000
2666+
export HAIL_QUERY_BACKEND=batch
2667+
export HAIL_QUERY_USE_LOCAL_DRIVER=1
2668+
export HAIL_BATCH_REGIONS={{ global.azure_location }}
2669+
export HAIL_BATCH_BILLING_PROJECT=test
2670+
export HAIL_BATCH_REMOTE_TMPDIR={{ global.test_storage_uri }}
2671+
2672+
python3 -m pytest \
2673+
-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \
2674+
--log-cli-level=INFO \
2675+
-s \
2676+
-r A \
2677+
-vv \
2678+
--instafail \
2679+
--durations=50 \
2680+
--ignore=test/hailtop/ \
2681+
--ignore=test/hail/matrixtable/test_file_formats.py \
2682+
-m backend \
2683+
--timeout=600 \
2684+
test
2685+
timeout: 5400
2686+
inputs:
2687+
- from: /derived/release/hail/build/deploy/dist
2688+
to: /io/wheel
2689+
- from: /repo/hail/python/pytest.ini
2690+
to: /io/repo/hail/python/pytest.ini
2691+
- from: /repo/hail/python/test
2692+
to: /io/repo/hail/python/test
2693+
secrets:
2694+
- name: test-gsa-key
2695+
namespace:
2696+
valueFrom: default_ns.name
2697+
mountPath: /test-gsa-key
2698+
dependsOn:
2699+
- default_ns
2700+
- merge_code
2701+
- deploy_batch
2702+
- create_deploy_config
2703+
- create_accounts
2704+
- hail_run_image
2705+
- upload_query_jar
2706+
- upload_test_resources_to_blob_storage
2707+
- build_hail_jar_and_wheel
2708+
- hailgenetics_vep_grch37_85_image
2709+
- hailgenetics_vep_grch38_95_image
2710+
scopes:
2711+
- deploy
2712+
- dev
2713+
clouds:
2714+
- azure
25632715
- kind: runImage
25642716
name: test_hail_spark_conf_requester_pays_parsing
25652717
image:

hail/hail/src/is/hail/backend/driver/BackendRpc.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,15 @@ import is.hail.io.plink.LoadPlink
99
import is.hail.io.vcf.LoadVCF
1010
import is.hail.types.virtual.{Kind, TFloat64}
1111
import is.hail.types.virtual.Kinds._
12-
import is.hail.utils.{using, BoxedArrayBuilder, ExecutionTimer, FastSeq}
12+
import is.hail.utils.{jsonToBytes, using, BoxedArrayBuilder, ExecutionTimer, FastSeq}
1313
import is.hail.utils.ExecutionTimer.Timings
1414
import is.hail.variant.ReferenceGenome
1515

1616
import scala.util.control.NonFatal
1717

1818
import java.io.ByteArrayOutputStream
19-
import java.nio.charset.StandardCharsets
2019

2120
import org.json4s.{DefaultFormats, Extraction, Formats, JArray, JValue}
22-
import org.json4s.jackson.JsonMethods
2321

2422
case class SerializedIRFunction(
2523
name: String,
@@ -146,9 +144,6 @@ trait BackendRpc {
146144
case NonFatal(error) => R.failure(env, error)
147145
}
148146

149-
def jsonToBytes(v: JValue): Array[Byte] =
150-
JsonMethods.compact(v).getBytes(StandardCharsets.UTF_8)
151-
152147
private[this] def withRegisterSerializedFns[A](
153148
ctx: ExecuteContext,
154149
serializedFns: Array[SerializedIRFunction],

hail/hail/src/is/hail/backend/driver/BatchQueryDriver.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import is.hail.io.fs.{CloudStorageFSConfig, FS, RouterFS}
1010
import is.hail.io.reference.{IndexedFastaSequenceFile, LiftOver}
1111
import is.hail.macros.void
1212
import is.hail.services._
13+
import is.hail.services.oauth2.CloudCredentials
1314
import is.hail.types.virtual.Kinds._
1415
import is.hail.utils._
1516
import is.hail.utils.ExecutionTimer.Timings
@@ -136,8 +137,6 @@ object BatchQueryDriver extends HttpLikeRpc with Logging {
136137
val inputURL = argv(5)
137138
val outputURL = argv(6)
138139

139-
val deployConfig = DeployConfig.fromConfigFile("/deploy-config/deploy-config.json")
140-
DeployConfig.set(deployConfig)
141140
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir)
142141

143142
val (rpcConfig, jobConfig, action, payload) = {
@@ -188,7 +187,10 @@ object BatchQueryDriver extends HttpLikeRpc with Logging {
188187
val backend =
189188
new ServiceBackend(
190189
name,
191-
BatchClient(deployConfig, Path.of(scratchDir, "secrets/gsa-key/key.json")),
190+
BatchClient(
191+
DeployConfig.fromConfigFile("/deploy-config/deploy-config.json"),
192+
CloudCredentials(Some(Path.of(scratchDir, "secrets/gsa-key/key.json"))),
193+
),
192194
JarUrl(jarLocation),
193195
BatchConfig.fromConfigFile(Path.of(scratchDir, "batch-config/batch-config.json")),
194196
jobConfig,

hail/hail/src/is/hail/backend/service/ServiceBackend.scala

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package is.hail.backend.service
22

3-
import is.hail.HailFeatureFlags
3+
import is.hail.{HAIL_REVISION, HailFeatureFlags}
44
import is.hail.annotations._
55
import is.hail.asm4s._
66
import is.hail.backend._
@@ -14,6 +14,7 @@ import is.hail.expr.ir.lowering._
1414
import is.hail.io.fs._
1515
import is.hail.services._
1616
import is.hail.services.JobGroupStates.{Cancelled, Failure, Success}
17+
import is.hail.services.oauth2.{CloudCredentials, HailCredentials}
1718
import is.hail.types._
1819
import is.hail.types.physical._
1920
import is.hail.types.physical.stypes.PTypeReferenceSingleCodeType
@@ -26,8 +27,65 @@ import java.io._
2627
import java.nio.charset.StandardCharsets
2728
import java.util.concurrent._
2829

30+
import com.fasterxml.jackson.core.StreamReadConstraints
31+
2932
object ServiceBackend {
3033
val MaxAvailableGcsConnections = 1000
34+
35+
// See https://github.com/hail-is/hail/issues/14580
36+
StreamReadConstraints.overrideDefaultStreamReadConstraints(
37+
StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build()
38+
)
39+
40+
def pyServiceBackend(
41+
name: String,
42+
batchId_ : Integer,
43+
billingProject: String,
44+
deployConfigFile: String,
45+
workerCores: String,
46+
workerMemory: String,
47+
storage: String,
48+
cloudfuse: Array[CloudfuseConfig],
49+
regions: Array[String],
50+
): ServiceBackend = {
51+
val credentials: CloudCredentials =
52+
HailCredentials().getOrElse(CloudCredentials(keyPath = None))
53+
54+
val client =
55+
BatchClient(
56+
DeployConfig.fromConfigFile(deployConfigFile),
57+
credentials,
58+
)
59+
60+
val batchId =
61+
Option(batchId_).map(_.toInt).getOrElse {
62+
client.newBatch(
63+
BatchRequest(
64+
billing_project = billingProject,
65+
token = tokenUrlSafe,
66+
n_jobs = 0,
67+
attributes = Map("name" -> name),
68+
)
69+
)
70+
}
71+
72+
val workerConfig =
73+
BatchJobConfig(
74+
workerCores,
75+
workerMemory,
76+
storage,
77+
cloudfuse,
78+
regions,
79+
)
80+
81+
new ServiceBackend(
82+
name,
83+
client,
84+
GitRevision(HAIL_REVISION),
85+
BatchConfig(batchId, 0),
86+
workerConfig,
87+
)
88+
}
3189
}
3290

3391
case class BatchJobConfig(
@@ -147,10 +205,16 @@ class ServiceBackend(
147205
)
148206

149207
stageCount += 1
150-
151-
Thread.sleep(600) // it is not possible for the batch to be finished in less than 600ms
152-
val response = batchClient.waitForJobGroup(batchConfig.batchId, jobGroupId)
153-
(response, startJobId)
208+
try {
209+
Thread.sleep(600) // it is not possible for the batch to be finished in less than 600ms
210+
val response = batchClient.waitForJobGroup(batchConfig.batchId, jobGroupId)
211+
(response, startJobId)
212+
} catch {
213+
case _: InterruptedException =>
214+
batchClient.cancelJobGroup(batchConfig.batchId, jobGroupId)
215+
Thread.currentThread().interrupt()
216+
throw new CancellationException()
217+
}
154218
}
155219

156220
private[this] def readPartitionResult(fs: FS, root: String, i: Int): Array[Byte] = {

hail/hail/src/is/hail/backend/service/Worker.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ object Worker {
122122
val n = argv(6).toInt
123123
val timer = new WorkerTimer()
124124

125-
val deployConfig = DeployConfig.fromConfigFile("/deploy-config/deploy-config.json")
126-
DeployConfig.set(deployConfig)
127125
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir)
128126

129127
log.info(s"is.hail.backend.service.Worker $myRevision")

hail/hail/src/is/hail/io/fs/AzureStorageFS.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import is.hail.shadedazure.com.azure.storage.blob.models.{
1313
BlobItem, BlobRange, BlobStorageException, ListBlobsOptions,
1414
}
1515
import is.hail.shadedazure.com.azure.storage.blob.specialized.BlockBlobClient
16-
import is.hail.utils.FastSeq
1716

1817
import scala.collection.JavaConverters._
1918
import scala.collection.mutable
@@ -60,8 +59,8 @@ object AzureStorageFS {
6059
private val AZURE_HTTPS_URI_REGEX =
6160
"^https:\\/\\/([a-z0-9_\\-\\.]+)\\.blob\\.core\\.windows\\.net\\/([a-z0-9_\\-\\.]+)(\\/.*)?".r
6261

63-
val RequiredOAuthScopes: IndexedSeq[String] =
64-
FastSeq("https://storage.azure.com/.default")
62+
val RequiredOAuthScopes: Array[String] =
63+
Array("https://storage.azure.com/.default")
6564

6665
def parseUrl(filename: String): AzureStorageFSURL = {
6766
AZURE_HTTPS_URI_REGEX

hail/hail/src/is/hail/io/fs/GoogleStorageFS.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ object GoogleStorageFS {
4545

4646
private[this] val GCS_URI_REGEX = "^gs:\\/\\/([a-z0-9_\\-\\.]+)(\\/.*)?".r
4747

48-
val RequiredOAuthScopes: IndexedSeq[String] =
49-
FastSeq("https://www.googleapis.com/auth/devstorage.read_write")
48+
val RequiredOAuthScopes: Array[String] =
49+
Array("https://www.googleapis.com/auth/devstorage.read_write")
5050

5151
def parseUrl(filename: String): GoogleStorageFSURL = {
5252
val scheme = filename.split(":")(0)

hail/hail/src/is/hail/io/fs/RouterFS.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,23 @@ object RouterFS {
5252
def buildRoutes(cloudConfig: CloudStorageFSConfig, env: Map[String, String] = sys.env): FS =
5353
new RouterFS(
5454
IndexedSeq.concat(
55-
cloudConfig.google.map { case GoogleStorageFSConfig(path, mRPConfig) =>
55+
cloudConfig.google.map { case GoogleStorageFSConfig(path, rpConfig) =>
5656
new GoogleStorageFS(
57-
GoogleCloudCredentials(path, GoogleStorageFS.RequiredOAuthScopes, env),
58-
mRPConfig,
57+
GoogleCloudCredentials(path).scoped(GoogleStorageFS.RequiredOAuthScopes),
58+
rpConfig,
5959
)
6060
},
6161
cloudConfig.azure.map { case AzureStorageFSConfig(path) =>
62-
if (env.contains("HAIL_TERRA")) {
63-
val creds = AzureCloudCredentials(path, TerraAzureStorageFS.RequiredOAuthScopes, env)
64-
new TerraAzureStorageFS(creds)
65-
} else
66-
new AzureStorageFS(AzureCloudCredentials(path, AzureStorageFS.RequiredOAuthScopes, env))
62+
if (env.contains("HAIL_TERRA"))
63+
new TerraAzureStorageFS(
64+
AzureCloudCredentials(path, env)
65+
.scoped(TerraAzureStorageFS.RequiredOAuthScopes)
66+
)
67+
else
68+
new AzureStorageFS(
69+
AzureCloudCredentials(path, env)
70+
.scoped(AzureStorageFS.RequiredOAuthScopes)
71+
)
6772
},
6873
FastSeq(new HadoopFS(new SerializableHadoopConfiguration(new Configuration()))),
6974
)

0 commit comments

Comments
 (0)