Skip to content

Commit 204e4e4

Browse files
authored
[MINOR] improvement(spark-client): put sparkConf as extra properties while client request accessCluster (#2254)
### What changes were proposed in this pull request? Put sparkConf as extra properties when the client requests accessCluster. ### Why are the changes needed? The coordinator can decide whether a Spark application may access the RSS cluster via a customized access checker that leverages some of the Spark configs. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No tests needed.
1 parent b7d391c commit 204e4e4

File tree

5 files changed

+42
-15
lines changed

5 files changed

+42
-15
lines changed

client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java

+13
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.shuffle;
1919

20+
import java.util.HashMap;
21+
import java.util.Map;
2022
import java.util.Set;
2123

2224
import scala.Tuple2;
@@ -515,4 +517,15 @@ public static RssConf toRssConf(SparkConf sparkConf) {
515517
}
516518
return rssConf;
517519
}
520+
521+
public static Map<String, String> sparkConfToMap(SparkConf sparkConf) {
522+
Map<String, String> map = new HashMap<>();
523+
524+
for (Tuple2<String, String> tuple : sparkConf.getAll()) {
525+
String key = tuple._1;
526+
map.put(key, tuple._2);
527+
}
528+
529+
return map;
530+
}
518531
}

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java

+2-15
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import java.util.function.Supplier;
3535
import java.util.stream.Collectors;
3636

37-
import scala.Tuple2;
38-
3937
import com.google.common.annotations.VisibleForTesting;
4038
import com.google.common.collect.Maps;
4139
import com.google.common.collect.Sets;
@@ -1064,7 +1062,7 @@ protected void registerShuffleServers(
10641062
}
10651063
LOG.info("Start to register shuffleId {}", shuffleId);
10661064
long start = System.currentTimeMillis();
1067-
Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
1065+
Map<String, String> sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf());
10681066
serverToPartitionRanges.entrySet().stream()
10691067
.forEach(
10701068
entry -> {
@@ -1095,7 +1093,7 @@ protected void registerShuffleServers(
10951093
}
10961094
LOG.info("Start to register shuffleId[{}]", shuffleId);
10971095
long start = System.currentTimeMillis();
1098-
Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
1096+
Map<String, String> sparkConfMap = RssSparkConfig.sparkConfToMap(getSparkConf());
10991097
Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries =
11001098
serverToPartitionRanges.entrySet();
11011099
entries.stream()
@@ -1141,15 +1139,4 @@ public boolean isRssStageRetryForFetchFailureEnabled() {
11411139
public SparkConf getSparkConf() {
11421140
return sparkConf;
11431141
}
1144-
1145-
public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
1146-
Map<String, String> map = new HashMap<>();
1147-
1148-
for (Tuple2<String, String> tuple : sparkConf.getAll()) {
1149-
String key = tuple._1;
1150-
map.put(key, tuple._2);
1151-
}
1152-
1153-
return map;
1154-
}
11551142
}

client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java

+10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.shuffle;
1919

20+
import java.util.List;
2021
import java.util.Map;
2122
import java.util.Set;
2223

@@ -32,6 +33,8 @@
3233
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
3334
import org.apache.uniffle.client.request.RssAccessClusterRequest;
3435
import org.apache.uniffle.client.response.RssAccessClusterResponse;
36+
import org.apache.uniffle.common.config.RssClientConf;
37+
import org.apache.uniffle.common.config.RssConf;
3538
import org.apache.uniffle.common.exception.RssException;
3639
import org.apache.uniffle.common.rpc.StatusCode;
3740
import org.apache.uniffle.common.util.Constants;
@@ -127,6 +130,13 @@ private boolean tryAccessCluster() {
127130
extraProperties.put(
128131
ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));
129132

133+
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
134+
List<String> excludeProperties =
135+
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
136+
rssConf.getAll().stream()
137+
.filter(entry -> !excludeProperties.contains(entry.getKey()))
138+
.forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));
139+
130140
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
131141
try {
132142
if (coordinatorClient != null) {

client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java

+10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.shuffle;
1919

20+
import java.util.List;
2021
import java.util.Map;
2122
import java.util.Set;
2223

@@ -32,6 +33,8 @@
3233
import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
3334
import org.apache.uniffle.client.request.RssAccessClusterRequest;
3435
import org.apache.uniffle.client.response.RssAccessClusterResponse;
36+
import org.apache.uniffle.common.config.RssClientConf;
37+
import org.apache.uniffle.common.config.RssConf;
3538
import org.apache.uniffle.common.exception.RssException;
3639
import org.apache.uniffle.common.rpc.StatusCode;
3740
import org.apache.uniffle.common.util.Constants;
@@ -131,6 +134,13 @@ private boolean tryAccessCluster() {
131134
extraProperties.put(
132135
ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));
133136

137+
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
138+
List<String> excludeProperties =
139+
rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
140+
rssConf.getAll().stream()
141+
.filter(entry -> !excludeProperties.contains(entry.getKey()))
142+
.forEach(entry -> extraProperties.put(entry.getKey(), (String) entry.getValue()));
143+
134144
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
135145
try {
136146
if (coordinatorClient != null) {

common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java

+7
Original file line numberDiff line numberDiff line change
@@ -303,4 +303,11 @@ public class RssClientConf {
303303
.withDescription(
304304
"The block id manager class of server for this application, "
305305
+ "the implementation of this interface to manage the shuffle block ids");
306+
307+
public static final ConfigOption<List<String>> RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES =
308+
ConfigOptions.key("rss.client.reportExcludeProperties")
309+
.stringType()
310+
.asList()
311+
.defaultValues()
312+
.withDescription("the report exclude properties could be configured by this option");
306313
}

0 commit comments

Comments (0)