29 commits
- `e7c04b2` added access via S3 bucket (jsreu, Nov 7, 2025)
- `b5f78d7` saving pickle fils with pkl (jsreu, Nov 7, 2025)
- `08ce659` changed to .pkl, accessing S3 bucket, accessing non-Collection-1 prod… (jsreu, Nov 7, 2025)
- `0baa966` added boto3 to requirements (jsreu, Nov 7, 2025)
- `92d8b83` fixed linting (jsreu, Nov 7, 2025)
- `a13660b` added utils for s3 access (jsreu, Nov 7, 2025)
- `66e4140` updated API URL (jsreu, Nov 14, 2025)
- `e37ac78` Fixed linting (jsreu, Nov 14, 2025)
- `55dc2d7` Fixed linting (jsreu, Nov 14, 2025)
- `67d7027` Fixed linting (jsreu, Nov 14, 2025)
- `c49d218` replacing 'codede' prefix; raising AssertionError if list is empty (jsreu, Nov 14, 2025)
- `a857aef` Fixed linting (jsreu, Nov 14, 2025)
- `ea03a71` replacing 'codede' prefix (jsreu, Nov 14, 2025)
- `c07dea5` fixed linting (jsreu, Nov 14, 2025)
- `f6a0208` replace list with None if any value is nan (jsreu, Nov 25, 2025)
- `f9ed253` fixing url, splitting error message, fixing API call requests (jsreu, Nov 25, 2025)
- `db4579d` adding nuts regions for sub countries if necessary (jsreu, Nov 27, 2025)
- `32c1cd6` adding nuts regions for sub countries if necessary (jsreu, Nov 27, 2025)
- `e50558a` add option to filter countries for preprocessing (jsreu, Nov 27, 2025)
- `91b3c75` linting (jsreu, Nov 27, 2025)
- `01119e6` fixed torch version for numpy compatability (jsreu, Nov 27, 2025)
- `0bc2600` Merge branch 'main' into 103-add-data-access-via-s3 (jsreu, Nov 27, 2025)
- `6ff4018` removing parcels without label; updated warning (jsreu, Dec 29, 2025)
- `d598cf2` limiting reading to only label files (jsreu, Dec 29, 2025)
- `9e8a846` updated changelog (jsreu, Dec 29, 2025)
- `a77552b` resolved merge conflicts (jsreu, Dec 29, 2025)
- `270e180` resolved merge conflicts (jsreu, Dec 29, 2025)
- `6d29766` removed duplicated keys (jsreu, Dec 29, 2025)
- `3d0a7f0` updated datahub URL (jsreu, Jan 16, 2026)
CHANGELOG.md (10 additions, 0 deletions)
@@ -7,6 +7,16 @@ Changes for the upcoming release can be found (and will be added on merging feat
Changes from previous releases are listed below.

## Upcoming Release
- Set all band values to None if any of them is None _(see #104)_
- Remove entries from shapefile without label before preprocessing _(see #109)_
- Read only label files for class name to id dict _(see #110)_
- Fix numpy/torch version conflict _(see #95)_
- Add option to filter countries for preprocessing _(see #108)_
- Add NUTS region identifier _(see #107)_
- Update collection of Sentinel-2 data to Collection 1 _(see #75)_
- Fix API URL _(see #94)_
- Change extension of pickle files _(see #105)_
- Add data access via S3 bucket _(see #103)_
- Fix country polygons _(see #99)_
- Update to EuroCrops V11 _(see #63)_
- Update country polygons _(see #89)_
eurocropsml/acquisition/builder.py (14 additions, 1 deletion)
@@ -2,11 +2,12 @@

import logging
from pathlib import Path
from typing import cast
from typing import Literal, cast

from eurocropsml.acquisition import collector, copier, region
from eurocropsml.acquisition.clipping import clipper
from eurocropsml.acquisition.config import AcquisitionConfig
from eurocropsml.acquisition.s3 import _set_s3_env_variables
from eurocropsml.settings import Settings

logger = logging.getLogger(__name__)
@@ -49,6 +50,13 @@ def build_dataset(

logger.info(f"Processing year {ct_config.year} for {country}.")

if config.eodata_dir is None:
# local_dir = None
_set_s3_env_variables()
source: Literal["eodata", "s3"] = "s3"
else:
source = "eodata"

collector.acquire_sentinel_tiles(
ct_config,
satellite_output_dir.joinpath("collector"),
@@ -58,18 +66,22 @@
config.workers,
)
logger.info("Finished step 1: Acquiring list of necessary .SAFE files.")

copier.merge_safe_files(
ct_config.satellite,
cast(list[str], ct_config.bands),
satellite_output_dir,
config.workers,
source,
local_dir,
)
if local_dir is not None:
logger.info(
"Finished step 2: Copying .SAFE files to local disk and "
"acquiring list of individual band image paths."
)
source = "eodata"
logger.info("Tiles will now be accessed via local storage. Setting `source` to 'eodata'.")
else:
logger.info("Finished step 2: Acquiring list of individual band image paths.")

@@ -80,6 +92,7 @@
config.workers,
config.chunk_size,
config.multiplier,
source,
local_dir,
config.rebuild,
)
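Note on the new import: `_set_s3_env_variables` lives in `eurocropsml.acquisition.s3` and is not shown in this diff. A minimal sketch of what such a helper plausibly does, assuming GDAL-style configuration via environment variables (the endpoint value and variable choices below are illustrative, not the module's actual code):

import os


def _set_s3_env_variables() -> None:
    """Point GDAL/rasterio at an S3-compatible endpoint (illustrative sketch)."""
    # GDAL consults these AWS_* variables when resolving /vsis3/ URIs.
    os.environ.setdefault("AWS_S3_ENDPOINT", "s3.example.com")  # placeholder endpoint
    os.environ.setdefault("AWS_HTTPS", "YES")
    os.environ.setdefault("AWS_VIRTUAL_HOSTING", "FALSE")
    # Access key and secret are assumed to come from the caller's environment.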
eurocropsml/acquisition/clipping/clipper.py (37 additions, 17 deletions)
@@ -6,16 +6,21 @@
import logging
import multiprocessing as mp_orig
import pickle
import sys
from functools import partial
from pathlib import Path
from typing import cast
from typing import Callable, Literal, cast

import geopandas as gpd
import pandas as pd
import pyogrio
from tqdm import tqdm

from eurocropsml.acquisition.clipping.utils import _merge_clipper, mask_polygon_raster
from eurocropsml.acquisition.clipping.utils import (
_merge_clipper,
mask_polygon_raster,
mask_polygon_raster_s3,
)
from eurocropsml.acquisition.config import CollectorConfig

logger = logging.getLogger(__name__)
@@ -101,15 +106,15 @@ def _get_arguments(
clipping_path = output_dir.joinpath("clipper", f"{month}")
clipping_path.mkdir(exist_ok=True, parents=True)

if clipping_path.joinpath("args.pkg").exists():
if clipping_path.joinpath("args.pkl").exists():
logger.info("Loading argument list for parallel raster clipping.")
with open(clipping_path.joinpath("args.pkg"), "rb") as file:
with open(clipping_path.joinpath("args.pkl"), "rb") as file:
args: list[tuple[pd.DataFrame, list]] = pickle.load(file)
shapefile: gpd.GeoDataFrame = pd.read_pickle(clipping_path.joinpath("empty_polygon_df.pkg"))
shapefile: gpd.GeoDataFrame = pd.read_pickle(clipping_path.joinpath("empty_polygon_df.pkl"))
else:
logger.info("No argument list found. Will create it.")
# DataFrame of raster file/parcel matches
full_images_paths: Path = output_dir.joinpath("collector", "full_parcel_list.pkg")
full_images_paths: Path = output_dir.joinpath("collector", "full_parcel_list.pkl")
full_images = pd.read_pickle(full_images_paths)

full_images["completionDate"] = pd.to_datetime(full_images["completionDate"]).dt.date
@@ -118,11 +123,13 @@
]

if local_dir is not None:
full_images["productIdentifier"] = str(local_dir) + full_images[
"productIdentifier"
].astype(str)
full_images["productIdentifier"] = (
full_images["productIdentifier"]
.astype(str)
.apply(lambda x: str(local_dir.joinpath(x)))
)

band_image_path: Path = output_dir.joinpath("copier", "band_images.pkg")
band_image_path: Path = output_dir.joinpath("copier", "band_images.pkl")
band_images: pd.DataFrame = pd.read_pickle(band_image_path)

# filter out month
@@ -144,10 +151,12 @@
ti.update(n=1)
ti.close()

with open(clipping_path.joinpath("args.pkg"), "wb") as fp:
with open(clipping_path.joinpath("args.pkl"), "wb") as fp:
pickle.dump(args, fp)
logger.info("Saved argument list.")

if sys.stdout is not None:
sys.stdout.flush()
date_list = list(full_images["completionDate"].unique())
cols = [parcel_id_name, "geometry"] + date_list

@@ -157,7 +166,7 @@

shapefile = shapefile.reindex(columns=cols)

shapefile.to_pickle(clipping_path.joinpath("empty_polygon_df.pkg"))
shapefile.to_pickle(clipping_path.joinpath("empty_polygon_df.pkl"))

shapefile[parcel_id_name] = shapefile[parcel_id_name].astype(int)

@@ -177,6 +186,7 @@


def _process_raster_parallel(
masking_fct: Callable,
polygon_df: pd.DataFrame,
parcel_id_name: str,
filtered_images: gpd.GeoDataFrame,
@@ -185,6 +195,7 @@
"""Processing one raster file.

Args:
masking_fct: Function to use for clipping. Either via S3 or local access.
polygon_df: Dataframe containing all parcel ids. Will be merged with the clipped values.
parcel_id_name: The country's parcel ID name (varies from country to country).
filtered_images: Dataframe containing all parcel ids that lie in this raster tile.
@@ -203,7 +214,7 @@
# geometry information of all parcels
filtered_geom = polygon_df[polygon_df[parcel_id_name].isin(parcel_ids)]

result = mask_polygon_raster(band_tiles, filtered_geom, parcel_id_name, product_date)
result = masking_fct(band_tiles, filtered_geom, parcel_id_name, product_date)

result.set_index(parcel_id_name, inplace=True)
result.index = result.index.astype(int) # make sure index is integer
@@ -219,6 +230,7 @@ def clipping(
workers: int,
chunk_size: int,
multiplier: int,
source: Literal["eodata", "s3"] = "s3",
local_dir: Path | None = None,
rebuild: bool = False,
) -> None:
@@ -231,10 +243,14 @@
workers: Maximum number of workers used for multiprocessing.
chunk_size: Chunk size used for multiprocessed raster clipping.
multiplier: Intermediate results will be saved every multiplier steps.
source: Source of the Sentinel tiles: either a directory ('eodata') or an S3 bucket ('s3').
If files have been copied to a local directory, this is set to 'eodata'.
local_dir: Local directory where the .SAFE files were copied to.
rebuild: Whether to re-build the clipped parquet files for each month.
This will overwrite the existing ones.
"""

masking_fct = mask_polygon_raster_s3 if source == "s3" else mask_polygon_raster
for month in tqdm(
range(config.months[0], config.months[1] + 1), desc="Clipping rasters on monthly basis"
):
@@ -258,7 +274,7 @@
clipped_dir.mkdir(exist_ok=True, parents=True)

# Process data in smaller chunks
file_counts = len(list(clipped_dir.rglob("Final_*.pkg")))
file_counts = len(list(clipped_dir.rglob("Final_*.pkl")))

processed = file_counts * multiplier * chunk_size
save_files = multiplier * chunk_size
@@ -269,6 +285,7 @@
)
func = partial(
_process_raster_parallel,
masking_fct,
polygon_df_month,
cast(str, config.parcel_id_name),
)
@@ -292,7 +309,10 @@
]
results: list[pd.DataFrame] = []

with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
with concurrent.futures.ProcessPoolExecutor(
max_workers=max_workers, mp_context=mp_orig.get_context("spawn")
) as executor:

futures = [executor.submit(func, *arg) for arg in chunk_args]

for future in concurrent.futures.as_completed(futures):
@@ -310,15 +330,15 @@

processed += len(chunk_args)
if processed % save_files == 0:
df_final_month.to_pickle(clipped_dir.joinpath(f"Final_{file_counts}.pkg"))
df_final_month.to_pickle(clipped_dir.joinpath(f"Final_{file_counts}.pkl"))
del df_final_month
df_final_month = polygon_df_month.copy()
file_counts += 1
# Clear variables to release memory
del chunk_args, futures
gc.collect()

df_final_month.to_pickle(clipped_dir.joinpath(f"Final_{file_counts}.pkg"))
df_final_month.to_pickle(clipped_dir.joinpath(f"Final_{file_counts}.pkl"))
te.close()

_merge_dataframe(
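The executor above now pins the start method to 'spawn' instead of the platform default ('fork' on Linux), presumably so workers do not inherit forked GDAL/S3 state from the parent. A standalone sketch of the pattern, with a stand-in worker rather than the module's `_process_raster_parallel`:

import concurrent.futures
import multiprocessing as mp


def _work(x: int) -> int:
    # stand-in for the real per-raster worker
    return x * x


if __name__ == "__main__":
    # 'spawn' starts each worker as a fresh interpreter, so no file handles
    # or network sessions are inherited from the parent process.
    ctx = mp.get_context("spawn")
    with concurrent.futures.ProcessPoolExecutor(max_workers=2, mp_context=ctx) as ex:
        print(list(ex.map(_work, range(4))))  # [0, 1, 4, 9]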
eurocropsml/acquisition/clipping/utils.py (73 additions, 0 deletions)
@@ -1,6 +1,7 @@
"""Utilities for clipping polygons from raster tiles."""

import logging
import os
from pathlib import Path
from typing import cast

@@ -14,10 +15,68 @@
from tqdm import tqdm

logger = logging.getLogger(__name__)
logging.getLogger("botocore.credentials").setLevel(logging.WARNING)

pd.options.mode.chained_assignment = None


def mask_polygon_raster_s3(
tilepaths: list[Path],
polygon_df: pd.DataFrame,
parcel_id_name: str,
product_date: str,
) -> pd.DataFrame:
"""Clipping parcels from raster files (per band) and calculating median pixel value per band.

Args:
tilepaths: Paths to the raster's band tiles.
polygon_df: GeoDataFrame of all parcels to be clipped.
parcel_id_name: The country's parcel ID name (varies from country to country).
product_date: Date on which the raster tile was obtained.

Returns:
Dataframe with clipped values.

Raises:
FileNotFoundError: If the raster file cannot be found.
"""

parcels_dict: dict[int, list[float | None]] = {
parcel_id: [] for parcel_id in polygon_df[parcel_id_name].unique()
}

# removing any self-intersections or inconsistencies in geometries
polygon_df["geometry"] = polygon_df["geometry"].buffer(0)
polygon_df = polygon_df.reset_index(drop=True)

s3_container = os.environ.get("S3_CONTAINER_NAME")
for b, band_path in enumerate(tilepaths):
s3_uri = f"/vsis3/{s3_container}/{band_path}"
with rasterio.open(s3_uri, "r") as raster_tile:
if b == 0 and polygon_df.crs.srs != raster_tile.crs:
# transforming shapefile into CRS of raster tile
polygon_df = polygon_df.to_crs(raster_tile.crs)

# clipping geometry out of raster tile and saving in dictionary
polygon_df.apply(
lambda row: _process_row(row, raster_tile, parcels_dict, parcel_id_name),
axis=1,
)

# if any value is None, set all to None
parcels_dict = {
parcel_id: (
[None] * len(clipped_list)
if any(item is None for item in clipped_list)
else clipped_list
)
for parcel_id, clipped_list in parcels_dict.items()
}
parcels_df = pd.DataFrame(list(parcels_dict.items()), columns=[parcel_id_name, product_date])

return parcels_df


def mask_polygon_raster(
tilepaths: list[Path],
polygon_df: pd.DataFrame,
@@ -59,6 +118,15 @@ def mask_polygon_raster(
axis=1,
)

# if any value is None, set all to None
parcels_dict = {
parcel_id: (
[None] * len(clipped_list)
if any(item is None for item in clipped_list)
else clipped_list
)
for parcel_id, clipped_list in parcels_dict.items()
}
parcels_df = pd.DataFrame(list(parcels_dict.items()), columns=[parcel_id_name, product_date])

return parcels_df
@@ -72,6 +140,11 @@ def _process_row(
) -> None:
"""Masking geometry from raster tiles and calculating median pixel value."""
parcel_id: int = row[parcel_id_name]
if any(item is None for item in parcels_dict[parcel_id]):
# skip clipping for this parcel_id if any band already produced None
parcels_dict[parcel_id].append(None)
return

geom = row["geometry"]

try:
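`mask_polygon_raster_s3` relies on GDAL's /vsis3/ virtual filesystem: once the AWS_* variables are set, `rasterio.open` streams the object directly from the bucket. A minimal usage sketch with hypothetical bucket and key names (only the `S3_CONTAINER_NAME` variable mirrors the code above):

import os

import rasterio

os.environ["S3_CONTAINER_NAME"] = "eodata"  # hypothetical bucket name
band_path = "Sentinel-2/MSI/L2A/2023/S2A_T32UNB_20230601/B04.jp2"  # illustrative key

s3_uri = f"/vsis3/{os.environ['S3_CONTAINER_NAME']}/{band_path}"
with rasterio.open(s3_uri) as raster_tile:
    # Metadata and pixel blocks are fetched over the network on demand.
    print(raster_tile.crs, raster_tile.count)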