Utility function to get a setup & cleanup function for mapping each partition #456

Open · wants to merge 13 commits into master
Conversation

@squito (Contributor) commented Feb 8, 2013

Often when mapping some RDD, you want to do a bit of setup before processing each partition, followed by cleanup at the end of the partition; this adds utility functions to make that easier.
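For concreteness, here is a minimal sketch of how the utility might be used, given an RDD[String] called lines (the PartitionMapper and mapWithSetupAndCleanup names appear in this PR's diffs; the exact member signatures are my assumption):

```scala
import spark.RDD.PartitionMapper

// Hypothetical usage: setup() runs before the first element of each partition,
// cleanup() after the last, and map() transforms each element in between.
val lengths = lines.mapWithSetupAndCleanup(new PartitionMapper[String, Int] {
  def setup() { println("before this partition's first element") }
  def map(s: String): Int = s.length
  def cleanup() { println("after this partition's last element") }
})
```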

I felt this is worth including because it's a little tricky to get right -- I needed to add a "CleanupIterator", and I have an example in the unit tests of how this fails without it. OTOH, I wasn't sure whether this necessarily belongs in the Spark API itself (e.g., do we also add a version of foreach with this?)
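To illustrate why this is tricky, here is a minimal sketch of what such a CleanupIterator has to do (an assumed shape, not the PR's exact code): cleanup must fire exactly once, and only after the underlying iterator is exhausted.

```scala
// Wraps an iterator and runs `cleanup` once, after the last element is consumed.
class CleanupIterator[T](sub: Iterator[T], cleanup: () => Unit) extends Iterator[T] {
  private var cleaned = false
  def hasNext: Boolean = {
    if (sub.hasNext) {
      true
    } else {
      if (!cleaned) { cleaned = true; cleanup() }  // fire exactly once, at exhaustion
      false
    }
  }
  def next(): T = sub.next()
}
```

Note the failure mode: if the consumer abandons the iterator early, or an exception is thrown mid-partition, cleanup never runs -- exactly the gap discussed later in this thread.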

We find it a useful addition, so we thought others might too.

@@ -4,6 +4,8 @@ import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import spark.SparkContext._
import spark.rdd.{CoalescedRDD, PartitionPruningRDD}
import spark.RDD.PartitionMapper
import collection._
Review comment from a Member on the diff above:

This style of import doesn't match the convention used in the rest of the codebase. It should probably be replaced with an import of scala.collection.mutable.Set, since that appears to be the only class that it's importing.
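That is, presumably the single needed import would be:

```scala
import scala.collection.mutable.Set
```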

@squito (Contributor, Author) commented Feb 8, 2013

I cleaned up the style issues -- sorry about that.
I left that "failing" test in there; happy to remove it if you want me to. I just wanted to check whether you'd like me to document the trouble with mapPartitions somewhere.

@stephenh (Contributor) commented Feb 9, 2013

Hi Imran,

Have you seen the TaskContext and addOnCompleteCallback? That is what HadoopRDD uses to close the FileStream after all of the lines in a Hadoop file have been read.

You might be able to do what you're doing with just a custom RDD that did something like:

override def compute(s: Split, context: TaskContext): Iterator[(T, U)] = {
  setupDbConnection()
  context.addOnCompleteCallback(() => tearDownDbConnection())
  // call parent rdd or do own compute stuff
}

I believe this will achieve the same thing, as compute will be called on each partition, and you'll have start/stop hooks around the execution on each partition.

@mateiz (Member) commented Feb 9, 2013

I agree with Stephen here. The addOnCompleteCallback mechanism also makes sure to call your handler if the task throws an exception, which is important.
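For context, that guarantee comes from the task runner invoking the registered callbacks in a finally block, roughly like this (a sketch of the mechanism, not Spark's exact code; runTaskBody is a hypothetical stand-in):

```scala
// Callbacks registered via addOnCompleteCallback run whether the
// task body returns normally or throws.
val context = new TaskContext(0, 0, 0)   // stageId, splitId, attemptId
try {
  runTaskBody(context)                   // hypothetical task body
} finally {
  context.executeOnCompleteCallbacks()   // fires every registered callback
}
```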

Also, can you add a similar method in the Java API? I guess you would need to create an interface in Java for the setup/close stuff too.

@squito (Contributor, Author) commented Feb 11, 2013

Good point -- I definitely hadn't thought about ensuring cleanup with exceptions.

I've updated it to use onCompleteCallback. I also added it to the Java API -- I added separate classes for PairRDDs & DoubleRDDs; I don't know if there is a better way to do that.

@stephenh (Contributor) commented:

I wonder if this could be done with something more like decoration:

val rdd = sc.textFile(...).setupPartitions(dbSetupLogic).mapPartitions(...).cleanupPartitions(dbCloseLogic)

So there would be two new RDDs: PartitionSetupRDD, which first invoked its setup function once per partition and then called firstParent.compute, and PartitionCleanupRDD, which set up the on-complete callback for its cleanup function.

Not sure if the decoupling would lead to unintended/nonsensical use cases. But, just musing: they could then be used separately, if you only need one or the other, or without the map, which PartitionMapper currently forces you to do.

Also, I just like that this would use plain functions rather than a new "PartitionMapper" interface -- for some reason that doesn't feel quite right, but I can't think of a better name.

I see what you're trying to do though.
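A rough sketch of the two decorator RDDs being described (hypothetical classes; this assumes the era's RDD subclassing hooks -- the one-parent constructor, getSplits, and firstParent -- plus the compute/TaskContext API shown earlier):

```scala
// PartitionSetupRDD: run `setup` once per partition, before any elements flow.
class PartitionSetupRDD[T: ClassManifest](prev: RDD[T], setup: () => Unit)
    extends RDD[T](prev) {
  override def getSplits = firstParent[T].splits
  override def compute(s: Split, context: TaskContext): Iterator[T] = {
    setup()
    firstParent[T].iterator(s, context)
  }
}

// PartitionCleanupRDD: register `cleanup` to run when the task completes,
// even if it fails partway through the partition.
class PartitionCleanupRDD[T: ClassManifest](prev: RDD[T], cleanup: () => Unit)
    extends RDD[T](prev) {
  override def getSplits = firstParent[T].splits
  override def compute(s: Split, context: TaskContext): Iterator[T] = {
    context.addOnCompleteCallback(cleanup)
    firstParent[T].iterator(s, context)
  }
}
```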

 * setup & cleanup that happens before & after computing each partition
 */
def mapWithSetupAndCleanup[K, V](m: JavaPairPartitionMapper[T, K, V]): JavaPairRDD[K, V] = {
  val scalaMapper = new PartitionMapper[T, (K, V)] {
Review comment from a Member on the diff above:

Can JavaPairPartitionMapper<T, K, V> be an abstract class that extends or implements PartitionMapper<T, Tuple2<K, V>>? If you can do that, then you wouldn't have to wrap the Java PartitionMapper to convert it into its Scala counterpart.
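In other words, something like this (a sketch of the suggestion; the exact shape is an assumption):

```scala
// If the Java-facing class extends the Scala trait directly, fixing the
// result type to a pair, no wrapping into a Scala counterpart is needed.
abstract class JavaPairPartitionMapper[T, K, V]
  extends PartitionMapper[T, (K, V)]   // (K, V) is Tuple2<K, V> seen from Java
```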

@mateiz (Member) commented Feb 11, 2013

Regarding Stephen's comment -- I think it's better to keep PartitionMapper a single object instead of using separate functions, in case you need to share state among the setup, map, and cleanup methods (e.g., you open some external resource, use it in your map, then close it).
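The resource case in a sketch (hypothetical connection and URL; member signatures assumed as before):

```scala
// All three methods close over the same `conn` field -- exactly what
// separate, independent functions would make awkward.
val perPartition = new PartitionMapper[String, Int] {
  var conn: java.sql.Connection = _                                             // shared state
  def setup() { conn = java.sql.DriverManager.getConnection("jdbc:example") }   // hypothetical URL
  def map(line: String): Int = line.length                                      // would query through `conn` here
  def cleanup() { conn.close() }
}
```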

@stephenh (Contributor) commented:

Ah, good point. That makes sense.

@squito (Contributor, Author) commented Feb 12, 2013

I updated JavaPairPartitionMapper, per Josh's suggestion. (We lose the ability for map to throw an exception, but that is already the case for the basic PartitionMapper.)

I tried doing the same thing for JavaDoubleRDD, but somehow I got stuck with weird manifest errors. First it complained:

[error] found : ClassManifest[scala.Double]
[error] required: ClassManifest[java.lang.Double]

Then when I switched to explicitly using a java.lang.Double manifest, it reversed:

[error] found : spark.RDD[java.lang.Double]
[error] required: spark.RDD[scala.Double]

So I just left it as-is.

@squito (Contributor, Author) commented Feb 13, 2013

OK, I think this is ready to go now. I got rid of the need for the helper object for JavaDoubleRDD by just casting from java.lang.Double to scala.Double, and the compiler seems happy. Also, I put a throws Exception declaration on map in PartitionMapper for the Java API.
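The trick in isolation (a sketch; at runtime both types are doubles, so the cast just bridges the boxed and primitive manifests):

```scala
val boxed: java.lang.Double = 1.5
val unboxed: scala.Double = boxed.asInstanceOf[scala.Double]  // unboxes at runtime
```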

@squito (Contributor, Author) commented Mar 16, 2013

Just curious what the status is on this -- are you waiting for some additional changes here, have you decided against merging it, or just haven't gotten to it yet?

@@ -401,6 +398,54 @@ public void mapPartitions() {
}

@Test
public void mapPartitionsWithSetupAndCleanup() {
Review comment from a Member on the diff above:

Please use two spaces for the indentation in this file (it looks like maybe it's tabs, or more than two spaces).

@mateiz (Member) commented Mar 16, 2013

Sorry, just hadn't had a chance to look at it. It looks good but I made two small comments.

(Merge commit) Conflicts:
	core/src/main/scala/spark/RDD.scala
@squito (Contributor, Author) commented Mar 16, 2013

Thanks! I've updated to take those comments into account.

@squito (Contributor, Author) commented Mar 20, 2013

Wow, somehow I totally missed committing changes to one file before ... hope you didn't waste time looking at it earlier; now it's actually all there.

@AmplabJenkins commented:

Can one of the admins verify this patch?

@AmplabJenkins commented:

I'm the Jenkins test bot for the UC Berkeley AMPLab. I've noticed your pull request and will test it once an admin authorizes me to. Thanks for your submission!

(1 similar comment from @AmplabJenkins)

@AmplabJenkins commented:

Thank you for your pull request. An admin will review this request soon.
