14 changes: 14 additions & 0 deletions README.md
@@ -73,6 +73,20 @@ Apache Wayang provides a flexible architecture which enables easy addition of ne

For a quick guide on how to run WordCount see [here](guides/tutorial.md).

### Spark Dataset / DataFrame pipelines

Wayang’s Spark platform can now execute end-to-end pipelines on Spark `Dataset[Row]` (aka DataFrames). This is particularly useful when working with lakehouse-style storage (Parquet/Delta) or when you want to plug Spark ML stages into a Wayang plan without repeatedly falling back to RDDs.

To build a Dataset-backed pipeline:

1. **Use the Dataset-aware plan builder APIs.**
- `PlanBuilder.readParquet(..., preferDataset = true)` (or `JavaPlanBuilder.readParquet(..., ..., true)`) reads Parquet files directly into a Dataset channel.
- `DataQuanta.writeParquet(..., preferDataset = true)` writes a Dataset channel without converting it back to an RDD.
2. **Keep operators dataset-compatible.** Most operators continue to work unchanged; if an operator explicitly prefers RDDs, Wayang will insert the necessary conversions automatically (at an additional cost). Custom operators can expose `DatasetChannel` descriptors to stay in the dataframe world.
3. **Let the optimizer do the rest.** The optimizer now assigns a higher cost to Dataset↔RDD conversions, so once you opt into Dataset sources/sinks the plan will stay in Dataset form by default.

No extra flags are required; simply opt into the Dataset-based APIs where you want DataFrame semantics. If you see unexpected conversions in your execution plan, check that the upstream and downstream operators can consume `DatasetChannel`s; otherwise Wayang inserts a conversion operator for you.
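
As an illustration, here is a minimal Scala sketch of a pipeline that stays on Dataset channels from source to sink. The paths are placeholders, and the context setup shows the usual Spark plugin registration; it is an assumption for the example, not part of the new API.

```scala
import org.apache.wayang.api.PlanBuilder
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.spark.Spark

// Assumed setup: a WayangContext with the Spark plugin registered.
val context = new WayangContext(new Configuration()).withPlugin(Spark.basicPlugin())

new PlanBuilder(context).withJobName("parquet-dataset-roundtrip")
  // Hypothetical input path; preferDataset = true emits a Dataset-backed channel.
  .readParquet("hdfs://namenode/warehouse/orders.parquet", preferDataset = true)
  // Hypothetical output path; the sink consumes the Dataset channel directly.
  .writeParquet("hdfs://namenode/warehouse/orders_copy.parquet", preferDataset = true)
```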

## Quick Guide for Developing with Wayang

For a quick guide on how to use Wayang in your Java/Scala project see [here](guides/develop-with-Wayang.md).
61 changes: 61 additions & 0 deletions guides/spark-datasets.md
@@ -0,0 +1,61 @@
<!--

Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->

---
title: Spark Dataset pipelines
description: How to build Wayang jobs that stay on Spark Datasets/DataFrames from source to sink.
---

Wayang’s Spark backend can now run entire pipelines on Spark `Dataset[Row]` (a.k.a. DataFrames). Use this mode when you ingest from lakehouse formats (Parquet/Delta), interoperate with Spark ML stages, or simply prefer schema-aware processing. This guide explains how to opt in.

## When to use Dataset channels

- **Lakehouse storage:** Reading Parquet/Delta directly into datasets avoids repeated schema inference and keeps Spark’s optimized Parquet reader in play.
- **Spark ML:** Our ML operators already convert RDDs into DataFrames internally. Feeding them a dataset channel skips that conversion and preserves column names.
- **Federated pipelines:** You can mix dataset-backed stages on Spark with other platforms; Wayang will insert conversions only when strictly necessary.

## Enable Dataset sources and sinks

1. **Plan builder APIs** (see the sketch after this list):
- `PlanBuilder.readParquet(..., preferDataset = true)` (Scala) or `JavaPlanBuilder.readParquet(..., ..., true)` loads Parquet files into a `DatasetChannel` instead of an `RddChannel`.
- `DataQuanta.writeParquet(..., preferDataset = true)` writes a dataset back to Parquet without converting to RDD first.
2. **Prefer dataset-friendly operators:** Most unary/binary operators accept either channel type, but custom operators can advertise dataset descriptors explicitly. See `DatasetChannel` in `wayang-platforms/wayang-spark` for details.
3. **Let the optimizer keep it:** The optimizer now assigns costs to Dataset↔RDD conversions, so once your plan starts with a dataset channel it will stay in dataset form unless an operator demands an RDD.
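
Putting these pieces together, a sketch in Scala; the paths, projection fields, and context setup are illustrative assumptions:

```scala
import org.apache.wayang.api.PlanBuilder
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.spark.Spark

// Assumed setup: a WayangContext with the Spark plugin registered.
val context = new WayangContext(new Configuration()).withPlugin(Spark.basicPlugin())

new PlanBuilder(context).withJobName("dataset-pipeline")
  // Hypothetical path and projection; the source emits a DatasetChannel instead of an RddChannel.
  .readParquet("hdfs://namenode/lake/orders.parquet",
    projection = Array("order_id", "amount"),
    preferDataset = true)
  // Hypothetical output path; the write stays on the Dataset channel.
  .writeParquet("hdfs://namenode/lake/orders_projected.parquet",
    overwrite = true,
    preferDataset = true)
```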

## Mixing with RDD operators

If a stage only supports RDDs, Wayang inserts conversion operators automatically:

- `SparkRddToDatasetOperator` converts an RDD of `org.apache.wayang.basic.data.Record` into a Spark `Dataset[Row]` (using sampled schema inference or `RecordType`).
- `SparkDatasetToRddOperator` turns a `Dataset[Row]` back into a `JavaRDD<Record>`.

Both conversions carry non-trivial load profiles. You’ll see them in plan explanations if you mix dataset- and RDD-only operators.
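
For example, the following sketch (hypothetical paths, assumed context setup) feeds a Dataset-backed Parquet source into a plain `map`/`writeTextFile` tail; if those stages are only offered as RDD-based Spark operators, a `SparkDatasetToRddOperator` appears between the source and the `map` in the plan explanation:

```scala
import org.apache.wayang.api.PlanBuilder
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.spark.Spark

// Assumed setup: a WayangContext with the Spark plugin registered.
val context = new WayangContext(new Configuration()).withPlugin(Spark.basicPlugin())

new PlanBuilder(context).withJobName("dataset-rdd-mix")
  // Dataset-backed source (hypothetical path).
  .readParquet("hdfs://namenode/lake/events.parquet", preferDataset = true)
  // If this stage only consumes RddChannels, the optimizer inserts a SparkDatasetToRddOperator here.
  .map(record => record.getField(0).toString)
  // RDD-side text sink (hypothetical path).
  .writeTextFile("file:///tmp/event-keys.txt", (key: String) => key)
```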

## Developer checklist

- **Use `RecordType` when possible.** Providing field names in your logical operators helps the converter derive a precise schema (see the sketch after this checklist).
- **Re-use `sparkExecutor.ss`.** When writing custom Spark operators that build DataFrames, use the `SparkSession` exposed by the provided `SparkExecutor` (`sparkExecutor.ss`) instead of creating one with `SparkSession.builder()`, to avoid spinning up extra Spark contexts.
- **Watch plan explanations.** Run `PlanBuilder.buildAndExplain(true)` to verify whether conversions are inserted. If they are, consider adding dataset descriptors to your operators.
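
As a sketch of the first point, assuming the `DataSetType.createDefault(BasicDataUnitType)` factory from wayang-core and hypothetical field names, a `RecordType`-typed `ParquetSink` gives the RDD-to-Dataset converter a precise, named schema:

```scala
import org.apache.wayang.basic.operators.ParquetSink
import org.apache.wayang.basic.types.RecordType
import org.apache.wayang.core.types.DataSetType

// Hypothetical field names; the RecordType carries them into the sink's DataSetType.
val orderType = new RecordType("order_id", "customer", "amount")

val sink = new ParquetSink(
  "hdfs://namenode/lake/orders_out.parquet", // hypothetical output path
  true,                                      // isOverwrite
  true,                                      // preferDataset
  DataSetType.createDefault(orderType))
// Wire the sink into a WayangPlan like any other UnarySink.
```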

## Current limitations

- Only Parquet sources/sinks expose dataset-specific APIs today. Text/Object sources still produce RDD channels.
- ML4All pipelines currently emit plain `double[]`/`Double` RDDs. They still benefit from the internal DataFrame conversions but do not expose dataset channels yet.

Contributions to widen dataset support (e.g., dataset-aware `map`/`filter` or ML4All stages) are welcome.
DataQuanta.scala
@@ -36,7 +36,7 @@ import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
import org.apache.wayang.core.plan.wayangplan._
import org.apache.wayang.core.platform.Platform
import org.apache.wayang.core.util.{Tuple => WayangTuple}
import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
import org.apache.wayang.basic.data.{Record, Tuple2 => WayangTuple2}
import org.apache.wayang.basic.model.{DLModel, LogisticRegressionModel,DecisionTreeRegressionModel};
import org.apache.wayang.commons.util.profiledb.model.Experiment
import com.google.protobuf.ByteString;
@@ -1027,6 +1027,11 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
writeTextFileJava(url, toSerializableFunction(formatterUdf), udfLoad)
}

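/**
* Write the data quanta in this instance to a Parquet file. Triggers execution.
*
* @param url the URL of the Parquet file
* @param overwrite whether an existing file at that URL may be overwritten
* @param preferDataset when `true`, keep the sink on a Dataset-backed channel instead of converting to an RDD
*/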
def writeParquet(url: String,
overwrite: Boolean = false,
preferDataset: Boolean = false)(implicit ev: Out =:= Record): Unit =
writeParquetJava(url, overwrite, preferDataset)

/**
* Write the data quanta in this instance to a text file. Triggers execution.
*
@@ -1090,6 +1095,16 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
this.planBuilder.sinks.clear()
}

private def writeParquetJava(url: String, overwrite: Boolean, preferDataset: Boolean)(implicit ev: Out =:= Record): Unit = {
val _ = ev
val sink = new ParquetSink(url, overwrite, preferDataset)
sink.setName(s"Write parquet $url")
this.connectTo(sink, 0)
this.planBuilder.sinks += sink
this.planBuilder.buildAndExecute()
this.planBuilder.sinks.clear()
}

/**
* Write the data quanta in this instance to a Object file. Triggers execution.
*
JavaPlanBuilder.scala
@@ -69,8 +69,22 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
* @param projection the projection, if any
* @return [[DataQuantaBuilder]] for the file
*/
def readParquet(url: String, projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
createSourceBuilder(ParquetSource.create(url, projection))(ClassTag(classOf[Record]))
def readParquet(url: String,
projection: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
readParquet(url, projection, preferDataset = false)

/**
* Read a parquet file and optionally keep it backed by Spark Datasets.
*
* @param url the URL of the Parquet file
* @param projection the projection, if any
* @param preferDataset when {@code true}, emit a Dataset-backed channel
* @return [[DataQuantaBuilder]] for the file
*/
def readParquet(url: String,
projection: Array[String],
preferDataset: Boolean): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
createSourceBuilder(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))(ClassTag(classOf[Record]))

/**
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
PlanBuilder.scala
@@ -136,9 +136,13 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
*
* @param url the URL of the Parquet file
* @param projection the projection, if any
* @param preferDataset when {@code true}, keep the resulting channel backed by Spark Datasets
* @return [[DataQuanta]] of [[Record]]s representing the file
*/
def readParquet(url: String, projection: Array[String] = null): DataQuanta[Record] = load(ParquetSource.create(url, projection))
def readParquet(url: String,
projection: Array[String] = null,
preferDataset: Boolean = false): DataQuanta[Record] =
load(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))

/**
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
ParquetSink.java (new file)
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.basic.operators;

import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.plan.wayangplan.UnarySink;
import org.apache.wayang.core.types.DataSetType;

/**
* Logical operator that writes {@link Record}s into a Parquet file.
*/
public class ParquetSink extends UnarySink<Record> {

private final String outputUrl;

private final boolean isOverwrite;

private final boolean preferDataset;

public ParquetSink(String outputUrl, boolean isOverwrite, boolean preferDataset, DataSetType<Record> type) {
super(type);
this.outputUrl = outputUrl;
this.isOverwrite = isOverwrite;
this.preferDataset = preferDataset;
}

public ParquetSink(String outputUrl, boolean isOverwrite, boolean preferDataset) {
this(outputUrl, isOverwrite, preferDataset, DataSetType.createDefault(Record.class));
}

public String getOutputUrl() {
return this.outputUrl;
}

public boolean isOverwrite() {
return this.isOverwrite;
}

public boolean prefersDataset() {
return this.preferDataset;
}
}
ParquetSource.java
@@ -57,6 +57,8 @@ public class ParquetSource extends UnarySource<Record>

private MessageType schema;

private boolean preferDatasetOutput = false;

/**
* Creates a new instance.
*
@@ -124,6 +126,16 @@ public ParquetSource(ParquetSource that) {
this.projection = that.getProjection();
this.metadata = that.getMetadata();
this.schema = that.getSchema();
this.preferDatasetOutput = that.preferDatasetOutput;
}

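/**
* Configures whether this source should emit a Dataset-backed output channel instead of an RDD channel.
*
* @param preferDataset whether a Dataset-backed channel is preferred
* @return this instance
*/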
public ParquetSource preferDatasetOutput(boolean preferDataset) {
this.preferDatasetOutput = preferDataset;
return this;
}

public boolean isDatasetOutputPreferred() {
return this.preferDatasetOutput;
}

@Override
ChannelConversions.java
@@ -19,18 +19,23 @@
package org.apache.wayang.spark.channels;

import org.apache.wayang.basic.channels.FileChannel;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.channels.ChannelConversion;
import org.apache.wayang.core.optimizer.channels.DefaultChannelConversion;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.java.channels.CollectionChannel;
import org.apache.wayang.java.platform.JavaPlatform;
import org.apache.wayang.spark.operators.SparkBroadcastOperator;
import org.apache.wayang.spark.operators.SparkCacheOperator;
import org.apache.wayang.spark.operators.SparkCollectOperator;
import org.apache.wayang.spark.operators.SparkCollectionSource;
import org.apache.wayang.spark.operators.SparkDatasetToRddOperator;
import org.apache.wayang.spark.operators.SparkObjectFileSink;
import org.apache.wayang.spark.operators.SparkObjectFileSource;
import org.apache.wayang.spark.operators.SparkRddToDatasetOperator;
import org.apache.wayang.spark.operators.SparkTsvFileSink;
import org.apache.wayang.spark.operators.SparkTsvFileSource;

@@ -108,6 +113,32 @@ public class ChannelConversions {
() -> new SparkObjectFileSource<>(DataSetType.createDefault(Void.class))
);

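// Conversions between Spark Dataset channels and RDD channels; the optimizer prices these so that
// Dataset-backed plans only fall back to RDDs when an operator demands it.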
public static final ChannelConversion DATASET_TO_UNCACHED_RDD = new DefaultChannelConversion(
DatasetChannel.UNCACHED_DESCRIPTOR,
RddChannel.UNCACHED_DESCRIPTOR,
() -> new SparkDatasetToRddOperator()
);

public static final ChannelConversion CACHED_DATASET_TO_UNCACHED_RDD = new DefaultChannelConversion(
DatasetChannel.CACHED_DESCRIPTOR,
RddChannel.UNCACHED_DESCRIPTOR,
() -> new SparkDatasetToRddOperator()
);

public static final ChannelConversion UNCACHED_RDD_TO_UNCACHED_DATASET = new DefaultChannelConversion(
RddChannel.UNCACHED_DESCRIPTOR,
DatasetChannel.UNCACHED_DESCRIPTOR,
ChannelConversions::createRddToDatasetOperator,
"via SparkRddToDatasetOperator"
);

public static final ChannelConversion CACHED_RDD_TO_UNCACHED_DATASET = new DefaultChannelConversion(
RddChannel.CACHED_DESCRIPTOR,
DatasetChannel.UNCACHED_DESCRIPTOR,
ChannelConversions::createRddToDatasetOperator,
"via SparkRddToDatasetOperator"
);

public static Collection<ChannelConversion> ALL = Arrays.asList(
UNCACHED_RDD_TO_CACHED_RDD,
COLLECTION_TO_BROADCAST,
@@ -119,6 +150,24 @@ public class ChannelConversions {
HDFS_OBJECT_FILE_TO_UNCACHED_RDD,
// HDFS_TSV_TO_UNCACHED_RDD,
CACHED_RDD_TO_HDFS_TSV,
UNCACHED_RDD_TO_HDFS_TSV
UNCACHED_RDD_TO_HDFS_TSV,
DATASET_TO_UNCACHED_RDD,
CACHED_DATASET_TO_UNCACHED_RDD,
UNCACHED_RDD_TO_UNCACHED_DATASET,
CACHED_RDD_TO_UNCACHED_DATASET
);

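/**
* Creates the RDD-to-Dataset conversion operator, re-using the source channel's {@link DataSetType}
* when it carries {@link Record}s so that the converter can derive a precise schema.
*/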
private static SparkRddToDatasetOperator createRddToDatasetOperator(Channel sourceChannel,
Configuration configuration) {
DataSetType<Record> type = DataSetType.createDefault(Record.class);
if (sourceChannel != null) {
DataSetType<?> sourceType = sourceChannel.getDataSetType();
if (Record.class.isAssignableFrom(sourceType.getDataUnitType().getTypeClass())) {
@SuppressWarnings("unchecked")
DataSetType<Record> casted = (DataSetType<Record>) sourceType;
type = casted;
}
}
return new SparkRddToDatasetOperator(type);
}
}