Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding second moment of values per key for Typed-API reduce operations #1279

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
22 changes: 19 additions & 3 deletions scalding-core/src/main/scala/com/twitter/scalding/Operations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,13 @@ package com.twitter.scalding {
}
}

/** In the typed API every reduce operation is handled by this Buffer */
private[scalding] object SkewMonitorCounters {
// Strangely, if group name and key name are different, then the counter would be zero
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is bug by itself. Can you confirm this is still true and not due to another issue? We don't want to make a bunch of groups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a bug. if you simply put

flowProcess.increment("TestGroup", "TestKey", 1)

in def operate(flowProcess: FlowProcess[_], call: BufferCall[Any]) {

and this should print 3 in the test

ReduceValueCounterTest. However it prints

PRINTING KEY AND GROUP! 0

val KeyCount = "scalding.skew.monitor.key.count"
val ValuesCountSum = "scalding.skew.monitor.value.count.sum"
val ValuesCountSquareSum = "scalding.skew.monitor.value.sum.square"
}

class TypedBufferOp[K, V, U](
conv: TupleConverter[K],
@transient reduceFn: (K, Iterator[V]) => Iterator[U],
Expand All @@ -491,13 +497,23 @@ package com.twitter.scalding {
.asScala
.map(_.getObject(0).asInstanceOf[V])

val caches = values.toList
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, we can't do this (because it would force everything to memory), but what about something like this:

class CountingIterator[T](wraps: Iterator[T]) extends Iterator[T] {
  private[this] var nextCalls = 0
  def hasNext = wraps.hasNext
  def next = { nextCalls += 1; wraps.next }
  def seen: Int = nextCalls
}

then we could wrap values with this, then call .seen at the end to see how many values went in?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

val numValuesPerKey = caches.size.toLong

// Avoiding a lambda here
val resIter = reduceFnSer.get(key, values)
val resIter = reduceFnSer.get(key, caches.toIterator)
while (resIter.hasNext) {
val tup = Tuple.size(1)
tup.set(0, resIter.next)
val t2 = resIter.next
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.


tup.set(0, t2)
oc.add(tup)
}

flowProcess.increment(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount, 1L)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably better to use a group like "scalding.debug" since I think we want to group them up so they are easy to see next to each other in the job tracker UI.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want the group and counter to be the same string. We want the group to be something like "scalding debug" and the counter value is fine.

flowProcess.increment(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum, numValuesPerKey)
flowProcess.increment(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum, numValuesPerKey * numValuesPerKey)
}

}
}
40 changes: 40 additions & 0 deletions scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,46 @@ class CounterJobTest extends WordSpec with Matchers {
}
}

class ReduceValueCountJob(args: Args) extends Job(args) {
TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3))))
.group
.foldLeft((0, 0)){ (a, b) => (a._1 + b._1, a._2 + b._2) }
.toTypedPipe
.map{
case (a: Int, (b: Int, c: Int)) =>
(a, b, c)
}
.write(TypedTsv[(Int, Int, Int)](args("output")))
}

class ReduceValueCounterTest extends WordSpec with Matchers {
"Reduce Values Count" should {
JobTest(new com.twitter.scalding.ReduceValueCountJob(_))
.arg("output", "output0")
.counter(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount){
x =>
x should be(3)
}
.counter(SkewMonitorCounters.ValuesCountSquareSum, SkewMonitorCounters.ValuesCountSquareSum) {
x =>
// key 1 has two values, thus 2^2 = 4. key 2 and 3 has only one respectively
x should be(4 + 1 + 1)
}
.counter(SkewMonitorCounters.ValuesCountSum, SkewMonitorCounters.ValuesCountSum) {
x =>
// key 1 has two values, thus 2^2 = 4. key 2 and 3 has only one respectively
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like a copied comment, don't want 2^2 right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah you're right. it's a copied commend

x should be (2 + 1 + 1)
}
.sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("output0")){
tuples =>

}
.runHadoop
.finish
}

}

object DailySuffixTsvJob {
val strd1 = "2014-05-01"
val strd2 = "2014-05-02"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,33 @@ class ReduceOperationsTest extends WordSpec with Matchers {
.runHadoop
.finish
}

"Std for number of values aassocated with each key " should {
class SDForReduceOperation(args: Args) extends Job(args) {
TypedPipe.from(List((1, (1, 1)), (2, (2, 2)), (1, (3, 3)), (3, (3, 3))))
.group
.foldLeft((0, 0)){ (a, b) => (a._1 + b._1, a._2 + b._2) }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could do .forceToReducers.sum and it should get the same effect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. FIxed.

.toTypedPipe
.map{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.map { note the space.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

case (a: Int, (b: Int, c: Int)) =>
(a, b, c)
}
.write(TypedTsv[(Int, Int, Int)](args("output")))
}

class ExperimentTest extends WordSpec with Matchers {
"A PageRank2 job" should {
JobTest(new com.twitter.scalding.ReduceValueCountJob(_))
.arg("output", "blah")
.sink[(Int, Int, Int)](TypedTsv[(Int, Int, Int)]("blah")){
tuples =>
println("RES = " + tuples)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove println statements.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Dup-test. Thanks for the catch

}
.run
.finish
}

}

}
}