[FLINK-24493] [flink-connector-base] Introduce DemultiplexingSink #27072
+2,478
−0
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
Per the discussions within FLINK-24493, support does not presently exist for dynamically routing incoming elements to specific sinks at runtime, based on some characteristic of the incoming element. This pull request addresses that by implementing a new
DemultiplexingSinkconstruct that will support such behavior.The sink implementation consists of two components: the
DemultiplexingSinkitself along with a correspondingSinkRouterinterface, whos implementation will govern how a given element will be mapped to a specific sink at runtime. These active routes (i.e. those that have been previously initialized) will be stored within the internal state of the sink to prevent unnecessary sink initialization for existing sinks, essentially functioning as a cache.Visual Example
The example above demonstrates a simple DemultiplexingSink with a router configured to route each element to a destination that corresponds with the first character of the element (e.g. "apple" -> A, "banana" -> B, etc.).
Example Usage
An example demonstrating this behavior might look like the following:
This example demonstrates consuming a series of
YourElementelements and routes them to dynamic Kafka topics based on a specific attribute within the object itself through the following chain of events:SinkRouterinstance responsible for defining the logic to route the element to its destination topic (in this case a predefinedelement.getTopicName()implementation)DemultiplexingSink<YourElement, String>instance that uses the previously definedSinkRouterinstance.SinkRouter.getRoute()will be executed to determine the sink to route the element to.NOTE: The
SinkRouterimplementation is not limited to String-based keys, so the above example could easily support dynamic routing to different Kafka brokers, topics, etc. Implementations using other popular sinks such as JDBC, Elasticsearch, etc. may likely require additional fields to resolve depending on the level of dynamic behavior required (e.g. including credentials, etc.)Brief change log
DemultiplexingSinkand relatedSinkRouterinterface to support dynamic sink creation and routing at runtime.DemultiplexingSinkState,DemultiplexingSinkStateSerializer, andDemultiplexingSinkWriterrelated to stateful sink operations, resiliency, and recoveryDemultiplexingSinkTestto verify sink and writer creation,DemultiplexingSinkStateSerializerTestfor verification of state serialization/deserialzation behavior, andDemultiplexingSinkWriterTestto verify successful writing to single/multiple routesDemultiplexingSinkStateManagementTestto verify snapshotting and restoration workflow for stateful and non-stateful sinks.DemultiplexingSinkITusing the FlinkMiniClusterExtensionto verify supported behavior worked as expected.Verifying this change
This change added a suite of related tests covering various aspects of the
DemultiplexingSinkbehavior pertaining to routing, snapshotting and recovery, and state serialization/deserialization as mentioned in the change log above. An overview of these cases can be listed below as well at a high-level file summary:DemultiplexingSinkTestDemultiplexingSinkWriterTestDemultiplexingSinkStateSerializerTestDemultiplexingSinkStateManagementTestDemultiplexingSinkITAll of these tests consistently pass without issue - as do all of the existing suite of tests found throughout the project.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): Yes, specifically@PublicEvolving()Documentation