From e2d5603cef0a86974640d703e704bd66b7e6230c Mon Sep 17 00:00:00 2001 From: zkeram Date: Fri, 11 Apr 2025 10:30:56 +0200 Subject: [PATCH 1/8] Make Python functions have separate files --- polars_bio/__init__.py | 8 +- polars_bio/count_overlaps.py | 192 ++++++++++++ polars_bio/coverage.py | 78 +++++ polars_bio/merge.py | 163 +++++++++++ polars_bio/nearest.py | 77 +++++ polars_bio/overlap.py | 112 +++++++ polars_bio/range_op.py | 546 ----------------------------------- 7 files changed, 628 insertions(+), 548 deletions(-) create mode 100644 polars_bio/count_overlaps.py create mode 100644 polars_bio/coverage.py create mode 100644 polars_bio/merge.py create mode 100644 polars_bio/nearest.py create mode 100644 polars_bio/overlap.py delete mode 100644 polars_bio/range_op.py diff --git a/polars_bio/__init__.py b/polars_bio/__init__.py index 01d66d58..73ee24d0 100644 --- a/polars_bio/__init__.py +++ b/polars_bio/__init__.py @@ -1,4 +1,4 @@ -from polars_bio.polars_bio import InputFormat, ReadOptions, VcfReadOptions +from polars_bio.polars_bio import InputFormat, ReadOptions, VcfReadOptions, FilterOp from .context import ctx, set_option from .io import ( @@ -14,7 +14,11 @@ sql, ) from .polars_ext import PolarsRangesOperations as LazyFrame -from .range_op import FilterOp, count_overlaps, coverage, merge, nearest, overlap +from .count_overlaps import count_overlaps +from .coverage import coverage +from .merge import merge +from .nearest import nearest +from .overlap import overlap from .range_viz import visualize_intervals POLARS_BIO_MAX_THREADS = "datafusion.execution.target_partitions" diff --git a/polars_bio/count_overlaps.py b/polars_bio/count_overlaps.py new file mode 100644 index 00000000..e6f62acc --- /dev/null +++ b/polars_bio/count_overlaps.py @@ -0,0 +1,192 @@ +from __future__ import annotations + +import datafusion +import pandas as pd +import polars as pl +from datafusion import col, literal +from typing_extensions import TYPE_CHECKING, Union + +from .constants 
import DEFAULT_INTERVAL_COLUMNS +from .context import ctx +from .interval_op_helpers import convert_result, get_py_ctx, read_df_to_datafusion +from .range_op_helpers import _validate_overlap_input, range_operation + +__all__ = ["count_overlaps"] + + +if TYPE_CHECKING: + pass +from polars_bio.polars_bio import FilterOp, RangeOp, RangeOptions + + +def count_overlaps( + df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], + df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], + overlap_filter: FilterOp = FilterOp.Strict, + suffixes: tuple[str, str] = ("", "_"), + cols1: Union[list[str], None] = ["chrom", "start", "end"], + cols2: Union[list[str], None] = ["chrom", "start", "end"], + on_cols: Union[list[str], None] = None, + output_type: str = "polars.LazyFrame", + streaming: bool = False, + naive_query: bool = True, +) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame, datafusion.DataFrame]: + """ + Count pairs of overlapping genomic intervals. + Bioframe inspired API. + + Parameters: + df1: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table (see [register_vcf](api.md#polars_bio.register_vcf)). CSV with a header, BED and Parquet are supported. + df2: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table. CSV with a header, BED and Parquet are supported. + overlap_filter: FilterOp, optional. The type of overlap to consider(Weak or Strict). Strict for **0-based**, Weak for **1-based** coordinate systems. + suffixes: Suffixes for the columns of the two overlapped sets. + cols1: The names of columns containing the chromosome, start and end of the + genomic intervals, provided separately for each set. + cols2: The names of columns containing the chromosome, start and end of the + genomic intervals, provided separately for each set. + on_cols: List of additional column names to join on. default is None. + output_type: Type of the output. 
default is "polars.LazyFrame", "polars.DataFrame", or "pandas.DataFrame" or "datafusion.DataFrame" are also supported. + naive_query: If True, use naive query for counting overlaps based on overlaps. + streaming: **EXPERIMENTAL** If True, use Polars [streaming](features.md#streaming) engine. + Returns: + **polars.LazyFrame** or polars.DataFrame or pandas.DataFrame of the overlapping intervals. + + Example: + ```python + import polars_bio as pb + import pandas as pd + + df1 = pd.DataFrame([ + ['chr1', 1, 5], + ['chr1', 3, 8], + ['chr1', 8, 10], + ['chr1', 12, 14]], + columns=['chrom', 'start', 'end'] + ) + + df2 = pd.DataFrame( + [['chr1', 4, 8], + ['chr1', 10, 11]], + columns=['chrom', 'start', 'end' ] + ) + counts = pb.count_overlaps(df1, df2, output_type="pandas.DataFrame") + + counts + + chrom start end count + 0 chr1 1 5 1 + 1 chr1 3 8 1 + 2 chr1 8 10 0 + 3 chr1 12 14 0 + ``` + + Todo: + Support return_input. + """ + _validate_overlap_input(cols1, cols2, on_cols, suffixes, output_type, how="inner") + my_ctx = get_py_ctx() + on_cols = [] if on_cols is None else on_cols + cols1 = DEFAULT_INTERVAL_COLUMNS if cols1 is None else cols1 + cols2 = DEFAULT_INTERVAL_COLUMNS if cols2 is None else cols2 + if naive_query: + range_options = RangeOptions( + range_op=RangeOp.CountOverlapsNaive, + filter_op=overlap_filter, + suffixes=suffixes, + columns_1=cols1, + columns_2=cols2, + streaming=streaming, + ) + return range_operation(df2, df1, range_options, output_type, ctx) + df1 = read_df_to_datafusion(my_ctx, df1) + df2 = read_df_to_datafusion(my_ctx, df2) + + # TODO: guarantee no collisions + s1start_s2end = "s1starts2end" + s1end_s2start = "s1ends2start" + contig = "contig" + count = "count" + starts = "starts" + ends = "ends" + is_s1 = "is_s1" + suff, _ = suffixes + df1, df2 = df2, df1 + df1 = df1.select( + *( + [ + literal(1).alias(is_s1), + col(cols1[1]).alias(s1start_s2end), + col(cols1[2]).alias(s1end_s2start), + col(cols1[0]).alias(contig), + ] + + on_cols + ) + ) + 
df2 = df2.select( + *( + [ + literal(0).alias(is_s1), + col(cols2[2]).alias(s1end_s2start), + col(cols2[1]).alias(s1start_s2end), + col(cols2[0]).alias(contig), + ] + + on_cols + ) + ) + + df = df1.union(df2) + + partitioning = [col(contig)] + [col(c) for c in on_cols] + df = df.select( + *( + [ + s1start_s2end, + s1end_s2start, + contig, + is_s1, + datafusion.functions.sum(col(is_s1)) + .over( + datafusion.expr.Window( + partition_by=partitioning, + order_by=[ + col(s1start_s2end).sort(), + col(is_s1).sort( + ascending=(overlap_filter == FilterOp.Strict) + ), + ], + ) + ) + .alias(starts), + datafusion.functions.sum(col(is_s1)) + .over( + datafusion.expr.Window( + partition_by=partitioning, + order_by=[ + col(s1end_s2start).sort(), + col(is_s1).sort( + ascending=(overlap_filter == FilterOp.Weak) + ), + ], + ) + ) + .alias(ends), + ] + + on_cols + ) + ) + df = df.filter(col(is_s1) == 0) + df = df.select( + *( + [ + col(contig).alias(cols1[0] + suff), + col(s1end_s2start).alias(cols1[1] + suff), + col(s1start_s2end).alias(cols1[2] + suff), + ] + + on_cols + + [(col(starts) - col(ends)).alias(count)] + ) + ) + + return convert_result(df, output_type, streaming) + + diff --git a/polars_bio/coverage.py b/polars_bio/coverage.py new file mode 100644 index 00000000..40fd922f --- /dev/null +++ b/polars_bio/coverage.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import datafusion +import pandas as pd +import polars as pl +from typing_extensions import TYPE_CHECKING, Union + +from polars_bio.polars_bio import ReadOptions + +from .constants import DEFAULT_INTERVAL_COLUMNS +from .context import ctx +from .range_op_helpers import _validate_overlap_input, range_operation + +__all__ = ["coverage"] + +if TYPE_CHECKING: + pass +from polars_bio.polars_bio import FilterOp, RangeOp, RangeOptions + + +def coverage( + df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], + df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], + overlap_filter: FilterOp = 
FilterOp.Strict, + suffixes: tuple[str, str] = ("_1", "_2"), + on_cols: Union[list[str], None] = None, + cols1: Union[list[str], None] = ["chrom", "start", "end"], + cols2: Union[list[str], None] = ["chrom", "start", "end"], + output_type: str = "polars.LazyFrame", + streaming: bool = False, + read_options: Union[ReadOptions, None] = None, +) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame, datafusion.DataFrame]: + """ + Calculate intervals coverage. + Bioframe inspired API. + + Parameters: + df1: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table (see [register_vcf](api.md#polars_bio.register_vcf)). CSV with a header, BED and Parquet are supported. + df2: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table. CSV with a header, BED and Parquet are supported. + overlap_filter: FilterOp, optional. The type of overlap to consider(Weak or Strict). Strict for **0-based**, Weak for **1-based** coordinate systems. + cols1: The names of columns containing the chromosome, start and end of the + genomic intervals, provided separately for each set. + cols2: The names of columns containing the chromosome, start and end of the + genomic intervals, provided separately for each set. + suffixes: Suffixes for the columns of the two overlapped sets. + on_cols: List of additional column names to join on. default is None. + output_type: Type of the output. default is "polars.LazyFrame", "polars.DataFrame", or "pandas.DataFrame" or "datafusion.DataFrame" are also supported. + streaming: **EXPERIMENTAL** If True, use Polars [streaming](features.md#streaming) engine. + read_options: Additional options for reading the input files. + + + Returns: + **polars.LazyFrame** or polars.DataFrame or pandas.DataFrame of the overlapping intervals. + + Note: + The default output format, i.e. 
[LazyFrame](https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html), is recommended for large datasets as it supports output streaming and lazy evaluation. + This enables efficient processing of large datasets without loading the entire output dataset into memory. + + Example: + + Todo: + Support for on_cols. + """ + + _validate_overlap_input(cols1, cols2, on_cols, suffixes, output_type, how="inner") + + cols1 = DEFAULT_INTERVAL_COLUMNS if cols1 is None else cols1 + cols2 = DEFAULT_INTERVAL_COLUMNS if cols2 is None else cols2 + range_options = RangeOptions( + range_op=RangeOp.Coverage, + filter_op=overlap_filter, + suffixes=suffixes, + columns_1=cols1, + columns_2=cols2, + streaming=streaming, + ) + return range_operation(df2, df1, range_options, output_type, ctx, read_options) + diff --git a/polars_bio/merge.py b/polars_bio/merge.py new file mode 100644 index 00000000..624c239d --- /dev/null +++ b/polars_bio/merge.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +import datafusion +import pandas as pd +import polars as pl +from datafusion import col, literal +from typing_extensions import TYPE_CHECKING, Union + +from polars_bio.polars_bio import ReadOptions + +from .constants import DEFAULT_INTERVAL_COLUMNS +from .context import ctx +from .interval_op_helpers import convert_result, get_py_ctx, read_df_to_datafusion +from .range_op_helpers import _validate_overlap_input, range_operation + +__all__ = ["merge"] + + +if TYPE_CHECKING: + pass +from polars_bio.polars_bio import FilterOp, RangeOp, RangeOptions + +def merge( + df: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], + overlap_filter: FilterOp = FilterOp.Strict, + min_dist: float = 0, + cols: Union[list[str], None] = ["chrom", "start", "end"], + on_cols: Union[list[str], None] = None, + output_type: str = "polars.LazyFrame", + streaming: bool = False, +) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame, datafusion.DataFrame]: + """ + Merge overlapping intervals. 
It is assumed that start < end. + + + Parameters: + df: Can be a path to a file, a polars DataFrame, or a pandas DataFrame. CSV with a header, BED and Parquet are supported. + overlap_filter: FilterOp, optional. The type of overlap to consider(Weak or Strict). Strict for **0-based**, Weak for **1-based** coordinate systems. + cols: The names of columns containing the chromosome, start and end of the + genomic intervals, provided separately for each set. + on_cols: List of additional column names for clustering. default is None. + output_type: Type of the output. default is "polars.LazyFrame", "polars.DataFrame", or "pandas.DataFrame" or "datafusion.DataFrame" are also supported. + streaming: **EXPERIMENTAL** If True, use Polars [streaming](features.md#streaming) engine. + + Returns: + **polars.LazyFrame** or polars.DataFrame or pandas.DataFrame of the overlapping intervals. + + Example: + + Todo: + Support for on_cols. + """ + suffixes = ("_1", "_2") + _validate_overlap_input(cols, cols, on_cols, suffixes, output_type, how="inner") + + my_ctx = get_py_ctx() + cols = DEFAULT_INTERVAL_COLUMNS if cols is None else cols + contig = cols[0] + start = cols[1] + end = cols[2] + + on_cols = [] if on_cols is None else on_cols + on_cols = [contig] + on_cols + + df = read_df_to_datafusion(my_ctx, df) + df_schema = df.schema() + start_type = df_schema.field(start).type + end_type = df_schema.field(end).type + # TODO: make sure to avoid conflicting column names + start_end = "start_end" + is_start_end = "is_start_or_end" + current_intervals = "current_intervals" + n_intervals = "n_intervals" + + end_positions = df.select( + *( + [(col(end) + min_dist).alias(start_end), literal(-1).alias(is_start_end)] + + on_cols + ) + ) + start_positions = df.select( + *([col(start).alias(start_end), literal(1).alias(is_start_end)] + on_cols) + ) + all_positions = start_positions.union(end_positions) + start_end_type = all_positions.schema().field(start_end).type + all_positions = 
all_positions.select( + *([col(start_end).cast(start_end_type), col(is_start_end)] + on_cols) + ) + + sorting = [ + col(start_end).sort(), + col(is_start_end).sort(ascending=(overlap_filter == FilterOp.Strict)), + ] + all_positions = all_positions.sort(*sorting) + + on_cols_expr = [col(c) for c in on_cols] + + win = datafusion.expr.Window( + partition_by=on_cols_expr, + order_by=sorting, + ) + all_positions = all_positions.select( + *( + [ + start_end, + is_start_end, + datafusion.functions.sum(col(is_start_end)) + .over(win) + .alias(current_intervals), + ] + + on_cols + + [ + datafusion.functions.row_number( + partition_by=on_cols_expr, order_by=sorting + ).alias(n_intervals) + ] + ) + ) + all_positions = all_positions.filter( + ((col(current_intervals) == 0) & (col(is_start_end) == -1)) + | ((col(current_intervals) == 1) & (col(is_start_end) == 1)) + ) + all_positions = all_positions.select( + *( + [start_end, is_start_end] + + on_cols + + [ + ( + ( + col(n_intervals) + - datafusion.functions.lag( + col(n_intervals), partition_by=on_cols_expr + ) + + 1 + ) + / 2 + ).alias(n_intervals) + ] + ) + ) + result = all_positions.select( + *( + [ + (col(start_end) - min_dist).alias(end), + is_start_end, + datafusion.functions.lag( + col(start_end), partition_by=on_cols_expr + ).alias(start), + ] + + on_cols + + [n_intervals] + ) + ) + result = result.filter(col(is_start_end) == -1) + result = result.select( + *( + [contig, col(start).cast(start_type), col(end).cast(end_type)] + + on_cols[1:] + + [n_intervals] + ) + ) + + return convert_result(result, output_type, streaming) diff --git a/polars_bio/nearest.py b/polars_bio/nearest.py new file mode 100644 index 00000000..521e4228 --- /dev/null +++ b/polars_bio/nearest.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import datafusion +import pandas as pd +import polars as pl +from typing_extensions import TYPE_CHECKING, Union + +from polars_bio.polars_bio import ReadOptions + +from .constants import 
DEFAULT_INTERVAL_COLUMNS +from .context import ctx +from .range_op_helpers import _validate_overlap_input, range_operation + +__all__ = ["nearest"] + + +if TYPE_CHECKING: + pass +from polars_bio.polars_bio import FilterOp, RangeOp, RangeOptions + +def nearest( + df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], + df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], + overlap_filter: FilterOp = FilterOp.Strict, + suffixes: tuple[str, str] = ("_1", "_2"), + on_cols: Union[list[str], None] = None, + cols1: Union[list[str], None] = ["chrom", "start", "end"], + cols2: Union[list[str], None] = ["chrom", "start", "end"], + output_type: str = "polars.LazyFrame", + streaming: bool = False, + read_options: Union[ReadOptions, None] = None, +) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame, datafusion.DataFrame]: + """ + Find pairs of closest genomic intervals. + Bioframe inspired API. + + Parameters: + df1: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table (see [register_vcf](api.md#polars_bio.register_vcf)). CSV with a header, BED and Parquet are supported. + df2: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table. CSV with a header, BED and Parquet are supported. + overlap_filter: FilterOp, optional. The type of overlap to consider(Weak or Strict). Strict for **0-based**, Weak for **1-based** coordinate systems. + cols1: The names of columns containing the chromosome, start and end of the + genomic intervals, provided separately for each set. + cols2: The names of columns containing the chromosome, start and end of the + genomic intervals, provided separately for each set. + suffixes: Suffixes for the columns of the two overlapped sets. + on_cols: List of additional column names to join on. default is None. + output_type: Type of the output. default is "polars.LazyFrame", "polars.DataFrame", or "pandas.DataFrame" or "datafusion.DataFrame" are also supported. 
+ streaming: **EXPERIMENTAL** If True, use Polars [streaming](features.md#streaming) engine. + read_options: Additional options for reading the input files. + + + Returns: + **polars.LazyFrame** or polars.DataFrame or pandas.DataFrame of the overlapping intervals. + + Note: + The default output format, i.e. [LazyFrame](https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html), is recommended for large datasets as it supports output streaming and lazy evaluation. + This enables efficient processing of large datasets without loading the entire output dataset into memory. + + Example: + + Todo: + Support for on_cols. + """ + + _validate_overlap_input(cols1, cols2, on_cols, suffixes, output_type, how="inner") + + cols1 = DEFAULT_INTERVAL_COLUMNS if cols1 is None else cols1 + cols2 = DEFAULT_INTERVAL_COLUMNS if cols2 is None else cols2 + range_options = RangeOptions( + range_op=RangeOp.Nearest, + filter_op=overlap_filter, + suffixes=suffixes, + columns_1=cols1, + columns_2=cols2, + streaming=streaming, + ) + return range_operation(df1, df2, range_options, output_type, ctx, read_options) diff --git a/polars_bio/overlap.py b/polars_bio/overlap.py new file mode 100644 index 00000000..e73c35e3 --- /dev/null +++ b/polars_bio/overlap.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +import datafusion +import pandas as pd +import polars as pl +from typing_extensions import TYPE_CHECKING, Union + +from polars_bio.polars_bio import ReadOptions + +from .constants import DEFAULT_INTERVAL_COLUMNS +from .context import ctx +from .range_op_helpers import _validate_overlap_input, range_operation + +__all__ = ["overlap"] + + +if TYPE_CHECKING: + pass +from polars_bio.polars_bio import FilterOp, RangeOp, RangeOptions + + +def overlap( + df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], + df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], + how: str = "inner", + overlap_filter: FilterOp = FilterOp.Strict, + suffixes: tuple[str, str] = ("_1", "_2"), 
+ on_cols: Union[list[str], None] = None, + cols1: Union[list[str], None] = ["chrom", "start", "end"], + cols2: Union[list[str], None] = ["chrom", "start", "end"], + algorithm: str = "Coitrees", + output_type: str = "polars.LazyFrame", + streaming: bool = False, + read_options1: Union[ReadOptions, None] = None, + read_options2: Union[ReadOptions, None] = None, +) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame, datafusion.DataFrame]: + """ + Find pairs of overlapping genomic intervals. + Bioframe inspired API. + + Parameters: + df1: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table (see [register_vcf](api.md#polars_bio.register_vcf)). CSV with a header, BED and Parquet are supported. + df2: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table. CSV with a header, BED and Parquet are supported. + how: How to handle the overlaps on the two dataframes. inner: use intersection of the set of intervals from df1 and df2, optional. + overlap_filter: FilterOp, optional. The type of overlap to consider(Weak or Strict). Strict for **0-based**, Weak for **1-based** coordinate systems. + cols1: The names of columns containing the chromosome, start and end of the + genomic intervals, provided separately for each set. + cols2: The names of columns containing the chromosome, start and end of the + genomic intervals, provided separately for each set. + suffixes: Suffixes for the columns of the two overlapped sets. + on_cols: List of additional column names to join on. default is None. + algorithm: The algorithm to use for the overlap operation. + output_type: Type of the output. default is "polars.LazyFrame", "polars.DataFrame", or "pandas.DataFrame" or "datafusion.DataFrame" are also supported. + streaming: **EXPERIMENTAL** If True, use Polars [streaming](features.md#streaming) engine. + read_options1: Additional options for reading the input files. 
+ read_options2: Additional options for reading the input files. + + Returns: + **polars.LazyFrame** or polars.DataFrame or pandas.DataFrame of the overlapping intervals. + + Note: + 1. The default output format, i.e. [LazyFrame](https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html), is recommended for large datasets as it supports output streaming and lazy evaluation. + This enables efficient processing of large datasets without loading the entire output dataset into memory. + 2. Streaming is only supported for polars.LazyFrame output. + + Example: + ```python + import polars_bio as pb + import pandas as pd + + df1 = pd.DataFrame([ + ['chr1', 1, 5], + ['chr1', 3, 8], + ['chr1', 8, 10], + ['chr1', 12, 14]], + columns=['chrom', 'start', 'end'] + ) + + df2 = pd.DataFrame( + [['chr1', 4, 8], + ['chr1', 10, 11]], + columns=['chrom', 'start', 'end' ] + ) + overlapping_intervals = pb.overlap(df1, df2, output_type="pandas.DataFrame") + + overlapping_intervals + chrom_1 start_1 end_1 chrom_2 start_2 end_2 + 0 chr1 1 5 chr1 4 8 + 1 chr1 3 8 chr1 4 8 + + ``` + + Todo: + Support for on_cols. 
+ """ + + _validate_overlap_input(cols1, cols2, on_cols, suffixes, output_type, how) + + cols1 = DEFAULT_INTERVAL_COLUMNS if cols1 is None else cols1 + cols2 = DEFAULT_INTERVAL_COLUMNS if cols2 is None else cols2 + range_options = RangeOptions( + range_op=RangeOp.Overlap, + filter_op=overlap_filter, + suffixes=suffixes, + columns_1=cols1, + columns_2=cols2, + overlap_alg=algorithm, + streaming=streaming, + ) + return range_operation( + df1, df2, range_options, output_type, ctx, read_options1, read_options2 + ) diff --git a/polars_bio/range_op.py b/polars_bio/range_op.py deleted file mode 100644 index 33a18b53..00000000 --- a/polars_bio/range_op.py +++ /dev/null @@ -1,546 +0,0 @@ -from __future__ import annotations - -import datafusion -import pandas as pd -import polars as pl -from datafusion import col, literal -from typing_extensions import TYPE_CHECKING, Union - -from polars_bio.polars_bio import ReadOptions - -from .constants import DEFAULT_INTERVAL_COLUMNS -from .context import ctx -from .interval_op_helpers import convert_result, get_py_ctx, read_df_to_datafusion -from .range_op_helpers import _validate_overlap_input, range_operation - -__all__ = ["overlap", "nearest", "count_overlaps", "merge"] - - -if TYPE_CHECKING: - pass -from polars_bio.polars_bio import FilterOp, RangeOp, RangeOptions - - -def overlap( - df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], - df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], - how: str = "inner", - overlap_filter: FilterOp = FilterOp.Strict, - suffixes: tuple[str, str] = ("_1", "_2"), - on_cols: Union[list[str], None] = None, - cols1: Union[list[str], None] = ["chrom", "start", "end"], - cols2: Union[list[str], None] = ["chrom", "start", "end"], - algorithm: str = "Coitrees", - output_type: str = "polars.LazyFrame", - streaming: bool = False, - read_options1: Union[ReadOptions, None] = None, - read_options2: Union[ReadOptions, None] = None, -) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame, 
datafusion.DataFrame]: - """ - Find pairs of overlapping genomic intervals. - Bioframe inspired API. - - Parameters: - df1: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table (see [register_vcf](api.md#polars_bio.register_vcf)). CSV with a header, BED and Parquet are supported. - df2: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table. CSV with a header, BED and Parquet are supported. - how: How to handle the overlaps on the two dataframes. inner: use intersection of the set of intervals from df1 and df2, optional. - overlap_filter: FilterOp, optional. The type of overlap to consider(Weak or Strict). Strict for **0-based**, Weak for **1-based** coordinate systems. - cols1: The names of columns containing the chromosome, start and end of the - genomic intervals, provided separately for each set. - cols2: The names of columns containing the chromosome, start and end of the - genomic intervals, provided separately for each set. - suffixes: Suffixes for the columns of the two overlapped sets. - on_cols: List of additional column names to join on. default is None. - algorithm: The algorithm to use for the overlap operation. - output_type: Type of the output. default is "polars.LazyFrame", "polars.DataFrame", or "pandas.DataFrame" or "datafusion.DataFrame" are also supported. - streaming: **EXPERIMENTAL** If True, use Polars [streaming](features.md#streaming) engine. - read_options1: Additional options for reading the input files. - read_options2: Additional options for reading the input files. - - Returns: - **polars.LazyFrame** or polars.DataFrame or pandas.DataFrame of the overlapping intervals. - - Note: - 1. The default output format, i.e. [LazyFrame](https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html), is recommended for large datasets as it supports output streaming and lazy evaluation. 
- This enables efficient processing of large datasets without loading the entire output dataset into memory. - 2. Streaming is only supported for polars.LazyFrame output. - - Example: - ```python - import polars_bio as pb - import pandas as pd - - df1 = pd.DataFrame([ - ['chr1', 1, 5], - ['chr1', 3, 8], - ['chr1', 8, 10], - ['chr1', 12, 14]], - columns=['chrom', 'start', 'end'] - ) - - df2 = pd.DataFrame( - [['chr1', 4, 8], - ['chr1', 10, 11]], - columns=['chrom', 'start', 'end' ] - ) - overlapping_intervals = pb.overlap(df1, df2, output_type="pandas.DataFrame") - - overlapping_intervals - chrom_1 start_1 end_1 chrom_2 start_2 end_2 - 0 chr1 1 5 chr1 4 8 - 1 chr1 3 8 chr1 4 8 - - ``` - - Todo: - Support for on_cols. - """ - - _validate_overlap_input(cols1, cols2, on_cols, suffixes, output_type, how) - - cols1 = DEFAULT_INTERVAL_COLUMNS if cols1 is None else cols1 - cols2 = DEFAULT_INTERVAL_COLUMNS if cols2 is None else cols2 - range_options = RangeOptions( - range_op=RangeOp.Overlap, - filter_op=overlap_filter, - suffixes=suffixes, - columns_1=cols1, - columns_2=cols2, - overlap_alg=algorithm, - streaming=streaming, - ) - return range_operation( - df1, df2, range_options, output_type, ctx, read_options1, read_options2 - ) - - -def nearest( - df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], - df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], - overlap_filter: FilterOp = FilterOp.Strict, - suffixes: tuple[str, str] = ("_1", "_2"), - on_cols: Union[list[str], None] = None, - cols1: Union[list[str], None] = ["chrom", "start", "end"], - cols2: Union[list[str], None] = ["chrom", "start", "end"], - output_type: str = "polars.LazyFrame", - streaming: bool = False, - read_options: Union[ReadOptions, None] = None, -) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame, datafusion.DataFrame]: - """ - Find pairs of closest genomic intervals. - Bioframe inspired API. 
- - Parameters: - df1: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table (see [register_vcf](api.md#polars_bio.register_vcf)). CSV with a header, BED and Parquet are supported. - df2: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table. CSV with a header, BED and Parquet are supported. - overlap_filter: FilterOp, optional. The type of overlap to consider(Weak or Strict). Strict for **0-based**, Weak for **1-based** coordinate systems. - cols1: The names of columns containing the chromosome, start and end of the - genomic intervals, provided separately for each set. - cols2: The names of columns containing the chromosome, start and end of the - genomic intervals, provided separately for each set. - suffixes: Suffixes for the columns of the two overlapped sets. - on_cols: List of additional column names to join on. default is None. - output_type: Type of the output. default is "polars.LazyFrame", "polars.DataFrame", or "pandas.DataFrame" or "datafusion.DataFrame" are also supported. - streaming: **EXPERIMENTAL** If True, use Polars [streaming](features.md#streaming) engine. - read_options: Additional options for reading the input files. - - - Returns: - **polars.LazyFrame** or polars.DataFrame or pandas.DataFrame of the overlapping intervals. - - Note: - The default output format, i.e. [LazyFrame](https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html), is recommended for large datasets as it supports output streaming and lazy evaluation. - This enables efficient processing of large datasets without loading the entire output dataset into memory. - - Example: - - Todo: - Support for on_cols. 
- """ - - _validate_overlap_input(cols1, cols2, on_cols, suffixes, output_type, how="inner") - - cols1 = DEFAULT_INTERVAL_COLUMNS if cols1 is None else cols1 - cols2 = DEFAULT_INTERVAL_COLUMNS if cols2 is None else cols2 - range_options = RangeOptions( - range_op=RangeOp.Nearest, - filter_op=overlap_filter, - suffixes=suffixes, - columns_1=cols1, - columns_2=cols2, - streaming=streaming, - ) - return range_operation(df1, df2, range_options, output_type, ctx, read_options) - - -def coverage( - df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], - df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], - overlap_filter: FilterOp = FilterOp.Strict, - suffixes: tuple[str, str] = ("_1", "_2"), - on_cols: Union[list[str], None] = None, - cols1: Union[list[str], None] = ["chrom", "start", "end"], - cols2: Union[list[str], None] = ["chrom", "start", "end"], - output_type: str = "polars.LazyFrame", - streaming: bool = False, - read_options: Union[ReadOptions, None] = None, -) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame, datafusion.DataFrame]: - """ - Calculate intervals coverage. - Bioframe inspired API. - - Parameters: - df1: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table (see [register_vcf](api.md#polars_bio.register_vcf)). CSV with a header, BED and Parquet are supported. - df2: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table. CSV with a header, BED and Parquet are supported. - overlap_filter: FilterOp, optional. The type of overlap to consider(Weak or Strict). Strict for **0-based**, Weak for **1-based** coordinate systems. - cols1: The names of columns containing the chromosome, start and end of the - genomic intervals, provided separately for each set. - cols2: The names of columns containing the chromosome, start and end of the - genomic intervals, provided separately for each set. - suffixes: Suffixes for the columns of the two overlapped sets. 
- on_cols: List of additional column names to join on. default is None. - output_type: Type of the output. default is "polars.LazyFrame", "polars.DataFrame", or "pandas.DataFrame" or "datafusion.DataFrame" are also supported. - streaming: **EXPERIMENTAL** If True, use Polars [streaming](features.md#streaming) engine. - read_options: Additional options for reading the input files. - - - Returns: - **polars.LazyFrame** or polars.DataFrame or pandas.DataFrame of the overlapping intervals. - - Note: - The default output format, i.e. [LazyFrame](https://docs.pola.rs/api/python/stable/reference/lazyframe/index.html), is recommended for large datasets as it supports output streaming and lazy evaluation. - This enables efficient processing of large datasets without loading the entire output dataset into memory. - - Example: - - Todo: - Support for on_cols. - """ - - _validate_overlap_input(cols1, cols2, on_cols, suffixes, output_type, how="inner") - - cols1 = DEFAULT_INTERVAL_COLUMNS if cols1 is None else cols1 - cols2 = DEFAULT_INTERVAL_COLUMNS if cols2 is None else cols2 - range_options = RangeOptions( - range_op=RangeOp.Coverage, - filter_op=overlap_filter, - suffixes=suffixes, - columns_1=cols1, - columns_2=cols2, - streaming=streaming, - ) - return range_operation(df2, df1, range_options, output_type, ctx, read_options) - - -def count_overlaps( - df1: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], - df2: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], - overlap_filter: FilterOp = FilterOp.Strict, - suffixes: tuple[str, str] = ("", "_"), - cols1: Union[list[str], None] = ["chrom", "start", "end"], - cols2: Union[list[str], None] = ["chrom", "start", "end"], - on_cols: Union[list[str], None] = None, - output_type: str = "polars.LazyFrame", - streaming: bool = False, - naive_query: bool = True, -) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame, datafusion.DataFrame]: - """ - Count pairs of overlapping genomic intervals. - Bioframe inspired API. 
- - Parameters: - df1: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table (see [register_vcf](api.md#polars_bio.register_vcf)). CSV with a header, BED and Parquet are supported. - df2: Can be a path to a file, a polars DataFrame, or a pandas DataFrame or a registered table. CSV with a header, BED and Parquet are supported. - overlap_filter: FilterOp, optional. The type of overlap to consider(Weak or Strict). Strict for **0-based**, Weak for **1-based** coordinate systems. - suffixes: Suffixes for the columns of the two overlapped sets. - cols1: The names of columns containing the chromosome, start and end of the - genomic intervals, provided separately for each set. - cols2: The names of columns containing the chromosome, start and end of the - genomic intervals, provided separately for each set. - on_cols: List of additional column names to join on. default is None. - output_type: Type of the output. default is "polars.LazyFrame", "polars.DataFrame", or "pandas.DataFrame" or "datafusion.DataFrame" are also supported. - naive_query: If True, use naive query for counting overlaps based on overlaps. - streaming: **EXPERIMENTAL** If True, use Polars [streaming](features.md#streaming) engine. - Returns: - **polars.LazyFrame** or polars.DataFrame or pandas.DataFrame of the overlapping intervals. - - Example: - ```python - import polars_bio as pb - import pandas as pd - - df1 = pd.DataFrame([ - ['chr1', 1, 5], - ['chr1', 3, 8], - ['chr1', 8, 10], - ['chr1', 12, 14]], - columns=['chrom', 'start', 'end'] - ) - - df2 = pd.DataFrame( - [['chr1', 4, 8], - ['chr1', 10, 11]], - columns=['chrom', 'start', 'end' ] - ) - counts = pb.count_overlaps(df1, df2, output_type="pandas.DataFrame") - - counts - - chrom start end count - 0 chr1 1 5 1 - 1 chr1 3 8 1 - 2 chr1 8 10 0 - 3 chr1 12 14 0 - ``` - - Todo: - Support return_input. 
- """ - _validate_overlap_input(cols1, cols2, on_cols, suffixes, output_type, how="inner") - my_ctx = get_py_ctx() - on_cols = [] if on_cols is None else on_cols - cols1 = DEFAULT_INTERVAL_COLUMNS if cols1 is None else cols1 - cols2 = DEFAULT_INTERVAL_COLUMNS if cols2 is None else cols2 - if naive_query: - range_options = RangeOptions( - range_op=RangeOp.CountOverlapsNaive, - filter_op=overlap_filter, - suffixes=suffixes, - columns_1=cols1, - columns_2=cols2, - streaming=streaming, - ) - return range_operation(df2, df1, range_options, output_type, ctx) - df1 = read_df_to_datafusion(my_ctx, df1) - df2 = read_df_to_datafusion(my_ctx, df2) - - # TODO: guarantee no collisions - s1start_s2end = "s1starts2end" - s1end_s2start = "s1ends2start" - contig = "contig" - count = "count" - starts = "starts" - ends = "ends" - is_s1 = "is_s1" - suff, _ = suffixes - df1, df2 = df2, df1 - df1 = df1.select( - *( - [ - literal(1).alias(is_s1), - col(cols1[1]).alias(s1start_s2end), - col(cols1[2]).alias(s1end_s2start), - col(cols1[0]).alias(contig), - ] - + on_cols - ) - ) - df2 = df2.select( - *( - [ - literal(0).alias(is_s1), - col(cols2[2]).alias(s1end_s2start), - col(cols2[1]).alias(s1start_s2end), - col(cols2[0]).alias(contig), - ] - + on_cols - ) - ) - - df = df1.union(df2) - - partitioning = [col(contig)] + [col(c) for c in on_cols] - df = df.select( - *( - [ - s1start_s2end, - s1end_s2start, - contig, - is_s1, - datafusion.functions.sum(col(is_s1)) - .over( - datafusion.expr.Window( - partition_by=partitioning, - order_by=[ - col(s1start_s2end).sort(), - col(is_s1).sort( - ascending=(overlap_filter == FilterOp.Strict) - ), - ], - ) - ) - .alias(starts), - datafusion.functions.sum(col(is_s1)) - .over( - datafusion.expr.Window( - partition_by=partitioning, - order_by=[ - col(s1end_s2start).sort(), - col(is_s1).sort( - ascending=(overlap_filter == FilterOp.Weak) - ), - ], - ) - ) - .alias(ends), - ] - + on_cols - ) - ) - df = df.filter(col(is_s1) == 0) - df = df.select( - *( - [ - 
col(contig).alias(cols1[0] + suff), - col(s1end_s2start).alias(cols1[1] + suff), - col(s1start_s2end).alias(cols1[2] + suff), - ] - + on_cols - + [(col(starts) - col(ends)).alias(count)] - ) - ) - - return convert_result(df, output_type, streaming) - - -def merge( - df: Union[str, pl.DataFrame, pl.LazyFrame, pd.DataFrame], - overlap_filter: FilterOp = FilterOp.Strict, - min_dist: float = 0, - cols: Union[list[str], None] = ["chrom", "start", "end"], - on_cols: Union[list[str], None] = None, - output_type: str = "polars.LazyFrame", - streaming: bool = False, -) -> Union[pl.LazyFrame, pl.DataFrame, pd.DataFrame, datafusion.DataFrame]: - """ - Merge overlapping intervals. It is assumed that start < end. - - - Parameters: - df: Can be a path to a file, a polars DataFrame, or a pandas DataFrame. CSV with a header, BED and Parquet are supported. - overlap_filter: FilterOp, optional. The type of overlap to consider(Weak or Strict). Strict for **0-based**, Weak for **1-based** coordinate systems. - cols: The names of columns containing the chromosome, start and end of the - genomic intervals, provided separately for each set. - on_cols: List of additional column names for clustering. default is None. - output_type: Type of the output. default is "polars.LazyFrame", "polars.DataFrame", or "pandas.DataFrame" or "datafusion.DataFrame" are also supported. - streaming: **EXPERIMENTAL** If True, use Polars [streaming](features.md#streaming) engine. - - Returns: - **polars.LazyFrame** or polars.DataFrame or pandas.DataFrame of the overlapping intervals. - - Example: - - Todo: - Support for on_cols. 
- """ - suffixes = ("_1", "_2") - _validate_overlap_input(cols, cols, on_cols, suffixes, output_type, how="inner") - - my_ctx = get_py_ctx() - cols = DEFAULT_INTERVAL_COLUMNS if cols is None else cols - contig = cols[0] - start = cols[1] - end = cols[2] - - on_cols = [] if on_cols is None else on_cols - on_cols = [contig] + on_cols - - df = read_df_to_datafusion(my_ctx, df) - df_schema = df.schema() - start_type = df_schema.field(start).type - end_type = df_schema.field(end).type - # TODO: make sure to avoid conflicting column names - start_end = "start_end" - is_start_end = "is_start_or_end" - current_intervals = "current_intervals" - n_intervals = "n_intervals" - - end_positions = df.select( - *( - [(col(end) + min_dist).alias(start_end), literal(-1).alias(is_start_end)] - + on_cols - ) - ) - start_positions = df.select( - *([col(start).alias(start_end), literal(1).alias(is_start_end)] + on_cols) - ) - all_positions = start_positions.union(end_positions) - start_end_type = all_positions.schema().field(start_end).type - all_positions = all_positions.select( - *([col(start_end).cast(start_end_type), col(is_start_end)] + on_cols) - ) - - sorting = [ - col(start_end).sort(), - col(is_start_end).sort(ascending=(overlap_filter == FilterOp.Strict)), - ] - all_positions = all_positions.sort(*sorting) - - on_cols_expr = [col(c) for c in on_cols] - - win = datafusion.expr.Window( - partition_by=on_cols_expr, - order_by=sorting, - ) - all_positions = all_positions.select( - *( - [ - start_end, - is_start_end, - datafusion.functions.sum(col(is_start_end)) - .over(win) - .alias(current_intervals), - ] - + on_cols - + [ - datafusion.functions.row_number( - partition_by=on_cols_expr, order_by=sorting - ).alias(n_intervals) - ] - ) - ) - all_positions = all_positions.filter( - ((col(current_intervals) == 0) & (col(is_start_end) == -1)) - | ((col(current_intervals) == 1) & (col(is_start_end) == 1)) - ) - all_positions = all_positions.select( - *( - [start_end, is_start_end] - + 
on_cols - + [ - ( - ( - col(n_intervals) - - datafusion.functions.lag( - col(n_intervals), partition_by=on_cols_expr - ) - + 1 - ) - / 2 - ).alias(n_intervals) - ] - ) - ) - result = all_positions.select( - *( - [ - (col(start_end) - min_dist).alias(end), - is_start_end, - datafusion.functions.lag( - col(start_end), partition_by=on_cols_expr - ).alias(start), - ] - + on_cols - + [n_intervals] - ) - ) - result = result.filter(col(is_start_end) == -1) - result = result.select( - *( - [contig, col(start).cast(start_type), col(end).cast(end_type)] - + on_cols[1:] - + [n_intervals] - ) - ) - - return convert_result(result, output_type, streaming) From 838efe39973c35e2e5faf6acdfa06b166ad9a688 Mon Sep 17 00:00:00 2001 From: zkeram Date: Fri, 11 Apr 2025 10:31:32 +0200 Subject: [PATCH 2/8] Make nearest in Rust have a seperate file --- src/nearest.rs | 109 +++++++++++++++++++++++++++++++++++++++ src/operation.rs | 38 +++++++++----- src/operation_helpers.rs | 42 +++++++++++++++ 3 files changed, 175 insertions(+), 14 deletions(-) create mode 100644 src/nearest.rs create mode 100644 src/operation_helpers.rs diff --git a/src/nearest.rs b/src/nearest.rs new file mode 100644 index 00000000..c3e43fa5 --- /dev/null +++ b/src/nearest.rs @@ -0,0 +1,109 @@ +use exon::ExonSession; +use log::info; +use crate::option::FilterOp; + + + +pub(crate) async fn do_nearest( + ctx: &ExonSession, + left_table: String, + right_table: String, + overlap_filter: FilterOp, + suffixes: (String, String), + columns_1: Vec, + columns_2: Vec, +) -> datafusion::dataframe::DataFrame { + let sign = match range_opts.filter_op.unwrap() { + FilterOp::Weak => "=".to_string(), + _ => "".to_string(), + }; + let left_table_columns = + get_non_join_columns(left_table.to_string(), columns_1.clone(), ctx).await; + let right_table_columns = + get_non_join_columns(right_table.to_string(), columns_2.clone(), ctx).await; + let query = format!( + r#" + SELECT + a.{} AS {}{}, -- contig + a.{} AS {}{}, -- pos_start + a.{} AS 
{}{}, -- pos_end + b.{} AS {}{}, -- contig + b.{} AS {}{}, -- pos_start + b.{} AS {}{} -- pos_end + {} + {}, + CAST( + CASE WHEN b.{} >= a.{} + THEN + abs(b.{}-a.{}) + WHEN b.{} <= a.{} + THEN + abs(b.{}-a.{}) + ELSE 0 + END AS BIGINT) AS distance + + FROM {} AS b, {} AS a + WHERE b.{} = a.{} + AND cast(b.{} AS INT) >{} cast(a.{} AS INT ) + AND cast(b.{} AS INT) <{} cast(a.{} AS INT) + "#, + columns_1[0], + columns_1[0], + suffixes.0, // contig + columns_1[1], + columns_1[1], + suffixes.0, // pos_start + columns_1[2], + columns_1[2], + suffixes.0, // pos_end + columns_2[0], + columns_2[0], + suffixes.1, // contig + columns_2[1], + columns_2[1], + suffixes.1, // pos_start + columns_2[2], + columns_2[2], + suffixes.1, // pos_end + if !other_columns_1.is_empty() { + ",".to_string() + + &format_non_join_tables( + other_columns_1.clone(), + "a".to_string(), + suffixes.0.clone(), + ) + } else { + "".to_string() + }, + if !other_columns_2.is_empty() { + ",".to_string() + + &format_non_join_tables( + other_columns_2.clone(), + "b".to_string(), + suffixes.1.clone(), + ) + } else { + "".to_string() + }, + columns_2[1], + columns_1[2], // b.pos_start >= a.pos_end + columns_2[1], + columns_1[2], // b.pos_start-a.pos_end + columns_2[2], + columns_1[1], // b.pos_end <= a.pos_start + columns_2[2], + columns_1[1], // a.pos_start-b.pos_end + right_table, + left_table, + columns_1[0], + columns_2[0], // contig + columns_1[2], + sign, + columns_2[1], // pos_start + columns_1[1], + sign, + columns_2[2], // pos_end + ); + debug!("Query: {}", query); + ctx.sql(&query).await.unwrap() +} diff --git a/src/operation.rs b/src/operation.rs index 13ff545f..d2978cf9 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -12,6 +12,7 @@ use crate::query::{count_overlaps_query, nearest_query, overlap_query}; use crate::udtf::CountOverlapsProvider; use crate::utils::default_cols_to_string; use crate::DEFAULT_COLUMN_NAMES; +use crate::nearest::do_nearest; pub(crate) struct QueryParams { pub sign: 
String, @@ -72,7 +73,29 @@ pub(crate) fn do_range_operation( RangeOp::Overlap => rt.block_on(do_overlap(ctx, range_options, left_table, right_table)), RangeOp::Nearest => { set_option_internal(ctx, "sequila.interval_join_algorithm", "coitreesnearest"); - rt.block_on(do_nearest(ctx, range_options, left_table, right_table)) + + // TODO: require all of these fields to be set (not optional) + let overlap_filter = range_options.filter_op.unwrap(); + let suffixes = match range_opts.suffixes { + Some((s1, s2)) => (s1, s2), + _ => ("_1".to_string(), "_2".to_string()), + }; + let columns_1 = match range_opts.columns_1 { + Some(cols) => cols, + _ => default_cols_to_string(&DEFAULT_COLUMN_NAMES), + }; + let columns_2 = match range_opts.columns_2 { + Some(cols) => cols, + _ => default_cols_to_string(&DEFAULT_COLUMN_NAMES), + }; + rt.block_on(do_nearest(ctx, + left_table, + right_table, + overlap_filter, + suffixes, + columns_1, + columns_2, + )) }, RangeOp::CountOverlaps => rt.block_on(do_count_overlaps( ctx, @@ -99,19 +122,6 @@ pub(crate) fn do_range_operation( } } -async fn do_nearest( - ctx: &ExonSession, - range_opts: RangeOptions, - left_table: String, - right_table: String, -) -> datafusion::dataframe::DataFrame { - let query = prepare_query(nearest_query, range_opts, ctx, left_table, right_table) - .await - .to_string(); - debug!("Query: {}", query); - ctx.sql(&query).await.unwrap() -} - async fn do_overlap( ctx: &ExonSession, range_opts: RangeOptions, diff --git a/src/operation_helpers.rs b/src/operation_helpers.rs new file mode 100644 index 00000000..c3d9892d --- /dev/null +++ b/src/operation_helpers.rs @@ -0,0 +1,42 @@ +pub(crate) fn operation_preconfig( + ctx: &ExonSession, + overlap_alg: Option, + streaming: bool, +) { + match &overlap_alg { + Some(alg) if alg == "coitreesnearest" => { + panic!("CoitreesNearest is an internal algorithm for nearest operation. 
Can't be set explicitly."); + }, + Some(alg) => { + set_option_internal(ctx, "sequila.interval_join_algorithm", alg); + }, + _ => { + set_option_internal( + ctx, + "sequila.interval_join_algorithm", + &Algorithm::Coitrees.to_string(), + ); + }, + } + if streaming { + info!("Running in streaming mode..."); + } + info!( + "Running with algorithm {} and {} thread(s)...", + range_options.range_op, + ctx.session + .state() + .config() + .options() + .extensions + .get::() + .unwrap() + .interval_join_algorithm, + ctx.session + .state() + .config() + .options() + .execution + .target_partitions + ); +} From b7fd3bfce7d542ae38ce3cce80c5fa857ef7d819 Mon Sep 17 00:00:00 2001 From: zkeram Date: Fri, 11 Apr 2025 10:51:52 +0200 Subject: [PATCH 3/8] remove unnecessary None potential values --- src/lib.rs | 1 + src/nearest.rs | 2 +- src/operation.rs | 27 ++++----------- src/option.rs | 22 ++++++------ src/query.rs | 87 ------------------------------------------------ 5 files changed, 19 insertions(+), 120 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c8890a8d..6c90a1bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ mod scan; mod streaming; mod udtf; mod utils; +mod nearest; use std::string::ToString; use std::sync::{Arc, Mutex}; diff --git a/src/nearest.rs b/src/nearest.rs index c3e43fa5..3e008d55 100644 --- a/src/nearest.rs +++ b/src/nearest.rs @@ -1,7 +1,7 @@ use exon::ExonSession; use log::info; use crate::option::FilterOp; - +use crate::operation::format_non_join_tables; pub(crate) async fn do_nearest( diff --git a/src/operation.rs b/src/operation.rs index d2978cf9..21563dcd 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -8,7 +8,7 @@ use tokio::runtime::Runtime; use crate::context::set_option_internal; use crate::option::{FilterOp, RangeOp, RangeOptions}; -use crate::query::{count_overlaps_query, nearest_query, overlap_query}; +use crate::query::{count_overlaps_query, overlap_query}; use crate::udtf::CountOverlapsProvider; use 
crate::utils::default_cols_to_string; use crate::DEFAULT_COLUMN_NAMES; @@ -47,7 +47,7 @@ pub(crate) fn do_range_operation( ); }, } - let streaming = range_options.streaming.unwrap_or(false); + let streaming = range_options.streaming; if streaming { info!("Running in streaming mode..."); } @@ -73,28 +73,13 @@ pub(crate) fn do_range_operation( RangeOp::Overlap => rt.block_on(do_overlap(ctx, range_options, left_table, right_table)), RangeOp::Nearest => { set_option_internal(ctx, "sequila.interval_join_algorithm", "coitreesnearest"); - - // TODO: require all of these fields to be set (not optional) - let overlap_filter = range_options.filter_op.unwrap(); - let suffixes = match range_opts.suffixes { - Some((s1, s2)) => (s1, s2), - _ => ("_1".to_string(), "_2".to_string()), - }; - let columns_1 = match range_opts.columns_1 { - Some(cols) => cols, - _ => default_cols_to_string(&DEFAULT_COLUMN_NAMES), - }; - let columns_2 = match range_opts.columns_2 { - Some(cols) => cols, - _ => default_cols_to_string(&DEFAULT_COLUMN_NAMES), - }; rt.block_on(do_nearest(ctx, left_table, right_table, - overlap_filter, - suffixes, - columns_1, - columns_2, + range_options.filter_op, + range_options.suffixes, + range_options.columns_1, + range_options.columns_2, )) }, RangeOp::CountOverlaps => rt.block_on(do_count_overlaps( diff --git a/src/option.rs b/src/option.rs index 5c014e56..bbdf11a0 100644 --- a/src/option.rs +++ b/src/option.rs @@ -8,35 +8,35 @@ pub struct RangeOptions { #[pyo3(get, set)] pub range_op: RangeOp, #[pyo3(get, set)] - pub filter_op: Option, + pub filter_op: FilterOp, #[pyo3(get, set)] - pub suffixes: Option<(String, String)>, + pub suffixes: (String, String), #[pyo3(get, set)] - pub columns_1: Option>, + pub columns_1: Vec, #[pyo3(get, set)] - pub columns_2: Option>, + pub columns_2: Vec, #[pyo3(get, set)] on_cols: Option>, #[pyo3(get, set)] pub overlap_alg: Option, #[pyo3(get, set)] - pub streaming: Option, + pub streaming: bool, } #[pymethods] impl RangeOptions { 
#[allow(clippy::too_many_arguments)] #[new] - #[pyo3(signature = (range_op, filter_op=None, suffixes=None, columns_1=None, columns_2=None, on_cols=None, overlap_alg=None, streaming=None))] + #[pyo3(signature = (range_op, filter_op, suffixes, columns_1, columns_2, streaming, on_cols=None, overlap_alg=None))] pub fn new( range_op: RangeOp, - filter_op: Option, - suffixes: Option<(String, String)>, - columns_1: Option>, - columns_2: Option>, + filter_op: FilterOp, + suffixes: (String, String), + columns_1: Vec, + columns_2: Vec, + streaming: bool, on_cols: Option>, overlap_alg: Option, - streaming: Option, ) -> Self { RangeOptions { range_op, diff --git a/src/query.rs b/src/query.rs index bd0da050..7ccff3a0 100644 --- a/src/query.rs +++ b/src/query.rs @@ -1,92 +1,5 @@ use crate::operation::{format_non_join_tables, QueryParams}; -pub(crate) fn nearest_query(query_params: QueryParams) -> String { - let query = format!( - r#" - SELECT - a.{} AS {}{}, -- contig - a.{} AS {}{}, -- pos_start - a.{} AS {}{}, -- pos_end - b.{} AS {}{}, -- contig - b.{} AS {}{}, -- pos_start - b.{} AS {}{} -- pos_end - {} - {}, - CAST( - CASE WHEN b.{} >= a.{} - THEN - abs(b.{}-a.{}) - WHEN b.{} <= a.{} - THEN - abs(b.{}-a.{}) - ELSE 0 - END AS BIGINT) AS distance - - FROM {} AS b, {} AS a - WHERE b.{} = a.{} - AND cast(b.{} AS INT) >{} cast(a.{} AS INT ) - AND cast(b.{} AS INT) <{} cast(a.{} AS INT) - "#, - query_params.columns_1[0], - query_params.columns_1[0], - query_params.suffixes.0, // contig - query_params.columns_1[1], - query_params.columns_1[1], - query_params.suffixes.0, // pos_start - query_params.columns_1[2], - query_params.columns_1[2], - query_params.suffixes.0, // pos_end - query_params.columns_2[0], - query_params.columns_2[0], - query_params.suffixes.1, // contig - query_params.columns_2[1], - query_params.columns_2[1], - query_params.suffixes.1, // pos_start - query_params.columns_2[2], - query_params.columns_2[2], - query_params.suffixes.1, // pos_end - if 
!query_params.other_columns_1.is_empty() { - ",".to_string() - + &format_non_join_tables( - query_params.other_columns_1.clone(), - "a".to_string(), - query_params.suffixes.0.clone(), - ) - } else { - "".to_string() - }, - if !query_params.other_columns_2.is_empty() { - ",".to_string() - + &format_non_join_tables( - query_params.other_columns_2.clone(), - "b".to_string(), - query_params.suffixes.1.clone(), - ) - } else { - "".to_string() - }, - query_params.columns_2[1], - query_params.columns_1[2], // b.pos_start >= a.pos_end - query_params.columns_2[1], - query_params.columns_1[2], // b.pos_start-a.pos_end - query_params.columns_2[2], - query_params.columns_1[1], // b.pos_end <= a.pos_start - query_params.columns_2[2], - query_params.columns_1[1], // a.pos_start-b.pos_end - query_params.right_table, - query_params.left_table, - query_params.columns_1[0], - query_params.columns_2[0], // contig - query_params.columns_1[2], - query_params.sign, - query_params.columns_2[1], // pos_start - query_params.columns_1[1], - query_params.sign, - query_params.columns_2[2], // pos_end - ); - query -} - pub(crate) fn overlap_query(query_params: QueryParams) -> String { let query = format!( r#" From a080f7677ae99191c2fc2c52315bad1fe1056632 Mon Sep 17 00:00:00 2001 From: zkeram Date: Fri, 11 Apr 2025 11:14:07 +0200 Subject: [PATCH 4/8] fix imports and unnecessary unwraps --- src/nearest.rs | 10 +++++----- src/operation.rs | 25 ++++++++----------------- src/operation_helpers.rs | 1 - 3 files changed, 13 insertions(+), 23 deletions(-) diff --git a/src/nearest.rs b/src/nearest.rs index 3e008d55..50221543 100644 --- a/src/nearest.rs +++ b/src/nearest.rs @@ -1,7 +1,7 @@ use exon::ExonSession; -use log::info; +use log::debug; use crate::option::FilterOp; -use crate::operation::format_non_join_tables; +use crate::operation::{format_non_join_tables, get_non_join_columns}; pub(crate) async fn do_nearest( @@ -13,13 +13,13 @@ pub(crate) async fn do_nearest( columns_1: Vec, columns_2: Vec, ) 
-> datafusion::dataframe::DataFrame { - let sign = match range_opts.filter_op.unwrap() { + let sign = match overlap_filter { FilterOp::Weak => "=".to_string(), _ => "".to_string(), }; - let left_table_columns = + let other_columns_1 = get_non_join_columns(left_table.to_string(), columns_1.clone(), ctx).await; - let right_table_columns = + let other_columns_2 = get_non_join_columns(right_table.to_string(), columns_2.clone(), ctx).await; let query = format!( r#" diff --git a/src/operation.rs b/src/operation.rs index 21563dcd..3727728b 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -155,8 +155,8 @@ async fn do_count_overlaps_coverage_naive( right_table: String, coverage: bool, ) -> datafusion::dataframe::DataFrame { - let columns_1 = range_opts.columns_1.unwrap(); - let columns_2 = range_opts.columns_2.unwrap(); + let columns_1 = range_opts.columns_1; + let columns_2 = range_opts.columns_2; let session = &ctx.session; let right_table_ref = TableReference::from(right_table.clone()); let right_schema = session @@ -173,7 +173,7 @@ async fn do_count_overlaps_coverage_naive( right_schema, columns_1, columns_2, - range_opts.filter_op.unwrap(), + range_opts.filter_op, coverage, ); let table_name = "count_overlaps_coverage".to_string(); @@ -186,7 +186,7 @@ async fn do_count_overlaps_coverage_naive( ctx.sql(&query).await.unwrap() } -async fn get_non_join_columns( +pub(crate) async fn get_non_join_columns( table_name: String, join_columns: Vec, ctx: &ExonSession, @@ -224,22 +224,13 @@ pub(crate) async fn prepare_query( left_table: String, right_table: String, ) -> String { - let sign = match range_opts.filter_op.unwrap() { + let sign = match range_opts.filter_op { FilterOp::Weak => "=".to_string(), _ => "".to_string(), }; - let suffixes = match range_opts.suffixes { - Some((s1, s2)) => (s1, s2), - _ => ("_1".to_string(), "_2".to_string()), - }; - let columns_1 = match range_opts.columns_1 { - Some(cols) => cols, - _ => default_cols_to_string(&DEFAULT_COLUMN_NAMES), - 
}; - let columns_2 = match range_opts.columns_2 { - Some(cols) => cols, - _ => default_cols_to_string(&DEFAULT_COLUMN_NAMES), - }; + let suffixes = range_opts.suffixes; + let columns_1 = range_opts.columns_1; + let columns_2 = range_opts.columns_2; let left_table_columns = get_non_join_columns(left_table.to_string(), columns_1.clone(), ctx).await; diff --git a/src/operation_helpers.rs b/src/operation_helpers.rs index c3d9892d..e54b3ebe 100644 --- a/src/operation_helpers.rs +++ b/src/operation_helpers.rs @@ -23,7 +23,6 @@ pub(crate) fn operation_preconfig( } info!( "Running with algorithm {} and {} thread(s)...", - range_options.range_op, ctx.session .state() .config() From 928aca24bbdafb2a69bbf301b19413c29b737ab5 Mon Sep 17 00:00:00 2001 From: zkeram Date: Fri, 11 Apr 2025 11:25:44 +0200 Subject: [PATCH 5/8] make Rust overlap operation into separate file --- src/lib.rs | 1 + src/operation.rs | 52 ++++++++++---------------- src/option.rs | 2 +- src/overlap.rs | 97 ++++++++++++++++++++++++++++++++++++++++++++++++ src/query.rs | 72 ----------------------------------- 5 files changed, 118 insertions(+), 106 deletions(-) create mode 100644 src/overlap.rs diff --git a/src/lib.rs b/src/lib.rs index 6c90a1bb..27b6c5eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ mod streaming; mod udtf; mod utils; mod nearest; +mod overlap; use std::string::ToString; use std::sync::{Arc, Mutex}; diff --git a/src/operation.rs b/src/operation.rs index 3727728b..f54e0276 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -8,11 +8,10 @@ use tokio::runtime::Runtime; use crate::context::set_option_internal; use crate::option::{FilterOp, RangeOp, RangeOptions}; -use crate::query::{count_overlaps_query, overlap_query}; +use crate::query::count_overlaps_query; use crate::udtf::CountOverlapsProvider; -use crate::utils::default_cols_to_string; -use crate::DEFAULT_COLUMN_NAMES; use crate::nearest::do_nearest; +use crate::overlap::do_overlap; pub(crate) struct QueryParams { pub sign: 
String, @@ -70,16 +69,25 @@ pub(crate) fn do_range_operation( .target_partitions ); match range_options.range_op { - RangeOp::Overlap => rt.block_on(do_overlap(ctx, range_options, left_table, right_table)), + RangeOp::Overlap => rt.block_on(do_overlap( + ctx, + left_table, + right_table, + range_options.filter_op, + range_options.suffixes, + range_options.columns_1, + range_options.columns_2, + )), RangeOp::Nearest => { set_option_internal(ctx, "sequila.interval_join_algorithm", "coitreesnearest"); - rt.block_on(do_nearest(ctx, - left_table, - right_table, - range_options.filter_op, - range_options.suffixes, - range_options.columns_1, - range_options.columns_2, + rt.block_on(do_nearest( + ctx, + left_table, + right_table, + range_options.filter_op, + range_options.suffixes, + range_options.columns_1, + range_options.columns_2, )) }, RangeOp::CountOverlaps => rt.block_on(do_count_overlaps( @@ -107,28 +115,6 @@ pub(crate) fn do_range_operation( } } -async fn do_overlap( - ctx: &ExonSession, - range_opts: RangeOptions, - left_table: String, - right_table: String, -) -> datafusion::dataframe::DataFrame { - let query = prepare_query(overlap_query, range_opts, ctx, left_table, right_table) - .await - .to_string(); - debug!("Query: {}", query); - debug!( - "{}", - ctx.session - .state() - .config() - .options() - .execution - .target_partitions - ); - ctx.sql(&query).await.unwrap() -} - async fn do_count_overlaps( ctx: &ExonSession, range_opts: RangeOptions, diff --git a/src/option.rs b/src/option.rs index bbdf11a0..2a2bfb01 100644 --- a/src/option.rs +++ b/src/option.rs @@ -52,7 +52,7 @@ impl RangeOptions { } impl std::fmt::Display for RangeOptions { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "streaming {}", self.streaming.unwrap_or(false)) + write!(f, "streaming {}", self.streaming) } } diff --git a/src/overlap.rs b/src/overlap.rs new file mode 100644 index 00000000..56f3d71a --- /dev/null +++ b/src/overlap.rs @@ -0,0 +1,97 @@ +use 
exon::ExonSession; +use log::debug; +use crate::option::FilterOp; +use crate::operation::{format_non_join_tables, get_non_join_columns}; + + +pub(crate) async fn do_overlap( + ctx: &ExonSession, + left_table: String, + right_table: String, + overlap_filter: FilterOp, + suffixes: (String, String), + columns_1: Vec, + columns_2: Vec, +) -> datafusion::dataframe::DataFrame { + let sign = match overlap_filter { + FilterOp::Weak => "=".to_string(), + _ => "".to_string(), + }; + let other_columns_1 = + get_non_join_columns(left_table.to_string(), columns_1.clone(), ctx).await; + let other_columns_2 = + get_non_join_columns(right_table.to_string(), columns_2.clone(), ctx).await; + + let query = format!( + r#" + SELECT + b.{} as {}{}, -- contig + b.{} as {}{}, -- pos_start + b.{} as {}{}, -- pos_end + a.{} as {}{}, -- contig + a.{} as {}{}, -- pos_start + a.{} as {}{} -- pos_end + {} + {} + FROM + {} AS a, {} AS b + WHERE + a.{}=b.{} + AND + cast(a.{} AS INT) >{} cast(b.{} AS INT) + AND + cast(a.{} AS INT) <{} cast(b.{} AS INT) + "#, + columns_2[0], + columns_2[0], + suffixes.0, // contig + columns_2[1], + columns_2[1], + suffixes.0, // pos_start + columns_2[2], + columns_2[2], + suffixes.0, // pos_end + columns_1[0], + columns_1[0], + suffixes.1, // contig + columns_1[1], + columns_1[1], + suffixes.1, // pos_start + columns_1[2], + columns_1[2], + suffixes.1, // pos_end + if !other_columns_2.is_empty() { + ",".to_string() + + &format_non_join_tables( + other_columns_2.clone(), + "a".to_string(), + suffixes.0.clone(), + ) + } else { + "".to_string() + }, + if !other_columns_1.is_empty() { + ",".to_string() + + &format_non_join_tables( + other_columns_1.clone(), + "b".to_string(), + suffixes.1.clone(), + ) + } else { + "".to_string() + }, + right_table, + left_table, + columns_1[0], + columns_2[0], // contig + columns_1[2], + sign, + columns_2[1], // pos_start + columns_1[1], + sign, + columns_2[2], // pos_end + ); + + debug!("Query: {}", query); + 
ctx.sql(&query).await.unwrap() +} diff --git a/src/query.rs b/src/query.rs index 7ccff3a0..282dbf78 100644 --- a/src/query.rs +++ b/src/query.rs @@ -1,77 +1,5 @@ use crate::operation::{format_non_join_tables, QueryParams}; -pub(crate) fn overlap_query(query_params: QueryParams) -> String { - let query = format!( - r#" - SELECT - b.{} as {}{}, -- contig - b.{} as {}{}, -- pos_start - b.{} as {}{}, -- pos_end - a.{} as {}{}, -- contig - a.{} as {}{}, -- pos_start - a.{} as {}{} -- pos_end - {} - {} - FROM - {} AS a, {} AS b - WHERE - a.{}=b.{} - AND - cast(a.{} AS INT) >{} cast(b.{} AS INT) - AND - cast(a.{} AS INT) <{} cast(b.{} AS INT) - "#, - query_params.columns_2[0], - query_params.columns_2[0], - query_params.suffixes.0, // contig - query_params.columns_2[1], - query_params.columns_2[1], - query_params.suffixes.0, // pos_start - query_params.columns_2[2], - query_params.columns_2[2], - query_params.suffixes.0, // pos_end - query_params.columns_1[0], - query_params.columns_1[0], - query_params.suffixes.1, // contig - query_params.columns_1[1], - query_params.columns_1[1], - query_params.suffixes.1, // pos_start - query_params.columns_1[2], - query_params.columns_1[2], - query_params.suffixes.1, // pos_end - if !query_params.other_columns_2.is_empty() { - ",".to_string() - + &format_non_join_tables( - query_params.other_columns_2.clone(), - "a".to_string(), - query_params.suffixes.0.clone(), - ) - } else { - "".to_string() - }, - if !query_params.other_columns_1.is_empty() { - ",".to_string() - + &format_non_join_tables( - query_params.other_columns_1.clone(), - "b".to_string(), - query_params.suffixes.1.clone(), - ) - } else { - "".to_string() - }, - query_params.right_table, - query_params.left_table, - query_params.columns_1[0], - query_params.columns_2[0], // contig - query_params.columns_1[2], - query_params.sign, - query_params.columns_2[1], // pos_start - query_params.columns_1[1], - query_params.sign, - query_params.columns_2[2], // pos_end - ); - query 
-} pub(crate) fn count_overlaps_query(query_params: QueryParams) -> String { let query = format!( From eeee3377047e0fe4933186c9c4ae81d4d337b227 Mon Sep 17 00:00:00 2001 From: zkeram Date: Fri, 11 Apr 2025 15:19:59 +0200 Subject: [PATCH 6/8] remove unused functions and data --- src/lib.rs | 1 - src/operation.rs | 2 -- src/query.rs | 2 +- src/utils.rs | 4 ---- 4 files changed, 1 insertion(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 27b6c5eb..801d0110 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,6 @@ use crate::utils::convert_arrow_rb_schema_to_polars_df_schema; const LEFT_TABLE: &str = "s1"; const RIGHT_TABLE: &str = "s2"; -const DEFAULT_COLUMN_NAMES: [&str; 3] = ["contig", "start", "end"]; #[pyfunction] #[pyo3(signature = (py_ctx, df1, df2, range_options, limit=None))] diff --git a/src/operation.rs b/src/operation.rs index f54e0276..0cca44f7 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -18,8 +18,6 @@ pub(crate) struct QueryParams { pub suffixes: (String, String), pub columns_1: Vec, pub columns_2: Vec, - pub other_columns_1: Vec, - pub other_columns_2: Vec, pub left_table: String, pub right_table: String, } diff --git a/src/query.rs b/src/query.rs index 282dbf78..25483ba6 100644 --- a/src/query.rs +++ b/src/query.rs @@ -1,4 +1,4 @@ -use crate::operation::{format_non_join_tables, QueryParams}; +use crate::operation::QueryParams; pub(crate) fn count_overlaps_query(query_params: QueryParams) -> String { diff --git a/src/utils.rs b/src/utils.rs index 7ec38ad8..b8a32021 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -5,10 +5,6 @@ use datafusion::arrow::array::RecordBatch; use polars::prelude::{PlSmallStr, PolarsError}; use polars_core::prelude::{CompatLevel, DataFrame, Series}; -pub(crate) fn default_cols_to_string(s: &[&str; 3]) -> Vec { - s.iter().map(|x| x.to_string()).collect() -} - fn convert_arrow_rs_field_to_polars_arrow_field( arrow_rs_field: &arrow_schema::Field, ) -> Result { From f8e48b4f57a3cd157132ee4e15331f6e5b292092 
Mon Sep 17 00:00:00 2001 From: zkeram Date: Fri, 9 May 2025 12:33:15 +0200 Subject: [PATCH 7/8] queries moved --- src/count_overlaps.rs | 41 ++++++++++++++++++ src/operation.rs | 96 ------------------------------------------- src/query.rs | 69 ------------------------------- 3 files changed, 41 insertions(+), 165 deletions(-) create mode 100644 src/count_overlaps.rs delete mode 100644 src/query.rs diff --git a/src/count_overlaps.rs b/src/count_overlaps.rs new file mode 100644 index 00000000..af9b0c32 --- /dev/null +++ b/src/count_overlaps.rs @@ -0,0 +1,41 @@ +use exon::ExonSession; +use crate::option::FilterOp; + + +async fn do_count_overlaps_coverage_naive( + ctx: &ExonSession, + left_table: String, + right_table: String, + overlap_filter: FilterOp, + columns_1: Vec, + columns_2: Vec, + coverage: bool, +) -> datafusion::dataframe::DataFrame { + let session = &ctx.session; + let right_table_ref = TableReference::from(right_table.clone()); + let right_schema = session + .table(right_table_ref.clone()) + .await + .unwrap() + .schema() + .as_arrow() + .clone(); + let count_overlaps_provider = CountOverlapsProvider::new( + Arc::new(session.clone()), + left_table, + right_table, + right_schema, + columns_1, + columns_2, + overlap_filter, + coverage, + ); + let table_name = "count_overlaps_coverage".to_string(); + session.deregister_table(table_name.clone()).unwrap(); + session + .register_table(table_name.clone(), Arc::new(count_overlaps_provider)) + .unwrap(); + let query = format!("SELECT * FROM {}", table_name); + debug!("Query: {}", query); + ctx.sql(&query).await.unwrap() +} diff --git a/src/operation.rs b/src/operation.rs index 0cca44f7..36af16e0 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -88,12 +88,6 @@ pub(crate) fn do_range_operation( range_options.columns_2, )) }, - RangeOp::CountOverlaps => rt.block_on(do_count_overlaps( - ctx, - range_options, - left_table, - right_table, - )), RangeOp::CountOverlapsNaive => 
rt.block_on(do_count_overlaps_coverage_naive( ctx, range_options, @@ -113,62 +107,6 @@ pub(crate) fn do_range_operation( } } -async fn do_count_overlaps( - ctx: &ExonSession, - range_opts: RangeOptions, - left_table: String, - right_table: String, -) -> datafusion::dataframe::DataFrame { - let query = prepare_query( - count_overlaps_query, - range_opts, - ctx, - left_table, - right_table, - ) - .await - .to_string(); - debug!("Query: {}", query); - ctx.sql(&query).await.unwrap() -} - -async fn do_count_overlaps_coverage_naive( - ctx: &ExonSession, - range_opts: RangeOptions, - left_table: String, - right_table: String, - coverage: bool, -) -> datafusion::dataframe::DataFrame { - let columns_1 = range_opts.columns_1; - let columns_2 = range_opts.columns_2; - let session = &ctx.session; - let right_table_ref = TableReference::from(right_table.clone()); - let right_schema = session - .table(right_table_ref.clone()) - .await - .unwrap() - .schema() - .as_arrow() - .clone(); - let count_overlaps_provider = CountOverlapsProvider::new( - Arc::new(session.clone()), - left_table, - right_table, - right_schema, - columns_1, - columns_2, - range_opts.filter_op, - coverage, - ); - let table_name = "count_overlaps_coverage".to_string(); - session.deregister_table(table_name.clone()).unwrap(); - session - .register_table(table_name.clone(), Arc::new(count_overlaps_provider)) - .unwrap(); - let query = format!("SELECT * FROM {}", table_name); - debug!("Query: {}", query); - ctx.sql(&query).await.unwrap() -} pub(crate) async fn get_non_join_columns( table_name: String, @@ -200,37 +138,3 @@ pub(crate) fn format_non_join_tables( .collect::>() .join(", ") } - -pub(crate) async fn prepare_query( - query: fn(QueryParams) -> String, - range_opts: RangeOptions, - ctx: &ExonSession, - left_table: String, - right_table: String, -) -> String { - let sign = match range_opts.filter_op { - FilterOp::Weak => "=".to_string(), - _ => "".to_string(), - }; - let suffixes = range_opts.suffixes; - 
let columns_1 = range_opts.columns_1; - let columns_2 = range_opts.columns_2; - - let left_table_columns = - get_non_join_columns(left_table.to_string(), columns_1.clone(), ctx).await; - let right_table_columns = - get_non_join_columns(right_table.to_string(), columns_2.clone(), ctx).await; - - let query_params = QueryParams { - sign, - suffixes, - columns_1, - columns_2, - other_columns_1: left_table_columns, - other_columns_2: right_table_columns, - left_table, - right_table, - }; - - query(query_params) -} diff --git a/src/query.rs b/src/query.rs deleted file mode 100644 index 25483ba6..00000000 --- a/src/query.rs +++ /dev/null @@ -1,69 +0,0 @@ -use crate::operation::QueryParams; - - -pub(crate) fn count_overlaps_query(query_params: QueryParams) -> String { - let query = format!( - r#" - SELECT - chr AS {}{}, -- contig - s1ends2start AS {}{}, -- pos_start - s1starts2end AS {}{}, -- pos_end - st - ed AS count - FROM ( - SELECT - chr, - SUM(iss1) OVER ( - PARTITION BY chr ORDER BY s1starts2end ASC, iss1 {} - ) st, - SUM(iss1) OVER ( - PARTITION BY chr ORDER BY s1ends2start ASC, iss1 {} - ) ed, - iss1, - s1starts2end, - s1ends2start - FROM ( - (SELECT - a.{} AS chr, -- contig - a.{} AS s1starts2end, -- pos_start - a.{} AS s1ends2start, -- pos_end - 1 AS iss1 - FROM {} AS a) - UNION ALL - (SELECT - b.{} AS chr, -- contig - b.{} AS s1starts2end, -- pos_end - b.{} AS s1ends2start, -- pos_start - 0 AS iss1 - FROM {} AS b) - ) - ) - WHERE - iss1 = 0 - "#, - query_params.columns_1[0], - query_params.suffixes.0, // contig - query_params.columns_1[1], - query_params.suffixes.0, // pos_start - query_params.columns_1[2], - query_params.suffixes.0, // pos_end - if query_params.sign == "=" { - "DESC" - } else { - "ASC" - }, - if query_params.sign == "=" { - "ASC" - } else { - "DESC" - }, - query_params.columns_2[0], - query_params.columns_2[1], - query_params.columns_2[2], - query_params.right_table, - query_params.columns_1[0], - query_params.columns_1[2], - 
query_params.columns_1[1], - query_params.left_table, - ); - query -} From dbca3e274118631a6384c25a825ed8adb9de4650 Mon Sep 17 00:00:00 2001 From: zkeram Date: Fri, 9 May 2025 13:07:59 +0200 Subject: [PATCH 8/8] fix count_overlaps crate imports --- src/count_overlaps.rs | 2 +- src/lib.rs | 1 + src/operation.rs | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/count_overlaps.rs b/src/count_overlaps.rs index af9b0c32..2d993486 100644 --- a/src/count_overlaps.rs +++ b/src/count_overlaps.rs @@ -2,7 +2,7 @@ use exon::ExonSession; use crate::option::FilterOp; -async fn do_count_overlaps_coverage_naive( +pub(crate) async fn do_count_overlaps_coverage_naive( ctx: &ExonSession, left_table: String, right_table: String, diff --git a/src/lib.rs b/src/lib.rs index 801d0110..67ce6651 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ mod streaming; mod udtf; mod utils; mod nearest; +mod count_overlaps; mod overlap; use std::string::ToString; diff --git a/src/operation.rs b/src/operation.rs index 36af16e0..50417e90 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -12,6 +12,7 @@ use crate::query::count_overlaps_query; use crate::udtf::CountOverlapsProvider; use crate::nearest::do_nearest; use crate::overlap::do_overlap; +use crate::count_overlaps::do_count_overlaps_coverage_naive; pub(crate) struct QueryParams { pub sign: String,