Skip to content

Commit 4961d3b

Browse files
author
John Han
committed
Clean up disk_estimator code and style per PR comments.
1 parent 770bdc2 commit 4961d3b

File tree

3 files changed

+41
-31
lines changed

3 files changed

+41
-31
lines changed

gcp_variant_transforms/beam_io/vcf_file_size_io.py

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from __future__ import absolute_import
1818

1919
from typing import Iterable, List, Tuple # pylint: disable=unused-import
20+
import itertools
2021
import logging
2122

2223
import apache_beam as beam
@@ -31,18 +32,21 @@
3132
from gcp_variant_transforms.beam_io import vcfio
3233

3334

34-
def _get_file_size(file_name):
  # type: (str) -> int
  """Returns the raw on-disk size, in bytes, of a single file.

  Args:
    file_name: Path (or pattern) that must resolve to exactly one file via
      `FileSystems.match`.

  Returns:
    The file's size in bytes as reported by the filesystem metadata.

  Raises:
    IOError: If `file_name` does not correspond to exactly one match result.
  """
  match_result = filesystems.FileSystems.match([file_name])[0]
  if len(match_result.metadata_list) != 1:
    raise IOError("File name {} did not correspond to exactly 1 result. "
                  "Instead, got {}.".format(file_name,
                                            len(match_result.metadata_list)))
  file_metadata = match_result.metadata_list[0]

  compression_type = filesystem.CompressionTypes.detect_compression_type(
      file_metadata.path)
  if compression_type != filesystem.CompressionTypes.UNCOMPRESSED:
    # The compressed size understates the uncompressed data volume, so the
    # downstream disk estimate will be low; warn loudly but keep going.
    logging.error("VCF file %s is compressed; disk requirement estimator "
                  "will not be accurate.", file_metadata.path)
  return file_metadata.size_in_bytes
4650

4751

4852
def _convert_variants_to_bytesize(variant):
@@ -64,7 +68,7 @@ def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size):
6468
Given the raw_file_size and measurements of several VCF lines from the file,
6569
estimate how much disk the file will take after expansion due to encoding
6670
lines as `vcfio.Variant` objects. The encoded_sample_size will be set as
67-
`self.encoded`.
71+
`self.encoded_size`.
6872
6973
This is a simple ratio problem, solving for encoded_sample_size which is
7074
the only unknown:
@@ -111,8 +115,11 @@ def extract_output(self, (raw, encoded)):
111115
class _EstimateVcfSizeSource(filebasedsource.FileBasedSource):
112116
"""A source for estimating the encoded size of a VCF file in `vcf_to_bq`.
113117
114-
This source first reads a limited number of variants from a set of VCF files,
115-
then
118+
This source first obtains the raw file sizes of a set of VCF files. Then,
119+
the source reads a limited number of variants from a set of VCF files,
120+
both as raw strings and encoded `Variant` objects. Finally, the reader
121+
returns a single `FileSizeInfo` object with an estimate of the input size
122+
if all sizes had been encoded as `Variant` objects.
116123
117124
Lines that are malformed are skipped.
118125
@@ -142,7 +149,7 @@ def read_records(
142149
file_name, # type: str
143150
range_tracker # type: range_trackers.UnsplittableRangeTracker
144151
):
145-
# type: (...) -> Iterable[Tuple[str, str, vcfio.Variant]]
152+
# type: (...) -> Iterable[FileSizeInfo]
146153
"""This "generator" only emits a single FileSizeInfo object per file."""
147154
vcf_parser_class = vcfio.get_vcf_parser(self._vcf_parser_type)
148155
record_iterator = vcf_parser_class(
@@ -155,31 +162,34 @@ def read_records(
155162
buffer_size=self.DEFAULT_VCF_READ_BUFFER_SIZE,
156163
skip_header_lines=0)
157164

158-
_, raw_file_size = _get_file_sizes(file_name)[0]
165+
_, raw_file_size = _get_file_size(file_name)
159166

160167
# Open distinct channel to read lines as raw bytestrings.
161168
with filesystems.FileSystems.open(file_name,
162-
self._compression_type) as raw_reader:
163-
raw_record = raw_reader.readline()
164-
while raw_record and raw_record.startswith('#'):
165-
# Skip headers, assume header size is negligible.
166-
raw_record = raw_reader.readline()
167-
169+
self._compression_type) as raw_iterator:
168170
count, raw_size, encoded_size = 0, 0, 0
169-
for encoded_record in record_iterator:
171+
for encoded_record, raw_record in itertools.izip(record_iterator,
172+
raw_iterator):
173+
while raw_record and raw_record.startswith('#'):
174+
# Skip headers. Assume that header size is negligible.
175+
raw_record = raw_iterator.next()
176+
logging.debug(
177+
"Reading record for disk usage estimation. Encoded variant: %s\n"
178+
"Raw variant: %s", encoded_record, raw_record)
170179
if count >= self._sample_size:
171180
break
172181
if not isinstance(encoded_record, vcfio.Variant):
173182
logging.error(
174183
"Skipping VCF line that could not be decoded as a "
175184
"`vcfio.Variant` in file %s: %s", file_name, raw_record)
176185
continue
177-
178-
raw_size += len(raw_record)
186+
# Encoding in `utf-8` should represent the string as one byte per char,
187+
# even for non-ASCII chars. Python adds significant overhead to the
188+
# bytesize of the full str object.
189+
raw_size += len(raw_record.encode('utf-8'))
179190
encoded_size += _convert_variants_to_bytesize(encoded_record)
180191
count += 1
181192

182-
raw_record = raw_reader.readline() # Increment raw iterator.
183193
file_size_info = FileSizeInfo(file_name, raw_file_size)
184194
file_size_info.estimate_encoded_file_size(raw_size, encoded_size)
185195
yield file_size_info

gcp_variant_transforms/libs/preprocess_reporter.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
File Path Variant Record Error Message
4646
file 1 rs6 G A 29 PASS NS=3; invalid literal for int() with base 10.
4747
"""
48+
import logging
4849
import math
4950

