Fix race condition in PageListener #1351
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -210,6 +210,10 @@ class PageListener implements ActionListener<CompositeRetriever.Page> { | |
private String taskId; | ||
private AtomicInteger receivedPages; | ||
private AtomicInteger sentOutPages; | ||
// By introducing pagesInFlight and incrementing it in the main thread before asynchronous processing begins, | ||
// we ensure that the count of in-flight pages is accurate at all times. This allows us to reliably determine | ||
// when all pages have been processed. | ||
private AtomicInteger pagesInFlight; | ||
|
||
PageListener(PageIterator pageIterator, Config config, long dataStartTime, long dataEndTime, String taskId) { | ||
this.pageIterator = pageIterator; | ||
|
@@ -220,14 +224,21 @@ class PageListener implements ActionListener<CompositeRetriever.Page> { | |
this.taskId = taskId; | ||
this.receivedPages = new AtomicInteger(); | ||
this.sentOutPages = new AtomicInteger(); | ||
this.pagesInFlight = new AtomicInteger(); | ||
} | ||
|
||
@Override | ||
public void onResponse(CompositeRetriever.Page entityFeatures) { | ||
// Increment pagesInFlight to track the processing of this page | ||
pagesInFlight.incrementAndGet(); | ||
|
||
// start processing next page after sending out features for previous page | ||
if (pageIterator.hasNext()) { | ||
pageIterator.next(this); | ||
} else if (config.getImputationOption() != null) { | ||
scheduleImputeHCTask(); | ||
} | ||
|
||
if (entityFeatures != null && false == entityFeatures.isEmpty()) { | ||
LOG | ||
.info( | ||
|
@@ -309,19 +320,15 @@ public void onResponse(CompositeRetriever.Page entityFeatures) { | |
} catch (Exception e) { | ||
LOG.error("Unexpected exception", e); | ||
handleException(e); | ||
} finally { | ||
// Decrement pagesInFlight after processing is complete | ||
pagesInFlight.decrementAndGet(); | ||
} | ||
}); | ||
} | ||
|
||
if (!pageIterator.hasNext() && config.getImputationOption() != null) { | ||
if (sentOutPages.get() > 0) { | ||
// at least 1 page sent out. Wait until all responses are back. | ||
scheduleImputeHCTask(); | ||
} else { | ||
// no data in current interval. Send out impute request right away. | ||
imputeHC(dataStartTime, dataEndTime, configId, taskId); | ||
} | ||
|
||
} else { | ||
// No entity features to process | ||
// Decrement pagesInFlight immediately | ||
pagesInFlight.decrementAndGet(); | ||
Just to confirm: every page here comes from the feature aggregation results, and we want to make sure we have received every page and then sent it out for processing (that is, sent the aggregated feature data to the correct model and called .process())? After that, we check for each entity whether data was received and send an impute call to fill in the imputed value?

Yes.
||
} | ||
} | ||
|
||
|
@@ -358,7 +365,10 @@ private void scheduleImputeHCTask() { | |
|
||
@Override | ||
public void run() { | ||
if (sentOutPages.get() == receivedPages.get()) { | ||
// By using pagesInFlight in the condition within scheduleImputeHCTask, we ensure that imputeHC | ||
// is executed only after all pages have been processed (pagesInFlight.get() == 0) and all | ||
// responses have been received (sentOutPages.get() == receivedPages.get()). | ||
if (pagesInFlight.get() == 0 && sentOutPages.get() == receivedPages.get()) { | ||
if (!sent.get()) { | ||
// since we don't know when cancel will succeed, need sent to ensure imputeHC is only called once | ||
sent.set(true); | ||
|
Why do we enter this block first and only then increment pagesInFlight? Shouldn't we increment first?

Yeah, changed.

I am not 100% sure about this, actually. Won't the condition pagesInFlight.get() == 0 then never be reached? I was also thinking of the first case: if it is 0, the check might pass right away.

When we have finished processing all of the in-flight requests, pagesInFlight.get() == 0, right?
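
The ordering question in this thread can be illustrated with a minimal, standalone sketch. It is not the PR's code: `InFlightSketch`, `onPage`, `tryFinish`, and `demo` are hypothetical names, and only `pagesInFlight` and `sent` mirror fields that appear in the diff. The assumption is that the counter is incremented on the calling thread before work is handed to an executor and decremented in a `finally` block, so a check of `pagesInFlight.get() == 0` cannot pass while any handed-off page is still pending.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; this is not the PageListener class from the PR.
class InFlightSketch {
    private final AtomicInteger pagesInFlight = new AtomicInteger();
    private final AtomicBoolean sent = new AtomicBoolean();
    private final ExecutorService pool;

    InFlightSketch(ExecutorService pool) {
        this.pool = pool;
    }

    // Called on the producer thread for each page (hypothetical method).
    void onPage(Runnable work) {
        // Increment before the asynchronous hand-off, so the page is counted
        // even if the worker has not started running yet.
        pagesInFlight.incrementAndGet();
        try {
            pool.execute(() -> {
                try {
                    work.run();
                } finally {
                    // Decrement whether the work succeeded or threw.
                    pagesInFlight.decrementAndGet();
                }
            });
        } catch (RuntimeException e) {
            // The task was rejected and will never run, so undo the increment.
            pagesInFlight.decrementAndGet();
            throw e;
        }
    }

    // Returns true at most once, and only when no page is in flight.
    boolean tryFinish() {
        return pagesInFlight.get() == 0 && sent.compareAndSet(false, true);
    }

    // Submits `pages` tasks that block on a latch, then checks tryFinish()
    // before the tasks are released, after they have drained, and once more.
    static boolean[] demo(int pages) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        InFlightSketch sketch = new InFlightSketch(pool);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < pages; i++) {
            sketch.onPage(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        boolean whileBlocked = sketch.tryFinish();
        release.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        boolean afterDrain = sketch.tryFinish();
        boolean secondCall = sketch.tryFinish();
        return new boolean[] { whileBlocked, afterDrain, secondCall };
    }

    public static void main(String[] args) {
        boolean[] r = demo(3);
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

In this sketch the check would also pass right away if no page had been submitted yet, which is the concern raised above. It is therefore only meaningful once the producer has stopped handing out pages. In the diff, `scheduleImputeHCTask()` appears to be reached only when `pageIterator.hasNext()` is false, and the scheduled check additionally requires `sentOutPages.get() == receivedPages.get()`.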