[WIP][SPARK-51348][BUILD][SQL] Upgrade Hive to 4.0
vrozov committed Mar 8, 2025
1 parent eb71443 commit 95f108c
Showing 56 changed files with 360 additions and 306 deletions.
23 changes: 22 additions & 1 deletion pom.xml
@@ -132,7 +132,7 @@
<hive.group>org.apache.hive</hive.group>
<hive.classifier>core</hive.classifier>
<!-- Version used in Maven Hive dependency -->
<hive.version>2.3.10</hive.version>
<hive.version>4.0.1</hive.version>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
<kafka.version>3.9.0</kafka.version>
<!-- After 10.17.1.0, the minimum required version is JDK19 -->
@@ -1839,6 +1839,10 @@
<groupId>${hive.group}</groupId>
<artifactId>hive-storage-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
</exclusion>
</exclusions>
</dependency>

@@ -1991,6 +1995,14 @@
<groupId>net.hydromatic</groupId>
<artifactId>aggdesigner-algorithm</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-intg</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-registry</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -2294,6 +2306,10 @@
<groupId>${hive.group}</groupId>
<artifactId>hive-serde</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -2337,6 +2353,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-udf</artifactId>
<version>${hive.version}</version>
</dependency>

<dependency>
<groupId>org.apache.orc</groupId>
6 changes: 5 additions & 1 deletion sql/core/pom.xml
@@ -123,7 +123,11 @@
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-storage-api</artifactId>
<artifactId>hive-common</artifactId>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-serde</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -18,9 +18,9 @@
package org.apache.spark.sql.execution.datasources

import java.io.{DataInput, DataOutput, IOException}
import java.sql.Date

import org.apache.hadoop.hive.serde2.io.DateWritable
import org.apache.hadoop.hive.common.`type`.Date
import org.apache.hadoop.hive.serde2.io.DateWritableV2
import org.apache.hadoop.io.WritableUtils

import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseJulianToGregorianDays}
@@ -38,27 +38,24 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulia
class DaysWritable(
var gregorianDays: Int,
var julianDays: Int)
extends DateWritable {
extends DateWritableV2 {

def this() = this(0, 0)
def this(gregorianDays: Int) =
this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
def this(dateWritable: DateWritable) = {
def this(dateWritable: DateWritableV2) = {
this(
gregorianDays = dateWritable match {
case daysWritable: DaysWritable => daysWritable.gregorianDays
case dateWritable: DateWritable =>
case dateWritable: DateWritableV2 =>
rebaseJulianToGregorianDays(dateWritable.getDays)
},
julianDays = dateWritable.getDays)
}

override def getDays: Int = julianDays
override def get: Date = {
new Date(DateWritable.daysToMillis(julianDays))
}
override def get(doesTimeMatter: Boolean): Date = {
new Date(DateWritable.daysToMillis(julianDays, doesTimeMatter))
Date.ofEpochMilli(DateWritableV2.daysToMillis(julianDays))
}

override def set(d: Int): Unit = {
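For reference, a minimal sketch (not part of this patch) of the Hive 4 date API that DaysWritable now targets, using only the DateWritableV2 and Date methods already exercised in the diff above (set, getDays, daysToMillis, ofEpochMilli):

import org.apache.hadoop.hive.common.`type`.Date
import org.apache.hadoop.hive.serde2.io.DateWritableV2

val writable = new DateWritableV2()
writable.set(19000)                 // days since the Unix epoch
val days: Int = writable.getDays    // round-trips the day count
// Hive 4 returns its own Date type rather than java.sql.Date
val date: Date = Date.ofEpochMilli(DateWritableV2.daysToMillis(days))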
@@ -21,7 +21,7 @@ import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument => OrcSearchArgument}
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}
import org.apache.hadoop.hive.serde2.io.{DateWritableV2, HiveDecimalWritable}

import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.execution.datasources.DaysWritable
@@ -38,15 +38,15 @@ private[sql] object OrcShimUtils {
private[sql] type SearchArgument = OrcSearchArgument

def getGregorianDays(value: Any): Int = {
new DaysWritable(value.asInstanceOf[DateWritable]).gregorianDays
new DaysWritable(value.asInstanceOf[DateWritableV2]).gregorianDays
}

def getDecimal(value: Any): Decimal = {
val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
}

def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritableV2 = {
if (reuseObj) {
val result = new DaysWritable()
(getter, ordinal) =>
@@ -30,10 +30,10 @@
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.thrift.DBTokenStore;
import org.apache.hadoop.hive.thrift.HiveDelegationTokenManager;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.hive.metastore.security.DBTokenStore;
import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
@@ -80,7 +80,7 @@ public String getAuthName() {
private String authTypeStr;
private final String transportMode;
private final HiveConf conf;
private HiveDelegationTokenManager delegationTokenManager = null;
private MetastoreDelegationTokenManager delegationTokenManager = null;

public static final String HS2_PROXY_USER = "hive.server2.proxy.user";
public static final String HS2_CLIENT_TOKEN = "hiveserver2ClientToken";
@@ -114,18 +114,19 @@ public HiveAuthFactory(HiveConf conf) throws TTransportException, IOException {
authTypeStr = AuthTypes.NONE.getAuthName();
}
if (authTypeStr.equalsIgnoreCase(AuthTypes.KERBEROS.getAuthName())) {
String principal = conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
String keytab = conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
String principal = conf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
String client = conf.getVar(ConfVars.HIVE_SERVER2_CLIENT_KERBEROS_PRINCIPAL);
if (needUgiLogin(UserGroupInformation.getCurrentUser(),
SecurityUtil.getServerPrincipal(principal, "0.0.0.0"), keytab)) {
saslServer = ShimLoader.getHadoopThriftAuthBridge().createServer(keytab, principal);
saslServer = HadoopThriftAuthBridge.getBridge().createServer(keytab, principal, client);
} else {
// Using the default constructor to avoid unnecessary UGI login.
saslServer = new HadoopThriftAuthBridge.Server();
}

// start delegation token manager
delegationTokenManager = new HiveDelegationTokenManager();
delegationTokenManager = new MetastoreDelegationTokenManager();
try {
// rawStore is only necessary for DBTokenStore
Object rawStore = null;
@@ -31,6 +31,7 @@
import javax.security.auth.Subject;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.protocol.BasicHttpContext;
@@ -68,8 +69,7 @@ public final class HttpAuthUtils {
*/
public static String getKerberosServiceTicket(String principal, String host,
String serverHttpUrl, boolean assumeSubject) throws Exception {
String serverPrincipal =
ShimLoader.getHadoopThriftAuthBridge().getServerPrincipal(principal, host);
String serverPrincipal = HadoopThriftAuthBridge.getBridge().getServerPrincipal(principal, host);
if (assumeSubject) {
// With this option, we're assuming that the external application,
// using the JDBC driver has done a JAAS kerberos login already
@@ -82,7 +82,7 @@ public static String getKerberosServiceTicket(String principal, String host,
} else {
// JAAS login from ticket cache to setup the client UserGroupInformation
UserGroupInformation clientUGI =
ShimLoader.getHadoopThriftAuthBridge().getCurrentUGIWithConf("kerberos");
HadoopThriftAuthBridge.getBridge().getCurrentUGIWithConf("kerberos");
return clientUGI.doAs(new HttpKerberosClientAction(serverPrincipal, serverHttpUrl));
}
}
@@ -21,8 +21,8 @@
import javax.security.sasl.SaslException;

import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.hive.service.rpc.thrift.TCLIService.Iface;
@@ -52,7 +52,7 @@ public static TTransport getKerberosTransport(String principal, String host,
return createSubjectAssumedTransport(principal, underlyingTransport, saslProps);
} else {
HadoopThriftAuthBridge.Client authBridge =
ShimLoader.getHadoopThriftAuthBridge().createClientWithConf("kerberos");
HadoopThriftAuthBridge.getBridge().createClientWithConf("kerberos");
return authBridge.createClientTransport(principal, host, "KERBEROS", null,
underlyingTransport, saslProps);
}
@@ -77,7 +77,7 @@ public static TTransport createSubjectAssumedTransport(String principal,
public static TTransport getTokenTransport(String tokenStr, String host,
TTransport underlyingTransport, Map<String, String> saslProps) throws SaslException {
HadoopThriftAuthBridge.Client authBridge =
ShimLoader.getHadoopThriftAuthBridge().createClientWithConf("kerberos");
HadoopThriftAuthBridge.getBridge().createClientWithConf("kerberos");

try {
return authBridge.createClientTransport(null, host, "DIGEST", tokenStr, underlyingTransport,
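The three files above swap the shim-loaded bridge from org.apache.hadoop.hive.thrift for the Hive 4 metastore class. A hedged sketch of the replacement call pattern, using only methods that appear in the diff (getBridge, getServerPrincipal, getCurrentUGIWithConf, createClientWithConf); the principal and host strings are placeholder values:

import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge

// Replaces ShimLoader.getHadoopThriftAuthBridge()
val bridge = HadoopThriftAuthBridge.getBridge()
val serverPrincipal = bridge.getServerPrincipal("hive/_HOST@EXAMPLE.COM", "hs2.example.com")
val clientUgi = bridge.getCurrentUGIWithConf("kerberos")
val authClient: HadoopThriftAuthBridge.Client = bridge.createClientWithConf("kerberos")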
@@ -25,9 +25,9 @@
public abstract class ExecuteStatementOperation extends Operation {
protected String statement = null;

public ExecuteStatementOperation(HiveSession parentSession, String statement,
public ExecuteStatementOperation(HiveSession parentSession, OperationManager operationManager, String statement,
Map<String, String> confOverlay, boolean runInBackground) {
super(parentSession, confOverlay, OperationType.EXECUTE_STATEMENT, runInBackground);
super(parentSession, operationManager, confOverlay, OperationType.EXECUTE_STATEMENT, runInBackground);
this.statement = statement;
}

@@ -43,7 +43,7 @@ protected void registerCurrentOperationLog() {
isOperationLogEnabled = false;
return;
}
OperationLog.setCurrentOperationLog(operationLog);
operationManager.setCurrentOperationLog(operationLog, operationLogFile);
}
}
}
@@ -33,8 +33,8 @@ public class GetCatalogsOperation extends MetadataOperation {

protected final RowSet rowSet;

protected GetCatalogsOperation(HiveSession parentSession) {
super(parentSession, OperationType.GET_CATALOGS);
protected GetCatalogsOperation(HiveSession parentSession, OperationManager operationManager) {
super(parentSession, operationManager, OperationType.GET_CATALOGS);
rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false);
}

@@ -33,7 +33,7 @@
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.TableIterable;
import org.apache.hadoop.hive.metastore.TableIterable;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
@@ -119,9 +119,9 @@ public class GetColumnsOperation extends MetadataOperation {

protected final RowSet rowSet;

protected GetColumnsOperation(HiveSession parentSession, String catalogName, String schemaName,
String tableName, String columnName) {
super(parentSession, OperationType.GET_COLUMNS);
protected GetColumnsOperation(HiveSession parentSession, OperationManager operationManager,
String catalogName, String schemaName, String tableName, String columnName) {
super(parentSession, operationManager, OperationType.GET_COLUMNS);
this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = tableName;
@@ -99,10 +99,10 @@ public class GetCrossReferenceOperation extends MetadataOperation {
private final String foreignTableName;
private final RowSet rowSet;

public GetCrossReferenceOperation(HiveSession parentSession,
public GetCrossReferenceOperation(HiveSession parentSession, OperationManager operationManager,
String parentCatalogName, String parentSchemaName, String parentTableName,
String foreignCatalog, String foreignSchema, String foreignTable) {
super(parentSession, OperationType.GET_FUNCTIONS);
super(parentSession, operationManager, OperationType.GET_FUNCTIONS);
this.parentCatalogName = parentCatalogName;
this.parentSchemaName = parentSchemaName;
this.parentTableName = parentTableName;
@@ -66,9 +66,9 @@ public class GetFunctionsOperation extends MetadataOperation {

protected final RowSet rowSet;

public GetFunctionsOperation(HiveSession parentSession,
public GetFunctionsOperation(HiveSession parentSession, OperationManager operationManager,
String catalogName, String schemaName, String functionName) {
super(parentSession, OperationType.GET_FUNCTIONS);
super(parentSession, operationManager, OperationType.GET_FUNCTIONS);
this.catalogName = catalogName;
this.schemaName = schemaName;
this.functionName = functionName;
@@ -61,9 +61,9 @@ public class GetPrimaryKeysOperation extends MetadataOperation {

private final RowSet rowSet;

public GetPrimaryKeysOperation(HiveSession parentSession,
public GetPrimaryKeysOperation(HiveSession parentSession, OperationManager operationManager,
String catalogName, String schemaName, String tableName) {
super(parentSession, OperationType.GET_FUNCTIONS);
super(parentSession, operationManager, OperationType.GET_FUNCTIONS);
this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = tableName;
@@ -44,9 +44,9 @@ public class GetSchemasOperation extends MetadataOperation {

protected RowSet rowSet;

protected GetSchemasOperation(HiveSession parentSession,
protected GetSchemasOperation(HiveSession parentSession, OperationManager operationManager,
String catalogName, String schemaName) {
super(parentSession, OperationType.GET_SCHEMAS);
super(parentSession, operationManager, OperationType.GET_SCHEMAS);
this.catalogName = catalogName;
this.schemaName = schemaName;
this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false);
@@ -43,8 +43,8 @@ public class GetTableTypesOperation extends MetadataOperation {
protected final RowSet rowSet;
private final TableTypeMapping tableTypeMapping;

protected GetTableTypesOperation(HiveSession parentSession) {
super(parentSession, OperationType.GET_TABLE_TYPES);
protected GetTableTypesOperation(HiveSession parentSession, OperationManager operationManager) {
super(parentSession, operationManager, OperationType.GET_TABLE_TYPES);
String tableMappingStr = getParentSession().getHiveConf()
.getVar(HiveConf.ConfVars.HIVE_SERVER2_TABLE_TYPE_MAPPING);
tableTypeMapping =
@@ -66,10 +66,10 @@ public class GetTablesOperation extends MetadataOperation {
.addStringColumn("REF_GENERATION",
"Specifies how values in SELF_REFERENCING_COL_NAME are created.");

protected GetTablesOperation(HiveSession parentSession,
protected GetTablesOperation(HiveSession parentSession, OperationManager operationManager,
String catalogName, String schemaName, String tableName,
List<String> tableTypes) {
super(parentSession, OperationType.GET_TABLES);
super(parentSession, operationManager, OperationType.GET_TABLES);
this.catalogName = catalogName;
this.schemaName = schemaName;
this.tableName = tableName;
@@ -76,8 +76,8 @@ public class GetTypeInfoOperation extends MetadataOperation {

protected final RowSet rowSet;

protected GetTypeInfoOperation(HiveSession parentSession) {
super(parentSession, OperationType.GET_TYPE_INFO);
protected GetTypeInfoOperation(HiveSession parentSession, OperationManager operationManager) {
super(parentSession, operationManager, OperationType.GET_TYPE_INFO);
rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false);
}