5051
from typing import Dict, List, Optional, Union # pylint: disable=unused-import
@@ -56,7 +57,6 @@
5657
from gcp_variant_transforms.beam_io import vcf_file_size_io # pylint: disable=unused-import
5758
from gcp_variant_transforms.beam_io.vcf_header_io import VcfParserHeaderKeyConstants
5859
from gcp_variant_transforms.transforms import merge_header_definitions # pylint: disable=unused-import
59-
6060
from gcp_variant_transforms.transforms.merge_header_definitions import Definition # pylint: disable=unused-import
6161

6262

@@ -287,6 +287,8 @@ def _append_disk_usage_estimate_to_report(file_to_write, disk_usage_estimate):
287287
# type: (file, vcf_file_size_io.FileSizeInfo) -> None
288288
if disk_usage_estimate is None:
289289
return
290+
logging.info("Final estimate of encoded size: %d GB",
291+
disk_usage_estimate.encoded_size / 1e9)
290292
file_to_write.write(
291293
'Estimated disk usage by Dataflow: {} GB\n'
292294
'Total raw file sizes: {} GB\n'.format(

gcp_variant_transforms/vcf_to_bq_preprocess.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""Pipeline for preprocessing the VCF files.
15+
r"""Pipeline for preprocessing the VCF files.
1616
1717
This pipeline is aimed to help the user to easily identify and further import
1818
the malformed/incompatible VCF files to BigQuery. It generates two files as the
@@ -68,6 +68,7 @@
6868
_COMMAND_LINE_OPTIONS = [variant_transform_options.PreprocessOptions]
6969

7070
# Number of lines from each VCF that should be read when estimating disk usage.
71+
# TODO(hanjohn): Convert this field to a flag.
7172
_SNIPPET_READ_SIZE = 50
7273

7374
def _get_inferred_headers(variants, # type: pvalue.PCollection
@@ -102,9 +103,6 @@ def _estimate_disk_resources(p, input_pattern):
102103
input_pattern, _SNIPPET_READ_SIZE)
103104
| 'SumFileSizeEstimates' >> beam.CombineGlobally(
104105
vcf_file_size_io.FileSizeInfoSumFn()))
105-
result | ('PrintEstimate' >> # pylint: disable=expression-not-assigned
106-
beam.Map(lambda x: logging.info(
107-
"Final estimate of encoded size: %d GB", x.encoded_size / 1e9)))
108106
return result
109107

110108

0 commit comments

Comments
 (0)