Skip to content

Commit

Permalink
Fix region aware placement policy disk weight dose not update.
Browse files Browse the repository at this point in the history
  • Loading branch information
druidliu committed Nov 11, 2024
1 parent f148f63 commit da24cb8
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -663,4 +663,12 @@ public PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy(List<BookieI
*/
return PlacementPolicyAdherence.MEETS_STRICT;
}

@Override
public void updateBookieInfo(Map<BookieId, BookieInfoReader.BookieInfo> bookieInfoMap) {
super.updateBookieInfo(bookieInfoMap);
for (TopologyAwareEnsemblePlacementPolicy policy : perRegionPlacement.values()) {
policy.updateBookieInfo(bookieInfoMap);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.lang.reflect.Modifier;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -1787,6 +1788,72 @@ public void testRegionsWithDiskWeight() throws Exception {
assertEquals(3, ensemble.size());
}

@Test
public void testRegionsWithDifferentDiskWeight() throws Exception {
repp.uninitalize();
repp = new RegionAwareEnsemblePlacementPolicy();
conf.setProperty(REPP_ENABLE_VALIDATION, false);
conf.setDiskWeightBasedPlacementEnabled(true);
repp.initialize(conf, Optional.empty(), timer, DISABLE_ALL,
NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);

// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/r2");
StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region2/r2");
// Update cluster
Set<BookieId> addrs = new HashSet<>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
addrs.add(addr5.toBookieId());
repp.onClusterChanged(addrs, new HashSet<>());

// update bookie weight
// due to default BookieMaxWeightMultipleForWeightBasedPlacement=3, the test cases need to be in the range
Map<BookieId, BookieInfoReader.BookieInfo> bookieInfoMap = new HashMap<>();
bookieInfoMap.put(addr1.toBookieId(), new BookieInfoReader.BookieInfo(1000000, 800000));
bookieInfoMap.put(addr2.toBookieId(), new BookieInfoReader.BookieInfo(1000000, 400000));
bookieInfoMap.put(addr3.toBookieId(), new BookieInfoReader.BookieInfo(1000000, 200000));
bookieInfoMap.put(addr4.toBookieId(), new BookieInfoReader.BookieInfo(1000000, 300000));
bookieInfoMap.put(addr5.toBookieId(), new BookieInfoReader.BookieInfo(1000000, 400000));
repp.updateBookieInfo(bookieInfoMap);

List<BookieId> ensemble;
Map<BookieId, Integer> countMap = new HashMap<>();
addrs.forEach(a -> countMap.put(a, 0));
int loopTimes = 5000;
for (int i = 0; i < loopTimes; ++i) {
ensemble = repp.newEnsemble(2, 2, 2, null,
new HashSet<>()).getResult();
for (BookieId bookieId : ensemble) {
countMap.put(bookieId, countMap.get(bookieId) + 1);
}
}

// c1 should be 2x than c2
// c4 should be 1.5x than c3
// c5 should be 2x than c3
// we allow a range of (-50%, 50%) deviation instead of the exact multiples
int c1, c2, c3, c4, c5;
c1 = countMap.get(addr1.toBookieId());
c2 = countMap.get(addr2.toBookieId());
c3 = countMap.get(addr3.toBookieId());
c4 = countMap.get(addr4.toBookieId());
c5 = countMap.get(addr5.toBookieId());
assertTrue(Math.abs((double)c1 / c2 - 2.0) < 1.0);
assertTrue(Math.abs((double)c4 / c3 - 1.5) < 1.0);
assertTrue(Math.abs((double)c5 / c3 - 2.0) < 1.0);
}

@Test
public void testNotifyRackChangeWithOldRegion() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
Expand Down

0 comments on commit da24cb8

Please sign in to comment.