-
Notifications
You must be signed in to change notification settings - Fork 159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: Clean up some duplicated code. #982
Conversation
@@ -267,9 +266,8 @@ private[stream] object Collect { | |||
} catch { | |||
case NonFatal(ex) => | |||
decider(ex) match { | |||
case Supervision.Stop => failStage(ex) | |||
case Supervision.Resume => if (!hasBeenPulled(in)) pull(in) | |||
case Supervision.Restart => if (!hasBeenPulled(in)) pull(in) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explain the change? - does pull(in)
automatically check !hasBeenPulled(in)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no need to check the (!hasBeenPulled(in))
, because there is no additional pull(in)
. these method was copied from the old code, where, an exception maybe throw during the preStart
but the upstream has been pulled. but here, as the collect
operator never do some pulling or any other things which can throws after a pull(in)
, so I think it would be better to just remove those duplicated check.
That's why I remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change needs better explanation - you should not expect reviewers to read all the other code for themselves
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm - but it would be good if someone else reviews this too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I trust that the !hasBeenPulled(in)
is not necessary it would be good to add a comment explaining why for future people reading the code.
Would also be good to add a unit test confirming this if its quick to do so.
Ill approve once comment is added
@@ -267,9 +266,8 @@ private[stream] object Collect { | |||
} catch { | |||
case NonFatal(ex) => | |||
decider(ex) match { | |||
case Supervision.Stop => failStage(ex) | |||
case Supervision.Resume => if (!hasBeenPulled(in)) pull(in) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment here explaining why !hasBeenPulled(in)
is not necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? the Map and Filter don't have this either.
I would like to add that for in MapConcat
operator which explain why a !hasBeenPulled(in)
test is needed.
I think the origin reason about why this !hasBeenPulled(in)
test is because that method is a abstract class and so that exception can happen when onPush/onPull, so it have to be guard for extending, and reduce the sub class' burden. that's not the case here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? the Map and Filter don't have this either.
We should add the comments there as well then. Its more that when other people that are not so familiar with Akka/Pekko read the code they get a better understanding of why these exceptions are there.
Akka's problem historically is that a lot of the contributions were done within Lightbend and so there is a lot of inhouse knowledge that wasn't visible to outside contributors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't push any code because gfw blocked me, Maybe I can use a web ide?
How about we do that in a separate pr or you take care about it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://pekko.apache.org/docs/pekko/current/stream/stream-customize.html
It's detailly described here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mdedetrich Please unlock this, I'm not pay for everything, thanks, I just a volunteer, and this kind of information is well covered in the doc, this is code, not a book, and the Chinese GFW make everything harder, if you like to detail comment thing, I'm very glad to review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@He-Pin I was away from laptop, ill just add a commit with comment onto your branch and then merge it.
case Supervision.Restart => if (!hasBeenPulled(in)) pull(in) | ||
case Supervision.Stop => failStage(ex) | ||
case _ => | ||
// The !hasBeenPulled(in) check is not required here since it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@He-Pin Is this comment correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is true, but where this comment comes from, I didn't remember...?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a maintainer of Pekko, I have permission to add commits onto branches of other people's PR. This is my commit that I added now.
I will approve PR and squash + merge it, thanks for work!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, hah, The chinese networking to github is very unstable, that's why I hate changing and changing, sometime I can't pulling pull too, both via ssh and https, and VPN is illegal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
Motivation:
I found these code is a little duplicated.
Result:
Simpler code.