18
18
package org .apache .uniffle .test ;
19
19
20
20
import java .io .File ;
21
- import java .io .IOException ;
22
- import java .net .ServerSocket ;
23
- import java .util .ArrayList ;
21
+ import java .util .Arrays ;
24
22
import java .util .HashMap ;
25
23
import java .util .List ;
26
24
import java .util .Map ;
32
30
import org .junit .jupiter .api .BeforeAll ;
33
31
import org .junit .jupiter .api .Test ;
34
32
import org .junit .jupiter .api .io .TempDir ;
35
- import org .slf4j .Logger ;
36
- import org .slf4j .LoggerFactory ;
37
33
38
34
import org .apache .uniffle .client .factory .ShuffleClientFactory ;
39
35
import org .apache .uniffle .client .impl .ShuffleWriteClientImpl ;
42
38
import org .apache .uniffle .common .rpc .ServerType ;
43
39
import org .apache .uniffle .common .util .Constants ;
44
40
import org .apache .uniffle .coordinator .CoordinatorConf ;
45
- import org .apache .uniffle .coordinator .CoordinatorServer ;
46
41
import org .apache .uniffle .server .ShuffleServer ;
47
42
import org .apache .uniffle .server .ShuffleServerConf ;
48
43
import org .apache .uniffle .storage .util .StorageType ;
56
51
* {@code RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS}
57
52
*/
58
53
public class AssignmentWithTagsTest extends CoordinatorTestBase {
59
- private static final Logger LOG = LoggerFactory .getLogger (AssignmentWithTagsTest .class );
60
54
61
55
// KV: tag -> shuffle server id
62
56
private static Map <String , List <Integer >> tagOfShufflePorts = new HashMap <>();
57
+ private static final String tag1 = "fixed" ;
58
+ private static final String tag2 = "elastic" ;
63
59
64
- private static List <Integer > findAvailablePorts (int num ) throws IOException {
65
- List <ServerSocket > sockets = new ArrayList <>();
66
- List <Integer > ports = new ArrayList <>();
67
-
68
- for (int i = 0 ; i < num ; i ++) {
69
- ServerSocket socket = new ServerSocket (0 );
70
- ports .add (socket .getLocalPort ());
71
- sockets .add (socket );
72
- }
73
-
74
- for (ServerSocket socket : sockets ) {
75
- socket .close ();
76
- }
77
-
78
- return ports ;
79
- }
80
-
81
- private static void createAndStartShuffleServerWithTags (Set <String > tags , File tmpDir )
82
- throws Exception {
83
- ShuffleServerConf shuffleServerConf = getShuffleServerConf (ServerType .GRPC );
60
+ private static void prepareShuffleServerConf (int subDirIndex , Set <String > tags , File tmpDir ) {
61
+ ShuffleServerConf shuffleServerConf =
62
+ shuffleServerConfWithoutPort (subDirIndex , tmpDir , ServerType .GRPC );
84
63
shuffleServerConf .setLong ("rss.server.app.expired.withoutHeartbeat" , 4000 );
85
-
86
- File dataDir1 = new File (tmpDir , "data1" );
87
- File dataDir2 = new File (tmpDir , "data2" );
88
- String basePath = dataDir1 .getAbsolutePath () + "," + dataDir2 .getAbsolutePath ();
89
-
90
64
shuffleServerConf .setString ("rss.storage.type" , StorageType .LOCALFILE .name ());
91
- shuffleServerConf .setString ("rss.storage.basePath" , basePath );
92
65
shuffleServerConf .setString ("rss.server.tags" , StringUtils .join (tags , "," ));
93
-
94
- List <Integer > ports = findAvailablePorts (2 );
95
- shuffleServerConf .setInteger ("rss.rpc.server.port" , ports .get (0 ));
96
- shuffleServerConf .setInteger ("rss.jetty.http.port" , ports .get (1 ));
97
-
98
- for (String tag : tags ) {
99
- tagOfShufflePorts .putIfAbsent (tag , new ArrayList <>());
100
- tagOfShufflePorts .get (tag ).add (ports .get (0 ));
101
- }
102
- tagOfShufflePorts .putIfAbsent (Constants .SHUFFLE_SERVER_VERSION , new ArrayList <>());
103
- tagOfShufflePorts .get (Constants .SHUFFLE_SERVER_VERSION ).add (ports .get (0 ));
104
-
105
- LOG .info (
106
- "Shuffle server data dir: {}, rpc port: {}, http port: {}" ,
107
- dataDir1 + "," + dataDir2 ,
108
- ports .get (0 ),
109
- ports .get (1 ));
110
-
111
- ShuffleServer server = new ShuffleServer (shuffleServerConf );
112
- grpcShuffleServers .add (server );
113
- server .start ();
66
+ storeShuffleServerConf (shuffleServerConf );
114
67
}
115
68
116
69
@ BeforeAll
117
70
public static void setupServers (@ TempDir File tmpDir ) throws Exception {
118
- CoordinatorConf coordinatorConf = getCoordinatorConf ();
119
- createCoordinatorServer (coordinatorConf );
71
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort ();
72
+ storeCoordinatorConf (coordinatorConf );
120
73
121
- for (CoordinatorServer coordinator : coordinators ) {
122
- coordinator .start ();
123
- }
124
-
125
- File dir1 = new File (tmpDir , "server1" );
126
74
for (int i = 0 ; i < 2 ; i ++) {
127
- createAndStartShuffleServerWithTags ( Sets .newHashSet (), dir1 );
75
+ prepareShuffleServerConf ( i , Sets .newHashSet (), tmpDir );
128
76
}
129
77
130
- File dir2 = new File (tmpDir , "server2" );
131
78
for (int i = 0 ; i < 2 ; i ++) {
132
- createAndStartShuffleServerWithTags ( Sets .newHashSet ("fixed" ), dir2 );
79
+ prepareShuffleServerConf ( 2 + i , Sets .newHashSet (tag1 ), tmpDir );
133
80
}
134
81
135
- File dir3 = new File (tmpDir , "server3" );
136
82
for (int i = 0 ; i < 2 ; i ++) {
137
- createAndStartShuffleServerWithTags ( Sets .newHashSet ("elastic" ), dir3 );
83
+ prepareShuffleServerConf ( 4 + i , Sets .newHashSet (tag2 ), tmpDir );
138
84
}
139
85
86
+ startServersWithRandomPorts ();
87
+ List <Integer > collect =
88
+ grpcShuffleServers .stream ().map (ShuffleServer ::getGrpcPort ).collect (Collectors .toList ());
89
+ tagOfShufflePorts .put (Constants .SHUFFLE_SERVER_VERSION , collect );
90
+ tagOfShufflePorts .put (
91
+ tag1 ,
92
+ Arrays .asList (
93
+ grpcShuffleServers .get (2 ).getGrpcPort (), grpcShuffleServers .get (3 ).getGrpcPort ()));
94
+ tagOfShufflePorts .put (
95
+ tag2 ,
96
+ Arrays .asList (
97
+ grpcShuffleServers .get (4 ).getGrpcPort (), grpcShuffleServers .get (5 ).getGrpcPort ()));
98
+
140
99
// Wait all shuffle servers registering to coordinator
141
100
long startTimeMS = System .currentTimeMillis ();
142
101
while (true ) {
@@ -151,7 +110,7 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
151
110
}
152
111
153
112
@ Test
154
- public void testTags () throws Exception {
113
+ public void testTags () {
155
114
ShuffleWriteClientImpl shuffleWriteClient =
156
115
ShuffleClientFactory .newWriteBuilder ()
157
116
.clientType (ClientType .GRPC .name ())
@@ -168,7 +127,7 @@ public void testTags() throws Exception {
168
127
.unregisterTimeSec (10 )
169
128
.unregisterRequestTimeSec (10 )
170
129
.build ();
171
- shuffleWriteClient .registerCoordinators (COORDINATOR_QUORUM );
130
+ shuffleWriteClient .registerCoordinators (getQuorum () );
172
131
173
132
// Case1 : only set the single default shuffle version tag
174
133
ShuffleAssignmentsInfo assignmentsInfo =
@@ -198,26 +157,26 @@ public void testTags() throws Exception {
198
157
199
158
// Case3: Set the single fixed tag
200
159
assignmentsInfo =
201
- shuffleWriteClient .getShuffleAssignments ("app-3" , 1 , 1 , 1 , Sets .newHashSet ("fixed" ), 1 , -1 );
160
+ shuffleWriteClient .getShuffleAssignments ("app-3" , 1 , 1 , 1 , Sets .newHashSet (tag1 ), 1 , -1 );
202
161
assignedServerPorts =
203
162
assignmentsInfo .getPartitionToServers ().values ().stream ()
204
163
.flatMap (x -> x .stream ())
205
164
.map (x -> x .getGrpcPort ())
206
165
.collect (Collectors .toList ());
207
166
assertEquals (1 , assignedServerPorts .size ());
208
- assertTrue (tagOfShufflePorts .get ("fixed" ).contains (assignedServerPorts .get (0 )));
167
+ assertTrue (tagOfShufflePorts .get (tag1 ).contains (assignedServerPorts .get (0 )));
209
168
210
169
// case4: Set the multiple tags if exists
211
170
assignmentsInfo =
212
171
shuffleWriteClient .getShuffleAssignments (
213
- "app-4" , 1 , 1 , 1 , Sets .newHashSet ("fixed" , Constants .SHUFFLE_SERVER_VERSION ), 1 , -1 );
172
+ "app-4" , 1 , 1 , 1 , Sets .newHashSet (tag1 , Constants .SHUFFLE_SERVER_VERSION ), 1 , -1 );
214
173
assignedServerPorts =
215
174
assignmentsInfo .getPartitionToServers ().values ().stream ()
216
175
.flatMap (x -> x .stream ())
217
176
.map (x -> x .getGrpcPort ())
218
177
.collect (Collectors .toList ());
219
178
assertEquals (1 , assignedServerPorts .size ());
220
- assertTrue (tagOfShufflePorts .get ("fixed" ).contains (assignedServerPorts .get (0 )));
179
+ assertTrue (tagOfShufflePorts .get (tag1 ).contains (assignedServerPorts .get (0 )));
221
180
222
181
// case5: Set the multiple tags if non-exist
223
182
try {
@@ -227,7 +186,7 @@ public void testTags() throws Exception {
227
186
1 ,
228
187
1 ,
229
188
1 ,
230
- Sets .newHashSet ("fixed" , "elastic" , Constants .SHUFFLE_SERVER_VERSION ),
189
+ Sets .newHashSet (tag1 , tag2 , Constants .SHUFFLE_SERVER_VERSION ),
231
190
1 ,
232
191
-1 );
233
192
fail ();
0 commit comments