diff --git a/README.md b/README.md index c9cf4b472..228a8f9dd 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,20 @@ Apache Wayang provides a flexible architecture which enables easy addition of ne For a quick guide on how to run WordCount see [here](guides/tutorial.md). +### Spark Dataset / DataFrame pipelines + +Wayang’s Spark platform can now execute end-to-end pipelines on Spark `Dataset[Row]` (aka DataFrames). This is particularly useful when working with lakehouse-style storage (Parquet/Delta) or when you want to plug Spark ML stages into a Wayang plan without repeatedly falling back to RDDs. + +To build a Dataset-backed pipeline: + +1. **Use the Dataset-aware plan builder APIs.** + - `PlanBuilder.readParquet(..., preferDataset = true)` (or `JavaPlanBuilder.readParquet(..., ..., true)`) reads Parquet files directly into a Dataset channel. + - `DataQuanta.writeParquet(..., preferDataset = true)` writes a Dataset channel without converting it back to an RDD. +2. **Keep operators dataset-compatible.** Most operators continue to work unchanged; if an operator explicitly prefers RDDs, Wayang will insert the necessary conversions automatically (at an additional cost). Custom operators can expose `DatasetChannel` descriptors to stay in the dataframe world. +3. **Let the optimizer do the rest.** The optimizer now assigns a higher cost to Dataset↔RDD conversions, so once you opt into Dataset sources/sinks the plan will stay in Dataset form by default. + +No extra flags are required—just opt into the Dataset-based APIs where you want dataframe semantics. If you see unexpected conversions in your execution plan, check that the upstream/downstream operators you use can consume `DatasetChannel`s; otherwise Wayang will insert a conversion operator for you. + ## Quick Guide for Developing with Wayang For a quick guide on how to use Wayang in your Java/Scala project see [here](guides/develop-with-Wayang.md). diff --git a/guides/spark-datasets.md b/guides/spark-datasets.md new file mode 100644 index 000000000..5efcaa6ce --- /dev/null +++ b/guides/spark-datasets.md @@ -0,0 +1,61 @@ + + +--- +title: Spark Dataset pipelines +description: How to build Wayang jobs that stay on Spark Datasets/DataFrames from source to sink. +--- + +Wayang’s Spark backend can now run entire pipelines on Spark `Dataset[Row]` (a.k.a. DataFrames). Use this mode when you ingest from lakehouse formats (Parquet/Delta), interoperate with Spark ML stages, or simply prefer schema-aware processing. This guide explains how to opt in. + +## When to use Dataset channels + +- **Lakehouse storage:** Reading Parquet/Delta directly into datasets avoids repeated schema inference and keeps Spark’s optimized Parquet reader in play. +- **Spark ML:** Our ML operators already convert RDDs into DataFrames internally. Feeding them a dataset channel skips that conversion and preserves column names. +- **Federated pipelines:** You can mix dataset-backed stages on Spark with other platforms; Wayang will insert conversions only when strictly necessary. + +## Enable Dataset sources and sinks + +1. **Plan builder APIs:** + - `PlanBuilder.readParquet(..., preferDataset = true)` (Scala) or `JavaPlanBuilder.readParquet(..., ..., true)` loads Parquet files into a `DatasetChannel` instead of an `RddChannel`. + - `DataQuanta.writeParquet(..., preferDataset = true)` writes a dataset back to Parquet without converting to RDD first. +2. 
**Prefer dataset-friendly operators:** Most unary/binary operators accept either channel type, but custom operators can advertise dataset descriptors explicitly. See `DatasetChannel` in `wayang-platforms/wayang-spark` for details. +3. **Let the optimizer keep it:** The optimizer now assigns costs to Dataset↔RDD conversions, so once your plan starts with a dataset channel it will stay in dataset form unless an operator demands an RDD. + +## Mixing with RDD operators + +If a stage only supports RDDs, Wayang inserts conversion operators automatically: + +- `SparkRddToDatasetOperator` converts an RDD of `org.apache.wayang.basic.data.Record` into a Spark `Dataset[Row]` (using sampled schema inference or `RecordType`). +- `SparkDatasetToRddOperator` turns a `Dataset[Row]` back into a JavaRDD.`Record`. + +Both conversions carry non-trivial load profiles. You’ll see them in plan explanations if you mix dataset- and RDD-only operators. + +## Developer checklist + +- **Use `RecordType` when possible.** Providing field names in your logical operators helps the converter derive a precise schema. +- **Re-use `sparkExecutor.ss`.** When writing custom Spark operators that build DataFrames, use the provided `SparkExecutor` instead of `SparkSession.builder()` to avoid extra contexts. +- **Watch plan explanations.** Run `PlanBuilder.buildAndExplain(true)` to verify whether conversions are inserted. If they are, consider adding dataset descriptors to your operators. + +## Current limitations + +- Only Parquet sources/sinks expose dataset-specific APIs today. Text/Object sources still produce RDD channels. +- ML4All pipelines currently emit plain `double[]`/`Double` RDDs. They still benefit from the internal DataFrame conversions but do not expose dataset channels yet. + +Contributions to widen dataset support (e.g., dataset-aware `map`/`filter` or ML4All stages) are welcome. diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala index 99f6f9cc7..a5ef8bc42 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala @@ -36,7 +36,7 @@ import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator import org.apache.wayang.core.plan.wayangplan._ import org.apache.wayang.core.platform.Platform import org.apache.wayang.core.util.{Tuple => WayangTuple} -import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2} +import org.apache.wayang.basic.data.{Record, Tuple2 => WayangTuple2} import org.apache.wayang.basic.model.{DLModel, LogisticRegressionModel,DecisionTreeRegressionModel}; import org.apache.wayang.commons.util.profiledb.model.Experiment import com.google.protobuf.ByteString; @@ -1027,6 +1027,11 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I writeTextFileJava(url, toSerializableFunction(formatterUdf), udfLoad) } + def writeParquet(url: String, + overwrite: Boolean = false, + preferDataset: Boolean = false)(implicit ev: Out =:= Record): Unit = + writeParquetJava(url, overwrite, preferDataset) + /** * Write the data quanta in this instance to a text file. Triggers execution. 
* @@ -1090,6 +1095,16 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I this.planBuilder.sinks.clear() } + private def writeParquetJava(url: String, overwrite: Boolean, preferDataset: Boolean)(implicit ev: Out =:= Record): Unit = { + val _ = ev + val sink = new ParquetSink(url, overwrite, preferDataset) + sink.setName(s"Write parquet $url") + this.connectTo(sink, 0) + this.planBuilder.sinks += sink + this.planBuilder.buildAndExecute() + this.planBuilder.sinks.clear() + } + /** * Write the data quanta in this instance to a Object file. Triggers execution. * diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala index d1f9a118a..e1508c2aa 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala @@ -69,8 +69,22 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) { * @param projection the projection, if any * @return [[DataQuantaBuilder]] for the file */ - def readParquet(url: String, projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] = - createSourceBuilder(ParquetSource.create(url, projection))(ClassTag(classOf[Record])) + def readParquet(url: String, + projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] = + readParquet(url, projection, preferDataset = false) + + /** + * Read a parquet file and optionally keep it backed by Spark Datasets. + * + * @param url the URL of the Parquet file + * @param projection the projection, if any + * @param preferDataset when {@code true}, emit a Dataset-backed channel + * @return [[DataQuantaBuilder]] for the file + */ + def readParquet(url: String, + projection: Array[String], + preferDataset: Boolean): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] = + createSourceBuilder(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))(ClassTag(classOf[Record])) /** * Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line. 
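For reference, here is a minimal sketch of an end-to-end Dataset-backed pipeline using the Scala `PlanBuilder`/`DataQuanta` additions above. The `WayangContext` setup (`withPlugin`, `Spark.basicPlugin()`) and the file paths are illustrative assumptions; only the `readParquet`/`writeParquet` signatures with `preferDataset` come from this patch.

```scala
import org.apache.wayang.api.PlanBuilder
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.spark.Spark

// Assumed setup: register the Spark platform so Dataset channels are available.
val context = new WayangContext().withPlugin(Spark.basicPlugin())
val planBuilder = new PlanBuilder(context, "parquet-dataset-pipeline")

// Read Parquet into a Dataset-backed channel (projecting two columns) and
// write it back to Parquet without ever dropping down to an RDD.
planBuilder
  .readParquet("hdfs://example/input.parquet",   // hypothetical input path
               projection = Array("name", "age"),
               preferDataset = true)
  .writeParquet("hdfs://example/output.parquet", // hypothetical output path
                overwrite = true,
                preferDataset = true)
```

As with `writeTextFile`, the call to `writeParquet` triggers execution; because both the source and the sink prefer Dataset channels, the optimizer keeps the plan in Dataset form throughout.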
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala index 648755492..5e3635e68 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala @@ -136,9 +136,13 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job * * @param url the URL of the Parquet file * @param projection the projection, if any + * @param preferDataset when {@code true}, keep the resulting channel backed by Spark Datasets * @return [[DataQuanta]] of [[Record]]s representing the file */ - def readParquet(url: String, projection: Array[String] = null): DataQuanta[Record] = load(ParquetSource.create(url, projection)) + def readParquet(url: String, + projection: Array[String] = null, + preferDataset: Boolean = false): DataQuanta[Record] = + load(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset)) /** * Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line. diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ParquetSink.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ParquetSink.java new file mode 100644 index 000000000..ae283991f --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ParquetSink.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.basic.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.plan.wayangplan.UnarySink; +import org.apache.wayang.core.types.DataSetType; + +/** + * Logical operator that writes {@link Record}s into a Parquet file. 
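+ * <p>The {@code preferDataset} flag is a hint to execution platforms: for instance,
+ * {@code SparkParquetSink} restricts its input channels to {@code DatasetChannel}s when the flag is set,
+ * so the written data never has to be converted back to an RDD.</p>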
+ */ +public class ParquetSink extends UnarySink { + + private final String outputUrl; + + private final boolean isOverwrite; + + private final boolean preferDataset; + + public ParquetSink(String outputUrl, boolean isOverwrite, boolean preferDataset, DataSetType type) { + super(type); + this.outputUrl = outputUrl; + this.isOverwrite = isOverwrite; + this.preferDataset = preferDataset; + } + + public ParquetSink(String outputUrl, boolean isOverwrite, boolean preferDataset) { + this(outputUrl, isOverwrite, preferDataset, DataSetType.createDefault(Record.class)); + } + + public String getOutputUrl() { + return this.outputUrl; + } + + public boolean isOverwrite() { + return this.isOverwrite; + } + + public boolean prefersDataset() { + return this.preferDataset; + } +} diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ParquetSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ParquetSource.java index 943bdbc5f..c5df31937 100644 --- a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ParquetSource.java +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ParquetSource.java @@ -57,6 +57,8 @@ public class ParquetSource extends UnarySource { private MessageType schema; + private boolean preferDatasetOutput = false; + /** * Creates a new instance. * @@ -124,6 +126,16 @@ public ParquetSource(ParquetSource that) { this.projection = that.getProjection(); this.metadata = that.getMetadata(); this.schema = that.getSchema(); + this.preferDatasetOutput = that.preferDatasetOutput; + } + + public ParquetSource preferDatasetOutput(boolean preferDataset) { + this.preferDatasetOutput = preferDataset; + return this; + } + + public boolean isDatasetOutputPreferred() { + return this.preferDatasetOutput; } @Override diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/channels/ChannelConversions.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/channels/ChannelConversions.java index 9f8fca121..87ece4576 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/channels/ChannelConversions.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/channels/ChannelConversions.java @@ -19,9 +19,12 @@ package org.apache.wayang.spark.channels; import org.apache.wayang.basic.channels.FileChannel; +import org.apache.wayang.basic.data.Record; import org.apache.wayang.basic.data.Tuple2; +import org.apache.wayang.core.api.Configuration; import org.apache.wayang.core.optimizer.channels.ChannelConversion; import org.apache.wayang.core.optimizer.channels.DefaultChannelConversion; +import org.apache.wayang.core.plan.executionplan.Channel; import org.apache.wayang.core.types.DataSetType; import org.apache.wayang.java.channels.CollectionChannel; import org.apache.wayang.java.platform.JavaPlatform; @@ -29,8 +32,10 @@ import org.apache.wayang.spark.operators.SparkCacheOperator; import org.apache.wayang.spark.operators.SparkCollectOperator; import org.apache.wayang.spark.operators.SparkCollectionSource; +import org.apache.wayang.spark.operators.SparkDatasetToRddOperator; import org.apache.wayang.spark.operators.SparkObjectFileSink; import org.apache.wayang.spark.operators.SparkObjectFileSource; +import org.apache.wayang.spark.operators.SparkRddToDatasetOperator; import org.apache.wayang.spark.operators.SparkTsvFileSink; import org.apache.wayang.spark.operators.SparkTsvFileSource; @@ -108,6 +113,32 @@ public 
class ChannelConversions { () -> new SparkObjectFileSource<>(DataSetType.createDefault(Void.class)) ); + public static final ChannelConversion DATASET_TO_UNCACHED_RDD = new DefaultChannelConversion( + DatasetChannel.UNCACHED_DESCRIPTOR, + RddChannel.UNCACHED_DESCRIPTOR, + () -> new SparkDatasetToRddOperator() + ); + + public static final ChannelConversion CACHED_DATASET_TO_UNCACHED_RDD = new DefaultChannelConversion( + DatasetChannel.CACHED_DESCRIPTOR, + RddChannel.UNCACHED_DESCRIPTOR, + () -> new SparkDatasetToRddOperator() + ); + + public static final ChannelConversion UNCACHED_RDD_TO_UNCACHED_DATASET = new DefaultChannelConversion( + RddChannel.UNCACHED_DESCRIPTOR, + DatasetChannel.UNCACHED_DESCRIPTOR, + ChannelConversions::createRddToDatasetOperator, + "via SparkRddToDatasetOperator" + ); + + public static final ChannelConversion CACHED_RDD_TO_UNCACHED_DATASET = new DefaultChannelConversion( + RddChannel.CACHED_DESCRIPTOR, + DatasetChannel.UNCACHED_DESCRIPTOR, + ChannelConversions::createRddToDatasetOperator, + "via SparkRddToDatasetOperator" + ); + public static Collection ALL = Arrays.asList( UNCACHED_RDD_TO_CACHED_RDD, COLLECTION_TO_BROADCAST, @@ -119,6 +150,24 @@ public class ChannelConversions { HDFS_OBJECT_FILE_TO_UNCACHED_RDD, // HDFS_TSV_TO_UNCACHED_RDD, CACHED_RDD_TO_HDFS_TSV, - UNCACHED_RDD_TO_HDFS_TSV + UNCACHED_RDD_TO_HDFS_TSV, + DATASET_TO_UNCACHED_RDD, + CACHED_DATASET_TO_UNCACHED_RDD, + UNCACHED_RDD_TO_UNCACHED_DATASET, + CACHED_RDD_TO_UNCACHED_DATASET ); + + private static SparkRddToDatasetOperator createRddToDatasetOperator(Channel sourceChannel, + Configuration configuration) { + DataSetType type = DataSetType.createDefault(Record.class); + if (sourceChannel != null) { + DataSetType sourceType = sourceChannel.getDataSetType(); + if (Record.class.isAssignableFrom(sourceType.getDataUnitType().getTypeClass())) { + @SuppressWarnings("unchecked") + DataSetType casted = (DataSetType) sourceType; + type = casted; + } + } + return new SparkRddToDatasetOperator(type); + } } diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/channels/DatasetChannel.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/channels/DatasetChannel.java new file mode 100644 index 000000000..9d3057fae --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/channels/DatasetChannel.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.spark.channels; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.executionplan.Channel; +import org.apache.wayang.core.plan.wayangplan.OutputSlot; +import org.apache.wayang.core.platform.AbstractChannelInstance; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.Executor; +import org.apache.wayang.core.util.Actions; +import org.apache.wayang.spark.execution.SparkExecutor; + +/** + * {@link Channel} implementation that transports Spark {@link Dataset}s (i.e., DataFrames). + */ +public class DatasetChannel extends Channel { + + public static final ChannelDescriptor UNCACHED_DESCRIPTOR = new ChannelDescriptor( + DatasetChannel.class, false, false + ); + + public static final ChannelDescriptor CACHED_DESCRIPTOR = new ChannelDescriptor( + DatasetChannel.class, true, true + ); + + public DatasetChannel(ChannelDescriptor descriptor, OutputSlot outputSlot) { + super(descriptor, outputSlot); + assert descriptor == UNCACHED_DESCRIPTOR || descriptor == CACHED_DESCRIPTOR; + } + + private DatasetChannel(DatasetChannel parent) { + super(parent); + } + + @Override + public DatasetChannel copy() { + return new DatasetChannel(this); + } + + @Override + public Instance createInstance(Executor executor, + OptimizationContext.OperatorContext producerOperatorContext, + int producerOutputIndex) { + return new Instance((SparkExecutor) executor, producerOperatorContext, producerOutputIndex); + } + + /** + * {@link ChannelInstance} for {@link DatasetChannel}s. + */ + public class Instance extends AbstractChannelInstance { + + private Dataset dataset; + + public Instance(SparkExecutor executor, + OptimizationContext.OperatorContext producerOperatorContext, + int producerOutputIndex) { + super(executor, producerOperatorContext, producerOutputIndex); + } + + /** + * Store a {@link Dataset} in this channel and optionally measure its cardinality. + * + * @param dataset the {@link Dataset} to store + * @param sparkExecutor the {@link SparkExecutor} handling this channel + */ + public void accept(Dataset dataset, SparkExecutor sparkExecutor) { + this.dataset = dataset; + if (this.isMarkedForInstrumentation()) { + this.measureCardinality(dataset); + } + } + + /** + * Provide the stored {@link Dataset}. 
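+         * <p>The dataset is handed out as-is; if this channel is cached, the dataset is
+         * unpersisted later through {@code doDispose()}.</p>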
+ * + * @return the stored {@link Dataset} + */ + public Dataset provideDataset() { + return this.dataset; + } + + @Override + protected void doDispose() { + if (this.isDatasetCached() && this.dataset != null) { + Actions.doSafe(() -> this.dataset.unpersist()); + this.dataset = null; + } + } + + private void measureCardinality(Dataset dataset) { + this.setMeasuredCardinality(dataset.count()); + } + + private boolean isDatasetCached() { + return this.getChannel().isReusable(); + } + + @Override + public DatasetChannel getChannel() { + return DatasetChannel.this; + } + } +} diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java index bc42956f9..1e3642a9a 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/Mappings.java @@ -60,7 +60,8 @@ public class Mappings { new SampleMapping(), new ZipWithIdMapping(), new KafkaTopicSinkMapping(), - new KafkaTopicSourceMapping() + new KafkaTopicSourceMapping(), + new ParquetSinkMapping() ); diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ParquetSinkMapping.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ParquetSinkMapping.java new file mode 100644 index 000000000..b5b9b636f --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/mapping/ParquetSinkMapping.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.spark.mapping; + +import org.apache.wayang.basic.operators.ParquetSink; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.spark.operators.SparkParquetSink; +import org.apache.wayang.spark.platform.SparkPlatform; + +import java.util.Collection; +import java.util.Collections; + +public class ParquetSinkMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + SparkPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + OperatorPattern operatorPattern = new OperatorPattern<>( + "sink", new ParquetSink("", true, true), false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> new SparkParquetSink(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkDatasetToRddOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkDatasetToRddOperator.java new file mode 100644 index 000000000..16222cabf --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkDatasetToRddOperator.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.spark.operators; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.spark.channels.DatasetChannel; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.execution.SparkExecutor; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Optional; + +/** + * Conversion operator from {@link DatasetChannel} to {@link RddChannel}. + */ +public class SparkDatasetToRddOperator extends UnaryToUnaryOperator implements SparkExecutionOperator { + + public SparkDatasetToRddOperator() { + super(DataSetType.createDefault(Row.class), DataSetType.createDefault(Row.class), false); + } + + @Override + public Tuple, Collection> evaluate(ChannelInstance[] inputs, + ChannelInstance[] outputs, + SparkExecutor sparkExecutor, + OptimizationContext.OperatorContext operatorContext) { + DatasetChannel.Instance input = (DatasetChannel.Instance) inputs[0]; + RddChannel.Instance output = (RddChannel.Instance) outputs[0]; + + Dataset dataset = input.provideDataset(); + output.accept(dataset.toJavaRDD(), sparkExecutor); + + return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext); + } + + @Override + public List getSupportedInputChannels(int index) { + return Arrays.asList(DatasetChannel.UNCACHED_DESCRIPTOR, DatasetChannel.CACHED_DESCRIPTOR); + } + + @Override + public List getSupportedOutputChannels(int index) { + return Collections.singletonList(RddChannel.UNCACHED_DESCRIPTOR); + } + + @Override + public boolean containsAction() { + return false; + } + + @Override + public String getLoadProfileEstimatorConfigurationKey() { + return "wayang.spark.dataset-to-rdd.load"; + } + + @Override + public Optional createLoadProfileEstimator(Configuration configuration) { + return Optional.ofNullable( + LoadProfileEstimators.createFromSpecification( + this.getLoadProfileEstimatorConfigurationKey(), configuration + ) + ); + } +} diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkParquetSink.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkParquetSink.java new file mode 100644 index 000000000..587033156 --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkParquetSink.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.spark.operators; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.ParquetSink; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.spark.channels.DatasetChannel; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.execution.SparkExecutor; +import org.apache.wayang.spark.util.DatasetConverters; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +/** + * Writes records to Parquet using Spark. + */ +public class SparkParquetSink extends ParquetSink implements SparkExecutionOperator { + + private final SaveMode saveMode; + + public SparkParquetSink(ParquetSink that) { + super(that.getOutputUrl(), that.isOverwrite(), that.prefersDataset(), that.getType()); + this.saveMode = that.isOverwrite() ? SaveMode.Overwrite : SaveMode.ErrorIfExists; + } + + @Override + public Tuple, Collection> evaluate(ChannelInstance[] inputs, + ChannelInstance[] outputs, + SparkExecutor sparkExecutor, + OptimizationContext.OperatorContext operatorContext) { + final Dataset dataset = this.obtainDataset(inputs[0], sparkExecutor); + dataset.write().mode(this.saveMode).parquet(this.getOutputUrl()); + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + + private Dataset obtainDataset(ChannelInstance input, SparkExecutor sparkExecutor) { + if (input instanceof DatasetChannel.Instance) { + return ((DatasetChannel.Instance) input).provideDataset(); + } + JavaRDD rdd = ((RddChannel.Instance) input).provideRdd(); + return DatasetConverters.recordsToDataset(rdd, this.getType(), sparkExecutor.ss); + } + + @Override + public List getSupportedInputChannels(int index) { + if (this.prefersDataset()) { + return Arrays.asList(DatasetChannel.UNCACHED_DESCRIPTOR, DatasetChannel.CACHED_DESCRIPTOR); + } + return Arrays.asList(DatasetChannel.UNCACHED_DESCRIPTOR, DatasetChannel.CACHED_DESCRIPTOR, + RddChannel.UNCACHED_DESCRIPTOR, RddChannel.CACHED_DESCRIPTOR); + } + + @Override + public List getSupportedOutputChannels(int index) { + throw new UnsupportedOperationException("This operator has no outputs."); + } + + @Override + public boolean containsAction() { + return true; + } +} diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkParquetSource.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkParquetSource.java index adae5a0d7..315acbb2c 100644 --- a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkParquetSource.java +++ 
b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkParquetSource.java @@ -29,6 +29,7 @@ import org.apache.wayang.core.platform.ChannelInstance; import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.spark.channels.DatasetChannel; import org.apache.wayang.spark.channels.RddChannel; import org.apache.wayang.spark.execution.SparkExecutor; @@ -64,8 +65,6 @@ public Tuple, Collection> eval assert inputs.length == this.getNumInputs(); assert outputs.length == this.getNumOutputs(); - RddChannel.Instance output = (RddChannel.Instance) outputs[0]; - Dataset table = sparkExecutor.ss.read().parquet(this.getInputUrl().trim()); // Reads a projection, if any (loads the complete file if no projection defined) @@ -74,16 +73,6 @@ public Tuple, Collection> eval table = table.selectExpr(projection); } - // Wrap dataset into a JavaRDD and convert Row's to Record's - JavaRDD rdd = table.toJavaRDD().map(row -> { - List values = IntStream.range(0, row.size()) - .mapToObj(row::get) - .collect(Collectors.toList()); - return new Record(values); - }); - this.name(rdd); - output.accept(rdd, sparkExecutor); - ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext); prepareLineageNode.add(LoadProfileEstimators.createFromSpecification( "wayang.spark.parquetsource.load.prepare", sparkExecutor.getConfiguration() @@ -92,7 +81,25 @@ public Tuple, Collection> eval mainLineageNode.add(LoadProfileEstimators.createFromSpecification( "wayang.spark.parquetsource.load.main", sparkExecutor.getConfiguration() )); - output.getLineage().addPredecessor(mainLineageNode); + + if (this.isDatasetOutputPreferred() && outputs[0] instanceof DatasetChannel.Instance) { + DatasetChannel.Instance datasetOutput = + (DatasetChannel.Instance) outputs[0]; + datasetOutput.accept(table, sparkExecutor); + datasetOutput.getLineage().addPredecessor(mainLineageNode); + } else { + RddChannel.Instance output = (RddChannel.Instance) outputs[0]; + // Wrap dataset into a JavaRDD and convert Row's to Record's + JavaRDD rdd = table.toJavaRDD().map(row -> { + List values = IntStream.range(0, row.size()) + .mapToObj(row::get) + .collect(Collectors.toList()); + return new Record(values); + }); + this.name(rdd); + output.accept(rdd, sparkExecutor); + output.getLineage().addPredecessor(mainLineageNode); + } return prepareLineageNode.collectAndMark(); } @@ -109,6 +116,9 @@ public List getSupportedInputChannels(int index) { @Override public List getSupportedOutputChannels(int index) { + if (this.isDatasetOutputPreferred()) { + return Arrays.asList(DatasetChannel.UNCACHED_DESCRIPTOR, RddChannel.UNCACHED_DESCRIPTOR); + } return Collections.singletonList(RddChannel.UNCACHED_DESCRIPTOR); } diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRddToDatasetOperator.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRddToDatasetOperator.java new file mode 100644 index 000000000..ecde6033a --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkRddToDatasetOperator.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.spark.operators; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.spark.channels.DatasetChannel; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.execution.SparkExecutor; +import org.apache.wayang.spark.util.DatasetConverters; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Optional; + +/** + * Conversion operator from {@link RddChannel} to {@link DatasetChannel}. 
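+ * <p>The target schema is derived via {@code DatasetConverters.recordsToDataset}: field names come from a
+ * {@code RecordType} when one is available, otherwise generic names are generated, and column types are
+ * inferred from a small sample of the input records.</p>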
+ */ +public class SparkRddToDatasetOperator extends UnaryToUnaryOperator implements SparkExecutionOperator { + + public SparkRddToDatasetOperator() { + this(DataSetType.createDefault(Record.class)); + } + + public SparkRddToDatasetOperator(DataSetType type) { + super(type, type, false); + } + + @Override + public Tuple, Collection> evaluate(ChannelInstance[] inputs, + ChannelInstance[] outputs, + SparkExecutor sparkExecutor, + OptimizationContext.OperatorContext operatorContext) { + RddChannel.Instance input = (RddChannel.Instance) inputs[0]; + DatasetChannel.Instance output = (DatasetChannel.Instance) outputs[0]; + + JavaRDD records = input.provideRdd(); + Dataset dataset = DatasetConverters.recordsToDataset(records, this.getInputType(), sparkExecutor.ss); + output.accept(dataset, sparkExecutor); + + return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext); + } + + @Override + public List getSupportedInputChannels(int index) { + return Arrays.asList(RddChannel.UNCACHED_DESCRIPTOR, RddChannel.CACHED_DESCRIPTOR); + } + + @Override + public List getSupportedOutputChannels(int index) { + return Arrays.asList(DatasetChannel.UNCACHED_DESCRIPTOR, DatasetChannel.CACHED_DESCRIPTOR); + } + + @Override + public boolean containsAction() { + return false; + } + + @Override + public String getLoadProfileEstimatorConfigurationKey() { + return "wayang.spark.rdd-to-dataset.load"; + } + + @Override + public Optional createLoadProfileEstimator(Configuration configuration) { + return Optional.ofNullable( + LoadProfileEstimators.createFromSpecification( + this.getLoadProfileEstimatorConfigurationKey(), configuration + ) + ); + } +} diff --git a/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/util/DatasetConverters.java b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/util/DatasetConverters.java new file mode 100644 index 000000000..bf7bd7ff7 --- /dev/null +++ b/wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/util/DatasetConverters.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.spark.util; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.types.RecordType; +import org.apache.wayang.core.types.DataSetType; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility methods to convert {@link Record}-backed RDDs into Spark {@link Dataset}s. + */ +public final class DatasetConverters { + + private static final int SCHEMA_SAMPLE_SIZE = 50; + + private DatasetConverters() { + } + + /** + * Convert an RDD of {@link Record}s into a Spark {@link Dataset}. + * + * @param records the records to convert + * @param dataSetType type information about the records (field names etc.) + * @param sparkSession the {@link SparkSession} used to create the {@link Dataset} + * @return a {@link Dataset} view over {@code records} + */ + public static Dataset recordsToDataset(JavaRDD records, + DataSetType dataSetType, + SparkSession sparkSession) { + StructType schema = deriveSchema(records, dataSetType); + JavaRDD rows = records.map(record -> RowFactory.create(record.getValues())); + return sparkSession.createDataFrame(rows, schema); + } + + private static StructType deriveSchema(JavaRDD rdd, DataSetType dataSetType) { + List samples = rdd.take(SCHEMA_SAMPLE_SIZE); + RecordType recordType = extractRecordType(dataSetType); + String[] fieldNames = resolveFieldNames(samples, recordType); + + List fields = new ArrayList<>(fieldNames.length); + for (int column = 0; column < fieldNames.length; column++) { + DataType dataType = inferColumnType(samples, column); + fields.add(DataTypes.createStructField(fieldNames[column], dataType, true)); + } + return new StructType(fields.toArray(new StructField[0])); + } + + private static RecordType extractRecordType(DataSetType dataSetType) { + if (dataSetType == null || dataSetType.getDataUnitType() == null) { + return null; + } + if (dataSetType.getDataUnitType() instanceof RecordType) { + return (RecordType) dataSetType.getDataUnitType(); + } + if (dataSetType.getDataUnitType().toBasicDataUnitType() instanceof RecordType) { + return (RecordType) dataSetType.getDataUnitType().toBasicDataUnitType(); + } + return null; + } + + private static String[] resolveFieldNames(List samples, RecordType recordType) { + if (recordType != null && recordType.getFieldNames() != null && recordType.getFieldNames().length > 0) { + return recordType.getFieldNames(); + } + Record sample = samples.isEmpty() ? null : samples.get(0); + int numFields = sample == null ? 
0 : sample.size(); + String[] names = new String[numFields]; + for (int index = 0; index < numFields; index++) { + names[index] = "field" + index; + } + return names; + } + + private static DataType inferColumnType(List samples, int columnIndex) { + for (Record sample : samples) { + if (sample == null || columnIndex >= sample.size()) { + continue; + } + Object value = sample.getField(columnIndex); + if (value == null) { + continue; + } + DataType dataType = toSparkType(value); + if (dataType != null) { + return dataType; + } + } + return DataTypes.StringType; + } + + private static DataType toSparkType(Object value) { + if (value instanceof String || value instanceof Character) { + return DataTypes.StringType; + } else if (value instanceof Integer) { + return DataTypes.IntegerType; + } else if (value instanceof Long) { + return DataTypes.LongType; + } else if (value instanceof Short) { + return DataTypes.ShortType; + } else if (value instanceof Byte) { + return DataTypes.ByteType; + } else if (value instanceof Double) { + return DataTypes.DoubleType; + } else if (value instanceof Float) { + return DataTypes.FloatType; + } else if (value instanceof Boolean) { + return DataTypes.BooleanType; + } else if (value instanceof Timestamp) { + return DataTypes.TimestampType; + } else if (value instanceof java.sql.Date) { + return DataTypes.DateType; + } else if (value instanceof byte[]) { + return DataTypes.BinaryType; + } else if (value instanceof BigDecimal) { + BigDecimal decimal = (BigDecimal) value; + int precision = Math.min(38, Math.max(decimal.precision(), decimal.scale())); + int scale = Math.max(0, decimal.scale()); + return DataTypes.createDecimalType(precision, scale); + } else if (value instanceof BigInteger) { + BigInteger bigInteger = (BigInteger) value; + int precision = Math.min(38, bigInteger.toString().length()); + return DataTypes.createDecimalType(precision, 0); + } + return null; + } +} diff --git a/wayang-platforms/wayang-spark/src/main/resources/wayang-spark-defaults.properties b/wayang-platforms/wayang-spark/src/main/resources/wayang-spark-defaults.properties index e96446f91..0ffe1a449 100644 --- a/wayang-platforms/wayang-spark/src/main/resources/wayang-spark-defaults.properties +++ b/wayang-platforms/wayang-spark/src/main/resources/wayang-spark-defaults.properties @@ -198,6 +198,28 @@ wayang.spark.sort.load = {\ "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\ } +wayang.spark.dataset-to-rdd.load = {\ + "in":1, "out":1,\ + "cpu":"${900*in0 + 250000}",\ + "ram":"10000",\ + "disk":"0",\ + "net":"0",\ + "p":0.9,\ + "overhead":0,\ + "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\ +} + +wayang.spark.rdd-to-dataset.load = {\ + "in":1, "out":1,\ + "cpu":"${1200*in0 + 750000}",\ + "ram":"10000",\ + "disk":"0",\ + "net":"0",\ + "p":0.9,\ + "overhead":0,\ + "ru":"${wayang:logGrowth(0.1, 0.1, 1000000, in0)}"\ +} + wayang.spark.globalreduce.load.template = {\ "type":"mathex", "in":1, "out":1,\ "cpu":"?*in0 + ?"\ diff --git a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/DatasetChannelTest.java b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/DatasetChannelTest.java new file mode 100644 index 000000000..38450e5f3 --- /dev/null +++ b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/DatasetChannelTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.spark.operators; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.wayang.spark.channels.DatasetChannel; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class DatasetChannelTest extends SparkOperatorTestBase { + + @Test + void acceptAndProvideDataset() { + List rows = sampleRows(); + Dataset dataset = createDataset(rows); + DatasetChannel.Instance instance = createDatasetChannelInstance(false); + + instance.accept(dataset, this.sparkExecutor); + + assertEquals(rows, instance.provideDataset().collectAsList()); + } + + @Test + void instrumentationCountsRows() { + List rows = sampleRows(); + Dataset dataset = createDataset(rows); + DatasetChannel.Instance instance = createDatasetChannelInstance(true); + + instance.accept(dataset, this.sparkExecutor); + + assertTrue(instance.getMeasuredCardinality().isPresent()); + assertEquals(rows.size(), instance.getMeasuredCardinality().getAsLong()); + } + + @Test + void noInstrumentationLeavesCardinalityEmpty() { + DatasetChannel.Instance instance = createDatasetChannelInstance(false); + + instance.accept(createDataset(sampleRows()), this.sparkExecutor); + + assertFalse(instance.getMeasuredCardinality().isPresent()); + } + + private DatasetChannel.Instance createDatasetChannelInstance(boolean instrumented) { + DatasetChannel channel = (DatasetChannel) DatasetChannel.UNCACHED_DESCRIPTOR + .createChannel(null, this.configuration); + if (instrumented) { + channel.markForInstrumentation(); + } + return (DatasetChannel.Instance) channel.createInstance(this.sparkExecutor, null, -1); + } + + private Dataset createDataset(List rows) { + return this.sparkExecutor.ss.createDataFrame(rows, sampleSchema()); + } + + private StructType sampleSchema() { + StructField[] fields = new StructField[]{ + DataTypes.createStructField("name", DataTypes.StringType, false), + DataTypes.createStructField("age", DataTypes.IntegerType, false) + }; + return new StructType(fields); + } + + private List sampleRows() { + return Arrays.asList( + RowFactory.create("alice", 30), + RowFactory.create("bob", 25) + ); + } +} diff --git a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/DatasetTestUtils.java b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/DatasetTestUtils.java new file mode 100644 index 000000000..e6736464e --- /dev/null +++ 
b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/DatasetTestUtils.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.spark.operators; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.spark.execution.SparkExecutor; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.Comparator; + +final class DatasetTestUtils { + + private DatasetTestUtils() { + } + + static Dataset createSampleDataset(SparkExecutor sparkExecutor) { + return sparkExecutor.ss.createDataFrame(sampleRows(), sampleSchema()); + } + + static List sampleRows() { + return Arrays.asList( + RowFactory.create("alice", 30), + RowFactory.create("bob", 25), + RowFactory.create("carol", 41) + ); + } + + static List sampleRecords() { + return Arrays.asList( + new Record("alice", 30), + new Record("bob", 25), + new Record("carol", 41) + ); + } + + static StructType sampleSchema() { + StructField[] fields = new StructField[]{ + DataTypes.createStructField("name", DataTypes.StringType, false), + DataTypes.createStructField("age", DataTypes.IntegerType, false) + }; + return new StructType(fields); + } + + static void deleteRecursively(Path directory) throws IOException { + if (Files.notExists(directory)) { + return; + } + Files.walk(directory) + .sorted(Comparator.reverseOrder()) + .forEach(path -> { + try { + Files.deleteIfExists(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } +} diff --git a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkDatasetToRddOperatorTest.java b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkDatasetToRddOperatorTest.java new file mode 100644 index 000000000..29ec75eec --- /dev/null +++ b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkDatasetToRddOperatorTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.spark.operators; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.spark.channels.RddChannel; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class SparkDatasetToRddOperatorTest extends SparkOperatorTestBase { + + @Test + void testConversionPreservesRows() { + Dataset dataset = DatasetTestUtils.createSampleDataset(this.sparkExecutor); + SparkDatasetToRddOperator operator = new SparkDatasetToRddOperator(); + + ChannelInstance[] inputs = new ChannelInstance[]{this.createDatasetChannelInstance(dataset)}; + ChannelInstance[] outputs = new ChannelInstance[]{this.createRddChannelInstance()}; + + this.evaluate(operator, inputs, outputs); + + List rows = ((RddChannel.Instance) outputs[0]).provideRdd().collect(); + assertEquals(dataset.collectAsList(), rows); + } +} diff --git a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkOperatorTestBase.java b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkOperatorTestBase.java index 222954da9..f2d8fb68f 100644 --- a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkOperatorTestBase.java +++ b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkOperatorTestBase.java @@ -19,26 +19,29 @@ package org.apache.wayang.spark.operators; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.wayang.core.api.Configuration; import org.apache.wayang.core.api.Job; import org.apache.wayang.core.optimizer.DefaultOptimizationContext; import org.apache.wayang.core.optimizer.OptimizationContext; import org.apache.wayang.core.plan.wayangplan.Operator; +import org.apache.wayang.core.plan.wayangplan.WayangPlan; import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.java.channels.CollectionChannel; import org.apache.wayang.core.platform.CrossPlatformExecutor; import org.apache.wayang.core.profiling.FullInstrumentationStrategy; -import org.apache.wayang.java.channels.CollectionChannel; import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.channels.DatasetChannel; import org.apache.wayang.spark.execution.SparkExecutor; import org.apache.wayang.spark.platform.SparkPlatform; import org.apache.wayang.spark.test.ChannelFactory; +import org.apache.wayang.core.api.WayangContext; import org.junit.jupiter.api.BeforeEach; +import java.lang.reflect.Field; import java.util.Collection; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * Test base for {@link SparkExecutionOperator} tests. 
 */
@@ -48,22 +51,32 @@ class SparkOperatorTestBase {
 
     protected SparkExecutor sparkExecutor;
 
+    private Job job;
+
     @BeforeEach
     void setUp() {
-        this.configuration = new Configuration();
-        if(sparkExecutor == null)
-            this.sparkExecutor = (SparkExecutor) SparkPlatform.getInstance().getExecutorFactory().create(this.mockJob());
+        WayangContext context = new WayangContext(new Configuration());
+        this.job = context.createJob("spark-operator-test", new WayangPlan());
+        this.configuration = this.job.getConfiguration();
+        this.ensureCrossPlatformExecutor();
+        this.sparkExecutor = (SparkExecutor) SparkPlatform.getInstance().getExecutorFactory().create(this.job);
     }
 
-    Job mockJob() {
-        final Job job = mock(Job.class);
-        when(job.getConfiguration()).thenReturn(this.configuration);
-        when(job.getCrossPlatformExecutor()).thenReturn(new CrossPlatformExecutor(job, new FullInstrumentationStrategy()));
-        return job;
+    private void ensureCrossPlatformExecutor() {
+        try {
+            Field field = Job.class.getDeclaredField("crossPlatformExecutor");
+            field.setAccessible(true);
+            if (field.get(this.job) == null) {
+                CrossPlatformExecutor executor = new CrossPlatformExecutor(this.job, new FullInstrumentationStrategy());
+                field.set(this.job, executor);
+            }
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException("Failed to initialize CrossPlatformExecutor for tests.", e);
+        }
     }
 
     protected OptimizationContext.OperatorContext createOperatorContext(Operator operator) {
-        OptimizationContext optimizationContext = new DefaultOptimizationContext(mockJob());
+        OptimizationContext optimizationContext = new DefaultOptimizationContext(this.job);
        return optimizationContext.addOneTimeOperator(operator);
     }
 
@@ -81,6 +94,14 @@ RddChannel.Instance createRddChannelInstance(Collection collection) {
         return ChannelFactory.createRddChannelInstance(collection, this.sparkExecutor, this.configuration);
     }
 
+    DatasetChannel.Instance createDatasetChannelInstance() {
+        return ChannelFactory.createDatasetChannelInstance(this.configuration);
+    }
+
+    DatasetChannel.Instance createDatasetChannelInstance(Dataset<Row> dataset) {
+        return ChannelFactory.createDatasetChannelInstance(dataset, this.sparkExecutor, this.configuration);
+    }
+
     protected CollectionChannel.Instance createCollectionChannelInstance() {
         return ChannelFactory.createCollectionChannelInstance(this.configuration);
     }
diff --git a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkParquetSinkTest.java b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkParquetSinkTest.java
new file mode 100644
index 000000000..eb7db83b1
--- /dev/null
+++ b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkParquetSinkTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.spark.operators;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.wayang.basic.operators.ParquetSink;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class SparkParquetSinkTest extends SparkOperatorTestBase {
+
+    @Test
+    void writesDatasetToParquet() throws IOException {
+        Dataset<Row> dataset = DatasetTestUtils.createSampleDataset(this.sparkExecutor);
+        Path outputDir = Files.createTempDirectory("wayang-dataset-parquet-sink");
+        try {
+            SparkParquetSink sink = new SparkParquetSink(new ParquetSink(outputDir.toString(), true, true));
+
+            ChannelInstance[] inputs = new ChannelInstance[]{this.createDatasetChannelInstance(dataset)};
+            ChannelInstance[] outputs = new ChannelInstance[0];
+
+            this.evaluate(sink, inputs, outputs);
+
+            Dataset<Row> stored = this.sparkExecutor.ss.read().parquet(outputDir.toString());
+            assertEquals(dataset.collectAsList(), stored.collectAsList());
+        } finally {
+            DatasetTestUtils.deleteRecursively(outputDir);
+        }
+    }
+
+    @Test
+    void writesRddToParquet() throws IOException {
+        Path outputDir = Files.createTempDirectory("wayang-rdd-parquet-sink");
+        try {
+            SparkParquetSink sink = new SparkParquetSink(new ParquetSink(outputDir.toString(), true, false));
+            ChannelInstance[] inputs = new ChannelInstance[]{this.createRddChannelInstance(DatasetTestUtils.sampleRecords())};
+            ChannelInstance[] outputs = new ChannelInstance[0];
+
+            this.evaluate(sink, inputs, outputs);
+
+            Dataset<Row> stored = this.sparkExecutor.ss.read().parquet(outputDir.toString());
+            assertEquals(DatasetTestUtils.sampleRows(), stored.collectAsList());
+        } finally {
+            DatasetTestUtils.deleteRecursively(outputDir);
+        }
+    }
+}
diff --git a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkParquetSourceDatasetOutputTest.java b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkParquetSourceDatasetOutputTest.java
new file mode 100644
index 000000000..d4249aed0
--- /dev/null
+++ b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkParquetSourceDatasetOutputTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.spark.operators;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.spark.channels.DatasetChannel;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class SparkParquetSourceDatasetOutputTest extends SparkOperatorTestBase {
+
+    @Test
+    void producesDatasetChannel() throws IOException {
+        Dataset<Row> dataset = DatasetTestUtils.createSampleDataset(this.sparkExecutor);
+        Path inputPath = Files.createTempDirectory("wayang-parquet-source");
+        try {
+            dataset.write().mode(SaveMode.Overwrite).parquet(inputPath.toString());
+
+            SparkParquetSource source = new SparkParquetSource(inputPath.toString(), null);
+            source.preferDatasetOutput(true);
+
+            ChannelInstance[] inputs = new ChannelInstance[0];
+            ChannelInstance[] outputs = new ChannelInstance[]{this.createDatasetChannelInstance()};
+
+            this.evaluate(source, inputs, outputs);
+
+            Dataset<Row> result = ((DatasetChannel.Instance) outputs[0]).provideDataset();
+            assertEquals(dataset.collectAsList(), result.collectAsList());
+        } finally {
+            DatasetTestUtils.deleteRecursively(inputPath);
+        }
+    }
+}
diff --git a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkRddToDatasetOperatorTest.java b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkRddToDatasetOperatorTest.java
new file mode 100644
index 000000000..0762e55e0
--- /dev/null
+++ b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkRddToDatasetOperatorTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.spark.operators;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.spark.channels.DatasetChannel;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class SparkRddToDatasetOperatorTest extends SparkOperatorTestBase {
+
+    @Test
+    void convertsRecordsToDataset() {
+        SparkRddToDatasetOperator operator = new SparkRddToDatasetOperator();
+
+        ChannelInstance[] inputs = new ChannelInstance[]{this.createRddChannelInstance(DatasetTestUtils.sampleRecords())};
+        ChannelInstance[] outputs = new ChannelInstance[]{this.createDatasetChannelInstance()};
+
+        this.evaluate(operator, inputs, outputs);
+
+        Dataset<Row> dataset = ((DatasetChannel.Instance) outputs[0]).provideDataset();
+        assertEquals(DatasetTestUtils.sampleRows(), dataset.collectAsList());
+    }
+}
diff --git a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/test/ChannelFactory.java b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/test/ChannelFactory.java
index 3c430d116..565cb0619 100644
--- a/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/test/ChannelFactory.java
+++ b/wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/test/ChannelFactory.java
@@ -23,6 +23,9 @@
 import org.apache.wayang.core.platform.ChannelDescriptor;
 import org.apache.wayang.core.util.WayangCollections;
 import org.apache.wayang.java.channels.CollectionChannel;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.wayang.spark.channels.DatasetChannel;
 import org.apache.wayang.spark.channels.RddChannel;
 import org.apache.wayang.spark.execution.SparkExecutor;
 import org.junit.jupiter.api.BeforeEach;
@@ -73,4 +76,18 @@ public static CollectionChannel.Instance createCollectionChannelInstance(Collect
         return instance;
     }
 
+    public static DatasetChannel.Instance createDatasetChannelInstance(Configuration configuration) {
+        return (DatasetChannel.Instance) DatasetChannel.UNCACHED_DESCRIPTOR
+                .createChannel(null, configuration)
+                .createInstance(sparkExecutor, null, -1);
+    }
+
+    public static DatasetChannel.Instance createDatasetChannelInstance(Dataset<Row> dataset,
+                                                                       SparkExecutor sparkExecutor,
+                                                                       Configuration configuration) {
+        DatasetChannel.Instance instance = createDatasetChannelInstance(configuration);
+        instance.accept(dataset, sparkExecutor);
+        return instance;
+    }
+
 }