Use ingestion-client in the Shuffler
#4024
8008c9c to 91f6046
tillrohrmann left a comment
Thanks a lot for replacing the direct Bifrost write with the IngressClient in the Shuffle @muhamadazmy. Maybe the name IngressClient does not fit 100% given that now also the Shuffle uses it. Maybe something like IngestionClient or so works better. Given that we don't use the send window of the IngressClient yet, I wouldn't expect a different runtime behavior of the shuffle. Once we have this, I would be interested in how the overall shuffle throughput increases by using the IngressClient.
I left a few minor comments for your consideration.
There seem to be a few test failures on GHA.
    ingress
        .ingest(
            msg.partition_key(),
            IngestRecord::from_parts(msg.record_keys(), msg),
        )
        .await?;
What about support for rolling upgrades?
ingest does not fail unless the ingestion client is closed. This means the worst case is that it will block until the leaders are responsive.
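To illustrate the "blocks, but only fails once closed" behavior described above, here is a minimal synchronous analogy built on the standard library's bounded channel. `BlockingIngest` is a hypothetical stand-in and not the real `IngestionClient`, whose `ingest` is async and routes records by partition key; only the failure semantics are mirrored here.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SendError, SyncSender};

/// Hypothetical stand-in for an ingestion client (assumption, not the real API).
pub struct BlockingIngest {
    tx: SyncSender<String>,
}

impl BlockingIngest {
    /// `capacity` bounds how many records can be queued before `ingest` blocks.
    pub fn new(capacity: usize) -> (Self, Receiver<String>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx }, rx)
    }

    /// Blocks while the queue is full (the "unresponsive leader" case) and
    /// returns an error only once the receiving side has been dropped (closed).
    pub fn ingest(&self, record: &str) -> Result<(), SendError<String>> {
        self.tx.send(record.to_owned())
    }
}
```

With a capacity of 1, the first `ingest` returns immediately, a second one would block until the receiver drains the queue, and any call made after the receiver is dropped returns `Err`.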
I think we can avoid this situation by first releasing support for the Ingest messages in the PP, and only start using them in the following release.
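The two-step rollout can be sketched as a small compatibility check. The release numbers, constants, and function names below are hypothetical and only model the idea: one release teaches receivers to understand the new message, and the next release starts sending it.

```rust
/// Hypothetical release number; the actual versions are not stated in this thread.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Release(pub u32);

/// Assumed: first release whose partition processor understands Ingest messages.
pub const SUPPORTS_SINCE: Release = Release(1);
/// Assumed: first release whose shuffle actually sends Ingest messages.
pub const USES_SINCE: Release = Release(2);

pub fn sends_ingest(r: Release) -> bool {
    r >= USES_SINCE
}

pub fn understands_ingest(r: Release) -> bool {
    r >= SUPPORTS_SINCE
}

/// During a rolling upgrade, nodes on `old` and `new` run side by side.
/// The mix is safe if nobody sends Ingest messages, or everybody understands them.
pub fn mixed_cluster_is_safe(old: Release, new: Release) -> bool {
    let any_sender = sends_ingest(old) || sends_ingest(new);
    let all_understand = understands_ingest(old) && understands_ingest(new);
    !any_sender || all_understand
}
```

Under these assumptions, upgrading one release at a time is always safe, while skipping the intermediate release (for example `Release(0)` directly to `Release(2)`) is not.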
d6e9955 to 8c0797e
fb4ba71 to df301ad
8921aa2 to ef64a0b
tillrohrmann left a comment
Thanks a lot for updating the shuffle to use the new ingestion client @muhamadazmy. LGTM. +1 for merging.
Did you run any benchmarks/tests for a workload that needs to shuffle a lot to check whether we see an improvement with these changes?
    let partition_store_manager = PartitionStoreManager::create().await?;

    let ingress = IngestionClient::new(
ingestion_client?
    ///
    /// Settings for the shared ingestion client used by all workers to
    /// manage record ingestion across partitions (shuffle).
    pub shuffle: IngestionOptions,
The IngestionOptions docs refer to the Kafka ingestion. Should the description be updated to also include the shuffle, or can it be made generic enough?
I will try to update the docs of IngestionOptions to make it more generic.
    let mut stream =
        state_machine::StateMachine::new(metadata, ingestion_client, outbox_reader, hint_rx);
Nit: Is stream still the best variable name?
    loop {
        match &mut self.state {
            State::Idle => {
                loop {
Is this inner loop needed or could it be removed and let the outer loop handle things?
You are right, it's totally not needed.
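For reference, a minimal sketch of the single-loop shape, where the `Idle` arm takes one step and lets the outer loop re-dispatch. The `Machine` type, its states, and its fields are hypothetical and much simpler than the real shuffle state machine.

```rust
/// Hypothetical states (assumption; the real state machine differs).
#[derive(Debug, PartialEq, Eq)]
pub enum State {
    Idle,
    Sending(u64),
    Done,
}

pub struct Machine {
    state: State,
    pending: Vec<u64>, // items still to be sent, stored in reverse order
    pub sent: Vec<u64>,
}

impl Machine {
    pub fn new(mut pending: Vec<u64>) -> Self {
        pending.reverse(); // so that `pop()` yields items in their original order
        Self { state: State::Idle, pending, sent: Vec::new() }
    }

    pub fn run(&mut self) {
        loop {
            match &mut self.state {
                State::Idle => {
                    // No inner loop: pick the next state and fall back to the outer loop.
                    self.state = match self.pending.pop() {
                        Some(item) => State::Sending(item),
                        None => State::Done,
                    };
                }
                State::Sending(item) => {
                    let item = *item;
                    self.sent.push(item);
                    self.state = State::Idle;
                }
                State::Done => return,
            }
        }
    }
}
```

Each arm performs exactly one transition, so the outer `loop` alone drives the machine until it reaches `Done`.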
58a3264 to 68b90af
3c9a9a4 to b5a503b
- Use IngestionClient instead of bifrost to write to partition logs
- Remove deprecated `delete_invocation`
Avoid direct writes to bifrost in shuffler by using a dedicated ingestion-client instance.
Stack created with Sapling. Best reviewed with ReviewStack.
Use ingestion-client in the Shuffler #4024
IngestionClient for invocation and state mgmt #3980