Optimised Schema Change for Cloud Datalake Sinks (#200)
This PR removes the properties governing schema change rollover, as they can lead to errors and no longer make sense in the current context. Additionally, the schema change detection mechanism has been updated to handle compatibility more robustly.

**Changes**

### Removed Properties for Schema Change Rollover:  
The property `$connectorPrefix.schema.change.rollover` has been removed for the following connectors: `connect.s3`, `connect.datalake`, and `connect.gcpstorage`. This change eliminates a source of potential errors and simplifies the schema change handling logic.

### New Property for Schema Change Detection:

The property `$connectorPrefix.schema.change.detector` has been introduced for the following connectors: `connect.s3`, `connect.datalake`, and `connect.gcpstorage`.

This property allows users to configure the schema change detection behavior with the following possible values (an example configuration follows this list):

* `default`: schemas are compared using object equality.
* `version`: schemas are compared by their version field.
* `compatibility`: a more advanced mode that ensures schemas are compatible using Avro compatibility features.
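As a minimal, hypothetical sketch of how the new property might be set on an S3 sink: the bucket, prefix, topic, and KCQL statement below are invented placeholders, and only the `connect.s3.schema.change.detector` key (the `$connectorPrefix` pattern with the `connect.s3` prefix) comes from this PR.

```scala
// Illustrative only: bucket, prefix and topic names are placeholders, and the
// KCQL statement is a generic example rather than taken from this PR.
val sinkProps: Map[String, String] = Map(
  "connect.s3.kcql"                   -> "INSERT INTO my-bucket:prefix SELECT * FROM payments STOREAS `AVRO`",
  // One of: default | version | compatibility
  "connect.s3.schema.change.detector" -> "compatibility",
)
```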

### Updated Schema Change Detection Mechanism:
The `SchemaChangeDetector` trait and its implementations have been introduced to improve schema compatibility handling.
The `VersionSchemaChangeDetector` compares schemas by their version field, replacing the previous approach of a direct class equality comparison on the schema class.
The `DefaultSchemaChangeDetector` uses object equality for schema comparison, providing a straightforward approach to detecting changes.
The `CompatibilitySchemaChangeDetector` leverages Avro compatibility features to ensure schemas remain compatible, offering an advanced and robust mechanism for managing schema changes.
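
As a rough, non-authoritative sketch of how these detectors fit together — the real trait ships with this PR in `io.lenses.streamreactor.connect.cloud.common.formats.writer.schema`, its method name and signature may differ, and the Avro check shown is just one plausible way the compatibility mode could be realised:

```scala
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.ToAvroDataConverter
import org.apache.avro.SchemaCompatibility
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType
import org.apache.kafka.connect.data.Schema

// Assumed signature, for illustration only.
trait SchemaChangeDetector {
  /** True when `newSchema` should be treated as a schema change relative to `oldSchema`. */
  def detectSchemaChange(oldSchema: Schema, newSchema: Schema): Boolean
}

// `default`: plain object equality between the two Connect schemas.
object DefaultSchemaChangeDetector extends SchemaChangeDetector {
  def detectSchemaChange(oldSchema: Schema, newSchema: Schema): Boolean =
    oldSchema != newSchema
}

// `version`: compare only the (nullable) version field of the schemas.
object VersionSchemaChangeDetector extends SchemaChangeDetector {
  def detectSchemaChange(oldSchema: Schema, newSchema: Schema): Boolean =
    Option(oldSchema.version()) != Option(newSchema.version())
}

// `compatibility`: convert to Avro and flag a change only when data written
// with the old schema could not be read using the new one.
object CompatibilitySchemaChangeDetector extends SchemaChangeDetector {
  def detectSchemaChange(oldSchema: Schema, newSchema: Schema): Boolean = {
    val oldAvro = ToAvroDataConverter.convertSchema(oldSchema)
    val newAvro = ToAvroDataConverter.convertSchema(newSchema)
    SchemaCompatibility
      .checkReaderWriterCompatibility(newAvro, oldAvro)
      .getType != SchemaCompatibilityType.COMPATIBLE
  }
}
```

In the actual change, the chosen detector is carried on the sink configuration (see the `schemaChangeDetector` field added to `S3SinkConfig` in the diff below), replacing the previous blanket `rolloverOnSchemaChangeEnabled` flag.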

Commits:
* Avro Parquet Schema Rollover / No Rollover
* Initial parquet rollover changes
* Removing the schema rollover setting from configuration, amending how schema change is calculated for writers
* Test fixes
* Revert changes to CloudSinkTask
* Compatibility schema change detector
* Default schema change detector
* Add schema change detector connector property
* Fixes from review
davidsloan authored Jan 22, 2025
1 parent 54d1db5 commit 4d42d05
Showing 30 changed files with 747 additions and 232 deletions.

@@ -24,6 +24,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer.AvroFormatWriter
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.DefaultSchemaChangeDetector
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.Offset
@@ -39,8 +40,8 @@ import org.scalatest.matchers.should.Matchers

class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest with LazyLogging {

val avroFormatReader = new AvroFormatReader()

val avroFormatReader = new AvroFormatReader()
val schemaChangeDetector = DefaultSchemaChangeDetector
"convert" should "write byte output stream with json for a single record" in {

implicit val compressionCodec = UNCOMPRESSED.toCodec()

@@ -25,6 +25,7 @@ import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic
import io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer._
import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.DefaultSchemaChangeDetector
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
@@ -37,6 +38,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.conversion.ArraySinkDat
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.MapSinkData
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.sink.conversion.ToAvroDataConverter
import io.lenses.streamreactor.connect.cloud.common.stream.BuildLocalOutputStream
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
@@ -56,14 +58,17 @@ class ParquetFormatWriterStreamTest

implicit val compressionCodec: CompressionCodec = UNCOMPRESSED.toCodec()

val parquetFormatReader = new ParquetFormatReader()
val schemaChangeDetector = DefaultSchemaChangeDetector
val parquetFormatReader = new ParquetFormatReader()
val avroSchema = ToAvroDataConverter.convertSchema(users.head.schema())
val topicPartition = Topic("testTopic").withPartition(1)

"convert" should "write byte output stream with json for a single record" in {

implicit val compressionCodec: CompressionCodec = UNCOMPRESSED.toCodec()

val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), topicPartition)
val parquetFormatWriter =
new ParquetFormatWriter(blobStream)(compressionCodec)
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
@@ -112,8 +117,9 @@ class ParquetFormatWriterStreamTest

"convert" should "throw an error when writing array without schema" in {

val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), topicPartition)
val parquetFormatWriter =
new ParquetFormatWriter(blobStream)(compressionCodec)
parquetFormatWriter.write(
MessageDetail(
NullSinkData(None),
@@ -137,8 +143,9 @@

val mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA)

val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), topicPartition)
val parquetFormatWriter = new ParquetFormatWriter(blobStream)(compressionCodec)

parquetFormatWriter.write(
MessageDetail(
NullSinkData(None),
@@ -189,9 +196,10 @@ class ParquetFormatWriterStreamTest
}

private def writeToParquetFile(implicit compressionCodec: CompressionCodec) = {
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), topicPartition)

val parquetFormatWriter = new ParquetFormatWriter(blobStream)
val parquetFormatWriter =
new ParquetFormatWriter(blobStream)(compressionCodec)
firstUsers.foreach(u =>
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(u),

@@ -30,6 +30,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.DefaultSchemaChangeDetector
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
@@ -72,9 +73,10 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont

private val compressionCodec = UNCOMPRESSED.toCodec()

private val TopicName = "myTopic"
private val PathPrefix = "streamReactorBackups"
private val avroFormatReader = new AvroFormatReader
private val TopicName = "myTopic"
private val PathPrefix = "streamReactorBackups"
private val schemaChangeDetector = DefaultSchemaChangeDetector
private val avroFormatReader = new AvroFormatReader

private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator
private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
@@ -104,13 +106,13 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
dataStorage = DataStorageSettings.disabled,
),
),
indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec = compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
rolloverOnSchemaChangeEnabled = true,
indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec = compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
schemaChangeDetector = schemaChangeDetector,
)

"avro sink" should "write 2 records to avro format in s3" in {

@@ -29,6 +29,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.config.JsonFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.DefaultSchemaChangeDetector
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
@@ -60,7 +61,6 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.time.Instant

import scala.jdk.CollectionConverters._

class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {
@@ -72,7 +72,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
private val TopicName = "myTopic"
private val PathPrefix = "streamReactorBackups"
private implicit val cloudLocationValidator: S3LocationValidator.type = S3LocationValidator

private val schemaChangeDetector = DefaultSchemaChangeDetector
"json sink" should "write single json record using offset key naming" in {

val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
@@ -107,11 +107,11 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
),
indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
rolloverOnSchemaChangeEnabled = true,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
schemaChangeDetector = schemaChangeDetector,
)

val sink = writerManagerCreator.from(config)._2
@@ -172,11 +172,11 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
),
indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
rolloverOnSchemaChangeEnabled = true,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
schemaChangeDetector = schemaChangeDetector,
)

val sink = writerManagerCreator.from(config)._2
@@ -241,11 +241,11 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
),
indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
rolloverOnSchemaChangeEnabled = true,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
schemaChangeDetector = schemaChangeDetector,
)

val sink = writerManagerCreator.from(config)._2
@@ -316,11 +316,11 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
),
indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
rolloverOnSchemaChangeEnabled = true,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
schemaChangeDetector = schemaChangeDetector,
)

val sink = writerManagerCreator.from(config)._2

@@ -30,6 +30,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.DefaultSchemaChangeDetector
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
@@ -67,10 +68,10 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC

private val compressionCodec = UNCOMPRESSED.toCodec()
private implicit val cloudLocationValidator: S3LocationValidator.type = S3LocationValidator

private val TopicName = "myTopic"
private val PathPrefix = "streamReactorBackups"
private val parquetFormatReader = new ParquetFormatReader
private val schemaChangeDetector = DefaultSchemaChangeDetector
private val TopicName = "myTopic"
private val PathPrefix = "streamReactorBackups"
private val parquetFormatReader = new ParquetFormatReader

private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
private def parquetConfig = S3SinkConfig(
@@ -104,11 +105,11 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
),
indexOptions = IndexOptions(5, ".indexes").some,
compressionCodec,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
rolloverOnSchemaChangeEnabled = true,
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L, 1.0),
logMetrics = false,
schemaChangeDetector = schemaChangeDetector,
)

"parquet sink" should "write 2 records to parquet format in s3" in {
@@ -151,6 +152,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
.field("name", SchemaBuilder.string().required().build())
.field("designation", SchemaBuilder.string().optional().build())
.field("salary", SchemaBuilder.float64().optional().build())
.version(2)
.build()

val usersWithNewSchema = List(

@@ -5,6 +5,7 @@ import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator
import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.formats.writer.FormatWriter
import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
@@ -28,6 +29,7 @@ class WriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerT
private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator

private val topicPartition = Topic("topic").withPartition(10)
private val schemaChangeDetector: SchemaChangeDetector = mock[SchemaChangeDetector]

private val s3KeyNamer = mock[CloudKeyNamer]
"S3WriterManager" should "return empty map when no offset or metadata writers can be found" in {
@@ -41,7 +43,7 @@ class WriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerT
formatWriterFn = (_, _) => mock[FormatWriter].asRight,
writerIndexer = mock[WriterIndexer[S3FileMetadata]],
_.asRight,
rolloverOnSchemaChangeEnabled = true,
schemaChangeDetector = schemaChangeDetector,
)

val result = wm.preCommit(Map(topicPartition -> new OffsetAndMetadata(999)))

@@ -22,6 +22,7 @@ import io.lenses.streamreactor.connect.aws.s3.config.S3ConnectionConfig
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.config.traits.CloudSinkConfig
import io.lenses.streamreactor.connect.cloud.common.config.traits.PropsToConfigConverter
import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketOptions
@@ -51,9 +52,10 @@ object S3SinkConfig extends PropsToConfigConverter[S3SinkConfig] {
cloudLocationValidator: CloudLocationValidator,
): Either[Throwable, S3SinkConfig] =
for {
sinkBucketOptions <- CloudSinkBucketOptions(connectorTaskId, s3ConfigDefBuilder)
indexOptions = s3ConfigDefBuilder.getIndexSettings
logMetrics = s3ConfigDefBuilder.getBoolean(LOG_METRICS_CONFIG)
sinkBucketOptions <- CloudSinkBucketOptions(connectorTaskId, s3ConfigDefBuilder)
indexOptions = s3ConfigDefBuilder.getIndexSettings
logMetrics = s3ConfigDefBuilder.getBoolean(LOG_METRICS_CONFIG)
schemaChangeDetector = s3ConfigDefBuilder.schemaChangeDetector(),
} yield S3SinkConfig(
S3ConnectionConfig(s3ConfigDefBuilder.getParsedValues),
sinkBucketOptions,
@@ -63,19 +65,19 @@ errorPolicy = s3ConfigDefBuilder.getErrorPolicyOrDefault,
errorPolicy = s3ConfigDefBuilder.getErrorPolicyOrDefault,
connectorRetryConfig = s3ConfigDefBuilder.getRetryConfig,
logMetrics = logMetrics,
s3ConfigDefBuilder.shouldRollOverOnSchemaChange(),
schemaChangeDetector = schemaChangeDetector,
)

}

case class S3SinkConfig(
connectionConfig: S3ConnectionConfig,
bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty,
indexOptions: Option[IndexOptions],
compressionCodec: CompressionCodec,
batchDelete: Boolean,
errorPolicy: ErrorPolicy,
connectorRetryConfig: RetryConfig,
logMetrics: Boolean,
rolloverOnSchemaChangeEnabled: Boolean,
connectionConfig: S3ConnectionConfig,
bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty,
indexOptions: Option[IndexOptions],
compressionCodec: CompressionCodec,
batchDelete: Boolean,
errorPolicy: ErrorPolicy,
connectorRetryConfig: RetryConfig,
logMetrics: Boolean,
schemaChangeDetector: SchemaChangeDetector,
) extends CloudSinkConfig[S3ConnectionConfig]