diff --git a/doc/source/whatsnew/v3.0.0.rst b/doc/source/whatsnew/v3.0.0.rst
index 4154942f92907..6b67af7267e63 100644
--- a/doc/source/whatsnew/v3.0.0.rst
+++ b/doc/source/whatsnew/v3.0.0.rst
@@ -56,6 +56,7 @@ Other enhancements
 - :func:`DataFrame.to_excel` argument ``merge_cells`` now accepts a value of ``"columns"`` to only merge :class:`MultiIndex` column header header cells (:issue:`35384`)
 - :func:`set_option` now accepts a dictionary of options, simplifying configuration of multiple settings at once (:issue:`61093`)
+- :meth:`DataFrame.corr` now accepts a ``use_parallel`` parameter enabling parallel computation of Pearson correlations, which can improve performance on large datasets (:issue:`TBD`)
 - :meth:`DataFrame.corrwith` now accepts ``min_periods`` as optional arguments, as in :meth:`DataFrame.corr` and :meth:`Series.corr` (:issue:`9490`)
 - :meth:`DataFrame.cummin`, :meth:`DataFrame.cummax`, :meth:`DataFrame.cumprod` and :meth:`DataFrame.cumsum` methods now have a ``numeric_only`` parameter (:issue:`53072`)
 - :meth:`DataFrame.ewm` now allows ``adjust=False`` when ``times`` is provided (:issue:`54328`)
 - :meth:`DataFrame.fillna` and :meth:`Series.fillna` can now accept ``value=None``; for non-object dtype the corresponding NA value will be used (:issue:`57723`)
@@ -641,6 +642,7 @@ Performance improvements
+- Performance improvement in :meth:`DataFrame.corr` when computing Pearson correlations with ``use_parallel=True`` on large datasets (:issue:`TBD`)
 - Performance improvement in :meth:`DataFrame.join` for sorted but non-unique indexes (:issue:`56941`)
 - Performance improvement in :meth:`DataFrame.join` when left and/or right are non-unique and ``how`` is ``"left"``, ``"right"``, or ``"inner"`` (:issue:`56817`)
 - Performance improvement in :meth:`DataFrame.join` with ``how="left"`` or ``how="right"`` and ``sort=True`` (:issue:`56919`)
 - Performance improvement in :meth:`DataFrame.to_csv` when ``index=False`` (:issue:`59312`)
 - Performance improvement in :meth:`DataFrameGroupBy.ffill`, :meth:`DataFrameGroupBy.bfill`, :meth:`SeriesGroupBy.ffill`, and :meth:`SeriesGroupBy.bfill` (:issue:`56902`)
 - Performance improvement in :meth:`Index.join` by propagating cached attributes in cases where the result matches one of the inputs (:issue:`57023`)
diff --git a/meson.build b/meson.build
index 6a00e52481108..7b94915394223 100644
--- a/meson.build
+++ b/meson.build
@@ -15,6 +15,26 @@ py = import('python').find_installation(pure: false)
 tempita = files('generate_pxi.py')
 versioneer = files('generate_version.py')
 
+# Find the OpenMP dependency - skipped for Pyodide/WebAssembly builds
+is_pyodide_build = false
+cc = meson.get_compiler('c')
+
+# Check whether we are building for WebAssembly/Pyodide
+if cc.get_id() == 'clang' and host_machine.cpu_family() == 'wasm32'
+    is_pyodide_build = true
+    message('Detected Pyodide/WebAssembly build - skipping OpenMP')
+endif
+
+openmp_dep = dependency('openmp', required: false)
+if not openmp_dep.found() and not is_pyodide_build
+    # Fall back to probing the compiler for OpenMP flags directly
+    if cc.has_argument('-fopenmp')
+        openmp_dep = declare_dependency(
+            compile_args: ['-fopenmp'],
+            link_args: ['-fopenmp'],
+        )
+    endif
+endif
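+# NOTE: the '-fopenmp' probe above is a best-effort fallback for toolchains
+# where dependency('openmp') is not found; compilers that reject the flag
+# (e.g. AppleClang without libomp) simply build without OpenMP, in which
+# case the parallel code paths run sequentially.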
 
 add_project_arguments('-DNPY_NO_DEPRECATED_API=0', language: 'c')
 add_project_arguments('-DNPY_NO_DEPRECATED_API=0', language: 'cpp')
@@ -28,6 +48,18 @@ add_project_arguments(
   language: 'cpp',
 )
 
+# Add OpenMP compile args if available and not building for Pyodide
+if openmp_dep.found() and not is_pyodide_build
+    add_project_arguments('-DHAVE_OPENMP', language: 'c')
+    add_project_arguments('-DHAVE_OPENMP', language: 'cpp')
+    message('OpenMP support enabled')
+else
+    if is_pyodide_build
+        message('OpenMP disabled for Pyodide/WebAssembly build')
+    else
+        message('OpenMP not found - parallel code will run sequentially')
+    endif
+endif
 
 if fs.exists('_version_meson.py')
   py.install_sources('_version_meson.py', subdir: 'pandas')
@@ -74,4 +106,5 @@ endif
 
 # Needed by pandas.test() when it looks for the pytest ini options
 py.install_sources('pyproject.toml', subdir: 'pandas')
+# openmp_dep and is_pyodide_build stay in scope inside subdir('pandas')
 subdir('pandas')
diff --git a/pandas/_libs/algos.pyi b/pandas/_libs/algos.pyi
index 0a6be851e1efd..4cf93d8fdc57e 100644
--- a/pandas/_libs/algos.pyi
+++ b/pandas/_libs/algos.pyi
@@ -43,6 +43,7 @@ def nancorr(
     mat: npt.NDArray[np.float64],  # const float64_t[:, :]
     cov: bool = ...,
     minp: int | None = ...,
+    use_parallel: bool = ...,
 ) -> npt.NDArray[np.float64]: ...  # ndarray[float64_t, ndim=2]
 def nancorr_spearman(
     mat: npt.NDArray[np.float64],  # ndarray[float64_t, ndim=2]
diff --git a/pandas/_libs/algos.pyx b/pandas/_libs/algos.pyx
index f584c0ff9f614..a3d0e9237ab32 100644
--- a/pandas/_libs/algos.pyx
+++ b/pandas/_libs/algos.pyx
@@ -1,5 +1,6 @@
 cimport cython
 from cython cimport Py_ssize_t
+from cython.parallel cimport prange
 from libc.math cimport (
     fabs,
     sqrt,
@@ -10,6 +11,8 @@ from libc.stdlib cimport (
 )
 from libc.string cimport memmove
 
+import sys
+
 import numpy as np
 
 cimport numpy as cnp
@@ -345,60 +348,154 @@ def kth_smallest(numeric_t[::1] arr, Py_ssize_t k) -> numeric_t:
 
 @cython.boundscheck(False)
 @cython.wraparound(False)
-@cython.cdivision(True)
-def nancorr(const float64_t[:, :] mat, bint cov=False, minp=None):
-    cdef:
-        Py_ssize_t i, xi, yi, N, K
-        int64_t minpv
-        float64_t[:, ::1] result
-        uint8_t[:, :] mask
-        int64_t nobs = 0
-        float64_t vx, vy, dx, dy, meanx, meany, divisor, ssqdmx, ssqdmy, covxy, val
-
-    N, K = (<object>mat).shape
-
-    if minp is None:
-        minpv = 1
-    else:
-        minpv = minp
-
-    result = np.empty((K, K), dtype=np.float64)
-    mask = np.isfinite(mat).view(np.uint8)
-
-    with nogil:
-        for xi in range(K):
-            for yi in range(xi + 1):
-                # Welford's method for the variance-calculation
-                # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
-                nobs = ssqdmx = ssqdmy = covxy = meanx = meany = 0
-                for i in range(N):
-                    if mask[i, xi] and mask[i, yi]:
-                        vx = mat[i, xi]
-                        vy = mat[i, yi]
-                        nobs += 1
-                        dx = vx - meanx
-                        dy = vy - meany
-                        meanx += 1. / nobs * dx
-                        meany += 1. / nobs * dy
-                        ssqdmx += (vx - meanx) * dx
-                        ssqdmy += (vy - meany) * dy
-                        covxy += (vx - meanx) * dy
-
-                if nobs < minpv:
-                    result[xi, yi] = result[yi, xi] = NaN
-                else:
-                    divisor = (nobs - 1.0) if cov else sqrt(ssqdmx * ssqdmy)
-
-                    # clip `covxy / divisor` to ensure coeff is within bounds
-                    if divisor != 0:
-                        val = covxy / divisor
-                        if not cov:
-                            if val > 1.0:
-                                val = 1.0
-                            elif val < -1.0:
-                                val = -1.0
-                        result[xi, yi] = result[yi, xi] = val
-                    else:
-                        result[xi, yi] = result[yi, xi] = NaN
+def nancorr(
+    const float64_t[:, :] mat,
+    bint cov=False,
+    minp=None,
+    bint use_parallel=False,
+):
+    cdef:
+        Py_ssize_t i, j, xi, yi, N, K
+        int64_t minpv
+        float64_t[:, ::1] result
+        # Initialized to None; only used in the no-missing-values case
+        float64_t[::1] means = None
+        float64_t[::1] ssqds = None
+        ndarray[uint8_t, ndim=2] mask
+        bint no_nans
+        int64_t nobs = 0
+        float64_t mean, ssqd, val
+        float64_t vx, vy, dx, dy, meanx, meany, divisor
+        float64_t ssqdmx, ssqdmy, covxy, corr_val
+
+    # prange-based parallelism is not supported under Pyodide/WebAssembly
+    if use_parallel and sys.platform == "emscripten":
+        use_parallel = False
+
+    N, K = (<object>mat).shape
+
+    if minp is None:
+        minpv = 1
+    else:
+        minpv = minp
+
+    result = np.empty((K, K), dtype=np.float64)
+    mask = np.isfinite(mat).view(np.uint8)
+    no_nans = mask.all()
+
+    # Computing the online means and variances is expensive, so when there
+    # are no missing values we precompute them once instead of repeating the
+    # computation for every (xi, yi) pair
+    if no_nans:
+        means = np.empty(K, dtype=np.float64)
+        ssqds = np.empty(K, dtype=np.float64)
+        with nogil:
+            for j in range(K):
+                ssqd = mean = 0
+                for i in range(N):
+                    val = mat[i, j]
+                    dx = val - mean
+                    mean += 1.0 / (i + 1) * dx
+                    ssqd += (val - mean) * dx
+                means[j] = mean
+                ssqds[j] = ssqd
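+
+    # With no NaNs every pair (xi, yi) shares all N observations, so the
+    # per-column means and sums of squared deviations above can be reused
+    # for each pair instead of being recomputed inside the O(K**2) pair
+    # loop; this also keeps the parallel loop below free of shared state.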
+    if use_parallel:
+        # Parallel execution with prange; this is only truly parallel when
+        # compiled with OpenMP, otherwise prange degrades to a plain loop
+        for xi in prange(K, schedule="dynamic", nogil=True):
+            for yi in range(xi + 1):
+                covxy = 0
+                if no_nans:
+                    for i in range(N):
+                        vx = mat[i, xi]
+                        vy = mat[i, yi]
+                        covxy = covxy + ((vx - means[xi]) * (vy - means[yi]))
+                    ssqdmx = ssqds[xi]
+                    ssqdmy = ssqds[yi]
+                    nobs = N
+                else:
+                    nobs = ssqdmx = ssqdmy = covxy = meanx = meany = 0
+                    for i in range(N):
+                        # Welford's method for the variance-calculation
+                        # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+                        if mask[i, xi] and mask[i, yi]:
+                            vx = mat[i, xi]
+                            vy = mat[i, yi]
+                            nobs = nobs + 1
+                            dx = vx - meanx
+                            dy = vy - meany
+                            meanx = meanx + (1.0 / nobs * dx)
+                            meany = meany + (1.0 / nobs * dy)
+                            ssqdmx = ssqdmx + ((vx - meanx) * dx)
+                            ssqdmy = ssqdmy + ((vy - meany) * dy)
+                            covxy = covxy + ((vx - meanx) * dy)
+                if nobs < minpv:
+                    result[xi, yi] = result[yi, xi] = NaN
+                else:
+                    divisor = (nobs - 1.0) if cov else sqrt(ssqdmx * ssqdmy)
+                    if divisor != 0:
+                        if cov:
+                            result[xi, yi] = result[yi, xi] = covxy / divisor
+                        else:
+                            # ensure that the diagonal is exactly 1.0
+                            if xi == yi:
+                                result[xi, yi] = 1.0
+                            else:
+                                # clip to keep the coefficient within [-1, 1]
+                                corr_val = covxy / divisor
+                                if corr_val > 1.0:
+                                    corr_val = 1.0
+                                elif corr_val < -1.0:
+                                    corr_val = -1.0
+                                result[xi, yi] = result[yi, xi] = corr_val
+                    else:
+                        result[xi, yi] = result[yi, xi] = NaN
+    else:
+        # Sequential execution with a regular range
+        with nogil:
+            for xi in range(K):
+                for yi in range(xi + 1):
+                    covxy = 0
+                    if no_nans:
+                        for i in range(N):
+                            vx = mat[i, xi]
+                            vy = mat[i, yi]
+                            covxy += (vx - means[xi]) * (vy - means[yi])
+                        ssqdmx = ssqds[xi]
+                        ssqdmy = ssqds[yi]
+                        nobs = N
+                    else:
+                        nobs = ssqdmx = ssqdmy = covxy = meanx = meany = 0
+                        for i in range(N):
+                            # Welford's method for the variance-calculation
+                            # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+                            if mask[i, xi] and mask[i, yi]:
+                                vx = mat[i, xi]
+                                vy = mat[i, yi]
+                                nobs += 1
+                                dx = vx - meanx
+                                dy = vy - meany
+                                meanx += 1.0 / nobs * dx
+                                meany += 1.0 / nobs * dy
+                                ssqdmx += (vx - meanx) * dx
+                                ssqdmy += (vy - meany) * dy
+                                covxy += (vx - meanx) * dy
+                    if nobs < minpv:
+                        result[xi, yi] = result[yi, xi] = NaN
+                    else:
+                        divisor = (nobs - 1.0) if cov else sqrt(ssqdmx * ssqdmy)
+                        if divisor != 0:
+                            if cov:
+                                result[xi, yi] = result[yi, xi] = covxy / divisor
+                            else:
+                                # ensure that the diagonal is exactly 1.0
+                                if xi == yi:
+                                    result[xi, yi] = 1.0
+                                else:
+                                    # clip to keep the coefficient within [-1, 1]
+                                    corr_val = covxy / divisor
+                                    if corr_val > 1.0:
+                                        corr_val = 1.0
+                                    elif corr_val < -1.0:
+                                        corr_val = -1.0
+                                    result[xi, yi] = result[yi, xi] = corr_val
+                        else:
+                            result[xi, yi] = result[yi, xi] = NaN
 
     return result.base
diff --git a/pandas/_libs/meson.build b/pandas/_libs/meson.build
index 33fc65e5034d0..fe72c579ebebc 100644
--- a/pandas/_libs/meson.build
+++ b/pandas/_libs/meson.build
@@ -50,6 +50,15 @@ _khash_primitive_helper_dep = declare_dependency(
   sources: _khash_primitive_helper,
 )
 
+# Reuse the parent's WebAssembly detection and OpenMP configuration
+# (is_pyodide_build and openmp_dep are defined in the root meson.build)
+
+# Conditional dependencies for the algos module
+algos_deps = [_khash_primitive_helper_dep]
+if openmp_dep.found() and not is_pyodide_build
+    algos_deps += [openmp_dep]
+endif
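+
+# If OpenMP is ultimately unavailable, Cython compiles prange() down to an
+# ordinary sequential loop, so the algos extension still builds and behaves
+# correctly - use_parallel=True then simply runs on a single thread.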
+
 cdata = configuration_data()
 if cy.version().version_compare('>=3.1.0')
   cdata.set('freethreading_compatible', '1')
@@ -70,7 +79,7 @@ libs_sources = {
     # numpy include dir is implicitly included
     'algos': {
         'sources': ['algos.pyx', _algos_common_helper, _algos_take_helper],
-        'deps': _khash_primitive_helper_dep,
+        'deps': algos_deps,
     },
     'arrays': {'sources': ['arrays.pyx']},
     'groupby': {'sources': ['groupby.pyx']},
diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index 8053c17437c5e..26ff39852f3ee 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -11264,10 +11264,15 @@ def corr(
         method: CorrelationMethod = "pearson",
         min_periods: int = 1,
         numeric_only: bool = False,
+        use_parallel: bool = False,
     ) -> DataFrame:
         """
         Compute pairwise correlation of columns, excluding NA/null values.
 
+        This method computes the correlation matrix for all pairs of
+        columns in the DataFrame, excluding missing values on a pairwise
+        basis.
+
         Parameters
         ----------
         method : {'pearson', 'kendall', 'spearman'} or callable
@@ -11292,6 +11297,21 @@ def corr(
             .. versionchanged:: 2.0.0
                 The default value of ``numeric_only`` is now ``False``.
 
+        use_parallel : bool, default False
+            Whether to compute Pearson correlations in parallel, which can
+            improve performance on large datasets. Only used when
+            ``method='pearson'``; ignored for the other correlation methods.
+
+            When ``True``, the pairwise correlations are computed on
+            multiple CPU cores. This can give significant speedups for
+            DataFrames with many columns (typically hundreds or more) but
+            may add overhead for smaller datasets; the break-even point
+            depends on the system and the data.
+
+            .. versionadded:: 3.0.0
+
         Returns
         -------
         DataFrame
@@ -11311,6 +11331,17 @@ def corr(
         * `Kendall rank correlation coefficient <https://en.wikipedia.org/wiki/Kendall_rank_correlation_coefficient>`_
         * `Spearman's rank correlation coefficient <https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient>`_
 
+        **Parallel computation:**
+
+        ``use_parallel=True`` can speed up the computation for large
+        DataFrames by distributing the pairwise correlations across CPU
+        cores. Note that it:
+
+        - only affects Pearson correlation (``method='pearson'``)
+        - helps most for DataFrames with many columns
+        - may add slight overhead for small datasets
+
         Examples
         --------
         >>> def histogram_intersection(a, b):
@@ -11332,6 +11363,10 @@ def corr(
               dogs  cats
         dogs   1.0   NaN
         cats   NaN   1.0
+
+        >>> # Parallel computation for a large DataFrame
+        >>> large_df = pd.DataFrame(np.random.randn(1000, 50))
+        >>> corr_matrix = large_df.corr(use_parallel=True)  # doctest: +SKIP
         """  # noqa: E501
         data = self._get_numeric_data() if numeric_only else self
         cols = data.columns
@@ -11339,7 +11374,19 @@ def corr(
         mat = data.to_numpy(dtype=float, na_value=np.nan, copy=False)
 
         if method == "pearson":
-            correl = libalgos.nancorr(mat, minp=min_periods)
+            if use_parallel:
+                try:
+                    correl = libalgos.nancorr(
+                        mat, minp=min_periods, use_parallel=True
+                    )
+                except (ImportError, AttributeError, RuntimeError):
+                    # OpenMP not available or prange failed; fall back
+                    warnings.warn(
+                        "Parallel correlation computation is unavailable "
+                        "(e.g. no OpenMP support or running under Pyodide); "
+                        "falling back to sequential execution.",
+                        UserWarning,
+                        stacklevel=find_stack_level(),
+                    )
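+                    # NOTE: this except clause is deliberately defensive; an
+                    # extension built without OpenMP normally compiles prange
+                    # into a sequential loop instead of raising at runtime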
+                    correl = libalgos.nancorr(
+                        mat, minp=min_periods, use_parallel=False
+                    )
+            else:
+                correl = libalgos.nancorr(
+                    mat, minp=min_periods, use_parallel=False
+                )
         elif method == "spearman":
             correl = libalgos.nancorr_spearman(mat, minp=min_periods)
         elif method == "kendall" or callable(method):
diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py
index b2531e2abf7f1..70ee575fb5e3a 100644
--- a/pandas/core/groupby/generic.py
+++ b/pandas/core/groupby/generic.py
@@ -3113,7 +3113,12 @@ def corr(
         method: str | Callable[[np.ndarray, np.ndarray], float] = "pearson",
         min_periods: int = 1,
         numeric_only: bool = False,
+        use_parallel: bool = False,
     ) -> DataFrame:
         result = self._op_via_apply(
-            "corr", method=method, min_periods=min_periods, numeric_only=numeric_only
+            "corr",
+            method=method,
+            min_periods=min_periods,
+            numeric_only=numeric_only,
+            use_parallel=use_parallel,
         )
diff --git a/pandas/tests/frame/methods/test_cov_corr.py b/pandas/tests/frame/methods/test_cov_corr.py
index 304638a3a7dcf..cfc350cab38e1 100644
--- a/pandas/tests/frame/methods/test_cov_corr.py
+++ b/pandas/tests/frame/methods/test_cov_corr.py
@@ -507,3 +507,127 @@ def test_cov_with_missing_values(self):
         result2 = df.dropna().cov()
         tm.assert_frame_equal(result1, expected)
         tm.assert_frame_equal(result2, expected)
+
+    def test_corr_parallel_functionality(self):
+        """Test that parallel correlation gives the same results as sequential."""
+        rng = np.random.default_rng(seed=42)
+        n_samples = 100
+
+        x = rng.normal(0, 1, n_samples)
+        y = 0.8 * x + 0.6 * rng.normal(0, 1, n_samples)
+        z = -0.5 * x + 0.866 * rng.normal(0, 1, n_samples)
+
+        df = DataFrame({"X": x, "Y": y, "Z": z})
+
+        result_sequential = df.corr(use_parallel=False)
+        result_parallel = df.corr(use_parallel=True)
+
+        tm.assert_frame_equal(result_sequential, result_parallel)
+
+        for result in [result_sequential, result_parallel]:
+            assert result.shape == (3, 3)
+            assert (np.diag(result) == 1.0).all()
+
+            tm.assert_frame_equal(result, result.T)
+
+            assert (result >= -1.0).all().all()
+            assert (result <= 1.0).all().all()
+
+            assert abs(result.loc["X", "Y"] - 0.8) < 0.2
+            assert abs(result.loc["X", "Z"] + 0.5) < 0.2
+
+    def test_corr_with_missing_data_parallel(self):
+        """
+        Test that correlation with missing data works correctly in parallel.
+        """
+        df = DataFrame(
+            {
+                "A": [1.0, 2.0, np.nan, 4.0, 5.0],
+                "B": [2.0, np.nan, 3.0, 4.0, 5.0],
+                "C": [1.0, 2.0, 3.0, np.nan, 5.0],
+            }
+        )
+
+        result_sequential = df.corr(use_parallel=False)
+        result_parallel = df.corr(use_parallel=True)
+
+        tm.assert_frame_equal(result_sequential, result_parallel)
+
+        for result in [result_sequential, result_parallel]:
+            assert result.shape == (3, 3)
+            assert (np.diag(result) == 1.0).all()
+            tm.assert_frame_equal(result, result.T)
+
+            assert (result >= -1.0).all().all()
+            assert (result <= 1.0).all().all()
+
+            assert np.isfinite(result.loc["A", "B"])
+            assert np.isfinite(result.loc["A", "C"])
+            assert np.isfinite(result.loc["B", "C"])
+
+    def test_corr_parallel_vs_sequential_large_data(self):
+        """
+        Test parallel and sequential correlation on a large dataset.
+        """
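+        # 100 columns -> 100 * 101 / 2 = 5050 unique pairs, enough to exercise
+        # the parallel path meaningfully while keeping the test fast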
+ """ + rng = np.random.default_rng(seed=42) + n_samples = 1000 + n_cols = 100 + + data = rng.normal(0, 1, (n_samples, n_cols)) + df = DataFrame(data, columns=[f"col_{i}" for i in range(n_cols)]) + + result_sequential = df.corr(use_parallel=False) + result_parallel = df.corr(use_parallel=True) + + tm.assert_frame_equal( + result_sequential, result_parallel, rtol=1e-14, atol=1e-14 + ) + + for result in [result_sequential, result_parallel]: + assert (np.diag(result) == 1.0).all() + + tm.assert_frame_equal(result, result.T) + + assert (result >= -1.0).all().all() + assert (result <= 1.0).all().all() + + def test_corr_numerical_stability_edge_cases(self): + """Test correlation numerical stability with edge cases.""" + df_small = DataFrame({"A": [1e-15, 2e-15, 3e-15], "B": [2e-15, 4e-15, 6e-15]}) + + result_small_seq = df_small.corr(use_parallel=False) + result_small_par = df_small.corr(use_parallel=True) + + tm.assert_frame_equal(result_small_seq, result_small_par) + + for result in [result_small_seq, result_small_par]: + assert (np.diag(result) == 1.0).all() + assert abs(result.loc["A", "B"] - 1.0) < 1e-10 + + df_large = DataFrame({"A": [1e15, 2e15, 3e15], "B": [2e15, 4e15, 6e15]}) + + result_large_seq = df_large.corr(use_parallel=False) + result_large_par = df_large.corr(use_parallel=True) + + tm.assert_frame_equal(result_large_seq, result_large_par) + + for result in [result_large_seq, result_large_par]: + assert (np.diag(result) == 1.0).all() + assert abs(result.loc["A", "B"] - 1.0) < 1e-10 + + df_precision = DataFrame( + { + "A": [0.1, 0.2, 0.3], + "B": [0.1000000000000001, 0.2000000000000001, 0.3000000000000001], + } + ) + + result_precision_seq = df_precision.corr(use_parallel=False) + result_precision_par = df_precision.corr(use_parallel=True) + + tm.assert_frame_equal(result_precision_seq, result_precision_par) + + for result in [result_precision_seq, result_precision_par]: + assert (np.diag(result) == 1.0).all() + assert result.loc["A", "B"] >= 0.999 + assert result.loc["A", "B"] <= 1.0 diff --git a/setup.py b/setup.py index db1852b43cfa9..daff329b5bba3 100755 --- a/setup.py +++ b/setup.py @@ -28,6 +28,16 @@ cmdclass = versioneer.get_cmdclass() +# Check for WebAssembly/Emscripten build environment +def is_platform_emscripten(): + """Check if we're building for WebAssembly/Emscripten (Pyodide).""" + return ( + sys.platform == "emscripten" + or os.environ.get("EMSCRIPTEN", "").lower() in ("1", "true", "yes") + or os.environ.get("PYODIDE", "").lower() in ("1", "true", "yes") + ) + + def is_platform_windows(): return sys.platform in ("win32", "cygwin") @@ -438,6 +448,18 @@ def srcpath(name=None, suffix=".pyx", subdir="src"): "_libs.algos": { "pyxfile": "_libs/algos", "depends": _pxi_dep["algos"], + "extra_compile_args": extra_compile_args + + ( + (["-fopenmp"] if not is_platform_windows() else ["/openmp"]) + if not is_platform_emscripten() + else [] + ), + "extra_link_args": extra_link_args + + ( + (["-fopenmp"] if not is_platform_windows() else []) + if not is_platform_emscripten() + else [] + ), }, "_libs.arrays": {"pyxfile": "_libs/arrays"}, "_libs.groupby": {"pyxfile": "_libs/groupby"},