diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/formats/AvroFormatWriterStreamTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/formats/AvroFormatWriterStreamTest.scala index 8f6281b4a..843cf7928 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/formats/AvroFormatWriterStreamTest.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/formats/AvroFormatWriterStreamTest.scala @@ -24,6 +24,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader import io.lenses.streamreactor.connect.cloud.common.formats.writer.AvroFormatWriter import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.DefaultSchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED import io.lenses.streamreactor.connect.cloud.common.model.Offset @@ -39,8 +40,8 @@ import org.scalatest.matchers.should.Matchers class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest with LazyLogging { - val avroFormatReader = new AvroFormatReader() - + val avroFormatReader = new AvroFormatReader() + val schemaChangeDetector = DefaultSchemaChangeDetector "convert" should "write byte output stream with json for a single record" in { implicit val compressionCodec = UNCOMPRESSED.toCodec() diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/formats/ParquetFormatWriterStreamTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/formats/ParquetFormatWriterStreamTest.scala index 2b6a5f737..a3042b50f 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/formats/ParquetFormatWriterStreamTest.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/formats/ParquetFormatWriterStreamTest.scala @@ -25,6 +25,7 @@ import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic import io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection import io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetFormatReader import io.lenses.streamreactor.connect.cloud.common.formats.writer._ +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.DefaultSchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic @@ -37,6 +38,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.conversion.ArraySinkDat import io.lenses.streamreactor.connect.cloud.common.sink.conversion.MapSinkData import io.lenses.streamreactor.connect.cloud.common.sink.conversion.NullSinkData import io.lenses.streamreactor.connect.cloud.common.sink.conversion.StructSinkData +import io.lenses.streamreactor.connect.cloud.common.sink.conversion.ToAvroDataConverter import io.lenses.streamreactor.connect.cloud.common.stream.BuildLocalOutputStream import org.apache.kafka.connect.data.Schema import org.apache.kafka.connect.data.SchemaBuilder @@ -56,14 +58,17 @@ class ParquetFormatWriterStreamTest implicit val compressionCodec: CompressionCodec = 
UNCOMPRESSED.toCodec() - val parquetFormatReader = new ParquetFormatReader() + val schemaChangeDetector = DefaultSchemaChangeDetector + val parquetFormatReader = new ParquetFormatReader() + val avroSchema = ToAvroDataConverter.convertSchema(users.head.schema()) + val topicPartition = Topic("testTopic").withPartition(1) "convert" should "write byte output stream with json for a single record" in { implicit val compressionCodec: CompressionCodec = UNCOMPRESSED.toCodec() - - val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1)) - val parquetFormatWriter = new ParquetFormatWriter(blobStream) + val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), topicPartition) + val parquetFormatWriter = + new ParquetFormatWriter(blobStream)(compressionCodec) parquetFormatWriter.write(MessageDetail(NullSinkData(None), StructSinkData(users.head), Map.empty, @@ -112,8 +117,9 @@ class ParquetFormatWriterStreamTest "convert" should "throw an error when writing array without schema" in { - val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1)) - val parquetFormatWriter = new ParquetFormatWriter(blobStream) + val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), topicPartition) + val parquetFormatWriter = + new ParquetFormatWriter(blobStream)(compressionCodec) parquetFormatWriter.write( MessageDetail( NullSinkData(None), @@ -137,8 +143,9 @@ class ParquetFormatWriterStreamTest val mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA) - val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1)) - val parquetFormatWriter = new ParquetFormatWriter(blobStream) + val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), topicPartition) + val parquetFormatWriter = new ParquetFormatWriter(blobStream)(compressionCodec) + parquetFormatWriter.write( MessageDetail( NullSinkData(None), @@ -189,9 +196,10 @@ class ParquetFormatWriterStreamTest } private def writeToParquetFile(implicit compressionCodec: CompressionCodec) = { - val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1)) + val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), topicPartition) - val parquetFormatWriter = new ParquetFormatWriter(blobStream) + val parquetFormatWriter = + new ParquetFormatWriter(blobStream)(compressionCodec) firstUsers.foreach(u => parquetFormatWriter.write(MessageDetail(NullSinkData(None), StructSinkData(u), diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala index 0bee9eb75..c123c618d 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala @@ -30,6 +30,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail +import 
io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.DefaultSchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic @@ -72,9 +73,10 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont private val compressionCodec = UNCOMPRESSED.toCodec() - private val TopicName = "myTopic" - private val PathPrefix = "streamReactorBackups" - private val avroFormatReader = new AvroFormatReader + private val TopicName = "myTopic" + private val PathPrefix = "streamReactorBackups" + private val schemaChangeDetector = DefaultSchemaChangeDetector + private val avroFormatReader = new AvroFormatReader private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some) @@ -104,13 +106,13 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont dataStorage = DataStorageSettings.disabled, ), ), - indexOptions = IndexOptions(5, ".indexes").some, - compressionCodec = compressionCodec, - batchDelete = true, - errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), - connectorRetryConfig = new RetryConfig(1, 1L, 1.0), - logMetrics = false, - rolloverOnSchemaChangeEnabled = true, + indexOptions = IndexOptions(5, ".indexes").some, + compressionCodec = compressionCodec, + batchDelete = true, + errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), + connectorRetryConfig = new RetryConfig(1, 1L, 1.0), + logMetrics = false, + schemaChangeDetector = schemaChangeDetector, ) "avro sink" should "write 2 records to avro format in s3" in { diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala index e41d4f43e..f711ea0d1 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala @@ -29,6 +29,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings import io.lenses.streamreactor.connect.cloud.common.config.JsonFormatSelection import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.DefaultSchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic @@ -60,7 +61,6 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import java.time.Instant - import scala.jdk.CollectionConverters._ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest { @@ -72,7 +72,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont private val TopicName = "myTopic" private val PathPrefix = "streamReactorBackups" private implicit val cloudLocationValidator: S3LocationValidator.type = S3LocationValidator - + private val schemaChangeDetector = DefaultSchemaChangeDetector "json sink" should "write single 
json record using offset key naming" in { val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some) @@ -107,11 +107,11 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont ), indexOptions = IndexOptions(5, ".indexes").some, compressionCodec, - batchDelete = true, - errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), - connectorRetryConfig = new RetryConfig(1, 1L, 1.0), - logMetrics = false, - rolloverOnSchemaChangeEnabled = true, + batchDelete = true, + errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), + connectorRetryConfig = new RetryConfig(1, 1L, 1.0), + logMetrics = false, + schemaChangeDetector = schemaChangeDetector, ) val sink = writerManagerCreator.from(config)._2 @@ -172,11 +172,11 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont ), indexOptions = IndexOptions(5, ".indexes").some, compressionCodec, - batchDelete = true, - errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), - connectorRetryConfig = new RetryConfig(1, 1L, 1.0), - logMetrics = false, - rolloverOnSchemaChangeEnabled = true, + batchDelete = true, + errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), + connectorRetryConfig = new RetryConfig(1, 1L, 1.0), + logMetrics = false, + schemaChangeDetector = schemaChangeDetector, ) val sink = writerManagerCreator.from(config)._2 @@ -241,11 +241,11 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont ), indexOptions = IndexOptions(5, ".indexes").some, compressionCodec, - batchDelete = true, - errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), - connectorRetryConfig = new RetryConfig(1, 1L, 1.0), - logMetrics = false, - rolloverOnSchemaChangeEnabled = true, + batchDelete = true, + errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), + connectorRetryConfig = new RetryConfig(1, 1L, 1.0), + logMetrics = false, + schemaChangeDetector = schemaChangeDetector, ) val sink = writerManagerCreator.from(config)._2 @@ -316,11 +316,11 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont ), indexOptions = IndexOptions(5, ".indexes").some, compressionCodec, - batchDelete = true, - errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), - connectorRetryConfig = new RetryConfig(1, 1L, 1.0), - logMetrics = false, - rolloverOnSchemaChangeEnabled = true, + batchDelete = true, + errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), + connectorRetryConfig = new RetryConfig(1, 1L, 1.0), + logMetrics = false, + schemaChangeDetector = schemaChangeDetector, ) val sink = writerManagerCreator.from(config)._2 diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala index 0bdbc6de6..0110cfcb9 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala @@ -30,6 +30,7 @@ import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings import io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection import io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetFormatReader import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.DefaultSchemaChangeDetector import 
io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic @@ -67,10 +68,10 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC private val compressionCodec = UNCOMPRESSED.toCodec() private implicit val cloudLocationValidator: S3LocationValidator.type = S3LocationValidator - - private val TopicName = "myTopic" - private val PathPrefix = "streamReactorBackups" - private val parquetFormatReader = new ParquetFormatReader + private val schemaChangeDetector = DefaultSchemaChangeDetector + private val TopicName = "myTopic" + private val PathPrefix = "streamReactorBackups" + private val parquetFormatReader = new ParquetFormatReader private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some) private def parquetConfig = S3SinkConfig( @@ -104,11 +105,11 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC ), indexOptions = IndexOptions(5, ".indexes").some, compressionCodec, - batchDelete = true, - errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), - connectorRetryConfig = new RetryConfig(1, 1L, 1.0), - logMetrics = false, - rolloverOnSchemaChangeEnabled = true, + batchDelete = true, + errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW), + connectorRetryConfig = new RetryConfig(1, 1L, 1.0), + logMetrics = false, + schemaChangeDetector = schemaChangeDetector, ) "parquet sink" should "write 2 records to parquet format in s3" in { @@ -151,6 +152,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC .field("name", SchemaBuilder.string().required().build()) .field("designation", SchemaBuilder.string().optional().build()) .field("salary", SchemaBuilder.float64().optional().build()) + .version(2) .build() val usersWithNewSchema = List( diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/WriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/WriterManagerTest.scala index 047ac0c24..d148ec184 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/WriterManagerTest.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/WriterManagerTest.scala @@ -5,6 +5,7 @@ import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest import io.lenses.streamreactor.connect.cloud.common.formats.writer.FormatWriter +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.Topic import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator @@ -28,6 +29,7 @@ class WriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerT private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator private val topicPartition = Topic("topic").withPartition(10) + private val schemaChangeDetector: SchemaChangeDetector = mock[SchemaChangeDetector] private val s3KeyNamer = mock[CloudKeyNamer] "S3WriterManager" should "return empty map when no offset or metadata writers can be found" in { @@ -41,7 +43,7 @@ class WriterManagerTest extends 
AnyFlatSpec with Matchers with S3ProxyContainerT formatWriterFn = (_, _) => mock[FormatWriter].asRight, writerIndexer = mock[WriterIndexer[S3FileMetadata]], _.asRight, - rolloverOnSchemaChangeEnabled = true, + schemaChangeDetector = schemaChangeDetector, ) val result = wm.preCommit(Map(topicPartition -> new OffsetAndMetadata(999))) diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala index b69bfc0ff..a28feb659 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala @@ -22,6 +22,7 @@ import io.lenses.streamreactor.connect.aws.s3.config.S3ConnectionConfig import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.config.traits.CloudSinkConfig import io.lenses.streamreactor.connect.cloud.common.config.traits.PropsToConfigConverter +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketOptions @@ -51,9 +52,10 @@ object S3SinkConfig extends PropsToConfigConverter[S3SinkConfig] { cloudLocationValidator: CloudLocationValidator, ): Either[Throwable, S3SinkConfig] = for { - sinkBucketOptions <- CloudSinkBucketOptions(connectorTaskId, s3ConfigDefBuilder) - indexOptions = s3ConfigDefBuilder.getIndexSettings - logMetrics = s3ConfigDefBuilder.getBoolean(LOG_METRICS_CONFIG) + sinkBucketOptions <- CloudSinkBucketOptions(connectorTaskId, s3ConfigDefBuilder) + indexOptions = s3ConfigDefBuilder.getIndexSettings + logMetrics = s3ConfigDefBuilder.getBoolean(LOG_METRICS_CONFIG) + schemaChangeDetector = s3ConfigDefBuilder.schemaChangeDetector(), } yield S3SinkConfig( S3ConnectionConfig(s3ConfigDefBuilder.getParsedValues), sinkBucketOptions, @@ -63,19 +65,19 @@ object S3SinkConfig extends PropsToConfigConverter[S3SinkConfig] { errorPolicy = s3ConfigDefBuilder.getErrorPolicyOrDefault, connectorRetryConfig = s3ConfigDefBuilder.getRetryConfig, logMetrics = logMetrics, - s3ConfigDefBuilder.shouldRollOverOnSchemaChange(), + schemaChangeDetector = schemaChangeDetector, ) } case class S3SinkConfig( - connectionConfig: S3ConnectionConfig, - bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty, - indexOptions: Option[IndexOptions], - compressionCodec: CompressionCodec, - batchDelete: Boolean, - errorPolicy: ErrorPolicy, - connectorRetryConfig: RetryConfig, - logMetrics: Boolean, - rolloverOnSchemaChangeEnabled: Boolean, + connectionConfig: S3ConnectionConfig, + bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty, + indexOptions: Option[IndexOptions], + compressionCodec: CompressionCodec, + batchDelete: Boolean, + errorPolicy: ErrorPolicy, + connectorRetryConfig: RetryConfig, + logMetrics: Boolean, + schemaChangeDetector: SchemaChangeDetector, ) extends CloudSinkConfig[S3ConnectionConfig] diff --git a/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/sink/config/DatalakeSinkConfig.scala 
b/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/sink/config/DatalakeSinkConfig.scala index 59ebfb72f..67a2ce879 100644 --- a/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/sink/config/DatalakeSinkConfig.scala +++ b/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/sink/config/DatalakeSinkConfig.scala @@ -20,6 +20,7 @@ import io.lenses.streamreactor.common.errors.ErrorPolicy import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.config.traits.CloudSinkConfig import io.lenses.streamreactor.connect.cloud.common.config.traits.PropsToConfigConverter +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketOptions @@ -46,10 +47,11 @@ object DatalakeSinkConfig extends PropsToConfigConverter[DatalakeSinkConfig] { cloudLocationValidator: CloudLocationValidator, ): Either[Throwable, DatalakeSinkConfig] = for { - authMode <- s3ConfigDefBuilder.getAuthMode - sinkBucketOptions <- CloudSinkBucketOptions(connectorTaskId, s3ConfigDefBuilder) - indexOptions = s3ConfigDefBuilder.getIndexSettings - logMetrics = s3ConfigDefBuilder.getBoolean(LOG_METRICS_CONFIG) + authMode <- s3ConfigDefBuilder.getAuthMode + sinkBucketOptions <- CloudSinkBucketOptions(connectorTaskId, s3ConfigDefBuilder) + indexOptions = s3ConfigDefBuilder.getIndexSettings + logMetrics = s3ConfigDefBuilder.getBoolean(LOG_METRICS_CONFIG) + schemaChangeDetector = s3ConfigDefBuilder.schemaChangeDetector() } yield DatalakeSinkConfig( AzureConnectionConfig(s3ConfigDefBuilder.getParsedValues, authMode), sinkBucketOptions, @@ -58,18 +60,18 @@ object DatalakeSinkConfig extends PropsToConfigConverter[DatalakeSinkConfig] { s3ConfigDefBuilder.getErrorPolicyOrDefault, s3ConfigDefBuilder.getRetryConfig, logMetrics, - s3ConfigDefBuilder.shouldRollOverOnSchemaChange(), + schemaChangeDetector, ) } case class DatalakeSinkConfig( - connectionConfig: AzureConnectionConfig, - bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty, - indexOptions: Option[IndexOptions], - compressionCodec: CompressionCodec, - errorPolicy: ErrorPolicy, - connectorRetryConfig: RetryConfig, - logMetrics: Boolean, - rolloverOnSchemaChangeEnabled: Boolean, + connectionConfig: AzureConnectionConfig, + bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty, + indexOptions: Option[IndexOptions], + compressionCodec: CompressionCodec, + errorPolicy: ErrorPolicy, + connectorRetryConfig: RetryConfig, + logMetrics: Boolean, + schemaChangeDetector: SchemaChangeDetector, ) extends CloudSinkConfig[AzureConnectionConfig] diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala index 9df56d02d..b660ff9f9 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/traits/CloudConfig.scala @@ -17,6 +17,7 @@ package io.lenses.streamreactor.connect.cloud.common.config.traits import 
io.lenses.streamreactor.common.config.base.RetryConfig import io.lenses.streamreactor.common.errors.ErrorPolicy +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketOptions import io.lenses.streamreactor.connect.cloud.common.sink.config.IndexOptions @@ -72,12 +73,8 @@ trait CloudSinkConfig[CC] extends CloudConfig { def logMetrics: Boolean - /** - * Indicates whether rollover on schema change is enabled. - * - * @return `true` if rollover on schema change is enabled, `false` otherwise. - */ - def rolloverOnSchemaChangeEnabled: Boolean + def schemaChangeDetector: SchemaChangeDetector + } /** diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/AvroFormatWriter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/AvroFormatWriter.scala index dcabf767f..e73dc24e2 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/AvroFormatWriter.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/AvroFormatWriter.scala @@ -36,8 +36,12 @@ import org.apache.kafka.connect.data.{ Schema => ConnectSchema } import scala.util.Try -class AvroFormatWriter(outputStream: CloudOutputStream)(implicit compressionCodec: CompressionCodec) - extends FormatWriter +class AvroFormatWriter( + outputStream: CloudOutputStream, +)( + implicit + compressionCodec: CompressionCodec, +) extends FormatWriter with LazyLogging { private val avroCompressionCodec: CodecFactory = { @@ -78,7 +82,9 @@ class AvroFormatWriter(outputStream: CloudOutputStream)(implicit compressionCode override def getPointer: Long = avroWriterState.fold(0L)(_.pointer) private class AvroWriterState(outputStream: CloudOutputStream, connectSchema: Option[ConnectSchema]) { - private val schema: Schema = ToAvroDataConverter.convertSchema(connectSchema) + private val schema: Schema = connectSchema.map(ToAvroDataConverter.convertSchema).getOrElse( + throw new IllegalArgumentException("Schema-less data is not supported for Avro/Parquet"), + ) private val writer: GenericDatumWriter[Any] = new GenericDatumWriter[Any](schema) private val fileWriter: DataFileWriter[Any] = new DataFileWriter[Any](writer).setCodec(avroCompressionCodec).create(schema, outputStream) diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/CsvFormatWriter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/CsvFormatWriter.scala index d875fac7b..da50d32ce 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/CsvFormatWriter.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/CsvFormatWriter.scala @@ -28,7 +28,11 @@ import java.io.OutputStreamWriter import scala.jdk.CollectionConverters.ListHasAsScala import scala.util.Try -class CsvFormatWriter(outputStream: CloudOutputStream, writeHeaders: Boolean) extends FormatWriter with LazyLogging { +class CsvFormatWriter( + outputStream: CloudOutputStream, + writeHeaders: Boolean, +) extends FormatWriter + with LazyLogging { private val outputStreamWriter = new 
OutputStreamWriter(outputStream) private val csvWriter = new CSVWriter(outputStreamWriter) diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/FormatWriter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/FormatWriter.scala index 51fbe32ea..6c380944f 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/FormatWriter.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/FormatWriter.scala @@ -25,48 +25,47 @@ import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.cloud.common.sink.NonFatalCloudSinkError import io.lenses.streamreactor.connect.cloud.common.sink.SinkError import io.lenses.streamreactor.connect.cloud.common.stream.BuildLocalOutputStream -import io.lenses.streamreactor.connect.cloud.common.stream.CloudOutputStream -import java.io.File +import java.nio.file.Path import scala.util.Try object FormatWriter { def apply( formatSelection: FormatSelection, - path: File, + path: Path, topicPartition: TopicPartition, )( implicit compressionCodec: CompressionCodec, ): Either[SinkError, FormatWriter] = { for { - outputStream <- Try(new BuildLocalOutputStream(toBufferedOutputStream(path), topicPartition)) - writer <- Try(FormatWriter(formatSelection, outputStream)) + outputStream <- Try(new BuildLocalOutputStream(toBufferedOutputStream(path.toFile), topicPartition)) + writer <- Try { + formatSelection match { + case ParquetFormatSelection => + new ParquetFormatWriter(outputStream) + case JsonFormatSelection => new JsonFormatWriter(outputStream) + case AvroFormatSelection => new AvroFormatWriter(outputStream) + case TextFormatSelection(_) => new TextFormatWriter(outputStream) + case CsvFormatSelection(formatOptions) => + new CsvFormatWriter(outputStream, formatOptions.contains(WithHeaders)) + case BytesFormatSelection => new BytesFormatWriter(outputStream) + case _ => throw FormatWriterException(s"Unsupported cloud format $formatSelection.format") + } + } } yield writer }.toEither.leftMap(ex => new NonFatalCloudSinkError(ex.getMessage, ex.some)) - def apply( - formatInfo: FormatSelection, - outputStream: CloudOutputStream, - )( - implicit - compressionCodec: CompressionCodec, - ): FormatWriter = - formatInfo match { - case ParquetFormatSelection => new ParquetFormatWriter(outputStream) - case JsonFormatSelection => new JsonFormatWriter(outputStream) - case AvroFormatSelection => new AvroFormatWriter(outputStream) - case TextFormatSelection(_) => new TextFormatWriter(outputStream) - case CsvFormatSelection(formatOptions) => new CsvFormatWriter(outputStream, formatOptions.contains(WithHeaders)) - case BytesFormatSelection => new BytesFormatWriter(outputStream) - case _ => throw FormatWriterException(s"Unsupported cloud format $formatInfo.format") - } - } trait FormatWriter extends AutoCloseable { + /** + * Determines if the file should be rolled over when a schema change is detected. + * + * @return True if the file should be rolled over on schema change, false otherwise. 
+ */ def rolloverFileOnSchemaChange(): Boolean def write(message: MessageDetail): Either[Throwable, Unit] @@ -76,4 +75,5 @@ trait FormatWriter extends AutoCloseable { def complete(): Either[SinkError, Unit] def close(): Unit = { val _ = complete() } + } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/ParquetFormatWriter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/ParquetFormatWriter.scala index befb11443..925d54ec4 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/ParquetFormatWriter.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/ParquetFormatWriter.scala @@ -38,8 +38,12 @@ import org.apache.parquet.hadoop.metadata.{ CompressionCodecName => ParquetCompr import scala.util.Try -class ParquetFormatWriter(outputStream: CloudOutputStream)(implicit compressionCodec: CompressionCodec) - extends FormatWriter +class ParquetFormatWriter( + outputStream: CloudOutputStream, +)( + implicit + compressionCodec: CompressionCodec, +) extends FormatWriter with LazyLogging { private val parquetCompressionCodec: ParquetCompressionCodecName = { @@ -63,15 +67,22 @@ class ParquetFormatWriter(outputStream: CloudOutputStream)(implicit compressionC logger.debug("ParquetFormatWriter - write") val genericRecord = ToAvroDataConverter.convertToGenericRecord(messageDetail.value) - if (writer == null) { - writer = init(messageDetail.value.schema()) - } + createWriterIfNoWriter( + messageDetail.value.schema().getOrElse( + throw new IllegalArgumentException("Schema-less data is not supported for Avro/Parquet"), + ), + ) writer.write(genericRecord) outputStream.flush() }.toEither - private def init(connectSchema: Option[ConnectSchema]): ParquetWriter[Any] = { + private def createWriterIfNoWriter(connectSchema: ConnectSchema): Unit = + if (writer == null) { + writer = init(connectSchema) + } + + private def init(connectSchema: ConnectSchema): ParquetWriter[Any] = { val schema: Schema = ToAvroDataConverter.convertSchema(connectSchema) val outputFile = new ParquetOutputFile(outputStream) diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/CompatibilitySchemaChangeDetector.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/CompatibilitySchemaChangeDetector.scala new file mode 100644 index 000000000..9f885bb79 --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/CompatibilitySchemaChangeDetector.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2017-2025 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.cloud.common.formats.writer.schema + +import io.lenses.streamreactor.connect.cloud.common.sink.conversion.ToAvroDataConverter +import org.apache.avro.SchemaCompatibility +import org.apache.kafka.connect.data.{ Schema => ConnectSchema } + +/** + * Implementation of SchemaChangeDetector that detects schema changes based on compatibility. + */ +object CompatibilitySchemaChangeDetector extends SchemaChangeDetector { + + /** + * Checks the compatibility between the old schema and the new schema. + * + * @param oldSchema The old schema. + * @param newSchema The new schema. + * @return False if the schemas are compatible, true otherwise. + */ + override def detectSchemaChange(oldSchema: ConnectSchema, newSchema: ConnectSchema): Boolean = { + + val oldSchemaAvro = ToAvroDataConverter.convertSchema(oldSchema) + val newSchemaAvro = ToAvroDataConverter.convertSchema(newSchema) + SchemaCompatibility.checkReaderWriterCompatibility(oldSchemaAvro, newSchemaAvro).getResult.getCompatibility match { + case SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE => false + case _ => true + } + } +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/DefaultSchemaChangeDetector.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/DefaultSchemaChangeDetector.scala new file mode 100644 index 000000000..68e7c498f --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/DefaultSchemaChangeDetector.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2017-2025 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.formats.writer.schema + +import org.apache.kafka.connect.data.Schema + +/** + * Default implementation of SchemaChangeDetector that always detects a schema change. + */ +object DefaultSchemaChangeDetector extends SchemaChangeDetector { + + /** + * Detects if there is a change between the old schema and the new schema. + * + * @param oldSchema The old schema. + * @param newSchema The new schema. + * @return Always returns true, indicating a schema change. 
+ */ + override def detectSchemaChange(oldSchema: Schema, newSchema: Schema): Boolean = true +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/SchemaChangeDetector.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/SchemaChangeDetector.scala new file mode 100644 index 000000000..0eb3644e7 --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/SchemaChangeDetector.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2017-2025 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.formats.writer.schema + +import org.apache.kafka.connect.data.{ Schema => ConnectSchema } + +/** + * Trait defining the contract for schema change detection. + */ +trait SchemaChangeDetector { + + /** + * Detects if there is a change between the old schema and the new schema. + * + * @param oldSchema The old schema. + * @param newSchema The new schema. + * @return True if the schema has changed, false otherwise. + */ + def detectSchemaChange(oldSchema: ConnectSchema, newSchema: ConnectSchema): Boolean +} + +object SchemaChangeDetector { + def apply(detectorName: String): SchemaChangeDetector = + detectorName.toLowerCase match { + case "compatibility" => CompatibilitySchemaChangeDetector + case "version" => VersionSchemaChangeDetector + case _ => DefaultSchemaChangeDetector + } +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/VersionSchemaChangeDetector.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/VersionSchemaChangeDetector.scala new file mode 100644 index 000000000..425d0194b --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/VersionSchemaChangeDetector.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2017-2025 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.formats.writer.schema + +import org.apache.kafka.connect.data.{ Schema => ConnectSchema } + +/** + * Implementation of SchemaChangeDetector that detects schema changes based on version numbers. 
+ */ +object VersionSchemaChangeDetector extends SchemaChangeDetector { + + /** + * Compares the version of the old schema with the version of the new schema. + * + * @param oldSchema The old schema. + * @param newSchema The new schema. + * @return True if the new schema's version is greater than the old schema's version, false otherwise. + */ + override def detectSchemaChange(oldSchema: ConnectSchema, newSchema: ConnectSchema): Boolean = { + + val oldName = oldSchema.name() + val newName = newSchema.name() + + val oldVersion = oldSchema.version() + val newVersion = newSchema.version() + + (!oldName.equals(newName)) || newVersion > oldVersion + } +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreator.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreator.scala index 47f2cc558..2c2d821c3 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreator.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreator.scala @@ -114,7 +114,7 @@ class WriterManagerCreator[MD <: FileMetadata, SC <: CloudSinkConfig[_]] extends for { formatWriter <- formats.writer.FormatWriter( bucketOptions.formatSelection, - stagingFilename, + stagingFilename.toPath, topicPartition, )(config.compressionCodec) } yield formatWriter @@ -140,7 +140,7 @@ class WriterManagerCreator[MD <: FileMetadata, SC <: CloudSinkConfig[_]] extends formatWriterFn, writerIndexer, transformers.transform, - config.rolloverOnSchemaChangeEnabled, + config.schemaChangeDetector, ) (indexManager, writerManager) } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/SchemaChangeSettings.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/SchemaChangeSettings.scala index 44c228798..4b3d39438 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/SchemaChangeSettings.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/SchemaChangeSettings.scala @@ -17,34 +17,35 @@ package io.lenses.streamreactor.connect.cloud.common.sink.config import io.lenses.streamreactor.common.config.base.traits.BaseSettings import io.lenses.streamreactor.common.config.base.traits.WithConnectorPrefix +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector import org.apache.kafka.common.config.ConfigDef import org.apache.kafka.common.config.ConfigDef.Importance import org.apache.kafka.common.config.ConfigDef.Type trait SchemaChangeConfigKeys extends WithConnectorPrefix { - protected def SCHEMA_CHANGE_ROLLOVER = s"$connectorPrefix.schema.change.rollover" + protected def SCHEMA_CHANGE_DETECTOR = s"$connectorPrefix.schema.change.detector" - private val SCHEMA_CHANGE_ROLLOVER_DOC = "Roll over on schema change." - private val SCHEMA_CHANGE_ROLLOVER_DEFAULT: Boolean = true + private val SCHEMA_CHANGE_DETECTOR_DOC = "Schema change detector." 
+ private val SCHEMA_CHANGE_DETECTOR_DEFAULT: String = "default" def withSchemaChangeConfig(configDef: ConfigDef): ConfigDef = configDef.define( - SCHEMA_CHANGE_ROLLOVER, - Type.BOOLEAN, - SCHEMA_CHANGE_ROLLOVER_DEFAULT, + SCHEMA_CHANGE_DETECTOR, + Type.STRING, + SCHEMA_CHANGE_DETECTOR_DEFAULT, Importance.LOW, - SCHEMA_CHANGE_ROLLOVER_DOC, + SCHEMA_CHANGE_DETECTOR_DOC, "Schema Change", 1, - ConfigDef.Width.SHORT, - SCHEMA_CHANGE_ROLLOVER, + ConfigDef.Width.MEDIUM, + SCHEMA_CHANGE_DETECTOR, ) } trait SchemaChangeSettings extends BaseSettings with SchemaChangeConfigKeys { - def shouldRollOverOnSchemaChange(): Boolean = - getBoolean(SCHEMA_CHANGE_ROLLOVER) + def schemaChangeDetector(): SchemaChangeDetector = + SchemaChangeDetector(getString(SCHEMA_CHANGE_DETECTOR)) } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToAvroDataConverter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToAvroDataConverter.scala index 549ef9a24..e9b0fb266 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToAvroDataConverter.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToAvroDataConverter.scala @@ -44,10 +44,7 @@ object ToAvroDataConverter { ) private val avroDataConverter = new AvroData(avroDataConfig) - def convertSchema(connectSchema: Option[ConnectSchema]): Schema = connectSchema - .fold(throw new IllegalArgumentException("Schema-less data is not supported for Avro/Parquet"))( - avroDataConverter.fromConnectSchema, - ) + def convertSchema(connectSchema: ConnectSchema): Schema = avroDataConverter.fromConnectSchema(connectSchema) def convertToGenericRecord[A <: Any](sinkData: SinkData): Any = sinkData match { diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala index 71b2e752a..fcc7825d7 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/Writer.scala @@ -20,6 +20,7 @@ import com.typesafe.scalalogging.LazyLogging import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.formats.writer.FormatWriter import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.cloud.common.model.UploadableFile @@ -37,13 +38,13 @@ import scala.math.Ordered.orderingToOrdered import scala.util.Try class Writer[SM <: FileMetadata]( - topicPartition: TopicPartition, - commitPolicy: CommitPolicy, - writerIndexer: WriterIndexer[SM], - stagingFilenameFn: () => Either[SinkError, File], - objectKeyBuilder: ObjectKeyBuilder, - formatWriterFn: File => Either[SinkError, FormatWriter], - rolloverOnSchemaChangeEnabled: Boolean, + topicPartition: TopicPartition, + commitPolicy: CommitPolicy, + writerIndexer: WriterIndexer[SM], + stagingFilenameFn: () => Either[SinkError, 
File], + objectKeyBuilder: ObjectKeyBuilder, + formatWriterFn: File => Either[SinkError, FormatWriter], + schemaChangeDetector: SchemaChangeDetector, )( implicit connectorTaskId: ConnectorTaskId, @@ -239,12 +240,17 @@ class Writer[SM <: FileMetadata]( } def shouldRollover(schema: Schema): Boolean = - rolloverOnSchemaChangeEnabled && - rolloverOnSchemaChange && + rolloverOnSchemaChange && schemaHasChanged(schema) protected[writer] def schemaHasChanged(schema: Schema): Boolean = - writeState.getCommitState.lastKnownSchema.exists(_ != schema) + writeState match { + case w: Writing => + w.getCommitState.lastKnownSchema.exists { lastSchema => + lastSchema != schema && schemaChangeDetector.detectSchemaChange(lastSchema, schema) + } + case _ => false + } protected[writer] def rolloverOnSchemaChange: Boolean = writeState match { diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala index 1a4d304b8..ce28310dc 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterManager.scala @@ -20,6 +20,7 @@ import com.typesafe.scalalogging.StrictLogging import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.formats.writer.FormatWriter import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation @@ -52,15 +53,15 @@ case class MapKey(topicPartition: TopicPartition, partitionValues: immutable.Map * sinks, since file handles cannot be safely shared without considerable overhead. 
*/ class WriterManager[SM <: FileMetadata]( - commitPolicyFn: TopicPartition => Either[SinkError, CommitPolicy], - bucketAndPrefixFn: TopicPartition => Either[SinkError, CloudLocation], - keyNamerFn: TopicPartition => Either[SinkError, KeyNamer], - stagingFilenameFn: (TopicPartition, Map[PartitionField, String]) => Either[SinkError, File], - objKeyBuilderFn: (TopicPartition, Map[PartitionField, String]) => ObjectKeyBuilder, - formatWriterFn: (TopicPartition, File) => Either[SinkError, FormatWriter], - writerIndexer: WriterIndexer[SM], - transformerF: MessageDetail => Either[RuntimeException, MessageDetail], - rolloverOnSchemaChangeEnabled: Boolean, + commitPolicyFn: TopicPartition => Either[SinkError, CommitPolicy], + bucketAndPrefixFn: TopicPartition => Either[SinkError, CloudLocation], + keyNamerFn: TopicPartition => Either[SinkError, KeyNamer], + stagingFilenameFn: (TopicPartition, Map[PartitionField, String]) => Either[SinkError, File], + objKeyBuilderFn: (TopicPartition, Map[PartitionField, String]) => ObjectKeyBuilder, + formatWriterFn: (TopicPartition, File) => Either[SinkError, FormatWriter], + writerIndexer: WriterIndexer[SM], + transformerF: MessageDetail => Either[RuntimeException, MessageDetail], + schemaChangeDetector: SchemaChangeDetector, )( implicit connectorTaskId: ConnectorTaskId, @@ -178,7 +179,7 @@ class WriterManager[SM <: FileMetadata]( () => stagingFilenameFn(topicPartition, partitionValues), objKeyBuilderFn(topicPartition, partitionValues), formatWriterFn.curried(topicPartition), - rolloverOnSchemaChangeEnabled, + schemaChangeDetector, ) } } diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/CompatibilitySchemaChangeDetectorTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/CompatibilitySchemaChangeDetectorTest.scala new file mode 100644 index 000000000..9713cb4b2 --- /dev/null +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/CompatibilitySchemaChangeDetectorTest.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2017-2025 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.cloud.common.formats.writer.schema + +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.scalatest.funsuite.AnyFunSuiteLike + +class CompatibilitySchemaChangeDetectorTest extends AnyFunSuiteLike { + + private val detector = CompatibilitySchemaChangeDetector + + def createSchema(fields: (String, Schema)*): Schema = { + val builder = SchemaBuilder.struct() + fields.foreach { case (name, schema) => builder.field(name, schema) } + builder.build() + } + + test("detectSchemaChange returns false for identical schemas") { + val schema1 = createSchema("field1" -> Schema.STRING_SCHEMA) + val schema2 = createSchema("field1" -> Schema.STRING_SCHEMA) + assert(!detector.detectSchemaChange(schema1, schema2)) + } + + test("detectSchemaChange returns true for schemas with different fields") { + val schema1 = createSchema("field1" -> Schema.STRING_SCHEMA) + val schema2 = createSchema("field2" -> Schema.STRING_SCHEMA) + assert(detector.detectSchemaChange(schema1, schema2)) + } + + test("detectSchemaChange returns true for schemas with different field types") { + val schema1 = createSchema("field1" -> Schema.STRING_SCHEMA) + val schema2 = createSchema("field1" -> Schema.INT32_SCHEMA) + assert(detector.detectSchemaChange(schema1, schema2)) + } + + test("detectSchemaChange returns false for compatible schemas with additional fields") { + val schema1 = createSchema("field1" -> Schema.STRING_SCHEMA) + val schema2 = createSchema("field1" -> Schema.STRING_SCHEMA, "field2" -> Schema.INT32_SCHEMA) + assert(!detector.detectSchemaChange(schema1, schema2)) + } + + test("detectSchemaChange returns true for incompatible schemas with removed fields") { + val schema1 = createSchema("field1" -> Schema.STRING_SCHEMA, "field2" -> Schema.INT32_SCHEMA) + val schema2 = createSchema("field1" -> Schema.STRING_SCHEMA) + assert(detector.detectSchemaChange(schema1, schema2)) + } + +} diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/DefaultSchemaChangeDetectorTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/DefaultSchemaChangeDetectorTest.scala new file mode 100644 index 000000000..49a632aaa --- /dev/null +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/DefaultSchemaChangeDetectorTest.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2017-2025 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.cloud.common.formats.writer.schema + +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.scalatest.funsuite.AnyFunSuite + +class DefaultSchemaChangeDetectorTest extends AnyFunSuite { + + private val detector = DefaultSchemaChangeDetector + + def createSchema(fields: (String, Schema)*): Schema = { + val builder = SchemaBuilder.struct() + fields.foreach { case (name, schema) => builder.field(name, schema) } + builder.build() + } + + def createEmptySchema(): Schema = SchemaBuilder.struct().build() + + test("detectSchemaChange always returns true for identical schemas") { + val schema1 = createSchema("field1" -> Schema.STRING_SCHEMA) + val schema2 = createSchema("field1" -> Schema.STRING_SCHEMA) + assert(detector.detectSchemaChange(schema1, schema2)) + } + + test("detectSchemaChange always returns true for schemas with different fields") { + val schema1 = createSchema("field1" -> Schema.STRING_SCHEMA) + val schema2 = createSchema("field2" -> Schema.STRING_SCHEMA) + assert(detector.detectSchemaChange(schema1, schema2)) + } + + test("detectSchemaChange always returns true for schemas with different field types") { + val schema1 = createSchema("field1" -> Schema.STRING_SCHEMA) + val schema2 = createSchema("field1" -> Schema.INT32_SCHEMA) + assert(detector.detectSchemaChange(schema1, schema2)) + } + + test("detectSchemaChange always returns true for empty schemas") { + val schema1 = createEmptySchema() + val schema2 = createEmptySchema() + assert(detector.detectSchemaChange(schema1, schema2)) + } + + test("detectSchemaChange always returns true for one empty and one non-empty schema") { + val schema1 = createEmptySchema() + val schema2 = createSchema("field1" -> Schema.STRING_SCHEMA) + assert(detector.detectSchemaChange(schema1, schema2)) + } +} diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/VersionSchemaChangeDetectorTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/VersionSchemaChangeDetectorTest.scala new file mode 100644 index 000000000..946751e53 --- /dev/null +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/formats/writer/schema/VersionSchemaChangeDetectorTest.scala @@ -0,0 +1,116 @@ +/* + * Copyright 2017-2025 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.cloud.common.formats.writer.schema + +import org.apache.kafka.connect.data.{ Schema => ConnectSchema } +import org.mockito.MockitoSugar +import org.scalatest.funsuite.AnyFunSuiteLike +import org.scalatest.matchers.should.Matchers + +class VersionSchemaChangeDetectorTest extends AnyFunSuiteLike with MockitoSugar with Matchers { + + test("detectSchemaChange should return true when new schema version is greater than old schema version") { + val oldSchema = mock[ConnectSchema] + val newSchema = mock[ConnectSchema] + val detector = VersionSchemaChangeDetector + + when(oldSchema.version()).thenReturn(1) + when(newSchema.version()).thenReturn(2) + + detector.detectSchemaChange(oldSchema, newSchema) should be(true) + } + + test("detectSchemaChange should return false when new schema version is equal to old schema version") { + val oldSchema = mock[ConnectSchema] + val newSchema = mock[ConnectSchema] + val detector = VersionSchemaChangeDetector + + when(oldSchema.version()).thenReturn(1) + when(newSchema.version()).thenReturn(1) + + detector.detectSchemaChange(oldSchema, newSchema) should be(false) + } + + test("detectSchemaChange should return false when new schema version is less than old schema version") { + val oldSchema = mock[ConnectSchema] + val newSchema = mock[ConnectSchema] + val detector = VersionSchemaChangeDetector + + when(oldSchema.version()).thenReturn(2) + when(newSchema.version()).thenReturn(1) + + detector.detectSchemaChange(oldSchema, newSchema) should be(false) + } + + test("detectSchemaChange should return true when new schema name is different from old schema name") { + val oldSchema = mock[ConnectSchema] + val newSchema = mock[ConnectSchema] + val detector = VersionSchemaChangeDetector + + when(oldSchema.name()).thenReturn("oldName") + when(newSchema.name()).thenReturn("newName") + when(oldSchema.version()).thenReturn(1) + when(newSchema.version()).thenReturn(1) + + detector.detectSchemaChange(oldSchema, newSchema) should be(true) + } + + test( + "detectSchemaChange should return false when new schema name is the same as old schema name and version is equal", + ) { + val oldSchema = mock[ConnectSchema] + val newSchema = mock[ConnectSchema] + val detector = VersionSchemaChangeDetector + + when(oldSchema.name()).thenReturn("sameName") + when(newSchema.name()).thenReturn("sameName") + when(oldSchema.version()).thenReturn(1) + when(newSchema.version()).thenReturn(1) + + detector.detectSchemaChange(oldSchema, newSchema) should be(false) + } + + test( + "detectSchemaChange should return true when new schema name is the same as old schema name but version is greater", + ) { + val oldSchema = mock[ConnectSchema] + val newSchema = mock[ConnectSchema] + val detector = VersionSchemaChangeDetector + + when(oldSchema.name()).thenReturn("sameName") + when(newSchema.name()).thenReturn("sameName") + when(oldSchema.version()).thenReturn(1) + when(newSchema.version()).thenReturn(2) + + detector.detectSchemaChange(oldSchema, newSchema) should be(true) + } + + test( + "detectSchemaChange should return false when new schema name is the same as old schema name but version is less", + ) { + val oldSchema = mock[ConnectSchema] + val newSchema = mock[ConnectSchema] + val detector = VersionSchemaChangeDetector + + when(oldSchema.name()).thenReturn("sameName") + when(newSchema.name()).thenReturn("sameName") + when(oldSchema.version()).thenReturn(2) + when(newSchema.version()).thenReturn(1) + + detector.detectSchemaChange(oldSchema, newSchema) should 
be(false) + } + +} diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreatorTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreatorTest.scala index 527354875..4d3acfd0d 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreatorTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/WriterManagerCreatorTest.scala @@ -21,6 +21,8 @@ import io.lenses.streamreactor.common.config.base.intf.ConnectionConfig import io.lenses.streamreactor.common.errors.NoopErrorPolicy import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.config.traits.CloudSinkConfig +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.DefaultSchemaChangeDetector +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketOptions @@ -40,14 +42,14 @@ class WriterManagerCreatorTest extends AnyFunSuite with Matchers with MockitoSug case class FakeConnectionConfig() extends ConnectionConfig case class FakeCloudSinkConfig( - connectionConfig: FakeConnectionConfig, - bucketOptions: Seq[CloudSinkBucketOptions], - indexOptions: Option[IndexOptions], - compressionCodec: CompressionCodec, - connectorRetryConfig: RetryConfig, - errorPolicy: NoopErrorPolicy, - logMetrics: Boolean = false, - rolloverOnSchemaChangeEnabled: Boolean = true, + connectionConfig: FakeConnectionConfig, + bucketOptions: Seq[CloudSinkBucketOptions], + indexOptions: Option[IndexOptions], + compressionCodec: CompressionCodec, + connectorRetryConfig: RetryConfig, + errorPolicy: NoopErrorPolicy, + logMetrics: Boolean = false, + schemaChangeDetector: SchemaChangeDetector = DefaultSchemaChangeDetector, ) extends CloudSinkConfig[FakeConnectionConfig] case class FakeFileMetadata(file: String, lastModified: Instant) extends FileMetadata diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/SchemaChangeSettingsTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/SchemaChangeSettingsTest.scala index 71ef3ced6..4b04902c6 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/SchemaChangeSettingsTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/SchemaChangeSettingsTest.scala @@ -21,15 +21,15 @@ import org.scalatest.matchers.should.Matchers class SchemaChangeSettingsTest extends AnyFlatSpec with Matchers { - "withSchemaChangeConfig" should "define schema change rollover config with default value" in { + "withSchemaChangeConfig" should "define schema change detector config with default value" in { val configDef = new SchemaChangeConfigKeys { override def connectorPrefix: String = "connector" }.withSchemaChangeConfig(new ConfigDef()) - val configKey = configDef.configKeys().get("connector.schema.change.rollover") + val configKey = configDef.configKeys().get("connector.schema.change.detector") - configKey.defaultValue shouldBe true - 
configKey.documentation shouldBe "Roll over on schema change." + configKey.defaultValue shouldBe "default" + configKey.documentation shouldBe "Schema change detector." } } diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToAvroDataConverterTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToAvroDataConverterTest.scala index 1e3eda062..7075e7b8d 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToAvroDataConverterTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/conversion/ToAvroDataConverterTest.scala @@ -53,7 +53,7 @@ class ToAvroDataConverterTest extends AnyFunSuiteLike with Matchers { val avroData = new AvroData(100) val connectSchema = avroData.toConnectSchema(avroSchema) - val avroSchemaBack = ToAvroDataConverter.convertSchema(Some(connectSchema)) + val avroSchemaBack = ToAvroDataConverter.convertSchema(connectSchema) avroSchemaBack.getField("tenant_cd").schema().getTypes.get(1).getEnumSymbols.asScala.toSet shouldBe Set("one", "two", "three", diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterTest.scala index 57d702466..9022cd048 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/writer/WriterTest.scala @@ -17,6 +17,7 @@ package io.lenses.streamreactor.connect.cloud.common.sink.writer import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.formats.writer.FormatWriter +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition @@ -26,6 +27,7 @@ import io.lenses.streamreactor.connect.cloud.common.sink.naming.ObjectKeyBuilder import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface import org.apache.kafka.connect.data.Schema +import org.mockito.Answers import org.mockito.MockitoSugar import org.scalatest.funsuite.AnyFunSuiteLike import org.scalatest.matchers.should.Matchers @@ -43,7 +45,7 @@ class WriterTest extends AnyFunSuiteLike with Matchers with MockitoSugar { private val stagingFilenameFn: () => Either[SinkError, File] = () => Right(new File("test-file")) private val formatWriterFn: File => Either[SinkError, FormatWriter] = _ => Right(formatWriter) private val topicPartition: TopicPartition = Topic("test-topic").withPartition(0) - private val rolloverOnSchemaChangeEnabled = true; + private val schemaChangeDetector: SchemaChangeDetector = mock[SchemaChangeDetector] test("shouldSkip should return false when indexing is disabled") { when(writerIndexer.indexingEnabled()).thenReturn(false) @@ -54,7 +56,7 @@ class WriterTest extends AnyFunSuiteLike with Matchers with MockitoSugar { stagingFilenameFn, objectKeyBuilder, formatWriterFn, - rolloverOnSchemaChangeEnabled, + 
schemaChangeDetector, ) writer.shouldSkip(Offset(100)) shouldBe false } @@ -70,7 +72,7 @@ class WriterTest extends AnyFunSuiteLike with Matchers with MockitoSugar { stagingFilenameFn, objectKeyBuilder, formatWriterFn, - rolloverOnSchemaChangeEnabled, + schemaChangeDetector, ) writer.writeState = NoWriter(CommitState(topicPartition, Some(Offset(100)))) writer.shouldSkip(Offset(100)) shouldBe true @@ -86,7 +88,7 @@ class WriterTest extends AnyFunSuiteLike with Matchers with MockitoSugar { stagingFilenameFn, objectKeyBuilder, formatWriterFn, - rolloverOnSchemaChangeEnabled, + schemaChangeDetector, ) writer.writeState = NoWriter(CommitState(topicPartition, Some(Offset(100)))) writer.shouldSkip(Offset(101)) shouldBe false @@ -101,7 +103,7 @@ class WriterTest extends AnyFunSuiteLike with Matchers with MockitoSugar { stagingFilenameFn, objectKeyBuilder, formatWriterFn, - rolloverOnSchemaChangeEnabled, + schemaChangeDetector, ) writer.writeState = Uploading( CommitState(topicPartition, Some(Offset(100))), @@ -123,7 +125,7 @@ class WriterTest extends AnyFunSuiteLike with Matchers with MockitoSugar { stagingFilenameFn, objectKeyBuilder, formatWriterFn, - rolloverOnSchemaChangeEnabled, + schemaChangeDetector, ) writer.writeState = Uploading(CommitState(topicPartition, Some(Offset(100))), new File("test-file"), Offset(150), 1L, 1L) @@ -139,7 +141,7 @@ class WriterTest extends AnyFunSuiteLike with Matchers with MockitoSugar { stagingFilenameFn, objectKeyBuilder, formatWriterFn, - rolloverOnSchemaChangeEnabled, + schemaChangeDetector, ) writer.writeState = Writing(CommitState(topicPartition, Some(Offset(100))), formatWriter, new File("test-file"), Offset(150), 1L, 1L) @@ -156,7 +158,7 @@ class WriterTest extends AnyFunSuiteLike with Matchers with MockitoSugar { stagingFilenameFn, objectKeyBuilder, formatWriterFn, - rolloverOnSchemaChangeEnabled, + schemaChangeDetector, ) writer.writeState = Writing(CommitState(topicPartition, Some(Offset(100))), formatWriter, new File("test-file"), Offset(150), 1L, 1L) @@ -172,7 +174,7 @@ class WriterTest extends AnyFunSuiteLike with Matchers with MockitoSugar { stagingFilenameFn, objectKeyBuilder, formatWriterFn, - rolloverOnSchemaChangeEnabled = true, + schemaChangeDetector, ), ) when(writer.schemaHasChanged(schema)).thenReturn(true) @@ -180,54 +182,120 @@ class WriterTest extends AnyFunSuiteLike with Matchers with MockitoSugar { writer.shouldRollover(schema) shouldBe true } - test("shouldRollover returns false when rollover is disabled") { - val schema = mock[Schema] - val writer = spy( - new Writer[FileMetadata](topicPartition, - commitPolicy, - writerIndexer, - stagingFilenameFn, - objectKeyBuilder, - formatWriterFn, - rolloverOnSchemaChangeEnabled = false, - ), + test("schemaHasChanged should return true when schema has changed in Writing state") { + val schema = mock[Schema] + val lastSchema = mock[Schema] + val formatWriter = mock[FormatWriter](Answers.RETURNS_DEEP_STUBS) + val commitState = CommitState(topicPartition, Some(Offset(100))) + .copy(lastKnownSchema = Some(lastSchema)) + val writingState = Writing(commitState, formatWriter, new File("test-file"), Offset(150), 1L, 1L) + val writer = new Writer[FileMetadata](topicPartition, + commitPolicy, + writerIndexer, + stagingFilenameFn, + objectKeyBuilder, + formatWriterFn, + schemaChangeDetector, ) - when(writer.schemaHasChanged(schema)).thenReturn(true) - when(writer.rolloverOnSchemaChange).thenReturn(true) - writer.shouldRollover(schema) shouldBe false + writer.writeState = writingState + + 
when(schemaChangeDetector.detectSchemaChange(lastSchema, schema)).thenReturn(true) + + writer.schemaHasChanged(schema) shouldBe true } - test("shouldRollover returns false when schema has not changed") { - val schema = mock[Schema] - val writer = spy( - new Writer[FileMetadata](topicPartition, - commitPolicy, - writerIndexer, - stagingFilenameFn, - objectKeyBuilder, - formatWriterFn, - rolloverOnSchemaChangeEnabled = true, - ), + test("schemaHasChanged should return false when schema has not changed in Writing state") { + val schema = mock[Schema] + val lastSchema = mock[Schema] + val formatWriter = mock[FormatWriter](Answers.RETURNS_DEEP_STUBS) + val commitState = CommitState(topicPartition, Some(Offset(100))) + .copy(lastKnownSchema = Some(lastSchema)) + val writingState = Writing(commitState, formatWriter, new File("test-file"), Offset(150), 1L, 1L) + val writer = new Writer[FileMetadata](topicPartition, + commitPolicy, + writerIndexer, + stagingFilenameFn, + objectKeyBuilder, + formatWriterFn, + schemaChangeDetector, ) - when(writer.schemaHasChanged(schema)).thenReturn(false) - when(writer.rolloverOnSchemaChange).thenReturn(true) - writer.shouldRollover(schema) shouldBe false + writer.writeState = writingState + + when(schemaChangeDetector.detectSchemaChange(lastSchema, schema)).thenReturn(false) + + writer.schemaHasChanged(schema) shouldBe false } - test("shouldRollover returns false when rolloverOnSchemaChange is false") { + test("schemaHasChanged should return false when not in Writing state") { val schema = mock[Schema] - val writer = spy( - new Writer[FileMetadata](topicPartition, - commitPolicy, - writerIndexer, - stagingFilenameFn, - objectKeyBuilder, - formatWriterFn, - rolloverOnSchemaChangeEnabled = true, - ), + val writer = new Writer[FileMetadata](topicPartition, + commitPolicy, + writerIndexer, + stagingFilenameFn, + objectKeyBuilder, + formatWriterFn, + schemaChangeDetector, ) - when(writer.schemaHasChanged(schema)).thenReturn(true) - when(writer.rolloverOnSchemaChange).thenReturn(false) - writer.shouldRollover(schema) shouldBe false + writer.writeState = NoWriter(CommitState(topicPartition, Some(Offset(100)))) + + writer.schemaHasChanged(schema) shouldBe false + } + + test("schemaHasChanged should return false when lastKnownSchema is None in Writing state") { + val schema = mock[Schema] + val formatWriter = mock[FormatWriter] + val commitState = CommitState(topicPartition, Some(Offset(100))) + val writingState = Writing(commitState, formatWriter, new File("test-file"), Offset(150), 1L, 1L) + val writer = new Writer[FileMetadata](topicPartition, + commitPolicy, + writerIndexer, + stagingFilenameFn, + objectKeyBuilder, + formatWriterFn, + schemaChangeDetector, + ) + writer.writeState = writingState + + writer.schemaHasChanged(schema) shouldBe false + } + test("schemaHasChanged should return false when lastKnownSchema is the same as the new schema in Writing state") { + val schema = mock[Schema] + val formatWriter = mock[FormatWriter] + val commitState = CommitState(topicPartition, Some(Offset(100))) + .copy(lastKnownSchema = Some(schema)) + val writingState = Writing(commitState, formatWriter, new File("test-file"), Offset(150), 1L, 1L) + val writer = new Writer[FileMetadata](topicPartition, + commitPolicy, + writerIndexer, + stagingFilenameFn, + objectKeyBuilder, + formatWriterFn, + schemaChangeDetector, + ) + writer.writeState = writingState + + writer.schemaHasChanged(schema) shouldBe false + } + + test( + "schemaHasChanged should return false when lastKnownSchema is 
None and the detector returns false in Writing state", + ) { + val schema = mock[Schema] + val formatWriter = mock[FormatWriter](Answers.RETURNS_DEEP_STUBS) + val commitState = CommitState(topicPartition, Some(Offset(100))) + val writingState = Writing(commitState, formatWriter, new File("test-file"), Offset(150), 1L, 1L) + val writer = new Writer[FileMetadata](topicPartition, + commitPolicy, + writerIndexer, + stagingFilenameFn, + objectKeyBuilder, + formatWriterFn, + schemaChangeDetector, + ) + writer.writeState = writingState + + when(schemaChangeDetector.detectSchemaChange(null, schema)).thenReturn(false) + + writer.schemaHasChanged(schema) shouldBe false + } } diff --git a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfig.scala b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfig.scala index 2bab67a0e..e650b51ac 100644 --- a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfig.scala +++ b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/sink/config/GCPStorageSinkConfig.scala @@ -21,6 +21,7 @@ import io.lenses.streamreactor.common.errors.ErrorPolicy import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId import io.lenses.streamreactor.connect.cloud.common.config.traits.CloudSinkConfig import io.lenses.streamreactor.connect.cloud.common.config.traits.PropsToConfigConverter +import io.lenses.streamreactor.connect.cloud.common.formats.writer.schema.SchemaChangeDetector import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketOptions @@ -52,29 +53,30 @@ object GCPStorageSinkConfig extends PropsToConfigConverter[GCPStorageSinkConfig] sinkBucketOptions <- CloudSinkBucketOptions(connectorTaskId, gcpConfigDefBuilder) indexOptions = gcpConfigDefBuilder.getIndexSettings logMetrics = gcpConfigDefBuilder.getBoolean(LOG_METRICS_CONFIG) + schemaChangeDetector = gcpConfigDefBuilder.schemaChangeDetector(), } yield GCPStorageSinkConfig( gcpConnectionSettings, sinkBucketOptions, indexOptions, gcpConfigDefBuilder.getCompressionCodec(), - avoidResumableUpload = gcpConfigDefBuilder.isAvoidResumableUpload, - errorPolicy = gcpConfigDefBuilder.getErrorPolicyOrDefault, - connectorRetryConfig = gcpConfigDefBuilder.getRetryConfig, - logMetrics = logMetrics, - rolloverOnSchemaChangeEnabled = gcpConfigDefBuilder.shouldRollOverOnSchemaChange(), + avoidResumableUpload = gcpConfigDefBuilder.isAvoidResumableUpload, + errorPolicy = gcpConfigDefBuilder.getErrorPolicyOrDefault, + connectorRetryConfig = gcpConfigDefBuilder.getRetryConfig, + logMetrics = logMetrics, + schemaChangeDetector = schemaChangeDetector, ) } } case class GCPStorageSinkConfig( - connectionConfig: GCPConnectionConfig, - bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty, - indexOptions: Option[IndexOptions], - compressionCodec: CompressionCodec, - avoidResumableUpload: Boolean, - connectorRetryConfig: RetryConfig, - errorPolicy: ErrorPolicy, - logMetrics: Boolean, - rolloverOnSchemaChangeEnabled: Boolean, + connectionConfig: GCPConnectionConfig, + bucketOptions: Seq[CloudSinkBucketOptions] = Seq.empty, + indexOptions: Option[IndexOptions], + compressionCodec: CompressionCodec, + avoidResumableUpload: Boolean, + 
connectorRetryConfig: RetryConfig, + errorPolicy: ErrorPolicy, + logMetrics: Boolean, + schemaChangeDetector: SchemaChangeDetector, ) extends CloudSinkConfig[GCPConnectionConfig]
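
Note on the detector semantics pinned down by the tests above (illustrative sketch only): the trait name and the detectSchemaChange(oldSchema, newSchema) signature mirror the SchemaChangeDetector introduced in this diff, but the object names and method bodies below are assumptions inferred from CompatibilitySchemaChangeDetectorTest, DefaultSchemaChangeDetectorTest and VersionSchemaChangeDetectorTest, not the connector's actual implementations.

import scala.jdk.CollectionConverters._
import org.apache.kafka.connect.data.Schema

// Hypothetical stand-in for the SchemaChangeDetector trait referenced in the diff.
trait SchemaChangeDetectorSketch {
  def detectSchemaChange(oldSchema: Schema, newSchema: Schema): Boolean
}

// "default": every write is treated as a schema change, so the writer always rolls over.
object DefaultDetectorSketch extends SchemaChangeDetectorSketch {
  def detectSchemaChange(oldSchema: Schema, newSchema: Schema): Boolean = true
}

// "version": roll over when the schema name changes or the schema version increases;
// an equal or lower version under the same name is not treated as a change.
object VersionDetectorSketch extends SchemaChangeDetectorSketch {
  private def versionOf(s: Schema): Int = Option(s.version()).map(_.intValue()).getOrElse(0)
  def detectSchemaChange(oldSchema: Schema, newSchema: Schema): Boolean =
    oldSchema.name() != newSchema.name() || versionOf(newSchema) > versionOf(oldSchema)
}

// "compatibility": roll over only when the new schema can no longer carry the old one,
// i.e. a previously known field was removed or changed type; newly added fields are fine.
object CompatibilityDetectorSketch extends SchemaChangeDetectorSketch {
  def detectSchemaChange(oldSchema: Schema, newSchema: Schema): Boolean =
    oldSchema.fields().asScala.exists { oldField =>
      Option(newSchema.field(oldField.name()))
        .forall(_.schema().`type`() != oldField.schema().`type`())
    }
}

Per SchemaChangeSettingsTest above, the implementation is selected through the connector-prefixed schema.change.detector property, which defaults to "default".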