Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: initial refactoring of incremental spmd algos #2248

Open
Wants to merge 14 commits into base branch `main` from the contributor's branch.
50 changes: 3 additions & 47 deletions onedal/spmd/basic_statistics/incremental_basic_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,14 @@
# limitations under the License.
# ==============================================================================

from daal4py.sklearn._utils import get_dtype

from ...basic_statistics import (
IncrementalBasicStatistics as base_IncrementalBasicStatistics,
)
from ...datatypes import to_table
from ..._device_offload import support_input_format
from .._base import BaseEstimatorSPMD


class IncrementalBasicStatistics(BaseEstimatorSPMD, base_IncrementalBasicStatistics):
    """SPMD (distributed) incremental basic-statistics estimator.

    NOTE(review): this block comes from a rendered diff of a refactoring
    PR -- the full ``partial_fit`` body below is the pre-refactor
    implementation; the PR replaces it with a delegation to the non-SPMD
    base class. The unreachable duplicate ``return`` the diff rendering
    appended has been removed.
    """

    def _reset(self):
        # Discard accumulated partial results; nothing to finalize yet.
        self._need_to_finalize = False
        # Skip base_IncrementalBasicStatistics in the MRO so the SPMD
        # backend (not the batch one) provides the partial-result object.
        self._partial_result = super(base_IncrementalBasicStatistics, self)._get_backend(
            "basic_statistics", None, "partial_compute_result"
        )

    @support_input_format()
    def partial_fit(self, X, weights=None, queue=None):
        """
        Computes partial data for basic statistics
        from data batch X and saves it to `_partial_result`.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data batch, where `n_samples` is the number of samples
            in the batch, and `n_features` is the number of features.

        weights : array-like of shape (n_samples,), default=None
            Optional per-sample weights.

        queue : dpctl.SyclQueue
            If not None, use this queue for computations.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        self._queue = queue
        policy = super(base_IncrementalBasicStatistics, self)._get_policy(queue, X)
        X_table, weights_table = to_table(X, weights, queue=queue)

        # Backend parameters are derived once from the first batch's dtype
        # and reused for every subsequent batch.
        if not hasattr(self, "_onedal_params"):
            self._onedal_params = self._get_onedal_params(False, dtype=X_table.dtype)

        self._partial_result = super(base_IncrementalBasicStatistics, self)._get_backend(
            "basic_statistics",
            None,
            "partial_compute",
            policy,
            self._onedal_params,
            self._partial_result,
            X_table,
            weights_table,
        )

        self._need_to_finalize = True
        return self
60 changes: 3 additions & 57 deletions onedal/spmd/covariance/incremental_covariance.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,70 +14,16 @@
# limitations under the License.
# ==============================================================================

import numpy as np

from daal4py.sklearn._utils import get_dtype

from ...covariance import (
IncrementalEmpiricalCovariance as base_IncrementalEmpiricalCovariance,
)
from ...datatypes import to_table
from ...utils import _check_array
from ..._device_offload import support_input_format
from .._base import BaseEstimatorSPMD


class IncrementalEmpiricalCovariance(
    BaseEstimatorSPMD, base_IncrementalEmpiricalCovariance
):
    """SPMD (distributed) incremental empirical-covariance estimator.

    NOTE(review): this block comes from a rendered diff; the
    ``partial_fit`` body below is the pre-refactor implementation. The
    conflicting trailing ``return`` the diff rendering appended has been
    removed and the documented ``return self`` restored.
    """

    def _reset(self):
        # Drop accumulated state; fetch a fresh partial-result object
        # from the SPMD backend (the batch base class is deliberately
        # skipped in the MRO).
        self._need_to_finalize = False
        self._partial_result = super(
            base_IncrementalEmpiricalCovariance, self
        )._get_backend("covariance", None, "partial_compute_result")

    @support_input_format()
    def partial_fit(self, X, y=None, queue=None):
        """
        Computes partial data for the covariance matrix
        from data batch X and saves it to `_partial_result`.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data batch, where `n_samples` is the number of samples
            in the batch, and `n_features` is the number of features.

        y : Ignored
            Not used, present for API consistency by convention.

        queue : dpctl.SyclQueue
            If not None, use this queue for computations.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        X = _check_array(X, dtype=[np.float64, np.float32], ensure_2d=True)

        self._queue = queue

        policy = super(base_IncrementalEmpiricalCovariance, self)._get_policy(queue, X)

        X_table = to_table(X, queue=queue)

        # The dtype of the first batch fixes the computation dtype for
        # all subsequent batches.
        if not hasattr(self, "_dtype"):
            self._dtype = X_table.dtype

        params = self._get_onedal_params(self._dtype)
        self._partial_result = super(
            base_IncrementalEmpiricalCovariance, self
        )._get_backend(
            "covariance",
            None,
            "partial_compute",
            policy,
            params,
            self._partial_result,
            X_table,
        )
        self._need_to_finalize = True
        return self
