20 changes: 14 additions & 6 deletions dask_ml/model_selection/_incremental.py
@@ -203,12 +203,13 @@ async def _fit(
    y_train = sorted(futures_of(y_train), key=lambda f: f.key)
    assert len(X_train) == len(y_train)

    train_eg = await client.gather(client.map(len, y_train))
    msg = "[CV%s] For training there are between %d and %d examples in each chunk"
    logger.info(msg, prefix, min(train_eg), max(train_eg))
    train_eg = await client.gather(client.map(len, X_train))

    # Order by which we process training data futures
    order = []
    min_samples = min(train_eg) if len(train_eg) else len(X_train)
    max_samples = max(train_eg) if len(train_eg) else len(X_train)

    msg = "[CV%s] For training there are between %d and %d examples in each chunk"
    logger.info(msg, prefix, min_samples, max_samples)

    def get_futures(partial_fit_calls):
        """Policy to get training data futures
@@ -218,6 +219,9 @@ def get_futures(partial_fit_calls):
        This function handles that policy internally, and also controls random
        access to training data.
        """
        if dask.is_dask_collection(X_train):
            return X_train, y_train

        # Shuffle blocks going forward to get uniform-but-random access
        while partial_fit_calls >= len(order):
            L = list(range(len(X_train)))
@@ -226,6 +230,9 @@
        j = order[partial_fit_calls]
        return X_train[j], y_train[j]

    # Order by which we process training data futures
    order = []

    # Submit initial partial_fit and score computations on first batch of data
    X_future, y_future = get_futures(0)
    X_future_2, y_future_2 = get_futures(1)
@@ -566,7 +573,8 @@ def _get_train_test_split(self, X, y, **kwargs):
        X, y : dask.array.Array
        """
        if self.test_size is None:
            test_size = min(0.2, 1 / X.npartitions)
            npartitions = getattr(X, 'npartitions', 1)
            test_size = min(0.2, 1 / npartitions)
        else:
            test_size = self.test_size
        X_train, X_test, y_train, y_test = train_test_split(
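For context on the two guards added above (an illustrative aside, not part of the diff): dask.is_dask_collection distinguishes dask collections from in-memory pandas or NumPy objects, and a plain pandas object has no npartitions attribute, which is what the getattr fallback covers. A minimal sketch with assumed toy data:

import dask
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"a": range(10)})
ddf = dd.from_pandas(pdf, npartitions=10)

dask.is_dask_collection(ddf)    # True
dask.is_dask_collection(pdf)    # False

getattr(ddf, "npartitions", 1)  # 10 -> test_size = min(0.2, 1 / 10) = 0.1
getattr(pdf, "npartitions", 1)  # 1  -> test_size = min(0.2, 1 / 1)  = 0.2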
22 changes: 22 additions & 0 deletions tests/model_selection/test_hyperband_non_daskarray.py
@@ -0,0 +1,22 @@
import numpy as np
import pandas as pd

from dask_ml.model_selection import HyperbandSearchCV
from dask_ml.datasets import make_classification
from distributed.utils_test import gen_cluster
from sklearn.linear_model import SGDClassifier


@gen_cluster(client=True)
def test_pandas(c, s, a, b):
Member: Could this test go in test_hyperband.py?

Author: Yes, let me move it there.

    X, y = make_classification(chunks=100)
    X, y = pd.DataFrame(X.compute()), pd.Series(y.compute())
Member: Looks like the test is failing in the .compute() call. Why not use from sklearn.datasets import make_classification? (See the sketch after the test below.)

Author: It's strange; it was working on my local machine. Let me try to change it. Maybe it's something to do with how I wrote the test function; I just tried to emulate the structure I found in test_hyperband.py.

    est = SGDClassifier(tol=1e-3)
    param_dist = {'alpha': np.logspace(-4, 0, num=1000),
                  'loss': ['hinge', 'log', 'modified_huber', 'squared_hinge'],
                  'average': [True, False]}

    search = HyperbandSearchCV(est, param_dist)
    search.fit(X, y, classes=y.unique())
    assert search.best_params_
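Regarding the reviewer's suggestion above to build the data with scikit-learn instead of dask_ml.datasets, here is a minimal sketch of that variant. It mirrors the test above but runs against a local synchronous client rather than the gen_cluster fixture; the function name, sample sizes, and reduced parameter grid are assumptions for illustration, not part of this PR.

import numpy as np
import pandas as pd
from distributed import Client
from sklearn.datasets import make_classification
from sklearn.linear_model import SGDClassifier

from dask_ml.model_selection import HyperbandSearchCV


def check_pandas_inputs():
    # HyperbandSearchCV uses distributed futures, so start a local client
    client = Client(processes=False)
    try:
        # Build in-memory data directly with scikit-learn, then wrap in pandas
        X, y = make_classification(n_samples=100, n_features=20, random_state=0)
        X, y = pd.DataFrame(X), pd.Series(y)

        est = SGDClassifier(tol=1e-3)
        param_dist = {'alpha': np.logspace(-4, 0, num=1000),
                      'average': [True, False]}

        search = HyperbandSearchCV(est, param_dist)
        search.fit(X, y, classes=y.unique())
        assert search.best_params_
    finally:
        client.close()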