diff --git a/pom.xml b/pom.xml
index 94e170149a7fd..7ceb76ab25110 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,7 +132,7 @@
     org.apache.hive
     core
-    2.3.10
+    4.0.1
     3.9.0
@@ -1839,6 +1839,10 @@
         ${hive.group}
         hive-storage-api
+
+        org.apache.tez
+        tez-api
+
@@ -1991,6 +1995,14 @@
         net.hydromatic
         aggdesigner-algorithm
+
+        org.apache.atlas
+        atlas-intg
+
+
+        org.apache.hadoop
+        hadoop-yarn-registry
+
@@ -2294,6 +2306,10 @@
         ${hive.group}
         hive-serde
+
+        org.apache.hadoop
+        hadoop-common
+
         org.slf4j
         slf4j-api
@@ -2337,6 +2353,11 @@
+
+      org.apache.hive
+      hive-udf
+      ${hive.version}
+
       org.apache.orc
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 39d8c39954410..9406d8026a91a 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -123,7 +123,11 @@
       ${hive.group}
-      hive-storage-api
+      hive-common
+
+
+      ${hive.group}
+      hive-serde
       org.apache.parquet
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
index a04c2fcbbac12..93be706c78685 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritable.scala
@@ -18,9 +18,9 @@ package org.apache.spark.sql.execution.datasources
 import java.io.{DataInput, DataOutput, IOException}
-import java.sql.Date
-import org.apache.hadoop.hive.serde2.io.DateWritable
+import org.apache.hadoop.hive.common.`type`.Date
+import org.apache.hadoop.hive.serde2.io.DateWritableV2
 import org.apache.hadoop.io.WritableUtils
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseJulianToGregorianDays}
@@ -38,16 +38,16 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulia
 class DaysWritable(
     var gregorianDays: Int,
     var julianDays: Int)
-  extends DateWritable {
+  extends DateWritableV2 {
   def this() = this(0, 0)
   def this(gregorianDays: Int) =
     this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
-  def this(dateWritable: DateWritable) = {
+  def this(dateWritable: DateWritableV2) = {
     this(
       gregorianDays = dateWritable match {
         case daysWritable: DaysWritable => daysWritable.gregorianDays
-        case dateWritable: DateWritable =>
+        case dateWritable: DateWritableV2 =>
           rebaseJulianToGregorianDays(dateWritable.getDays)
       },
       julianDays = dateWritable.getDays)
@@ -55,10 +55,7 @@ class DaysWritable(
   override def getDays: Int = julianDays
   override def get: Date = {
-    new Date(DateWritable.daysToMillis(julianDays))
-  }
-  override def get(doesTimeMatter: Boolean): Date = {
-    new Date(DateWritable.daysToMillis(julianDays, doesTimeMatter))
+    Date.ofEpochMilli(DateWritableV2.daysToMillis(julianDays))
   }
   override def set(d: Int): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
index 60c5b7a266c51..ec33af76afbd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -21,7 +21,7 @@
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
 import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument => OrcSearchArgument}
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
-import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}
+import org.apache.hadoop.hive.serde2.io.{DateWritableV2, HiveDecimalWritable}
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.execution.datasources.DaysWritable
@@ -38,7 +38,7 @@ private[sql] object OrcShimUtils {
   private[sql] type SearchArgument = OrcSearchArgument
   def getGregorianDays(value: Any): Int = {
-    new DaysWritable(value.asInstanceOf[DateWritable]).gregorianDays
+    new DaysWritable(value.asInstanceOf[DateWritableV2]).gregorianDays
   }
   def getDecimal(value: Any): Decimal = {
@@ -46,7 +46,7 @@ private[sql] object OrcShimUtils {
     Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
   }
-  def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
+  def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritableV2 = {
     if (reuseObj) {
       val result = new DaysWritable()
       (getter, ordinal) =>
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HiveAuthFactory.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HiveAuthFactory.java
index 2bd6210f58c76..f4ea7e6c5e734 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HiveAuthFactory.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HiveAuthFactory.java
@@ -30,10 +30,10 @@
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.thrift.DBTokenStore;
-import org.apache.hadoop.hive.thrift.HiveDelegationTokenManager;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
+import org.apache.hadoop.hive.metastore.security.DBTokenStore;
+import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
@@ -80,7 +80,7 @@ public String getAuthName() {
   private String authTypeStr;
   private final String transportMode;
   private final HiveConf conf;
-  private HiveDelegationTokenManager delegationTokenManager = null;
+  private MetastoreDelegationTokenManager delegationTokenManager = null;
   public static final String HS2_PROXY_USER = "hive.server2.proxy.user";
   public static final String HS2_CLIENT_TOKEN = "hiveserver2ClientToken";
@@ -114,18 +114,19 @@ public HiveAuthFactory(HiveConf conf) throws TTransportException, IOException {
       authTypeStr = AuthTypes.NONE.getAuthName();
     }
     if (authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName())) {
-      String principal = conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
       String keytab = conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+      String principal = conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
+      String client = conf.getVar(ConfVars.HIVE_SERVER2_CLIENT_KERBEROS_PRINCIPAL);
       if (needUgiLogin(UserGroupInformation.getCurrentUser(),
         SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keytab)) {
-        saslServer = ShimLoader.getHadoopThriftAuthBridge().createServer(keytab, principal);
+        saslServer = HadoopThriftAuthBridge.getBridge().createServer(keytab, principal, client);
      } else {
        // Using the default constructor to avoid unnecessary UGI login.
        saslServer = new HadoopThriftAuthBridge.Server();
      }
      // start delegation token manager
-      delegationTokenManager = new HiveDelegationTokenManager();
+      delegationTokenManager = new MetastoreDelegationTokenManager();
      try {
        // rawStore is only necessary for DBTokenStore
        Object rawStore = null;
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthUtils.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthUtils.java
index e307bdab04498..7f90e5a2ff11a 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthUtils.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/HttpAuthUtils.java
@@ -31,6 +31,7 @@
 import javax.security.auth.Subject;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.http.protocol.BasicHttpContext;
@@ -68,8 +69,7 @@ public final class HttpAuthUtils {
    */
   public static String getKerberosServiceTicket(String principal, String host,
       String serverHttpUrl, boolean assumeSubject) throws Exception {
-    String serverPrincipal =
-        ShimLoader.getHadoopThriftAuthBridge().getServerPrincipal(principal, host);
+    String serverPrincipal = HadoopThriftAuthBridge.getBridge().getServerPrincipal(principal, host);
     if (assumeSubject) {
       // With this option, we're assuming that the external application,
       // using the JDBC driver has done a JAAS kerberos login already
@@ -82,7 +82,7 @@ public static String getKerberosServiceTicket(String principal, String host,
     } else {
       // JAAS login from ticket cache to setup the client UserGroupInformation
       UserGroupInformation clientUGI =
-          ShimLoader.getHadoopThriftAuthBridge().getCurrentUGIWithConf("kerberos");
+          HadoopThriftAuthBridge.getBridge().getCurrentUGIWithConf("kerberos");
       return clientUGI.doAs(new HttpKerberosClientAction(serverPrincipal, serverHttpUrl));
     }
   }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java
index ef91f94eeec2b..4b05d72530dd7 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/KerberosSaslHelper.java
@@ -21,8 +21,8 @@
 import javax.security.sasl.SaslException;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
-import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server;
 import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.hive.service.rpc.thrift.TCLIService;
 import org.apache.hive.service.rpc.thrift.TCLIService.Iface;
@@ -52,7 +52,7 @@ public static TTransport getKerberosTransport(String principal, String host,
       return createSubjectAssumedTransport(principal, underlyingTransport, saslProps);
     } else {
       HadoopThriftAuthBridge.Client authBridge =
-          ShimLoader.getHadoopThriftAuthBridge().createClientWithConf("kerberos");
+          HadoopThriftAuthBridge.getBridge().createClientWithConf("kerberos");
       return authBridge.createClientTransport(principal, host, "KERBEROS", null,
          underlyingTransport, saslProps);
    }
@@ -77,7 +77,7 @@ public static TTransport createSubjectAssumedTransport(String principal,
   public static TTransport getTokenTransport(String tokenStr, String host,
       TTransport underlyingTransport, Map saslProps) throws SaslException {
     HadoopThriftAuthBridge.Client authBridge =
-        ShimLoader.getHadoopThriftAuthBridge().createClientWithConf("kerberos");
+        HadoopThriftAuthBridge.getBridge().createClientWithConf("kerberos");
     try {
       return authBridge.createClientTransport(null, host, "DIGEST", tokenStr, underlyingTransport,
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
index 815a369b6b237..4c4a3f60e5e7d 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
@@ -25,9 +25,9 @@ public abstract class ExecuteStatementOperation extends Operation {
   protected String statement = null;
-  public ExecuteStatementOperation(HiveSession parentSession, String statement,
+  public ExecuteStatementOperation(HiveSession parentSession, OperationManager operationManager, String statement,
       Map confOverlay, boolean runInBackground) {
-    super(parentSession, confOverlay, OperationType.EXECUTE_STATEMENT, runInBackground);
+    super(parentSession, operationManager, confOverlay, OperationType.EXECUTE_STATEMENT, runInBackground);
     this.statement = statement;
   }
@@ -43,7 +43,7 @@ protected void registerCurrentOperationLog() {
         isOperationLogEnabled = false;
         return;
       }
-      OperationLog.setCurrentOperationLog(operationLog);
+      operationManager.setCurrentOperationLog(operationLog, operationLogFile);
     }
   }
 }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
index ef4bbb45e8f4e..e597d328635a6 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java
@@ -33,8 +33,8 @@ public class GetCatalogsOperation extends MetadataOperation {
   protected final RowSet rowSet;
-  protected GetCatalogsOperation(HiveSession parentSession) {
-    super(parentSession, OperationType.GET_CATALOGS);
+  protected GetCatalogsOperation(HiveSession parentSession, OperationManager operationManager) {
+    super(parentSession, operationManager, OperationType.GET_CATALOGS);
     rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false);
   }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
index 250adc51f81e9..1cfee892adac5 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java
@@ -33,7 +33,7 @@
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.TableIterable;
+import org.apache.hadoop.hive.metastore.TableIterable;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
@@ -119,9 +119,9 @@ public class GetColumnsOperation extends MetadataOperation {
   protected final RowSet rowSet;
-  protected GetColumnsOperation(HiveSession parentSession, String catalogName, String schemaName,
-      String tableName, String columnName) {
-    super(parentSession, OperationType.GET_COLUMNS);
+  protected GetColumnsOperation(HiveSession parentSession, OperationManager operationManager,
+      String catalogName, String schemaName, String tableName, String columnName) {
+    super(parentSession, operationManager, OperationType.GET_COLUMNS);
     this.catalogName = catalogName;
     this.schemaName = schemaName;
     this.tableName = tableName;
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java
index 3a29859a20747..b21ba0d092716 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetCrossReferenceOperation.java
@@ -99,10 +99,10 @@ public class GetCrossReferenceOperation extends MetadataOperation {
   private final String foreignTableName;
   private final RowSet rowSet;
-  public GetCrossReferenceOperation(HiveSession parentSession,
+  public GetCrossReferenceOperation(HiveSession parentSession, OperationManager operationManager,
       String parentCatalogName, String parentSchemaName, String parentTableName,
       String foreignCatalog, String foreignSchema, String foreignTable) {
-    super(parentSession, OperationType.GET_FUNCTIONS);
+    super(parentSession, operationManager, OperationType.GET_FUNCTIONS);
     this.parentCatalogName = parentCatalogName;
     this.parentSchemaName = parentSchemaName;
     this.parentTableName = parentTableName;
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
index 3f02f753bf875..a78cd6fb08c45 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java
@@ -66,9 +66,9 @@ public class GetFunctionsOperation extends MetadataOperation {
   protected final RowSet rowSet;
-  public GetFunctionsOperation(HiveSession parentSession,
+  public GetFunctionsOperation(HiveSession parentSession, OperationManager operationManager,
       String catalogName, String schemaName, String functionName) {
-    super(parentSession, OperationType.GET_FUNCTIONS);
+    super(parentSession, operationManager, OperationType.GET_FUNCTIONS);
     this.catalogName = catalogName;
     this.schemaName = schemaName;
     this.functionName = functionName;
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java
index 9273283429744..9339865f29f6c 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java
+++
b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetPrimaryKeysOperation.java @@ -61,9 +61,9 @@ public class GetPrimaryKeysOperation extends MetadataOperation { private final RowSet rowSet; - public GetPrimaryKeysOperation(HiveSession parentSession, + public GetPrimaryKeysOperation(HiveSession parentSession, OperationManager operationManager, String catalogName, String schemaName, String tableName) { - super(parentSession, OperationType.GET_FUNCTIONS); + super(parentSession, operationManager, OperationType.GET_FUNCTIONS); this.catalogName = catalogName; this.schemaName = schemaName; this.tableName = tableName; diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java index 865e264bd5f4f..a719085193223 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java @@ -44,9 +44,9 @@ public class GetSchemasOperation extends MetadataOperation { protected RowSet rowSet; - protected GetSchemasOperation(HiveSession parentSession, + protected GetSchemasOperation(HiveSession parentSession, OperationManager operationManager, String catalogName, String schemaName) { - super(parentSession, OperationType.GET_SCHEMAS); + super(parentSession, operationManager, OperationType.GET_SCHEMAS); this.catalogName = catalogName; this.schemaName = schemaName; this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false); diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java index b75eaec5ff651..9009da795d1d3 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java @@ -43,8 +43,8 @@ public class GetTableTypesOperation extends MetadataOperation { protected final RowSet rowSet; private final TableTypeMapping tableTypeMapping; - protected GetTableTypesOperation(HiveSession parentSession) { - super(parentSession, OperationType.GET_TABLE_TYPES); + protected GetTableTypesOperation(HiveSession parentSession, OperationManager operationManager) { + super(parentSession, operationManager, OperationType.GET_TABLE_TYPES); String tableMappingStr = getParentSession().getHiveConf() .getVar(HiveConf.ConfVars.HIVE_SERVER2_TABLE_TYPE_MAPPING); tableTypeMapping = diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java index bd9f0814814f1..c3bf9ffa0629f 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTablesOperation.java @@ -66,10 +66,10 @@ public class GetTablesOperation extends MetadataOperation { .addStringColumn("REF_GENERATION", "Specifies how values in SELF_REFERENCING_COL_NAME are created."); - protected GetTablesOperation(HiveSession parentSession, + protected GetTablesOperation(HiveSession parentSession, OperationManager operationManager, String catalogName, 
String schemaName, String tableName, List tableTypes) { - super(parentSession, OperationType.GET_TABLES); + super(parentSession, operationManager, OperationType.GET_TABLES); this.catalogName = catalogName; this.schemaName = schemaName; this.tableName = tableName; diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java index ad692d46edd29..9cdff0635334c 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java @@ -76,8 +76,8 @@ public class GetTypeInfoOperation extends MetadataOperation { protected final RowSet rowSet; - protected GetTypeInfoOperation(HiveSession parentSession) { - super(parentSession, OperationType.GET_TYPE_INFO); + protected GetTypeInfoOperation(HiveSession parentSession, OperationManager operationManager) { + super(parentSession, operationManager, OperationType.GET_TYPE_INFO); rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false); } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/LogDivertAppender.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/LogDivertAppender.java index 32cc42f008bda..28023bea477bd 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/LogDivertAppender.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/LogDivertAppender.java @@ -16,7 +16,11 @@ */ package org.apache.hive.service.cli.operation; +import java.io.BufferedOutputStream; import java.io.CharArrayWriter; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.PrintStream; import java.io.Serializable; import java.util.Map; import java.util.regex.Pattern; @@ -180,7 +184,7 @@ public Result filter(org.apache.logging.log4j.core.Logger logger, Level level, M @Override public Result filter(LogEvent logEvent) { - OperationLog log = operationManager.getOperationLogByThread(); + OperationLog log = operationManager.getCurrentOperationLog(); boolean excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE); if (log == null) { @@ -296,7 +300,7 @@ private LogDivertAppender(OperationManager operationManager, @Override public void append(LogEvent event) { - OperationLog log = operationManager.getOperationLogByThread(); + OperationLog log = operationManager.getCurrentOperationLog(); // Set current layout depending on the verbose/non-verbose mode. 
if (log != null) { @@ -318,6 +322,12 @@ public void append(LogEvent event) { LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName()); return; } - log.writeOperationLog(logOutput); + try { + PrintStream out = + new PrintStream(new FileOutputStream(operationManager.getCurrentOperationLogFile())); + out.print(logOutput); + } catch (FileNotFoundException e) { + LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName()); + } } } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/MetadataOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/MetadataOperation.java index a818d1ecfd16e..a111285df6f5e 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/MetadataOperation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/MetadataOperation.java @@ -42,8 +42,8 @@ public abstract class MetadataOperation extends Operation { protected static TableSchema RESULT_SET_SCHEMA; private static final char SEARCH_STRING_ESCAPE = '\\'; - protected MetadataOperation(HiveSession parentSession, OperationType opType) { - super(parentSession, opType); + protected MetadataOperation(HiveSession parentSession, OperationManager operationManager, OperationType opType) { + super(parentSession, operationManager, opType); setHasResultSet(true); } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java index f488a411c31f3..64cc6340d7cfb 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/Operation.java @@ -21,11 +21,13 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.hive.service.cli.FetchOrientation; @@ -46,6 +48,7 @@ public abstract class Operation { protected final HiveSession parentSession; + protected final OperationManager operationManager; private OperationState state = OperationState.INITIALIZED; private final OperationHandle opHandle; private HiveConf configuration; @@ -56,6 +59,7 @@ public abstract class Operation { protected volatile HiveSQLException operationException; protected final boolean runAsync; protected volatile Future backgroundHandle; + protected File operationLogFile; protected OperationLog operationLog; protected boolean isOperationLogEnabled; protected Map confOverlay = new HashMap(); @@ -71,25 +75,29 @@ public abstract class Operation { FetchOrientation.FETCH_FIRST, FetchOrientation.FETCH_PRIOR); - protected Operation(HiveSession parentSession, OperationType opType) { - this(parentSession, null, opType); + protected Operation(HiveSession parentSession, OperationManager operationManager, OperationType opType) { + this(parentSession, operationManager, null, opType); } - protected Operation(HiveSession parentSession, Map confOverlay, + protected Operation(HiveSession parentSession, OperationManager operationManager, Map confOverlay, 
OperationType opType) { - this(parentSession, confOverlay, opType, false); + this(parentSession, operationManager, confOverlay, opType, false); } protected Operation(HiveSession parentSession, - Map confOverlay, OperationType opType, boolean runInBackground) { + OperationManager operationManager, Map confOverlay, OperationType opType, boolean runInBackground) { this.parentSession = parentSession; + this.operationManager = Objects.requireNonNull(operationManager); this.confOverlay = confOverlay; this.runAsync = runInBackground; this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion()); lastAccessTime = System.currentTimeMillis(); operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(), HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS); - queryState = new QueryState(parentSession.getHiveConf(), confOverlay, runInBackground); + queryState = new QueryState.Builder() + .withHiveConf(parentSession.getHiveConf()) + .withConfOverlay(confOverlay) + .build(); } public Future getBackgroundHandle() { @@ -204,7 +212,7 @@ public boolean isFailed() { protected void createOperationLog() { if (parentSession.isOperationLogEnabled()) { - File operationLogFile = new File(parentSession.getOperationLogSessionDir(), + operationLogFile = new File(parentSession.getOperationLogSessionDir(), opHandle.getHandleIdentifier().toString()); isOperationLogEnabled = true; @@ -234,23 +242,16 @@ protected void createOperationLog() { } // create OperationLog object with above log file - try { - operationLog = new OperationLog(opHandle.toString(), operationLogFile, parentSession.getHiveConf()); - } catch (FileNotFoundException e) { - LOG.warn("Unable to instantiate OperationLog object for operation: {}", e, - MDC.of(LogKeys.OPERATION_HANDLE$.MODULE$, opHandle)); - isOperationLogEnabled = false; - return; - } + operationLog = new OperationLog(opHandle.toString(), operationLogFile, parentSession.getHiveConf()); // register this operationLog to current thread - OperationLog.setCurrentOperationLog(operationLog); + operationManager.setCurrentOperationLog(operationLog, operationLogFile); } } protected void unregisterOperationLog() { if (isOperationLogEnabled) { - OperationLog.removeCurrentOperationLog(); + operationManager.removeCurrentOperationLog(); } } @@ -336,11 +337,11 @@ protected void validateFetchOrientation(FetchOrientation orientation, } } - protected HiveSQLException toSQLException(String prefix, CommandProcessorResponse response) { - HiveSQLException ex = new HiveSQLException(prefix + ": " + response.getErrorMessage(), - response.getSQLState(), response.getResponseCode()); - if (response.getException() != null) { - ex.initCause(response.getException()); + protected HiveSQLException toSQLException(String prefix, CommandProcessorException exception) { + HiveSQLException ex = new HiveSQLException(prefix + ": " + exception.getMessage(), + exception.getSqlState(), exception.getResponseCode()); + if (exception.getCause() != null) { + ex.initCause(exception.getCause()); } return ex; } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java index fd8266d1a9acc..f71cc50e7cffd 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -17,6 +17,7 @@ package 
org.apache.hive.service.cli.operation; +import java.io.File; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; @@ -56,6 +57,11 @@ public class OperationManager extends AbstractService { private final Map handleToOperation = new HashMap(); + private final ThreadLocal threadLocalOperationLog = + ThreadLocal.withInitial(() -> null); + + private final ThreadLocal threadLocalFile = ThreadLocal.withInitial(() -> null); + public OperationManager() { super(OperationManager.class.getSimpleName()); } @@ -97,20 +103,20 @@ public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parent } public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) { - GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession); + GetTypeInfoOperation operation = new GetTypeInfoOperation(parentSession, this); addOperation(operation); return operation; } public GetCatalogsOperation newGetCatalogsOperation(HiveSession parentSession) { - GetCatalogsOperation operation = new GetCatalogsOperation(parentSession); + GetCatalogsOperation operation = new GetCatalogsOperation(parentSession, this); addOperation(operation); return operation; } public GetSchemasOperation newGetSchemasOperation(HiveSession parentSession, String catalogName, String schemaName) { - GetSchemasOperation operation = new GetSchemasOperation(parentSession, catalogName, schemaName); + GetSchemasOperation operation = new GetSchemasOperation(parentSession, this, catalogName, schemaName); addOperation(operation); return operation; } @@ -119,20 +125,20 @@ public MetadataOperation newGetTablesOperation(HiveSession parentSession, String catalogName, String schemaName, String tableName, List tableTypes) { MetadataOperation operation = - new GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes); + new GetTablesOperation(parentSession, this, catalogName, schemaName, tableName, tableTypes); addOperation(operation); return operation; } public GetTableTypesOperation newGetTableTypesOperation(HiveSession parentSession) { - GetTableTypesOperation operation = new GetTableTypesOperation(parentSession); + GetTableTypesOperation operation = new GetTableTypesOperation(parentSession, this); addOperation(operation); return operation; } public GetColumnsOperation newGetColumnsOperation(HiveSession parentSession, String catalogName, String schemaName, String tableName, String columnName) { - GetColumnsOperation operation = new GetColumnsOperation(parentSession, + GetColumnsOperation operation = new GetColumnsOperation(parentSession, this, catalogName, schemaName, tableName, columnName); addOperation(operation); return operation; @@ -140,7 +146,7 @@ public GetColumnsOperation newGetColumnsOperation(HiveSession parentSession, public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession, String catalogName, String schemaName, String functionName) { - GetFunctionsOperation operation = new GetFunctionsOperation(parentSession, + GetFunctionsOperation operation = new GetFunctionsOperation(parentSession, this, catalogName, schemaName, functionName); addOperation(operation); return operation; @@ -148,7 +154,7 @@ public GetFunctionsOperation newGetFunctionsOperation(HiveSession parentSession, public GetPrimaryKeysOperation newGetPrimaryKeysOperation(HiveSession parentSession, String catalogName, String schemaName, String tableName) { - GetPrimaryKeysOperation operation = new GetPrimaryKeysOperation(parentSession, + GetPrimaryKeysOperation operation = new 
GetPrimaryKeysOperation(parentSession, this, catalogName, schemaName, tableName); addOperation(operation); return operation; @@ -158,7 +164,7 @@ public GetCrossReferenceOperation newGetCrossReferenceOperation( HiveSession session, String primaryCatalog, String primarySchema, String primaryTable, String foreignCatalog, String foreignSchema, String foreignTable) { - GetCrossReferenceOperation operation = new GetCrossReferenceOperation(session, + GetCrossReferenceOperation operation = new GetCrossReferenceOperation(session, this, primaryCatalog, primarySchema, primaryTable, foreignCatalog, foreignSchema, foreignTable); addOperation(operation); @@ -283,8 +289,22 @@ private Schema getLogSchema() { return schema; } - public OperationLog getOperationLogByThread() { - return OperationLog.getCurrentOperationLog(); + public void setCurrentOperationLog(OperationLog log, File file) { + threadLocalOperationLog.set(log); + threadLocalFile.set(file); + } + + public OperationLog getCurrentOperationLog() { + return threadLocalOperationLog.get(); + } + + public File getCurrentOperationLogFile() { + return threadLocalFile.get(); + } + + public void removeCurrentOperationLog() { + threadLocalOperationLog.remove(); + threadLocalFile.remove(); } public List removeExpiredOperations(OperationHandle[] handles) { diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 4b55453ec7a8b..59025e72e0226 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -115,8 +115,7 @@ public HiveSessionImpl(TProtocolVersion protocol, String username, String passwo try { // In non-impersonation mode, map scheduler queue to current user // if fair scheduler is configured. - if (! 
hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) && - hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) { + if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { ShimLoader.getHadoopShims().refreshDefaultQueue(hiveConf, username); } } catch (IOException e) { @@ -272,7 +271,7 @@ private static void setConf(String varname, String key, String varvalue, boolean new VariableSubstitution(() -> SessionState.get().getHiveVariables()); HiveConf conf = SessionState.get().getConf(); String value = substitution.substitute(conf, varvalue); - if (conf.getBoolVar(HiveConf.ConfVars.HIVECONFVALIDATION)) { + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_CONF_VALIDATION)) { HiveConf.ConfVars confVars = HiveConf.getConfVars(key); if (confVars != null) { if (!confVars.isType(value)) { diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java index 0ec13424fd0f5..b24aefab02b06 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java @@ -23,8 +23,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.rpc.thrift.TProtocolVersion; @@ -96,6 +98,14 @@ public void close() throws HiveSQLException { } } + private static Token createToken(String tokenStr, String tokenService) + throws IOException { + Token delegationToken = new Token<>(); + delegationToken.decodeFromUrlString(tokenStr); + delegationToken.setService(new Text(tokenService)); + return delegationToken; + } + /** * Enable delegation token for the session * save the token string and set the token.signature in hive conf. 
The metastore client uses @@ -108,7 +118,8 @@ private void setDelegationToken(String delegationTokenStr) throws HiveSQLExcepti if (delegationTokenStr != null) { getHiveConf().set("hive.metastore.token.signature", HS2TOKEN); try { - Utils.setTokenStr(sessionUgi, delegationTokenStr, HS2TOKEN); + Token delegationToken = createToken(delegationTokenStr, HS2TOKEN); + sessionUgi.addToken(delegationToken); } catch (IOException e) { throw new HiveSQLException("Couldn't setup delegation token in the ugi", e); } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index c7fa7b5f3e0ac..1cef6c8c47115 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -81,8 +81,13 @@ protected void initializeServer() { } String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname); + String keyStoreType = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_TYPE).trim(); + String keyStoreAlgorithm = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYMANAGERFACTORY_ALGORITHM).trim(); + String includeCiphersuites = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_BINARY_INCLUDE_CIPHERSUITES).trim(); + serverSocket = HiveAuthUtils.getServerSSLSocket(hiveHost, portNum, keyStorePath, - keyStorePassword, sslVersionBlacklist); + keyStorePassword, keyStoreType, keyStoreAlgorithm, sslVersionBlacklist, + includeCiphersuites); } // In case HIVE_SERVER2_THRIFT_PORT or hive.server2.thrift.port is configured with 0 which diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java index 16d8540b40560..66ef2aceea8b3 100644 --- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java @@ -20,7 +20,7 @@ import java.util.Map; -import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.HMSHandler; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.spark.internal.SparkLogger; @@ -67,7 +67,7 @@ private void cleanRawStore() { */ public void cacheThreadLocalRawStore() { Long threadId = this.getId(); - RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore(); + RawStore threadLocalRawStore = HMSHandler.getRawStore(); if (threadLocalRawStore != null && !threadRawStoreMap.containsKey(threadId)) { LOG.debug("Adding RawStore: " + threadLocalRawStore + ", for the thread: " + this.getName() + " to threadRawStoreMap for future cleanup."); diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index caa85ebe57dee..49bc27aa837ab 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -26,7 +26,7 @@ import scala.util.control.NonFatal import 
org.apache.hadoop.hive.shims.Utils import org.apache.hive.service.cli._ -import org.apache.hive.service.cli.operation.ExecuteStatementOperation +import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, OperationManager} import org.apache.hive.service.cli.session.HiveSession import org.apache.hive.service.rpc.thrift.{TCLIServiceConstants, TColumnDesc, TPrimitiveTypeEntry, TRowSet, TTableSchema, TTypeDesc, TTypeEntry, TTypeId, TTypeQualifiers, TTypeQualifierValue} @@ -42,11 +42,13 @@ import org.apache.spark.util.{Utils => SparkUtils} private[hive] class SparkExecuteStatementOperation( val session: SparkSession, parentSession: HiveSession, + operationManager: OperationManager, statement: String, confOverlay: JMap[String, String], runInBackground: Boolean = true, queryTimeout: Long) - extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) + extends ExecuteStatementOperation(parentSession, operationManager, statement, confOverlay, + runInBackground) with SparkOperation with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala index e4bb91d466ff4..43597351ff59f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType import org.apache.hive.service.cli.OperationState -import org.apache.hive.service.cli.operation.GetCatalogsOperation +import org.apache.hive.service.cli.operation.{GetCatalogsOperation, OperationManager} import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.{Logging, MDC} @@ -34,8 +34,9 @@ import org.apache.spark.sql.SparkSession */ private[hive] class SparkGetCatalogsOperation( val session: SparkSession, - parentSession: HiveSession) - extends GetCatalogsOperation(parentSession) + parentSession: HiveSession, + operationManager: OperationManager) + extends GetCatalogsOperation(parentSession, operationManager) with SparkOperation with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index 1004ca8cf2712..39bee2e1c0b28 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObject} import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType import org.apache.hive.service.cli._ -import org.apache.hive.service.cli.operation.GetColumnsOperation +import org.apache.hive.service.cli.operation.{GetColumnsOperation, OperationManager} import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.{Logging, MDC} @@ -47,11 +47,13 @@ import org.apache.spark.sql.types._ private[hive] class SparkGetColumnsOperation( val session: 
SparkSession, parentSession: HiveSession, + operationManager: OperationManager, catalogName: String, schemaName: String, tableName: String, columnName: String) - extends GetColumnsOperation(parentSession, catalogName, schemaName, tableName, columnName) + extends GetColumnsOperation(parentSession, operationManager, catalogName, schemaName, tableName, + columnName) with SparkOperation with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index 515e64f5f529c..8cf8fce26f98f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -23,7 +23,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils} import org.apache.hive.service.cli._ -import org.apache.hive.service.cli.operation.GetFunctionsOperation +import org.apache.hive.service.cli.operation.{GetFunctionsOperation, OperationManager} import org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATALOG import org.apache.hive.service.cli.session.HiveSession @@ -42,10 +42,12 @@ import org.apache.spark.sql.SparkSession private[hive] class SparkGetFunctionsOperation( val session: SparkSession, parentSession: HiveSession, + operationManager: OperationManager, catalogName: String, schemaName: String, functionName: String) - extends GetFunctionsOperation(parentSession, catalogName, schemaName, functionName) + extends GetFunctionsOperation(parentSession, operationManager, catalogName, schemaName, + functionName) with SparkOperation with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala index 0e2c35b5ef550..ff94773503780 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala @@ -21,7 +21,7 @@ import java.util.regex.Pattern import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType import org.apache.hive.service.cli._ -import org.apache.hive.service.cli.operation.GetSchemasOperation +import org.apache.hive.service.cli.operation.{GetSchemasOperation, OperationManager} import org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATALOG import org.apache.hive.service.cli.session.HiveSession @@ -40,9 +40,10 @@ import org.apache.spark.sql.SparkSession private[hive] class SparkGetSchemasOperation( val session: SparkSession, parentSession: HiveSession, + operationManager: OperationManager, catalogName: String, schemaName: String) - extends GetSchemasOperation(parentSession, catalogName, schemaName) + extends GetSchemasOperation(parentSession, operationManager, catalogName, schemaName) with SparkOperation with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala index 9709739a64a42..93f40017eb51d 
100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala @@ -21,7 +21,7 @@ import java.util.UUID import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType import org.apache.hive.service.cli._ -import org.apache.hive.service.cli.operation.GetTableTypesOperation +import org.apache.hive.service.cli.operation.{GetTableTypesOperation, OperationManager} import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.{Logging, MDC} @@ -37,8 +37,9 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType */ private[hive] class SparkGetTableTypesOperation( val session: SparkSession, - parentSession: HiveSession) - extends GetTableTypesOperation(parentSession) + parentSession: HiveSession, + operationManager: OperationManager) + extends GetTableTypesOperation(parentSession, operationManager) with SparkOperation with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index e1dd6e8dd95bc..4ec1eecd8faf7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils} import org.apache.hive.service.cli._ -import org.apache.hive.service.cli.operation.GetTablesOperation +import org.apache.hive.service.cli.operation.{GetTablesOperation, OperationManager} import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.{Logging, MDC} @@ -45,11 +45,13 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ private[hive] class SparkGetTablesOperation( val session: SparkSession, parentSession: HiveSession, + operationManager: OperationManager, catalogName: String, schemaName: String, tableName: String, tableTypes: JList[String]) - extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) + extends GetTablesOperation(parentSession, operationManager, catalogName, schemaName, tableName, + tableTypes) with SparkOperation with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala index 456ec44678c5c..f8776a29ea60d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType import org.apache.hadoop.hive.serde2.thrift.Type import org.apache.hadoop.hive.serde2.thrift.Type._ import org.apache.hive.service.cli.OperationState -import org.apache.hive.service.cli.operation.GetTypeInfoOperation +import org.apache.hive.service.cli.operation.{GetTypeInfoOperation, OperationManager} import org.apache.hive.service.cli.session.HiveSession import 
org.apache.spark.internal.{Logging, MDC} @@ -38,8 +38,9 @@ import org.apache.spark.sql.SparkSession */ private[hive] class SparkGetTypeInfoOperation( val session: SparkSession, - parentSession: HiveSession) - extends GetTypeInfoOperation(parentSession) + parentSession: HiveSession, + operationManager: OperationManager) + extends GetTypeInfoOperation(parentSession, operationManager) with SparkOperation with Logging { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 083d9c4a0d436..13e6f03126ecd 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor} import org.apache.hadoop.hive.common.HiveInterruptUtils +import org.apache.hadoop.hive.common.io.SessionStream import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors._ @@ -99,9 +100,9 @@ private[hive] object SparkSQLCLIDriver extends Logging { sessionState.in = System.in try { - sessionState.out = new PrintStream(System.out, true, UTF_8.name()) - sessionState.info = new PrintStream(System.err, true, UTF_8.name()) - sessionState.err = new PrintStream(System.err, true, UTF_8.name()) + sessionState.out = new SessionStream(System.out, true, UTF_8.name()) + sessionState.info = new SessionStream(System.err, true, UTF_8.name()) + sessionState.err = new SessionStream(System.err, true, UTF_8.name()) } catch { case e: UnsupportedEncodingException => closeHiveSessionStateIfStarted(sessionState) @@ -182,9 +183,9 @@ private[hive] object SparkSQLCLIDriver extends Logging { // will set the output into an invalid buffer. sessionState.in = System.in try { - sessionState.out = new PrintStream(System.out, true, UTF_8.name()) - sessionState.info = new PrintStream(System.err, true, UTF_8.name()) - sessionState.err = new PrintStream(System.err, true, UTF_8.name()) + sessionState.out = new SessionStream(System.out, true, UTF_8.name()) + sessionState.info = new SessionStream(System.err, true, UTF_8.name()) + sessionState.err = new SessionStream(System.err, true, UTF_8.name()) } catch { case e: UnsupportedEncodingException => exit(ERROR_PATH_NOT_FOUND) } @@ -206,17 +207,27 @@ private[hive] object SparkSQLCLIDriver extends Logging { cli.printMasterAndAppId() if (sessionState.execString != null) { - exit(cli.processLine(sessionState.execString)) + try { + cli.processLine(sessionState.execString) + exit(0) + } catch { + case e: CommandProcessorException => + logError(log"Could not execute. (${MDC(ERROR, e.getMessage)})") + exit(e.getErrorCode) + } } try { if (sessionState.fileName != null) { - exit(cli.processFile(sessionState.fileName)) + cli.processFile(sessionState.fileName) } } catch { case e: FileNotFoundException => logError(log"Could not open input file for reading. (${MDC(ERROR, e.getMessage)})") exit(ERROR_PATH_NOT_FOUND) + case e: CommandProcessorException => + logError(log"Could not process input file. 
(${MDC(ERROR, e.getMessage)})") + exit(e.getErrorCode) } val reader = new ConsoleReader() @@ -257,7 +268,6 @@ private[hive] object SparkSQLCLIDriver extends Logging { } } - var ret = 0 var prefix = "" def currentDB = { @@ -285,7 +295,12 @@ private[hive] object SparkSQLCLIDriver extends Logging { if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) { line = prefix + line - ret = cli.processLine(line, true) + try { + cli.processLine(line, true) + } catch { + case e: CommandProcessorException => + exit(e.getErrorCode) + } prefix = "" currentPrompt = promptWithCurrentDB } else { @@ -298,7 +313,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { closeHiveSessionStateIfStarted(sessionState) - exit(ret) + exit(0) } def printUsage(): Unit = { @@ -421,7 +436,8 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { console.printInfo(s"Spark master: $master, Application Id: $appId") } - override def processCmd(cmd: String): Int = { + override def processCmd(cmd: String): CommandProcessorResponse = { + val ret = new CommandProcessorResponse() val cmd_trimmed: String = cmd.trim() val cmd_lower = cmd_trimmed.toLowerCase(Locale.ROOT) val tokens: Array[String] = cmd_trimmed.split("\\s+") @@ -437,9 +453,8 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { val endTimeNs = System.nanoTime() val timeTaken: Double = TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs) / 1000.0 console.printInfo(s"Time taken: $timeTaken seconds") - 0 + ret } else { - var ret = 0 val hconf = conf.asInstanceOf[HiveConf] val proc: CommandProcessor = CommandProcessorFactory.get(tokens, hconf) @@ -451,7 +466,6 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { proc.isInstanceOf[ResetProcessor] ) { val driver = new SparkSQLDriver - driver.init() val out = sessionState.out val err = sessionState.err val startTimeNs: Long = System.nanoTime() @@ -462,7 +476,6 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { driver.run(cmd) } catch { case t: Throwable => - ret = 1 val format = SparkSQLEnv.sparkSession.sessionState.conf.errorMessageFormat val msg = t match { case st: SparkThrowable with Throwable => @@ -476,7 +489,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { t.printStackTrace(err) } driver.close() - return ret + throw new CommandProcessorException(t) } val endTimeNs = System.nanoTime() val timeTaken: Double = TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs) / 1000.0 @@ -506,13 +519,10 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { s"""Failed with exception ${e.getClass.getName}: ${e.getMessage} |${org.apache.hadoop.util.StringUtils.stringifyException(e)} """.stripMargin) - ret = 1 + throw new CommandProcessorException(e) } - val cret = driver.close() - if (ret == 0) { - ret = cret - } + driver.close() var responseMsg = s"Time taken: $timeTaken seconds" if (counter != 0) { @@ -525,7 +535,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { if (sessionState.getIsVerbose) { sessionState.out.println(tokens(0) + " " + cmd_1) } - ret = proc.run(cmd_1).getResponseCode + proc.run(cmd_1) } // scalastyle:on println } @@ -534,7 +544,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { } // Adapted processLine from Hive 2.3's CliDriver.processLine. 
- override def processLine(line: String, allowInterrupting: Boolean): Int = { + override def processLine(line: String, allowInterrupting: Boolean): CommandProcessorResponse = { var oldSignal: SignalHandler = null var interruptSignal: Signal = null @@ -566,7 +576,9 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { } try { - var lastRet: Int = 0 + val ignoreErrors = + HiveConf.getBoolVar(conf, HiveConf.getConfVars("hive.cli.errors.ignore")) + var ret: CommandProcessorResponse = null // we can not use "split" function directly as ";" may be quoted val commands = splitSemiColon(line).asScala @@ -577,20 +589,19 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { } else { command += oneCmd if (!StringUtils.isBlank(command)) { - val ret = processCmd(command) - command = "" - lastRet = ret - val ignoreErrors = - HiveConf.getBoolVar(conf, HiveConf.getConfVars("hive.cli.errors.ignore")) - if (ret != 0 && !ignoreErrors) { - CommandProcessorFactory.clean(conf.asInstanceOf[HiveConf]) - return ret + try { + ret = processCmd(command) + } catch { + case e: CommandProcessorException => + if (!ignoreErrors) { + throw e + } } + command = "" } } } - CommandProcessorFactory.clean(conf.asInstanceOf[HiveConf]) - lastRet + ret } finally { // Once we are done processing the line, restore the old handler if (oldSignal != null && interruptSignal != null) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 7cc181ea6945a..b5f54d5884e08 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -24,6 +24,7 @@ import scala.jdk.CollectionConverters._ import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema} import org.apache.hadoop.hive.ql.Driver +import org.apache.hadoop.hive.ql.QueryState import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.SparkThrowable @@ -38,15 +39,12 @@ import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.sparkSession) - extends Driver + extends Driver(new QueryState.Builder().build()) with Logging { private[hive] var tableSchema: Schema = _ private[hive] var hiveResponse: Seq[String] = _ - override def init(): Unit = { - } - private def getResultSetSchema(query: QueryExecution): Schema = { val analyzed = query.analyzed logDebug(s"Result Schema: ${analyzed.output}") @@ -79,7 +77,7 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv. } } tableSchema = getResultSetSchema(execution) - new CommandProcessorResponse(0) + new CommandProcessorResponse() } catch { case st: SparkThrowable => logDebug(s"Failed in [$command]", st) @@ -90,10 +88,9 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv. 
} } - override def close(): Int = { + override def close(): Unit = { hiveResponse = null tableSchema = null - 0 } override def getResults(res: JList[_]): Boolean = { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 8e12165dd6f14..5c81400289d22 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -52,7 +52,7 @@ private[thriftserver] class SparkSQLOperationManager() val conf = sparkSession.sessionState.conf val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC) val operation = new SparkExecuteStatementOperation( - sparkSession, parentSession, statement, confOverlay, runInBackground, queryTimeout) + sparkSession, parentSession, this, statement, confOverlay, runInBackground, queryTimeout) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created Operation for $statement with session=$parentSession, " + s"runInBackground=$runInBackground") @@ -64,7 +64,7 @@ private[thriftserver] class SparkSQLOperationManager() val session = sessionToContexts.get(parentSession.getSessionHandle) require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetCatalogsOperation(session, parentSession) + val operation = new SparkGetCatalogsOperation(session, parentSession, this) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetCatalogsOperation with session=$parentSession.") operation @@ -77,7 +77,8 @@ private[thriftserver] class SparkSQLOperationManager() val session = sessionToContexts.get(parentSession.getSessionHandle) require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetSchemasOperation(session, parentSession, catalogName, schemaName) + val operation = new SparkGetSchemasOperation(session, parentSession, this, catalogName, + schemaName) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetSchemasOperation with session=$parentSession.") operation @@ -92,7 +93,7 @@ private[thriftserver] class SparkSQLOperationManager() val session = sessionToContexts.get(parentSession.getSessionHandle) require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetTablesOperation(session, parentSession, + val operation = new SparkGetTablesOperation(session, parentSession, this, catalogName, schemaName, tableName, tableTypes) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetTablesOperation with session=$parentSession.") @@ -108,7 +109,7 @@ private[thriftserver] class SparkSQLOperationManager() val session = sessionToContexts.get(parentSession.getSessionHandle) require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetColumnsOperation(session, parentSession, + val operation = new SparkGetColumnsOperation(session, parentSession, this, catalogName, schemaName, tableName, columnName) handleToOperation.put(operation.getHandle, 
operation) logDebug(s"Created GetColumnsOperation with session=$parentSession.") @@ -120,21 +121,22 @@ private[thriftserver] class SparkSQLOperationManager() val session = sessionToContexts.get(parentSession.getSessionHandle) require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetTableTypesOperation(session, parentSession) + val operation = new SparkGetTableTypesOperation(session, parentSession, this) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetTableTypesOperation with session=$parentSession.") operation } - override def newGetFunctionsOperation( + def newGetFunctionsOperation( parentSession: HiveSession, + operationManager: OperationManager, catalogName: String, schemaName: String, functionName: String): GetFunctionsOperation = synchronized { val session = sessionToContexts.get(parentSession.getSessionHandle) require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetFunctionsOperation(session, parentSession, + val operation = new SparkGetFunctionsOperation(session, parentSession, operationManager, catalogName, schemaName, functionName) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetFunctionsOperation with session=$parentSession.") @@ -146,7 +148,7 @@ private[thriftserver] class SparkSQLOperationManager() val session = sessionToContexts.get(parentSession.getSessionHandle) require(session != null, s"Session handle: ${parentSession.getSessionHandle} has not been" + " initialized or had already closed.") - val operation = new SparkGetTypeInfoOperation(session, parentSession) + val operation = new SparkGetTypeInfoOperation(session, parentSession, this) handleToOperation.put(operation.getHandle, operation) logDebug(s"Created GetTypeInfoOperation with session=$parentSession.") operation diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/GetCatalogsOperationMock.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/GetCatalogsOperationMock.scala index 1bc9aaf672c3b..3f09e60454908 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/GetCatalogsOperationMock.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/GetCatalogsOperationMock.scala @@ -20,12 +20,12 @@ import java.nio.ByteBuffer import java.util.UUID import org.apache.hive.service.cli.OperationHandle -import org.apache.hive.service.cli.operation.GetCatalogsOperation +import org.apache.hive.service.cli.operation.{GetCatalogsOperation, OperationManager} import org.apache.hive.service.cli.session.HiveSession import org.apache.hive.service.rpc.thrift.{THandleIdentifier, TOperationHandle, TOperationType} -class GetCatalogsOperationMock(parentSession: HiveSession) - extends GetCatalogsOperation(parentSession) { +class GetCatalogsOperationMock(parentSession: HiveSession, operationManager: OperationManager) + extends GetCatalogsOperation(parentSession, operationManager) { override def runInternal(): Unit = {} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala index 6f89fbfb788bc..eb0e91728fdad 100644 --- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala @@ -67,7 +67,7 @@ class OperationManagerMock extends OperationManager { private val calledHandles: mutable.Set[OperationHandle] = new mutable.HashSet[OperationHandle]() override def newGetCatalogsOperation(parentSession: HiveSession): GetCatalogsOperation = { - val operation = new GetCatalogsOperationMock(parentSession) + val operation = new GetCatalogsOperationMock(parentSession, this) try { val m = classOf[OperationManager].getDeclaredMethod("addOperation", classOf[Operation]) m.setAccessible(true) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala index 6f0fedcb85368..590df2d28850e 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala @@ -113,7 +113,7 @@ trait SharedThriftServer extends SharedSparkSession { val interceptor = new HttpBasicAuthInterceptor( username, "anonymous", - null, null, true, new util.HashMap[String, String]()) + null, null, true, new util.HashMap[String, String](), new util.HashMap[String, String]()) new THttpClient( s"http://localhost:$serverPort/cliservice", HttpClientBuilder.create.addInterceptorFirst(interceptor).build()) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala index 5abf034c1dea1..902d604c08d6b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala @@ -24,6 +24,7 @@ import scala.concurrent.duration._ import org.apache.hadoop.hive.conf.HiveConf import org.apache.hive.service.cli.OperationState +import org.apache.hive.service.cli.operation.OperationManager import org.apache.hive.service.cli.session.{HiveSession, HiveSessionImpl} import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TTypeId} import org.mockito.Mockito.{doReturn, mock, spy, when, RETURNS_DEEP_STUBS} @@ -93,7 +94,7 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark doReturn(dataFrame, Nil: _*).when(spySparkSession).sql(statement) val executeStatementOperation = new MySparkExecuteStatementOperation(spySparkSession, - hiveSession, statement, signal, finalState) + hiveSession, new OperationManagerMock(), statement, signal, finalState) val run = new Thread() { override def run(): Unit = executeStatementOperation.runInternal() @@ -112,10 +113,11 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark private class MySparkExecuteStatementOperation( session: SparkSession, hiveSession: HiveSession, + operationManager: OperationManager, statement: String, signal: Semaphore, finalState: OperationState) - extends SparkExecuteStatementOperation(session, hiveSession, statement, + extends SparkExecuteStatementOperation(session, hiveSession, operationManager, statement, new util.HashMap, false, 0) { 
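Note: the test changes above all follow the same pattern, since Hive 4 operation constructors now take the owning OperationManager next to the session. A hypothetical mock (not part of this change set) mirroring GetCatalogsOperationMock would look like:

import org.apache.hive.service.cli.operation.{GetTypeInfoOperation, OperationManager}
import org.apache.hive.service.cli.session.HiveSession

// Illustrative mock only: forward both the session and the manager to the
// Hive 4 base constructor, and make runInternal a no-op for tests.
class GetTypeInfoOperationMock(parentSession: HiveSession, operationManager: OperationManager)
  extends GetTypeInfoOperation(parentSession, operationManager) {
  override def runInternal(): Unit = {}
}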
override def cleanup(): Unit = { diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkThriftServerProtocolVersionsSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkThriftServerProtocolVersionsSuite.scala index 60afcf815361b..8b0093edfaf6e 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkThriftServerProtocolVersionsSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkThriftServerProtocolVersionsSuite.scala @@ -64,8 +64,9 @@ class SparkThriftServerProtocolVersionsSuite extends HiveThriftServer2TestBase { rs = new HiveQueryResultSet.Builder(connection) .setClient(client) - .setSessionHandle(sessHandle) - .setStmtHandle(stmtHandle).setMaxRows(Int.MaxValue).setFetchSize(Int.MaxValue) + .setStmtHandle(stmtHandle) + .setMaxRows(Int.MaxValue) + .setFetchSize(Int.MaxValue) .build() f(rs) } finally { @@ -133,7 +134,6 @@ class SparkThriftServerProtocolVersionsSuite extends HiveThriftServer2TestBase { rs = new HiveQueryResultSet.Builder(connection) .setClient(client) - .setSessionHandle(sessHandle) .setStmtHandle(getTableResp.getOperationHandle) .build() f(rs) diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 45e1400f22108..1f1a3da99deb6 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -126,6 +126,10 @@ hive-llap-client ${hive.llap.scope} + + org.apache.hive + hive-udf + org.apache.avro diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveDateTimeUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveDateTimeUtils.scala new file mode 100644 index 0000000000000..49ea3dc61314f --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveDateTimeUtils.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import java.sql.Date + +import org.apache.hadoop.hive.common.`type`.{Date => HiveDate, Timestamp => HiveTimestamp} + +import org.apache.spark.sql.catalyst.util.DateTimeUtils + +object HiveDateTimeUtils { + def fromHiveTimestamp(t: HiveTimestamp): Long = { + DateTimeUtils.fromJavaTimestamp(t.toSqlTimestamp) + } + + def fromHiveDate(d: HiveDate): Int = { + DateTimeUtils.fromJavaDate(new Date(d.toEpochMilli)) + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 9f1954cbf6868..b354c3ab50755 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -342,7 +342,7 @@ private[hive] trait HiveInspectors { case _: BinaryObjectInspector => withNullSafe(o => o.asInstanceOf[Array[Byte]]) case _: DateObjectInspector if x.preferWritable() => - withNullSafe(o => getDateWritable(o)) + withNullSafe(o => getDaysWritable(o)) case _: DateObjectInspector => withNullSafe(o => DateTimeUtils.toJavaDate(o.asInstanceOf[Int])) case _: TimestampObjectInspector if x.preferWritable() => @@ -480,7 +480,7 @@ private[hive] trait HiveInspectors { _ => constant case poi: WritableConstantTimestampObjectInspector => val t = poi.getWritableConstantValue - val constant = DateTimeUtils.fromJavaTimestamp(t.getTimestamp) + val constant = HiveDateTimeUtils.fromHiveTimestamp(t.getTimestamp) _ => constant case poi: WritableConstantIntObjectInspector => val constant = poi.getWritableConstantValue.get() @@ -509,7 +509,7 @@ private[hive] trait HiveInspectors { System.arraycopy(writable.getBytes, 0, constant, 0, constant.length) _ => constant case poi: WritableConstantDateObjectInspector => - val constant = DateTimeUtils.fromJavaDate(poi.getWritableConstantValue.get()) + val constant = HiveDateTimeUtils.fromHiveDate(poi.getWritableConstantValue.get()) _ => constant case mi: StandardConstantMapObjectInspector => val keyUnwrapper = unwrapperFor(mi.getMapKeyObjectInspector) @@ -641,7 +641,7 @@ private[hive] trait HiveInspectors { case x: DateObjectInspector => data: Any => { if (data != null) { - DateTimeUtils.fromJavaDate(x.getPrimitiveJavaObject(data)) + HiveDateTimeUtils.fromHiveDate(x.getPrimitiveJavaObject(data)) } else { null } @@ -649,7 +649,7 @@ private[hive] trait HiveInspectors { case x: TimestampObjectInspector if x.preferWritable() => data: Any => { if (data != null) { - DateTimeUtils.fromJavaTimestamp(x.getPrimitiveWritableObject(data).getTimestamp) + HiveDateTimeUtils.fromHiveTimestamp(x.getPrimitiveWritableObject(data).getTimestamp) } else { null } @@ -657,7 +657,7 @@ private[hive] trait HiveInspectors { case ti: TimestampObjectInspector => data: Any => { if (data != null) { - DateTimeUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data)) + HiveDateTimeUtils.fromHiveTimestamp(ti.getPrimitiveJavaObject(data)) } else { null } @@ -1016,7 +1016,7 @@ private[hive] trait HiveInspectors { private def getDateWritableConstantObjectInspector(value: Any): ObjectInspector = PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( - TypeInfoFactory.dateTypeInfo, getDateWritable(value)) + TypeInfoFactory.dateTypeInfo, getDaysWritable(value)) private def getTimestampWritableConstantObjectInspector(value: Any): ObjectInspector = PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( @@ -1081,7 +1081,7 @@ private[hive] trait 
HiveInspectors { new hadoopIo.BytesWritable(value.asInstanceOf[Array[Byte]]) } - private def getDateWritable(value: Any): DaysWritable = + private def getDaysWritable(value: Any): DaysWritable = if (value == null) { null } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index a8e91dc1c1e85..3804e57067f26 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.util.VersionInfo import org.apache.hive.common.util.HiveVersionInfo @@ -62,7 +62,7 @@ private[spark] object HiveUtils extends Logging { " Note that, this a read-only conf and only used to report the built-in hive version." + " If you want a different metastore client for Spark to call, please refer to" + " spark.sql.hive.metastore.version.") - .version("1.1.1") + .version("4.0.1") .stringConf .checkValue(_ == builtinHiveVersion, "The builtin Hive version is read-only, please use spark.sql.hive.metastore.version") @@ -77,7 +77,7 @@ private[spark] object HiveUtils extends Logging { "2.0.0 through 2.3.10, " + "3.0.0 through 3.1.3 and " + "4.0.0 through 4.0.1.") - .version("1.4.0") + .version("4.0.1") .stringConf .checkValue(isCompatibleHiveVersion, "Unsupported Hive Metastore version") .createWithDefault(builtinHiveVersion) @@ -474,14 +474,14 @@ private[spark] object HiveUtils extends Logging { // You can search hive.metastore.uris in the code of HiveConf (in Hive's repo). // Then, you will find that the local metastore mode is only set to true when // hive.metastore.uris is not set. - propMap.put("hive.metastore.uris", "") + propMap.put(ConfVars.THRIFT_URIS.getVarname, "") // The execution client will generate garbage events, therefore the listeners that are generated // for the execution clients are useless. In order to not output garbage, we don't generate // these listeners. - propMap.put(ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname, "") - propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "") - propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "") + propMap.put(ConfVars.PRE_EVENT_LISTENERS.getVarname, "") + propMap.put(ConfVars.EVENT_LISTENERS.getVarname, "") + propMap.put(ConfVars.END_FUNCTION_LISTENERS.getVarname, "") // SPARK-21451: Spark will gather all `spark.hadoop.*` properties from a `SparkConf` to a // Hadoop Configuration internally, as long as it happens after SparkContext initialized. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index a93c6bd6b4e9b..038ff5dd9bf1a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -43,11 +43,10 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.unsafe.types.UTF8String -import org.apache.spark.util.{SerializableConfiguration, Utils} import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.SerializableConfiguration /** * A trait for subclasses that handle table scans. @@ -96,7 +95,7 @@ class HadoopTableReader( override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = makeRDDForTable( hiveTable, - Utils.classForName[Deserializer](tableDesc.getSerdeClassName), + tableDesc.getDeserializer(_broadcastedHadoopConf.value.value), filterOpt = None) /** @@ -110,7 +109,7 @@ class HadoopTableReader( */ def makeRDDForTable( hiveTable: HiveTable, - deserializerClass: Class[_ <: Deserializer], + deserializer: Deserializer, filterOpt: Option[PathFilter]): RDD[InternalRow] = { assert(!hiveTable.isPartitioned, @@ -132,11 +131,6 @@ class HadoopTableReader( val mutableRow = new SpecificInternalRow(attributes.map(_.dataType)) val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => - val hconf = broadcastedHadoopConf.value.value - val deserializer = deserializerClass.getConstructor().newInstance() - DeserializerLock.synchronized { - deserializer.initialize(hconf, localTableDesc.getProperties) - } HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer) } @@ -144,8 +138,7 @@ class HadoopTableReader( } override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[InternalRow] = { - val partitionToDeserializer = partitions.map(part => - (part, part.getDeserializer.getClass.asInstanceOf[Class[Deserializer]])).toMap + val partitionToDeserializer = partitions.map(part => (part, part.getDeserializer)).toMap makeRDDForPartitionedTable(partitionToDeserializer, filterOpt = None) } @@ -160,7 +153,7 @@ class HadoopTableReader( * subdirectory of each partition being read. If None, then all files are accepted. */ def makeRDDForPartitionedTable( - partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]], + partitionToDeserializer: Map[HivePartition, Deserializer], filterOpt: Option[PathFilter]): RDD[InternalRow] = { val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) => val partDesc = Utilities.getPartitionDescFromTableDesc(tableDesc, partition, true) @@ -210,7 +203,7 @@ class HadoopTableReader( createHadoopRDD(partDesc, inputPathStr).mapPartitions { iter => val hconf = broadcastedHiveConf.value.value - val deserializer = localDeserializer.getConstructor().newInstance() + val deserializer = localDeserializer // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema // information) may be defined in table properties. Here we should merge table properties // and partition properties before initializing the deserializer. 
Note that partition @@ -225,14 +218,8 @@ class HadoopTableReader( }.foreach { case (key, value) => props.setProperty(key, value) } - DeserializerLock.synchronized { - deserializer.initialize(hconf, props) - } // get the table deserializer - val tableSerDe = localTableDesc.getDeserializerClass.getConstructor().newInstance() - DeserializerLock.synchronized { - tableSerDe.initialize(hconf, tableProperties) - } + val tableSerDe = localTableDesc.getDeserializer(hconf) // fill the non partition key attributes HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, @@ -488,10 +475,11 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { row.update(ordinal, HiveShim.toCatalystDecimal(oi, value)) case oi: TimestampObjectInspector => (value: Any, row: InternalRow, ordinal: Int) => - row.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value))) + row.setLong(ordinal, + HiveDateTimeUtils.fromHiveTimestamp(oi.getPrimitiveJavaObject(value))) case oi: DateObjectInspector => (value: Any, row: InternalRow, ordinal: Int) => - row.setInt(ordinal, DateTimeUtils.fromJavaDate(oi.getPrimitiveJavaObject(value))) + row.setInt(ordinal, HiveDateTimeUtils.fromHiveDate(oi.getPrimitiveJavaObject(value))) case oi: BinaryObjectInspector => (value: Any, row: InternalRow, ordinal: Int) => row.update(ordinal, oi.getPrimitiveJavaObject(value)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 90f8a3a85d70c..3c099508d6f65 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -38,9 +38,8 @@ import org.apache.hadoop.hive.metastore.{HiveMetaStoreClient, IMetaStoreClient, import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable, _} import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable} -import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC -import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.hadoop.hive.ql.util.DirectionUtils import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe @@ -483,7 +482,7 @@ private[hive] class HiveClientImpl( // are sorted in ascending order, only then propagate the sortedness information // to downstream processing / optimizations in Spark // TODO: In future we can have Spark support columns sorted in descending order - val allAscendingSorted = sortColumnOrders.forall(_.getOrder == HIVE_COLUMN_ORDER_ASC) + val allAscendingSorted = sortColumnOrders.forall(_.getOrder == DirectionUtils.ASCENDING_CODE) val sortColumnNames = if (allAscendingSorted) { sortColumnOrders.map(_.getCol) @@ -885,20 +884,6 @@ private[hive] class HiveClientImpl( // Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed // and the CommandProcessorFactory.clean function removed. 
driver.getClass.getMethod("close").invoke(driver) - if (version != hive.v3_0 && version != hive.v3_1 && version != hive.v4_0) { - CommandProcessorFactory.clean(conf) - } - } - - def getResponseCode(response: CommandProcessorResponse): Int = { - if (version < hive.v4_0) { - response.getResponseCode - } else { - // Since Hive 4.0, response code is removed from CommandProcessorResponse. - // Here we simply return 0 for the positive cases as for error cases it will - // throw exceptions early. - 0 - } } // Hive query needs to start SessionState. @@ -914,15 +899,9 @@ private[hive] class HiveClientImpl( proc match { case driver: Driver => try { - val response: CommandProcessorResponse = driver.run(cmd) - if (getResponseCode(response) != 0) { - // Throw an exception if there is an error in query processing. - // This works for hive 3.x and earlier versions. - throw new QueryExecutionException(response.getErrorMessage) - } + driver.run(cmd) driver.setMaxRows(maxRows) - val results = shim.getDriverResults(driver) - results + shim.getDriverResults(driver) } catch { case e @ (_: QueryExecutionException | _: SparkThrowable) => throw e @@ -942,15 +921,8 @@ private[hive] class HiveClientImpl( out.asInstanceOf[PrintStream].println(tokens(0) + " " + cmd_1) // scalastyle:on println } - val response: CommandProcessorResponse = proc.run(cmd_1) - val responseCode = getResponseCode(response) - if (responseCode != 0) { - // Throw an exception if there is an error in query processing. - // This works for hive 3.x and earlier versions. For 4.x and later versions, - // It will go to the catch block directly. - throw new QueryExecutionException(response.getErrorMessage) - } - Seq(responseCode.toString) + proc.run(cmd_1) + Seq.empty } } catch { case e: Exception => @@ -1073,12 +1045,7 @@ private[hive] class HiveClientImpl( val t = table.getTableName logDebug(s"Deleting table $t") try { - shim.getIndexes(client, "default", t, 255).foreach { index => - shim.dropIndex(client, "default", t, index.getIndexName) - } - if (!table.isIndexTable) { - shim.dropTable(client, "default", t) - } + shim.dropTable(client, "default", t) } catch { case _: NoSuchMethodError => // HIVE-18448 Hive 3.0 remove index APIs @@ -1201,7 +1168,7 @@ private[hive] object HiveClientImpl extends Logging { if (bucketSpec.sortColumnNames.nonEmpty) { hiveTable.setSortCols( bucketSpec.sortColumnNames - .map(col => new Order(col, HIVE_COLUMN_ORDER_ASC)) + .map(col => new Order(col, DirectionUtils.ASCENDING_CODE)) .toList .asJava ) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index b17b68ad99592..0c50eb4c9b38e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -29,11 +29,11 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.{IMetaStoreClient, PartitionDropOptions, TableType} -import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, Index, MetaException, PrincipalType, ResourceType, ResourceUri} +import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri} import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.io.AcidUtils import 
org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table} -import org.apache.hadoop.hive.ql.plan.{AddPartitionDesc, DynamicPartitionCtx} +import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory} import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde.serdeConstants @@ -211,8 +211,6 @@ private[client] sealed abstract class Shim { def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] - def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit - def dropTable( hive: Hive, dbName: String, @@ -235,8 +233,6 @@ private[client] sealed abstract class Shim { def getMSC(hive: Hive): IMetaStoreClient - def getIndexes(hive: Hive, dbName: String, tableName: String, max: Short): Seq[Index] - protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { klass.getMethod(name, args: _*) } @@ -324,16 +320,10 @@ private[client] class Shim_v2_0 extends Shim with Logging { table: Table, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = { - val addPartitionDesc = new AddPartitionDesc(table.getDbName, table.getTableName, ignoreIfExists) - parts.zipWithIndex.foreach { case (s, i) => - addPartitionDesc.addPartition( - s.spec.asJava, s.storage.locationUri.map(CatalogUtils.URIToString).orNull) - if (s.parameters.nonEmpty) { - addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava) - } + parts.zipWithIndex.foreach { case (s, _) => + hive.createPartition(table, s.parameters.asJava) + recordHiveCall() } - recordHiveCall() - hive.createPartitions(addPartitionDesc) } override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] = { @@ -513,11 +503,6 @@ private[client] class Shim_v2_0 extends Shim with Logging { numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions) } - override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = { - recordHiveCall() - hive.dropIndex(dbName, tableName, indexName, throwExceptionInDropIndex, deleteDataInDropIndex) - } - override def dropTable( hive: Hive, dbName: String, @@ -992,17 +977,9 @@ private[client] class Shim_v2_0 extends Shim with Logging { oldPartSpec: JMap[String, String], newPart: Partition): Unit = { recordHiveCall() - hive.renamePartition(table, oldPartSpec, newPart) + hive.renamePartition(table, oldPartSpec, newPart, 0) } - override def getIndexes( - hive: Hive, - dbName: String, - tableName: String, - max: Short): Seq[Index] = { - recordHiveCall() - hive.getIndexes(dbName, tableName, max).asScala.toSeq - } } private[client] class Shim_v2_1 extends Shim_v2_0 { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala index 0d4efd9e77742..a71f6e72e69a6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} import org.apache.hadoop.hive.ql.plan.FileSinkDesc -import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils @@ -89,7 +88,7 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc) // Add table properties from storage handler to hadoopConf, so any custom storage // handler settings can be set to hadoopConf HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false) - Utilities.copyTableJobPropertiesToConf(tableDesc, conf) + Utilities.copyTableJobPropertiesToConf(tableDesc, conf.asInstanceOf[JobConf]) // Avoid referencing the outer object. val fileSinkConfSer = fileSinkConf @@ -135,12 +134,7 @@ class HiveOutputWriter( private def tableDesc = fileSinkConf.getTableInfo - private val serializer = { - val serializer = tableDesc.getDeserializerClass.getConstructor(). - newInstance().asInstanceOf[Serializer] - serializer.initialize(jobConf, tableDesc.getProperties) - serializer - } + private val serializer = tableDesc.getDeserializer(jobConf) private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter( jobConf, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala index de2d15415837a..4535e723c9328 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala @@ -284,7 +284,7 @@ object HiveScriptIOSchema extends HiveInspectors { val properties = new Properties() properties.putAll(propsMap.asJava) - serde.initialize(null, properties) + serde.initialize(null, properties, null) serde } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 335d552fd50b7..0a6776c7205e0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -123,8 +123,7 @@ case class HiveTableScanExec( HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) - val deserializer = tableDesc.getDeserializerClass.getConstructor().newInstance() - deserializer.initialize(hiveConf, tableDesc.getProperties) + val deserializer = tableDesc.getDeserializer(hiveConf) // Specifies types and object inspectors of columns to be scanned. 
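Note: the SerDe-related hunks around here (script transformation above, ORC below) all move to Hive 4's three-argument AbstractSerDe.initialize, which takes the configuration, the table properties, and optional partition properties. A sketch of that convention, mirroring the OrcSerde usage in this patch:

import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.io.orc.OrcSerde

// Sketch: initialize an ORC SerDe with table properties only; the third
// argument (partition properties) is null when it does not apply.
def newOrcSerde(conf: Configuration, tableProps: Properties): OrcSerde = {
  val serde = new OrcSerde
  serde.initialize(conf, tableProps, null)
  serde
}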
val structOI = ObjectInspectorUtils diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala index d97d3cd6dd4a9..a71038cb19224 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala @@ -149,7 +149,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P try { stagingDirForCreating.foreach { stagingDir => val fs: FileSystem = stagingDir.getFileSystem(hadoopConf) - if (!FileUtils.mkdir(fs, stagingDir, true, hadoopConf)) { + if (!FileUtils.mkdir(fs, stagingDir, hadoopConf)) { throw SparkException.internalError( "Cannot create staging directory '" + stagingDir.toString + "'") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index b5d3fb699d62e..06b32f9810b45 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -146,9 +146,9 @@ case class InsertIntoHiveTable( if (numDynamicPartitions > 0) { if (overwrite && table.tableType == CatalogTableType.EXTERNAL) { val numWrittenParts = writtenParts.size - val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname + val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMIC_PARTITION_MAX_PARTS.varname val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey, - HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal) + HiveConf.ConfVars.DYNAMIC_PARTITION_MAX_PARTS.defaultIntVal) if (numWrittenParts > maxDynamicPartitions) { throw QueryExecutionErrors.writePartitionExceedConfigSizeWhenDynamicPartitionError( numWrittenParts, maxDynamicPartitions, maxDynamicPartitionsKey) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 4e1567aac5f20..7f570f3555d48 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -233,7 +233,7 @@ private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration) table.setProperty("columns.types", dataSchema.map(_.dataType.catalogString).mkString(":")) val serde = new OrcSerde - serde.initialize(conf, table) + serde.initialize(conf, table, null) serde } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 9dcc6abe20271..48233c0812654 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -23,7 +23,7 @@ import java.sql.Timestamp import scala.util.Try -import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkFiles, TestUtils} @@ -1210,7 +1210,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd .zip(parts) .map { case (k, v) => if (v == "NULL") { - s"$k=${ConfVars.DEFAULTPARTITIONNAME.defaultStrVal}" + 
s"$k=${ConfVars.DEFAULTPARTITIONNAME.getDefaultVal}" } else { s"$k=$v" } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 6604fe2a9d61e..05770307e8c2e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -18,11 +18,10 @@ package org.apache.spark.sql.hive.execution import java.io.{DataInput, DataOutput, File, PrintWriter} -import java.util.{ArrayList, Arrays, Properties} +import java.util.{ArrayList, Arrays} import scala.jdk.CollectionConverters._ -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.exec.UDF import org.apache.hadoop.hive.ql.metadata.HiveException import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType} @@ -843,8 +842,6 @@ class TestPair(x: Int, y: Int) extends Writable with Serializable { } class PairSerDe extends AbstractSerDe { - override def initialize(p1: Configuration, p2: Properties): Unit = {} - override def getObjectInspector: ObjectInspector = { ObjectInspectorFactory .getStandardStructObjectInspector(