[sql_server] Initial implementation for Source Rendering #32133
Conversation
let (mut client, connection) =
    mz_sql_server_util::Client::connect(connection_config).await?;
// TODO(sql_server1): Move the connection into its own future.
mz_ore::task::spawn(|| "sql_server-connection", async move { connection.await });
What does the TODO refer to? It seems to be its own future already. Also, the async block can be reduced to just `connection`:
-mz_ore::task::spawn(|| "sql_server-connection", async move { connection.await });
+mz_ore::task::spawn(|| "sql_server-connection", connection);
The TODO refers to just cleaning up this API a bit: everywhere we use it we have to spawn this task, and instead it would be great if the task were just spawned internally. Also, the returned `Connection` type implements `IntoFuture`, not `Future`, so it needs the `async move { connection.await }` block.
.map(|output| {
    (
        Arc::clone(&output.capture_instance),
        (output.partition_index, Arc::clone(&output.decoder)),
Using a partition index and then splitting the stream into multiple ones using `.partition(..)` is done in the other sources because the `SourceRender` trait didn't support multiple outputs directly. This is now possible, so since this is a greenfield source it would be nice to directly create as many outputs as necessary and drive them directly here. This will allow us to manipulate the frontiers of each output separately.
Chatted offline, I have a local branch that does what Petros describes, I'll make this change in a follow-up PR
// Decode the SQL Server row into an MZ one.
let Some((partition_idx, decoder)) = capture_instances.get(&capture_instance)
else {
    let definite_error = DefiniteError::ProgrammingError(format!(
What do you mean here by `ProgrammingError`? Is this condition only hit through a bug? If so, I wouldn't make it a definite error, since that will permanently break the source.
Normally here you would do a `.expect(...)`, but in case of a bug I don't want to panic `clusterd`. Switched to a `TransientError`!
    return Ok(());
};

// Failing to decode data is a permanent failure.
It's not a permanent failure if the problematic row is later retracted, so what we want to do here is emit the error in the output and continue as if nothing happened. The error message should contain enough data for the user to locate the problematic row (perhaps a raw representation).
Done! I also reworked decoding to return a new `SqlServerDecodeError`, whose aim is to be a stable error type.
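The pattern discussed above, emitting a decode error into the output rather than halting the source, can be sketched as follows. This `SqlServerDecodeError` struct and `decode` function are simplified hypothetical stand-ins, not Materialize's actual types; the point is that bad rows produce error values alongside good rows, so the source keeps making progress.

```rust
// Hypothetical stand-in for a stable decode-error type: it carries enough
// detail for the user to locate the problematic upstream row.
#[derive(Debug, PartialEq)]
struct SqlServerDecodeError {
    /// A raw representation of the row that failed to decode.
    raw: String,
    /// Why decoding failed.
    reason: String,
}

// Toy decoder: parse a string as an integer, capturing failures as values.
fn decode(raw: &str) -> Result<i64, SqlServerDecodeError> {
    raw.parse::<i64>().map_err(|e| SqlServerDecodeError {
        raw: raw.to_string(),
        reason: e.to_string(),
    })
}

fn main() {
    // Split decode results into an "ok" stream and an "error" stream,
    // instead of aborting on the first failure.
    let (oks, errs): (Vec<_>, Vec<_>) = ["1", "two", "3"]
        .iter()
        .map(|raw| decode(raw))
        .partition(Result::is_ok);
    let oks: Vec<i64> = oks.into_iter().map(Result::unwrap).collect();
    let errs: Vec<SqlServerDecodeError> =
        errs.into_iter().map(Result::unwrap_err).collect();

    // The bad row becomes data in the error output; good rows still flow.
    assert_eq!(oks, vec![1, 3]);
    assert_eq!(errs[0].raw, "two");
    println!("oks = {oks:?}, errs = {errs:?}");
}
```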
src/sql-server-util/src/cdc.rs
Outdated
@@ -310,60 +356,114 @@ pub enum CdcError {
/// function.
///
/// [`sys.fn_cdc_increment_lsn`](https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/sys-fn-cdc-increment-lsn-transact-sql?view=sql-server-ver16)
This doc comment block needs some updates, since LSNs are not treated as opaque blobs anymore and we increment them without calling this function.
Updated!
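Incrementing an LSN locally, rather than round-tripping through sys.fn_cdc_increment_lsn, amounts to treating the 10-byte LSN as a big-endian integer and adding one with carry. The sketch below is a hypothetical stand-in for Materialize's actual `Lsn` type, shown only to illustrate the carry logic.

```rust
/// Increment a 10-byte SQL Server LSN, viewed as a big-endian integer.
/// (Hypothetical stand-in for the real Lsn type; sys.fn_cdc_increment_lsn
/// performs the equivalent operation server-side.)
fn increment_lsn(mut lsn: [u8; 10]) -> [u8; 10] {
    // Walk from the least-significant byte, carrying on overflow.
    for byte in lsn.iter_mut().rev() {
        let (next, overflowed) = byte.overflowing_add(1);
        *byte = next;
        if !overflowed {
            break;
        }
    }
    lsn
}

fn main() {
    // Simple increment of the lowest byte.
    assert_eq!(increment_lsn([0u8; 10])[9], 1);

    // Carry propagates across byte boundaries.
    let mut max_low = [0u8; 10];
    max_low[9] = 0xFF;
    let next = increment_lsn(max_low);
    assert_eq!((next[8], next[9]), (1, 0));
    println!("ok");
}
```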
src/sql-server-util/src/cdc.rs
Outdated
// the LSN from progressing, we want to wait a bit for SQL Server to
// become ready.
let (_client, lsn_result) = mz_ore::retry::Retry::default()
    .max_duration(std::time::Duration::from_secs(10))
Does this give the query 10 seconds to come back? The default might be too tight, and it's definitely worth a dyncfg.
It does. This should only really matter when CDC is first initialized for the entire DB, but it's something I ran into in tests. I made this configurable via a dyncfg and updated the max wait to 30 seconds.
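The retry-with-deadline pattern used above can be sketched in plain std Rust. The real code uses `mz_ore::retry::Retry` with `.max_duration(..)`; `retry_for` below is a hypothetical simplified version, assuming a synchronous operation and exponential backoff, to show the shape of the behavior.

```rust
use std::time::{Duration, Instant};

// Hypothetical sketch of retry-until-deadline: keep retrying a fallible
// operation until it succeeds or `max_duration` of wall-clock time elapses,
// backing off exponentially between attempts.
fn retry_for<T, E>(
    max_duration: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let deadline = Instant::now() + max_duration;
    let mut backoff = Duration::from_millis(10);
    loop {
        match op() {
            Ok(v) => return Ok(v),
            // Out of time: surface the last error to the caller.
            Err(e) if Instant::now() >= deadline => return Err(e),
            Err(_) => {
                // Never sleep past the deadline.
                let remaining = deadline.saturating_duration_since(Instant::now());
                std::thread::sleep(backoff.min(remaining));
                backoff = backoff.saturating_mul(2);
            }
        }
    }
}

fn main() {
    // An operation that succeeds on its third attempt, e.g. SQL Server
    // reporting an initial LSN only once CDC has finished initializing.
    let mut attempts = 0;
    let result = retry_for(Duration::from_secs(5), || {
        attempts += 1;
        if attempts < 3 { Err("lsn not yet available") } else { Ok(attempts) }
    });
    assert_eq!(result, Ok(3));
    println!("succeeded after {attempts} attempts");
}
```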
let next_lsn = crate::inspect::increment_lsn(self.client, new_lsn).await?;
max_observed_lsn = db_max_lsn;
// TODO(sql_server2): We should occasionally check to see how close the LSN we
// generate is to the LSN returned from incrementing via SQL Server itself.
Out of curiosity, what will we do with this information?
Not much, but it might be interesting to observe how the LSN of the upstream advances, just for our own understanding.
@@ -175,7 +201,7 @@ impl<'a> CdcStream<'a> {
     self.client,
     &*instance,
     *instance_lsn,
-    new_lsn,
+    db_max_lsn,
     RowFilterOption::AllUpdateOld,
 )
 // TODO(sql_server3): Make this chunk size configurable.
Not part of this PR, but the code below seems to be doing some grouping by LSNs. Timely doesn't care about the order of LSNs in the data stream as long as progress statements are correct, so we can directly iterate and emit the data as they come.
That said, in which case does SQL Server send us data out of order?
Good to know!
SQL Server shouldn't be sending data out of order, but the type this `Stream` yields is:

    Data {
        /// The capture instance these changes are for.
        capture_instance: Arc<str>,
        /// The LSN that this change occurred at.
        lsn: Lsn,
        /// The change itself.
        changes: Vec<Operation>,
    },
So we're grouping by LSN just so we can emit an entire set of `Operation`s all at once instead of `(Arc<str>, Lsn, Operation)` tuples. Happy to refactor this in a follow-up if you'd prefer!
Emitting a sequence of `(Arc<str>, Lsn, Operation)` is definitely more timely-like, so I highly suggest that we do that, and it's totally fine as a follow-up. Separately, it would be nice if we could get rid of that `Arc<str>` in favor of the table id, which I presume is some small copyable type that doesn't need refcount updates all the time.
src/sql-server-util/src/cdc.rs
Outdated
// event in case we haven't already.
//
// Note: Generally this only ever occurs when starting a stream and the LSN
// we're starting with matches the current max of the DB.
I think this else block can be removed entirely and instead just emit a progress event at the beginning before entering the replication loop. Or don't emit it at all and let the caller assume it
Good thought, refactored to emit this once at the start!
* introduces a new SqlServerDecodeError which documents the stability requirements
* updates the SqlServerDecoder to return this new error type
* changes decoding from a definite error to committing a SourceError
…nd LSN implementation
* refactor a few methods on the Lsn struct
* change the CdcHandle::into_stream method to emit a single progress event at start
* add dyncfgs for the CDC poll rate and the amount of time we'll wait for SQL Server to report an initial LSN
Thanks for the quick turnaround on those comments!
Thank you for the quick review!
This PR adds an MVP implementation of the SourceRender trait for the SQL Server source. It also adds a testdrive test that exercises replicating data from SQL Server into Materialize.

TODO(parkmycar): I need to add some more detail here
Motivation
Progress towards https://github.com/MaterializeInc/database-issues/issues/8762
Checklist
$T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.