From 3dc9eed416795d5c244427911bfee39ef38afcea Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sun, 22 Jun 2025 14:50:07 +0000 Subject: [PATCH 01/13] add: parallelization support for pearson correlation --- meson.build | 20 +++++++++ pandas/_libs/algos.pyx | 95 +++++++++++++++++++++++++++++----------- pandas/_libs/meson.build | 14 +++++- pandas/core/frame.py | 12 ++++- 4 files changed, 113 insertions(+), 28 deletions(-) diff --git a/meson.build b/meson.build index 6a00e52481108..1bb404b3bb6b1 100644 --- a/meson.build +++ b/meson.build @@ -15,6 +15,18 @@ py = import('python').find_installation(pure: false) tempita = files('generate_pxi.py') versioneer = files('generate_version.py') +# Find OpenMP dependency +openmp_dep = dependency('openmp', required: false) +if not openmp_dep.found() + # Try to find OpenMP using compiler flags + cc = meson.get_compiler('c') + if cc.has_argument('-fopenmp') + openmp_dep = declare_dependency( + compile_args: ['-fopenmp'], + link_args: ['-fopenmp'], + ) + endif +endif add_project_arguments('-DNPY_NO_DEPRECATED_API=0', language: 'c') add_project_arguments('-DNPY_NO_DEPRECATED_API=0', language: 'cpp') @@ -28,6 +40,14 @@ add_project_arguments( language: 'cpp', ) +# Add OpenMP compile args if available +if openmp_dep.found() + add_project_arguments('-DHAVE_OPENMP', language: 'c') + add_project_arguments('-DHAVE_OPENMP', language: 'cpp') + message('OpenMP support enabled') +else + message('OpenMP not found - parallel code will run sequentially') +endif if fs.exists('_version_meson.py') py.install_sources('_version_meson.py', subdir: 'pandas') diff --git a/pandas/_libs/algos.pyx b/pandas/_libs/algos.pyx index f584c0ff9f614..6ab62db807dd8 100644 --- a/pandas/_libs/algos.pyx +++ b/pandas/_libs/algos.pyx @@ -1,5 +1,8 @@ cimport cython from cython cimport Py_ssize_t +from cython.parallel cimport ( + prange, +) from libc.math cimport ( fabs, sqrt, @@ -346,61 +349,101 @@ def kth_smallest(numeric_t[::1] arr, Py_ssize_t k) -> numeric_t: @cython.boundscheck(False) @cython.wraparound(False) @cython.cdivision(True) -def nancorr(const float64_t[:, :] mat, bint cov=False, minp=None): +def nancorr( + const float64_t[:, :] mat, + bint cov=False, + minp=None, + bint use_parallel=False, +): cdef: Py_ssize_t i, xi, yi, N, K int64_t minpv float64_t[:, ::1] result uint8_t[:, :] mask - int64_t nobs = 0 - float64_t vx, vy, dx, dy, meanx, meany, divisor, ssqdmx, ssqdmy, covxy, val + int64_t nobs + float64_t vx, vy, dx, dy, meanx, meany + float64_t prev_meanx, prev_meany + float64_t ssqdmx, ssqdmy, covxy, divisor, val N, K = (mat).shape - if minp is None: - minpv = 1 - else: - minpv = minp + minpv = 1 if minp is None else minp result = np.empty((K, K), dtype=np.float64) mask = np.isfinite(mat).view(np.uint8) - with nogil: - for xi in range(K): + # Welford's method for the variance-calculation + # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance + + if use_parallel: + for xi in prange(K, schedule="dynamic", nogil=True): for yi in range(xi + 1): - # Welford's method for the variance-calculation - # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance nobs = ssqdmx = ssqdmy = covxy = meanx = meany = 0 for i in range(N): if mask[i, xi] and mask[i, yi]: vx = mat[i, xi] vy = mat[i, yi] - nobs += 1 + nobs = nobs + 1 dx = vx - meanx dy = vy - meany - meanx += 1. / nobs * dx - meany += 1. 
/ nobs * dy - ssqdmx += (vx - meanx) * dx - ssqdmy += (vy - meany) * dy - covxy += (vx - meanx) * dy + meanx = meanx + dx / nobs + meany = meany + dy / nobs + ssqdmx = ssqdmx + (vx - meanx) * dx + ssqdmy = ssqdmy + (vy - meany) * dy + covxy = covxy + (vx - meanx) * dy if nobs < minpv: - result[xi, yi] = result[yi, xi] = NaN + val = NaN else: - divisor = (nobs - 1.0) if cov else sqrt(ssqdmx * ssqdmy) - - # clip `covxy / divisor` to ensure coeff is within bounds - if divisor != 0: + if cov: + divisor = nobs - 1.0 + else: + divisor = sqrt(ssqdmx * ssqdmy) + if divisor == 0.0: + val = NaN + else: val = covxy / divisor if not cov: if val > 1.0: val = 1.0 - elif val < -1.0: + if val < -1.0: val = -1.0 - result[xi, yi] = result[yi, xi] = val - else: + + result[xi, yi] = result[yi, xi] = val + else: + # Sequential version + with nogil: + for xi in range(K): + for yi in range(xi + 1): + nobs = ssqdmx = ssqdmy = covxy = meanx = meany = 0 + for i in range(N): + if mask[i, xi] and mask[i, yi]: + vx = mat[i, xi] + vy = mat[i, yi] + nobs += 1 + prev_meanx = meanx + prev_meany = meany + meanx = meanx + 1 / nobs * (vx - meanx) + meany = meany + 1 / nobs * (vy - meany) + ssqdmx = ssqdmx + (vx - meanx) * (vx - prev_meanx) + ssqdmy = ssqdmy + (vy - meany) * (vy - prev_meany) + covxy = covxy + (vx - meanx) * (vy - prev_meany) + + if nobs < minpv: result[xi, yi] = result[yi, xi] = NaN + else: + divisor = (nobs - 1.0) if cov else sqrt(ssqdmx * ssqdmy) + if divisor != 0: + val = covxy / divisor + if not cov: + if val > 1.0: + val = 1.0 + if val < -1.0: + val = -1.0 + result[xi, yi] = result[yi, xi] = val + else: + result[xi, yi] = result[yi, xi] = NaN - return result.base + return result # ---------------------------------------------------------------------- # Pairwise Spearman correlation diff --git a/pandas/_libs/meson.build b/pandas/_libs/meson.build index 33fc65e5034d0..e1f2e8b219902 100644 --- a/pandas/_libs/meson.build +++ b/pandas/_libs/meson.build @@ -1,3 +1,15 @@ +# Add OpenMP dependency detection at the top of the file +openmp_dep = dependency('openmp', required: false) +if not openmp_dep.found() + cc = meson.get_compiler('c') + if cc.has_argument('-fopenmp') + openmp_dep = declare_dependency( + compile_args: ['-fopenmp'], + link_args: ['-fopenmp'], + ) + endif +endif + _algos_take_helper = custom_target( 'algos_take_helper_pxi', output: 'algos_take_helper.pxi', @@ -70,7 +82,7 @@ libs_sources = { # numpy include dir is implicitly included 'algos': { 'sources': ['algos.pyx', _algos_common_helper, _algos_take_helper], - 'deps': _khash_primitive_helper_dep, + 'deps': [_khash_primitive_helper_dep, openmp_dep], }, 'arrays': {'sources': ['arrays.pyx']}, 'groupby': {'sources': ['groupby.pyx']}, diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 8053c17437c5e..54e9918202323 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -11264,6 +11264,7 @@ def corr( method: CorrelationMethod = "pearson", min_periods: int = 1, numeric_only: bool = False, + parallel: bool = False, ) -> DataFrame: """ Compute pairwise correlation of columns, excluding NA/null values. @@ -11292,6 +11293,11 @@ def corr( .. versionchanged:: 2.0.0 The default value of ``numeric_only`` is now ``False``. + parallel : bool, default False + Use parallel computation for Pearson correlation. + Only effective for large matrices where parallelization overhead + is justified by compute time savings. 
+ Returns ------- DataFrame @@ -11332,6 +11338,10 @@ def corr( dogs cats dogs 1.0 NaN cats NaN 1.0 + + >>> # Use parallel computation for large DataFrames + >>> large_df = pd.DataFrame(np.random.randn(10000, 100)) + >>> corr_matrix = large_df.corr(parallel=True) """ # noqa: E501 data = self._get_numeric_data() if numeric_only else self cols = data.columns @@ -11339,7 +11349,7 @@ def corr( mat = data.to_numpy(dtype=float, na_value=np.nan, copy=False) if method == "pearson": - correl = libalgos.nancorr(mat, minp=min_periods) + correl = libalgos.nancorr(mat, minp=min_periods, parallel=parallel) elif method == "spearman": correl = libalgos.nancorr_spearman(mat, minp=min_periods) elif method == "kendall" or callable(method): From 2fc7e1470cc34cac59955fe26b243ea074c6a5ec Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sun, 22 Jun 2025 15:10:29 +0000 Subject: [PATCH 02/13] fix: incorrect argument name on corr() --- pandas/core/frame.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 54e9918202323..4b72b7a152d0c 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -11264,7 +11264,7 @@ def corr( method: CorrelationMethod = "pearson", min_periods: int = 1, numeric_only: bool = False, - parallel: bool = False, + use_parallel: bool = False, ) -> DataFrame: """ Compute pairwise correlation of columns, excluding NA/null values. @@ -11349,7 +11349,7 @@ def corr( mat = data.to_numpy(dtype=float, na_value=np.nan, copy=False) if method == "pearson": - correl = libalgos.nancorr(mat, minp=min_periods, parallel=parallel) + correl = libalgos.nancorr(mat, minp=min_periods, use_parallel=use_parallel) elif method == "spearman": correl = libalgos.nancorr_spearman(mat, minp=min_periods) elif method == "kendall" or callable(method): From 66ec0170d3ea3ad21b15c7c4c96f5759a9eb2ee1 Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sun, 22 Jun 2025 15:17:07 +0000 Subject: [PATCH 03/13] update: docstring to show correct argument --- pandas/core/frame.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 4b72b7a152d0c..d5ab6e3e0713d 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -11293,7 +11293,7 @@ def corr( .. versionchanged:: 2.0.0 The default value of ``numeric_only`` is now ``False``. - parallel : bool, default False + use_parallel : bool, default False Use parallel computation for Pearson correlation. Only effective for large matrices where parallelization overhead is justified by compute time savings. 
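Before the cleanup in the next patch, a quick way to exercise the new keyword end to end is to compare the two code paths on a synthetic frame. This is a minimal sketch, assuming a local build of this branch with OpenMP detected at build time; `use_parallel` is the keyword added above, while the frame shape, timing code, and tolerances are arbitrary illustrative choices:

```python
# Cross-check the sequential and parallel Pearson paths added in these patches.
# Assumes pandas was built from this branch with OpenMP support available.
import time

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame(rng.standard_normal((20_000, 200)))

t0 = time.perf_counter()
seq = df.corr()                    # default: sequential kernel
t1 = time.perf_counter()
par = df.corr(use_parallel=True)   # parallel Pearson kernel
t2 = time.perf_counter()

# Both paths implement the same pairwise computation, so the results
# should agree to floating-point tolerance.
assert np.allclose(seq.to_numpy(), par.to_numpy(), rtol=1e-12, atol=1e-12)
print(f"sequential: {t1 - t0:.2f}s  parallel: {t2 - t1:.2f}s")
```

Without OpenMP, or on a frame with only a handful of columns, the two timings are expected to be close, since the `prange` loop then runs as an ordinary serial loop.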
From 31b530b09436b2d4bc13ecd0217d0ede8dd78886 Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sun, 29 Jun 2025 06:09:12 +0000 Subject: [PATCH 04/13] update: clean up code --- pandas/_libs/algos.pyx | 149 ++++++++++++++++++++++++----------------- 1 file changed, 87 insertions(+), 62 deletions(-) diff --git a/pandas/_libs/algos.pyx b/pandas/_libs/algos.pyx index 6ab62db807dd8..d055c4261601a 100644 --- a/pandas/_libs/algos.pyx +++ b/pandas/_libs/algos.pyx @@ -348,102 +348,127 @@ def kth_smallest(numeric_t[::1] arr, Py_ssize_t k) -> numeric_t: @cython.boundscheck(False) @cython.wraparound(False) -@cython.cdivision(True) def nancorr( const float64_t[:, :] mat, bint cov=False, minp=None, - bint use_parallel=False, + bint use_parallel=False ): cdef: - Py_ssize_t i, xi, yi, N, K - int64_t minpv + Py_ssize_t i, j, xi, yi, N, K + bint minpv float64_t[:, ::1] result - uint8_t[:, :] mask - int64_t nobs - float64_t vx, vy, dx, dy, meanx, meany - float64_t prev_meanx, prev_meany - float64_t ssqdmx, ssqdmy, covxy, divisor, val + # Initialize to None since we only use in the no missing value case + float64_t[::1] means = None + float64_t[::1] ssqds = None + ndarray[uint8_t, ndim=2] mask + bint no_nans + int64_t nobs = 0 + float64_t mean, ssqd, val + float64_t vx, vy, dx, dy, meanx, meany, divisor, ssqdmx, ssqdmy, covxy N, K = (mat).shape - minpv = 1 if minp is None else minp - + if minp is None: + minpv = 1 + else: + minpv = minp result = np.empty((K, K), dtype=np.float64) mask = np.isfinite(mat).view(np.uint8) + no_nans = mask.all() - # Welford's method for the variance-calculation - # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance - + # Computing the online means and variances is expensive - so if possible we can + # precompute these and avoid repeating the computations each time we handle + # an (xi, yi) pair + if no_nans: + means = np.empty(K, dtype=np.float64) + ssqds = np.empty(K, dtype=np.float64) + with nogil: + for j in range(K): + ssqd = mean = 0 + for i in range(N): + val = mat[i, j] + dx = val - mean + mean += 1 / (i + 1) * dx + ssqd += (val - mean) * dx + means[j] = mean + ssqds[j] = ssqd + + # ONLY CHANGE: Add parallel option to the main correlation loop if use_parallel: for xi in prange(K, schedule="dynamic", nogil=True): for yi in range(xi + 1): - nobs = ssqdmx = ssqdmy = covxy = meanx = meany = 0 - for i in range(N): - if mask[i, xi] and mask[i, yi]: + covxy = 0 + if no_nans: + for i in range(N): vx = mat[i, xi] vy = mat[i, yi] - nobs = nobs + 1 - dx = vx - meanx - dy = vy - meany - meanx = meanx + dx / nobs - meany = meany + dy / nobs - ssqdmx = ssqdmx + (vx - meanx) * dx - ssqdmy = ssqdmy + (vy - meany) * dy - covxy = covxy + (vx - meanx) * dy - + covxy += (vx - means[xi]) * (vy - means[yi]) + ssqdmx = ssqds[xi] + ssqdmy = ssqds[yi] + nobs = N + else: + nobs = ssqdmx = ssqdmy = covxy = meanx = meany = 0 + for i in range(N): + # Welford's method for the variance-calculation + # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance + if mask[i, xi] and mask[i, yi]: + vx = mat[i, xi] + vy = mat[i, yi] + nobs = nobs + 1 + dx = vx - meanx + dy = vy - meany + meanx = meanx + (1 / nobs * dx) + meany = meany + (1 / nobs * dy) + ssqdmx = ssqdmx + ((vx - meanx) * dx) + ssqdmy = ssqdmy + ((vy - meany) * dy) + covxy = covxy + ((vx - meanx) * dy) if nobs < minpv: - val = NaN + result[xi, yi] = result[yi, xi] = NaN else: - if cov: - divisor = nobs - 1.0 + divisor = (nobs - 1.0) if cov else sqrt(ssqdmx * ssqdmy) + if divisor != 0: + result[xi, yi] = result[yi, xi] = 
covxy / divisor else: - divisor = sqrt(ssqdmx * ssqdmy) - if divisor == 0.0: - val = NaN - else: - val = covxy / divisor - if not cov: - if val > 1.0: - val = 1.0 - if val < -1.0: - val = -1.0 - - result[xi, yi] = result[yi, xi] = val + result[xi, yi] = result[yi, xi] = NaN else: - # Sequential version with nogil: for xi in range(K): for yi in range(xi + 1): - nobs = ssqdmx = ssqdmy = covxy = meanx = meany = 0 - for i in range(N): - if mask[i, xi] and mask[i, yi]: + covxy = 0 + if no_nans: + for i in range(N): vx = mat[i, xi] vy = mat[i, yi] - nobs += 1 - prev_meanx = meanx - prev_meany = meany - meanx = meanx + 1 / nobs * (vx - meanx) - meany = meany + 1 / nobs * (vy - meany) - ssqdmx = ssqdmx + (vx - meanx) * (vx - prev_meanx) - ssqdmy = ssqdmy + (vy - meany) * (vy - prev_meany) - covxy = covxy + (vx - meanx) * (vy - prev_meany) - + covxy += (vx - means[xi]) * (vy - means[yi]) + ssqdmx = ssqds[xi] + ssqdmy = ssqds[yi] + nobs = N + else: + nobs = ssqdmx = ssqdmy = covxy = meanx = meany = 0 + for i in range(N): + # Welford's method for the variance-calculation + # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance + if mask[i, xi] and mask[i, yi]: + vx = mat[i, xi] + vy = mat[i, yi] + nobs += 1 + dx = vx - meanx + dy = vy - meany + meanx += 1 / nobs * dx + meany += 1 / nobs * dy + ssqdmx += (vx - meanx) * dx + ssqdmy += (vy - meany) * dy + covxy += (vx - meanx) * dy if nobs < minpv: result[xi, yi] = result[yi, xi] = NaN else: divisor = (nobs - 1.0) if cov else sqrt(ssqdmx * ssqdmy) if divisor != 0: - val = covxy / divisor - if not cov: - if val > 1.0: - val = 1.0 - if val < -1.0: - val = -1.0 - result[xi, yi] = result[yi, xi] = val + result[xi, yi] = result[yi, xi] = covxy / divisor else: result[xi, yi] = result[yi, xi] = NaN - return result + return result.base # ---------------------------------------------------------------------- # Pairwise Spearman correlation From fc29e7b001a02fe307500c7351b66f34db84f74d Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sun, 29 Jun 2025 09:23:24 +0000 Subject: [PATCH 05/13] update: docstring --- pandas/core/frame.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index d5ab6e3e0713d..b1518dda97fd7 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -11341,7 +11341,7 @@ def corr( >>> # Use parallel computation for large DataFrames >>> large_df = pd.DataFrame(np.random.randn(10000, 100)) - >>> corr_matrix = large_df.corr(parallel=True) + >>> corr_matrix = large_df.corr(use_parallel=True) """ # noqa: E501 data = self._get_numeric_data() if numeric_only else self cols = data.columns From 643eb44c28c998c695468ecb4c67bb56536fffc3 Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sun, 29 Jun 2025 09:29:27 +0000 Subject: [PATCH 06/13] update: openmp deps in setup.py --- setup.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/setup.py b/setup.py index db1852b43cfa9..c6898f84df581 100755 --- a/setup.py +++ b/setup.py @@ -438,6 +438,10 @@ def srcpath(name=None, suffix=".pyx", subdir="src"): "_libs.algos": { "pyxfile": "_libs/algos", "depends": _pxi_dep["algos"], + "extra_compile_args": extra_compile_args + + (["-fopenmp"] if not is_platform_windows() else ["/openmp"]), + "extra_link_args": extra_link_args + + (["-fopenmp"] if not is_platform_windows() else []), }, "_libs.arrays": {"pyxfile": "_libs/arrays"}, "_libs.groupby": {"pyxfile": "_libs/groupby"}, From 6f8c8595a6b79523b0671b9a8f8c206300a89393 Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sat, 5 
Jul 2025 14:42:47 +0000 Subject: [PATCH 07/13] fix: numerical instability issues --- pandas/_libs/algos.pyx | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/pandas/_libs/algos.pyx b/pandas/_libs/algos.pyx index d055c4261601a..8d307ebf70100 100644 --- a/pandas/_libs/algos.pyx +++ b/pandas/_libs/algos.pyx @@ -1,8 +1,6 @@ cimport cython from cython cimport Py_ssize_t -from cython.parallel cimport ( - prange, -) +from cython.parallel cimport prange from libc.math cimport ( fabs, sqrt, @@ -365,7 +363,7 @@ def nancorr( bint no_nans int64_t nobs = 0 float64_t mean, ssqd, val - float64_t vx, vy, dx, dy, meanx, meany, divisor, ssqdmx, ssqdmy, covxy + float64_t vx, vy, dx, dy, meanx, meany, divisor, ssqdmx, ssqdmy, covxy, corr_val N, K = (mat).shape if minp is None: @@ -393,7 +391,6 @@ def nancorr( means[j] = mean ssqds[j] = ssqd - # ONLY CHANGE: Add parallel option to the main correlation loop if use_parallel: for xi in prange(K, schedule="dynamic", nogil=True): for yi in range(xi + 1): @@ -427,7 +424,19 @@ def nancorr( else: divisor = (nobs - 1.0) if cov else sqrt(ssqdmx * ssqdmy) if divisor != 0: - result[xi, yi] = result[yi, xi] = covxy / divisor + if cov: + result[xi, yi] = result[yi, xi] = covxy / divisor + else: + # ensure that diagonal is exactly 1.0 + if xi == yi: + result[xi, yi] = 1.0 + else: + corr_val = covxy / divisor + if corr_val > 1.0: + corr_val = 1.0 + elif corr_val < -1.0: + corr_val = -1.0 + result[xi, yi] = result[yi, xi] = corr_val else: result[xi, yi] = result[yi, xi] = NaN else: @@ -464,7 +473,19 @@ def nancorr( else: divisor = (nobs - 1.0) if cov else sqrt(ssqdmx * ssqdmy) if divisor != 0: - result[xi, yi] = result[yi, xi] = covxy / divisor + if cov: + result[xi, yi] = result[yi, xi] = covxy / divisor + else: + # For correlation, ensure diagonal is exactly 1.0 + if xi == yi: + result[xi, yi] = 1.0 + else: + corr_val = covxy / divisor + if corr_val > 1.0: + corr_val = 1.0 + elif corr_val < -1.0: + corr_val = -1.0 + result[xi, yi] = result[yi, xi] = corr_val else: result[xi, yi] = result[yi, xi] = NaN From 928635c515b042cbf728cd8ce363dcfb996ec84f Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sat, 5 Jul 2025 15:27:52 +0000 Subject: [PATCH 08/13] add: test for parallel correlation --- pandas/tests/frame/methods/test_cov_corr.py | 124 ++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/pandas/tests/frame/methods/test_cov_corr.py b/pandas/tests/frame/methods/test_cov_corr.py index 304638a3a7dcf..cfc350cab38e1 100644 --- a/pandas/tests/frame/methods/test_cov_corr.py +++ b/pandas/tests/frame/methods/test_cov_corr.py @@ -507,3 +507,127 @@ def test_cov_with_missing_values(self): result2 = df.dropna().cov() tm.assert_frame_equal(result1, expected) tm.assert_frame_equal(result2, expected) + + def test_corr_parallel_functionality(self): + """Test that parallel correlation gives same results as sequential.""" + rng = np.random.default_rng(seed=42) + n_samples = 100 + + x = rng.normal(0, 1, n_samples) + y = 0.8 * x + 0.6 * rng.normal(0, 1, n_samples) + z = -0.5 * x + 0.866 * rng.normal(0, 1, n_samples) + + df = DataFrame({"X": x, "Y": y, "Z": z}) + + result_sequential = df.corr(use_parallel=False) + result_parallel = df.corr(use_parallel=True) + + tm.assert_frame_equal(result_sequential, result_parallel) + + for result in [result_sequential, result_parallel]: + assert result.shape == (3, 3) + assert (np.diag(result) == 1.0).all() + + tm.assert_frame_equal(result, result.T) + + assert (result >= -1.0).all().all() 
+ assert (result <= 1.0).all().all() + + assert abs(result.loc["X", "Y"] - 0.8) < 0.2 + assert abs(result.loc["X", "Z"] + 0.5) < 0.2 + + def test_corr_with_missing_data_parallel(self): + """ + Test correlation with missing data works correctly with parallel processing. + """ + df = DataFrame( + { + "A": [1.0, 2.0, np.nan, 4.0, 5.0], + "B": [2.0, np.nan, 3.0, 4.0, 5.0], + "C": [1.0, 2.0, 3.0, np.nan, 5.0], + } + ) + + result_sequential = df.corr(use_parallel=False) + result_parallel = df.corr(use_parallel=True) + + tm.assert_frame_equal(result_sequential, result_parallel) + + for result in [result_sequential, result_parallel]: + assert result.shape == (3, 3) + assert (np.diag(result) == 1.0).all() + tm.assert_frame_equal(result, result.T) + + assert (result >= -1.0).all().all() + assert (result <= 1.0).all().all() + + assert np.isfinite(result.loc["A", "B"]) + assert np.isfinite(result.loc["A", "C"]) + assert np.isfinite(result.loc["B", "C"]) + + def test_corr_parallel_vs_sequential_large_data(self): + """ + Test parallel and sequential correlation on large dataset. + """ + rng = np.random.default_rng(seed=42) + n_samples = 1000 + n_cols = 100 + + data = rng.normal(0, 1, (n_samples, n_cols)) + df = DataFrame(data, columns=[f"col_{i}" for i in range(n_cols)]) + + result_sequential = df.corr(use_parallel=False) + result_parallel = df.corr(use_parallel=True) + + tm.assert_frame_equal( + result_sequential, result_parallel, rtol=1e-14, atol=1e-14 + ) + + for result in [result_sequential, result_parallel]: + assert (np.diag(result) == 1.0).all() + + tm.assert_frame_equal(result, result.T) + + assert (result >= -1.0).all().all() + assert (result <= 1.0).all().all() + + def test_corr_numerical_stability_edge_cases(self): + """Test correlation numerical stability with edge cases.""" + df_small = DataFrame({"A": [1e-15, 2e-15, 3e-15], "B": [2e-15, 4e-15, 6e-15]}) + + result_small_seq = df_small.corr(use_parallel=False) + result_small_par = df_small.corr(use_parallel=True) + + tm.assert_frame_equal(result_small_seq, result_small_par) + + for result in [result_small_seq, result_small_par]: + assert (np.diag(result) == 1.0).all() + assert abs(result.loc["A", "B"] - 1.0) < 1e-10 + + df_large = DataFrame({"A": [1e15, 2e15, 3e15], "B": [2e15, 4e15, 6e15]}) + + result_large_seq = df_large.corr(use_parallel=False) + result_large_par = df_large.corr(use_parallel=True) + + tm.assert_frame_equal(result_large_seq, result_large_par) + + for result in [result_large_seq, result_large_par]: + assert (np.diag(result) == 1.0).all() + assert abs(result.loc["A", "B"] - 1.0) < 1e-10 + + df_precision = DataFrame( + { + "A": [0.1, 0.2, 0.3], + "B": [0.1000000000000001, 0.2000000000000001, 0.3000000000000001], + } + ) + + result_precision_seq = df_precision.corr(use_parallel=False) + result_precision_par = df_precision.corr(use_parallel=True) + + tm.assert_frame_equal(result_precision_seq, result_precision_par) + + for result in [result_precision_seq, result_precision_par]: + assert (np.diag(result) == 1.0).all() + assert result.loc["A", "B"] >= 0.999 + assert result.loc["A", "B"] <= 1.0 From 5af65e6d42fb8f925dc8d06c21a9470685b51567 Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sun, 6 Jul 2025 03:53:34 +0000 Subject: [PATCH 09/13] update: docs for parallel correlation --- doc/source/whatsnew/v3.0.0.rst | 2 ++ pandas/_libs/algos.pyi | 1 + pandas/core/frame.py | 35 +++++++++++++++++++++++++++++----- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/doc/source/whatsnew/v3.0.0.rst 
b/doc/source/whatsnew/v3.0.0.rst index 4154942f92907..6b67af7267e63 100644 --- a/doc/source/whatsnew/v3.0.0.rst +++ b/doc/source/whatsnew/v3.0.0.rst @@ -56,6 +56,7 @@ Other enhancements - :func:`DataFrame.to_excel` argument ``merge_cells`` now accepts a value of ``"columns"`` to only merge :class:`MultiIndex` column header header cells (:issue:`35384`) - :func:`set_option` now accepts a dictionary of options, simplifying configuration of multiple settings at once (:issue:`61093`) - :meth:`DataFrame.corrwith` now accepts ``min_periods`` as optional arguments, as in :meth:`DataFrame.corr` and :meth:`Series.corr` (:issue:`9490`) +- :meth:`DataFrame.corr` now accepts ``use_parallel`` parameter for parallel computation of Pearson correlations, potentially improving performance on large datasets (:issue:`TBD`) - :meth:`DataFrame.cummin`, :meth:`DataFrame.cummax`, :meth:`DataFrame.cumprod` and :meth:`DataFrame.cumsum` methods now have a ``numeric_only`` parameter (:issue:`53072`) - :meth:`DataFrame.ewm` now allows ``adjust=False`` when ``times`` is provided (:issue:`54328`) - :meth:`DataFrame.fillna` and :meth:`Series.fillna` can now accept ``value=None``; for non-object dtype the corresponding NA value will be used (:issue:`57723`) @@ -641,6 +642,7 @@ Performance improvements - Performance improvement in :meth:`DataFrame.join` for sorted but non-unique indexes (:issue:`56941`) - Performance improvement in :meth:`DataFrame.join` when left and/or right are non-unique and ``how`` is ``"left"``, ``"right"``, or ``"inner"`` (:issue:`56817`) - Performance improvement in :meth:`DataFrame.join` with ``how="left"`` or ``how="right"`` and ``sort=True`` (:issue:`56919`) +- Performance improvement in :meth:`DataFrame.corr` when ``use_parallel=True`` is used for computing Pearson correlations on large datasets (:issue:`TBD`) - Performance improvement in :meth:`DataFrame.to_csv` when ``index=False`` (:issue:`59312`) - Performance improvement in :meth:`DataFrameGroupBy.ffill`, :meth:`DataFrameGroupBy.bfill`, :meth:`SeriesGroupBy.ffill`, and :meth:`SeriesGroupBy.bfill` (:issue:`56902`) - Performance improvement in :meth:`Index.join` by propagating cached attributes in cases where the result matches one of the inputs (:issue:`57023`) diff --git a/pandas/_libs/algos.pyi b/pandas/_libs/algos.pyi index 0a6be851e1efd..4cf93d8fdc57e 100644 --- a/pandas/_libs/algos.pyi +++ b/pandas/_libs/algos.pyi @@ -43,6 +43,7 @@ def nancorr( mat: npt.NDArray[np.float64], # const float64_t[:, :] cov: bool = ..., minp: int | None = ..., + use_parallel: bool = ..., ) -> npt.NDArray[np.float64]: ... # ndarray[float64_t, ndim=2] def nancorr_spearman( mat: npt.NDArray[np.float64], # ndarray[float64_t, ndim=2] diff --git a/pandas/core/frame.py b/pandas/core/frame.py index b1518dda97fd7..a15c3f7238d0b 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -11269,6 +11269,10 @@ def corr( """ Compute pairwise correlation of columns, excluding NA/null values. + This function computes the correlation matrix between all pairs of columns + in the DataFrame, handling missing values by excluding them from the + calculation on a pairwise basis. + Parameters ---------- method : {'pearson', 'kendall', 'spearman'} or callable @@ -11294,9 +11298,19 @@ def corr( The default value of ``numeric_only`` is now ``False``. use_parallel : bool, default False - Use parallel computation for Pearson correlation. - Only effective for large matrices where parallelization overhead - is justified by compute time savings. 
+ Use parallel computation for Pearson correlation to potentially + improve performance on large datasets. This parameter is only + effective when ``method='pearson'`` and is ignored for other + correlation methods. + + When ``True``, the computation will utilize multiple CPU cores + for calculating pairwise correlations. This can provide significant + performance improvements for large DataFrames (typically with + hundreds of columns or more) but may introduce overhead for + smaller datasets. The optimal threshold depends on system + specifications and data characteristics. + + .. versionadded:: 3.0.0 Returns ------- @@ -11317,6 +11331,17 @@ def corr( * `Kendall rank correlation coefficient `_ * `Spearman's rank correlation coefficient `_ + **Parallel Computation:** + + The ``use_parallel`` parameter can significantly improve performance for large + DataFrames by distributing the correlation computation across multiple CPU cores. + However, it's important to note: + + - Only affects Pearson correlation (``method='pearson'``) + - Performance gains are most noticeable for DataFrames with many columns + - Small datasets may see negligible improvement or even slight overhead + - The optimal threshold depends on system specifications and data characteristics + Examples -------- >>> def histogram_intersection(a, b): @@ -11340,8 +11365,8 @@ def corr( cats NaN 1.0 >>> # Use parallel computation for large DataFrames - >>> large_df = pd.DataFrame(np.random.randn(10000, 100)) - >>> corr_matrix = large_df.corr(use_parallel=True) + >>> large_df = pd.DataFrame(np.random.randn(1000, 50)) + >>> corr_matrix = large_df.corr(use_parallel=True) # doctest: +SKIP """ # noqa: E501 data = self._get_numeric_data() if numeric_only else self cols = data.columns From 269f7ae1b3543f3277a7bdbc802704857d86c0c9 Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sun, 6 Jul 2025 09:58:34 +0000 Subject: [PATCH 10/13] fix: warnings for pyodide --- meson.build | 24 ++++++++++++++++++------ pandas/_libs/algos.pyx | 6 ++++++ pandas/core/frame.py | 14 +++++++++++++- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/meson.build b/meson.build index 1bb404b3bb6b1..202b907e55e23 100644 --- a/meson.build +++ b/meson.build @@ -15,11 +15,19 @@ py = import('python').find_installation(pure: false) tempita = files('generate_pxi.py') versioneer = files('generate_version.py') -# Find OpenMP dependency +# Find OpenMP dependency - skip for Pyodide/WebAssembly builds +is_pyodide_build = false +cc = meson.get_compiler('c') + +# Check if we're building for WebAssembly/Pyodide +if cc.get_id() == 'clang' and host_machine.cpu_family() == 'wasm32' + is_pyodide_build = true + message('Detected Pyodide/WebAssembly build - skipping OpenMP') +endif + openmp_dep = dependency('openmp', required: false) -if not openmp_dep.found() +if not openmp_dep.found() and not is_pyodide_build # Try to find OpenMP using compiler flags - cc = meson.get_compiler('c') if cc.has_argument('-fopenmp') openmp_dep = declare_dependency( compile_args: ['-fopenmp'], @@ -40,13 +48,17 @@ add_project_arguments( language: 'cpp', ) -# Add OpenMP compile args if available -if openmp_dep.found() +# Add OpenMP compile args if available and not building for Pyodide +if openmp_dep.found() and not is_pyodide_build add_project_arguments('-DHAVE_OPENMP', language: 'c') add_project_arguments('-DHAVE_OPENMP', language: 'cpp') message('OpenMP support enabled') else - message('OpenMP not found - parallel code will run sequentially') + if is_pyodide_build + message('OpenMP disabled 
for Pyodide/WebAssembly build') + else + message('OpenMP not found - parallel code will run sequentially') + endif endif if fs.exists('_version_meson.py') diff --git a/pandas/_libs/algos.pyx b/pandas/_libs/algos.pyx index 8d307ebf70100..fb845b5388244 100644 --- a/pandas/_libs/algos.pyx +++ b/pandas/_libs/algos.pyx @@ -11,6 +11,8 @@ from libc.stdlib cimport ( ) from libc.string cimport memmove +import sys + import numpy as np cimport numpy as cnp @@ -365,6 +367,10 @@ def nancorr( float64_t mean, ssqd, val float64_t vx, vy, dx, dy, meanx, meany, divisor, ssqdmx, ssqdmy, covxy, corr_val + # Disable parallel execution in Pyodide/WebAssembly environment + if use_parallel and hasattr(sys, "platform") and sys.platform == "emscripten": + use_parallel = False + N, K = (mat).shape if minp is None: minpv = 1 diff --git a/pandas/core/frame.py b/pandas/core/frame.py index a15c3f7238d0b..26ff39852f3ee 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -11374,7 +11374,19 @@ def corr( mat = data.to_numpy(dtype=float, na_value=np.nan, copy=False) if method == "pearson": - correl = libalgos.nancorr(mat, minp=min_periods, use_parallel=use_parallel) + if use_parallel: + try: + correl = libalgos.nancorr(mat, minp=min_periods, use_parallel=True) + except (ImportError, AttributeError, RuntimeError): + # OpenMP not available or prange failed, fall back to sequential + warnings.warn( + "No parallelism; using sequential (e.g. Pyodide or no OpenMP).", + UserWarning, + stacklevel=find_stack_level(), + ) + correl = libalgos.nancorr(mat, minp=min_periods, use_parallel=False) + else: + correl = libalgos.nancorr(mat, minp=min_periods, use_parallel=False) elif method == "spearman": correl = libalgos.nancorr_spearman(mat, minp=min_periods) elif method == "kendall" or callable(method): From 5a0bb6e2646a30eb1c44b0c08e4b34a50b4b4872 Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sun, 6 Jul 2025 14:01:31 +0000 Subject: [PATCH 11/13] fix: docstring error --- pandas/core/groupby/generic.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index b2531e2abf7f1..70ee575fb5e3a 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -3113,6 +3113,7 @@ def corr( method: str | Callable[[np.ndarray, np.ndarray], float] = "pearson", min_periods: int = 1, numeric_only: bool = False, + use_parallel: bool = False, ) -> DataFrame: result = self._op_via_apply( "corr", method=method, min_periods=min_periods, numeric_only=numeric_only From 3e8c1e4b4a09f5c8383d8b09ca2811b1415826c2 Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sun, 6 Jul 2025 15:43:25 +0000 Subject: [PATCH 12/13] fix: avoid using openmp for pyodide --- meson.build | 1 + pandas/_libs/algos.pyx | 2 ++ pandas/_libs/meson.build | 23 ++++++++++------------- setup.py | 22 ++++++++++++++++++++-- 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/meson.build b/meson.build index 202b907e55e23..7b94915394223 100644 --- a/meson.build +++ b/meson.build @@ -106,4 +106,5 @@ endif # Needed by pandas.test() when it looks for the pytest ini options py.install_sources('pyproject.toml', subdir: 'pandas') +# Pass build flags to subdirectories subdir('pandas') diff --git a/pandas/_libs/algos.pyx b/pandas/_libs/algos.pyx index fb845b5388244..a3d0e9237ab32 100644 --- a/pandas/_libs/algos.pyx +++ b/pandas/_libs/algos.pyx @@ -398,6 +398,7 @@ def nancorr( ssqds[j] = ssqd if use_parallel: + # Use parallel execution with prange (only works if OpenMP is available) for xi in 
prange(K, schedule="dynamic", nogil=True): for yi in range(xi + 1): covxy = 0 @@ -446,6 +447,7 @@ def nancorr( else: result[xi, yi] = result[yi, xi] = NaN else: + # Use sequential execution with regular range with nogil: for xi in range(K): for yi in range(xi + 1): diff --git a/pandas/_libs/meson.build b/pandas/_libs/meson.build index e1f2e8b219902..fe72c579ebebc 100644 --- a/pandas/_libs/meson.build +++ b/pandas/_libs/meson.build @@ -1,15 +1,3 @@ -# Add OpenMP dependency detection at the top of the file -openmp_dep = dependency('openmp', required: false) -if not openmp_dep.found() - cc = meson.get_compiler('c') - if cc.has_argument('-fopenmp') - openmp_dep = declare_dependency( - compile_args: ['-fopenmp'], - link_args: ['-fopenmp'], - ) - endif -endif - _algos_take_helper = custom_target( 'algos_take_helper_pxi', output: 'algos_take_helper.pxi', @@ -62,6 +50,15 @@ _khash_primitive_helper_dep = declare_dependency( sources: _khash_primitive_helper, ) +# Use the parent's WebAssembly detection and OpenMP configuration +# (is_pyodide_build and openmp_dep are inherited from parent meson.build) + +# Conditional dependencies for algos module +algos_deps = [_khash_primitive_helper_dep] +if openmp_dep.found() and not is_pyodide_build + algos_deps += [openmp_dep] +endif + cdata = configuration_data() if cy.version().version_compare('>=3.1.0') cdata.set('freethreading_compatible', '1') @@ -82,7 +79,7 @@ libs_sources = { # numpy include dir is implicitly included 'algos': { 'sources': ['algos.pyx', _algos_common_helper, _algos_take_helper], - 'deps': [_khash_primitive_helper_dep, openmp_dep], + 'deps': algos_deps, }, 'arrays': {'sources': ['arrays.pyx']}, 'groupby': {'sources': ['groupby.pyx']}, diff --git a/setup.py b/setup.py index c6898f84df581..daff329b5bba3 100755 --- a/setup.py +++ b/setup.py @@ -28,6 +28,16 @@ cmdclass = versioneer.get_cmdclass() +# Check for WebAssembly/Emscripten build environment +def is_platform_emscripten(): + """Check if we're building for WebAssembly/Emscripten (Pyodide).""" + return ( + sys.platform == "emscripten" + or os.environ.get("EMSCRIPTEN", "").lower() in ("1", "true", "yes") + or os.environ.get("PYODIDE", "").lower() in ("1", "true", "yes") + ) + + def is_platform_windows(): return sys.platform in ("win32", "cygwin") @@ -439,9 +449,17 @@ def srcpath(name=None, suffix=".pyx", subdir="src"): "pyxfile": "_libs/algos", "depends": _pxi_dep["algos"], "extra_compile_args": extra_compile_args - + (["-fopenmp"] if not is_platform_windows() else ["/openmp"]), + + ( + (["-fopenmp"] if not is_platform_windows() else ["/openmp"]) + if not is_platform_emscripten() + else [] + ), "extra_link_args": extra_link_args - + (["-fopenmp"] if not is_platform_windows() else []), + + ( + (["-fopenmp"] if not is_platform_windows() else []) + if not is_platform_emscripten() + else [] + ), }, "_libs.arrays": {"pyxfile": "_libs/arrays"}, "_libs.groupby": {"pyxfile": "_libs/groupby"}, From 212804ac1711d654e546e5ba6fae56242e5c3345 Mon Sep 17 00:00:00 2001 From: gangula-karthik Date: Sat, 19 Jul 2025 16:00:26 +0000 Subject: [PATCH 13/13] fix: prevent openmp compilation on pyodide --- pandas/_libs/algos.pyx | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pandas/_libs/algos.pyx b/pandas/_libs/algos.pyx index a3d0e9237ab32..8bd0af0534e08 100644 --- a/pandas/_libs/algos.pyx +++ b/pandas/_libs/algos.pyx @@ -1,6 +1,7 @@ cimport cython from cython cimport Py_ssize_t from cython.parallel cimport prange +import sys from libc.math cimport ( fabs, sqrt, @@ -367,9 +368,10 @@ 
def nancorr( float64_t mean, ssqd, val float64_t vx, vy, dx, dy, meanx, meany, divisor, ssqdmx, ssqdmy, covxy, corr_val - # Disable parallel execution in Pyodide/WebAssembly environment - if use_parallel and hasattr(sys, "platform") and sys.platform == "emscripten": - use_parallel = False + # Disable parallel execution in WASM/Emscripten environments + if use_parallel and hasattr(sys, "platform"): + if "emscripten" in sys.platform or "wasm32" in sys.platform: + use_parallel = False N, K = (mat).shape if minp is None: @@ -398,7 +400,6 @@ def nancorr( ssqds[j] = ssqd if use_parallel: - # Use parallel execution with prange (only works if OpenMP is available) for xi in prange(K, schedule="dynamic", nogil=True): for yi in range(xi + 1): covxy = 0
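For reference, the Welford-style single-pass update used by the NaN-aware branch of `nancorr` can be written out in plain Python. The sketch below is an illustrative re-implementation for cross-checking only, not the compiled Cython kernel; the function name `pairwise_pearson` and its signature are invented for this example:

```python
# Plain-Python reference for the masked Welford update used by nancorr.
# Illustration / test oracle only -- not the Cython kernel shipped by pandas.
import math

import numpy as np


def pairwise_pearson(mat: np.ndarray, min_periods: int = 1) -> np.ndarray:
    """Pairwise Pearson correlation over rows where both columns are finite."""
    n_rows, n_cols = mat.shape
    mask = np.isfinite(mat)
    out = np.full((n_cols, n_cols), np.nan)
    for xi in range(n_cols):
        for yi in range(xi + 1):
            nobs = 0
            meanx = meany = ssqdx = ssqdy = covxy = 0.0
            for i in range(n_rows):
                if mask[i, xi] and mask[i, yi]:
                    vx, vy = mat[i, xi], mat[i, yi]
                    nobs += 1
                    dx, dy = vx - meanx, vy - meany   # deltas vs. old means
                    meanx += dx / nobs
                    meany += dy / nobs
                    ssqdx += (vx - meanx) * dx        # sum of squared deviations
                    ssqdy += (vy - meany) * dy
                    covxy += (vx - meanx) * dy        # co-moment sum
            divisor = math.sqrt(ssqdx * ssqdy)
            if nobs < min_periods or divisor == 0.0:
                val = np.nan
            else:
                val = min(1.0, max(-1.0, covxy / divisor))  # clip to [-1, 1]
            out[xi, yi] = out[yi, xi] = val
    return out
```

On small inputs this should match `DataFrame.corr(method="pearson")` up to floating-point rounding, which makes it a convenient oracle for tests like the ones added in patch 08.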