03 Working with Jupyter Studio
After you open the link created by the docker run command, you will see the folders of the Jupyter workspace.
The hello-world example below shows the basic elements needed to load and interact with the dependencies.
Dependencies can be imported with
%mavenRepo apache-snapshots https://repository.apache.org/content/repositories/snapshots
%maven org.apache.wayang:wayang-core:0.6.1-SNAPSHOT
or
%%loadFromPOM
<repository>
    <id>apache-snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots</url>
</repository>
<dependency>
    <groupId>org.apache.wayang</groupId>
    <artifactId>wayang-core</artifactId>
    <version>0.6.1-SNAPSHOT</version>
</dependency>
As the Blossom team, we recommend the %%loadFromPOM approach.
Get Wayang.
To use it with Maven, for instance, include the following in your POM file:
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-***</artifactId>
<version>0.6.1-SNAPSHOT</version>
</dependency>
Note the ***: Wayang ships with multiple modules that can be included in your app, depending on how you want to use it:
- wayang-core: provides core data structures and the optimizer (required)
- wayang-basic: provides common operators and data types for your apps (recommended)
- wayang-api: provides an easy-to-use Scala and Java API to assemble Wayang plans (recommended)
- wayang-java, wayang-spark, wayang-graphchi, wayang-sqlite3, wayang-postgres: adapters for the various supported processing platforms
- wayang-profiler: provides functionality to learn operator and UDF cost functions from historical execution data
For the sake of version flexibility, you still have to include your Hadoop (hadoop-hdfs and hadoop-common) and Spark (spark-core and spark-graphx) versions of choice.
You need to obtain the most recent snapshot version of Wayang via the Apache snapshot repository. Just include:
<repositories>
    <repository>
        <id>apache-snapshots</id>
        <name>Apache Foundation Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots</url>
    </repository>
</repositories>
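Putting these pieces together, a single notebook cell in Jupyter Studio could load everything at once. The following is only a sketch: it assumes you want the Java and Spark platforms from the module list above, and it leaves the Hadoop and Spark artifacts for you to fill in with your versions of choice.
%%loadFromPOM
<repository>
    <id>apache-snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots</url>
</repository>
<dependency>
    <groupId>org.apache.wayang</groupId>
    <artifactId>wayang-core</artifactId>
    <version>0.6.1-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.wayang</groupId>
    <artifactId>wayang-basic</artifactId>
    <version>0.6.1-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.wayang</groupId>
    <artifactId>wayang-api</artifactId>
    <version>0.6.1-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.wayang</groupId>
    <artifactId>wayang-java</artifactId>
    <version>0.6.1-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.wayang</groupId>
    <artifactId>wayang-spark</artifactId>
    <version>0.6.1-SNAPSHOT</version>
</dependency>
<!-- Add your hadoop-common, hadoop-hdfs, spark-core, and spark-graphx
     dependencies of choice here, as explained above. -->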
Configure Wayang. In order for Wayang to work properly, it is necessary to tell Wayang about the capacities of your processing platforms and how to reach them. While there is a default configuration that lets you test Wayang right away, we recommend creating a properties file to adapt the configuration where necessary. To have Wayang use that configuration transparently, just run your app via
$ java -Dwayang.configuration=url://to/my/wayang.properties ...
You can find the most relevant settings in the following list; a sample properties file is sketched after it.
- General settings
  - wayang.core.log.enabled (= true): whether to log execution statistics to allow learning better cardinality and cost estimators for the optimizer
  - wayang.core.log.executions (= ~/.wayang/executions.json): where to log execution times of operator groups
  - wayang.core.log.cardinalities (= ~/.wayang/cardinalities.json): where to log cardinality measurements
  - wayang.core.optimizer.instrumentation (= org.apache.wayang.core.profiling.OutboundInstrumentationStrategy): where to measure cardinalities in Wayang plans; other options are org.apache.wayang.core.profiling.NoInstrumentationStrategy and org.apache.wayang.core.profiling.FullInstrumentationStrategy
  - wayang.core.optimizer.reoptimize (= false): whether to progressively optimize Wayang plans
  - wayang.basic.tempdir (= file:///tmp): where to store temporary files, in particular for inter-platform communication
- Java Streams
  - wayang.java.cpu.mhz (= 2700): clock frequency in MHz of the processor the JVM runs on
  - wayang.java.hdfs.ms-per-mb (= 2.7): average throughput from HDFS to the JVM in ms/MB
- Apache Spark
  - spark.master (= local): Spark master; various other Spark settings are supported, e.g., spark.executor.memory, spark.serializer, ...
  - wayang.spark.cpu.mhz (= 2700): clock frequency in MHz of the processor the Spark workers run on
  - wayang.spark.hdfs.ms-per-mb (= 2.7): average throughput from HDFS to the Spark workers in ms/MB
  - wayang.spark.network.ms-per-mb (= 8.6): average network throughput of the Spark workers in ms/MB
  - wayang.spark.init.ms (= 4500): time in ms it takes Spark to initialize
- GraphChi
  - wayang.graphchi.cpu.mhz (= 2700): clock frequency in MHz of the processor GraphChi runs on
  - wayang.graphchi.cpu.cores (= 2): number of cores GraphChi runs on
  - wayang.graphchi.hdfs.ms-per-mb (= 2.7): average throughput from HDFS to GraphChi in ms/MB
- SQLite
  - wayang.sqlite3.jdbc.url: JDBC URL to use SQLite
  - wayang.sqlite3.jdbc.user: optional user name
  - wayang.sqlite3.jdbc.password: optional password
  - wayang.sqlite3.cpu.mhz (= 2700): clock frequency in MHz of the processor SQLite runs on
  - wayang.sqlite3.cpu.cores (= 2): number of cores SQLite runs on
- PostgreSQL
  - wayang.postgres.jdbc.url: JDBC URL to use PostgreSQL
  - wayang.postgres.jdbc.user: optional user name
  - wayang.postgres.jdbc.password: optional password
  - wayang.postgres.cpu.mhz (= 2700): clock frequency in MHz of the processor PostgreSQL runs on
  - wayang.postgres.cpu.cores (= 2): number of cores PostgreSQL runs on
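As a starting point, a minimal wayang.properties might override only a handful of these settings. The sketch below uses the documented defaults where they exist; the Spark executor memory and the PostgreSQL URL and credentials are placeholders you would replace with your own values.
# Sample wayang.properties (sketch; adjust to your environment)
# General
wayang.core.log.enabled = true
wayang.basic.tempdir = file:///tmp
# Apache Spark (executor memory is a placeholder value)
spark.master = local
spark.executor.memory = 2g
wayang.spark.cpu.mhz = 2700
# PostgreSQL (URL and credentials are placeholders)
wayang.postgres.jdbc.url = jdbc:postgresql://localhost:5432/mydb
wayang.postgres.jdbc.user = myuser
wayang.postgres.jdbc.password = mypassword
Pass the file to your application with the -Dwayang.configuration flag shown above.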
Code with Wayang. The recommended way to specify your apps with Wayang is via its Scala or Java API from the wayang-api
module. You can find examples below.
The "Hello World!" of data processing systems is the wordcount.
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import java.util.Collection;
import java.util.Arrays;

public class WordcountJava {

    public static void main(String[] args) {

        // Settings
        String inputUrl = "file:/tmp.txt";

        // Get a plan builder.
        WayangContext wayangContext = new WayangContext(new Configuration())
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin());
        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName(String.format("WordCount (%s)", inputUrl))
                .withUdfJarOf(WordcountJava.class);

        // Start building the WayangPlan.
        Collection<Tuple2<String, Integer>> wordcounts = planBuilder
                // Read the text file.
                .readTextFile(inputUrl).withName("Load file")

                // Split each line by non-word characters.
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .withSelectivity(10, 100, 0.9)
                .withName("Split words")

                // Filter empty tokens.
                .filter(token -> !token.isEmpty())
                .withSelectivity(0.99, 0.99, 0.99)
                .withName("Filter empty words")

                // Attach counter to each word.
                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

                // Sum up counters for every word.
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
                .withName("Add counters")

                // Execute the plan and collect the results.
                .collect();

        System.out.println(wordcounts);
    }
}
The same wordcount, this time with the Scala API:
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

object WordcountScala {
  def main(args: Array[String]) {

    // Settings
    val inputUrl = "file:/tmp.txt"

    // Get a plan builder.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"WordCount ($inputUrl)")
      .withUdfJarsOf(this.getClass)

    val wordcounts = planBuilder
      // Read the text file.
      .readTextFile(inputUrl).withName("Load file")

      // Split each line by non-word characters.
      .flatMap(_.split("\\W+"), selectivity = 10).withName("Split words")

      // Filter empty tokens.
      .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")

      // Attach counter to each word.
      .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")

      // Sum up counters for every word.
      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))

      // Execute the plan and collect the results.
      .collect()

    println(wordcounts)
  }
}
Wayang is also capable of iterative processing, which is important, for example, for machine learning algorithms such as k-means.
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

import scala.util.Random
import scala.collection.JavaConversions._

object kmeans {
  def main(args: Array[String]) {

    // Settings
    val inputUrl = "file:/kmeans.txt"
    val k = 5
    val iterations = 100
    val configuration = new Configuration

    // Get a plan builder.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")
      .withUdfJarsOf(this.getClass)

    case class Point(x: Double, y: Double)
    case class TaggedPoint(x: Double, y: Double, cluster: Int)
    case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
      def add_points(that: TaggedPointCounter) = TaggedPointCounter(this.x + that.x, this.y + that.y, this.cluster, this.count + that.count)
      def average = TaggedPointCounter(x / count, y / count, cluster, 0)
    }

    // Read and parse the input file(s).
    val points = planBuilder
      .readTextFile(inputUrl).withName("Read file")
      .map { line =>
        val fields = line.split(",")
        Point(fields(0).toDouble, fields(1).toDouble)
      }.withName("Create points")

    // Create initial centroids.
    val random = new Random
    val initialCentroids = planBuilder
      .loadCollection(for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0))
      .withName("Load random centroids")

    // Declare UDF to select centroid for each data point.
    class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {

      /** Keeps the broadcasted centroids. */
      var centroids: Iterable[TaggedPointCounter] = _

      override def open(executionCtx: ExecutionContext) = {
        centroids = executionCtx.getBroadcast[TaggedPointCounter]("centroids")
      }

      override def apply(point: Point): TaggedPointCounter = {
        var minDistance = Double.PositiveInfinity
        var nearestCentroidId = -1
        for (centroid <- centroids) {
          val distance = Math.pow(Math.pow(point.x - centroid.x, 2) + Math.pow(point.y - centroid.y, 2), 0.5)
          if (distance < minDistance) {
            minDistance = distance
            nearestCentroidId = centroid.cluster
          }
        }
        new TaggedPointCounter(point.x, point.y, nearestCentroidId, 1)
      }
    }

    // Do the k-means loop.
    val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
      points
        .mapJava(new SelectNearestCentroid,
          udfLoad = LoadProfileEstimators.createFromSpecification(
            "my.udf.costfunction.key", configuration
          ))
        .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
        .reduceByKey(_.cluster, _.add_points(_)).withName("Add up points")
        .withCardinalityEstimator(k)
        .map(_.average).withName("Average points")
    }).withName("Loop")

      // Collect the results.
      .collect()

    println(finalCentroids)
  }
}