Skip to content

Commit 18a198a

Browse files
committed
[query] run ServiceBackend in Py4jQueryDriver
1 parent a492cc5 commit 18a198a

28 files changed

+771
-196
lines changed

batch/batch/front_end/front_end.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1867,11 +1867,13 @@ async def create_update(request, userdata):
18671867
update_id, start_job_group_id, start_job_id = await _create_batch_update(
18681868
batch_id, update_spec['token'], n_jobs, n_job_groups, user, db
18691869
)
1870-
return json_response({
1870+
response = {
18711871
'update_id': update_id,
18721872
'start_job_group_id': start_job_group_id,
18731873
'start_job_id': start_job_id,
1874-
})
1874+
}
1875+
log.info(response)
1876+
return json_response(response)
18751877

18761878

18771879
async def _create_batch_update(

build.yaml

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2560,6 +2560,158 @@ steps:
25602560
- dev
25612561
clouds:
25622562
- azure
2563+
- kind: runImage
2564+
name: test_hail_python_service_backend_gcp
2565+
numSplits: 16
2566+
image:
2567+
valueFrom: hail_run_image.image
2568+
resources:
2569+
cpu: '0.25'
2570+
preemptible: False
2571+
script: |
2572+
set -ex
2573+
python3 -m pip install --no-dependencies /io/wheel/hail-*-py3-none-any.whl
2574+
2575+
cd /io/repo/hail/python
2576+
2577+
export HAIL_CLOUD={{ global.cloud }}
2578+
export HAIL_DEFAULT_NAMESPACE={{ default_ns.name }}
2579+
export HAIL_TEST_STORAGE_URI={{ global.test_storage_uri }}/{{ token }}
2580+
export HAIL_TEST_RESOURCES_DIR="{{ global.test_storage_uri }}/{{ upload_test_resources_to_blob_storage.token }}/test/resources/"
2581+
export HAIL_DOCTEST_DATA_DIR="{{ global.test_storage_uri }}/{{ upload_test_resources_to_blob_storage.token }}/doctest/data/"
2582+
export HAIL_GENETICS_VEP_GRCH37_85_IMAGE={{ hailgenetics_vep_grch37_85_image.image }}
2583+
export HAIL_GENETICS_VEP_GRCH38_95_IMAGE={{ hailgenetics_vep_grch38_95_image.image }}
2584+
export GOOGLE_APPLICATION_CREDENTIALS=/test-gsa-key/key.json
2585+
export AZURE_APPLICATION_CREDENTIALS=/test-gsa-key/key.json
2586+
2587+
export GCS_REQUESTER_PAYS_PROJECT=broad-ctsa
2588+
2589+
export HAIL_SHUFFLE_MAX_BRANCH=4
2590+
export HAIL_SHUFFLE_CUTOFF=1000000
2591+
export HAIL_QUERY_BACKEND=batch
2592+
export HAIL_QUERY_USE_LOCAL_DRIVER=1
2593+
export HAIL_BATCH_REGIONS={{ global.gcp_region }}
2594+
export HAIL_BATCH_BILLING_PROJECT=test
2595+
export HAIL_BATCH_REMOTE_TMPDIR={{ global.test_storage_uri }}
2596+
2597+
python3 -m pytest \
2598+
-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \
2599+
--log-cli-level=INFO \
2600+
-s \
2601+
-r A \
2602+
-vv \
2603+
--instafail \
2604+
--durations=50 \
2605+
--ignore=test/hailtop/ \
2606+
--ignore=test/hail/matrixtable/test_file_formats.py \
2607+
-m backend \
2608+
--timeout=600 \
2609+
test
2610+
timeout: 5400
2611+
inputs:
2612+
- from: /derived/release/hail/build/deploy/dist
2613+
to: /io/wheel
2614+
- from: /repo/hail/python/pytest.ini
2615+
to: /io/repo/hail/python/pytest.ini
2616+
- from: /repo/hail/python/test
2617+
to: /io/repo/hail/python/test
2618+
secrets:
2619+
- name: test-gsa-key
2620+
namespace:
2621+
valueFrom: default_ns.name
2622+
mountPath: /test-gsa-key
2623+
dependsOn:
2624+
- default_ns
2625+
- merge_code
2626+
- deploy_batch
2627+
- create_deploy_config
2628+
- create_accounts
2629+
- hail_run_image
2630+
- upload_query_jar
2631+
- upload_test_resources_to_blob_storage
2632+
- build_hail_jar_and_wheel
2633+
- hailgenetics_vep_grch37_85_image
2634+
- hailgenetics_vep_grch38_95_image
2635+
clouds:
2636+
- gcp
2637+
- kind: runImage
2638+
name: test_hail_python_service_backend_azure
2639+
numSplits: 16
2640+
image:
2641+
valueFrom: hail_run_image.image
2642+
resources:
2643+
cpu: '0.25'
2644+
preemptible: False
2645+
script: |
2646+
set -ex
2647+
python3 -m pip install --no-dependencies /io/wheel/hail-*-py3-none-any.whl
2648+
2649+
cd /io/repo/hail/python
2650+
2651+
export HAIL_CLOUD={{ global.cloud }}
2652+
export HAIL_DEFAULT_NAMESPACE={{ default_ns.name }}
2653+
export HAIL_TEST_STORAGE_URI={{ global.test_storage_uri }}/{{ token }}
2654+
export HAIL_TEST_RESOURCES_DIR="{{ global.test_storage_uri }}/{{ upload_test_resources_to_blob_storage.token }}/test/resources/"
2655+
export HAIL_DOCTEST_DATA_DIR="{{ global.test_storage_uri }}/{{ upload_test_resources_to_blob_storage.token }}/doctest/data/"
2656+
export HAIL_GENETICS_VEP_GRCH37_85_IMAGE={{ hailgenetics_vep_grch37_85_image.image }}
2657+
export HAIL_GENETICS_VEP_GRCH38_95_IMAGE={{ hailgenetics_vep_grch38_95_image.image }}
2658+
export GOOGLE_APPLICATION_CREDENTIALS=/test-gsa-key/key.json
2659+
export AZURE_APPLICATION_CREDENTIALS=/test-gsa-key/key.json
2660+
2661+
export HAIL_AZURE_SUBSCRIPTION_ID={{ global.azure_subscription_id }}
2662+
export HAIL_AZURE_RESOURCE_GROUP={{ global.azure_resource_group }}
2663+
2664+
export HAIL_SHUFFLE_MAX_BRANCH=4
2665+
export HAIL_SHUFFLE_CUTOFF=1000000
2666+
export HAIL_QUERY_BACKEND=batch
2667+
export HAIL_QUERY_USE_LOCAL_DRIVER=1
2668+
export HAIL_BATCH_REGIONS={{ global.azure_location }}
2669+
export HAIL_BATCH_BILLING_PROJECT=test
2670+
export HAIL_BATCH_REMOTE_TMPDIR={{ global.test_storage_uri }}
2671+
2672+
python3 -m pytest \
2673+
-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \
2674+
--log-cli-level=INFO \
2675+
-s \
2676+
-r A \
2677+
-vv \
2678+
--instafail \
2679+
--durations=50 \
2680+
--ignore=test/hailtop/ \
2681+
--ignore=test/hail/matrixtable/test_file_formats.py \
2682+
-m backend \
2683+
--timeout=600 \
2684+
test
2685+
timeout: 5400
2686+
inputs:
2687+
- from: /derived/release/hail/build/deploy/dist
2688+
to: /io/wheel
2689+
- from: /repo/hail/python/pytest.ini
2690+
to: /io/repo/hail/python/pytest.ini
2691+
- from: /repo/hail/python/test
2692+
to: /io/repo/hail/python/test
2693+
secrets:
2694+
- name: test-gsa-key
2695+
namespace:
2696+
valueFrom: default_ns.name
2697+
mountPath: /test-gsa-key
2698+
dependsOn:
2699+
- default_ns
2700+
- merge_code
2701+
- deploy_batch
2702+
- create_deploy_config
2703+
- create_accounts
2704+
- hail_run_image
2705+
- upload_query_jar
2706+
- upload_test_resources_to_blob_storage
2707+
- build_hail_jar_and_wheel
2708+
- hailgenetics_vep_grch37_85_image
2709+
- hailgenetics_vep_grch38_95_image
2710+
scopes:
2711+
- deploy
2712+
- dev
2713+
clouds:
2714+
- azure
25632715
- kind: runImage
25642716
name: test_hail_spark_conf_requester_pays_parsing
25652717
image:

hail/hail/src/is/hail/backend/driver/BackendRpc.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,15 @@ import is.hail.io.plink.LoadPlink
99
import is.hail.io.vcf.LoadVCF
1010
import is.hail.types.virtual.{Kind, TFloat64}
1111
import is.hail.types.virtual.Kinds._
12-
import is.hail.utils.{using, BoxedArrayBuilder, ExecutionTimer, FastSeq}
12+
import is.hail.utils.{jsonToBytes, using, BoxedArrayBuilder, ExecutionTimer, FastSeq}
1313
import is.hail.utils.ExecutionTimer.Timings
1414
import is.hail.variant.ReferenceGenome
1515

1616
import scala.util.control.NonFatal
1717

1818
import java.io.ByteArrayOutputStream
19-
import java.nio.charset.StandardCharsets
2019

2120
import org.json4s.{DefaultFormats, Extraction, Formats, JArray, JValue}
22-
import org.json4s.jackson.JsonMethods
2321

2422
case class SerializedIRFunction(
2523
name: String,
@@ -146,9 +144,6 @@ trait BackendRpc {
146144
case NonFatal(error) => R.failure(env, error)
147145
}
148146

149-
def jsonToBytes(v: JValue): Array[Byte] =
150-
JsonMethods.compact(v).getBytes(StandardCharsets.UTF_8)
151-
152147
private[this] def withRegisterSerializedFns[A](
153148
ctx: ExecuteContext,
154149
serializedFns: Array[SerializedIRFunction],

hail/hail/src/is/hail/backend/driver/BatchQueryDriver.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import is.hail.io.fs.{CloudStorageFSConfig, FS, RouterFS}
1010
import is.hail.io.reference.{IndexedFastaSequenceFile, LiftOver}
1111
import is.hail.macros.void
1212
import is.hail.services._
13+
import is.hail.services.oauth2.CloudCredentials
1314
import is.hail.types.virtual.Kinds._
1415
import is.hail.utils._
1516
import is.hail.utils.ExecutionTimer.Timings
@@ -136,8 +137,6 @@ object BatchQueryDriver extends HttpLikeRpc with Logging {
136137
val inputURL = argv(5)
137138
val outputURL = argv(6)
138139

139-
val deployConfig = DeployConfig.fromConfigFile("/deploy-config/deploy-config.json")
140-
DeployConfig.set(deployConfig)
141140
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir)
142141

143142
val (rpcConfig, jobConfig, action, payload) = {
@@ -188,7 +187,10 @@ object BatchQueryDriver extends HttpLikeRpc with Logging {
188187
val backend =
189188
new ServiceBackend(
190189
name,
191-
BatchClient(deployConfig, Path.of(scratchDir, "secrets/gsa-key/key.json")),
190+
BatchClient(
191+
DeployConfig.fromConfigFile("/deploy-config/deploy-config.json"),
192+
CloudCredentials(Some(Path.of(scratchDir, "secrets/gsa-key/key.json"))),
193+
),
192194
JarUrl(jarLocation),
193195
BatchConfig.fromConfigFile(Path.of(scratchDir, "batch-config/batch-config.json")),
194196
jobConfig,

hail/hail/src/is/hail/backend/service/ServiceBackend.scala

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package is.hail.backend.service
22

3-
import is.hail.HailFeatureFlags
3+
import is.hail.{HAIL_REVISION, HailFeatureFlags}
44
import is.hail.asm4s._
55
import is.hail.backend._
66
import is.hail.backend.service.ServiceBackend.MaxAvailableGcsConnections
@@ -13,6 +13,7 @@ import is.hail.expr.ir.lowering._
1313
import is.hail.io.fs._
1414
import is.hail.services._
1515
import is.hail.services.JobGroupStates.{Cancelled, Failure, Success}
16+
import is.hail.services.oauth2.{CloudCredentials, HailCredentials}
1617
import is.hail.types._
1718
import is.hail.types.physical._
1819
import is.hail.utils._
@@ -23,8 +24,65 @@ import java.io._
2324
import java.nio.charset.StandardCharsets
2425
import java.util.concurrent._
2526

27+
import com.fasterxml.jackson.core.StreamReadConstraints
28+
2629
object ServiceBackend {
2730
val MaxAvailableGcsConnections = 1000
31+
32+
// See https://github.com/hail-is/hail/issues/14580
33+
StreamReadConstraints.overrideDefaultStreamReadConstraints(
34+
StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build()
35+
)
36+
37+
def pyServiceBackend(
38+
name: String,
39+
batchId_ : Integer,
40+
billingProject: String,
41+
deployConfigFile: String,
42+
workerCores: String,
43+
workerMemory: String,
44+
storage: String,
45+
cloudfuse: Array[CloudfuseConfig],
46+
regions: Array[String],
47+
): ServiceBackend = {
48+
val credentials: CloudCredentials =
49+
HailCredentials().getOrElse(CloudCredentials(keyPath = None))
50+
51+
val client =
52+
BatchClient(
53+
DeployConfig.fromConfigFile(deployConfigFile),
54+
credentials,
55+
)
56+
57+
val batchId =
58+
Option(batchId_).map(_.toInt).getOrElse {
59+
client.newBatch(
60+
BatchRequest(
61+
billing_project = billingProject,
62+
token = tokenUrlSafe,
63+
n_jobs = 0,
64+
attributes = Map("name" -> name),
65+
)
66+
)
67+
}
68+
69+
val workerConfig =
70+
BatchJobConfig(
71+
workerCores,
72+
workerMemory,
73+
storage,
74+
cloudfuse,
75+
regions,
76+
)
77+
78+
new ServiceBackend(
79+
name,
80+
client,
81+
GitRevision(HAIL_REVISION),
82+
BatchConfig(batchId, 0),
83+
workerConfig,
84+
)
85+
}
2886
}
2987

3088
case class BatchJobConfig(
@@ -144,10 +202,16 @@ class ServiceBackend(
144202
)
145203

146204
stageCount += 1
147-
148-
Thread.sleep(600) // it is not possible for the batch to be finished in less than 600ms
149-
val response = batchClient.waitForJobGroup(batchConfig.batchId, jobGroupId)
150-
(response, startJobId)
205+
try {
206+
Thread.sleep(600) // it is not possible for the batch to be finished in less than 600ms
207+
val response = batchClient.waitForJobGroup(batchConfig.batchId, jobGroupId)
208+
(response, startJobId)
209+
} catch {
210+
case _: InterruptedException =>
211+
batchClient.cancelJobGroup(batchConfig.batchId, jobGroupId)
212+
Thread.currentThread().interrupt()
213+
throw new CancellationException()
214+
}
151215
}
152216

153217
private[this] def readPartitionResult(fs: FS, root: String, i: Int): Array[Byte] = {

hail/hail/src/is/hail/backend/service/Worker.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ object Worker {
122122
val n = argv(6).toInt
123123
val timer = new WorkerTimer()
124124

125-
val deployConfig = DeployConfig.fromConfigFile("/deploy-config/deploy-config.json")
126-
DeployConfig.set(deployConfig)
127125
sys.env.get("HAIL_SSL_CONFIG_DIR").foreach(tls.setSSLConfigFromDir)
128126

129127
log.info(s"is.hail.backend.service.Worker $myRevision")

hail/hail/src/is/hail/io/fs/AzureStorageFS.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import is.hail.shadedazure.com.azure.storage.blob.models.{
1313
BlobItem, BlobRange, BlobStorageException, ListBlobsOptions,
1414
}
1515
import is.hail.shadedazure.com.azure.storage.blob.specialized.BlockBlobClient
16-
import is.hail.utils.FastSeq
1716

1817
import scala.collection.JavaConverters._
1918
import scala.collection.mutable
@@ -60,8 +59,8 @@ object AzureStorageFS {
6059
private val AZURE_HTTPS_URI_REGEX =
6160
"^https:\\/\\/([a-z0-9_\\-\\.]+)\\.blob\\.core\\.windows\\.net\\/([a-z0-9_\\-\\.]+)(\\/.*)?".r
6261

63-
val RequiredOAuthScopes: IndexedSeq[String] =
64-
FastSeq("https://storage.azure.com/.default")
62+
val RequiredOAuthScopes: Array[String] =
63+
Array("https://storage.azure.com/.default")
6564

6665
def parseUrl(filename: String): AzureStorageFSURL = {
6766
AZURE_HTTPS_URI_REGEX

hail/hail/src/is/hail/io/fs/GoogleStorageFS.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ object GoogleStorageFS {
4545

4646
private[this] val GCS_URI_REGEX = "^gs:\\/\\/([a-z0-9_\\-\\.]+)(\\/.*)?".r
4747

48-
val RequiredOAuthScopes: IndexedSeq[String] =
49-
FastSeq("https://www.googleapis.com/auth/devstorage.read_write")
48+
val RequiredOAuthScopes: Array[String] =
49+
Array("https://www.googleapis.com/auth/devstorage.read_write")
5050

5151
def parseUrl(filename: String): GoogleStorageFSURL = {
5252
val scheme = filename.split(":")(0)

0 commit comments

Comments
 (0)