
Commit b46a322

Modify BigQuery write mechanism to use the new WriteToBigQuery PTransform, and remove num_bigquery_write_shards flag.
Previously, issue #199 forced us to use a hack to shard the variants before they were written to BigQuery, which negatively affected the speed of the tool. With the implementation of the new sink, the flag is no longer needed.
1 parent 8602ea7 commit b46a322
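For reference, a minimal sketch of the Beam built-in sink this commit switches to. This is illustrative only and not code from the repository; the table spec, schema, and rows are placeholders:

import apache_beam as beam

# Illustrative only: write a small PCollection of dict rows with the built-in
# WriteToBigQuery PTransform instead of manually sharding the data first.
with beam.Pipeline() as p:
  _ = (p
       | beam.Create([{'reference_name': 'chr1', 'start_position': 100}])
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           'my-project:my_dataset.variants',  # placeholder table
           schema='reference_name:STRING,start_position:INTEGER',
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))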

7 files changed (+12 -64 lines)

Diff for: docs/large_inputs.md

+4 -25

@@ -14,7 +14,6 @@ Default settings:
   --worker_machine_type <default n1-standard-1> \
   --disk_size_gb <default 250> \
   --worker_disk_type <default PD> \
-  --num_bigquery_write_shards <default 1> \
   --partition_config_path <default None> \
 ```
 
@@ -98,8 +97,7 @@ transforms (e.g. the sample name is repeated in every record in the BigQuery
 output rather than just being specified once as in the VCF header), you
 typically need 3 to 4 times the total size of the raw VCF files.
 
-In addition, if [merging](variant_merging.md) or
-[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled, you may
+In addition, if [merging](variant_merging.md) is enabled, you may
 need more disk per worker (e.g. 500GB) as the same variants need to be
 aggregated together on one machine.
 
@@ -110,32 +108,14 @@ more expensive. However, when choosing a large machine (e.g. `n1-standard-16`),
 they can reduce cost as they can avoid idle CPU cycles due to disk IOPS
 limitations.
 
-As a result, we recommend using SSDs if [merging](variant_merge.md) or
-[--num_bigquery_write_shards](#--num_bigquery_write_shards) is enabled: these
-operations require "shuffling" the data (i.e. redistributing the data among
-workers), which require significant disk I/O.
+As a result, we recommend using SSDs if [merging](variant_merge.md) is enabled:
+these operations require "shuffling" the data (i.e. redistributing the data
+among workers), which require significant disk I/O.
 
 Set
 `--worker_disk_type compute.googleapis.com/projects//zones//diskTypes/pd-ssd`
 to use SSDs.
 
-### `--num_bigquery_write_shards`
-
-Currently, the write operation to BigQuery in Dataflow is performed as a
-postprocessing step after the main transforms are done. As a workaround for
-BigQuery write limitations (more details
-[here](https://github.com/googlegenomics/gcp-variant-transforms/issues/199)),
-we have added "sharding" when writing to BigQuery. This makes the data load
-to BigQuery significantly faster as it parallelizes the process and enables
-loading large (>5TB) data to BigQuery at once.
-
-As a result, we recommend setting `--num_bigquery_write_shards 20` when loading
-any data that has more than 1 billion rows (after merging) or 1TB of final
-output. You may use a smaller number of write shards (e.g. 5) when using
-[partitioned output](#--partition_config_path) as each partition also acts as a
-"shard". Note that using a larger value (e.g. 50) can cause BigQuery write to
-fail as there is a maximum limit on the number of concurrent writes per table.
-
 ### `--partition_config_path`
 
 Partitioning the output can save significant query costs once the data is in
@@ -146,4 +126,3 @@ partition).
 As a result, we recommend setting the partition config for very large data
 where possible. Please see the [documentation](partitioning.md) for more
 details.
-
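To make the removed guidance concrete: the "sharding" it described amounted to assigning rows to random partitions and writing each partition to BigQuery separately, roughly as in the simplified sketch below. This is a toy stand-in for the real transform removed later in this commit; `write_one_shard` and `NUM_SHARDS` are placeholders, not repository code.

import random
import apache_beam as beam

NUM_SHARDS = 20  # what --num_bigquery_write_shards used to control

def write_one_shard(rows, index):
  # Placeholder: in the real pipeline each shard was written to BigQuery
  # with its own appending sink.
  return rows | 'WriteShard%d' % index >> beam.Map(lambda row: row)

with beam.Pipeline() as p:
  rows = p | beam.Create([{'start_position': i} for i in range(100)])
  # Assign every row to a random shard, then write the shards independently.
  shards = rows | beam.Partition(lambda _, n: random.randint(0, n - 1),
                                 NUM_SHARDS)
  for i in range(NUM_SHARDS):
    write_one_shard(shards[i], i)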

Diff for: gcp_variant_transforms/options/variant_transform_options.py

+2 -1

@@ -173,7 +173,8 @@ def add_arguments(self, parser):
     parser.add_argument(
         '--num_bigquery_write_shards',
         type=int, default=1,
-        help=('Before writing the final result to output BigQuery, the data is '
+        help=('Note: This flag is now deprecated and should not be used!'
+              'Before writing the final result to output BigQuery, the data is '
              'sharded to avoid a known failure for very large inputs (issue '
              '#199). Setting this flag to 1 will avoid this extra sharding.'
              'It is recommended to use 20 for loading large inputs without '
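The flag itself is kept (only its help text changes), and since vcf_to_bq.py no longer forwards it, any value a caller still passes is ignored. A possible way to surface that at runtime, shown purely as a sketch and not something this commit adds, would be:

import warnings

def warn_if_deprecated_flag_set(known_args):
  # Hypothetical check: warn when the now-ignored flag is set to a
  # non-default value, since the new WriteToBigQuery sink does not use it.
  if known_args.num_bigquery_write_shards != 1:
    warnings.warn('--num_bigquery_write_shards is deprecated and ignored; '
                  'the new BigQuery sink does not need sharding.',
                  DeprecationWarning)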

Diff for: gcp_variant_transforms/testing/integration/vcf_to_bq_tests/huge_tests/test_1000_genomes.json

-1

@@ -9,7 +9,6 @@
     "worker_machine_type": "n1-standard-64",
     "max_num_workers": "64",
     "num_workers": "20",
-    "num_bigquery_write_shards": "20",
     "assertion_configs": [
       {
         "query": ["NUM_ROWS_QUERY"],

Diff for: gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/option_optimize_for_large_inputs.json

-2

@@ -9,7 +9,6 @@
     "worker_machine_type": "n1-standard-16",
     "max_num_workers": "20",
     "num_workers": "20",
-    "num_bigquery_write_shards": "2",
     "assertion_configs": [
       {
         "query": ["NUM_ROWS_QUERY"],
@@ -68,4 +67,3 @@
       ]
     }
   ]
-

Diff for: gcp_variant_transforms/testing/integration/vcf_to_bq_tests/presubmit_tests/large_tests/platinum_no_merge.json

-1

@@ -7,7 +7,6 @@
     "worker_machine_type": "n1-standard-16",
     "max_num_workers": "20",
     "num_workers": "20",
-    "num_bigquery_write_shards": "20",
     "assertion_configs": [
       {
         "query": ["NUM_ROWS_QUERY"],

Diff for: gcp_variant_transforms/transforms/variant_to_bigquery.py

+6 -33

@@ -16,7 +16,6 @@
 
 from __future__ import absolute_import
 
-import random
 from typing import Dict, List  # pylint: disable=unused-import
 
 import apache_beam as beam
@@ -29,7 +28,6 @@
 from gcp_variant_transforms.libs import processed_variant
 from gcp_variant_transforms.libs import vcf_field_conflict_resolver
 from gcp_variant_transforms.libs.variant_merge import variant_merge_strategy  # pylint: disable=unused-import
-from gcp_variant_transforms.transforms import limit_write
 
 
 # TODO(samanvp): remove this hack when BQ custom sink is added to Python SDK,
@@ -75,7 +73,6 @@ def __init__(
       update_schema_on_append=False,  # type: bool
       allow_incompatible_records=False,  # type: bool
       omit_empty_sample_calls=False,  # type: bool
-      num_bigquery_write_shards=1,  # type: int
       null_numeric_value_replacement=None  # type: int
       ):
     # type: (...) -> None
@@ -99,8 +96,6 @@
         + schema if there is a mismatch.
       omit_empty_sample_calls: If true, samples that don't have a given call
         will be omitted.
-      num_bigquery_write_shards: If > 1, we will limit number of sources which
-        are used for writing to the output BigQuery table.
       null_numeric_value_replacement: the value to use instead of null for
         numeric (float/int/long) lists. For instance, [0, None, 1] will become
         [0, `null_numeric_value_replacement`, 1]. If not set, the value will set
@@ -125,7 +120,6 @@ def __init__(
 
     self._allow_incompatible_records = allow_incompatible_records
     self._omit_empty_sample_calls = omit_empty_sample_calls
-    self._num_bigquery_write_shards = num_bigquery_write_shards
     if update_schema_on_append:
       bigquery_util.update_bigquery_schema_on_append(self._schema.fields,
                                                      self._output_table)
@@ -136,35 +130,14 @@ def expand(self, pcoll):
             self._bigquery_row_generator,
             self._allow_incompatible_records,
             self._omit_empty_sample_calls))
-    if self._num_bigquery_write_shards > 1:
-      # We split data into self._num_bigquery_write_shards random partitions
-      # and then write each part to final BQ by appending them together.
-      # Combined with LimitWrite transform, this will avoid the BQ failure.
-      bq_row_partitions = bq_rows | beam.Partition(
-          lambda _, n: random.randint(0, n - 1),
-          self._num_bigquery_write_shards)
-      bq_writes = []
-      for i in range(self._num_bigquery_write_shards):
-        bq_rows = (bq_row_partitions[i] | 'LimitWrite' + str(i) >>
-                   limit_write.LimitWrite(_WRITE_SHARDS_LIMIT))
-        bq_writes.append(
-            bq_rows | 'WriteToBigQuery' + str(i) >>
-            beam.io.Write(beam.io.BigQuerySink(
+    return (bq_rows
+            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                 self._output_table,
                 schema=self._schema,
                 create_disposition=(
                     beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
                 write_disposition=(
-                    beam.io.BigQueryDisposition.WRITE_APPEND))))
-      return bq_writes
-    else:
-      return (bq_rows
-              | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
-                  self._output_table,
-                  schema=self._schema,
-                  create_disposition=(
-                      beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
-                  write_disposition=(
-                      beam.io.BigQueryDisposition.WRITE_APPEND
-                      if self._append
-                      else beam.io.BigQueryDisposition.WRITE_TRUNCATE))))
+                    beam.io.BigQueryDisposition.WRITE_APPEND
+                    if self._append
+                    else beam.io.BigQueryDisposition.WRITE_TRUNCATE),
+                method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))
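The new code pins the write method to streaming inserts. Beam's `WriteToBigQuery` also offers a load-job based method; as a hedged illustration (not part of this commit, and availability of `Method.FILE_LOADS` depends on the Beam version; table and schema are placeholders), switching is a one-argument change:

import apache_beam as beam

# Illustrative only: the same sink configured with BigQuery load jobs
# instead of streaming inserts.
write = beam.io.WriteToBigQuery(
    'my-project:my_dataset.variants',  # placeholder table
    schema='reference_name:STRING,start_position:INTEGER',
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    method=beam.io.WriteToBigQuery.Method.FILE_LOADS)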

Diff for: gcp_variant_transforms/vcf_to_bq.py

-1

@@ -475,7 +475,6 @@ def run(argv=None):
           update_schema_on_append=known_args.update_schema_on_append,
           allow_incompatible_records=known_args.allow_incompatible_records,
           omit_empty_sample_calls=known_args.omit_empty_sample_calls,
-          num_bigquery_write_shards=known_args.num_bigquery_write_shards,
           null_numeric_value_replacement=(
               known_args.null_numeric_value_replacement)))
 
