ENH: Parallelization support for pairwise correlation #61737

Closed
2 changes: 2 additions & 0 deletions doc/source/whatsnew/v3.0.0.rst
@@ -56,6 +56,7 @@ Other enhancements
- :func:`DataFrame.to_excel` argument ``merge_cells`` now accepts a value of ``"columns"`` to only merge :class:`MultiIndex` column header cells (:issue:`35384`)
- :func:`set_option` now accepts a dictionary of options, simplifying configuration of multiple settings at once (:issue:`61093`)
- :meth:`DataFrame.corrwith` now accepts ``min_periods`` as an optional argument, as in :meth:`DataFrame.corr` and :meth:`Series.corr` (:issue:`9490`)
- :meth:`DataFrame.corr` now accepts a ``use_parallel`` parameter for parallel computation of Pearson correlations, potentially improving performance on large datasets (:issue:`TBD`)
- :meth:`DataFrame.cummin`, :meth:`DataFrame.cummax`, :meth:`DataFrame.cumprod` and :meth:`DataFrame.cumsum` methods now have a ``numeric_only`` parameter (:issue:`53072`)
- :meth:`DataFrame.ewm` now allows ``adjust=False`` when ``times`` is provided (:issue:`54328`)
- :meth:`DataFrame.fillna` and :meth:`Series.fillna` can now accept ``value=None``; for non-object dtype the corresponding NA value will be used (:issue:`57723`)
@@ -641,6 +642,7 @@ Performance improvements
- Performance improvement in :meth:`DataFrame.join` for sorted but non-unique indexes (:issue:`56941`)
- Performance improvement in :meth:`DataFrame.join` when left and/or right are non-unique and ``how`` is ``"left"``, ``"right"``, or ``"inner"`` (:issue:`56817`)
- Performance improvement in :meth:`DataFrame.join` with ``how="left"`` or ``how="right"`` and ``sort=True`` (:issue:`56919`)
- Performance improvement in :meth:`DataFrame.corr` when ``use_parallel=True`` is used for computing Pearson correlations on large datasets (:issue:`TBD`)
- Performance improvement in :meth:`DataFrame.to_csv` when ``index=False`` (:issue:`59312`)
- Performance improvement in :meth:`DataFrameGroupBy.ffill`, :meth:`DataFrameGroupBy.bfill`, :meth:`SeriesGroupBy.ffill`, and :meth:`SeriesGroupBy.bfill` (:issue:`56902`)
- Performance improvement in :meth:`Index.join` by propagating cached attributes in cases where the result matches one of the inputs (:issue:`57023`)
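For orientation, this is roughly how the new keyword is intended to be used from user code once the enhancement lands (an illustrative sketch, not documentation of released behaviour; the issue number above is still TBD):

```python
import numpy as np
import pandas as pd

# Hypothetical usage of the proposed keyword: only method="pearson" honours
# use_parallel; "spearman", "kendall" and callables ignore it.
df = pd.DataFrame(np.random.default_rng(0).standard_normal((10_000, 200)))

corr_parallel = df.corr(method="pearson", use_parallel=True)
corr_serial = df.corr(method="pearson")  # default stays sequential

# Both code paths are expected to produce the same matrix
# (up to floating-point rounding).
pd.testing.assert_frame_equal(corr_parallel, corr_serial)
```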
33 changes: 33 additions & 0 deletions meson.build
@@ -15,6 +15,26 @@ py = import('python').find_installation(pure: false)
tempita = files('generate_pxi.py')
versioneer = files('generate_version.py')

# Find OpenMP dependency - skip for Pyodide/WebAssembly builds
is_pyodide_build = false
cc = meson.get_compiler('c')

# Check if we're building for WebAssembly/Pyodide
if cc.get_id() == 'clang' and host_machine.cpu_family() == 'wasm32'
is_pyodide_build = true
message('Detected Pyodide/WebAssembly build - skipping OpenMP')
endif

openmp_dep = dependency('openmp', required: false)
if not openmp_dep.found() and not is_pyodide_build
# Try to find OpenMP using compiler flags
if cc.has_argument('-fopenmp')
openmp_dep = declare_dependency(
compile_args: ['-fopenmp'],
link_args: ['-fopenmp'],
)
endif
endif

add_project_arguments('-DNPY_NO_DEPRECATED_API=0', language: 'c')
add_project_arguments('-DNPY_NO_DEPRECATED_API=0', language: 'cpp')
@@ -28,6 +48,18 @@ add_project_arguments(
language: 'cpp',
)

# Add OpenMP compile args if available and not building for Pyodide
if openmp_dep.found() and not is_pyodide_build
add_project_arguments('-DHAVE_OPENMP', language: 'c')
add_project_arguments('-DHAVE_OPENMP', language: 'cpp')
message('OpenMP support enabled')
else
if is_pyodide_build
message('OpenMP disabled for Pyodide/WebAssembly build')
else
message('OpenMP not found - parallel code will run sequentially')
endif
endif

if fs.exists('_version_meson.py')
py.install_sources('_version_meson.py', subdir: 'pandas')
@@ -74,4 +106,5 @@ endif
# Needed by pandas.test() when it looks for the pytest ini options
py.install_sources('pyproject.toml', subdir: 'pandas')

# Pass build flags to subdirectories
subdir('pandas')
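Outside of the build system itself, the detection logic above can be summarised as: skip OpenMP entirely on wasm32/Pyodide, otherwise use the `openmp` dependency if Meson finds it, and fall back to probing the compiler for `-fopenmp`. The standalone Python sketch below mimics that probe purely for explanation; the compiler name `cc` and the temporary-file layout are assumptions, not part of the build:

```python
import subprocess
import sys
import tempfile
import textwrap


def openmp_probably_available(cc: str = "cc") -> bool:
    """Mimic the meson fallback: does the C compiler accept -fopenmp?"""
    if sys.platform == "emscripten":
        # Pyodide/WebAssembly: OpenMP is skipped, mirroring is_pyodide_build
        return False
    src = textwrap.dedent(
        """
        #include <omp.h>
        int main(void) { return omp_get_max_threads() > 0 ? 0 : 1; }
        """
    )
    with tempfile.TemporaryDirectory() as tmp:
        c_file = f"{tmp}/probe.c"
        with open(c_file, "w") as fh:
            fh.write(src)
        try:
            proc = subprocess.run(
                [cc, "-fopenmp", c_file, "-o", f"{tmp}/probe"],
                capture_output=True,
            )
        except FileNotFoundError:
            return False  # no C compiler on PATH
        return proc.returncode == 0


if __name__ == "__main__":
    print("OpenMP available:", openmp_probably_available())
```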
1 change: 1 addition & 0 deletions pandas/_libs/algos.pyi
@@ -43,6 +43,7 @@ def nancorr(
mat: npt.NDArray[np.float64], # const float64_t[:, :]
cov: bool = ...,
minp: int | None = ...,
use_parallel: bool = ...,
) -> npt.NDArray[np.float64]: ... # ndarray[float64_t, ndim=2]
def nancorr_spearman(
mat: npt.NDArray[np.float64], # ndarray[float64_t, ndim=2]
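The stub only adds the keyword. Calling the kernel directly (an internal, unsupported API, shown purely to illustrate the signature; ``use_parallel`` only exists with this patch applied) both values are expected to return the same matrix:

```python
import numpy as np
from pandas._libs import algos as libalgos

# Internal API sketch: mat must be a 2-D float64 array, as in the stub above.
mat = np.random.default_rng(0).standard_normal((1_000, 8))

corr_seq = libalgos.nancorr(mat, cov=False, minp=1, use_parallel=False)
corr_par = libalgos.nancorr(mat, cov=False, minp=1, use_parallel=True)

np.testing.assert_allclose(corr_seq, corr_par)
```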
163 changes: 130 additions & 33 deletions pandas/_libs/algos.pyx
@@ -1,5 +1,6 @@
cimport cython
from cython cimport Py_ssize_t
from cython.parallel cimport prange
from libc.math cimport (
fabs,
sqrt,
@@ -10,6 +11,8 @@ from libc.stdlib cimport (
)
from libc.string cimport memmove

import sys

import numpy as np

cimport numpy as cnp
@@ -345,60 +348,154 @@ def kth_smallest(numeric_t[::1] arr, Py_ssize_t k) -> numeric_t:

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
def nancorr(
const float64_t[:, :] mat,
bint cov=False,
minp=None,
bint use_parallel=False
):
cdef:
        Py_ssize_t i, j, xi, yi, N, K
        int64_t minpv
        float64_t[:, ::1] result
        # Initialize to None since we only use these in the no-missing-value case
        float64_t[::1] means = None
        float64_t[::1] ssqds = None
        ndarray[uint8_t, ndim=2] mask
        bint no_nans
        int64_t nobs = 0
        float64_t mean, ssqd, val
        float64_t vx, vy, dx, dy, meanx, meany, divisor, ssqdmx, ssqdmy, covxy, corr_val

# Disable parallel execution in Pyodide/WebAssembly environment
    if use_parallel and sys.platform == "emscripten":
use_parallel = False

N, K = (<object>mat).shape
if minp is None:
minpv = 1
else:
        minpv = <int64_t>minp

result = np.empty((K, K), dtype=np.float64)
mask = np.isfinite(mat).view(np.uint8)
no_nans = mask.all()

    # Computing the online means and variances is expensive - so if possible we can
    # precompute these and avoid repeating the computations each time we handle
    # an (xi, yi) pair
    if no_nans:
        means = np.empty(K, dtype=np.float64)
        ssqds = np.empty(K, dtype=np.float64)

        with nogil:
            for j in range(K):
                ssqd = mean = 0
                for i in range(N):
                    val = mat[i, j]
                    dx = val - mean
                    mean += 1.0 / (i + 1) * dx
                    ssqd += (val - mean) * dx

                means[j] = mean
                ssqds[j] = ssqd

if use_parallel:
# Use parallel execution with prange (only works if OpenMP is available)
for xi in prange(K, schedule="dynamic", nogil=True):
for yi in range(xi + 1):
covxy = 0
                if no_nans:
                    for i in range(N):
                        vx = mat[i, xi]
                        vy = mat[i, yi]
                        covxy += (vx - means[xi]) * (vy - means[yi])

                    ssqdmx = ssqds[xi]
                    ssqdmy = ssqds[yi]
                    nobs = N
                else:
                    nobs = ssqdmx = ssqdmy = covxy = meanx = meany = 0
                    for i in range(N):
                        # Welford's method for the variance-calculation
                        # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
                        if mask[i, xi] and mask[i, yi]:
                            vx = mat[i, xi]
                            vy = mat[i, yi]
                            nobs = nobs + 1
                            dx = vx - meanx
                            dy = vy - meany
                            meanx = meanx + (1.0 / nobs * dx)
                            meany = meany + (1.0 / nobs * dy)
                            ssqdmx = ssqdmx + ((vx - meanx) * dx)
                            ssqdmy = ssqdmy + ((vy - meany) * dy)
                            covxy = covxy + ((vx - meanx) * dy)

                if nobs < minpv:
                    result[xi, yi] = result[yi, xi] = NaN
                else:
                    divisor = (nobs - 1.0) if cov else sqrt(ssqdmx * ssqdmy)
                    if divisor != 0:
                        if cov:
                            result[xi, yi] = result[yi, xi] = covxy / divisor
                        else:
                            # ensure that diagonal is exactly 1.0
                            if xi == yi:
                                result[xi, yi] = 1.0
                            else:
                                # clip `covxy / divisor` to keep the coefficient in bounds
                                corr_val = covxy / divisor
                                if corr_val > 1.0:
                                    corr_val = 1.0
                                elif corr_val < -1.0:
                                    corr_val = -1.0
                                result[xi, yi] = result[yi, xi] = corr_val
                    else:
                        result[xi, yi] = result[yi, xi] = NaN
else:
# Use sequential execution with regular range
with nogil:
for xi in range(K):
for yi in range(xi + 1):
covxy = 0
if no_nans:
for i in range(N):
vx = mat[i, xi]
vy = mat[i, yi]
covxy += (vx - means[xi]) * (vy - means[yi])
ssqdmx = ssqds[xi]
ssqdmy = ssqds[yi]
nobs = N
else:
nobs = ssqdmx = ssqdmy = covxy = meanx = meany = 0
for i in range(N):
# Welford's method for the variance-calculation
# https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
if mask[i, xi] and mask[i, yi]:
vx = mat[i, xi]
vy = mat[i, yi]
nobs += 1
dx = vx - meanx
dy = vy - meany
                                meanx += 1.0 / nobs * dx
                                meany += 1.0 / nobs * dy
ssqdmx += (vx - meanx) * dx
ssqdmy += (vy - meany) * dy
covxy += (vx - meanx) * dy
if nobs < minpv:
result[xi, yi] = result[yi, xi] = NaN
else:
divisor = (nobs - 1.0) if cov else sqrt(ssqdmx * ssqdmy)
if divisor != 0:
if cov:
result[xi, yi] = result[yi, xi] = covxy / divisor
else:
# For correlation, ensure diagonal is exactly 1.0
if xi == yi:
result[xi, yi] = 1.0
else:
corr_val = covxy / divisor
if corr_val > 1.0:
corr_val = 1.0
elif corr_val < -1.0:
corr_val = -1.0
result[xi, yi] = result[yi, xi] = corr_val
else:
result[xi, yi] = result[yi, xi] = NaN

return result.base

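For readers who prefer plain NumPy, the structure of the rewritten kernel's no-NaN fast path is sketched below: per-column means and sums of squared deviations are computed once, each (xi, yi) pair then only accumulates the cross term, the off-diagonal coefficient is clipped to [-1, 1], and the diagonal is pinned to exactly 1.0. This is an explanatory reference, not the Cython implementation, and the helper name is invented:

```python
import numpy as np


def nancorr_reference(mat: np.ndarray) -> np.ndarray:
    """Readability-oriented reference for the no-NaN fast path above."""
    k = mat.shape[1]
    means = mat.mean(axis=0)                  # precomputed per-column means
    ssqds = ((mat - means) ** 2).sum(axis=0)  # precomputed squared deviations
    out = np.empty((k, k), dtype=np.float64)
    for xi in range(k):
        for yi in range(xi + 1):
            covxy = ((mat[:, xi] - means[xi]) * (mat[:, yi] - means[yi])).sum()
            divisor = np.sqrt(ssqds[xi] * ssqds[yi])
            if divisor != 0:
                # diagonal forced to exactly 1.0, off-diagonal clipped to [-1, 1]
                val = 1.0 if xi == yi else min(1.0, max(-1.0, covxy / divisor))
            else:
                val = np.nan
            out[xi, yi] = out[yi, xi] = val
    return out


data = np.random.default_rng(42).standard_normal((500, 6))
np.testing.assert_allclose(nancorr_reference(data), np.corrcoef(data, rowvar=False))
```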
11 changes: 10 additions & 1 deletion pandas/_libs/meson.build
@@ -50,6 +50,15 @@ _khash_primitive_helper_dep = declare_dependency(
sources: _khash_primitive_helper,
)

# Use the parent's WebAssembly detection and OpenMP configuration
# (is_pyodide_build and openmp_dep are inherited from parent meson.build)

# Conditional dependencies for algos module
algos_deps = [_khash_primitive_helper_dep]
if openmp_dep.found() and not is_pyodide_build
algos_deps += [openmp_dep]
endif

cdata = configuration_data()
if cy.version().version_compare('>=3.1.0')
cdata.set('freethreading_compatible', '1')
@@ -70,7 +79,7 @@ libs_sources = {
# numpy include dir is implicitly included
'algos': {
'sources': ['algos.pyx', _algos_common_helper, _algos_take_helper],
'deps': algos_deps,
},
'arrays': {'sources': ['arrays.pyx']},
'groupby': {'sources': ['groupby.pyx']},
49 changes: 48 additions & 1 deletion pandas/core/frame.py
@@ -11264,10 +11264,15 @@ def corr(
method: CorrelationMethod = "pearson",
min_periods: int = 1,
numeric_only: bool = False,
use_parallel: bool = False,
) -> DataFrame:
"""
Compute pairwise correlation of columns, excluding NA/null values.

This function computes the correlation matrix between all pairs of columns
in the DataFrame, handling missing values by excluding them from the
calculation on a pairwise basis.

Parameters
----------
method : {'pearson', 'kendall', 'spearman'} or callable
@@ -11292,6 +11297,21 @@ def corr(
.. versionchanged:: 2.0.0
The default value of ``numeric_only`` is now ``False``.

use_parallel : bool, default False
Use parallel computation for Pearson correlation to potentially
improve performance on large datasets. This parameter is only
effective when ``method='pearson'`` and is ignored for other
correlation methods.

When ``True``, the computation will utilize multiple CPU cores
for calculating pairwise correlations. This can provide significant
performance improvements for large DataFrames (typically with
hundreds of columns or more) but may introduce overhead for
smaller datasets. The optimal threshold depends on system
specifications and data characteristics.

.. versionadded:: 3.0.0

Returns
-------
DataFrame
@@ -11311,6 +11331,17 @@ def corr(
* `Kendall rank correlation coefficient <https://en.wikipedia.org/wiki/Kendall_rank_correlation_coefficient>`_
* `Spearman's rank correlation coefficient <https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient>`_

**Parallel Computation:**

The ``use_parallel`` parameter can significantly improve performance for large
DataFrames by distributing the correlation computation across multiple CPU cores.
However, it's important to note:

- Only affects Pearson correlation (``method='pearson'``)
- Performance gains are most noticeable for DataFrames with many columns
- Small datasets may see negligible improvement or even slight overhead
- The optimal threshold depends on system specifications and data characteristics

Examples
--------
>>> def histogram_intersection(a, b):
@@ -11332,14 +11363,30 @@ def corr(
dogs cats
dogs 1.0 NaN
cats NaN 1.0

>>> # Use parallel computation for large DataFrames
>>> large_df = pd.DataFrame(np.random.randn(1000, 50))
>>> corr_matrix = large_df.corr(use_parallel=True) # doctest: +SKIP
""" # noqa: E501
data = self._get_numeric_data() if numeric_only else self
cols = data.columns
idx = cols.copy()
mat = data.to_numpy(dtype=float, na_value=np.nan, copy=False)

if method == "pearson":
if use_parallel:
try:
correl = libalgos.nancorr(mat, minp=min_periods, use_parallel=True)
except (ImportError, AttributeError, RuntimeError):
# OpenMP not available or prange failed, fall back to sequential
warnings.warn(
"No parallelism; using sequential (e.g. Pyodide or no OpenMP).",
UserWarning,
stacklevel=find_stack_level(),
)
correl = libalgos.nancorr(mat, minp=min_periods, use_parallel=False)
else:
correl = libalgos.nancorr(mat, minp=min_periods, use_parallel=False)
elif method == "spearman":
correl = libalgos.nancorr_spearman(mat, minp=min_periods)
elif method == "kendall" or callable(method):
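Assuming the fallback branch above is hit (a build without OpenMP, or a Pyodide environment), the behaviour a caller sees looks roughly like this; a sketch only, and the exact warning text may differ:

```python
import warnings

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.default_rng(1).standard_normal((5_000, 100)))

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    corr = df.corr(use_parallel=True)  # request the parallel Pearson path

if caught:
    # Without OpenMP the computation falls back to the sequential kernel
    # and a UserWarning is emitted instead of raising an error.
    print("fallback warning:", caught[-1].message)

print(corr.shape)  # (100, 100) in either case
```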