Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 40 additions & 32 deletions scripts/us_census/acs5yr/subject_tables/common/generate_col_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,23 +166,38 @@ def __init__(self, spec_dict={}, column_list=[], delimiter='!!'):

def _find_and_replace_column_names(self, column):
"""
if spec has find_and_replace defined, this function updates column names
Final robust version: Handles long keys containing delimiters
and multiple individual token replacements.
"""
if 'find_and_replace' in self.features['preprocess']:
find_and_replace_dict = self.features['preprocess'][
'find_and_replace']
# replace entire column name
if column in find_and_replace_dict:
return find_and_replace_dict[column]
# replace a token in the column name
else:
# TODO: Support the find_and_replace of more than one token
part_list = column.split(self.delimiter)
for idx, part in enumerate(part_list):
if part in find_and_replace_dict:
part_list[idx] = find_and_replace_dict[part]
return self.delimiter.join(part_list)
return column
if 'find_and_replace' not in self.features.get('preprocess', {}):
return column

find_and_replace_dict = self.features['preprocess']['find_and_replace']
new_column = column

        # 1. Handle long keys / partial strings that may span multiple tokens
# We sort by length (longest first) so we don't accidentally replace
# a small part of a larger key.
sorted_keys = sorted(find_and_replace_dict.keys(), key=len, reverse=True)

for key in sorted_keys:
if key in new_column:
new_column = new_column.replace(key, find_and_replace_dict[key])

# 2. Token-based replacement (as a backup for exact token matches)
# This ensures that if 'INCOME' is a key, it only matches 'INCOME'
# and not 'INCOMES'
parts = new_column.split(self.delimiter)
modified_tokens = False
for idx, part in enumerate(parts):
if part in find_and_replace_dict:
parts[idx] = find_and_replace_dict[part]
modified_tokens = True

if modified_tokens:
new_column = self.delimiter.join(parts)

return new_column
Comment on lines 167 to +200
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The logic in _find_and_replace_column_names is flawed. It performs two sequential replacement passes: a greedy substring replacement followed by a token-based replacement on the already modified string. This can lead to incorrect substitutions (e.g., replacing "INCOME" within "INCOMES") and chained replacements (e.g., A -> B, then B -> C). This could corrupt column names and cause downstream processing errors. A single, more robust replacement mechanism that correctly handles both multi-token keys and single tokens without these side effects should be used.


def _generate_stat_vars_from_spec(self):
"""generates stat_var nodes for each column in column list and
Expand All @@ -203,23 +218,16 @@ def _generate_stat_vars_from_spec(self):
# len((set(self.features['ignoreColumns']) &
# set(col.split(self.delimiter)) > 0:
for col in self.column_list:
# TODO: Replace the type of ignore_token_count to boolean
ignore_token_count = 0
for part in col.split(self.delimiter):
for token in self.features['ignoreColumns']:
if part == token:
ignore_token_count = 1
if token == col:
ignore_token_count = 1

# if no tokens of the columns are in ignoreColumns of the spec
if ignore_token_count == 0:
# Check if any string in ignoreColumns exists within the current header
is_ignored = False
for ignore_pattern in self.features['ignoreColumns']:
if ignore_pattern in col:
is_ignored = True
break
Comment on lines +221 to +226
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The logic for ignoring columns has been changed from exact token matching to substring matching (if ignore_pattern in col:). This is a significant change that could lead to incorrect behavior. For example, if an ignore pattern is "Total", it would match columns like "Total_Households", which might not be the intention. The previous implementation was more precise by checking for exact token matches. This change could cause valid data columns to be skipped from processing. Consider reverting to a more precise matching logic, like the one used previously, perhaps with performance improvements.


# If not ignored, proceed to find_and_replace and statvar generation
if not is_ignored:
renamed_col = self._find_and_replace_column_names(col)
# TODO: Before calling the column_to_statvar method,
# remove the base class or generalization token in the
# column name from the enumSpecialization section of the
# spec.
                # TODO: Should we generate an error if _column_to_statvar() returns an empty statvar?
self.column_map[col] = self._column_to_statvar(renamed_col)

# TODO: Deprecate this function, since enumSpecialization are used to
Expand Down
14 changes: 10 additions & 4 deletions scripts/us_census/api_utils/census_api_data_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@

module_dir_ = os.path.dirname(os.path.realpath(__file__))
path.insert(1, os.path.join(module_dir_, '../../../'))

from .download_utils import download_url_list_iterations
from download_utils import download_url_list_iterations
from tools.download_utils.requests_wrappers import request_url_json
from .status_file_utils import sync_status_list
from status_file_utils import sync_status_list
Comment on lines +37 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The imports for download_utils and status_file_utils have been changed from relative to absolute. While this might make the script runnable directly, it reduces modularity and makes the code dependent on sys.path manipulation. It's best practice to use relative imports for modules within the same package to maintain encapsulation.

Suggested change
from download_utils import download_url_list_iterations
from tools.download_utils.requests_wrappers import request_url_json
from .status_file_utils import sync_status_list
from status_file_utils import sync_status_list
from .download_utils import download_url_list_iterations
from tools.download_utils.requests_wrappers import request_url_json
from .status_file_utils import sync_status_list


FLAGS = flags.FLAGS

Expand Down Expand Up @@ -165,6 +164,7 @@ def download_table(dataset: str,
url_list = get_table_url_list(dataset, table_id, q_variable, year_list,
output_path, api_key, s_level_list,
force_fetch_config, force_fetch_data)
print(url_list)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This print() statement appears to be for debugging purposes. Similar debug prints are present on lines 458 and 461. These should be removed before merging to avoid polluting the output with potentially large amounts of data.


status_path = os.path.join(output_path, 'download_status.json')

Expand Down Expand Up @@ -292,7 +292,11 @@ def consolidate_files(dataset: str,
df = pd.DataFrame()
for csv_file in csv_files_list[year]:
cur_csv_path = os.path.join(output_path, csv_file)
df2 = pd.read_csv(cur_csv_path, low_memory=False)
try:
df2 = pd.read_csv(cur_csv_path, low_memory=False)
except pd.errors.EmptyDataError:
logging.warning('Skipping empty file: %s', cur_csv_path)
continue
print("Collecting", csv_file)
# remove extra columns
drop_list = []
Expand Down Expand Up @@ -451,8 +455,10 @@ def download_table_variables(dataset, table_id, year_list, geo_url_map_path,

def main(argv):
year_list_int = list(range(FLAGS.start_year, FLAGS.end_year + 1))
print("#########################",year_list_int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This print() statement appears to be for debugging purposes and should be removed before merging.

year_list = [str(y) for y in year_list_int]
out_path = os.path.expanduser(FLAGS.output_path)
print("#####",FLAGS.summary_levels)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This print() statement appears to be for debugging purposes and should be removed before merging.

if FLAGS.summary_levels:
s_list = FLAGS.summary_levels
else:
Expand Down