Skip to content

Commit 49bdfce

Browse files
authored
Data annotator and pvmap generator (datacommonsorg#1264)
* Data annotator and pvmap generator * Data annotator and pvmap generator
1 parent 761f0fa commit 49bdfce

12 files changed

Lines changed: 532214 additions & 0 deletions

tools/statvar_importer/schema/data_annotator.py

Lines changed: 659 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
# Copyright 2024 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the 'License');
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an 'AS IS' BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Utilities to sample csv files.
15+
16+
To sample a CSV data file, run the command:
17+
python data_sampler.py --sampler_input=<input-csv> --sampler_output=<output-csv>
18+
19+
This generates a sample output CSV with at most 100 rows, selecting input rows
20+
with unique column values.
21+
22+
Use the function: sample_csv_file(<input_file>, <output_file>)
23+
to generate sample CSV in code.
24+
"""
25+
26+
import csv
27+
import os
28+
import random
29+
import re
30+
import sys
31+
import tempfile
32+
33+
from absl import app
34+
from absl import flags
35+
from absl import logging
36+
37+
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
38+
sys.path.append(_SCRIPT_DIR)
39+
sys.path.append(os.path.dirname(_SCRIPT_DIR))
40+
sys.path.append(os.path.dirname(os.path.dirname(_SCRIPT_DIR)))
41+
sys.path.append(
42+
os.path.join(os.path.dirname(os.path.dirname(_SCRIPT_DIR)), 'util'))
43+
44+
# Command-line flags controlling the sampler. Help texts below fix three
# defects in the originals: 'sampler_input' had a grammar error ("to be
# sample"), 'sampler_rate' had a copy-pasted description from
# 'sampler_rows_per_key', and 'sampler_input_encoding' was described as a
# "delimiter".
flags.DEFINE_string('sampler_input', '', 'CSV file to be sampled.')
flags.DEFINE_string('sampler_output', '', 'Output file for CSV.')
flags.DEFINE_integer('sampler_output_rows', 100,
                     'Maximum number of output rows.')
flags.DEFINE_integer('sampler_header_rows', 1,
                     'Number of header rows to be copied to output.')
flags.DEFINE_integer('sampler_rows_per_key', 5,
                     'Number of rows per unique value.')
flags.DEFINE_float(
    'sampler_rate', -1,
    'Sampling rate between 0 and 1 applied to rows without new unique '
    'column values; -1 derives the rate from the estimated input size.')
flags.DEFINE_string('sampler_column_regex', r'^[0-9]{4}$|[a-zA-Z-]',
                    'Regex to select unique column values.')
flags.DEFINE_string('sampler_unique_columns', '',
                    'List of columns to look for unique values.')
flags.DEFINE_string('sampler_input_delimiter', ',', 'delimiter for input data')
flags.DEFINE_string('sampler_input_encoding', 'UTF8',
                    'encoding for input data')
flags.DEFINE_string('sampler_output_delimiter', None,
                    'delimiter for output data')

_FLAGS = flags.FLAGS
64+
65+
import file_util
66+
import process_http_server
67+
68+
from config_map import ConfigMap
69+
from counters import Counters
70+
from config_map import ConfigMap
71+
72+
73+
# Class to sample a data file.
class DataSampler:
    """Selects a sample of rows from CSV files, preferring rows that contain
    previously-unseen column values.

    Configuration comes from a ConfigMap seeded with get_default_config()
    (flag values) and optionally overridden by the caller's config_dict.
    Selection statistics are recorded in a Counters object.
    """

    def __init__(
        self,
        config_dict: dict = None,
        counters: dict = None,
    ):
        """Initializes sampler state.

        Args:
          config_dict: optional dict of config overrides applied on top of
            the flag-derived defaults from get_default_config().
          counters: optional Counters object; a new Counters is created when
            None is passed.
        """
        self._config = ConfigMap(config_dict=get_default_config())
        if config_dict:
            self._config.add_configs(config_dict)
        self._counters = counters
        if counters is None:
            self._counters = Counters()
        self.reset()

    def reset(self):
        """Reset state for unique column values."""
        # Dictionary of unique values: count per column
        self._column_counts = {}
        # Dictionary of column index: list of header strings
        self._column_headers = {}
        # Compiled regex used to decide which cell values are tracked as
        # "unique"; None disables the filter.
        self._column_regex = None
        regex = self._config.get('sampler_column_regex')
        if regex:
            self._column_regex = re.compile(regex)
        # Number of rows selected into the sample so far.
        self._selected_rows = 0

    def __del__(self):
        # Log accumulated per-column state at verbosity level 2 when the
        # sampler is destroyed (debugging aid only; no cleanup performed).
        logging.log(2, f'Sampler column headers: {self._column_headers}')
        logging.log(2, f'sampler column counts: {self._column_counts}')

    def _get_column_count(self, column_index: int, value: str) -> int:
        """Returns the existing number of rows for column value.
        Count is only returned for values matching sampler_column_regex.

        Args:
          column_index: index of the column value in the current row.
          value: string value of the column in column_index

        Returns:
          number of times this value has been seen before for the column.
          sys.maxsize if column value doesn't match sampler_column_regex
        """
        # Check if column value is to be tracked.
        if self._column_regex:
            if not self._column_regex.search(value):
                # Not an interesting value.
                return sys.maxsize

        col_values = self._column_counts.get(column_index)
        if col_values is None:
            # Column has not been seen yet, so the value is new.
            return 0
        return col_values.get(value, 0)

    def _add_column_header(self, column_index: int, value: str):
        """Adds the first value for column as header.

        Returns the header string for the column: the given value when it is
        the first non-empty value seen, else the previously recorded header.
        """
        cur_header = self._column_headers.get(column_index, '')
        if not cur_header and value:
            # This is the first value for the column. Set as header.
            self._column_headers[column_index] = value
            return value
        return cur_header

    def _add_row_counts(self, row: list):
        """Update column counts for a selected row."""
        # Update counts for each column value in the row.
        for index in range(len(row)):
            value = row[index]
            col_counts = self._column_counts.get(index)
            if col_counts is None:
                # Add a new column
                col_counts = {}
                self._column_counts[index] = col_counts
            # Add count for column value
            if value not in col_counts:
                # First occurrence of this value: record the column header in
                # the counter name for readability.
                header = self._add_column_header(index, value)
                self._counters.add_counter(
                    f'sampler-unique-values-column-{index}-{header}', 1)
            count = col_counts.get(value, 0)
            col_counts[value] = count + 1
        self._selected_rows += 1
        return

    def select_row(self, row: list, sample_rate: float = -1) -> bool:
        """Returns True if row can be added to the sample output.

        A row is selected when any column contains a value seen fewer than
        'sampler_rows_per_key' times for a column with fewer than
        'sampler_uniques_per_column' distinct values; otherwise it is
        selected at random with probability sample_rate.

        Args:
          row: list of cell values for the current input row.
          sample_rate: random sampling probability; when negative, the
            configured 'sampler_rate' is used instead.
        """
        max_rows = self._config.get('sampler_output_rows')
        if max_rows > 0 and self._selected_rows >= max_rows:
            # Too many rows already selected. Drop it.
            return False
        max_count = self._config.get('sampler_rows_per_key', 3)
        max_uniques_per_col = self._config.get('sampler_uniques_per_column', 10)
        for index in range(len(row)):
            value = row[index]
            value_count = self._get_column_count(index, value)
            if value_count == 0 or value_count < max_count:
                # This is a new value for this column.
                col_counts = self._column_counts.get(index, {})
                if len(col_counts) < max_uniques_per_col:
                    # Column has few unique values. Select this row for column.
                    self._counters.add_counter(f'sampler-selected-rows', 1)
                    self._counters.add_counter(
                        f'sampler-selected-column-{index}', 1)
                    return True
        # No new unique value for the row.
        # Check random sampler.
        if sample_rate < 0:
            sample_rate = self._config.get('sampler_rate')
        if random.random() <= sample_rate:
            self._counters.add_counter(f'sampler-sampled-rows', 1)
            return True
        return False

    def sample_csv_file(self, input_file: str, output_file: str = '') -> str:
        """Emits a sample of rows from input_file into the output.

        Config parameters used (from self._config):
          sampler_output_rows: Maximum output rows to be generated.
          header_rows: Header rows to be copied to the output as is.
          sampler_rows_per_key: Maximum rows per unique value.
          sampler_rate: random sampling rate; when negative it is derived
            from the estimated input row count.

        Args:
          input_file: CSV file (pattern) to be processed.
          output_file: CSV file to be generated with sampled rows. When
            empty, a name is derived from the input (or a temp file is used
            for non-local outputs).

        Returns:
          output file with the sampled rows, or None if no input matched.
        """
        max_rows = self._config.get('sampler_output_rows')
        sample_rate = self._config.get('sampler_rate')
        header_rows = self._config.get('header_rows', 1)
        input_files = file_util.file_get_matching(input_file)
        if not input_files:
            # No matching input files; nothing to sample.
            return None
        output_delimiter = self._config.get('output_delimiter')
        if not output_file:
            # NOTE(review): this checks file_is_local on the (empty)
            # output_file, not the input — confirm whether input_files[0]
            # was intended here.
            if not file_util.file_is_local(output_file):
                output_file = tempfile.NamedTemporaryFile(
                    delete=False, suffix='-sample.csv').name
            else:
                output_file = file_util.file_get_name(input_files[0], '-sample',
                                                      '.csv')
        # Set sampling rate by file size
        num_rows = file_util.file_estimate_num_rows(input_files)
        if num_rows and self._config.get('sampler_rate') < 0:
            if max_rows > 0:
                # Aim for max_rows outputs over the estimated input size.
                sample_rate = float(max_rows) / float(num_rows)
            logging.debug(
                f'Sampling rate for {input_files}: {sample_rate} for {num_rows} rows'
            )

        # Get sample rows from each input file.
        for input_index in range(len(input_files)):
            file = input_files[input_index]
            input_encoding = self._config.get('input_encoding')
            if not input_encoding:
                # Fall back to detecting the file's encoding.
                input_encoding = file_util.file_get_encoding(file)
            with file_util.FileIO(file, encoding=input_encoding) as csv_file:
                csv_options = {'delimiter': self._config.get('input_delimiter')}
                csv_options = file_util.file_get_csv_reader_options(
                    file, csv_options)
                if not output_delimiter:
                    # No output delimiter set. Use same as input.
                    output_delimiter = csv_options.get('delimiter', ',')
                # First input overwrites the output; later inputs append.
                output_mode = 'w' if input_index == 0 else 'a'
                # Write sample rows from current input
                with file_util.FileIO(output_file, mode=output_mode) as output:
                    csv_writer = csv.writer(output,
                                            delimiter=output_delimiter,
                                            doublequote=False,
                                            escapechar='\\')
                    logging.level_debug() and logging.debug(
                        f'Sampling rows from {file} with config: {self._config.get_configs()}'
                    )
                    # Examine each input row for any unique column values
                    csv_reader = csv.reader(csv_file, **csv_options)
                    row_index = 0
                    for row in csv_reader:
                        self._counters.add_counter(f'sampler-input-row', 1)
                        row_index += 1
                        # Write headers from first input file to the output.
                        if row_index <= header_rows and input_index == 0:
                            csv_writer.writerow(row)
                            self._counters.add_counter(f'sampler-header-rows',
                                                       1)
                            continue
                        # Check if input row has any unique values to be output
                        if self.select_row(row, sample_rate):
                            self._add_row_counts(row)
                            csv_writer.writerow(row)
                            logging.level_debug() and logging.log(
                                2, f'Selecting row:{file}:{row_index}')
                        if max_rows > 0 and self._selected_rows >= max_rows:
                            # Got enough sample output rows
                            break
            logging.info(
                f'Sampled {self._selected_rows} row from {file} into {output_file}'
            )
        logging.level_debug() and logging.debug(
            f'Column counts: {self._column_counts}')
        return output_file
277+
278+
279+
def sample_csv_file(input_file: str,
                    output_file: str = '',
                    config: dict = None) -> str:
    """Returns the output file name into which a sample of rows from input_file is written.

    Args:
      input_file: input file pattern to be loaded.
      output_file: (optional) output file into which sampled rows are written.
        If empty, creates a file with suffix '-sample.csv'
      config: (optional) dictionary of parameters for sampling including:
        sampler_output_rows: maximum number of output rows.
        sampler_rate: number between 0 to 1 for sampling rate if
          sampler_output_rows is not set.
        header_rows: number of header rows from input copied over to output.
        sampler_column_regex: regex to select unique cell values
        sampler_rows_per_key: number of rows with duplicate values for a key.

    Returns:
      output file with the sampled rows.
    """
    # Use None instead of a mutable default dict ({}): a shared default dict
    # persists across calls and can leak state between callers. DataSampler
    # treats any falsy config_dict as "no overrides", so None is equivalent
    # to the former {} default.
    data_sampler = DataSampler(config_dict=config)
    return data_sampler.sample_csv_file(input_file, output_file)
298+
299+
300+
def get_default_config() -> dict:
    """Returns the default sampler configuration sourced from command-line flags."""
    config = {}
    config['sampler_rate'] = _FLAGS.sampler_rate
    config['sampler_input'] = _FLAGS.sampler_input
    config['sampler_output'] = _FLAGS.sampler_output
    config['sampler_output_rows'] = _FLAGS.sampler_output_rows
    config['header_rows'] = _FLAGS.sampler_header_rows
    config['sampler_rows_per_key'] = _FLAGS.sampler_rows_per_key
    config['sampler_column_regex'] = _FLAGS.sampler_column_regex
    config['sampler_unique_columns'] = _FLAGS.sampler_unique_columns
    config['input_delimiter'] = _FLAGS.sampler_input_delimiter
    config['output_delimiter'] = _FLAGS.sampler_output_delimiter
    config['input_encoding'] = _FLAGS.sampler_input_encoding
    return config
314+
315+
316+
def main(_):
    """Script entry point: samples the flag-specified input CSV into the output."""
    input_path = _FLAGS.sampler_input
    output_path = _FLAGS.sampler_output
    sample_csv_file(input_path, output_path)


if __name__ == '__main__':
    app.run(main)

0 commit comments

Comments
 (0)