Commit e36f9de
[SPARK-53213][CORE][SQL][K8S] Use Java Base64 instead of Base64.(decodeBase64|encodeBase64String)
### What changes were proposed in this pull request?

This PR aims to use Java's `java.util.Base64` instead of `Base64.encodeBase64String` or `Base64.decodeBase64` to improve performance.

### Why are the changes needed?

The Java native API is over **9x faster** than `Commons Codec`.

```scala
scala> val a = new Array[Byte](1_000_000_000)

scala> spark.time(org.apache.commons.codec.binary.Base64.decodeBase64(org.apache.commons.codec.binary.Base64.encodeBase64String(a)).length)
Time taken: 10121 ms
val res0: Int = 1000000000

scala> spark.time(java.util.Base64.getDecoder().decode(java.util.Base64.getEncoder().encodeToString(a)).length)
Time taken: 1156 ms
val res1: Int = 1000000000
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51938 from dongjoon-hyun/SPARK-53213.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent f251007 commit e36f9de
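Across all eight files the migration is mechanical: each Commons Codec call is swapped for its JDK counterpart, which produces the same unchunked RFC 4648 output for valid input. A minimal sketch of the two drop-in replacements (sample data is illustrative):

```scala
import java.util.Base64

val bytes = "spark".getBytes()

// Before: org.apache.commons.codec.binary.Base64.encodeBase64String(bytes)
val encoded: String = Base64.getEncoder().encodeToString(bytes)

// Before: org.apache.commons.codec.binary.Base64.decodeBase64(encoded)
val decoded: Array[Byte] = Base64.getDecoder().decode(encoded)

assert(decoded.sameElements(bytes))
```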

File tree: 8 files changed (+25 lines, -17 lines)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala

Lines changed: 4 additions & 3 deletions

```diff
@@ -18,12 +18,12 @@ package org.apache.spark.deploy.k8s.features
 
 import java.io.File
 import java.nio.file.Files
+import java.util.Base64
 
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import io.fabric8.kubernetes.api.model._
-import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -226,6 +226,7 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
   }
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    val encodeToString = Base64.getEncoder().encodeToString(_)
     Seq[HasMetadata]() ++ {
       krb5File.map { path =>
         val file = new File(path)
@@ -247,7 +248,7 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
           .withName(ktSecretName)
           .endMetadata()
           .withImmutable(true)
-          .addToData(kt.getName(), Base64.encodeBase64String(Files.readAllBytes(kt.toPath)))
+          .addToData(kt.getName(), encodeToString(Files.readAllBytes(kt.toPath)))
           .build())
       } else {
         Nil
@@ -259,7 +260,7 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
           .withName(dtSecretName)
           .endMetadata()
           .withImmutable(true)
-          .addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(delegationTokens))
+          .addToData(KERBEROS_SECRET_KEY, encodeToString(delegationTokens))
           .build())
       } else {
         Nil
```
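One detail worth noting in this file: `Base64.getEncoder().encodeToString(_)` eta-expands the encoder method into a function value, so the two call sites in the hunks above keep the terseness of the old one-shot `Base64.encodeBase64String`. A minimal sketch of the pattern (the sample input is illustrative):

```scala
import java.util.Base64

// Eta-expand the instance method into a reusable Array[Byte] => String
// function value; both call shapes produce the same RFC 4648 string.
val encodeToString: Array[Byte] => String = Base64.getEncoder().encodeToString(_)

assert(encodeToString("spark".getBytes()) ==
  Base64.getEncoder().encodeToString("spark".getBytes()))
```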

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -19,11 +19,11 @@ package org.apache.spark.deploy.k8s.features
 import java.io.File
 import java.nio.file.Files
 import java.security.PrivilegedExceptionAction
+import java.util.Base64
 
 import scala.jdk.CollectionConverters._
 
 import io.fabric8.kubernetes.api.model.{ConfigMap, Secret}
-import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.UserGroupInformation
 
@@ -126,7 +126,8 @@ class KerberosConfDriverFeatureStepSuite extends SparkFunSuite {
     val step = createStep(new SparkConf(false))
 
     val dtSecret = filter[Secret](step.getAdditionalKubernetesResources()).head
-    assert(dtSecret.getData().get(KERBEROS_SECRET_KEY) === Base64.encodeBase64String(tokens))
+    assert(dtSecret.getData().get(KERBEROS_SECRET_KEY) ===
+      Base64.getEncoder().encodeToString(tokens))
 
     checkPodForTokens(step.configurePod(SparkPod.initialPod()),
       dtSecret.getMetadata().getName())
```

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala

Lines changed: 3 additions & 4 deletions

```diff
@@ -16,12 +16,11 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
-import java.util.Locale
+import java.util.{Base64, Locale}
 
 import scala.jdk.CollectionConverters._
 
 import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder}
-import org.apache.commons.codec.binary.Base64
 import org.scalatest.concurrent.Eventually
 import org.scalatest.matchers.should.Matchers._
 
@@ -36,8 +35,8 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
     sb.withNewMetadata()
       .withName(ENV_SECRET_NAME)
       .endMetadata()
-    val secUsername = Base64.encodeBase64String(ENV_SECRET_VALUE_1.getBytes())
-    val secPassword = Base64.encodeBase64String(ENV_SECRET_VALUE_2.getBytes())
+    val secUsername = Base64.getEncoder().encodeToString(ENV_SECRET_VALUE_1.getBytes())
+    val secPassword = Base64.getEncoder().encodeToString(ENV_SECRET_VALUE_2.getBytes())
     val envSecretData = Map(ENV_SECRET_KEY_1 -> secUsername, ENV_SECRET_KEY_2 -> secPassword)
     sb.addToData(envSecretData.asJava)
     val envSecret = sb.build()
```

scalastyle-config.xml

Lines changed: 5 additions & 0 deletions

```diff
@@ -443,6 +443,11 @@ This file is divided into 3 sections:
     <customMessage>Use listFiles of SparkFileUtil or Utils instead.</customMessage>
   </check>
 
+  <check customId="commonscodecbase64" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">\bBase64\.(en|de)codeBase64</parameter></parameters>
+    <customMessage>Use java.util.Base64 instead</customMessage>
+  </check>
+
   <check customId="commonslang3javaversion" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">org\.apache\.commons\.lang3\..*JavaVersion</parameter></parameters>
     <customMessage>Use JEP 223 API (java.lang.Runtime.Version) instead of
```
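This check makes scalastyle fail on any `Base64.encodeBase64...`/`Base64.decodeBase64` call that survives the migration; code that must keep Commons Codec (such as the comparison benchmark below) opts out with `scalastyle:off`/`scalastyle:on` comments, as the Base64Benchmark diff that follows shows. A sketch of the opt-out pattern (the object name is hypothetical):

```scala
object LegacyBase64Example {
  // scalastyle:off commonscodecbase64
  // Kept deliberately: compares the Commons Codec encoder against the JDK one.
  val legacy: String =
    org.apache.commons.codec.binary.Base64.encodeBase64String("spark".getBytes())
  // scalastyle:on commonscodecbase64
}
```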

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
 import java.math.{MathContext, RoundingMode}
+import java.util.Base64
 
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 
@@ -202,12 +203,12 @@ object HistogramSerializer {
     out.flush()
     out.close()
 
-    org.apache.commons.codec.binary.Base64.encodeBase64String(bos.toByteArray)
+    Base64.getEncoder().encodeToString(bos.toByteArray)
   }
 
   /** Deserializes a given string to a histogram. */
   final def deserialize(str: String): Histogram = {
-    val bytes = org.apache.commons.codec.binary.Base64.decodeBase64(str)
+    val bytes = Base64.getDecoder().decode(str)
     val bis = new ByteArrayInputStream(bytes)
     val ins = new DataInputStream(new LZ4BlockInputStream(bis))
     val height = ins.readDouble()
```
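`HistogramSerializer` keeps its wire format: the histogram is LZ4-compressed, then Base64-encoded so the payload can be stored as a string, and only the Base64 step changes libraries. A standalone sketch of the same compress-then-encode round trip, with a plain `Array[Double]` standing in for the real histogram:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.util.Base64

import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}

// LZ4-compress a binary payload, then Base64-encode it into a string.
def serialize(values: Array[Double]): String = {
  val bos = new ByteArrayOutputStream()
  val out = new DataOutputStream(new LZ4BlockOutputStream(bos))
  out.writeInt(values.length)
  values.foreach(out.writeDouble)
  out.flush()
  out.close()
  Base64.getEncoder().encodeToString(bos.toByteArray)
}

// Reverse both steps: Base64-decode, then LZ4-decompress and read back.
def deserialize(str: String): Array[Double] = {
  val ins = new DataInputStream(
    new LZ4BlockInputStream(new ByteArrayInputStream(Base64.getDecoder().decode(str))))
  val result = Array.fill(ins.readInt())(ins.readDouble())
  ins.close()
  result
}
```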

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/Base64Benchmark.scala

Lines changed: 2 additions & 0 deletions

```diff
@@ -52,6 +52,7 @@ object Base64Benchmark extends SqlBasedBenchmark {
     }
   }
 
+  // scalastyle:off commonscodecbase64
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     Seq(1, 3, 5, 7).map { len =>
       val benchmark = new Benchmark(s"encode for $len", N, output = output)
@@ -75,4 +76,5 @@ object Base64Benchmark extends SqlBasedBenchmark {
       benchmark
     }.foreach(_.run())
   }
+  // scalastyle:on commonscodecbase64
 }
```

sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpServlet.java

Lines changed: 3 additions & 3 deletions

```diff
@@ -21,6 +21,7 @@
 import java.io.UnsupportedEncodingException;
 import java.security.PrivilegedExceptionAction;
 import java.security.SecureRandom;
+import java.util.Base64;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -31,7 +32,6 @@
 import jakarta.servlet.http.HttpServletResponse;
 import jakarta.ws.rs.core.NewCookie;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.codec.binary.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -423,7 +423,7 @@ public String run() throws HttpAuthenticationException {
         gssContext = manager.createContext(serverCreds);
         // Get service ticket from the authorization header
         String serviceTicketBase64 = getAuthHeader(request, authType);
-        byte[] inToken = Base64.decodeBase64(serviceTicketBase64.getBytes());
+        byte[] inToken = Base64.getDecoder().decode(serviceTicketBase64.getBytes());
         gssContext.acceptSecContext(inToken, 0, inToken.length);
         // Authenticate or deny based on its context completion
         if (!gssContext.isEstablished()) {
@@ -504,7 +504,7 @@ private String[] getAuthHeaderTokens(HttpServletRequest request,
       String authType) throws HttpAuthenticationException {
     String authHeaderBase64 = getAuthHeader(request, authType);
     String authHeaderString = StringUtils.newStringUtf8(
-        Base64.decodeBase64(authHeaderBase64.getBytes()));
+        Base64.getDecoder().decode(authHeaderBase64.getBytes()));
     String[] creds = authHeaderString.split(":");
     return creds;
   }
```
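A behavioral nuance behind these two hunks: `java.util.Base64.Decoder.decode` has a `byte[]` overload, so `decode(header.getBytes())` matches the old call shape, but the JDK's basic decoder rejects characters outside the Base64 alphabet that Commons Codec's lenient `decodeBase64` would silently skip. A small sketch (sample values are illustrative):

```scala
import java.util.Base64

// The byte[] overload mirrors the old Base64.decodeBase64(byte[]) call shape.
val ok: Array[Byte] = Base64.getDecoder().decode("c3Bhcms=".getBytes()) // "spark"

// Unlike Commons Codec, the basic JDK decoder is strict about non-alphabet
// characters such as embedded whitespace:
// Base64.getDecoder().decode("c3Bh cms=")     // throws IllegalArgumentException
// Base64.getMimeDecoder().decode("c3Bh cms=") // the lenient alternative
```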

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala

Lines changed: 2 additions & 3 deletions

```diff
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.hive.orc
 
-import java.util.Properties
+import java.util.{Base64, Properties}
 
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.io.Output
-import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.io.orc._
@@ -215,7 +214,7 @@ case class OrcFileFormat() extends FileFormat with DataSourceRegister with Seria
       val out = new Output(4 * 1024, 10 * 1024 * 1024)
       kryo.writeObject(out, sarg)
       out.close()
-      Base64.encodeBase64String(out.toBytes)
+      Base64.getEncoder().encodeToString(out.toBytes)
     }
   }
 
```
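The ORC path applies the same idea to a Kryo payload: `OrcFileFormat` serializes the pushed-down `SearchArgument` with Kryo and Base64-encodes the bytes so they can travel through the Hadoop `Configuration` as a string. A minimal sketch of that round trip, with a plain `String` standing in for the real `SearchArgument`:

```scala
import java.util.Base64

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Kryo-serialize a payload, then Base64-encode the bytes into a string.
val kryo = new Kryo()
val out = new Output(4 * 1024, 10 * 1024 * 1024)
kryo.writeObject(out, "pushed-down-predicate")
out.close()
val encoded = Base64.getEncoder().encodeToString(out.toBytes)

// Decoding reverses both steps.
val in = new Input(Base64.getDecoder().decode(encoded))
assert(kryo.readObject(in, classOf[String]) == "pushed-down-predicate")
```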
