diff --git a/extensions/spark/kyuubi-spark-connector-yarn/pom.xml b/extensions/spark/kyuubi-spark-connector-yarn/pom.xml new file mode 100644 index 00000000000..24020a6d071 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/pom.xml @@ -0,0 +1,275 @@ + + + + 4.0.0 + + org.apache.kyuubi + kyuubi-parent + 1.11.0-SNAPSHOT + ../../../pom.xml + + + kyuubi-spark-connector-yarn_${scala.binary.version} + jar + Kyuubi Spark Hadoop YARN Connector + https://kyuubi.apache.org/ + + + + org.apache.kyuubi + kyuubi-spark-connector-common_${scala.binary.version} + ${project.version} + + + + org.apache.kyuubi + kyuubi-spark-connector-common_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.scala-lang + scala-library + provided + + + + org.slf4j + slf4j-api + provided + + + + org.apache.spark + spark-sql_${scala.binary.version} + provided + + + + com.google.guava + guava + + + + org.apache.spark + spark-core_${scala.binary.version} + test-jar + test + + + + org.apache.spark + spark-catalyst_${scala.binary.version} + test-jar + test + + + + org.scalatestplus + scalacheck-1-17_${scala.binary.version} + test + + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + test-jar + test + + + + org.apache.kyuubi + kyuubi-server_${scala.binary.version} + ${project.version} + test + + + + org.apache.kyuubi + kyuubi-server_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.kyuubi + kyuubi-common_${scala.binary.version} + ${project.version} + test + + + + org.apache.kyuubi + kyuubi-common_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.hadoop + hadoop-client-api + provided + + + + org.apache.hadoop + hadoop-client-runtime + test + + + + org.apache.hadoop + hadoop-client-minicluster + test + + + + javax.servlet + javax.servlet-api + test + + + + jakarta.servlet + jakarta.servlet-api + test + + + + org.bouncycastle + bcprov-jdk18on + test + + + + org.bouncycastle + bcpkix-jdk18on + test + + + + jakarta.activation + jakarta.activation-api + test + + + + jakarta.xml.bind + jakarta.xml.bind-api + test + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + true + + + + + org.apache.maven.plugins + maven-shade-plugin + + false + + + com.google.guava:guava + org.apache.kyuubi:* + + + + + com.google.common + ${kyuubi.shade.packageName}.com.google.common + + com.google.common.** + + + + + + + + shade + + package + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + prepare-test-jar + + test-jar + + test-compile + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + + + cross-version-test + + + + org.apache.maven.plugins + maven-clean-plugin + + true + + + target/scala-${scala.binary.version}/classes + **/*.* + + + + + + clean target/scala-${scala.binary.version}/classes + + clean + + process-test-classes + + + + + + + + diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/BasicScan.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/BasicScan.scala new file mode 100644 index 00000000000..75407211de3 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/BasicScan.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.spark.connector.yarn + +import scala.jdk.CollectionConverters.iterableAsScalaIterableConverter + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read._ + +trait BasicScan + extends ScanBuilder + with Scan with Batch with Serializable { + + protected val hadoopConfMap: Map[String, String] = SparkSession.active.sparkContext + .hadoopConfiguration.asScala.map(kv => (kv.getKey, kv.getValue)).toMap + + override def toBatch: Batch = this + + override def build(): Scan = this +} diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppPartition.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppPartition.scala new file mode 100644 index 00000000000..8c806eb9c28 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppPartition.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.spark.connector.yarn + +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.sources.Filter + +case class YarnAppPartition(hadoopConfMap: Map[String, String], filters: Array[Filter]) + extends InputPartition diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppPartitionReader.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppPartitionReader.scala new file mode 100644 index 00000000000..62bce64d46d --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppPartitionReader.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.spark.connector.yarn + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.jdk.CollectionConverters.asScalaBufferConverter + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, YarnApplicationState} +import org.apache.hadoop.yarn.client.api.YarnClient +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.connector.read.PartitionReader +import org.apache.spark.sql.sources.{EqualTo, Filter, In} +import org.apache.spark.unsafe.types.UTF8String + +class YarnAppPartitionReader(yarnAppPartition: YarnAppPartition) + extends PartitionReader[InternalRow] with Logging { + + private val validYarnStateSet = + Set("NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING", "FINISHED", "FAILED", "KILLED") + + private val appIterator = fetchApp().iterator + + override def next(): Boolean = appIterator.hasNext + + override def get(): InternalRow = { + val app = appIterator.next() + new GenericInternalRow(Array[Any]( + UTF8String.fromString(app.id), + UTF8String.fromString(app.appType), + UTF8String.fromString(app.user), + UTF8String.fromString(app.name), + UTF8String.fromString(app.state), + UTF8String.fromString(app.queue), + UTF8String.fromString(app.attemptId), + app.submitTime, + app.launchTime, + app.startTime, + app.finishTime, + UTF8String.fromString(app.trackingUrl), + UTF8String.fromString(app.originalTrackingUrl))) + } + + override def close(): Unit = {} + + private def fetchApp(): mutable.Seq[YarnApplication] = { + val hadoopConf = new Configuration() + yarnAppPartition.hadoopConfMap.foreach(kv => hadoopConf.set(kv._1, kv._2)) + val yarnClient = YarnClient.createYarnClient() + yarnClient.init(hadoopConf) + yarnClient.start() + // fet apps + val applicationReports: java.util.List[ApplicationReport] = + yarnAppPartition.filters match { + case filters if filters.isEmpty => yarnClient.getApplications + // id => point query + // state => batch query + // type => in (a,b,c), batch query + case filters => + filters.collectFirst { + case EqualTo("id", appId: String) => java.util.Collections.singletonList( + yarnClient.getApplicationReport(ApplicationId.fromString(appId))) + case EqualTo("state", state: String) => + state match { + case validState if validYarnStateSet.contains(validState) => + yarnClient.getApplications( + java.util.EnumSet.of(YarnApplicationState.valueOf(validState))) + case _ => Seq.empty[ApplicationReport].asJava + } + case EqualTo("type", appType: String) => + yarnClient.getApplications(java.util.Collections.singleton(appType)) + case In("state", states: Array[Any]) => yarnClient.getApplications( + java.util.EnumSet.copyOf(states + .map(x => x.toString) + .filter(x => validYarnStateSet.contains(x)) + .map(x => + 
YarnApplicationState.valueOf(x)).toList.asJava)) + case In("type", types: Array[Any]) => yarnClient.getApplications( + types.map(x => x.toString).toSet.asJava) + case _ => yarnClient.getApplications() + }.get + } + val appSeq = applicationReports.asScala.filter(app => + yarnAppPartition.filters + .forall(filter => maybeFilter(app, filter))) + .map(app => { + YarnApplication( + id = app.getApplicationId.toString, + appType = app.getApplicationType, + user = app.getUser, + name = app.getName, + state = app.getYarnApplicationState.name, + queue = app.getQueue, + attemptId = app.getCurrentApplicationAttemptId.toString, + submitTime = app.getSubmitTime, + launchTime = app.getLaunchTime, + startTime = app.getStartTime, + finishTime = app.getFinishTime, + trackingUrl = app.getTrackingUrl, + originalTrackingUrl = app.getOriginalTrackingUrl) + }) + yarnClient.close() + appSeq + } + + private def maybeFilter(app: ApplicationReport, filter: Filter): Boolean = { + filter match { + case EqualTo("id", appId: String) => app.getApplicationId.toString.equals(appId) + case EqualTo("state", appState: String) => app.getYarnApplicationState.name().equals(appState) + case EqualTo("type", appType: String) => app.getApplicationType.equals(appType) + case In("state", states) => states.map(x => x.toString) + .contains(app.getYarnApplicationState.name()) + case In("type", types) => types.map(x => x.toString) + .contains(app.getApplicationType) + case _ => false + } + } +} + +// Helper class to represent app +case class YarnApplication( + id: String, + appType: String, + user: String, + name: String, + state: String, + queue: String, + attemptId: String, + submitTime: Long, + launchTime: Long, + startTime: Long, + finishTime: Long, + trackingUrl: String, + originalTrackingUrl: String) diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppReaderFactory.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppReaderFactory.scala new file mode 100644 index 00000000000..edb9cbf646a --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppReaderFactory.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.spark.connector.yarn + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} + +class YarnAppReaderFactory extends PartitionReaderFactory { + + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + val yarnAppPartition = partition.asInstanceOf[YarnAppPartition] + new YarnAppPartitionReader(yarnAppPartition) + } +} diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppScan.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppScan.scala new file mode 100644 index 00000000000..faa83e3cc39 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppScan.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.spark.connector.yarn + +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +case class YarnAppScan(options: CaseInsensitiveStringMap, schema: StructType, pushed: Array[Filter]) + extends BasicScan { + + // override def toBatch: Batch = this + + override def readSchema(): StructType = schema + + override def planInputPartitions(): Array[InputPartition] = { + Array(YarnAppPartition( + hadoopConfMap, + pushed)) + } + + override def createReaderFactory(): PartitionReaderFactory = + new YarnAppReaderFactory + + // override def build(): Scan = this +} diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppScanBuilder.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppScanBuilder.scala new file mode 100644 index 00000000000..0e52995ca4a --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppScanBuilder.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.spark.connector.yarn + +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.sources.{EqualTo, Filter, In} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +case class YarnAppScanBuilder(options: CaseInsensitiveStringMap, schema: StructType) + extends ScanBuilder { + + private var pushed: Array[Filter] = Array.empty + + def pushedFilters(): Array[Filter] = pushed + + def build(): Scan = YarnAppScan(options, schema, pushed) + + def pushFilters(filters: Array[Filter]): Array[Filter] = { + val (supportedFilter, unsupportedFilter) = filters.partition { + case filter: EqualTo => + filter match { + case EqualTo("id", _) => true + case EqualTo("state", _) => true + case EqualTo("type", _) => true + case _ => false + } + case filter: In => + filter match { + case In("state", _) => true + case In("type", _) => true + case _ => false + } + case _ => false + } + pushed = supportedFilter + unsupportedFilter + } +} diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnApplicationTable.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnApplicationTable.scala new file mode 100644 index 00000000000..7df2a2468a0 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnApplicationTable.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kyuubi.spark.connector.yarn
+
+import java.util
+
+import scala.jdk.CollectionConverters.setAsJavaSetConverter
+
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class YarnApplicationTable extends Table with SupportsRead {
+  override def name(): String = "apps"
+
+  override def schema(): StructType =
+    new StructType(Array(
+      StructField("id", StringType, nullable = false),
+      StructField("type", StringType, nullable = false),
+      StructField("user", StringType, nullable = false),
+      StructField("name", StringType, nullable = false),
+      StructField("state", StringType, nullable = false),
+      StructField("queue", StringType, nullable = false),
+      StructField("attempt_id", StringType, nullable = false),
+      StructField("submit_time", LongType, nullable = false),
+      StructField("launch_time", LongType, nullable = false),
+      StructField("start_time", LongType, nullable = false),
+      StructField("finish_time", LongType, nullable = false),
+      StructField("tracking_url", StringType, nullable = false),
+      StructField("original_tracking_url", StringType, nullable = false)))
+
+  override def capabilities(): util.Set[TableCapability] =
+    Set(TableCapability.BATCH_READ).asJava
+
+  override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder =
+    YarnAppScanBuilder(
+      caseInsensitiveStringMap,
+      schema())
+}
diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnCatalog.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnCatalog.scala
new file mode 100644
index 00000000000..2c68bda0548
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnCatalog.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.kyuubi.spark.connector.yarn + +import java.util + +import scala.jdk.CollectionConverters.mapAsJavaMapConverter + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException} +import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class YarnCatalog extends TableCatalog with Logging { + private var catalogName: String = _ + + override def initialize( + name: String, + caseInsensitiveStringMap: CaseInsensitiveStringMap): Unit = { + catalogName = name + } + + override def listTables(namespace: Array[String]): Array[Identifier] = { + Array(Identifier.of(namespace, "app_logs"), Identifier.of(namespace, "apps")) + } + + override def loadTable(identifier: Identifier): Table = identifier.name match { + case "app_logs" => new YarnLogTable + case "apps" => new YarnApplicationTable + case _ => throw new NoSuchTableException(s"${identifier.name}") + + } + + override def createTable( + identifier: Identifier, + structType: StructType, + transforms: Array[Transform], + map: util.Map[String, String]): Table = { + throw new UnsupportedOperationException(s"The tables in catalog " + + s"${catalogName} does not support CREATE TABLE") + } + + override def alterTable(identifier: Identifier, tableChanges: TableChange*): Table = { + throw new UnsupportedOperationException(s"The tables in catalog " + + s"${catalogName} does not support ALTER TABLE") + } + + override def dropTable(identifier: Identifier): Boolean = { + throw new UnsupportedOperationException(s"The tables in catalog " + + s"${catalogName} does not support DROP TABLE") + } + + override def renameTable(identifier: Identifier, identifier1: Identifier): Unit = { + throw new UnsupportedOperationException(s"The tables in catalog " + + s"${catalogName} does not support RENAME TABLE") + } + + override def name(): String = this.catalogName +} diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartition.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartition.scala new file mode 100644 index 00000000000..918d774ab00 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartition.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.spark.connector.yarn + +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.sources.Filter + +case class YarnLogPartition( + hadoopConfMap: Map[String, String], + logPath: String, + remoteAppLogDir: String, + filters: Array[Filter]) + extends InputPartition diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartitionReader.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartitionReader.scala new file mode 100644 index 00000000000..3825888e0ff --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogPartitionReader.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.spark.connector.yarn + +import java.io.{BufferedReader, InputStreamReader} + +import scala.collection.mutable.ArrayBuffer +import scala.util.matching.Regex + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.IOUtils +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.connector.read.PartitionReader +import org.apache.spark.sql.sources.{EqualTo, Filter} +import org.apache.spark.unsafe.types.UTF8String + +class YarnLogPartitionReader(yarnLogPartition: YarnLogPartition) + extends PartitionReader[InternalRow] { + + private val logIterator = fetchLog().iterator + + override def next(): Boolean = logIterator.hasNext + + override def get(): InternalRow = { + val yarnLog = logIterator.next() + new GenericInternalRow(Array[Any]( + UTF8String.fromString(yarnLog.appId), + UTF8String.fromString(yarnLog.user), + UTF8String.fromString(yarnLog.host), + UTF8String.fromString(yarnLog.containerId), + yarnLog.lineNumber, + UTF8String.fromString(yarnLog.fileName), + UTF8String.fromString(yarnLog.message))) + } + + override def close(): Unit = {} + + /** + * fetch log + * * /tmp/logs/xxx/bucket-xxx-tfile/0001/application_1734531705578_0001/localhost_32422 + * * /tmp/logs/xxx/bucket-logs-tfile/0001/application_1734530210878_0001/localhost_24232 + * + * @param logStatus + * @param user + * @param containerId + * @param applicationId + */ + private def fetchLog(): Seq[LogEntry] = { + val logDirInReg = yarnLogPartition.remoteAppLogDir match { + // in case of /tmp/logs/, /tmp/logs// + case dir if dir.endsWith("/") => + val tmpDir = dir.replaceAll("/+", "/") + tmpDir.substring(0, tmpDir.length - 1).replace("/", "\\/") + // in case of /tmp/logs + case dir => dir.replace("/", "\\/") + } + val pathPattern: Regex = + 
s".*${logDirInReg}/(.*?)/.*?/(application_.*?)/(.+)_(\\d+)".r + yarnLogPartition.logPath match { + case pathPattern(user, applicationId, containerHost, containerSuffix) => + val path = new Path(yarnLogPartition.logPath) + val hadoopConf = new Configuration() + yarnLogPartition.hadoopConfMap.foreach(kv => hadoopConf.set(kv._1, kv._2)) + val fs = path.getFileSystem(hadoopConf) + val inputStream = fs.open(path) + val reader = new BufferedReader(new InputStreamReader(inputStream)) + var line: String = null + var lineNumber: Int = 0 + val logEntries = new ArrayBuffer[LogEntry]() + try { + while ({ + line = reader.readLine() + line != null + }) { + lineNumber += 1 + logEntries += LogEntry( + applicationId, + user, + s"${containerHost}_${containerSuffix}", + containerHost, + lineNumber, + path.toUri.getPath, + line) + } + logEntries.filter(entry => + yarnLogPartition.filters.forall(filter => + maybeFilter(entry, filter))) + } finally { + IOUtils.closeStream(inputStream) + reader.close() + } + case _ => Seq.empty + } + } + + private def maybeFilter(entry: LogEntry, filter: Filter): Boolean = { + filter match { + case EqualTo("app_id", appId: String) => entry.appId.equals(appId) + case EqualTo("container_id", containerId: String) => entry.containerId.equals(containerId) + case EqualTo("user", user: String) => entry.user.equals(user) + case _ => false + } + } +} + +// Helper class to represent log entries +case class LogEntry( + appId: String, + user: String, + containerId: String, + host: String, + lineNumber: Int, + fileName: String, + message: String) diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogReaderFactory.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogReaderFactory.scala new file mode 100644 index 00000000000..0b21a381fa5 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogReaderFactory.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.spark.connector.yarn + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} + +class YarnLogReaderFactory extends PartitionReaderFactory { + + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + val yarnPartition = partition.asInstanceOf[YarnLogPartition] + new YarnLogPartitionReader(yarnPartition) + } +} diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScan.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScan.scala new file mode 100644 index 00000000000..8906784636f --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScan.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.spark.connector.yarn + +import scala.collection.mutable + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.sources.{EqualTo, Filter} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +case class YarnLogScan( + options: CaseInsensitiveStringMap, + schema: StructType, + filters: Array[Filter]) + extends BasicScan { + override def readSchema(): StructType = schema + + private val remoteAppLogDirKeyInYarnSite = "yarn.nodemanager.remote-app-log-dir" + private val remoteAppLogDirKey = "spark.sql.catalog.yarn.log.dir" + + private val remoteAppLogDir = { + val dir = SparkSession.active.sparkContext + .getConf.getOption(remoteAppLogDirKey) match { + case Some(dir) => Some(dir) + case _ => hadoopConfMap.get(remoteAppLogDirKeyInYarnSite) + } + if (dir.isEmpty) { + throw new IllegalArgumentException( + s"remoteAppLogDir should be set with ${remoteAppLogDirKey} or set with " + + s"${remoteAppLogDirKeyInYarnSite} in yarn-site.xml") + } + dir.get + } + + // given a path in hdfs, then get all files under it, supports * + private def listFiles(pathStr: String): mutable.Seq[FileStatus] = { + val hadoopConf = new Configuration() + hadoopConfMap.foreach(kv => hadoopConf.set(kv._1, kv._2)) + val fs = FileSystem.get(hadoopConf) + val path = new Path(pathStr) + val logFiles = mutable.ArrayBuffer[FileStatus]() + val fileStatuses: Array[FileStatus] = fs.globStatus(path) + if (fileStatuses != null && fileStatuses.nonEmpty) { + fileStatuses.foreach { + case status if status.isFile => logFiles += status + case status if status.isDirectory => + val 
fileIterator = fs.listFiles(status.getPath, true) + while (fileIterator.hasNext) { + val fileStatus = fileIterator.next() + if (fileStatus.isFile) logFiles += fileStatus + } + } + } + fs.close() + logFiles + } + + /** + * pushdown equalTo + * hadoop3: + * /tmp/logs/xxx/bucket-xxx-tfile/0001/application_1734531705578_0001/localhost_32422 + * /tmp/logs/xxx/bucket-logs-tfile/0001/application_1734530210878_0001/localhost_24232 + * + * @return + */ + private def tryPushDownPredicates(): mutable.Seq[FileStatus] = { + filters match { + case pushed if pushed.isEmpty => listFiles(remoteAppLogDir) + case pushed => pushed.collectFirst { + case EqualTo("app_id", appId: String) => + listFiles(s"${remoteAppLogDir}/*/*/*/${appId}") ++ + // compatible for hadoop2 + listFiles(s"${remoteAppLogDir}/*/*/${appId}") + case EqualTo("container_id", containerId: String) => + listFiles(s"${remoteAppLogDir}/*/*/*/*/${containerId}") ++ + // compatible for hadoop2 + listFiles(s"${remoteAppLogDir}/*/*/*/${containerId}") + case EqualTo("user", user: String) => listFiles(s"${remoteAppLogDir}/${user}") + case _ => listFiles(remoteAppLogDir) + }.get + } + } + + override def planInputPartitions(): Array[InputPartition] = { + // get file nums and construct nums inputPartition + tryPushDownPredicates().map(fileStatus => { + YarnLogPartition(hadoopConfMap, fileStatus.getPath.toString, remoteAppLogDir, filters) + }).toArray + } + + override def createReaderFactory(): PartitionReaderFactory = + new YarnLogReaderFactory + +} diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScanBuilder.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScanBuilder.scala new file mode 100644 index 00000000000..6698a2c1520 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogScanBuilder.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kyuubi.spark.connector.yarn
+
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.sources.{EqualTo, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class YarnLogScanBuilder(options: CaseInsensitiveStringMap, schema: StructType)
+  extends ScanBuilder {
+
+  private var pushed: Array[Filter] = Array.empty
+
+  def pushedFilters(): Array[Filter] = pushed
+
+  override def build(): Scan = YarnLogScan(options, schema, pushed)
+
+  def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val (supportedFilter, unsupportedFilter) = filters.partition {
+      case filter: EqualTo =>
+        filter match {
+          case EqualTo("app_id", _) => true
+          case EqualTo("user", _) => true
+          case EqualTo("container_id", _) => true
+          case _ => false
+        }
+      case _ => false
+    }
+    pushed = supportedFilter
+    unsupportedFilter
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogTable.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogTable.scala
new file mode 100644
index 00000000000..45607e2074b
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogTable.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.kyuubi.spark.connector.yarn + +import java.util + +import scala.jdk.CollectionConverters.setAsJavaSetConverter + +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} +import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class YarnLogTable extends Table with SupportsRead { + override def name(): String = "app_logs" + + override def schema(): StructType = + new StructType(Array( + StructField("app_id", StringType, nullable = false), + StructField("user", StringType, nullable = false), + StructField("host", StringType, nullable = false), + StructField("container_id", StringType, nullable = false), + StructField("line_num", IntegerType, nullable = false), + StructField("file_name", StringType, nullable = false), + StructField("message", StringType, nullable = true))) + + override def capabilities(): util.Set[TableCapability] = + Set(TableCapability.BATCH_READ).asJava + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = + YarnLogScanBuilder(options,schema()) +} diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/SparkYarnConnectorWithYarn.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/SparkYarnConnectorWithYarn.scala new file mode 100644 index 00000000000..f32a2ab463c --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/SparkYarnConnectorWithYarn.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kyuubi.spark.connector.yarn
+
+trait SparkYarnConnectorWithYarn extends WithKyuubiServerAndYarnMiniCluster {
+  protected val CONF_DIR: String = "tmp/hadoop-conf"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // mock app submit
+    submitMockTasksInParallelTreeTimes()
+    // log all conf
+    miniHdfsService.getHadoopConf.forEach(kv =>
+      info(s"mini hdfs conf ${kv.getKey}: ${kv.getValue}"))
+    miniYarnService.getYarnConf.forEach(kv =>
+      info(s"mini yarn conf ${kv.getKey}: ${kv.getValue}"))
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/WithKyuubiServerAndYarnMiniCluster.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/WithKyuubiServerAndYarnMiniCluster.scala
new file mode 100644
index 00000000000..2e53eea1caf
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/WithKyuubiServerAndYarnMiniCluster.scala
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.kyuubi.spark.connector.yarn + +import java.io.{File, FileWriter} +import java.util.Collections + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.api.records.{ApplicationSubmissionContext, ContainerLaunchContext, Resource, YarnApplicationState} +import org.apache.hadoop.yarn.client.api.YarnClient +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.Records + +import org.apache.kyuubi.{KyuubiFunSuite, Utils, WithKyuubiServer} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX +import org.apache.kyuubi.server.{MiniDFSService, MiniYarnService} +import org.apache.kyuubi.util.JavaUtils + +trait WithKyuubiServerAndYarnMiniCluster extends KyuubiFunSuite with WithKyuubiServer { + + private val taskTypeSet: Set[String] = Set("TYPE_1", "TYPE_2", "TYPE_3") + + override protected val conf: KyuubiConf = new KyuubiConf(false) + + val kyuubiHome: String = JavaUtils.getCodeSourceLocation(getClass).split("extensions").head + + protected var miniHdfsService: MiniDFSService = _ + + protected var miniYarnService: MiniYarnService = _ + + protected var hdfsServiceUrl: String = _ + + protected var hdfsConf: Configuration = _ + + protected val yarnConf: YarnConfiguration = { + val yarnConfig = new YarnConfiguration() + + // configurations copied from org.apache.flink.yarn.YarnTestBase + yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 32) + yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096) + + yarnConfig.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true) + yarnConfig.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5) + yarnConfig.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 20) + yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4) + yarnConfig.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600) + yarnConfig.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false) + // memory is overwritten in the MiniYARNCluster. + // so we have to change the number of cores for testing. 
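+    // (presumably oversized so that the mock applications submitted in parallel by the
+    // test suites can all be scheduled concurrently on the single-node MiniYARNCluster)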
+ yarnConfig.setInt(YarnConfiguration.NM_VCORES, 666) + yarnConfig.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 99.0f) + yarnConfig.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 1000) + yarnConfig.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 5000) + + // capacity-scheduler.xml is missing in hadoop-client-minicluster so this is a workaround + yarnConfig.set("yarn.scheduler.capacity.root.queues", "default,four_cores_queue") + + yarnConfig.setInt("yarn.scheduler.capacity.root.default.capacity", 100) + yarnConfig.setFloat("yarn.scheduler.capacity.root.default.user-limit-factor", 1) + yarnConfig.setInt("yarn.scheduler.capacity.root.default.maximum-capacity", 100) + yarnConfig.set("yarn.scheduler.capacity.root.default.state", "RUNNING") + yarnConfig.set("yarn.scheduler.capacity.root.default.acl_submit_applications", "*") + yarnConfig.set("yarn.scheduler.capacity.root.default.acl_administer_queue", "*") + + yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-capacity", 100) + yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-applications", 10) + yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-allocation-vcores", 4) + yarnConfig.setFloat("yarn.scheduler.capacity.root.four_cores_queue.user-limit-factor", 1) + yarnConfig.set("yarn.scheduler.capacity.root.four_cores_queue.acl_submit_applications", "*") + yarnConfig.set("yarn.scheduler.capacity.root.four_cores_queue.acl_administer_queue", "*") + + yarnConfig.setInt("yarn.scheduler.capacity.node-locality-delay", -1) + // Set bind host to localhost to avoid java.net.BindException + yarnConfig.set(YarnConfiguration.RM_BIND_HOST, "localhost") + yarnConfig.set(YarnConfiguration.NM_BIND_HOST, "localhost") + + // Configure YARN log aggregation + yarnConfig.set("yarn.nodemanager.remote-app-log-dir", "/tmp/logs") + yarnConfig.set("yarn.nodemanager.remote-app-log-dir-suffix", "xxx") + yarnConfig.set("yarn.log-aggregation-enable", "true") + yarnConfig.set("yarn.log-aggregation.retain-seconds", "3600") + yarnConfig.set("yarn.log-aggregation.retain-check-interval-seconds", "300") + + yarnConfig + } + override def beforeAll(): Unit = { + hdfsConf = new Configuration() + // before HADOOP-18206 (3.4.0), HDFS MetricsLogger strongly depends on + // commons-logging, we should disable it explicitly, otherwise, it throws + // ClassNotFound: org.apache.commons.logging.impl.Log4JLogger + hdfsConf.set("dfs.namenode.metrics.logger.period.seconds", "0") + hdfsConf.set("dfs.datanode.metrics.logger.period.seconds", "0") + + miniHdfsService = new MiniDFSService(hdfsConf) + miniHdfsService.initialize(conf) + miniHdfsService.start() + + hdfsServiceUrl = s"hdfs://localhost:${miniHdfsService.getDFSPort}" + + // add some hdfs conf in yarn conf + hdfsConf.set("fs.defaultFS", hdfsServiceUrl) + yarnConf.set("fs.defaultFS", hdfsServiceUrl) + yarnConf.addResource(miniHdfsService.getHadoopConf) + + val cp = System.getProperty("java.class.path") + // exclude kyuubi flink engine jar that has SPI for EmbeddedExecutorFactory + // which can't be initialized on the client side + val hadoopJars = cp.split(":").filter(s => !s.contains("flink")) + val hadoopClasspath = hadoopJars.mkString(":") + yarnConf.set("yarn.application.classpath", hadoopClasspath) + + miniYarnService = new MiniYarnService() + miniYarnService.setYarnConf(yarnConf) + miniYarnService.initialize(conf) + miniYarnService.start() + + val hadoopConfDir = Utils.createTempDir().toFile + val writer = new 
FileWriter(new File(hadoopConfDir, "core-site.xml")) + yarnConf.writeXml(writer) + writer.close() + + conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.KYUUBI_HOME", kyuubiHome) + conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CLASSPATH", hadoopClasspath) + conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath) + conf.set(s"flink.containerized.master.env.HADOOP_CLASSPATH", hadoopClasspath) + conf.set(s"flink.containerized.master.env.HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath) + conf.set(s"flink.containerized.taskmanager.env.HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath) + + super.beforeAll() + } + + def submitMockTaskOnYarn(): Unit = { + // Initialize YarnClient + val yarnClient = YarnClient.createYarnClient() + yarnClient.init(yarnConf) + yarnClient.start() + // Create a simple application submission context + val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) + val applicationId = yarnClient.createApplication() + .getApplicationSubmissionContext.getApplicationId + appContext.setApplicationId(applicationId) + appContext.setApplicationName("TestApp") + // use random pickup + appContext.setApplicationType(taskTypeSet.toSeq(Random.nextInt(taskTypeSet.size))) + + // Set up container launch context (e.g., commands to execute) + val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) + val commands = Collections.singletonList("echo Hello, MiniYARNCluster! && sleep 5 lscd") + amContainer.setCommands(commands) + + // Application Master resource requirements + val capability = Records.newRecord(classOf[Resource]) + capability.setMemorySize(128) + capability.setVirtualCores(1) + + appContext.setResource(capability) + appContext.setAMContainerSpec(amContainer) + + // Submit the application + yarnClient.submitApplication(appContext) + info(s"Application ${applicationId} submitted successfully.") + + // Wait for application to complete + var appReport = yarnClient.getApplicationReport(applicationId) + while (appReport.getYarnApplicationState != YarnApplicationState.FINISHED && + appReport.getYarnApplicationState != YarnApplicationState.FAILED && + appReport.getYarnApplicationState != YarnApplicationState.KILLED) { + info(s"Application State: ${appReport.getYarnApplicationState}") + Thread.sleep(1000) + appReport = yarnClient.getApplicationReport(applicationId) + } + + info(s"Final Application State: ${appReport.getYarnApplicationState}") + + // Clean up + yarnClient.stop() + } + + def submitMockTasksInParallelTreeTimes(): Unit = { + val threads = (1 to 5).map { i => + new Thread(() => { + info(s"Starting submission in thread $i") + try { + submitMockTaskOnYarn() + } catch { + case e: Exception => + error(s"Error in thread $i: ${e.getMessage}", e) + } + info(s"Finished submission in thread $i") + }) + } + + threads.foreach(_.start()) + threads.foreach(_.join()) + } + + override def afterAll(): Unit = { + super.afterAll() + if (miniYarnService != null) { + miniYarnService.stop() + miniYarnService = null + } + if (miniHdfsService != null) { + miniHdfsService.stop() + miniHdfsService = null + } + } +} diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppQuerySuite.scala new file mode 100644 index 00000000000..6dfe4402885 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppQuerySuite.scala 
@@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.spark.connector.yarn + +import scala.jdk.CollectionConverters.asScalaBufferConverter + +import org.apache.hadoop.yarn.client.api.YarnClient +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema + +import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession + +class YarnAppQuerySuite extends SparkYarnConnectorWithYarn { + test("query apps") { + val sparkConf = new SparkConf() + .setMaster("local[*]") + .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "in-memory") + .set("spark.sql.catalog.yarn", classOf[YarnCatalog].getName) + miniHdfsService.getHadoopConf.forEach(kv => { + if (kv.getKey.startsWith("fs")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + miniYarnService.getYarnConf.forEach(kv => { + if (kv.getKey.startsWith("yarn")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + spark.sql("USE yarn") + val appCnt = spark.sql("select count(1) from yarn.default.apps") + .collect().head.asInstanceOf[GenericRowWithSchema].getLong(0) + assert(appCnt >= 0L) + } + } + + test("query app with appId") { + val sparkConf = new SparkConf() + .setMaster("local[*]") + .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "in-memory") + .set("spark.sql.catalog.yarn", classOf[YarnCatalog].getName) + miniHdfsService.getHadoopConf.forEach(kv => { + if (kv.getKey.startsWith("fs")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + miniYarnService.getYarnConf.forEach(kv => { + if (kv.getKey.startsWith("yarn")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + spark.sql("USE yarn") + val yarnClient = YarnClient.createYarnClient() + yarnClient.init(yarnConf) + yarnClient.start() + yarnClient.getApplications.forEach(app => { + val appId = app.getApplicationId.toString + val queryCnt = spark.sql("select count(1) from yarn.default.apps " + + s"where id = '${appId}'").collect().head.getLong(0) + assert(queryCnt == 1) + val queryApps = spark.sql(s"select * from yarn.default.apps " + + s"where id = '${appId}'").collect() + .map(x => x.asInstanceOf[GenericRowWithSchema]) + .map(x => + YarnApplication( + id = x.getString(0), + appType = x.getString(1), + user = x.getString(2), + name = x.getString(3), + state = x.getString(4), + queue = x.getString(5), + attemptId = x.getString(6), + submitTime = x.getLong(7), + launchTime = x.getLong(8), + startTime = 
x.getLong(9), + finishTime = x.getLong(10), + trackingUrl = x.getString(11), + originalTrackingUrl = x.getString(12))) + val queryApp = queryApps.head + assert(queryApp.appType == app.getApplicationType) + assert(queryApp.user == app.getUser) + assert(queryApp.name == app.getName) + assert(queryApp.state == app.getYarnApplicationState.name) + assert(queryApp.queue == app.getQueue) + assert(queryApp.attemptId == app.getCurrentApplicationAttemptId.toString) + assert(queryApp.submitTime == app.getSubmitTime) + assert(queryApp.launchTime == app.getLaunchTime) + assert(queryApp.startTime == app.getStartTime) + assert(queryApp.finishTime == app.getFinishTime) + assert(queryApp.trackingUrl == app.getTrackingUrl) + assert(queryApp.originalTrackingUrl == app.getOriginalTrackingUrl) + }) + yarnClient.close() + } + } + + test("query app with equalTo/in appState") { + val sparkConf = new SparkConf() + .setMaster("local[*]") + .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "in-memory") + .set("spark.sql.catalog.yarn", classOf[YarnCatalog].getName) + miniHdfsService.getHadoopConf.forEach(kv => { + if (kv.getKey.startsWith("fs")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + miniYarnService.getYarnConf.forEach(kv => { + if (kv.getKey.startsWith("yarn")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + spark.sql("USE yarn") + val yarnClient = YarnClient.createYarnClient() + yarnClient.init(yarnConf) + yarnClient.start() + yarnClient.getApplications().asScala.map(x => x.getYarnApplicationState.toString) + .map(x => (x, 1)) + .groupBy(x => x._1) + .mapValues(values => values.map(_._2).sum) + .foreach(x => { + val appState = x._1 + val appCnt = x._2 + val queryCntWithEqualTo = spark.sql("select count(1) from yarn.default.apps " + + s"where state = '${appState}'").collect().head.getLong(0) + assert(queryCntWithEqualTo == appCnt) + val queryCntWithIn = spark.sql("select count(1) from yarn.default.apps " + + s"where state in ('${appState}')").collect().head.getLong(0) + assert(queryCntWithIn == appCnt) + }) + yarnClient.close() + } + } + + test("query app with equalTo/in appType") { + val sparkConf = new SparkConf() + .setMaster("local[*]") + .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "in-memory") + .set("spark.sql.catalog.yarn", classOf[YarnCatalog].getName) + miniHdfsService.getHadoopConf.forEach(kv => { + if (kv.getKey.startsWith("fs")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + miniYarnService.getYarnConf.forEach(kv => { + if (kv.getKey.startsWith("yarn")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + spark.sql("USE yarn") + val yarnClient = YarnClient.createYarnClient() + yarnClient.init(yarnConf) + yarnClient.start() + yarnClient.getApplications().asScala.map(x => x.getApplicationType) + .map(x => (x, 1)) + .groupBy(x => x._1) + .mapValues(values => values.map(_._2).sum) + .foreach(x => { + val appType = x._1 + val appCnt = x._2 + val queryCntWithEqualTo = spark.sql("select count(1) from yarn.default.apps " + + s"where type = '${appType}'").collect().head.getLong(0) + assert(queryCntWithEqualTo == appCnt) + val queryCntWithIn = spark.sql("select count(1) from yarn.default.apps " + + s"where type in ('${appType}')").collect().head.getLong(0) + assert(queryCntWithIn == appCnt) + 
}) + yarnClient.close() + } + } +} diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnCatalogSuite.scala new file mode 100644 index 00000000000..0a97223e8e5 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnCatalogSuite.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.spark.connector.yarn + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession + +class YarnCatalogSuite extends SparkYarnConnectorWithYarn { + test("get catalog name") { + val sparkConf = new SparkConf() + .setMaster("local[*]") + .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "in-memory") + .set("spark.sql.catalog.yarn", classOf[YarnCatalog].getName) + withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { _ => + val catalog = new YarnCatalog + val catalogName = "yarn" + catalog.initialize(catalogName, CaseInsensitiveStringMap.empty()) + assert(catalog.name() == catalogName) + } + } + + test("supports namespaces") { + val sparkConf = new SparkConf() + .setMaster("local[*]") + .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "in-memory") + .set("spark.sql.catalog.yarn", classOf[YarnCatalog].getName) + withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + spark.sql("USE yarn") + assert(spark.sql("SHOW DATABASES").collect().length == 1) + assert(spark.sql("SHOW NAMESPACES").collect().length == 1) + assert(spark.sql("SHOW DATABASES").collect().head.get(0) == "default") + } + } + + test("show tables") { + val sparkConf = new SparkConf() + .setMaster("local[*]") + .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "in-memory") + .set("spark.sql.catalog.yarn", classOf[YarnCatalog].getName) + withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + spark.sql("USE yarn") + assert(spark.sql("SHOW TABLES").collect().length == 2) + } + } + + test("nonexistent namespace") { + val sparkConf = new SparkConf() + .setMaster("local[*]") + .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "in-memory") + .set("spark.sql.catalog.yarn", classOf[YarnCatalog].getName) + withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + val 
namespace = "nonexistent_db" + val exception = intercept[NoSuchNamespaceException] { + spark.sql(s"show tables from yarn.${namespace}.apps") + } + assert(exception.message.contains(s"The schema `${namespace}` cannot be found") + || exception.message.contains("SCHEMA_NOT_FOUND")) + } + } + + test("nonexistent table") { + val sparkConf = new SparkConf() + .setMaster("local[*]") + .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "in-memory") + .set("spark.sql.catalog.yarn", classOf[YarnCatalog].getName) + withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + val exception = intercept[AnalysisException] { + spark.table("yarn.default.nonexistent_table") + } + assert(exception.message.contains("Table or view not found") + || exception.message.contains("TABLE_OR_VIEW_NOT_FOUND")) + } + } +} diff --git a/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogQuerySuite.scala new file mode 100644 index 00000000000..96c3b1dea14 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-yarn/src/test/scala/org/apache/kyuubi/spark/connector/yarn/YarnLogQuerySuite.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.spark.connector.yarn + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession + +import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession + +class YarnLogQuerySuite extends SparkYarnConnectorWithYarn { + test("query logs with host") { + val sparkConf = new SparkConf() + .setMaster("local[*]") + // .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "in-memory") + .set("spark.sql.catalog.yarn", classOf[YarnCatalog].getName) + miniHdfsService.getHadoopConf.forEach(kv => { + if (kv.getKey.startsWith("fs")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + miniYarnService.getYarnConf.forEach(kv => { + if (kv.getKey.startsWith("yarn")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + spark.sql("USE yarn") + val cnt = spark.sql( + "select count(1) from yarn.default.app_logs " + + "where (host='localhost' or host like '%host') and " + + "app_id like '%application%'").collect().head.getLong( + 0) + assert(cnt > 0) + } + } + + test("query logs") { + val sparkConf = new SparkConf() + .setMaster("local[*]") + // .set("spark.ui.enabled", "false") + .set("spark.sql.catalogImplementation", "in-memory") + .set("spark.sql.catalog.yarn", classOf[YarnCatalog].getName) + miniHdfsService.getHadoopConf.forEach(kv => { + if (kv.getKey.startsWith("fs")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + miniYarnService.getYarnConf.forEach(kv => { + if (kv.getKey.startsWith("yarn")) { + sparkConf.set(s"spark.hadoop.${kv.getKey}", kv.getValue) + } + }) + withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark => + spark.sql("USE yarn") + val rows = spark.sql( + "select * from yarn.default.app_logs where line_num = 10" + + " and host='localhost'" + + " limit 10").collect() + val host = rows.head.getString(2) + assert(host == "localhost") + } + } + +} diff --git a/pom.xml b/pom.xml index 73146363243..89376e4b4f3 100644 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,7 @@ extensions/spark/kyuubi-spark-connector-common extensions/spark/kyuubi-spark-connector-tpcds extensions/spark/kyuubi-spark-connector-tpch + extensions/spark/kyuubi-spark-connector-yarn extensions/spark/kyuubi-spark-lineage extensions/spark/kyuubi-spark-jvm-quake externals/kyuubi-chat-engine
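For reference, a minimal end-to-end usage sketch of the connector these suites exercise — not part of the patch itself — assuming the connector jar is on the Spark driver classpath and that the target cluster's HDFS/YARN client settings are supplied through spark.hadoop.* properties (the suites copy fs.* and yarn.* keys from the mini clusters the same way). The catalog name "yarn", the tables yarn.default.apps / yarn.default.app_logs, and the column names (state, app_id, host, line_num) are taken directly from the queries in YarnAppQuerySuite and YarnLogQuerySuite; the application id below is a hypothetical placeholder.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object YarnConnectorUsageSketch {
  def main(args: Array[String]): Unit = {
    // Register the connector as a catalog named "yarn"; HDFS/YARN client settings
    // (fs.*, yarn.*) are expected to arrive via spark.hadoop.* properties, exactly
    // as the test suites copy them from MiniDFSCluster / MiniYARNCluster.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.sql.catalog.yarn", "org.apache.kyuubi.spark.connector.yarn.YarnCatalog")
    val spark = SparkSession.builder.config(conf).getOrCreate()

    spark.sql("USE yarn")

    // Count applications currently reported as RUNNING by the ResourceManager;
    // the equality / IN predicates on state and type are the same filters that
    // YarnAppQuerySuite verifies against YarnClient.
    spark.sql("SELECT count(1) FROM yarn.default.apps WHERE state = 'RUNNING'").show()

    // Read up to ten aggregated log lines of one application on one node
    // (the application id here is a hypothetical placeholder).
    spark.sql(
      "SELECT host, line_num FROM yarn.default.app_logs " +
        "WHERE app_id = 'application_1700000000000_0001' AND host = 'localhost' LIMIT 10")
      .show(truncate = false)

    spark.stop()
  }
}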