-
Notifications
You must be signed in to change notification settings - Fork 75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix race condition in PageListener #1351
Conversation
This PR - Introduced an `AtomicInteger` called `pagesInFlight` to track the number of pages currently being processed. - Incremented `pagesInFlight` before processing each page and decremented it after processing is complete - Adjusted the condition in `scheduleImputeHCTask` to check both `pagesInFlight.get() == 0` (all pages have been processed) and `sentOutPages.get() == receivedPages.get()` (all responses have been received) before scheduling the `imputeHC` task. - Removed the previous final check in `onResponse` that decided when to schedule `imputeHC`, relying instead on the updated counters for accurate synchronization. These changes address the race condition where `sentOutPages` might not have been incremented in time before checking whether to schedule the `imputeHC` task. By accurately tracking the number of in-flight pages and sent pages, we ensure that `imputeHC` is executed only after all pages have been fully processed and all responses have been received. Testing done: 1. Reproduced the race condition by starting two detectors with imputation. This causes an out of order illegal argument exception from RCF due to this race condition. Also verified the change fixed the problem. 2. added an IT for the above scenario. Signed-off-by: Kaituo Li <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1351 +/- ##
=========================================
Coverage 80.00% 80.00%
- Complexity 5662 5673 +11
=========================================
Files 533 533
Lines 23429 23430 +1
Branches 2335 2334 -1
=========================================
+ Hits 18745 18746 +1
- Misses 3573 3578 +5
+ Partials 1111 1106 -5
Flags with carried forward coverage won't be shown. Click here to find out more.
|
} | ||
|
||
@Override | ||
public void onResponse(CompositeRetriever.Page entityFeatures) { | ||
// start processing next page after sending out features for previous page | ||
if (pageIterator.hasNext()) { | ||
pageIterator.next(this); | ||
} else if (config.getImputationOption() != null) { | ||
scheduleImputeHCTask(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we first going inside here and then incrementing the pages inFlight, shouldn't we first increment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, changed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not 100% sure on this actually, then this pagesInFlight.get() == 0
will never be reached? I was just thinking of first case also if its 0 it might pass right away
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we finished processing all of the inflight requests, pagesInFlight.get() == 0, right?
} else { | ||
// No entity features to process | ||
// Decrement pagesInFlight immediately | ||
pagesInFlight.decrementAndGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to confirm, here every page is from the feature aggregation results, we want to make sure we received every page and then sending it out to processing (means sending the aggregated feature data to the correct model and doing .process())? then after we check each entity if data was received and send impute call to place the imputed value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
Overall were we processing results of detector 1 still but then detector 2 already finished and went to scheduleImputeHCTask? |
scheduleImputeHCTask is detector specific. So when detector 2 is finished, scheduleImputeHCTask will start regardless of detector 1. |
Signed-off-by: Kaituo Li <[email protected]>
* Fix race condition in PageListener This PR - Introduced an `AtomicInteger` called `pagesInFlight` to track the number of pages currently being processed. - Incremented `pagesInFlight` before processing each page and decremented it after processing is complete - Adjusted the condition in `scheduleImputeHCTask` to check both `pagesInFlight.get() == 0` (all pages have been processed) and `sentOutPages.get() == receivedPages.get()` (all responses have been received) before scheduling the `imputeHC` task. - Removed the previous final check in `onResponse` that decided when to schedule `imputeHC`, relying instead on the updated counters for accurate synchronization. These changes address the race condition where `sentOutPages` might not have been incremented in time before checking whether to schedule the `imputeHC` task. By accurately tracking the number of in-flight pages and sent pages, we ensure that `imputeHC` is executed only after all pages have been fully processed and all responses have been received. Testing done: 1. Reproduced the race condition by starting two detectors with imputation. This causes an out of order illegal argument exception from RCF due to this race condition. Also verified the change fixed the problem. 2. added an IT for the above scenario. Signed-off-by: Kaituo Li <[email protected]> * make sure increment before schedule Signed-off-by: Kaituo Li <[email protected]> --------- Signed-off-by: Kaituo Li <[email protected]> (cherry picked from commit f62885a) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
* Fix race condition in PageListener This PR - Introduced an `AtomicInteger` called `pagesInFlight` to track the number of pages currently being processed. - Incremented `pagesInFlight` before processing each page and decremented it after processing is complete - Adjusted the condition in `scheduleImputeHCTask` to check both `pagesInFlight.get() == 0` (all pages have been processed) and `sentOutPages.get() == receivedPages.get()` (all responses have been received) before scheduling the `imputeHC` task. - Removed the previous final check in `onResponse` that decided when to schedule `imputeHC`, relying instead on the updated counters for accurate synchronization. These changes address the race condition where `sentOutPages` might not have been incremented in time before checking whether to schedule the `imputeHC` task. By accurately tracking the number of in-flight pages and sent pages, we ensure that `imputeHC` is executed only after all pages have been fully processed and all responses have been received. Testing done: 1. Reproduced the race condition by starting two detectors with imputation. This causes an out of order illegal argument exception from RCF due to this race condition. Also verified the change fixed the problem. 2. added an IT for the above scenario. Signed-off-by: Kaituo Li <[email protected]> * make sure increment before schedule Signed-off-by: Kaituo Li <[email protected]> --------- Signed-off-by: Kaituo Li <[email protected]> (cherry picked from commit f62885a) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
* Fix race condition in PageListener This PR - Introduced an `AtomicInteger` called `pagesInFlight` to track the number of pages currently being processed. - Incremented `pagesInFlight` before processing each page and decremented it after processing is complete - Adjusted the condition in `scheduleImputeHCTask` to check both `pagesInFlight.get() == 0` (all pages have been processed) and `sentOutPages.get() == receivedPages.get()` (all responses have been received) before scheduling the `imputeHC` task. - Removed the previous final check in `onResponse` that decided when to schedule `imputeHC`, relying instead on the updated counters for accurate synchronization. These changes address the race condition where `sentOutPages` might not have been incremented in time before checking whether to schedule the `imputeHC` task. By accurately tracking the number of in-flight pages and sent pages, we ensure that `imputeHC` is executed only after all pages have been fully processed and all responses have been received. Testing done: 1. Reproduced the race condition by starting two detectors with imputation. This causes an out of order illegal argument exception from RCF due to this race condition. Also verified the change fixed the problem. 2. added an IT for the above scenario. * make sure increment before schedule --------- (cherry picked from commit f62885a) Signed-off-by: Kaituo Li <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
* Fix race condition in PageListener This PR - Introduced an `AtomicInteger` called `pagesInFlight` to track the number of pages currently being processed. - Incremented `pagesInFlight` before processing each page and decremented it after processing is complete - Adjusted the condition in `scheduleImputeHCTask` to check both `pagesInFlight.get() == 0` (all pages have been processed) and `sentOutPages.get() == receivedPages.get()` (all responses have been received) before scheduling the `imputeHC` task. - Removed the previous final check in `onResponse` that decided when to schedule `imputeHC`, relying instead on the updated counters for accurate synchronization. These changes address the race condition where `sentOutPages` might not have been incremented in time before checking whether to schedule the `imputeHC` task. By accurately tracking the number of in-flight pages and sent pages, we ensure that `imputeHC` is executed only after all pages have been fully processed and all responses have been received. Testing done: 1. Reproduced the race condition by starting two detectors with imputation. This causes an out of order illegal argument exception from RCF due to this race condition. Also verified the change fixed the problem. 2. added an IT for the above scenario. * make sure increment before schedule --------- (cherry picked from commit f62885a) Signed-off-by: Kaituo Li <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Description
This PR
AtomicInteger
calledpagesInFlight
to track the number of pages currently being processed.pagesInFlight
before processing each page and decremented it after processing is completescheduleImputeHCTask
to check bothpagesInFlight.get() == 0
(all pages have been processed) andsentOutPages.get() == receivedPages.get()
(all responses have been received) before scheduling theimputeHC
task.onResponse
that decided when to scheduleimputeHC
, relying instead on the updated counters for accurate synchronization.These changes address the race condition where
sentOutPages
might not have been incremented in time before checking whether to schedule theimputeHC
task. By accurately tracking the number of in-flight pages and sent pages, we ensure thatimputeHC
is executed only after all pages have been fully processed and all responses have been received.Testing done:
Check List
--signoff
.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.