105 changes: 3 additions & 102 deletions onedal/spmd/decomposition/incremental_pca.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,111 +14,12 @@
# limitations under the License.
# ==============================================================================

from daal4py.sklearn._utils import get_dtype

from ...datatypes import from_table, to_table
from ...decomposition import IncrementalPCA as base_IncrementalPCA
from ...utils import _check_array
from ..._device_offload import support_input_format
from .._base import BaseEstimatorSPMD


class IncrementalPCA(BaseEstimatorSPMD, base_IncrementalPCA):
    """
    Distributed incremental estimator for PCA based on oneDAL implementation.
    Allows for distributed PCA computation if data is split into batches.

    API is the same as for `onedal.decomposition.IncrementalPCA`

    NOTE(review): this block comes from a rendered diff; the unreachable
    ``return super().partial_fit(...)`` line the diff rendering appended
    after ``predict`` has been removed.
    """

    def _reset(self):
        # Fresh backend partial-result object; also drop fitted state so
        # hasattr(self, "components_") again reports "not fitted".
        self._need_to_finalize = False
        self._partial_result = super(base_IncrementalPCA, self)._get_backend(
            "decomposition", "dim_reduction", "partial_train_result"
        )
        if hasattr(self, "components_"):
            del self.components_

    @support_input_format()
    def partial_fit(self, X, y=None, queue=None):
        """Incremental fit with X. All of X is processed as a single batch.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples and
            `n_features` is the number of features.

        y : Ignored
            Not used, present for API consistency by convention.

        queue : dpctl.SyclQueue
            If not None, use this queue for computations.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        X = _check_array(X)
        n_samples, n_features = X.shape

        first_pass = not hasattr(self, "components_")
        if first_pass:
            self.components_ = None
            self.n_samples_seen_ = n_samples
            self.n_features_in_ = n_features
        else:
            self.n_samples_seen_ += n_samples

        # Resolve the effective number of components: user-specified,
        # previously determined, or min(n_samples, n_features) on the
        # first batch.
        if self.n_components is None:
            if self.components_ is None:
                self.n_components_ = min(n_samples, n_features)
            else:
                self.n_components_ = self.components_.shape[0]
        else:
            self.n_components_ = self.n_components

        self._queue = queue

        policy = super(base_IncrementalPCA, self)._get_policy(queue, X)
        X_table = to_table(X, queue=queue)

        # dtype and backend parameters are fixed by the first batch.
        if not hasattr(self, "_dtype"):
            self._dtype = X_table.dtype
            self._params = self._get_onedal_params(X_table)

        self._partial_result = super(base_IncrementalPCA, self)._get_backend(
            "decomposition",
            "dim_reduction",
            "partial_train",
            policy,
            self._params,
            self._partial_result,
            X_table,
        )
        self._need_to_finalize = True
        return self

    def _create_model(self):
        # Assemble a backend model object from the fitted attributes so
        # it can be passed to the inference kernel.
        m = super(base_IncrementalPCA, self)._get_backend(
            "decomposition", "dim_reduction", "model"
        )
        m.eigenvectors = to_table(self.components_)
        m.means = to_table(self.mean_)
        if self.whiten:
            # Whitening additionally requires the eigenvalues at predict time.
            m.eigenvalues = to_table(self.explained_variance_)
        self._onedal_model = m
        return m

    def predict(self, X, queue=None):
        """Apply dimensionality reduction to X and return the transformed data."""
        policy = super(base_IncrementalPCA, self)._get_policy(queue, X)
        model = self._create_model()
        X = to_table(X, queue=queue)
        params = self._get_onedal_params(X, stage="predict")

        result = super(base_IncrementalPCA, self)._get_backend(
            "decomposition",
            "dim_reduction",
            "infer",
            policy,
            params,
            model,
            X,
        )
        return from_table(result.transformed_data)
76 changes: 3 additions & 73 deletions onedal/spmd/linear_model/incremental_linear_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,84 +14,14 @@
# limitations under the License.
# ==============================================================================

import numpy as np

from daal4py.sklearn._utils import get_dtype

from ...common.hyperparameters import get_hyperparameters
from ...datatypes import to_table
from ...linear_model import (
IncrementalLinearRegression as base_IncrementalLinearRegression,
)
from ...utils import _check_X_y, _num_features
from ..._device_offload import support_input_format
from .._base import BaseEstimatorSPMD


class IncrementalLinearRegression(BaseEstimatorSPMD, base_IncrementalLinearRegression):
    """
    Distributed incremental Linear Regression oneDAL implementation.

    API is the same as for `onedal.linear_model.IncrementalLinearRegression`.

    NOTE(review): this block comes from a rendered, partially loaded diff.
    Embedded review-conversation artifacts ("... marked this conversation
    as resolved.") that broke the Python syntax have been removed.
    """

    def _reset(self):
        # Fetch a fresh backend partial-train result; the batch base
        # class is skipped in the MRO so the SPMD backend is used.
        self._partial_result = super(base_IncrementalLinearRegression, self)._get_backend(
            "linear_model", "regression", "partial_train_result"
        )

    @support_input_format()
    def partial_fit(self, X, y, queue=None):
        """
        Computes partial data for linear regression
        from data batch X and saves it to `_partial_result`.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data batch, where `n_samples` is the number of samples
            in the batch, and `n_features` is the number of features.

        y : array-like of shape (n_samples,) or (n_samples, n_targets) in
            case of multiple targets
            Responses for training data.

        queue : dpctl.SyclQueue
            If not None, use this queue for computations.

        Returns
        -------
        self : object
            Returns the instance itself.
        """
        module = super(base_IncrementalLinearRegression, self)._get_backend(
            "linear_model", "regression"
        )

        self._queue = queue
        policy = super(base_IncrementalLinearRegression, self)._get_policy(queue, X)

        X, y = _check_X_y(
            X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False
        )

        X_table, y_table = to_table(X, y, queue=queue)

        # dtype and backend parameters are fixed by the first batch.
        if not hasattr(self, "_dtype"):
            self._dtype = X_table.dtype
            self._params = self._get_onedal_params(self._dtype)

        y = np.asarray(y, dtype=self._dtype)

        self.n_features_in_ = _num_features(X, fallback_1d=True)

        # Use tuned training hyperparameters for the partial-train kernel
        # when the user has overridden the defaults.
        hparams = get_hyperparameters("linear_regression", "train")
        if hparams is not None and not hparams.is_default:
            self._partial_result = module.partial_train(
                policy,
                self._params,
                hparams.backend,
                self._partial_result,
                X_table,
                y_table,
            )
        else:
            self._partial_result = module.partial_train(
                policy, self._params, self._partial_result, X_table, y_table
            )
        # NOTE(review): the diff is truncated here ("Loading"); returning
        # self matches the documented contract and the sibling incremental
        # estimators -- confirm against the full file.
        return self
Loading