@@ -6,38 +6,13 @@
 
 import pandas as pd
 import pyspark
-from obs import ObsClient
-from pyspark.ml import Pipeline, PipelineModel
-from pyspark.ml.evaluation import RegressionEvaluator
-from pyspark.ml.classification import *
-from pyspark.ml.feature import *
+from pyspark.ml import Pipeline
+from pyspark.ml.evaluation import MulticlassClassificationEvaluator
+from pyspark.ml.classification import LogisticRegression
+from pyspark.ml.feature import VectorAssembler, StringIndexer, IndexToString
 
-import moxing.framework.cloud_utils as cloud_utils
+import moxing as mox
 
-# define local temporary file names and paths
-TRAIN_DATASET = 'iris.csv'
-MODEL_NAME = 'spark_model'
-CONFIG_NAME = 'config.json'
-METRIC_NAME = 'metric.json'
-LOCAL_MODEL_PATH = '/tmp/'
-LOCAL_CONFIG_PATH = '/tmp/config.json'
-LOCAL_METRIC_PATH = '/tmp/metric.json'
-LOCAL_DATA = '/tmp/iris.csv'
-
-# get the OBS configuration from system environment variables
-sec_info = cloud_utils.get_auth()
-AK = os.getenv('MINER_USER_ACCESS_KEY')
-if AK is None:
-    AK = sec_info.AK
-
-SK = os.getenv('MINER_USER_SECRET_ACCESS_KEY')
-if SK is None:
-    SK = sec_info.SK
-
-obs_endpoint = os.getenv('MINER_OBS_URL')
-if obs_endpoint is None:
-    obs_endpoint = os.getenv('S3_ENDPOINT')
-print("obs_endpoint: " + str(obs_endpoint))
 
 obs_path = os.getenv('TRAIN_URL')
 if obs_path is None:
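Note: MoXing reads the job's OBS credentials from the ModelArts runtime environment, which is why the AK/SK and endpoint lookup can simply be deleted rather than ported. A minimal sketch of the two mox.file calls this change builds on, assuming the MoXing file API as used in the hunks below; the bucket path is a made-up placeholder:

import moxing as mox

# mox.file accepts obs:// URIs directly; credentials come from the job environment
mox.file.copy('obs://some-bucket/some-dir/iris.csv', '/tmp/iris.csv')            # single file
mox.file.copy_parallel('/tmp/spark_model', 'obs://some-bucket/out/spark_model')  # directory tree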
@@ -49,79 +24,71 @@
     data_path = ''
 print("data_path: " + str(data_path))
 
+# define local temporary file names and paths
+TRAIN_DATASET = 'iris.csv'
+MODEL_NAME = 'spark_model'
+CONFIG_NAME = 'config.json'
+METRIC_NAME = 'metric.json'
+LOCAL_MODEL_PATH = '/tmp/spark_model'
+LOCAL_CONFIG_PATH = '/tmp/config.json'
+LOCAL_METRIC_PATH = '/tmp/metric.json'
+LOCAL_DATA_PATH = '/tmp/iris.csv'
+
 # start local Spark
 spark = pyspark.sql.SparkSession.builder.config("spark.driver.host", "localhost").master("local[*]").appName(
     "flower_classification").getOrCreate()
 metric_dict = {}
 
 
+def print_title(title=""):
+    print("=" * 15 + " %s " % title + "=" * 15)
+
+
 # download file from OBS
 def download_dataset():
     print("Start to download dataset from OBS")
 
-    obs_client = ObsClient(AK, SK, is_secure=True, server=obs_endpoint)
-
     try:
-        bucket_name = data_path.split("/", 1)[0]
-        train_file = data_path.split("/", 1)[1] + "/iris.csv"
-        resp = obs_client.getObject(bucket_name, train_file, downloadPath=LOCAL_DATA)
-        if resp.status < 300:
-            print('Succeeded to download training dataset')
-        else:
-            print('Failed to download ')
-            raise Exception('Failed to download training dataset from OBS !')
-
-    finally:
-        obs_client.close()
+        train_file = os.path.join("obs://", data_path, "iris.csv")
+        mox.file.copy(train_file, LOCAL_DATA_PATH)
+        print('Succeeded to download training dataset')
+    except Exception:
+        print('Failed to download training dataset from OBS !')
+        raise Exception('Failed to download training dataset from OBS !')
 
 
 # upload file to OBS
 def upload_to_obs():
-    obs_client = ObsClient(AK, SK, is_secure=True, server=obs_endpoint)
-
-    bucket_name = obs_path.split("/", 1)[0]
-    work_metric = obs_path.split("/", 1)[1] + '/'
-    model_dir = obs_path.split("/", 1)[1] + '/model/'
-    model_file = model_dir + MODEL_NAME
-    config_file = model_dir + CONFIG_NAME
-    metric_file = work_metric + METRIC_NAME
-
-    # upload model to OBS
-    print_title("upload model to obs !")
-    obs_client.putFile(bucket_name, model_file, file_path=LOCAL_MODEL_PATH + MODEL_NAME)
+    try:
+        # upload model to OBS
+        print_title("upload model to obs !")
+        mox.file.copy_parallel(LOCAL_MODEL_PATH, os.path.join("obs://", obs_path, "model", MODEL_NAME))
 
-    # upload config file to OBS
-    print_title("upload config to obs !")
-    obs_client.putFile(bucket_name, config_file, file_path=LOCAL_CONFIG_PATH)
+        # upload config file to OBS
+        print_title("upload config to obs !")
+        mox.file.copy(LOCAL_CONFIG_PATH, os.path.join("obs://", obs_path, "model", CONFIG_NAME))
 
-    # upload metric file to OBS
-    print_title("upload metric to obs !")
-    obs_client.putFile(bucket_name, metric_file, file_path=LOCAL_METRIC_PATH)
+        # upload metric file to OBS
+        print_title("upload metric to obs !")
+        mox.file.copy(LOCAL_METRIC_PATH, os.path.join("obs://", obs_path, METRIC_NAME))
+    except Exception:
+        print('Failed to upload training output to OBS !')
+        raise Exception('Failed to upload training output to OBS !')
 
     return 0
 
 
-def print_title(title=""):
-    print("=" * 15 + " %s " % title + "=" * 15)
-
-
 # calculate the metric value
-def calculate_metric_value(regression_model, regression_df):
-    """
-    because modelarts console UI only support displaying value: f1, recall, precision, and accuracy,
-    this step maps `mae` to `recall`, `mse` maps to `precision`, and `rmse` maps to `accuracy`.
-    :return:
-    """
-    # split a portion of the data for testing
-    dataset = regression_model.transform(regression_df)
+def calculate_metric_value(multiclass_classification_model, df):
+    dataset = multiclass_classification_model.transform(df)
 
     # calculate metric
-    evaluator = RegressionEvaluator(predictionCol="prediction")
-    # modelarts only supports multi-classification metric currently, borrow recall/precision/accuracy to show regression metric
-    metric_dict["f1"] = 0
-    metric_dict["recall"] = (evaluator.evaluate(dataset, {evaluator.metricName: "mae"}))
-    metric_dict["precision"] = (evaluator.evaluate(dataset, {evaluator.metricName: "mse"}))
-    metric_dict["accuracy"] = (evaluator.evaluate(dataset, {evaluator.metricName: "rmse"}))
+    # evaluator = RegressionEvaluator(predictionCol="prediction")
+    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
+    metric_dict["f1"] = (evaluator.evaluate(dataset, {evaluator.metricName: "f1"}))
+    metric_dict["recall"] = (evaluator.evaluate(dataset, {evaluator.metricName: "weightedRecall"}))
+    metric_dict["precision"] = (evaluator.evaluate(dataset, {evaluator.metricName: "weightedPrecision"}))
+    metric_dict["accuracy"] = (evaluator.evaluate(dataset, {evaluator.metricName: "accuracy"}))
 
 
 def create_config():
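Note: now that the pipeline is scored as a multiclass classifier, the four values the ModelArts console displays (f1, recall, precision, accuracy) are all genuine, instead of the regression stand-ins the deleted docstring described. A self-contained sketch of the evaluator's behavior, using the SparkSession created above and invented toy rows:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# toy (label, prediction) pairs, invented purely for illustration
toy = spark.createDataFrame([(0.0, 0.0), (1.0, 1.0), (2.0, 1.0), (0.0, 0.0)], ['label', 'prediction'])
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction')
for name in ('f1', 'weightedRecall', 'weightedPrecision', 'accuracy'):
    print(name, evaluator.evaluate(toy, {evaluator.metricName: name}))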
@@ -193,22 +160,21 @@ def train_model():
     df.printSchema()
 
     # convert features
-    assembler = pyspark.ml.feature.VectorAssembler(inputCols=['sepal-length', 'petal-width', 'petal-length', 'sepal-width'], outputCol='features')
+    assembler = VectorAssembler(inputCols=['sepal-length', 'petal-width', 'petal-length', 'sepal-width'], outputCol='features')
 
     # convert text labels into indices
-    label_indexer = pyspark.ml.feature.StringIndexer(inputCol='class', outputCol='label').fit(df)
+    label_indexer = StringIndexer(inputCol='class', outputCol='label').fit(df)
 
     # train
-    lr = pyspark.ml.classification.LogisticRegression(regParam=0.01)
-    label_converter = pyspark.ml.feature.IndexToString(inputCol='prediction', outputCol='predictionClass', labels=label_indexer.labels)
+    lr = LogisticRegression(regParam=0.01)
+    label_converter = IndexToString(inputCol='prediction', outputCol='predictionClass', labels=label_indexer.labels)
     pipeline = Pipeline(stages=[assembler, label_indexer, lr, label_converter])
 
     # fit the pipeline to training documents.
     model = pipeline.fit(df)
-    model_local_path = os.path.join(LOCAL_MODEL_PATH, MODEL_NAME)
 
     # save model
-    model.save(model_local_path)
+    model.save(LOCAL_MODEL_PATH)
     calculate_metric_value(model, df)
 
 
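Note: nothing in this commit reloads the model, but the directory written to LOCAL_MODEL_PATH is a standard Spark ML artifact, so it can be restored for scoring with PipelineModel, the import this change drops. A sketch, assuming a DataFrame df that matches the training schema:

from pyspark.ml import PipelineModel

model = PipelineModel.load('/tmp/spark_model')   # LOCAL_MODEL_PATH
scored = model.transform(df)                     # runs assembler, indexer, lr, converter
scored.select('predictionClass').show(5)         # text labels restored by IndexToString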