diff --git a/src/main/scala/devtools/kafka_data_viewer/AppSettings.scala b/src/main/scala/devtools/kafka_data_viewer/AppSettings.scala index 42461ed..d6fe4d9 100644 --- a/src/main/scala/devtools/kafka_data_viewer/AppSettings.scala +++ b/src/main/scala/devtools/kafka_data_viewer/AppSettings.scala @@ -7,202 +7,258 @@ import scala.collection.mutable import org.yaml.snakeyaml.{DumperOptions, Yaml} -import devtools.kafka_data_viewer.KafkaDataViewer.{ConnectionDefinition, ProducerSettings, _} +import devtools.kafka_data_viewer.KafkaDataViewer.{ + ConnectionDefinition, + ProducerSettings, + _ +} import devtools.kafka_data_viewer.NodeObservableLinking._ import devtools.kafka_data_viewer.ResourceUtility._ import devtools.kafka_data_viewer.TypeSerializers.TypeSerializer -import devtools.kafka_data_viewer.kafkaconn.MessageFormats.{AvroMessage, MessageType, StringMessage, ZipMessage} +import devtools.kafka_data_viewer.kafkaconn.MessageFormats.{ + AvroMessage, + MessageType, + StringMessage, + ZipMessage +} import devtools.lib.rxext.BehaviorSubject import devtools.lib.rxext.Subject.behaviorSubject import devtools.lib.rxui.DisposeStore -class AppSettings( - val connections: BehaviorSubject[Seq[ConnectionDefinition]], - val filters: BehaviorSubject[Seq[FilterData]]) +class AppSettings(val connections: BehaviorSubject[Seq[ConnectionDefinition]], + val filters: BehaviorSubject[Seq[FilterData]]) object AppSettings { - def connect(): AppSettings = { - - val appPropsFile = new File("application.setting.yml") - if (!appPropsFile.exists()) { - appPropsFile.createNewFile() - } + def connect(): AppSettings = { - val dumperOptions = new DumperOptions() - dumperOptions.setPrettyFlow(true) - dumperOptions.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) - val settingsYaml = new Yaml(dumperOptions) - val settingsRaw = withres(new FileInputStream(appPropsFile))(res => settingsYaml.load[java.util.Map[String, Object]](res)) - val settings = if (settingsRaw == null) new java.util.HashMap[String, Object]() else settingsRaw - - val root: Node = new Node(settings, onChange = withres(new FileWriter(appPropsFile))(settingsYaml.dump(settings, _))) - - val connections = behaviorSubject[Seq[ConnectionDefinition]]() - - import TypeSerializers._ - root.listNodes[ConnectionDefinition]("connections", connections, ConnectionDefinition(), (item, node) => { - node.property("name", item.name) - node.property("kafkaHost", item.kafkaHost) - node.listStrings("avroRegistries", item.avroRegistries) - node.listCustom("topicsSettings", item.topicSettings) - node.listNodes[ProducerSettings]("producers", item.producers, ProducerSettings(), (item, node) => { - node.property("topic", item.topic) - node.propertyCustom("custom", item.custom) - node.propertyCustom("messageType", item.msgType) - node.propertyCustom("partition", item.partition) - node.property("key", item.key) - node.property("value", item.value) - node.propertyCustom("order", item.order) - }) - }) - - val filters = behaviorSubject[Seq[FilterData]](Seq()) - - root.listNodes[FilterData]("filters", filters, (behaviorSubject(), behaviorSubject()), (item, node) => { - node.property("filter", item._1) - node.listStrings("topics", item._2) - }) - - NodeObservableLinking.initialize = false - - new AppSettings(connections, filters) + val appPropsFile = new File("application.setting.yml") + if (!appPropsFile.exists()) { + appPropsFile.createNewFile() } + + val dumperOptions = new DumperOptions() + dumperOptions.setPrettyFlow(true) + 
dumperOptions.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK) + val settingsYaml = new Yaml(dumperOptions) + val settingsRaw = withRes(new FileInputStream(appPropsFile))( + res => settingsYaml.load[java.util.Map[String, Object]](res) + ) + val settings = + if (settingsRaw == null) new java.util.HashMap[String, Object]() + else settingsRaw + + val root: Node = new Node( + settings, + onChange = + withRes(new FileWriter(appPropsFile))(settingsYaml.dump(settings, _)) + ) + + val connections = behaviorSubject[Seq[ConnectionDefinition]]() + + import TypeSerializers._ + root.listNodes[ConnectionDefinition]( + "connections", + connections, + ConnectionDefinition(), + (item, node) => { + node.property("name", item.name) + node.property("kafkaHost", item.kafkaHost) + node.listStrings("avroRegistries", item.avroRegistries) + node.listCustom("topicsSettings", item.topicSettings) + node.listNodes[ProducerSettings]( + "producers", + item.producers, + ProducerSettings(), + (item, node) => { + node.property("topic", item.topic) + node.propertyCustom("custom", item.custom) + node.propertyCustom("messageType", item.msgType) + node.propertyCustom("partition", item.partition) + node.property("key", item.key) + node.property("value", item.value) + node.propertyCustom("order", item.order) + } + ) + } + ) + + val filters = behaviorSubject[Seq[FilterData]](Seq()) + + root.listNodes[FilterData]( + "filters", + filters, + (behaviorSubject(), behaviorSubject()), + (item, node) => { + node.property("filter", item._1) + node.listStrings("topics", item._2) + } + ) + + NodeObservableLinking.initialize = false + + new AppSettings(connections, filters) + } } object TypeSerializers { - trait TypeSerializer[T] { - - def serialize(t: T): String - - def deserialize(s: String): T + trait TypeSerializer[T] { + + def serialize(t: T): String + + def deserialize(s: String): T + } + + implicit val msgTypeType: TypeSerializer[MessageType] = + new TypeSerializer[MessageType] { + private val stringType: String = "String" + private val zipType: String = "GZIP" + private val avroType: String = "AVRO" + + override def serialize(t: MessageType): String = t match { + case StringMessage => stringType + case ZipMessage => zipType + case AvroMessage(avroHost) => s"$avroType:$avroHost" + } + + override def deserialize(s: String): MessageType = s match { + case `stringType` => StringMessage + case `zipType` => ZipMessage + case avro if avro startsWith s"$avroType:" => + AvroMessage(s.substring(avroType.length + 1)) + case _ => + throw new IllegalStateException("not supported serialized type") + } } - implicit val msgTypeType: TypeSerializer[MessageType] = new TypeSerializer[MessageType] { - private val stringType: String = "String" - private val zipType: String = "GZIP" - private val avroType: String = "AVRO" + implicit val topicToMsgType: TypeSerializer[TopicToMessageType] = + new TypeSerializer[TopicToMessageType]() { - override def serialize(t: MessageType): String = t match { - case StringMessage => stringType - case ZipMessage => zipType - case AvroMessage(avroHost) => s"$avroType:$avroHost" - } + override def serialize(t: TopicToMessageType): String = + t._1 + ":" + msgTypeType.serialize(t._2) - override def deserialize(s: String): MessageType = s match { - case `stringType` => StringMessage - case `zipType` => ZipMessage - case avro if avro startsWith s"$avroType:" => AvroMessage(s.substring(avroType.length + 1)) - case _ => throw new IllegalStateException("not supported serialized type") - } + override def deserialize(s: String): 
TopicToMessageType = { + val idx = s.indexOf(':') + s.substring(0, idx) -> msgTypeType.deserialize(s.substring(idx + 1)) + } } - implicit val topicToMsgType: TypeSerializer[TopicToMessageType] = new TypeSerializer[TopicToMessageType]() { + implicit val booleanType: TypeSerializer[Boolean] = + new TypeSerializer[Boolean] { - override def serialize(t: TopicToMessageType): String = t._1 + ":" + msgTypeType.serialize(t._2) + override def serialize(t: Boolean): String = t.toString - override def deserialize(s: String): TopicToMessageType = { - val idx = s.indexOf(':') - s.substring(0, idx) -> msgTypeType.deserialize(s.substring(idx + 1)) - } + override def deserialize(s: String): Boolean = "true" == s } - implicit val booleanType: TypeSerializer[Boolean] = new TypeSerializer[Boolean] { + implicit val optionalIntType: TypeSerializer[Option[Int]] = + new TypeSerializer[Option[Int]] { - override def serialize(t: Boolean): String = t.toString + override def serialize(t: Option[Int]): String = + t.map(_.toString).getOrElse("-") - override def deserialize(s: String): Boolean = "true" == s + override def deserialize(s: String): Option[Int] = + if (s == null || s == "-" || s == "") None else Some(s.toInt) } - implicit val optionalIntType: TypeSerializer[Option[Int]] = new TypeSerializer[Option[Int]] { + implicit val optionalMessageTypeType: TypeSerializer[Option[MessageType]] = + new TypeSerializer[Option[MessageType]] { - override def serialize(t: Option[Int]): String = t.map(_.toString).getOrElse("-") + override def serialize(t: Option[MessageType]): String = + t.map(msgTypeType.serialize).getOrElse("-") - override def deserialize(s: String): Option[Int] = if (s == null || s == "-") None else Some(s.toInt) + override def deserialize(s: String): Option[MessageType] = + if (s == null || s == "-") None else Some(msgTypeType.deserialize(s)) } - implicit val optionalMessageTypeType: TypeSerializer[Option[MessageType]] = new TypeSerializer[Option[MessageType]] { - - override def serialize(t: Option[MessageType]): String = t.map(msgTypeType.serialize).getOrElse("-") - - override def deserialize(s: String): Option[MessageType] = if (s == null || s == "-") None else Some(msgTypeType.deserialize(s)) - } + implicit val intType: TypeSerializer[Int] = new TypeSerializer[Int] { - implicit val intType: TypeSerializer[Int] = new TypeSerializer[Int] { + override def serialize(t: Int): String = t.toString - override def serialize(t: Int): String = t.toString - - override def deserialize(s: String): Int = if (s == null) -1 else s.toInt - } + override def deserialize(s: String): Int = if (s == null) -1 else s.toInt + } } object NodeObservableLinking { - val $ = new DisposeStore() - var initialize = true - - class Node(root: java.util.Map[String, Object], onChange: => Unit) { - - def listNodes[T](nodeName: String, items: BehaviorSubject[Seq[T]], creator: => T, bind: (T, Node) => Unit): Unit = { - root.putIfAbsent(nodeName, new util.ArrayList[util.Map[String, Object]]()) - val contents = root.get(nodeName).asInstanceOf[util.ArrayList[util.Map[String, Object]]] - val prevItems = mutable.ArrayBuffer[(T, util.Map[String, Object])]() - for (content <- contents.asScala) { - val item = creator - val thisNode = new Node(content, onChange) - bind(item, thisNode) - prevItems += item -> content - } - items << prevItems.map(_._1) - - for (items <- $(items)) { - val newItems = items diff prevItems.map(_._1) - val removedItems = prevItems.filterNot(x => items.contains(x._1)) - for (item <- newItems) { - val content = new 
util.HashMap[String, Object]() - val node = new Node(content, onChange) - bind(item, node) - contents.add(content) - prevItems += item -> content - } - for (item <- removedItems) { - contents.remove(item._2) - prevItems -= item - } - onChange - } + val $ = new DisposeStore() + var initialize = true + + class Node(root: java.util.Map[String, Object], onChange: => Unit) { + + def listNodes[T](nodeName: String, + items: BehaviorSubject[Seq[T]], + creator: => T, + bind: (T, Node) => Unit): Unit = { + root.putIfAbsent(nodeName, new util.ArrayList[util.Map[String, Object]]()) + val contents = root + .get(nodeName) + .asInstanceOf[util.ArrayList[util.Map[String, Object]]] + val prevItems = mutable.ArrayBuffer[(T, util.Map[String, Object])]() + for (content <- contents.asScala) { + val item = creator + val thisNode = new Node(content, onChange) + bind(item, thisNode) + prevItems += item -> content + } + items << prevItems.map(_._1) + + for (items <- $(items)) { + val newItems = items diff prevItems.map(_._1) + val removedItems = prevItems.filterNot(x => items.contains(x._1)) + for (item <- newItems) { + val content = new util.HashMap[String, Object]() + val node = new Node(content, onChange) + bind(item, node) + contents.add(content) + prevItems += item -> content } - - def property(nodeName: String, prop: BehaviorSubject[String], defaultValue: String = ""): Unit = { - if (initialize) prop << root.getOrDefault(nodeName, defaultValue).toString - for (change <- $(prop)) {root.put(nodeName, change); onChange } + for (item <- removedItems) { + contents.remove(item._2) + prevItems -= item } + onChange + } + } - def propertyCustom[T](nodeName: String, prop: BehaviorSubject[T])(implicit converter: TypeSerializer[T]): Unit = { - if (initialize) prop << converter.deserialize(root.get(nodeName).asInstanceOf[String]) - for (change <- $(prop)) {root.put(nodeName, converter.serialize(change)); onChange } - } + def property(nodeName: String, + prop: BehaviorSubject[String], + defaultValue: String = ""): Unit = { + if (initialize) prop << root.getOrDefault(nodeName, defaultValue).toString + for (change <- $(prop)) { root.put(nodeName, change); onChange } + } - def listStrings(nodeName: String, prop: BehaviorSubject[Seq[String]]): Unit = { - if (initialize) { - val initial = root.get(nodeName).asInstanceOf[util.List[String]].asScala - prop << Option(initial).getOrElse(Seq()) - } - for (change <- $(prop)) {root.put(nodeName, change.asJava); onChange } - } + def propertyCustom[T](nodeName: String, prop: BehaviorSubject[T])( + implicit converter: TypeSerializer[T] + ): Unit = { + if (initialize) + prop << converter.deserialize(root.get(nodeName).asInstanceOf[String]) + for (change <- $(prop)) { + root.put(nodeName, converter.serialize(change)); onChange + } + } - def listCustom[T](nodeName: String, prop: BehaviorSubject[Seq[T]])(implicit converter: TypeSerializer[T]): Unit = { - if (initialize) { - val initial = root.get(nodeName).asInstanceOf[util.List[String]].asScala - prop << Option(initial).getOrElse(Seq()).map(converter.deserialize) - } - for (change <- $(prop)) {root.put(nodeName, change.map(converter.serialize).asJava); onChange } - } + def listStrings(nodeName: String, + prop: BehaviorSubject[Seq[String]]): Unit = { + if (initialize) { + val initial = root.get(nodeName).asInstanceOf[util.List[String]].asScala + prop << Option(initial).getOrElse(Seq()) + } + for (change <- $(prop)) { root.put(nodeName, change.asJava); onChange } } -} + def listCustom[T](nodeName: String, prop: BehaviorSubject[Seq[T]])( + 
implicit converter: TypeSerializer[T] + ): Unit = { + if (initialize) { + val initial = root.get(nodeName).asInstanceOf[util.List[String]].asScala + prop << Option(initial).getOrElse(Seq()).map(converter.deserialize) + } + for (change <- $(prop)) { + root.put(nodeName, change.map(converter.serialize).asJava); onChange + } + } + } -object ResourceUtility { - def withres[T <: AutoCloseable, R](t: T)(f: T => R): R = try f(t) finally t.close() -} \ No newline at end of file +} diff --git a/src/main/scala/devtools/kafka_data_viewer/AppUpdate.scala b/src/main/scala/devtools/kafka_data_viewer/AppUpdate.scala index f96a829..8c9062d 100644 --- a/src/main/scala/devtools/kafka_data_viewer/AppUpdate.scala +++ b/src/main/scala/devtools/kafka_data_viewer/AppUpdate.scala @@ -5,45 +5,50 @@ import java.util.Properties import com.fasterxml.jackson.databind.ObjectMapper -object AppUpdate { - - case class AppUpdateInfo(currentVersion: String, newVersion: String, updateUrl: String) - - def verify(): Option[AppUpdateInfo] = { - +import scala.util.Try - try { - (for { - propsStream <- Option(AppUpdate.getClass.getClassLoader.getResourceAsStream("build.properties")) - props = {val p = new Properties(); p.load(propsStream); p } - version = props.getProperty("version") - baseUrl = props.getProperty("url") - updateUrl = props.getProperty("update_url") - } yield { - val url = new URL(baseUrl) - val con = url.openConnection() - val in = con.getInputStream - - val mapper = new ObjectMapper() - val json = mapper.readValue(in, classOf[java.util.HashMap[_, _]]) - - val newVersion = json.get("name").asInstanceOf[String] - - in.close() - if (version != newVersion) - Some(AppUpdateInfo( - currentVersion = version, - newVersion = newVersion, - updateUrl = updateUrl - )) - else - None - }).flatten - } catch { - case e: Exception => - e.printStackTrace() - None - } +object AppUpdate { - } + case class AppUpdateInfo(currentVersion: String, + newVersion: String, + updateUrl: String) + + def verify(): Option[AppUpdateInfo] = { + Try(for { + propsStream <- Option( + AppUpdate.getClass.getClassLoader + .getResourceAsStream("build.properties") + ) + props <- Try { val p = new Properties(); p.load(propsStream); p }.toOption + version = props.getProperty("version") + url = new URL(props.getProperty("url")) + updateUrl = props.getProperty("update_url") + info <- ResourceUtility.using(url.openConnection())(_ => ()) { + con => + val newVersion = + ResourceUtility.using(con.getInputStream)(_.close()) { in => + val mapper = new ObjectMapper() + val json = mapper.readValue(in, classOf[java.util.HashMap[_, _]]) + json.get("name").asInstanceOf[String] + } + if (version != newVersion) + Some( + AppUpdateInfo( + currentVersion = version, + newVersion = newVersion, + updateUrl = updateUrl + ) + ) + else + None + } + } yield info) + .recover { + case e => + e.printStackTrace() + None + } + .toOption + .flatten + } } diff --git a/src/main/scala/devtools/kafka_data_viewer/Loan.scala b/src/main/scala/devtools/kafka_data_viewer/Loan.scala new file mode 100644 index 0000000..e8853f4 --- /dev/null +++ b/src/main/scala/devtools/kafka_data_viewer/Loan.scala @@ -0,0 +1,30 @@ +package devtools.kafka_data_viewer + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.control.Exception.ignoring + +object Loan extends LoanT + +/** + * https://github.com/scalikejdbc/scalikejdbc/blob/master/scalikejdbc-core/src/main/scala/scalikejdbc/LoanPattern.scala + * Loan pattern implementation + */ +trait LoanT { + type Closable = { def close(): Unit } + + 
+  def using[R <: Closable, A](resource: R)(f: R => A): A =
+    try f(resource)
+    finally ignoring(classOf[Throwable]) apply {
+      resource.close()
+    }
+
+  /**
+   * Guarantees a Closeable resource will be closed after being passed to a block that takes
+   * the resource as a parameter and returns a Future.
+   */
+  def futureUsing[R <: Closable, A](resource: R)(
+      f: R => Future[A]
+  )(implicit ec: ExecutionContext): Future[A] = f(resource) andThen {
+    case _ => resource.close()
+  } // close no matter what
+}
diff --git a/src/main/scala/devtools/kafka_data_viewer/ResourceUtility.scala b/src/main/scala/devtools/kafka_data_viewer/ResourceUtility.scala
new file mode 100644
index 0000000..f2eabdb
--- /dev/null
+++ b/src/main/scala/devtools/kafka_data_viewer/ResourceUtility.scala
@@ -0,0 +1,17 @@
+package devtools.kafka_data_viewer
+
+object ResourceUtility {
+  // Evaluate `acquire` exactly once so the same instance reaches both `use`
+  // and `release`; the release action always runs, even when `use` throws.
+  def using[T, R](acquire: => T)(release: T => Unit)(use: T => R): R = {
+    val resource = acquire
+    try {
+      use(resource)
+    } finally {
+      release(resource)
+    }
+  }
+
+  def withRes[T <: AutoCloseable, R](acquire: => T)(use: T => R): R =
+    using(acquire)(t => t.close())(use)
+}
diff --git a/src/main/scala/devtools/kafka_data_viewer/kafkaconn/KafkaConnector.scala b/src/main/scala/devtools/kafka_data_viewer/kafkaconn/KafkaConnector.scala
index a30a60a..7d4a5fd 100644
--- a/src/main/scala/devtools/kafka_data_viewer/kafkaconn/KafkaConnector.scala
+++ b/src/main/scala/devtools/kafka_data_viewer/kafkaconn/KafkaConnector.scala
@@ -7,13 +7,24 @@ import java.util.concurrent.Semaphore
 import java.util.concurrent.atomic.AtomicInteger
 
 import devtools.kafka_data_viewer.kafkaconn.Connector._
-import devtools.kafka_data_viewer.kafkaconn.CustomPartitioner.DelegatingPartitioner
 import devtools.lib.rxext.{Observable, Subject}
 import devtools.lib.rxui.DisposeStore
-import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, Partitioner, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.consumer.{
+  ConsumerConfig,
+  ConsumerRecord,
+  KafkaConsumer
+}
+import org.apache.kafka.clients.producer.{
+  KafkaProducer,
+  Partitioner,
+  ProducerConfig,
+  ProducerRecord
+}
 import org.apache.kafka.common.errors.InterruptException
-import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
+import org.apache.kafka.common.serialization.{
+  ByteArrayDeserializer,
+  ByteArraySerializer
+}
 import org.apache.kafka.common.{Cluster, PartitionInfo, TopicPartition}
 
 import scala.collection.JavaConverters._
@@ -23,227 +34,309 @@ import scala.language.postfixOps
 
 class KafkaConnector(host: String) extends Connector {
+  private val consumers = ArrayBuffer[ConsumerConnection]()
+  private val producers = ArrayBuffer[ProducerConnection]()
 
-    private val consumers = ArrayBuffer[ConsumerConnection]()
-    private val producers = ArrayBuffer[ProducerConnection]()
+  override def connectConsumer(): ConsumerConnection = {
+    val connection = new KafkaConsumerConnection(host)
+    consumers += connection
+    connection
+  }
 
-    override def connectConsumer(): ConsumerConnection = {
-        val connection = new KafkaConsumerConnection(host)
-        consumers += connection
-        connection
-    }
+  override def connectProducer(): ProducerConnection = {
+    val connection = new KafkaProducerConnection(host)
+    producers += connection
+    connection
+  }
 
-    override def connectProducer(): ProducerConnection = {
-        val connection = new KafkaProducerConnection(host)
-        producers += connection
-        connection
-    }
-
-    override def closeAll(): Unit = {
-
consumers foreach (_.close()) - producers foreach (_.close()) - } + override def closeAll(): Unit = { + consumers foreach (_.close()) + producers foreach (_.close()) + } } object Counter { - val counter = new AtomicInteger(0) + val counter = new AtomicInteger(0) } class KafkaConsumerConnection(host: String) extends ConsumerConnection { - private val consumerProps = Map( - "bootstrap.servers" -> host - , "key.deserializer" -> classOf[ByteArrayDeserializer].getName - , "value.deserializer" -> classOf[ByteArrayDeserializer].getName - , "enable.auto.commit" -> "false" - , ConsumerConfig.FETCH_MAX_BYTES_CONFIG -> new Integer(100000) - , ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG -> new Integer(100000)) - - - type GenRecord = ConsumerRecord[Array[Byte], Array[Byte]] - - var closed = false - val opSemaphore = new Semaphore(1, false) - - println("Connect with " + (consumerProps: Map[String, Object]).asJava.get("bootstrap.servers")) - private val consumer = new KafkaConsumer[Array[Byte], Array[Byte]]((consumerProps: Map[String, Object]).asJava) - - override def queryTopics(): Seq[String] = operation { - println("Query topics") - consumer.listTopics().asScala.filterNot(_._1.startsWith("_")).keys.toSeq.sorted + private val consumerProps = Map( + "bootstrap.servers" -> host, + "key.deserializer" -> classOf[ByteArrayDeserializer].getName, + "value.deserializer" -> classOf[ByteArrayDeserializer].getName, + "enable.auto.commit" -> "false", + ConsumerConfig.FETCH_MAX_BYTES_CONFIG -> new Integer(100000), + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG -> new Integer(100000) + ) + + type GenRecord = ConsumerRecord[Array[Byte], Array[Byte]] + + var closed = false + val opSemaphore = new Semaphore(1, false) + + println( + "Connect with " + (consumerProps: Map[String, Object]).asJava + .get("bootstrap.servers") + ) + private val consumer = new KafkaConsumer[Array[Byte], Array[Byte]]( + (consumerProps: Map[String, Object]).asJava + ) + + override def queryTopics(): Seq[String] = operation { + println("Query topics") + consumer + .listTopics() + .asScala + .filterNot(_._1.startsWith("_")) + .keys + .toSeq + .sorted + } + + override def queryTopicsWithSizes(): Seq[(String, Long)] = operation { + println("Query topics with sizes") + val allTopics: mutable.Map[String, util.List[PartitionInfo]] = + consumer.listTopics().asScala + val topics = allTopics.filterNot(_._1.startsWith("_")) + val topicPartitions: Seq[TopicPartition] = topics.values + .flatMap(x => x.asScala) + .map(info => new TopicPartition(info.topic(), info.partition())) + .toSeq + val beginnings = + consumer.beginningOffsets(topicPartitions.asJava).asScala.toMap + val endings = consumer.endOffsets(topicPartitions.asJava).asScala.toSeq + val sizes = endings.map { + case (ref, end) => ref -> (end - beginnings.getOrElse(ref, end)) } - - override def queryTopicsWithSizes(): Seq[(String, Long)] = operation { - println("Query topics with sizes") - val allTopics: mutable.Map[String, util.List[PartitionInfo]] = consumer.listTopics().asScala - val topics = allTopics.filterNot(_._1.startsWith("_")) - val topicPartitions: Seq[TopicPartition] = topics.values.flatMap(x => x.asScala) - .map(info => new TopicPartition(info.topic(), info.partition())).toSeq - val beginnings = consumer.beginningOffsets(topicPartitions.asJava).asScala.toMap - val endings = consumer.endOffsets(topicPartitions.asJava).asScala.toSeq - val sizes = endings.map { case (ref, end) => ref -> (end - beginnings.getOrElse(ref, end)) } - val result = sizes.foldLeft(Map[String, Long]())((acc, 
el) => acc + (el._1.topic() -> (el._2 + acc.getOrElse(el._1.topic(), 0L)))).toSeq - result.sortBy(_._1) + val result = sizes + .foldLeft(Map[String, Long]())( + (acc, el) => + acc + (el._1.topic() -> (el._2 + acc.getOrElse(el._1.topic(), 0L))) + ) + .toSeq + result.sortBy(_._1) + } + + private def buildBinaryRecord( + r: ConsumerRecord[Array[Byte], Array[Byte]] + ): BinaryTopicRecord = + BinaryTopicRecord( + topic = r.topic(), + partition = r.partition(), + offset = r.offset(), + time = Instant.ofEpochMilli(r.timestamp()), + key = r.key(), + value = r.value() + ) + + override def readNextRecords()( + executionMonitor: Option[Subject[String]], + stop: Option[Observable[Unit]] + ): Observable[Seq[BinaryTopicRecord]] = { + println("read next topics") + val $ = new DisposeStore() + var read = 0 + var stopped = false + for (stopAction <- stop; _ <- $(stopAction)) stopped = true + Observable.create( + obs => + operation { + for (recs <- Stream + .continually(consumer.poll(ofSeconds(3))) + .takeWhile(!_.isEmpty && !stopped)) { + val records = recs.asScala.toSeq + read += records.size + for (mon <- executionMonitor) mon << ("Read " + read) + obs onNext records.map(buildBinaryRecord) + } + for (mon <- executionMonitor) mon << ("Completed read " + read) + obs onComplete () + $.dispose() + } + ) + } + + override def readTopicsContinually( + topics: Observable[Seq[String]] + ): Observable[Seq[BinaryTopicRecord]] = { + println("read topics continually") + + def assignToTopics(topics: Seq[String]): Unit = { + val topicPartitions = consumer + .listTopics() + .asScala + .filter(x => topics.contains(x._1)) + .toSeq + .flatMap( + x => + x._2.asScala + .map(info => new TopicPartition(info.topic(), info.partition())) + ) + .asJava + + consumer.assign(topicPartitions) + consumer.seekToEnd(topicPartitions) } - private def buildBinaryRecord(r: ConsumerRecord[Array[Byte], Array[Byte]]): BinaryTopicRecord = - BinaryTopicRecord(topic = r.topic(), partition = r.partition(), offset = r.offset(), time = Instant.ofEpochMilli(r.timestamp()), key = r.key(), value = r.value()) - - override def readNextRecords()(executionMonitor: Option[Subject[String]], stop: Option[Observable[Unit]]): Observable[Seq[BinaryTopicRecord]] = { - println("read next topics") - val $ = new DisposeStore() - var read = 0 - var stopped = false - for (stopAction <- stop; _ <- $(stopAction)) stopped = true - Observable.create(obs => operation { - for (recs <- Stream.continually(consumer.poll(ofSeconds(3))).takeWhile(!_.isEmpty && !stopped)) { - val records = recs.asScala.toSeq - read += records.size - for (mon <- executionMonitor) mon << ("Read " + read) - obs onNext records.map(buildBinaryRecord) - } - for (mon <- executionMonitor) mon << ("Completed read " + read) - obs onComplete() - $.dispose() - }) - } - - override def readTopicsContinually(topics: Observable[Seq[String]]): Observable[Seq[BinaryTopicRecord]] = { - println("read topics continually") - - def assignToTopics(topics: Seq[String]): Unit = { - val topicPartitions = consumer.listTopics().asScala - .filter(x => topics.contains(x._1)) - .toSeq - .flatMap(x => x._2.asScala.map(info => new TopicPartition(info.topic(), info.partition()))) - .asJava - - consumer.assign(topicPartitions) - consumer.seekToEnd(topicPartitions) - } - - Observable.create(obs => operation { - - assignToTopics(Seq()) - var assignment = Seq[String]() - var newTopics: Seq[String] = null - topics.subscribe(topics => newTopics = topics) - val getNewTopics = () => {val r = newTopics; newTopics = null; r } - - val 
reassign = (newTopics: Seq[String]) => if (newTopics != null) { - assignToTopics(newTopics) - assignment = newTopics - } - - val nextRecords: Unit => Seq[GenRecord] = _ => - if (assignment.isEmpty) {try Thread.sleep(1000) catch {case e: InterruptedException =>}; Seq() } - else try consumer.poll(ofSeconds(1)).asScala.toSeq catch {case e: InterruptException => Seq()} - - - for (records <- Stream.continually(reassign.andThen(nextRecords)(getNewTopics())).takeWhile(_ => !closed)) - obs onNext (records map buildBinaryRecord) - - obs onComplete() - }) - } - - override def readTopic(topic: String, limitPerPartition: Long)(executionMonitor: Option[Subject[String]], stop: Option[Observable[Unit]]): Observable[Seq[BinaryTopicRecord]] = { - println("read topics") - val $ = new DisposeStore() - val partitions: Seq[PartitionInfo] = consumer.listTopics().asScala.find(_._1 == topic).get._2.asScala - val topicPartitions = partitions.map(info => new TopicPartition(info.topic(), info.partition())) - - val beginnings = consumer.beginningOffsets(topicPartitions.asJava).asScala.toMap - val endings = consumer.endOffsets(topicPartitions.asJava).asScala.toSeq - val position = (p: TopicPartition, e: Long) => if (limitPerPartition < 0L) beginnings(p).toLong else math.max(e - limitPerPartition, beginnings(p)) - val positions: Seq[(TopicPartition, Long)] = endings.map(x => x._1 -> position(x._1, x._2)) - - val totalCount = endings.map(x => x._2 - beginnings(x._1)).sum - consumer.assign(topicPartitions.asJava) - for ((partition, position) <- positions) consumer.seek(partition, position) - - for (mon <- executionMonitor) mon onNext ("Total to read " + totalCount) - - var read = 0 - var stopped = false - for (stopAction <- stop; _ <- $(stopAction)) stopped = true - Observable.create(obs => operation { - for (recs <- Stream.continually(consumer.poll(ofSeconds(3))).takeWhile(!_.isEmpty && !stopped)) { - val records = recs.asScala.toSeq - read += records.size - for (mon <- executionMonitor) mon onNext ("Read " + read + " of " + totalCount) - obs onNext (records map buildBinaryRecord) - } - for (mon <- executionMonitor) mon onNext ("Completed read " + read + " of " + totalCount) - obs onComplete() - $.dispose() - }) - } - - private def operation[R](f: => R): R = { - if (!opSemaphore.tryAcquire()) throw new Exception("Operation within execution") - else try f finally opSemaphore.release() - } - - override def close(): Unit = { - println("close") - closed = true - opSemaphore.acquire() - consumer.close() - } - - override def queryTopicPartitions(topic: String): Seq[Int] = { - println("query topic partitions") - consumer.partitionsFor(topic).asScala.map(_.partition()).sorted - } + Observable.create( + obs => + operation { + + assignToTopics(Seq()) + var assignment = Seq[String]() + var newTopics: Seq[String] = null + topics.subscribe(topics => newTopics = topics) + val getNewTopics = () => { val r = newTopics; newTopics = null; r } + + val reassign = (newTopics: Seq[String]) => + if (newTopics != null) { + assignToTopics(newTopics) + assignment = newTopics + } + + val nextRecords: Unit => Seq[GenRecord] = _ => + if (assignment.isEmpty) { + try Thread.sleep(1000) + catch { case e: InterruptedException => }; Seq() + } else + try consumer.poll(ofSeconds(1)).asScala.toSeq + catch { case e: InterruptException => Seq() } + + for (records <- Stream + .continually(reassign.andThen(nextRecords)(getNewTopics())) + .takeWhile(_ => !closed)) + obs onNext (records map buildBinaryRecord) + + obs onComplete () + } + ) + } + + override def 
readTopic(topic: String, limitPerPartition: Long)( + executionMonitor: Option[Subject[String]], + stop: Option[Observable[Unit]] + ): Observable[Seq[BinaryTopicRecord]] = { + println("read topics") + val $ = new DisposeStore() + val partitions: Seq[PartitionInfo] = + consumer.listTopics().asScala.find(_._1 == topic).get._2.asScala + val topicPartitions = + partitions.map(info => new TopicPartition(info.topic(), info.partition())) + + val beginnings = + consumer.beginningOffsets(topicPartitions.asJava).asScala.toMap + val endings = consumer.endOffsets(topicPartitions.asJava).asScala.toSeq + val position = (p: TopicPartition, e: Long) => + if (limitPerPartition < 0L) beginnings(p).toLong + else math.max(e - limitPerPartition, beginnings(p)) + val positions: Seq[(TopicPartition, Long)] = + endings.map(x => x._1 -> position(x._1, x._2)) + + val totalCount = endings.map(x => x._2 - beginnings(x._1)).sum + consumer.assign(topicPartitions.asJava) + for ((partition, position) <- positions) consumer.seek(partition, position) + + for (mon <- executionMonitor) mon onNext ("Total to read " + totalCount) + + var read = 0 + var stopped = false + for (stopAction <- stop; _ <- $(stopAction)) stopped = true + Observable.create( + obs => + operation { + for (recs <- Stream + .continually(consumer.poll(ofSeconds(3))) + .takeWhile(!_.isEmpty && !stopped)) { + val records = recs.asScala.toSeq + read += records.size + for (mon <- executionMonitor) + mon onNext ("Read " + read + " of " + totalCount) + obs onNext (records map buildBinaryRecord) + } + for (mon <- executionMonitor) + mon onNext ("Completed read " + read + " of " + totalCount) + obs onComplete () + $.dispose() + } + ) + } + + private def operation[R](f: => R): R = { + if (!opSemaphore.tryAcquire()) + throw new Exception("Operation within execution") + else + try f + finally opSemaphore.release() + } + + override def close(): Unit = { + println("close") + closed = true + opSemaphore.acquire() + consumer.close() + } + + override def queryTopicPartitions(topic: String): Seq[Int] = { + println("query topic partitions") + consumer.partitionsFor(topic).asScala.map(_.partition()).sorted + } } class KafkaProducerConnection(host: String) extends ProducerConnection { - private val producerProps = Map( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> host, - ProducerConfig.ACKS_CONFIG -> "all", - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, - ProducerConfig.PARTITIONER_CLASS_CONFIG -> classOf[DelegatingPartitioner].getName) - - private val producer = new KafkaProducer[Array[Byte], Array[Byte]]((producerProps: Map[String, Object]).asJava) - - override def send(topic: String, key: Array[Byte], value: Array[Byte], partition: PartitionerMode): Unit = { - try { - println("Try to send message!") - CustomPartitioner.partitionMode = partition - producer.send(new ProducerRecord(topic, key, value)) - } catch { - case e: Exception => e.printStackTrace() - } + private val producerProps = Map( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> host, + ProducerConfig.ACKS_CONFIG -> "all", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, + ProducerConfig.PARTITIONER_CLASS_CONFIG -> classOf[CustomPartitioner].getName + ) + + private val producer = new KafkaProducer[Array[Byte], Array[Byte]]( + (producerProps: Map[String, Object]).asJava + 
+  )
 
+  override def send(topic: String,
+                    key: Array[Byte],
+                    value: Array[Byte],
+                    partition: PartitionerMode): Unit = {
+    try {
+      println("Try to send message!")
+      CustomPartitioner.partitionMode = partition
+      producer.send(new ProducerRecord(topic, key, value))
+    } catch {
+      case e: Exception => e.printStackTrace()
+    }
+  }
 
-    override def close(): Unit = producer.close()
+  override def close(): Unit = producer.close()
 }
 
 object CustomPartitioner {
-
-    private val defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner()
-
-    var partitionMode: PartitionerMode = _
-
-    class DelegatingPartitioner extends Partitioner {
-
-        override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = partitionMode match {
-            case DefaultPartitioner => defaultPartitioner.partition(topic, key, keyBytes, value, valueBytes, cluster)
-            case ExactPartition(partition) => partition
-        }
-
-        override def close(): Unit = Unit
-
-        override def configure(configs: util.Map[String, _]): Unit = Unit
-    }
+  private val defaultPartitioner =
+    new org.apache.kafka.clients.producer.internals.DefaultPartitioner()
+
+  // Set by KafkaProducerConnection.send right before the record is handed to
+  // the producer; read back by the partitioner instance created by Kafka.
+  var partitionMode: PartitionerMode = _
+}
+
+// Top-level class with a no-argument constructor so Kafka can instantiate it
+// reflectively via PARTITIONER_CLASS_CONFIG (classOf[CustomPartitioner].getName).
+class CustomPartitioner extends Partitioner {
+  import CustomPartitioner._
+
+  override def partition(topic: String,
+                         key: Any,
+                         keyBytes: Array[Byte],
+                         value: Any,
+                         valueBytes: Array[Byte],
+                         cluster: Cluster): Int = partitionMode match {
+    case DefaultPartitioner =>
+      defaultPartitioner
+        .partition(topic, key, keyBytes, value, valueBytes, cluster)
+    case ExactPartition(partition) => partition
+  }
+
+  override def close(): Unit = ()
+
+  override def configure(configs: util.Map[String, _]): Unit = ()
 }
-
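
Usage note (illustrative addition, not part of the changeset): a minimal sketch of how the loan-pattern helpers introduced above are intended to be called, assuming the corrected ResourceUtility.using/withRes signatures. The example object name and the URL are placeholders; application.setting.yml is the settings file AppSettings already reads.

    import java.io.FileInputStream
    import java.util.Properties

    import devtools.kafka_data_viewer.ResourceUtility

    object ResourceUtilityExample extends App {

      // withRes: for AutoCloseable resources; the stream is closed even if load() throws.
      val props: Properties =
        ResourceUtility.withRes(new FileInputStream("application.setting.yml")) { in =>
          val p = new Properties(); p.load(in); p
        }

      // using: for resources without a close() method, with an explicit release action,
      // the same shape AppUpdate.verify uses for a URLConnection and its InputStream.
      val firstByte: Int =
        ResourceUtility.using(new java.net.URL("https://example.org").openConnection())(_ => ()) { con =>
          ResourceUtility.withRes(con.getInputStream)(_.read())
        }

      println(s"Loaded ${props.size()} settings; first byte of response: $firstByte")
    }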