Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dataframely/_storage/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def sink_frame(
) -> None:
file = kwargs.pop("file")
metadata = kwargs.pop("metadata", {})
file.parent.mkdir(parents=True, exist_ok=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we shouldn't do this here, but rather in the specialized functions for collections. Also, I wouldn't set `parents=True` (see main comment).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, fair point. I looked into this more: polars' `write_parquet` has a parameter `mkdir` (default `False`). So to get the exact same behavior, I guess we would have to add this parameter as well. But I see that the additional complexity is not worth it, and we can simply never create the directory ourselves.

We currently rely on the fact that `write_parquet` sets `mkdir` to `True` for partitioned writes and creates all parent directories here: we add an extra layer of directories, which is then created by polars.

I think if we do not want an mkdir argument, we should not create anything anywhere, except for partitioned writes. That would not require any code changes, just updates to the docs. Is this what you have in mind?

Copy link
Contributor Author

@MoritzPotthoffQC MoritzPotthoffQC Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to just delegate this to the mkdir parameter in polars.

@borchero I just realized that the `mkdir` parameter is only exposed from polars 1.33 onwards. So I think we should hold this PR until we bump our polars requirement to that version anyway.

Copy link
Member

@borchero borchero Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will bump the version requirement as part of #139. Do you want to wait until then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that works for me. I will convert this PR back to a draft for now.

lf.sink_parquet(
file,
metadata={**metadata, SCHEMA_METADATA_KEY: serialized_schema},
Expand All @@ -42,6 +43,7 @@ def write_frame(
) -> None:
file = kwargs.pop("file")
metadata = kwargs.pop("metadata", {})
file.parent.mkdir(parents=True, exist_ok=True)
df.write_parquet(
file,
metadata={**metadata, SCHEMA_METADATA_KEY: serialized_schema},
Expand Down
1 change: 1 addition & 0 deletions dataframely/testing/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def write_typed(
schema.write_parquet(df, self._wrap_path(path))

def write_untyped(self, df: pl.DataFrame, path: Path, lazy: bool) -> None:
path.mkdir(parents=True, exist_ok=True)
if lazy:
df.lazy().sink_parquet(self._wrap_path(path))
else:
Expand Down
94 changes: 53 additions & 41 deletions tests/collection/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ class MyCollection2(dy.Collection):
@pytest.mark.parametrize("kwargs", [{}, {"partition_by": "a"}])
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write(
tester: CollectionStorageTester, tmp_path: Path, kwargs: dict[str, Any], lazy: bool
tester: CollectionStorageTester,
tmp_path_non_existent: Path,
kwargs: dict[str, Any],
lazy: bool,
) -> None:
# Arrange
collection = MyCollection.validate(
Expand All @@ -72,10 +75,10 @@ def test_read_write(
)

# Act
tester.write_typed(collection, tmp_path, lazy=lazy, **kwargs)
tester.write_typed(collection, tmp_path_non_existent, lazy=lazy, **kwargs)

# Assert
out = tester.read(MyCollection, tmp_path, lazy)
out = tester.read(MyCollection, tmp_path_non_existent, lazy)
assert_frame_equal(collection.first, out.first)
assert collection.second is not None
assert out.second is not None
Expand All @@ -86,7 +89,10 @@ def test_read_write(
@pytest.mark.parametrize("kwargs", [{}, {"partition_by": "a"}])
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_optional(
tester: CollectionStorageTester, tmp_path: Path, kwargs: dict[str, Any], lazy: bool
tester: CollectionStorageTester,
tmp_path_non_existent: Path,
kwargs: dict[str, Any],
lazy: bool,
) -> None:
# Arrange
collection = MyCollection.validate(
Expand All @@ -95,10 +101,10 @@ def test_read_write_optional(

# Act
write_lazy = lazy and "partition_by" not in kwargs
tester.write_typed(collection, tmp_path, lazy=write_lazy, **kwargs)
tester.write_typed(collection, tmp_path_non_existent, lazy=write_lazy, **kwargs)

# Assert
out = tester.read(MyCollection, tmp_path, lazy)
out = tester.read(MyCollection, tmp_path_non_existent, lazy)
assert_frame_equal(collection.first, out.first)
assert collection.second is None
assert out.second is None
Expand All @@ -112,18 +118,18 @@ def test_read_write_optional(
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_if_schema_matches(
tester: CollectionStorageTester,
tmp_path: Path,
tmp_path_non_existent: Path,
mocker: pytest_mock.MockerFixture,
validation: Any,
lazy: bool,
) -> None:
# Arrange
collection = MyCollection.create_empty()
tester.write_typed(collection, tmp_path, lazy=lazy)
tester.write_typed(collection, tmp_path_non_existent, lazy=lazy)

# Act
spy = mocker.spy(MyCollection, "validate")
tester.read(MyCollection, tmp_path, lazy=lazy, validation=validation)
tester.read(MyCollection, tmp_path_non_existent, lazy=lazy, validation=validation)

# Assert
spy.assert_not_called()
Expand All @@ -136,21 +142,21 @@ def test_read_write_if_schema_matches(
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_validation_warn_no_schema(
tester: CollectionStorageTester,
tmp_path: Path,
tmp_path_non_existent: Path,
mocker: pytest_mock.MockerFixture,
lazy: bool,
) -> None:
# Arrange
collection = MyCollection.create_empty()
tester.write_untyped(collection, tmp_path, lazy=lazy)
tester.write_untyped(collection, tmp_path_non_existent, lazy=lazy)

# Act
spy = mocker.spy(MyCollection, "validate")
with pytest.warns(
UserWarning,
match=r"requires validation: no collection schema to check validity",
):
tester.read(MyCollection, tmp_path, lazy, validation="warn")
tester.read(MyCollection, tmp_path_non_existent, lazy, validation="warn")

# Assert
spy.assert_called_once()
Expand All @@ -160,21 +166,21 @@ def test_read_write_validation_warn_no_schema(
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_validation_warn_invalid_schema(
tester: CollectionStorageTester,
tmp_path: Path,
tmp_path_non_existent: Path,
mocker: pytest_mock.MockerFixture,
lazy: bool,
) -> None:
# Arrange
collection = MyCollection.create_empty()
tester.write_typed(collection, tmp_path, lazy=lazy)
tester.write_typed(collection, tmp_path_non_existent, lazy=lazy)

# Act
spy = mocker.spy(MyCollection2, "validate")
with pytest.warns(
UserWarning,
match=r"requires validation: current collection schema does not match",
):
tester.read(MyCollection2, tmp_path, lazy)
tester.read(MyCollection2, tmp_path_non_existent, lazy)

# Assert
spy.assert_called_once()
Expand All @@ -185,17 +191,17 @@ def test_read_write_validation_warn_invalid_schema(
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_validation_allow_no_schema(
tester: CollectionStorageTester,
tmp_path: Path,
tmp_path_non_existent: Path,
mocker: pytest_mock.MockerFixture,
lazy: bool,
) -> None:
# Arrange
collection = MyCollection.create_empty()
tester.write_untyped(collection, tmp_path, lazy=lazy)
tester.write_untyped(collection, tmp_path_non_existent, lazy=lazy)

# Act
spy = mocker.spy(MyCollection, "validate")
tester.read(MyCollection, tmp_path, lazy, validation="allow")
tester.read(MyCollection, tmp_path_non_existent, lazy, validation="allow")

# Assert
spy.assert_called_once()
Expand All @@ -205,17 +211,17 @@ def test_read_write_validation_allow_no_schema(
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_validation_allow_invalid_schema(
tester: CollectionStorageTester,
tmp_path: Path,
tmp_path_non_existent: Path,
mocker: pytest_mock.MockerFixture,
lazy: bool,
) -> None:
# Arrange
collection = MyCollection.create_empty()
tester.write_typed(collection, tmp_path, lazy=lazy)
tester.write_typed(collection, tmp_path_non_existent, lazy=lazy)

# Act
spy = mocker.spy(MyCollection2, "validate")
tester.read(MyCollection2, tmp_path, lazy, validation="allow")
tester.read(MyCollection2, tmp_path_non_existent, lazy, validation="allow")

# Assert
spy.assert_called_once()
Expand All @@ -227,37 +233,37 @@ def test_read_write_validation_allow_invalid_schema(
@pytest.mark.parametrize("tester", TESTERS)
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_validation_forbid_no_schema(
tester: CollectionStorageTester, tmp_path: Path, lazy: bool
tester: CollectionStorageTester, tmp_path_non_existent: Path, lazy: bool
) -> None:
# Arrange
collection = MyCollection.create_empty()
tester.write_untyped(collection, tmp_path, lazy=lazy)
tester.write_untyped(collection, tmp_path_non_existent, lazy=lazy)

# Act
with pytest.raises(
ValidationRequiredError,
match=r"without validation: no collection schema to check validity",
):
tester.read(MyCollection, tmp_path, lazy, validation="forbid")
tester.read(MyCollection, tmp_path_non_existent, lazy, validation="forbid")


@pytest.mark.parametrize("tester", TESTERS)
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_validation_forbid_invalid_schema(
tester: CollectionStorageTester, tmp_path: Path, lazy: bool
tester: CollectionStorageTester, tmp_path_non_existent: Path, lazy: bool
) -> None:
# Arrange

collection = MyCollection.create_empty()

tester.write_typed(collection, tmp_path, lazy=lazy)
tester.write_typed(collection, tmp_path_non_existent, lazy=lazy)

# Act
with pytest.raises(
ValidationRequiredError,
match=r"without validation: current collection schema does not match",
):
tester.read(MyCollection2, tmp_path, lazy, validation="forbid")
tester.read(MyCollection2, tmp_path_non_existent, lazy, validation="forbid")


# --------------------------------- VALIDATION "SKIP" -------------------------------- #
Expand All @@ -267,17 +273,17 @@ def test_read_write_validation_forbid_invalid_schema(
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_validation_skip_no_schema(
tester: CollectionStorageTester,
tmp_path: Path,
tmp_path_non_existent: Path,
mocker: pytest_mock.MockerFixture,
lazy: bool,
) -> None:
# Arrange
collection = MyCollection.create_empty()
tester.write_untyped(collection, tmp_path, lazy=lazy)
tester.write_untyped(collection, tmp_path_non_existent, lazy=lazy)

# Act
spy = mocker.spy(MyCollection, "validate")
tester.read(MyCollection, tmp_path, lazy, validation="skip")
tester.read(MyCollection, tmp_path_non_existent, lazy, validation="skip")

# Assert
spy.assert_not_called()
Expand All @@ -287,17 +293,17 @@ def test_read_write_validation_skip_no_schema(
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_validation_skip_invalid_schema(
tester: CollectionStorageTester,
tmp_path: Path,
tmp_path_non_existent: Path,
mocker: pytest_mock.MockerFixture,
lazy: bool,
) -> None:
# Arrange
collection = MyCollection.create_empty()
tester.write_typed(collection, tmp_path, lazy=lazy)
tester.write_typed(collection, tmp_path_non_existent, lazy=lazy)

# Act
spy = mocker.spy(collection, "validate")
tester.read(MyCollection2, tmp_path, lazy, validation="skip")
tester.read(MyCollection2, tmp_path_non_existent, lazy, validation="skip")

# Assert
spy.assert_not_called()
Expand Down Expand Up @@ -332,7 +338,10 @@ def test_reconcile_collection_types(
@pytest.mark.parametrize("validation", ["warn", "allow", "forbid", "skip"])
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_parquet_fallback_schema_json_success(
tmp_path: Path, mocker: pytest_mock.MockerFixture, validation: Any, lazy: bool
tmp_path_non_existent: Path,
mocker: pytest_mock.MockerFixture,
validation: Any,
lazy: bool,
) -> None:
# In https://github.com/Quantco/dataframely/pull/107, the
# mechanism for storing collection metadata was changed.
Expand All @@ -345,12 +354,12 @@ def test_read_write_parquet_fallback_schema_json_success(
# Arrange
tester = ParquetCollectionStorageTester()
collection = MyCollection.create_empty()
tester.write_untyped(collection, tmp_path, lazy)
(tmp_path / "schema.json").write_text(collection.serialize())
tester.write_untyped(collection, tmp_path_non_existent, lazy)
(tmp_path_non_existent / "schema.json").write_text(collection.serialize())

# Act
spy = mocker.spy(MyCollection, "validate")
tester.read(MyCollection, tmp_path, lazy, validation=validation)
tester.read(MyCollection, tmp_path_non_existent, lazy, validation=validation)

# Assert
spy.assert_not_called()
Expand All @@ -359,21 +368,24 @@ def test_read_write_parquet_fallback_schema_json_success(
@pytest.mark.parametrize("validation", ["allow", "warn"])
@pytest.mark.parametrize("lazy", [True, False])
def test_read_write_parquet_schema_json_fallback_corrupt(
tmp_path: Path, mocker: pytest_mock.MockerFixture, validation: Any, lazy: bool
tmp_path_non_existent: Path,
mocker: pytest_mock.MockerFixture,
validation: Any,
lazy: bool,
) -> None:
"""If the schema.json file is present, but corrupt, we should always fall back to
validating."""
# Arrange
collection = MyCollection.create_empty()
tester = ParquetCollectionStorageTester()
tester.write_untyped(collection, tmp_path, lazy)
(tmp_path / "schema.json").write_text("} this is not a valid JSON {")
tester.write_untyped(collection, tmp_path_non_existent, lazy)
(tmp_path_non_existent / "schema.json").write_text("} this is not a valid JSON {")

# Act
spy = mocker.spy(MyCollection, "validate")
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UserWarning)
tester.read(MyCollection, tmp_path, lazy, validation=validation)
tester.read(MyCollection, tmp_path_non_existent, lazy, validation=validation)

# Assert
spy.assert_called_once()
Expand Down
12 changes: 12 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright (c) QuantCo 2025-2025
# SPDX-License-Identifier: BSD-3-Clause

from pathlib import Path

import pytest


@pytest.fixture()
def tmp_path_non_existent(tmp_path: Path) -> Path:
    """Return the path of a ``subdir`` directory nested inside ``tmp_path``.

    The path is only computed here; this fixture never creates the directory,
    so it is the caller's (or the code under test's) job to create it.
    """
    return tmp_path.joinpath("subdir")
Loading
Loading