From faa8d81dc2b01e2ca6bd1701714d2bf36e0c8bfa Mon Sep 17 00:00:00 2001
From: stheppi
Date: Thu, 30 Nov 2023 23:12:51 +0000
Subject: [PATCH 1/2] Add the S3 sink for consumer offsets topic.

---
 .../aws/s3/config/S3ConfigSettings.scala      |   6 +-
 .../sink/S3ConsumerGroupsSinkConnector.scala  |  52 +++
 .../s3/sink/S3ConsumerGroupsSinkTask.scala    | 107 ++++++
 .../config/S3ConsumerGroupsSinkConfig.scala   |  54 +++
 .../S3ConsumerGroupsSinkConfigDef.scala       | 123 +++++++
 .../aws/s3/sink/config/S3ObjectKey.scala      |  92 +++++
 .../aws/s3/storage/AwsS3Uploader.scala        |  64 ++++
 .../aws/s3/config/S3ConfigSettingsTest.scala  |   2 +-
 .../S3ConsumerGroupsSinkConfigTest.scala      |  95 ++++++
 .../S3ConsumerGroupsSinkConfigTest.scala      |  98 ++++++
 .../config/S3ObjectKeyValidationTest.scala    |  87 +++++
 .../common/config/PropertiesHelper.scala      |  85 +++++
 .../common/consumers/CloudObjectKey.scala     |  56 ++++
 .../consumers/ConsumerGroupsWriter.scala      | 138 ++++++++
 .../consumers/GroupTopicPartition.scala       |  25 ++
 .../common/consumers/OffsetAndMetadata.scala  |  53 +++
 .../cloud/common/consumers/OffsetKey.scala    |  51 +++
 .../cloud/common/consumers/Uploader.scala     |  24 ++
 .../ConsumerGroupsWriterDecoderTest.scala     | 226 +++++++++++++
 .../consumers/ConsumerGroupsWriterTest.scala  | 317 ++++++++++++++++++
 20 files changed, 1752 insertions(+), 3 deletions(-)
 create mode 100644 kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ConsumerGroupsSinkConnector.scala
 create mode 100644 kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ConsumerGroupsSinkTask.scala
 create mode 100644 kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfig.scala
 create mode 100644 kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfigDef.scala
 create mode 100644 kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ObjectKey.scala
 create mode 100644 kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3Uploader.scala
 create mode 100644 kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConsumerGroupsSinkConfigTest.scala
 create mode 100644 kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfigTest.scala
 create mode 100644 kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ObjectKeyValidationTest.scala
 create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/PropertiesHelper.scala
 create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/CloudObjectKey.scala
 create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriter.scala
 create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/GroupTopicPartition.scala
 create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/OffsetAndMetadata.scala
 create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/OffsetKey.scala
 create mode 100644 kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/Uploader.scala
 create mode 100644
kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriterDecoderTest.scala create mode 100644 kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriterTest.scala diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettings.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettings.scala index ab9e5d28e..3597e76ba 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettings.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettings.scala @@ -28,8 +28,6 @@ object S3ConfigSettings { val CUSTOM_ENDPOINT: String = s"$CONNECTOR_PREFIX.custom.endpoint" val ENABLE_VIRTUAL_HOST_BUCKETS: String = s"$CONNECTOR_PREFIX.vhost.bucket" - val PROFILES: String = s"$CONNECTOR_PREFIX.config.profiles" - val KCQL_CONFIG = s"$CONNECTOR_PREFIX.$KCQL_PROP_SUFFIX" val KCQL_DOC = "Contains the Kafka Connect Query Language describing the flow from Apache Kafka topics to Apache Hive tables." @@ -92,4 +90,8 @@ object S3ConfigSettings { val SOURCE_ORDERING_TYPE_DOC: String = "AlphaNumeric (the default)" val SOURCE_ORDERING_TYPE_DEFAULT: String = "AlphaNumeric" + // used by the consumer groups sink + val S3_BUCKET_CONFIG: String = s"$CONNECTOR_PREFIX.location" + val S3_BUCKET_DOC: String = + "Specify the S3 bucket, and optionally, a prefix, where Kafka consumer group offsets will be stored." } diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ConsumerGroupsSinkConnector.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ConsumerGroupsSinkConnector.scala new file mode 100644 index 000000000..5cb6ab68b --- /dev/null +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ConsumerGroupsSinkConnector.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.aws.s3.sink + +import com.typesafe.scalalogging.LazyLogging +import io.lenses.streamreactor.common.utils.JarManifest +import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings +import io.lenses.streamreactor.connect.aws.s3.sink.config.S3ConsumerGroupsSinkConfigDef +import io.lenses.streamreactor.connect.cloud.common.config.TaskDistributor +import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.connect.connector.Task +import org.apache.kafka.connect.sink.SinkConnector + +import java.util + +/** + * A connector which stores the latest Kafka consumer group offset from "__consumer_offsets" topic in S3. 
+ */ +class S3ConsumerGroupsSinkConnector extends SinkConnector with LazyLogging { + + private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation) + private val props: util.Map[String, String] = new util.HashMap[String, String]() + + override def version(): String = manifest.version() + + override def taskClass(): Class[_ <: Task] = classOf[S3ConsumerGroupsSinkTask] + + override def config(): ConfigDef = S3ConsumerGroupsSinkConfigDef.config + + override def start(props: util.Map[String, String]): Unit = { + logger.info(s"Creating S3 consumer groups sink connector") + this.props.putAll(props) + } + + override def stop(): Unit = () + + override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = + new TaskDistributor(S3ConfigSettings.CONNECTOR_PREFIX).distributeTasks(props, maxTasks) +} diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ConsumerGroupsSinkTask.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ConsumerGroupsSinkTask.scala new file mode 100644 index 000000000..6e789f9e0 --- /dev/null +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ConsumerGroupsSinkTask.scala @@ -0,0 +1,107 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.aws.s3.sink + +import cats.implicits.toShow +import io.lenses.streamreactor.common.errors.ErrorHandler +import io.lenses.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader +import io.lenses.streamreactor.common.utils.JarManifest +import io.lenses.streamreactor.connect.aws.s3.auth.AwsS3ClientCreator +import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.CONNECTOR_PREFIX +import io.lenses.streamreactor.connect.aws.s3.sink.config.S3ConsumerGroupsSinkConfig +import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3Uploader +import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId +import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskIdCreator +import io.lenses.streamreactor.connect.cloud.common.consumers.ConsumerGroupsWriter +import io.lenses.streamreactor.connect.cloud.common.utils.MapUtils +import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition } +import org.apache.kafka.connect.sink.SinkRecord +import org.apache.kafka.connect.sink.SinkTask + +import java.util +import scala.jdk.CollectionConverters.CollectionHasAsScala +import scala.jdk.CollectionConverters.MapHasAsJava +import scala.jdk.CollectionConverters.MapHasAsScala + +/** + * A Kafka connector designed to persist the most recent Kafka consumer group offsets from the `__consumer_offsets` topic to an S3 storage. + * The connector adheres to an eventually consistent model. It captures and stores the latest offset for each partition within each consumer group. 
+ * These offsets are organized in S3 objects using the following key structure: `bucket/prefix.../consumerGroup/topic/partition`, with the offset value represented as a long. + * + * However, it's important to note that the connector does not actively track consumer groups that drop topic subscriptions or unsubscribe from topics. As a result, it does not automatically remove redundant keys. + * Also the writes are eventually consistent. Depending on Connect replaying messages, the offsets may be written multiple times. + * But since the s3 key is unique for group-topic-partition the last write will be the latest offset. + */ + +class S3ConsumerGroupsSinkTask extends SinkTask with ErrorHandler { + + private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation) + + private var connectorTaskId: ConnectorTaskId = _ + private var writerManager: ConsumerGroupsWriter = _ + + override def version(): String = manifest.version() + + override def start(fallbackProps: util.Map[String, String]): Unit = { + + printAsciiHeader(manifest, "/aws-s3-cg-sink-ascii.txt") + + logger.debug(s"[{}] S3ConsumerGroupSinkTask.start", fallbackProps.get("name")) + + val contextProps = Option(context).flatMap(c => Option(c.configs())).map(_.asScala.toMap).getOrElse(Map.empty) + val props = MapUtils.mergeProps(contextProps, fallbackProps.asScala.toMap).asJava + (for { + taskId <- new ConnectorTaskIdCreator(CONNECTOR_PREFIX).fromProps(fallbackProps) + config <- S3ConsumerGroupsSinkConfig.fromProps(props) + s3Client <- AwsS3ClientCreator.make(config.config) + uploader = new AwsS3Uploader(s3Client, taskId) + } yield new ConsumerGroupsWriter(config.location, uploader, taskId) -> taskId) match { + case Left(value) => throw value + case Right((writer, taskId)) => + writerManager = writer + connectorTaskId = taskId + } + } + + override def put(records: util.Collection[SinkRecord]): Unit = + writerManager.write(records.asScala.toList) match { + case Left(ex) => + logger.error(s"[{}] Failed to write records to S3", + Option(connectorTaskId).map(_.show).getOrElse("Unnamed"), + ex, + ) + throw ex + case Right(_) => () + } + + override def close(partitions: util.Collection[KafkaTopicPartition]): Unit = { + logger.debug( + "[{}] S3ConsumerGroupsSinkTask.close with {} partitions", + Option(connectorTaskId).map(_.show).getOrElse("Unnamed"), + partitions.size(), + ) + + Option(writerManager).foreach(_.close()) + } + + override def stop(): Unit = { + logger.debug("[{}] Stop", Option(connectorTaskId).map(_.show).getOrElse("Unnamed")) + + Option(writerManager).foreach(_.close()) + writerManager = null + } + +} diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfig.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfig.scala new file mode 100644 index 000000000..abf204bf2 --- /dev/null +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfig.scala @@ -0,0 +1,54 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.aws.s3.sink.config + +import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._ +import io.lenses.streamreactor.connect.aws.s3.config._ +import io.lenses.streamreactor.connect.cloud.common.config.PropertiesHelper +import io.lenses.streamreactor.connect.cloud.common.consumers.CloudObjectKey + +import java.util + +case class S3ConsumerGroupsSinkConfig( + location: CloudObjectKey, + config: S3Config, +) + +object S3ConsumerGroupsSinkConfig extends PropertiesHelper { + def fromProps( + props: util.Map[String, String], + ): Either[Throwable, S3ConsumerGroupsSinkConfig] = + S3ConsumerGroupsSinkConfig(S3ConsumerGroupsSinkConfigDef(props)) + + def apply( + s3ConfigDefBuilder: S3ConsumerGroupsSinkConfigDef, + ): Either[Throwable, S3ConsumerGroupsSinkConfig] = + S3ConsumerGroupsSinkConfig.from( + s3ConfigDefBuilder.getParsedValues, + ) + + def from(props: Map[String, _]): Either[Throwable, S3ConsumerGroupsSinkConfig] = + for { + bucketAndPrefix <- getStringEither(props, S3_BUCKET_CONFIG) + bucket <- CloudObjectKey.from(bucketAndPrefix) + _ <- AuthMode.withNameInsensitiveEither(getString(props, AUTH_MODE).getOrElse(AuthMode.Default.toString)) + } yield { + S3ConsumerGroupsSinkConfig( + bucket, + S3Config(props), + ) + } +} diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfigDef.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfigDef.scala new file mode 100644 index 000000000..1b3de7d76 --- /dev/null +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfigDef.scala @@ -0,0 +1,123 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.aws.s3.sink.config + +import io.lenses.streamreactor.common.config.base.traits.BaseConfig +import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._ +import io.lenses.streamreactor.connect.aws.s3.config._ +import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.common.config.ConfigDef.Importance +import org.apache.kafka.common.config.ConfigDef.Type + +import java.util +import scala.jdk.CollectionConverters._ + +object S3ConsumerGroupsSinkConfigDef { + + val config: ConfigDef = new ConfigDef() + .define(S3_BUCKET_CONFIG, + Type.STRING, + Importance.HIGH, + S3_BUCKET_DOC, + "S3", + 1, + ConfigDef.Width.LONG, + S3_BUCKET_CONFIG, + ) + .define( + AWS_REGION, + Type.STRING, + "", + Importance.HIGH, + "AWS region", + ) + .define( + AWS_ACCESS_KEY, + Type.PASSWORD, + "", + Importance.HIGH, + "AWS access key", + ) + .define( + AWS_SECRET_KEY, + Type.PASSWORD, + "", + Importance.HIGH, + "AWS password key", + ) + .define( + AUTH_MODE, + Type.STRING, + AuthMode.Default.toString, + Importance.HIGH, + "Authenticate mode, 'credentials' or 'default'", + ) + .define( + CUSTOM_ENDPOINT, + Type.STRING, + "", + Importance.LOW, + "Custom S3-compatible endpoint (usually for testing)", + ) + .define( + ENABLE_VIRTUAL_HOST_BUCKETS, + Type.BOOLEAN, + false, + Importance.LOW, + "Enable virtual host buckets", + ).define( + HTTP_NBR_OF_RETRIES, + Type.INT, + HTTP_NBR_OF_RETIRES_DEFAULT, + Importance.MEDIUM, + HTTP_NBR_OF_RETRIES_DOC, + "Error", + 2, + ConfigDef.Width.LONG, + HTTP_NBR_OF_RETRIES, + ) + .define( + HTTP_ERROR_RETRY_INTERVAL, + Type.LONG, + HTTP_ERROR_RETRY_INTERVAL_DEFAULT, + Importance.MEDIUM, + HTTP_ERROR_RETRY_INTERVAL_DOC, + "Error", + 3, + ConfigDef.Width.LONG, + HTTP_ERROR_RETRY_INTERVAL, + ) + .define(HTTP_SOCKET_TIMEOUT, Type.LONG, HTTP_SOCKET_TIMEOUT_DEFAULT, Importance.LOW, HTTP_SOCKET_TIMEOUT_DOC) + .define(HTTP_CONNECTION_TIMEOUT, + Type.INT, + HTTP_CONNECTION_TIMEOUT_DEFAULT, + Importance.LOW, + HTTP_CONNECTION_TIMEOUT_DOC, + ) + .define( + POOL_MAX_CONNECTIONS, + Type.INT, + POOL_MAX_CONNECTIONS_DEFAULT, + Importance.LOW, + POOL_MAX_CONNECTIONS_DOC, + ) +} + +case class S3ConsumerGroupsSinkConfigDef(props: util.Map[String, String]) + extends BaseConfig(S3ConfigSettings.CONNECTOR_PREFIX, S3ConsumerGroupsSinkConfigDef.config, props) { + def getParsedValues: Map[String, _] = values().asScala.toMap + +} diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ObjectKey.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ObjectKey.scala new file mode 100644 index 000000000..c478a0d9d --- /dev/null +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ObjectKey.scala @@ -0,0 +1,92 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.aws.s3.sink.config + +import cats.data.Validated +import cats.data.ValidatedNel +import cats.implicits.catsSyntaxTuple2Semigroupal +import cats.implicits.catsSyntaxValidatedId +import io.lenses.streamreactor.connect.cloud.common.consumers.CloudObjectKey +import io.lenses.streamreactor.connect.cloud.common.consumers.CloudObjectKey.validatedNonEmptyString + +object S3ObjectKey { + + def validate(s3: CloudObjectKey): ValidatedNel[String, CloudObjectKey] = + ( + validatedNonEmptyString(s3.bucket, "s3 bucket").andThen(validateS3ObjectKeyName), + validatedNonEmptyString(s3.prefix, "s3 prefix").andThen(validateS3PrefixName), + ).mapN((b, p) => CloudObjectKey(b, p)) + + /** Applies the S3 bucket naming restriction: + * must be between 3 (min) and 63 (max) characters long. + * can consist only of lowercase letters, numbers, dots (.), and hyphens (-). + * must begin and end with a letter or number. + * must not contain two adjacent periods. + * must not be formatted as an IP address (for example, 192.168.5.4). + * must not start with the prefix xn--. + * must not start with the prefix sthree- and the prefix sthree-configurator. + * must not end with the suffix -s3alias. This suffix is reserved for access point alias names. For more information, see Using a bucket-style alias for your S3 bucket access point. + * must not end with the suffix --ol-s3. + */ + def validateS3ObjectKeyName(bucket: String): ValidatedNel[String, String] = { + val bucketRegex = """^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$""".r + bucket match { + case bucketRegex() => + if ( + bucket.startsWith("sthree-") || bucket.startsWith("xn--") + || bucket.endsWith("-s3alias") || bucket.endsWith("--ol-s3") + || bucket.contains("..") + ) + "S3 bucket name does not conform to AWS naming restrictions".invalidNel + else { + //if bucket is an ip then it is invalid + val ipRegex = """^(\d{1,3}\.){3}\d{1,3}$""".r + ipRegex.findFirstIn(bucket) match { + case Some(_) => + "S3 bucket name does not conform to AWS naming restrictions".invalidNel + case None => Validated.validNel(bucket) + } + } + case _ => "S3 bucket name does not conform to AWS naming restrictions".invalidNel + } + + } + + /** + * Validates a prefix is valid. It should not start and end with /. + * Allows 0-9,a-z,A-Z,!, - , _, ., *, ', ), (, and /. 
+ * Does not start and end with / + * + * @param prefix + * @return + */ + def validateS3PrefixName(prefix: Option[String]): ValidatedNel[String, Option[String]] = + prefix match { + case Some(p) => + //only allows these characters: 0-9,a-z,A-Z,!, - , _, ., *, ', ), (, / + // does not start and end with / + val prefixRegex = """^[0-9a-zA-Z!_\.\*\'\(\)\/\-]+$""".r + if (prefixRegex.findFirstIn(p).isDefined) { + + //cannot start and end with '/' + if (p.startsWith("/") || p.endsWith("/")) + "S3 prefix name does not conform to AWS naming restrictions".invalidNel + else Validated.validNel(Some(p)) + + } else "S3 prefix name does not conform to AWS naming restrictions".invalidNel + case None => Validated.validNel(None) + } +} diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3Uploader.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3Uploader.scala new file mode 100644 index 000000000..43f26e839 --- /dev/null +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/storage/AwsS3Uploader.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.aws.s3.storage + +import cats.implicits.toBifunctorOps +import cats.implicits.toShow +import com.typesafe.scalalogging.LazyLogging +import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId +import io.lenses.streamreactor.connect.cloud.common.consumers.Uploader +import software.amazon.awssdk.core.sync.RequestBody +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest +import software.amazon.awssdk.services.s3.model.PutObjectRequest + +import java.nio.ByteBuffer +import scala.util.Try + +class AwsS3Uploader(s3Client: S3Client, connectorTaskId: ConnectorTaskId) extends Uploader with LazyLogging { + override def close(): Unit = s3Client.close() + + override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = { + logger.debug(s"[{}] AWS Uploading to s3 {}:{}", connectorTaskId.show, source, bucket, path) + Try { + s3Client.putObject( + PutObjectRequest.builder() + .bucket(bucket) + .key(path) + .contentLength(source.remaining()) + .build(), + RequestBody.fromByteBuffer(source), + ) + logger.debug(s"[{}] Completed upload to s3 {}:{}", connectorTaskId.show, source, bucket, path) + } + .toEither.leftMap { ex => + logger.error(s"[{}] Failed upload to s3 {}:{}", connectorTaskId.show, source, bucket, path, ex) + ex + } + } + + override def delete(bucket: String, path: String): Either[Throwable, Unit] = { + logger.debug(s"[{}] AWS Deleting from s3 {}:{}", connectorTaskId.show, bucket, path) + Try { + s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(path).build()) + logger.debug(s"[{}] Completed delete from s3 {}:{}", connectorTaskId.show, bucket, path) + } + .toEither.leftMap { ex => + 
logger.error(s"[{}] Failed delete from s3 {}:{}", connectorTaskId.show, bucket, path, ex) + ex + } + } +} diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala index 67ae92d45..5722ed73c 100644 --- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala @@ -35,7 +35,7 @@ class S3ConfigSettingsTest extends AnyFlatSpec with Matchers with LazyLogging { val configKeys = S3SinkConfigDef.config.configKeys().keySet().asScala ++ S3SourceConfigDef.config.configKeys().keySet().asScala - configKeys.size shouldBe 47 + configKeys.size shouldBe 48 configKeys.foreach { k => k.toLowerCase should be(k) } diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConsumerGroupsSinkConfigTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConsumerGroupsSinkConfigTest.scala new file mode 100644 index 000000000..717f93a1e --- /dev/null +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConsumerGroupsSinkConfigTest.scala @@ -0,0 +1,95 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.aws.s3.config + +import cats.implicits.catsSyntaxOptionId +import io.lenses.streamreactor.common.errors.ThrowErrorPolicy +import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._ +import io.lenses.streamreactor.connect.aws.s3.sink.config.S3ConsumerGroupsSinkConfig +import io.lenses.streamreactor.connect.cloud.common.consumers.CloudObjectKey +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.jdk.CollectionConverters._ +class S3ConsumerGroupsSinkConfigTest extends AnyFunSuite with Matchers { + test("creates an instance of S3ConsumerGroupsSinkConfig") { + S3ConsumerGroupsSinkConfig.fromProps( + Map( + S3_BUCKET_CONFIG -> "bucket:a/b/c", + AWS_REGION -> "eu-west-1", + AWS_ACCESS_KEY -> "access", + AWS_SECRET_KEY -> "secret", + AUTH_MODE -> "credentials", + CUSTOM_ENDPOINT -> "endpoint", + ).asJava, + ) match { + case Left(value) => fail("Expecting to build a config but got an error instead.", value) + case Right(value) => + value should be( + S3ConsumerGroupsSinkConfig( + CloudObjectKey("bucket", "a/b/c".some), + S3Config( + Some("eu-west-1"), + Some("access"), + Some("secret"), + AuthMode.Credentials, + Some("endpoint"), + false, + ThrowErrorPolicy(), + RetryConfig(20, 60000), + RetryConfig(5, 50), + HttpTimeoutConfig(Some(60000), Some(60000)), + None, + ), + ), + ) + } + } + + test("remove the / from the prefix") { + S3ConsumerGroupsSinkConfig.fromProps( + Map( + S3_BUCKET_CONFIG -> "bucket:a/b/c/", + AWS_REGION -> "eu-west-1", + AWS_ACCESS_KEY -> "access", + AWS_SECRET_KEY -> "secret", + AUTH_MODE -> "credentials", + CUSTOM_ENDPOINT -> "endpoint", + ).asJava, + ) match { + case Left(value) => fail("Expecting to build a config but got an error instead.", value) + case Right(value) => + value should be( + S3ConsumerGroupsSinkConfig( + CloudObjectKey("bucket", "a/b/c".some), + S3Config( + Some("eu-west-1"), + Some("access"), + Some("secret"), + AuthMode.Credentials, + Some("endpoint"), + false, + ThrowErrorPolicy(), + RetryConfig(20, 60000), + RetryConfig(5, 50), + HttpTimeoutConfig(Some(60000), Some(60000)), + None, + ), + ), + ) + } + } +} diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfigTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfigTest.scala new file mode 100644 index 000000000..1ab7f9336 --- /dev/null +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ConsumerGroupsSinkConfigTest.scala @@ -0,0 +1,98 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.aws.s3.sink.config + +import cats.implicits.catsSyntaxOptionId +import io.lenses.streamreactor.common.errors.ThrowErrorPolicy +import io.lenses.streamreactor.connect.aws.s3.config.AuthMode +import io.lenses.streamreactor.connect.aws.s3.config.HttpTimeoutConfig +import io.lenses.streamreactor.connect.aws.s3.config.RetryConfig +import io.lenses.streamreactor.connect.aws.s3.config.S3Config +import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._ +import io.lenses.streamreactor.connect.cloud.common.consumers.CloudObjectKey +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.jdk.CollectionConverters._ +class S3ConsumerGroupsSinkConfigTest extends AnyFunSuite with Matchers { + test("creates an instance of S3ConsumerGroupsSinkConfig") { + S3ConsumerGroupsSinkConfig.fromProps( + Map( + S3_BUCKET_CONFIG -> "bucket:a/b/c", + AWS_REGION -> "eu-west-1", + AWS_ACCESS_KEY -> "access", + AWS_SECRET_KEY -> "secret", + AUTH_MODE -> "credentials", + CUSTOM_ENDPOINT -> "endpoint", + ).asJava, + ) match { + case Left(value) => fail("Expecting to build a config but got an error instead.", value) + case Right(value) => + value should be( + S3ConsumerGroupsSinkConfig( + CloudObjectKey("bucket", "a/b/c".some), + S3Config( + Some("eu-west-1"), + Some("access"), + Some("secret"), + AuthMode.Credentials, + Some("endpoint"), + false, + ThrowErrorPolicy(), + RetryConfig(20, 60000), + RetryConfig(5, 50), + HttpTimeoutConfig(Some(60000), Some(60000)), + None, + ), + ), + ) + } + } + + test("remove the / from the prefix") { + S3ConsumerGroupsSinkConfig.fromProps( + Map( + S3_BUCKET_CONFIG -> "bucket:a/b/c/", + AWS_REGION -> "eu-west-1", + AWS_ACCESS_KEY -> "access", + AWS_SECRET_KEY -> "secret", + AUTH_MODE -> "credentials", + CUSTOM_ENDPOINT -> "endpoint", + ).asJava, + ) match { + case Left(value) => fail("Expecting to build a config but got an error instead.", value) + case Right(value) => + value should be( + S3ConsumerGroupsSinkConfig( + CloudObjectKey("bucket", "a/b/c".some), + S3Config( + Some("eu-west-1"), + Some("access"), + Some("secret"), + AuthMode.Credentials, + Some("endpoint"), + false, + ThrowErrorPolicy(), + RetryConfig(20, 60000), + RetryConfig(5, 50), + HttpTimeoutConfig(Some(60000), Some(60000)), + None, + ), + ), + ) + } + } +} diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ObjectKeyValidationTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ObjectKeyValidationTest.scala new file mode 100644 index 000000000..5497d5171 --- /dev/null +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3ObjectKeyValidationTest.scala @@ -0,0 +1,87 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.aws.s3.sink.config + +import cats.implicits.catsSyntaxOptionId +import io.lenses.streamreactor.connect.cloud.common.consumers.CloudObjectKey +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +class S3ObjectKeyValidationTest extends AnyFunSuite with Matchers { + test("S3ObjectKey should be valid") { + val bucket = CloudObjectKey("bucket", "prefix".some) + S3ObjectKey.validate(bucket).isValid shouldBe true + } + test("bucket with . and - is valid") { + S3ObjectKey.validate(CloudObjectKey("buck.et", "prefix".some)).isValid shouldBe true + S3ObjectKey.validate(CloudObjectKey("buck-et", "prefix".some)).isValid shouldBe true + } + test("bucket shorter than 3 characters should be invalid") { + val bucket = CloudObjectKey("bu", "prefix".some) + S3ObjectKey.validate(bucket).isValid shouldBe false + } + test("bucket longer than 63 characters should be invalid") { + val bucket = CloudObjectKey("b" * 64, "prefix".some) + S3ObjectKey.validate(bucket).isValid shouldBe false + } + test("bucket with uppercase characters should be invalid") { + val bucket = CloudObjectKey("BUCKET", "prefix".some) + S3ObjectKey.validate(bucket).isValid shouldBe false + } + test("bucket with invalid characters should be invalid") { + val bucket = CloudObjectKey("buck et", "prefix".some) + S3ObjectKey.validate(bucket).isValid shouldBe false + } + test("bucket with invalid prefix should be invalid") { + val bucket = CloudObjectKey("bucket", "pre fix".some) + S3ObjectKey.validate(bucket).isValid shouldBe false + } + test("bucket starting with anything but a letter or number should be invalid") { + val bucket = CloudObjectKey(".bucket", "prefix".some) + S3ObjectKey.validate(bucket).isValid shouldBe false + } + test("bucket ending with anything but a letter or number should be invalid") { + val bucket = CloudObjectKey("bucket.", "prefix".some) + S3ObjectKey.validate(bucket).isValid shouldBe false + } + test("bucket starting with sthree- is invalid") { + val bucket = CloudObjectKey("sthree-bucket", "aws".some) + S3ObjectKey.validate(bucket).isValid shouldBe false + } + test("bucket starting with xn-- is invalid") { + val bucket = CloudObjectKey("xn--bucket", "aws".some) + S3ObjectKey.validate(bucket).isValid shouldBe false + } + test("bucket with two adjacent periods is invalid") { + val bucket = CloudObjectKey("buck..et", "prefix".some) + S3ObjectKey.validate(bucket).isValid shouldBe false + } + test("valid prefix is valid") { + S3ObjectKey.validate(CloudObjectKey("bucket", "ab.c/x/y/z".some)).isValid shouldBe true + S3ObjectKey.validate(CloudObjectKey("bucket", "4a/b/c".some)).isValid shouldBe true + } + test("prefix starting with / or ending with / is invalid") { + S3ObjectKey.validate(CloudObjectKey("bucket", "/ab/c".some)).isValid shouldBe false + S3ObjectKey.validate(CloudObjectKey("bucket", "ab/c/".some)).isValid shouldBe false + } + test("prefix with characters other than 0-9,a-z,A-Z,!, - , _, ., *, ', ), (, and / is invalid") { + S3ObjectKey.validate(CloudObjectKey("bucket", "ab/c$".some)).isValid shouldBe false + S3ObjectKey.validate(CloudObjectKey("bucket", "ab/c%".some)).isValid shouldBe false + S3ObjectKey.validate(CloudObjectKey("bucket", "ab/c&".some)).isValid shouldBe false + S3ObjectKey.validate(CloudObjectKey("bucket", "ab/c?".some)).isValid shouldBe false + S3ObjectKey.validate(CloudObjectKey("bucket", "@ab/c?".some)).isValid shouldBe false + } +} diff --git 
a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/PropertiesHelper.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/PropertiesHelper.scala new file mode 100644 index 000000000..b673821a7 --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/config/PropertiesHelper.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.config + +import cats.implicits.catsSyntaxEitherId +import cats.implicits.toBifunctorOps +import org.apache.kafka.common.config.types.Password +import org.apache.kafka.connect.errors.ConnectException + +import scala.util.Try + +trait PropertiesHelper { + + def getStringEither(props: Map[String, _], key: String): Either[Throwable, String] = + getString(props, key) + .toRight(new ConnectException(s"Configuration is missing the setting for [$key].")) + + def getString(props: Map[String, _], key: String): Option[String] = + props.get(key) + .collect { + case s: String if s.nonEmpty => s + } + + def getPassword(props: Map[String, _], key: String): Option[String] = + props.get(key) + .collect { + case p: Password if p.value().nonEmpty => p.value() + case s: String if s.nonEmpty => s + } + + def getBoolean(props: Map[String, _], key: String): Option[Boolean] = + props.get(key) + .collect { + case b: Boolean => b + case "true" => true + case "false" => false + } + + def getLong(props: Map[String, _], key: String): Option[Long] = getLongEither(props, key).toOption + + def getLongEither(props: Map[String, _], key: String): Either[Throwable, Long] = + props.get(key) match { + case Some(value) => + value match { + case i: Int => i.toLong.asRight[Throwable] + case l: Long => l.asRight[Throwable] + case s: String => + Try(s.toLong).toEither.leftMap(_ => + new ConnectException(s"Configuration for setting [$key] is not a valid long."), + ) + case _ => new ConnectException(s"Configuration for setting [$key] is not a valid long.").asLeft[Long] + } + case None => new ConnectException(s"Configuration is missing the setting for [$key].").asLeft[Long] + } + def getInt(props: Map[String, _], key: String): Option[Int] = getIntEither(props, key).toOption + + def getIntEither(props: Map[String, _], key: String): Either[Throwable, Int] = + props.get(key) match { + case Some(value) => + value match { + case i: Int => i.asRight[Throwable] + case l: Long => l.toInt.asRight[Throwable] + case i: String => + Try(i.toInt).toEither.leftMap(_ => + new ConnectException(s"Configuration for setting [$key] is not a valid integer."), + ) + case _ => new ConnectException(s"Configuration for setting [$key] is not a valid integer.").asLeft[Int] + } + case None => new ConnectException(s"Configuration is missing the setting for [$key].").asLeft[Int] + } + +} diff --git 
a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/CloudObjectKey.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/CloudObjectKey.scala new file mode 100644 index 000000000..6a2f80143 --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/CloudObjectKey.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.consumers + +import cats.data.Validated +import cats.data.ValidatedNel +import cats.implicits.catsSyntaxValidatedId +import org.apache.kafka.connect.errors.ConnectException + +case class CloudObjectKey(bucket: String, prefix: Option[String] = None) { + def withPrefix(prefix: String): CloudObjectKey = copy(prefix = Some(prefix)) +} + +object CloudObjectKey { + def from(bucketAndPrefix: String): Either[Throwable, CloudObjectKey] = + bucketAndPrefix.split(':').toList match { + case bucket :: Nil => Right(CloudObjectKey(bucket)) + case bucket :: prefix :: Nil => + prefix.trim match { + case "" => Right(CloudObjectKey(bucket)) + case p => + //remove / if p ends with / + if (p.endsWith("/")) Right(CloudObjectKey(bucket, Some(p.dropRight(1)))) + else + Right(CloudObjectKey(bucket, Some(p))) + } + case _ => Left(new ConnectException(s"Invalid bucket and prefix $bucketAndPrefix")) + } + def validatedNonEmptyString( + value: Option[String], + filedName: String, + ): ValidatedNel[String, Option[String]] = + value match { + case Some(v) => validatedNonEmptyString(v, filedName).map(Some(_)) + case None => Validated.validNel(None) + } + def validatedNonEmptyString(value: String, filedName: String): ValidatedNel[String, String] = + if (value.trim.isEmpty) { + s"$filedName field cannot be empty".invalidNel + } else { + Validated.validNel(value) + } +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriter.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriter.scala new file mode 100644 index 000000000..9db03153f --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriter.scala @@ -0,0 +1,138 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.cloud.common.consumers + +import cats.implicits.toShow +import cats.implicits.toTraverseOps +import com.typesafe.scalalogging.StrictLogging +import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId +import io.lenses.streamreactor.connect.cloud.common.consumers.ConsumerGroupsWriter.extractOffsets +import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.sink.SinkRecord + +import java.nio.ByteBuffer + +class ConsumerGroupsWriter(location: CloudObjectKey, uploader: Uploader, taskId: ConnectorTaskId) + extends AutoCloseable + with StrictLogging { + + override def close(): Unit = uploader.close() + + def write(records: List[SinkRecord]): Either[Throwable, Unit] = + records.traverse(extractOffsets) + .map(_.flatten) + .map { offsets => + offsets.foldLeft(Map.empty[GroupTopicPartition, OffsetAction]) { + case (acc, action: OffsetAction) => + acc + (action.key -> action) + } + }.flatMap { map => + map.toList.traverse { + case (groupTopicPartition, action) => + val s3KeySuffix = + s"${groupTopicPartition.group}/${groupTopicPartition.topic}/${groupTopicPartition.partition}" + val s3Key = location.prefix.fold(s3KeySuffix)(prefix => s"$prefix/$s3KeySuffix") + + action match { + case WriteOffset(offset) => + val content = ByteBuffer.allocate(8).putLong(offset.metadata.offset).rewind() + logger.debug(s"[${taskId.show}] Uploading offset $offset to $s3Key") + val result = uploader.upload( + content, + location.bucket, + s3Key, + ) + logger.debug(s"[${taskId.show}] Uploaded offset $offset to $s3Key") + result + case DeleteOffset(_) => + logger.debug(s"[${taskId.show}] Deleting offset $s3Key") + val result = uploader.delete( + location.bucket, + s3Key, + ) + logger.debug(s"[${taskId.show}] Deleted offset $s3Key") + result + } + }.map(_ => ()) + } +} + +object ConsumerGroupsWriter extends StrictLogging { + + /** + * Expects the [[SinkRecord]] to contain the key and value as byte arrays. + * The key is expected to be a byte array representation of an [[OffsetKey]], and ignores GroupMetadata keys. + * @param record The [[SinkRecord]] to extract the offset details from. + * @return Either an error or the offset details. + */ + def extractOffsets(record: SinkRecord): Either[Throwable, Option[OffsetAction]] = + Option(record.key()) match { + case None => Right(None) + case Some(key) => + for { + keyBytes <- validateByteArray(key, + "key", + "key.converter=org.apache.kafka.connect.converters.ByteArrayConverter", + ) + buffer = ByteBuffer.wrap(keyBytes) + version = buffer.getShort + result <- if (version == 0 || version == 1) { + for { + key <- OffsetKey.from(version, buffer) + value <- Option(record.value()) match { + case Some(value) => + for { + valueBytes <- validateByteArray( + value, + "value", + "value.converter=org.apache.kafka.connect.converters.ByteArrayConverter", + ) + metadata <- OffsetAndMetadata.from(ByteBuffer.wrap(valueBytes)) + } yield { + Some(WriteOffset(OffsetDetails(key, metadata))) + } + case None => + Right(Some(DeleteOffset(key.key))) + } + } yield value + } else { + Right(None) + } + } yield result + } + + private def validateByteArray(value: Any, name: String, converter: String): Either[Throwable, Array[Byte]] = + value match { + case bytes: Array[Byte] => Right(bytes) + case _ => + Left( + new ConnectException( + s"The record $name is not a byte array. 
Make sure the connector configuration uses '$converter'.", + ), + ) + } +} + +case class OffsetDetails(key: OffsetKey, metadata: OffsetAndMetadata) + +sealed trait OffsetAction { + def key: GroupTopicPartition +} + +case class WriteOffset(offset: OffsetDetails) extends OffsetAction { + override def key: GroupTopicPartition = offset.key.key +} +case class DeleteOffset(key: GroupTopicPartition) extends OffsetAction diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/GroupTopicPartition.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/GroupTopicPartition.scala new file mode 100644 index 000000000..2eb4bba98 --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/GroupTopicPartition.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.consumers + +/** + * Mimics the Kafka core GroupTopicPartition class. + */ +case class GroupTopicPartition( + group: String, + topic: String, + partition: Int, +) diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/OffsetAndMetadata.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/OffsetAndMetadata.scala new file mode 100644 index 000000000..85b93b45f --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/OffsetAndMetadata.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.consumers + +import java.nio.ByteBuffer + +/** + * Mimics the Kafka core OffsetAndMetadata class. 
+ */ + +case class OffsetAndMetadata( + offset: Long, + leaderEpoch: Int, + metadata: String, + commitTimestamp: Long, + expireTimestamp: Long, +) + +object OffsetAndMetadata { + private val LOWEST_SUPPORTED_VERSION: Short = 0 + private val HIGHEST_SUPPORTED_VERSION: Short = 3 + + def from(buffer: ByteBuffer): Either[Throwable, OffsetAndMetadata] = + Option(buffer).toRight(new IllegalArgumentException("Buffer cannot be null")).flatMap { _ => + val version = buffer.getShort() + if (version >= LOWEST_SUPPORTED_VERSION && version <= HIGHEST_SUPPORTED_VERSION) { + val offset = buffer.getLong() + val leaderEpoch = if (version >= 3) buffer.getInt() else -1 + val metadataLength = buffer.getShort() + val metadataBytes = new Array[Byte](metadataLength.toInt) + buffer.get(metadataBytes) + val metadata = new String(metadataBytes) + val commitTimestamp = buffer.getLong() + val expireTimestamp = if (version == 1) buffer.getLong() else -1L + Right(OffsetAndMetadata(offset, leaderEpoch, metadata, commitTimestamp, expireTimestamp)) + } else { + Left(new IllegalArgumentException(s"Unknown offset message version: $version")) + } + } +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/OffsetKey.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/OffsetKey.scala new file mode 100644 index 000000000..279213b5f --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/OffsetKey.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.consumers + +import java.nio.ByteBuffer + +/** + * Mimics the Kafka core OffsetKey class. + */ +case class OffsetKey(version: Short, key: GroupTopicPartition) + +object OffsetKey { + + /** + * Deserializes the OffsetKey from a byte array. 
+ * + * @param version the version of the OffsetKey + * @param buffer the buffer to deserialize from + * @return the deserialized OffsetKey + */ + def from(version: Short, buffer: ByteBuffer): Either[Throwable, OffsetKey] = { + def readStringField(fieldName: String): Either[Throwable, String] = { + val length = buffer.getShort + if (length < 0) Left(new RuntimeException(s"non-nullable field $fieldName was serialized as null")) + else { + val bytes = new Array[Byte](length.toInt) + buffer.get(bytes) + Right(new String(bytes)) + } + } + + for { + group <- readStringField("group") + topic <- readStringField("topic") + partition = buffer.getInt + } yield OffsetKey(version, GroupTopicPartition(group, topic, partition)) + } +} diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/Uploader.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/Uploader.scala new file mode 100644 index 000000000..75d23fb02 --- /dev/null +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/consumers/Uploader.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.consumers + +import java.nio.ByteBuffer + +trait Uploader extends AutoCloseable { + def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] + + def delete(bucket: String, path: String): Either[Throwable, Unit] +} diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriterDecoderTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriterDecoderTest.scala new file mode 100644 index 000000000..1835b7ae1 --- /dev/null +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriterDecoderTest.scala @@ -0,0 +1,226 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.lenses.streamreactor.connect.cloud.common.consumers + +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.errors.ConnectException +import org.apache.kafka.connect.sink.SinkRecord +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import java.nio.ByteBuffer + +class ConsumerGroupsWriterDecoderTest extends AnyFunSuite with Matchers { + test("return None if the version is greater 1") { + val buffer = ByteBuffer.allocate(256) + buffer.putShort(2) + val record = new SinkRecord( + "__consumer_offsets", + 77, + Schema.BYTES_SCHEMA, + buffer.array(), + Schema.BYTES_SCHEMA, + Array.emptyByteArray, + -2, + ) + ConsumerGroupsWriter.extractOffsets(record) shouldBe Right(None) + } + + test("return the offset details") { + val buffer = ByteBuffer.allocate(256) + buffer.putShort(0.toShort) + val group = "group" + buffer.putShort(group.getBytes.length.toShort) + buffer.put(group.getBytes) + val topic = "topic" + buffer.putShort(topic.getBytes.length.toShort) + buffer.put(topic.getBytes) + val partition = 11 + buffer.putInt(partition) + + val key = buffer.array + val valueBuffer = ByteBuffer.allocate(256) + valueBuffer.putShort(0.toShort) + val offset = 123L + valueBuffer.putLong(offset) + val metadata = "metadata" + valueBuffer.putShort(metadata.getBytes.length.toShort) + valueBuffer.put(metadata.getBytes) + val commitTimestamp = 456L + valueBuffer.putLong(commitTimestamp) + + val value = valueBuffer.array + + val record = new SinkRecord("__consumer_offsets", 77, Schema.BYTES_SCHEMA, key, Schema.BYTES_SCHEMA, value, 100) + val actual = ConsumerGroupsWriter.extractOffsets(record) + val expected = Right( + Some( + WriteOffset( + OffsetDetails( + OffsetKey(0.toShort, GroupTopicPartition(group, topic, partition)), + OffsetAndMetadata(offset, -1, "metadata", commitTimestamp, -1L), + ), + ), + ), + ) + actual shouldBe expected + } + test("return None if the record key is null") { + val record = new SinkRecord("__consumer_offsets", + 77, + Schema.BYTES_SCHEMA, + null, + Schema.BYTES_SCHEMA, + Array.emptyByteArray, + 100, + ) + ConsumerGroupsWriter.extractOffsets(record) shouldBe Right(None) + } + test("return None if the record value is null") { + val buffer = ByteBuffer.allocate(256) + buffer.putShort(0.toShort) + val group = "group" + buffer.putShort(group.getBytes.length.toShort) + buffer.put(group.getBytes) + val topic = "topic" + buffer.putShort(topic.getBytes.length.toShort) + buffer.put(topic.getBytes) + val partition = 11 + buffer.putInt(partition) + + val key = buffer.array + val record = new SinkRecord("__consumer_offsets", 77, Schema.BYTES_SCHEMA, key, Schema.BYTES_SCHEMA, null, 100) + ConsumerGroupsWriter.extractOffsets(record) shouldBe + Right(Some(DeleteOffset(GroupTopicPartition("group", "topic", 11)))) + } + test("return an error when the record key is non bytes") { + val record = new SinkRecord("__consumer_offsets", + 77, + Schema.STRING_SCHEMA, + "key", + Schema.BYTES_SCHEMA, + Array.emptyByteArray, + 100, + ) + ConsumerGroupsWriter.extractOffsets(record) match { + case Left(value) => + value shouldBe a[ConnectException] + value.getMessage shouldBe "The record key is not a byte array. Make sure the connector configuration uses 'key.converter=org.apache.kafka.connect.converters.ByteArrayConverter'." 
+ case Right(_) => fail("Expecting an error but got a value instead.") + } + + } + test("return an error when the value is not a byte array") { + val buffer = ByteBuffer.allocate(256) + buffer.putShort(0.toShort) + val group = "group" + buffer.putShort(group.getBytes.length.toShort) + buffer.put(group.getBytes) + val topic = "topic" + buffer.putShort(topic.getBytes.length.toShort) + buffer.put(topic.getBytes) + val partition = 11 + buffer.putInt(partition) + + val key = buffer.array + val record = new SinkRecord("__consumer_offsets", 77, Schema.BYTES_SCHEMA, key, Schema.STRING_SCHEMA, "value", 100) + ConsumerGroupsWriter.extractOffsets(record) match { + case Left(value) => + value shouldBe a[ConnectException] + value.getMessage shouldBe "The record value is not a byte array. Make sure the connector configuration uses 'value.converter=org.apache.kafka.connect.converters.ByteArrayConverter'." + case Right(_) => fail("Expecting an error but got a value instead.") + } + } + test("returns the offset details when the version is 3") { + val buffer = ByteBuffer.allocate(256) + buffer.putShort(0.toShort) + val group = "group" + buffer.putShort(group.getBytes.length.toShort) + buffer.put(group.getBytes) + val topic = "topic" + buffer.putShort(topic.getBytes.length.toShort) + buffer.put(topic.getBytes) + val partition = 11 + buffer.putInt(partition) + + val key = buffer.array + val valueBuffer = ByteBuffer.allocate(256) + valueBuffer.putShort(3.toShort) + val offset = 123L + valueBuffer.putLong(offset) + val leaderEpoch = 999 + valueBuffer.putInt(leaderEpoch) + val metadata = "metadata" + valueBuffer.putShort(metadata.getBytes.length.toShort) + valueBuffer.put(metadata.getBytes) + val commitTimestamp = 456L + valueBuffer.putLong(commitTimestamp) + + val value = valueBuffer.array + + val record = new SinkRecord("__consumer_offsets", 77, Schema.BYTES_SCHEMA, key, Schema.BYTES_SCHEMA, value, -2) + + ConsumerGroupsWriter.extractOffsets(record) shouldBe Right( + Some( + WriteOffset( + OffsetDetails( + OffsetKey(0.toShort, GroupTopicPartition(group, topic, partition)), + OffsetAndMetadata(offset, leaderEpoch, "metadata", commitTimestamp, -1L), + ), + ), + ), + ) + } + test("return the offset details when the version is 1") { + val buffer = ByteBuffer.allocate(256) + buffer.putShort(0.toShort) + val group = "group" + buffer.putShort(group.getBytes.length.toShort) + buffer.put(group.getBytes) + val topic = "topic" + buffer.putShort(topic.getBytes.length.toShort) + buffer.put(topic.getBytes) + val partition = 11 + buffer.putInt(partition) + + val key = buffer.array + val valueBuffer = ByteBuffer.allocate(256) + valueBuffer.putShort(1.toShort) + val offset = 123L + valueBuffer.putLong(offset) + val metadata = "metadata" + valueBuffer.putShort(metadata.getBytes.length.toShort) + valueBuffer.put(metadata.getBytes) + val commitTimestamp = 456L + valueBuffer.putLong(commitTimestamp) + val expireTimestamp = 789L + valueBuffer.putLong(expireTimestamp) + + val value = valueBuffer.array + + val record = new SinkRecord("__consumer_offsets", 77, Schema.BYTES_SCHEMA, key, Schema.BYTES_SCHEMA, value, -2) + ConsumerGroupsWriter.extractOffsets(record) shouldBe Right( + Some( + WriteOffset( + OffsetDetails( + OffsetKey(0.toShort, GroupTopicPartition(group, topic, partition)), + OffsetAndMetadata(offset, -1, "metadata", commitTimestamp, expireTimestamp), + ), + ), + ), + ) + } +} diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriterTest.scala 
b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriterTest.scala new file mode 100644 index 000000000..9b31f22da --- /dev/null +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/consumers/ConsumerGroupsWriterTest.scala @@ -0,0 +1,317 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.cloud.common.consumers + +import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId +import org.apache.kafka.connect.sink.SinkRecord +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicReference + +class ConsumerGroupsWriterTest extends AnyFunSuite with Matchers { + private val taskId = ConnectorTaskId("connectorA", 1, 1) + test("write the offsets") { + val location = CloudObjectKey("bucket", None) + val offsetArray = new AtomicReference[Array[Byte]](Array.emptyByteArray) + val offsetKey = new AtomicReference[String]("") + val uploader = new Uploader { + override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = { + offsetArray.set(source.array()) + offsetKey.set(path) + Right(()) + } + + override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(()) + override def close(): Unit = {} + } + + val group = "lenses" + val topic = "topic" + val partition = 11 + val offset = 123L + val records = List( + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey(group, topic, partition), + null, + generateOffsetDetails(offset), + 100, + ), + ) + val writer = new ConsumerGroupsWriter(location, uploader, taskId) + writer.write(records) shouldBe Right(()) + offsetArray.get() shouldBe ByteBuffer.allocate(8).putLong(offset).array() + offsetKey.get() shouldBe s"$group/$topic/$partition" + } + + test("writes the offsets for different groups and topics") { + val location = CloudObjectKey("bucket", None) + var writes = Vector[(String, Array[Byte])]() + + val uploader = new Uploader { + override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = { + writes = writes :+ (path, source.array()) + Right(()) + } + override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(()) + override def close(): Unit = {} + } + val writer = new ConsumerGroupsWriter(location, uploader, taskId) + val records = List( + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + generateOffsetDetails(123L), + 100, + ), + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 2), + null, + generateOffsetDetails(456L), + 100, + ), + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic2", 1), + null, + generateOffsetDetails(789L), + 100, + ), + new SinkRecord( + 
"__consumer_offsets", + 77, + null, + generateOffsetKey("group2", "topic1", 1), + null, + generateOffsetDetails(101112L), + 100, + ), + ) + + writer.write(records) shouldBe Right(()) + val writesToLong = writes.map(w => (w._1, ByteBuffer.wrap(w._2).getLong)) + writesToLong should contain theSameElementsAs List( + ("group1/topic1/1", 123L), + ("group1/topic1/2", 456L), + ("group1/topic2/1", 789L), + ("group2/topic1/1", 101112L), + ) + } + + test("write once the entries for a group-topic-partition in one .write call") { + val location = CloudObjectKey("bucket", None) + var writes = Vector[(String, Array[Byte])]() + + val uploader = new Uploader { + override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = { + writes = writes :+ (path, source.array()) + Right(()) + } + override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(()) + override def close(): Unit = {} + } + val writer = new ConsumerGroupsWriter(location, uploader, taskId) + val records = List( + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + generateOffsetDetails(123L), + 100, + ), + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + generateOffsetDetails(456L), + 101, + ), + ) + + writer.write(records) shouldBe Right(()) + val writesToLong = writes.map(w => (w._1, ByteBuffer.wrap(w._2).getLong)) + writesToLong should contain theSameElementsAs List( + ("group1/topic1/1", 456L), + ) + } + test("fail when the uploader fails") { + val location = CloudObjectKey("bucket", None) + val uploader = new Uploader { + override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = + Left(new RuntimeException("Boom!")) + override def delete(bucket: String, path: String): Either[Throwable, Unit] = Right(()) + override def close(): Unit = {} + } + val writer = new ConsumerGroupsWriter(location, uploader, taskId) + val records = List( + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + generateOffsetDetails(123L), + 100, + ), + ) + val result = writer.write(records) + result.isLeft shouldBe true + result.left.getOrElse(fail("should not fail")).getMessage shouldBe "Boom!" 
+ } + + test("write the offset and delete on empty value payload") { + val location = CloudObjectKey("bucket", None) + var writes = Vector[(String, Array[Byte])]() + + var deletes = Vector[String]() + val uploader = new Uploader { + override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = { + writes = writes :+ (path, source.array()) + Right(()) + } + override def delete(bucket: String, path: String): Either[Throwable, Unit] = { + deletes = deletes :+ path + Right(()) + } + override def close(): Unit = {} + } + val writer = new ConsumerGroupsWriter(location, uploader, taskId) + val records = List( + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + generateOffsetDetails(123L), + 100, + ), + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + null, + 100, + ), + ) + + writer.write(records) shouldBe Right(()) + writes shouldBe empty + deletes should contain theSameElementsAs List( + "group1/topic1/1", + ) + } + + test("write, delete , write writes the offset") { + val location = CloudObjectKey("bucket", None) + var writes = Vector[(String, Array[Byte])]() + + var deletes = Vector[String]() + val uploader = new Uploader { + override def upload(source: ByteBuffer, bucket: String, path: String): Either[Throwable, Unit] = { + writes = writes :+ (path, source.array()) + Right(()) + } + + override def delete(bucket: String, path: String): Either[Throwable, Unit] = { + deletes = deletes :+ path + Right(()) + } + + override def close(): Unit = {} + } + val writer = new ConsumerGroupsWriter(location, uploader, taskId) + val records = List( + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + generateOffsetDetails(123L), + 100, + ), + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + null, + 100, + ), + new SinkRecord( + "__consumer_offsets", + 77, + null, + generateOffsetKey("group1", "topic1", 1), + null, + generateOffsetDetails(333L), + 100, + ), + ) + + writer.write(records) shouldBe Right(()) + val writesToLong = writes.map(w => (w._1, ByteBuffer.wrap(w._2).getLong)) + writesToLong should contain theSameElementsAs List( + ("group1/topic1/1", 333L), + ) + deletes shouldBe empty + } + private def generateOffsetKey(group: String, topic: String, partition: Int): Array[Byte] = { + val buffer = ByteBuffer.allocate(256) + buffer.putShort(0.toShort) + + buffer.putShort(group.getBytes.length.toShort) + buffer.put(group.getBytes) + + buffer.putShort(topic.getBytes.length.toShort) + buffer.put(topic.getBytes) + buffer.putInt(partition) + buffer.array + + } + + private def generateOffsetDetails(offset: Long) = { + val valueBuffer = ByteBuffer.allocate(256) + valueBuffer.putShort(0.toShort) + valueBuffer.putLong(offset) + val metadata = "metadata" + valueBuffer.putShort(metadata.getBytes.length.toShort) + valueBuffer.put(metadata.getBytes) + val commitTimestamp = 456L + valueBuffer.putLong(commitTimestamp) + valueBuffer.array + } +} From 0d3533752857a17b0e958392f29bd933a88506e6 Mon Sep 17 00:00:00 2001 From: stheppi Date: Fri, 1 Dec 2023 10:09:41 +0000 Subject: [PATCH 2/2] Reverting back the assertion --- .../connect/aws/s3/config/S3ConfigSettingsTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala 
b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala
index 5722ed73c..67ae92d45 100644
--- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala
+++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala
@@ -35,7 +35,7 @@ class S3ConfigSettingsTest extends AnyFlatSpec with Matchers with LazyLogging {
     val configKeys = S3SinkConfigDef.config.configKeys().keySet().asScala ++
       S3SourceConfigDef.config.configKeys().keySet().asScala
 
-    configKeys.size shouldBe 48
+    configKeys.size shouldBe 47
     configKeys.foreach {
       k => k.toLowerCase should be(k)
     }
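
Illustration (not part of the patch): the ConsumerGroupsWriterTest cases above pin down the on-bucket layout. Each group/topic/partition is written under an object key of the form <group>/<topic>/<partition> (optionally under a configured prefix), and the payload is the committed offset encoded as an 8-byte big-endian long. The following read-back sketch assumes the AWS SDK v2 S3 client and uses an illustrative bucket ("my-bucket"), group ("lenses"), topic ("topic") and partition (11); none of these names come from the patch.

import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.GetObjectRequest

import java.nio.ByteBuffer

// Reads back one offset stored by the consumer groups sink.
// Assumptions: AWS SDK v2 on the classpath; bucket, group, topic and
// partition below are placeholders, not values taken from the patch.
object ReadBackOffsetSketch extends App {
  val s3 = S3Client.create()

  // The sink keys objects as <group>/<topic>/<partition> inside the bucket.
  val request = GetObjectRequest.builder()
    .bucket("my-bucket")
    .key("lenses/topic/11")
    .build()

  // The object body is the committed offset as an 8-byte big-endian long.
  val bytes  = s3.getObjectAsBytes(request).asByteArray()
  val offset = ByteBuffer.wrap(bytes).getLong

  println(s"group=lenses topic=topic partition=11 offset=$offset")

  s3.close()
}

Because the payload is a single long, every commit simply overwrites the same key, so reading it back always yields the latest committed offset without listing or merging objects.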