Adding second moment of values per key for Typed-API reduce operations #1279
base: develop
DO NOT MERGE YET.
@johnynek Hi Oscar. Thanks for walking me through the code today! There is one problem I forgot to discuss with you, which I noted in the code. So if I'm doing
For the test it would print
which should actually have "value count = 2" for key 1 (please see the test ReduceValueCounterTest for details). I have the test in branch exie/1068, so it should be easy to replicate: just uncomment the block and comment out the block under it (Operations.scala, lines 509-524).
The reason should be due to:
is trying to iterate over the reduced result, so it iterates once per key rather than once per value. Unfortunately, that means we can't use a var here to count how many values are associated with each key.
@@ -491,13 +497,23 @@ package com.twitter.scalding {
        .asScala
        .map(_.getObject(0).asInstanceOf[V])

      val caches = values.toList
so, we can't do this (because it would force everything to memory), but what about something like this:
class CountingIterator[T](wraps: Iterator[T]) extends Iterator[T] {
  private[this] var nextCalls = 0
  def hasNext: Boolean = wraps.hasNext
  def next(): T = { nextCalls += 1; wraps.next() }
  // number of elements pulled through this iterator so far
  def seen: Int = nextCalls
}
then we could wrap values with this, then call .seen at the end to see how many values went in?
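A minimal sketch of how the wrapper suggested above might be used, assuming the reduce function consumes its values lazily. `CountingIteratorDemo` and `sumAndCount` are hypothetical names for illustration and are not part of Scalding; the fold stands in for whatever reduce function the job supplies.

```scala
// Wraps an iterator and counts how many elements have been pulled through it.
class CountingIterator[T](wraps: Iterator[T]) extends Iterator[T] {
  private[this] var nextCalls = 0
  def hasNext: Boolean = wraps.hasNext
  def next(): T = { nextCalls += 1; wraps.next() }
  def seen: Int = nextCalls
}

object CountingIteratorDemo {
  // Hypothetical stand-in for a reduce function: sums the values one at a time.
  def sumAndCount(values: Iterator[Int]): (Int, Int) = {
    val counted = new CountingIterator(values)
    // The fold drains the iterator without buffering it in memory,
    // unlike values.toList.
    val total = counted.foldLeft(0)(_ + _)
    // Read the count only after the consumer has finished iterating.
    (total, counted.seen)
  }
}
```

Because the count is only meaningful once the wrapped iterator has been consumed, `.seen` has to be read after the reduce function returns; reading it earlier would report however many elements had been pulled up to that point.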
good point
@johnynek Does this look good?
      val numValuesPerKey = values.seen

      flowProcess.increment(SkewMonitorCounters.KeyCount, SkewMonitorCounters.KeyCount, 1L)
I don't think we want the group and counter to be the same string. We want the group to be something like "scalding debug" and the counter value is fine.
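To illustrate the separation of group and counter names, here is a rough sketch using an in-memory stand-in for the counter API. `CounterRegistry`, `SkewCounterDemo`, and the counter name strings are all hypothetical; only the group name "scalding debug" comes from the comment above. The sketch also assumes that the second moment is tracked by summing the squared number of values per key, which is one way to do it and not necessarily what this PR does.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a (group, counter) -> value registry.
class CounterRegistry {
  private val counts =
    mutable.Map.empty[(String, String), Long].withDefaultValue(0L)

  def increment(group: String, counter: String, amount: Long): Unit =
    counts((group, counter)) += amount

  def get(group: String, counter: String): Long = counts((group, counter))
}

object SkewCounterDemo {
  val Group = "scalding debug"                   // one group shared by all counters
  val KeyCount = "key count"                     // hypothetical counter names
  val ValueCount = "value count"
  val ValueCountSquared = "value count squared"

  // Called once per key with the number of values seen for that key.
  def record(reg: CounterRegistry, numValuesPerKey: Long): Unit = {
    reg.increment(Group, KeyCount, 1L)
    reg.increment(Group, ValueCount, numValuesPerKey)
    reg.increment(Group, ValueCountSquared, numValuesPerKey * numValuesPerKey)
  }

  // Mean and variance of values per key, derived from the three counters.
  def meanAndVariance(reg: CounterRegistry): (Double, Double) = {
    val n = reg.get(Group, KeyCount).toDouble
    val mean = reg.get(Group, ValueCount) / n
    val secondMoment = reg.get(Group, ValueCountSquared) / n
    (mean, secondMoment - mean * mean)
  }
}
```

Keeping every counter under a single group with distinct counter names means each one can be looked up unambiguously, and the variance follows from the counters alone as the second moment minus the squared mean.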
After testing it a couple more times, I can confirm it's a bug. Here's how you can reproduce it: check out the code above and uncomment line 523 in scalding/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala. Then in sbt run test-only com.twitter.scalding.ReduceValueCounterTest. It prints out a line (corresponding to the code in CoreTest.scala, line 1837). But if you use the same group name as key name, then it gives
proof of concept for #1068