From ad9a65c0a0eb093f24e77fb54ab82bbabbabf21d Mon Sep 17 00:00:00 2001 From: Jorge Rivera Date: Fri, 19 Dec 2025 17:17:57 +0100 Subject: [PATCH] Add bulk download 2a --- CHANGELOG.md | 7 + pyproject.toml | 4 +- src/oda_reader/__init__.py | 3 +- src/oda_reader/crs.py | 1 - src/oda_reader/dac2a.py | 54 ++++- src/oda_reader/download/download_tools.py | 215 ++++++++++++----- src/oda_reader/multisystem.py | 1 - tests/datasets/dac2a/unit/test_dac2a_bulk.py | 98 ++++++++ tests/download/unit/test_download_tools.py | 228 +++++++++++++++++++ uv.lock | 6 +- 10 files changed, 553 insertions(+), 64 deletions(-) create mode 100644 tests/datasets/dac2a/unit/test_dac2a_bulk.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 12bc067..48ffad8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog for oda_reader +## 1.4.0 (2025-12-19) +- Adds `bulk_download_dac2a()` function for bulk downloading the full DAC2A dataset. +- Auto-detects file types (parquet vs txt/csv) in bulk downloads, removing the need for the `is_txt` parameter. +- Auto-detects CSV delimiters (comma, pipe, tab, semicolon) when reading txt files from bulk downloads. +- Deprecates the `is_txt` parameter in `bulk_download_parquet()`. The parameter is still accepted for backward compatibility but emits a deprecation warning and will be removed in a future major release. +- Adds pytest and pytest-mock to dev dependencies for improved testing support. + ## 1.3.5 (2025-12-19) - Fixes `_get_dataflow_version()` to gracefully handle URLs without a version pattern instead of crashing. 
diff --git a/pyproject.toml b/pyproject.toml index d61f764..a153af4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oda_reader" -version = "1.3.5" +version = "1.4.0" description = "A simple package to import ODA data from the OECD's API and AidData's database" readme = "README.md" license = "MIT" @@ -42,6 +42,8 @@ build-backend = "uv_build" [dependency-groups] dev = [ "pre-commit>=4.0.0", + "pytest>=9.0.2", + "pytest-mock>=3.15.1", "ruff>=0.14.0", ] docs = [ diff --git a/src/oda_reader/__init__.py b/src/oda_reader/__init__.py index c20975a..45f33af 100644 --- a/src/oda_reader/__init__.py +++ b/src/oda_reader/__init__.py @@ -28,7 +28,7 @@ ) from oda_reader.crs import bulk_download_crs, download_crs, download_crs_file from oda_reader.dac1 import download_dac1 -from oda_reader.dac2a import download_dac2a +from oda_reader.dac2a import bulk_download_dac2a, download_dac2a from oda_reader.download.query_builder import QueryBuilder from oda_reader.multisystem import bulk_download_multisystem, download_multisystem from oda_reader.tools import get_available_filters @@ -38,6 +38,7 @@ "QueryBuilder", "download_dac1", "download_dac2a", + "bulk_download_dac2a", "download_multisystem", "bulk_download_multisystem", "download_crs", diff --git a/src/oda_reader/crs.py b/src/oda_reader/crs.py index 9e9a2d2..f51c584 100644 --- a/src/oda_reader/crs.py +++ b/src/oda_reader/crs.py @@ -62,7 +62,6 @@ def download_crs_file( return bulk_download_parquet( file_id=file_id, save_to_path=save_to_path, - is_txt=True, as_iterator=as_iterator, ) diff --git a/src/oda_reader/dac2a.py b/src/oda_reader/dac2a.py index 67c5b1e..707c727 100644 --- a/src/oda_reader/dac2a.py +++ b/src/oda_reader/dac2a.py @@ -1,13 +1,38 @@ +import typing +from pathlib import Path + import pandas as pd from oda_reader._cache import cache_info from oda_reader.common import logger -from oda_reader.download.download_tools import download +from oda_reader.download.download_tools import ( + 
DAC2A_FLOW_URL, + bulk_download_parquet, + download, + get_bulk_file_id, +) DATAFLOW_ID: str = "DSD_DAC2@DF_DAC2A" DATAFLOW_VERSION: str = "1.6" +def get_full_dac2a_parquet_id() -> str: + """Retrieve the file ID for the full DAC2A bulk download parquet file. + + Queries the OECD dataflow to find the bulk download link for the complete + DAC2A dataset in dotStat format. + + Returns: + str: The file ID to use with the bulk download service. + + Raises: + RuntimeError: If the file ID cannot be found after maximum retries. + """ + return get_bulk_file_id( + flow_url=DAC2A_FLOW_URL, search_string="DAC2A full dataset (dotStat format)|" + ) + + @cache_info def download_dac2a( start_year: int | None = None, @@ -52,3 +77,30 @@ def download_dac2a( ) return df + + +def bulk_download_dac2a( + save_to_path: Path | str | None = None, + *, + as_iterator: bool = False, +) -> pd.DataFrame | None | typing.Iterator[pd.DataFrame]: + """ + Bulk download the DAC2a data from the bulk download service. The file is very large. + It is therefore strongly recommended to save it to disk. If save_to_path is not + provided, the function will return a DataFrame. + + Args: + save_to_path: The path to save the file to. Optional. If not provided, a DataFrame is returned. + as_iterator: If ``True`` yields ``DataFrame`` chunks instead of a single ``DataFrame``. 
+ + Returns: + pd.DataFrame | Iterator[pd.DataFrame] | None + + """ + file_id = get_full_dac2a_parquet_id() + + return bulk_download_parquet( + file_id=file_id, + save_to_path=save_to_path, + as_iterator=as_iterator, + ) diff --git a/src/oda_reader/download/download_tools.py b/src/oda_reader/download/download_tools.py index 6bd93f4..09c2edb 100644 --- a/src/oda_reader/download/download_tools.py +++ b/src/oda_reader/download/download_tools.py @@ -1,3 +1,4 @@ +import csv import hashlib import io import os @@ -5,6 +6,7 @@ import shutil import tempfile import typing +import warnings import zipfile from pathlib import Path @@ -37,6 +39,7 @@ BULK_DOWNLOAD_URL = "https://stats.oecd.org/wbos/fileview2.aspx?IDFile=" BASE_DATAFLOW = "https://sdmx.oecd.org/public/rest/dataflow/OECD.DCD.FSD/" CRS_FLOW_URL = BASE_DATAFLOW + "DSD_CRS@DF_CRS/" +DAC2A_FLOW_URL = BASE_DATAFLOW + "DSD_DAC2@DF_DAC2A/" MULTI_FLOW_URL = BASE_DATAFLOW + "DSD_MULTI@DF_MULTI/" AIDDATA_VERSION = "3.0" AIDDATA_DOWNLOAD_URL = ( @@ -49,6 +52,36 @@ MAX_RETRIES = 5 +def _detect_delimiter(file_obj, sample_size: int = 8192) -> str: + """Detect the delimiter used in a CSV/text file. + + Reads a sample of the file and uses csv.Sniffer to detect the delimiter. + Falls back to the more frequent of ',' and '|' (comma on a tie) if sniffing fails. + + Args: + file_obj: A file-like object to read from. + sample_size: Number of bytes to sample for detection. + + Returns: + str: The detected delimiter (typically ',' or '|'). 
+ """ + sample = file_obj.read(sample_size) + if isinstance(sample, bytes): + sample = sample.decode("utf-8", errors="replace") + + # Reset file position + file_obj.seek(0) + + try: + dialect = csv.Sniffer().sniff(sample, delimiters=",|\t;") + return dialect.delimiter + except csv.Error: + # If sniffing fails, check which delimiter appears more often + comma_count = sample.count(",") + pipe_count = sample.count("|") + return "|" if pipe_count > comma_count else "," + + def _open_zip(response_content: bytes | Path) -> zipfile.ZipFile: """Open a zip file from bytes or a file path.""" if isinstance(response_content, bytes | bytearray): @@ -193,19 +226,23 @@ def _save_or_return_parquet_files_from_content( *, as_iterator: bool = False, ) -> list[pd.DataFrame] | None | typing.Iterator[pd.DataFrame]: - """Extract parquet files from a zip archive supplied as bytes or a file path. + """Extract parquet or txt (CSV) files from a zip archive. - If `save_to_path` is provided the parquet files are extracted and written + If `save_to_path` is provided the files are extracted and written to disk. Otherwise the contents are returned either as a list of `DataFrame` objects or, when `as_iterator` is `True`, as an iterator - yielding one `DataFrame` per row group. Iterating over row groups avoids - materialising the entire file in memory at once. + yielding one `DataFrame` per row group (parquet only). + + The function auto-detects whether the zip contains parquet or txt files. + Txt files have their delimiter auto-detected (comma or pipe) and are + converted to parquet when saving. Args: - response_content: Bytes or `Path` pointing to the zipped parquet file. - save_to_path: Optional path to save the parquet files to. + response_content: Bytes or `Path` pointing to the zipped file. + save_to_path: Optional path to save the files to. as_iterator: When `True` return an iterator that yields `DataFrame` - objects for each row group. Defaults to ``False``. + objects for each row group. 
Defaults to ``False``. Only supported + for parquet files. Returns: list[pd.DataFrame] | Iterator[pd.DataFrame] | None @@ -215,26 +252,72 @@ def _save_or_return_parquet_files_from_content( with _open_zip(response_content=response_content) as z: parquet_files = [name for name in z.namelist() if name.endswith(".parquet")] + txt_files = [name for name in z.namelist() if name.endswith(".txt")] + + # Determine which file type we're dealing with + if parquet_files: + if save_to_path: + save_to_path.mkdir(parents=True, exist_ok=True) + for file_name in parquet_files: + logger.info(f"Saving {file_name}") + dest_path = save_to_path / file_name + dest_path.parent.mkdir(parents=True, exist_ok=True) + with ( + z.open(file_name) as f_in, + dest_path.open("wb") as f_out, + ): + shutil.copyfileobj(f_in, f_out, length=1024 * 1024) + return None - if save_to_path: - save_to_path.mkdir(parents=True, exist_ok=True) - for file_name in parquet_files: - logger.info(f"Saving {file_name}") - dest_path = save_to_path / file_name - dest_path.parent.mkdir(parents=True, exist_ok=True) - with ( - z.open(file_name) as f_in, - dest_path.open("wb") as f_out, - ): - shutil.copyfileobj(f_in, f_out, length=1024 * 1024) - return None + if as_iterator: + # Return a generator over row groups + return _iter_frames(response_content=response_content) - if as_iterator: - # Return a generator over row groups - return _iter_frames(response_content=response_content) + logger.info(f"Reading {len(parquet_files)} parquet files.") + return [pd.read_parquet(z.open(file)) for file in parquet_files] + + elif txt_files: + if as_iterator: + raise ValueError("Streaming not supported for txt files.") - logger.info(f"Reading {len(parquet_files)} parquet files.") - return [pd.read_parquet(z.open(file)) for file in parquet_files] + if save_to_path: + save_to_path.mkdir(parents=True, exist_ok=True) + for file_name in txt_files: + clean_name = ( + file_name.replace(".txt", ".parquet").lower().replace(" ", "_") + ) + 
logger.info(f"Saving {clean_name}") + with z.open(file_name) as f_in: + delimiter = _detect_delimiter(f_in) + logger.info(f"Detected delimiter: '{delimiter}'") + pd.read_csv( + f_in, + delimiter=delimiter, + encoding="utf-8", + quotechar='"', + low_memory=False, + ).to_parquet(save_to_path / clean_name) + return None + + logger.info(f"Reading {len(txt_files)} txt files.") + dfs = [] + for file_name in txt_files: + with z.open(file_name) as f_in: + delimiter = _detect_delimiter(f_in) + logger.info(f"Detected delimiter for {file_name}: '{delimiter}'") + dfs.append( + pd.read_csv( + f_in, + delimiter=delimiter, + encoding="utf-8", + quotechar='"', + low_memory=False, + ) + ) + return dfs + + else: + raise ValueError("No parquet or txt files found in the zip archive.") def _save_or_return_parquet_files_from_txt_in_zip( @@ -243,7 +326,8 @@ def _save_or_return_parquet_files_from_txt_in_zip( ) -> list[pd.DataFrame] | None: """Extract a `.txt` file from a zipped archive supplied as bytes or a file path. - The file is read as CSV and optionally saved as a parquet file. + The file is read as CSV (with auto-detected delimiter) and optionally saved + as a parquet file. Args: response_content: Bytes or ``Path`` pointing to the zipped archive. @@ -253,18 +337,11 @@ def _save_or_return_parquet_files_from_txt_in_zip( Returns: list[pd.DataFrame]: The extracted DataFrames if save_to_path is not provided. 
""" - oecd_txt_args = { - "delimiter": "|", - "encoding": "utf-8", - "quotechar": '"', - "low_memory": False, - } - # Convert the save_to_path to a Path object save_to_path = Path(save_to_path).expanduser().resolve() if save_to_path else None with _open_zip(response_content=response_content) as z: - # Find all parquet files in the zip archive + # Find all txt files in the zip archive files = [name for name in z.namelist() if name.endswith(".txt")] # If save_to_path is provided, save the files to the path @@ -276,14 +353,34 @@ def _save_or_return_parquet_files_from_txt_in_zip( ) logger.info(f"Saving {clean_name}") with z.open(file_name) as f_in: - pd.read_csv(f_in, **oecd_txt_args).to_parquet( - save_to_path / clean_name - ) - return + delimiter = _detect_delimiter(f_in) + logger.info(f"Detected delimiter: '{delimiter}'") + pd.read_csv( + f_in, + delimiter=delimiter, + encoding="utf-8", + quotechar='"', + low_memory=False, + ).to_parquet(save_to_path / clean_name) + return None # If save_to_path is not provided, return the DataFrames logger.info(f"Reading {len(files)} files.") - return [pd.read_csv(z.open(file), **oecd_txt_args) for file in files] + dfs = [] + for file_name in files: + with z.open(file_name) as f_in: + delimiter = _detect_delimiter(f_in) + logger.info(f"Detected delimiter for {file_name}: '{delimiter}'") + dfs.append( + pd.read_csv( + f_in, + delimiter=delimiter, + encoding="utf-8", + quotechar='"', + low_memory=False, + ) + ) + return dfs def _save_or_return_excel_files_from_content( @@ -397,26 +494,36 @@ def _get_temp_file(file_url: str, use_cache: bool = True) -> tuple[Path, bool]: def bulk_download_parquet( file_id: str, save_to_path: Path | str | None = None, - is_txt: bool = False, + is_txt: bool | None = None, *, as_iterator: bool = False, ) -> pd.DataFrame | None | typing.Iterator[pd.DataFrame]: """Download data from the stats.oecd.org file download service. - Certain data files are available as a bulk download in parquet format. 
This function - downloads the parquet files and returns a single DataFrame. + Certain data files are available as a bulk download. This function + downloads the files (parquet or txt/csv) and returns a single DataFrame. + The file type is auto-detected from the zip contents. Args: file_id (str): The ID of the file to download. save_to_path (Path | str | None): The path to save the file to. Optional. If not provided, the contents are returned. - is_txt (bool): Whether the file is a .txt file. Defaults to False. + is_txt (bool | None): Deprecated. File type is now auto-detected. + This parameter is ignored and will be removed in a future version. as_iterator (bool): When ``True`` return an iterator over ``DataFrame`` chunks instead of a single ``DataFrame``. Useful for large files. + Only supported for parquet files. Returns: pd.DataFrame | Iterator[pd.DataFrame] | None """ + if is_txt is not None: + warnings.warn( + "The 'is_txt' parameter is deprecated and will be removed in a future " + "version. 
File type (parquet or txt) is now auto-detected.", + DeprecationWarning, + stacklevel=2, + ) # Construct the URL file_url = BULK_DOWNLOAD_URL + file_id @@ -430,22 +537,14 @@ def bulk_download_parquet( temp_zip_path, cleanup = _get_temp_file(file_url) try: - # Read the parquet file - if is_txt: - if as_iterator: - raise ValueError("Streaming not supported for txt files.") - files = _save_or_return_parquet_files_from_txt_in_zip( - response_content=temp_zip_path, - save_to_path=save_to_path, - ) - else: - files = _save_or_return_parquet_files_from_content( - response_content=temp_zip_path, - save_to_path=save_to_path, - as_iterator=as_iterator, - ) - if as_iterator: - return files + # Auto-detect file type (parquet or txt) and process + files = _save_or_return_parquet_files_from_content( + response_content=temp_zip_path, + save_to_path=save_to_path, + as_iterator=as_iterator, + ) + if as_iterator: + return files except zipfile.BadZipFile: if cleanup: os.unlink(temp_zip_path) diff --git a/src/oda_reader/multisystem.py b/src/oda_reader/multisystem.py index ba3731d..b53a163 100644 --- a/src/oda_reader/multisystem.py +++ b/src/oda_reader/multisystem.py @@ -47,7 +47,6 @@ def bulk_download_multisystem( return bulk_download_parquet( file_id=file_id, save_to_path=save_to_path, - is_txt=True, as_iterator=as_iterator, ) diff --git a/tests/datasets/dac2a/unit/test_dac2a_bulk.py b/tests/datasets/dac2a/unit/test_dac2a_bulk.py new file mode 100644 index 0000000..0f1e979 --- /dev/null +++ b/tests/datasets/dac2a/unit/test_dac2a_bulk.py @@ -0,0 +1,98 @@ +"""Unit tests for DAC2a bulk download functionality.""" + +import pytest + +from oda_reader.dac2a import bulk_download_dac2a, get_full_dac2a_parquet_id + + +@pytest.mark.unit +class TestDAC2aBulkDownload: + """Test DAC2a bulk download functions with mocked dependencies.""" + + def test_get_full_dac2a_parquet_id_calls_correct_function(self, mocker): + """Test that get_full_dac2a_parquet_id calls get_bulk_file_id with correct params.""" 
+ mock_get_bulk = mocker.patch( + "oda_reader.dac2a.get_bulk_file_id", + return_value="test-file-id-123", + ) + + result = get_full_dac2a_parquet_id() + + assert result == "test-file-id-123" + mock_get_bulk.assert_called_once() + # Verify the flow URL contains DAC2A + call_kwargs = mock_get_bulk.call_args.kwargs + assert "DAC2A" in call_kwargs["flow_url"] + # Verify the search string is for DAC2A full dataset + assert "DAC2A full dataset" in call_kwargs["search_string"] + + def test_bulk_download_dac2a_returns_dataframe(self, mocker): + """Test that bulk_download_dac2a returns DataFrame when no save path.""" + import pandas as pd + + mock_df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]}) + + mocker.patch( + "oda_reader.dac2a.get_full_dac2a_parquet_id", + return_value="mock-file-id", + ) + mock_bulk = mocker.patch( + "oda_reader.dac2a.bulk_download_parquet", + return_value=mock_df, + ) + + result = bulk_download_dac2a() + + assert result is mock_df + mock_bulk.assert_called_once_with( + file_id="mock-file-id", + save_to_path=None, + as_iterator=False, + ) + + def test_bulk_download_dac2a_saves_to_path(self, mocker, tmp_path): + """Test that bulk_download_dac2a passes save path to underlying function.""" + mocker.patch( + "oda_reader.dac2a.get_full_dac2a_parquet_id", + return_value="mock-file-id", + ) + mock_bulk = mocker.patch( + "oda_reader.dac2a.bulk_download_parquet", + return_value=None, + ) + + result = bulk_download_dac2a(save_to_path=tmp_path) + + assert result is None + mock_bulk.assert_called_once_with( + file_id="mock-file-id", + save_to_path=tmp_path, + as_iterator=False, + ) + + def test_bulk_download_dac2a_as_iterator(self, mocker): + """Test that bulk_download_dac2a passes as_iterator flag correctly.""" + import pandas as pd + + def mock_iterator(): + yield pd.DataFrame({"col1": [1]}) + yield pd.DataFrame({"col1": [2]}) + + mocker.patch( + "oda_reader.dac2a.get_full_dac2a_parquet_id", + return_value="mock-file-id", + ) + mock_bulk = mocker.patch( + 
"oda_reader.dac2a.bulk_download_parquet", + return_value=mock_iterator(), + ) + + result = bulk_download_dac2a(as_iterator=True) + + # Result should be an iterator + assert hasattr(result, "__iter__") + mock_bulk.assert_called_once_with( + file_id="mock-file-id", + save_to_path=None, + as_iterator=True, + ) diff --git a/tests/download/unit/test_download_tools.py b/tests/download/unit/test_download_tools.py index ca16d32..9191eed 100644 --- a/tests/download/unit/test_download_tools.py +++ b/tests/download/unit/test_download_tools.py @@ -1,8 +1,18 @@ """Unit tests for download tools with mocked API responses.""" +import io +import warnings +import zipfile + +import pandas as pd import pytest from oda_reader.common import get_data_from_api +from oda_reader.download.download_tools import ( + _detect_delimiter, + _save_or_return_parquet_files_from_content, + bulk_download_parquet, +) @pytest.mark.unit @@ -55,3 +65,221 @@ def test_get_data_from_api_non_404_error_raises(self, mocker): with pytest.raises(ConnectionError, match="Error 500"): get_data_from_api("https://example.com/data") + + +@pytest.mark.unit +class TestDetectDelimiter: + """Test delimiter detection for CSV/txt files.""" + + def test_detect_comma_delimiter(self): + """Test that comma-delimited content is detected correctly.""" + csv_content = "col1,col2,col3\nval1,val2,val3\nval4,val5,val6" + file_obj = io.BytesIO(csv_content.encode("utf-8")) + + delimiter = _detect_delimiter(file_obj) + + assert delimiter == "," + # Verify file position was reset + assert file_obj.tell() == 0 + + def test_detect_pipe_delimiter(self): + """Test that pipe-delimited content is detected correctly.""" + csv_content = "col1|col2|col3\nval1|val2|val3\nval4|val5|val6" + file_obj = io.BytesIO(csv_content.encode("utf-8")) + + delimiter = _detect_delimiter(file_obj) + + assert delimiter == "|" + assert file_obj.tell() == 0 + + def test_detect_tab_delimiter(self): + """Test that tab-delimited content is detected correctly.""" + 
csv_content = "col1\tcol2\tcol3\nval1\tval2\tval3" + file_obj = io.BytesIO(csv_content.encode("utf-8")) + + delimiter = _detect_delimiter(file_obj) + + assert delimiter == "\t" + assert file_obj.tell() == 0 + + def test_comma_wins_when_ambiguous(self): + """Test that comma is preferred when sniffing fails and counts are equal.""" + # Content with no clear delimiter + csv_content = "just some text without clear delimiters" + file_obj = io.BytesIO(csv_content.encode("utf-8")) + + delimiter = _detect_delimiter(file_obj) + + # Should default to comma when counts are equal (both 0) + assert delimiter == "," + + def test_works_with_string_io(self): + """Test that delimiter detection works with StringIO objects too.""" + csv_content = "col1;col2;col3\nval1;val2;val3" + file_obj = io.StringIO(csv_content) + + delimiter = _detect_delimiter(file_obj) + + assert delimiter == ";" + assert file_obj.tell() == 0 + + +@pytest.mark.unit +class TestFileTypeAutoDetection: + """Test automatic file type detection in zip archives.""" + + def _create_zip_with_parquet(self) -> bytes: + """Create a zip file containing a parquet file.""" + df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]}) + parquet_buffer = io.BytesIO() + df.to_parquet(parquet_buffer) + + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, "w") as z: + z.writestr("test_data.parquet", parquet_buffer.getvalue()) + return zip_buffer.getvalue() + + def _create_zip_with_txt(self, delimiter: str = ",") -> bytes: + """Create a zip file containing a txt/CSV file.""" + if delimiter == "|": + csv_content = "col1|col2|col3\n1|2|3\n4|5|6" + else: + csv_content = "col1,col2,col3\n1,2,3\n4,5,6" + + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, "w") as z: + z.writestr("test_data.txt", csv_content.encode("utf-8")) + return zip_buffer.getvalue() + + def test_auto_detect_parquet_files(self): + """Test that parquet files are auto-detected and read correctly.""" + zip_content = 
self._create_zip_with_parquet() + + result = _save_or_return_parquet_files_from_content(zip_content) + + assert result is not None + assert len(result) == 1 + assert isinstance(result[0], pd.DataFrame) + assert list(result[0].columns) == ["col1", "col2"] + assert len(result[0]) == 3 + + def test_auto_detect_txt_files_comma(self): + """Test that comma-delimited txt files are auto-detected.""" + zip_content = self._create_zip_with_txt(delimiter=",") + + result = _save_or_return_parquet_files_from_content(zip_content) + + assert result is not None + assert len(result) == 1 + assert isinstance(result[0], pd.DataFrame) + assert list(result[0].columns) == ["col1", "col2", "col3"] + + def test_auto_detect_txt_files_pipe(self): + """Test that pipe-delimited txt files are auto-detected.""" + zip_content = self._create_zip_with_txt(delimiter="|") + + result = _save_or_return_parquet_files_from_content(zip_content) + + assert result is not None + assert len(result) == 1 + df = result[0] + assert isinstance(df, pd.DataFrame) + assert list(df.columns) == ["col1", "col2", "col3"] + assert len(df) == 2 + + def test_save_parquet_to_path(self, tmp_path): + """Test saving parquet files to a path.""" + zip_content = self._create_zip_with_parquet() + + result = _save_or_return_parquet_files_from_content( + zip_content, save_to_path=tmp_path + ) + + assert result is None + saved_files = list(tmp_path.glob("*.parquet")) + assert len(saved_files) == 1 + # Verify the saved file can be read + df = pd.read_parquet(saved_files[0]) + assert len(df) == 3 + + def test_save_txt_as_parquet_to_path(self, tmp_path): + """Test that txt files are converted to parquet when saving.""" + zip_content = self._create_zip_with_txt() + + result = _save_or_return_parquet_files_from_content( + zip_content, save_to_path=tmp_path + ) + + assert result is None + saved_files = list(tmp_path.glob("*.parquet")) + assert len(saved_files) == 1 + # Verify conversion to parquet + assert saved_files[0].suffix == 
".parquet" + df = pd.read_parquet(saved_files[0]) + assert len(df) == 2 + + def test_raises_on_empty_zip(self): + """Test that ValueError is raised when zip has no valid files.""" + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, "w") as z: + z.writestr("readme.md", "Not a data file") + + with pytest.raises(ValueError, match="No parquet or txt files"): + _save_or_return_parquet_files_from_content(zip_buffer.getvalue()) + + def test_txt_iterator_raises(self): + """Test that as_iterator raises for txt files.""" + zip_content = self._create_zip_with_txt() + + with pytest.raises(ValueError, match="Streaming not supported"): + _save_or_return_parquet_files_from_content(zip_content, as_iterator=True) + + +@pytest.mark.unit +class TestDeprecationWarnings: + """Test deprecation warnings for backward compatibility.""" + + def test_is_txt_parameter_emits_deprecation_warning(self, mocker): + """Test that using is_txt parameter emits a deprecation warning.""" + # Mock the internal functions to avoid actual downloads + mocker.patch( + "oda_reader.download.download_tools._get_temp_file", + return_value=("/fake/path", False), + ) + mocker.patch( + "oda_reader.download.download_tools._save_or_return_parquet_files_from_content", + return_value=[pd.DataFrame({"col": [1, 2]})], + ) + + with pytest.warns(DeprecationWarning, match="is_txt.*deprecated"): + bulk_download_parquet("fake-id", is_txt=True) + + def test_is_txt_false_also_emits_warning(self, mocker): + """Test that is_txt=False also emits deprecation warning.""" + mocker.patch( + "oda_reader.download.download_tools._get_temp_file", + return_value=("/fake/path", False), + ) + mocker.patch( + "oda_reader.download.download_tools._save_or_return_parquet_files_from_content", + return_value=[pd.DataFrame({"col": [1, 2]})], + ) + + with pytest.warns(DeprecationWarning, match="is_txt.*deprecated"): + bulk_download_parquet("fake-id", is_txt=False) + + def test_no_warning_when_is_txt_not_provided(self, mocker): + """Test 
that no warning is emitted when is_txt is not provided.""" + mocker.patch( + "oda_reader.download.download_tools._get_temp_file", + return_value=("/fake/path", False), + ) + mocker.patch( + "oda_reader.download.download_tools._save_or_return_parquet_files_from_content", + return_value=[pd.DataFrame({"col": [1, 2]})], + ) + + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + # Should not raise any DeprecationWarning + bulk_download_parquet("fake-id") diff --git a/uv.lock b/uv.lock index fa9a79a..9512dd9 100644 --- a/uv.lock +++ b/uv.lock @@ -789,7 +789,7 @@ wheels = [ [[package]] name = "oda-reader" -version = "1.3.4" +version = "1.4.0" source = { editable = "." } dependencies = [ { name = "filelock" }, @@ -805,6 +805,8 @@ dependencies = [ [package.dev-dependencies] dev = [ { name = "pre-commit" }, + { name = "pytest" }, + { name = "pytest-mock" }, { name = "ruff" }, ] docs = [ @@ -834,6 +836,8 @@ requires-dist = [ [package.metadata.requires-dev] dev = [ { name = "pre-commit", specifier = ">=4.0.0" }, + { name = "pytest", specifier = ">=9.0.2" }, + { name = "pytest-mock", specifier = ">=3.15.1" }, { name = "ruff", specifier = ">=0.14.0" }, ] docs = [