11package is .hail .backend
22
33import is .hail .asm4s ._
4+ import is .hail .backend .Backend .jsonToBytes
45import is .hail .backend .spark .SparkBackend
56import is .hail .expr .ir .{
67 BaseIR , CodeCacheKey , CompiledFunction , IR , IRParser , IRParserEnvironment , LoweringAnalyses ,
78 SortField , TableIR , TableReader ,
89}
9- import is .hail .expr .ir .functions .IRFunctionRegistry
1010import is .hail .expr .ir .lowering .{TableStage , TableStageDependency }
1111import is .hail .io .{BufferSpec , TypedCodecSpec }
1212import is .hail .io .fs ._
@@ -20,16 +20,14 @@ import is.hail.types.virtual.{BlockMatrixType, TFloat64}
2020import is .hail .utils ._
2121import is .hail .variant .ReferenceGenome
2222
23- import scala .collection .JavaConverters ._
2423import scala .collection .mutable
2524import scala .reflect .ClassTag
2625
2726import java .io ._
2827import java .nio .charset .StandardCharsets
2928
30- import com .fasterxml .jackson .core .StreamReadConstraints
3129import org .json4s ._
32- import org .json4s .jackson .{ JsonMethods , Serialization }
30+ import org .json4s .jackson .JsonMethods
3331import sourcecode .Enclosing
3432
3533object Backend {
@@ -41,13 +39,6 @@ object Backend {
4139 s " hail_query_ $id"
4240 }
4341
44- private var irID : Int = 0
45-
46- def nextIRID (): Int = {
47- irID += 1
48- irID
49- }
50-
5142 def encodeToOutputStream (
5243 ctx : ExecuteContext ,
5344 t : PTuple ,
@@ -66,6 +57,9 @@ object Backend {
6657 assert(t.isFieldDefined(off, 0 ))
6758 codec.encode(ctx, elementType, t.loadField(off, 0 ), os)
6859 }
60+
/** Renders a JSON value compactly and returns its UTF-8 encoded bytes.
  *
  * @param f
  *   by-name JSON value; it is evaluated when it is handed to `JsonMethods.compact`
  * @return
  *   the compact JSON text encoded as UTF-8
  */
def jsonToBytes(f: => JValue): Array[Byte] = {
  val rendered: String = JsonMethods.compact(f)
  rendered.getBytes(StandardCharsets.UTF_8)
}
6963}
7064
/** Abstract holder that exposes a single value of type `T`.
  *
  * NOTE(review): the name suggests a value broadcast to workers; this declaration only
  * guarantees that `value` is readable -- confirm semantics against the implementations.
  *
  * @tparam T
  *   type of the wrapped value
  */
abstract class BroadcastValue[T] {

  /** The wrapped value. */
  def value: T
}
@@ -75,28 +69,8 @@ trait BackendContext {
7569}
7670
7771abstract class Backend extends Closeable {
78- // From https://github.com/hail-is/hail/issues/14580 :
79- // IR can get quite big, especially as it can contain an arbitrary
80- // amount of encoded literals from the user's python session. This
81- // was a (controversial) restriction imposed by Jackson and should be lifted.
82- //
83- // We remove this restriction for all backends, and we do so here, in the
84- // constructor since constructing a backend is one of the first things that
85- // happens and this constraint should be overrided as early as possible.
86- StreamReadConstraints .overrideDefaultStreamReadConstraints(
87- StreamReadConstraints .builder().maxStringLength(Integer .MAX_VALUE ).build()
88- )
89-
/** Mutable registry of IR nodes keyed by `Int`; starts out empty.
  *
  * NOTE(review): keys are presumably IR ids handed out elsewhere -- confirm against callers.
  */
val persistedIR: mutable.Map[Int, BaseIR] = mutable.Map.empty[Int, BaseIR]
9173
92- protected [this ] def addJavaIR (ir : BaseIR ): Int = {
93- val id = Backend .nextIRID()
94- persistedIR += (id -> ir)
95- id
96- }
97-
98- def removeJavaIR (id : Int ): Unit = persistedIR.remove(id)
99-
/** Abstract: each concrete backend supplies this `Int`.
  *
  * NOTE(review): presumably the default number of parallel tasks/partitions -- confirm
  * against the implementing backends.
  */
def defaultParallelism: Int
10175
/** Flag that is `true` by default; the member is not final, so a subclass may override it.
  *
  * NOTE(review): by its name, it reports whether parallel tasks may run on the driver --
  * confirm against the call sites.
  */
def canExecuteParallelTasksOnDriver: Boolean = true
@@ -131,30 +105,7 @@ abstract class Backend extends Closeable {
/** Abstract: returns the compiled function associated with key `k`.
  *
  * NOTE(review): the name and the by-name `f` suggest that `f` is evaluated only when no
  * cached entry exists for `k` -- confirm against the implementations.
  *
  * @param k
  *   cache key identifying the compiled code
  * @param f
  *   by-name producer of the compiled function
  */
def lookupOrCompileCachedFunction[T](
  k: CodeCacheKey
)(
  f: => CompiledFunction[T]
): CompiledFunction[T]
133107
134- var references : Map [String , ReferenceGenome ] = Map .empty
135-
136- def addDefaultReferences (): Unit =
137- references = ReferenceGenome .builtinReferences()
138-
139- def addReference (rg : ReferenceGenome ): Unit = {
140- references.get(rg.name) match {
141- case Some (rg2) =>
142- if (rg != rg2) {
143- fatal(
144- s " Cannot add reference genome ' ${rg.name}', a different reference with that name already exists. Choose a reference name NOT in the following list: \n " +
145- s " @1 " ,
146- references.keys.truncatable(" \n " ),
147- )
148- }
149- case None =>
150- references += (rg.name -> rg)
151- }
152- }
153-
154- def hasReference (name : String ) = references.contains(name)
155-
156- def removeReference (name : String ): Unit =
157- references -= name
/** Abstract: the reference genomes available to this backend, as a mutable map from
  * `String` to [[ReferenceGenome]].
  *
  * NOTE(review): keys are presumably reference-genome names -- confirm against the
  * implementations.
  */
def references: mutable.Map[String, ReferenceGenome]
158109
159110 def lowerDistributedSort (
160111 ctx : ExecuteContext ,
@@ -189,9 +140,6 @@ abstract class Backend extends Closeable {
189140
/** Abstract: applies `f` to an [[ExecuteContext]] and returns its result.
  *
  * NOTE(review): how the context is created and released is decided by the implementing
  * backend and is not visible here.
  *
  * @param f
  *   computation that needs an execute context
  * @param E
  *   implicit `sourcecode.Enclosing` describing the call site
  */
def withExecuteContext[T](f: ExecuteContext => T)(implicit E: Enclosing): T
191142
192- private [this ] def jsonToBytes (f : => JValue ): Array [Byte ] =
193- JsonMethods .compact(f).getBytes(StandardCharsets .UTF_8 )
194-
195143 final def valueType (s : String ): Array [Byte ] =
196144 jsonToBytes {
197145 withExecuteContext { ctx =>
@@ -220,15 +168,7 @@ abstract class Backend extends Closeable {
220168 }
221169 }
222170
223- def loadReferencesFromDataset (path : String ): Array [Byte ] = {
224- withExecuteContext { ctx =>
225- val rgs = ReferenceGenome .fromHailDataset(ctx.fs, path)
226- rgs.foreach(addReference)
227-
228- implicit val formats : Formats = defaultJSONFormats
229- Serialization .write(rgs.map(_.toJSON).toFastSeq).getBytes(StandardCharsets .UTF_8 )
230- }
231- }
/** Abstract: takes a dataset path and returns a byte array.
  *
  * NOTE(review): presumably loads the reference genomes stored with the dataset at `path`
  * and returns them serialized (JSON as UTF-8?) -- confirm against the implementations.
  *
  * @param path
  *   location of the dataset
  */
def loadReferencesFromDataset(path: String): Array[Byte]
232172
233173 def fromFASTAFile (
234174 name : String ,
@@ -240,18 +180,22 @@ abstract class Backend extends Closeable {
240180 parInput : Array [String ],
241181 ): Array [Byte ] =
242182 withExecuteContext { ctx =>
243- val rg = ReferenceGenome .fromFASTAFile(ctx, name, fastaFile, indexFile,
244- xContigs, yContigs, mtContigs, parInput)
245- rg.toJSONString.getBytes(StandardCharsets .UTF_8 )
183+ jsonToBytes {
184+ Extraction .decompose {
185+ ReferenceGenome .fromFASTAFile(ctx, name, fastaFile, indexFile,
186+ xContigs, yContigs, mtContigs, parInput).toJSON
187+ }(defaultJSONFormats)
188+ }
246189 }
247190
/** Parses the header metadata of the VCF at `path` and returns it as compact JSON bytes.
  *
  * The header is read through the execute context's file system, decomposed to JSON with
  * `defaultJSONFormats`, and encoded by `jsonToBytes`, all inside `withExecuteContext`.
  *
  * @param path
  *   location of the VCF file
  * @return
  *   UTF-8 bytes of the compact JSON rendering of the header metadata
  */
def parseVCFMetadata(path: String): Array[Byte] =
  withExecuteContext { ctx =>
    // Picked up implicitly by `Extraction.decompose` below.
    implicit val formats: Formats = defaultJSONFormats
    jsonToBytes {
      val headerMetadata = LoadVCF.parseHeaderMetadata(ctx.fs, Set.empty, TFloat64, path)
      Extraction.decompose(headerMetadata)
    }
  }
255199
256200 def importFam (path : String , isQuantPheno : Boolean , delimiter : String , missingValue : String )
257201 : Array [Byte ] =
@@ -261,27 +205,6 @@ abstract class Backend extends Closeable {
261205 )
262206 }
263207
264- def pyRegisterIR (
265- name : String ,
266- typeParamStrs : java.util.ArrayList [String ],
267- argNameStrs : java.util.ArrayList [String ],
268- argTypeStrs : java.util.ArrayList [String ],
269- returnType : String ,
270- bodyStr : String ,
271- ): Unit = {
272- withExecuteContext { ctx =>
273- IRFunctionRegistry .registerIR(
274- ctx,
275- name,
276- typeParamStrs.asScala.toArray,
277- argNameStrs.asScala.toArray,
278- argTypeStrs.asScala.toArray,
279- returnType,
280- bodyStr,
281- )
282- }
283- }
284-
/** Abstract: executes `ir` within `ctx`.
  *
  * @param ctx
  *   execute context to run in
  * @param ir
  *   the IR to execute
  * @return
  *   `Left(())` or `Right((tupleType, offset))`. NOTE(review): presumably `Left` means the
  *   IR produced no value and the `Long` is an address of the result for the given
  *   `PTuple` -- confirm against the implementations.
  */
def execute(ctx: ExecuteContext, ir: IR): Either[Unit, (PTuple, Long)]
286209}
287210
0 commit comments