Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IndexedRDD for accelerated joins #848

Open
wants to merge 17 commits into
base: master
Choose a base branch
from

Conversation

jegonzal
Copy link
Contributor

In many applications (especially graph computation and machine learning) we are iteratively joining model parameters (vertices) with data (edges). In these cases it can be beneficial to pre-organize the records within each partition to share a similar structure.

Logically the IndexedRDD[K,V] extends the RDD[(K,V)] and provides the same functionality. An IndexedRDD is constructed by using the PairRDDFunctions.indexed method:

val tbl: IndexedRDD[String, Int] = sc.parallelize(Array( ("1",1), ("1", -1), ("3",3), ("5",5) )).indexed() 

The tbl.index can be then applied to other RDDs that share the same key set:

val tbl: IndexedRDD[String, Int] = sc.parallelize(Array( ("1",1), ("1", -1), ("3",3), ("5",5) )).indexed()
val tbl2: IndexedRDD[String, Float] = sc.parallelize(Array( ("1",1.0), ("3",3.0), ("5",5.0) )).indexed(tbl.index)  

By sharing the same index across multiple RDDs, we guarantee that the values within each RDD is organized identically enabling fast join operations. Furthermore, because the index is prematerialized (and cached) we can use it to accelerate join operations with RDD[(K,V)] that have not been indexed.

@AmplabJenkins
Copy link

Thank you for your pull request. An admin will review this request soon.

@jegonzal
Copy link
Contributor Author

This is still a prototype but I wanted to open the discussion on the design so I can incorporate feedback early.

index.cache
valuesRDD.cache
return this
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be overriding persist() instead of cache() here to work with all storage levels

@mridulm
Copy link
Contributor

mridulm commented Aug 20, 2013

I have not looked into this in great detail, but shouldn't cogroup, etc
(anything using multiple rdd's) validate the partitioners are equal before
comparing partition index, etc ?
Or is the github mobile site messing with me !

Regards
Mridul
On Aug 20, 2013 1:50 AM, "Joey" [email protected] wrote:

In many applications (especially graph computation and machine learning)
we are iteratively joining model parameters (vertices) with data (edges).
In these cases it can be beneficial to pre-organize the records within each
partition to share a similar structure.

Logically the IndexedRDD[K,V] extends the RDD[(K,V)] and provides the
same functionality. An IndexedRDD is constructed by using the
PairRDDFunctions.indexed method:

val tbl: IndexedRDD[String, Int] = sc.parallelize(Array( ("1",1), ("1", -1), ("3",3), ("5",5) )).indexed()

The tbl.index can be then applied to other RDDs that share the same key
set:

val tbl: IndexedRDD[String, Int] = sc.parallelize(Array( ("1",1), ("1", -1), ("3",3), ("5",5) )).indexed()val tbl2: IndexedRDD[String, Float] = sc.parallelize(Array( ("1",1.0), ("3",3.0), ("5",5.0) )).indexed(tbl.index)

By sharing the same index across multiple RDDs, we guarantee that the
values within each RDD is organized identically enabling fast join
operations. Furthermore, because the index is prematerialized (and cached)
we can use it to accelerate join operations with RDD[(K,V)] that have not

been indexed.

You can merge this Pull Request by running

git pull https://github.com/jegonzal/spark indexed_rdd

Or view, comment on, or merge it at:

#848
Commit Summary

  • second indexedrdd design
  • Finished early prototype of IndexedRDD
  • Adding testing code for indexedrdd
  • IndexedRDD passes all PairRDD Function tests
  • adding better error handling when indexing an RDD
  • changing caching behavior on indexedrdds
  • Merge branch 'master' of https://github.com/mesos/spark into
    indexed_rdd2
  • Merged with changes to zipPartitions
  • Corrected all indexed RDD tests.
  • Merge branch 'master' of https://github.com/mesos/spark into
    indexed_rdd

File Changes

Patch Links:

@jegonzal
Copy link
Contributor Author

I have made some organizational changes based on suggestions from @rxin and @mateiz.

To RDD I added:

  def pairRDDFunctions[K, V](
      implicit t: T <:< (K, V), k: ClassManifest[K], v: ClassManifest[V]): 
      PairRDDFunctions[K, V] = {
    new PairRDDFunctions(this.asInstanceOf[RDD[(K,V)]])
  }

This interesting piece of code returns a PairRDDFunctions wrapper for this RDD. By delegating the construction of the pairRDDFunctions object to the RDD class, specializations of RDD can return different implementations of the PairRDDFunctions. For example, the new IndexedRDD class overrides this function to return:

override def pairRDDFunctions[K1, V1](
      implicit t: (K, V) <:< (K1,V1), k: ClassManifest[K1], v: ClassManifest[V1]): 
    PairRDDFunctions[K1, V1] = {
    new IndexedRDDFunctions[K1,V1](this.asInstanceOf[IndexedRDD[K1,V1]])
  }

I updated the implicit construction of PairRDDFunctions to call rdd.pairRDDFunctions

I modified PairRDDFunctions to provide a ClassManifest for intermediate types. This is needed for some PairRDD implementations that are array backed (i.e., IndexedRDD).

@jegonzal
Copy link
Contributor Author

In response to @mridulm, the IndexedRDD exploits the partitioning of the index. If both RDDs are indexed (IndexedRDDs) and have the same index then both must be partitioned identically and even arranged identically within each partition enabling cogroup to be achieve using zip. If both RDDs are indexed with a different index then the standard shuffle logic is applied though the index is used to help pre-construct hash maps.

@mridulm
Copy link
Contributor

mridulm commented Aug 28, 2013

To give an example of what I was referring to - take a look at cogroup in IndexedRDDFunctions,

self and other IndexedRDDFunctions need not be partitioned by the same partitioner - and so direct index (value) comparisons in the case match can result in incorrectness ?

Unfortunately, I do not have time to go over this PR - so will derfer to someone else to comment on this further.

@AndreSchumacher
Copy link
Contributor

Hey Joey, this looks nice. Do you happen to have some microbenchmarks for the performance gains of indexed versus non-indexed cogroup? Also you you mentioned the applications to graph algorithms. What other operations on RDD's do you think could benefit from having an index from this context?

@jegonzal
Copy link
Contributor Author

Unfortunately, I don't have any good benchmarks to evaluate joins so I am in the process of creating some. Does anyone have suggestions?

xiajunluan pushed a commit to xiajunluan/spark that referenced this pull request May 30, 2014
Sent secondary jars to distributed cache of all containers and add the cached jars to classpath before executors start. Tested on a YARN cluster (CDH-5.0).

`spark-submit --jars` also works in standalone server and `yarn-client`. Thanks for @andrewor14 for testing!

I removed "Doesn't work for drivers in standalone mode with "cluster" deploy mode." from `spark-submit`'s help message, though we haven't tested mesos yet.

CC: @dbtsai @sryza

Author: Xiangrui Meng <[email protected]>

Closes mesos#848 from mengxr/yarn-classpath and squashes the following commits:

23e7df4 [Xiangrui Meng] rename spark.jar to __spark__.jar and app.jar to __app__.jar to avoid confliction apped $CWD/ and $CWD/* to the classpath remove unused methods
a40f6ed [Xiangrui Meng] standalone -> cluster
65e04ad [Xiangrui Meng] update spark-submit help message and add a comment for yarn-client
11e5354 [Xiangrui Meng] minor changes
3e7e1c4 [Xiangrui Meng] use sparkConf instead of hadoop conf
dc3c825 [Xiangrui Meng] add secondary jars to classpath in yarn
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants