Skip to content

Commit 1dae94c

Browse files
SpriCoder and spricoder authored
fix(core): 修复 workload 缓冲/传感器索引的溢出与并发问题 (#534)
* fix(core): make workload buffer/sensor indexing overflow- and concurrency-safe
  - generateOneRow used Math.abs(stepOffset*(deviceIndex+1)) % bufferSize; Math.abs(Long.MIN_VALUE) is negative, yielding a negative buffer index. Use Math.floorMod, which is always in [0, bufferSize) for a positive bufferSize.
  - SingletonWorkDataWorkLoad cast curLoop before the modulo ((int)curLoop % n) so a curLoop past Integer.MAX_VALUE produced a wrong/negative device id; cast after the modulo instead.
  - Derive the sensor cursor from the same reserved curLoop so the (device, sensor) pairing is atomic instead of using a separate counter that a concurrent caller could advance between reads.
  Covered by WorkloadIndexTest and SingletonWorkDataWorkLoadConcurrencyTest.
* ci: trigger CI
* test(core): make SingletonWorkDataWorkLoadConcurrencyTest order-independent
  GenerateDataWorkLoad.workloadValues is a static final array built once by initWorkloadValues(), which returns null when config.hasWrite() (the first field of OPERATION_PROPORTION) is 0. If a preceding test leaves OPERATION_PROPORTION on a read-only value (e.g. OperationControllerTest ends on "0:1:...") and the workload class is first loaded during this test, workloadValues stays null and getOneBatch() NPEs on every thread. Force a write workload in setUp (restored in tearDown) so the array is populated whenever this test triggers the class load, regardless of order.
---------
Co-authored-by: spricoder <maybell_uteka@mail.com>
1 parent e2989d8 commit 1dae94c

4 files changed

Lines changed: 238 additions & 6 deletions

File tree

core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/GenerateDataWorkLoad.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ public long getBatchNumber() {
6969
protected List<Object> generateOneRow(int deviceIndex, int colIndex, long stepOffset)
7070
throws WorkloadException {
7171
List<Object> values = new ArrayList<>(config.getSENSOR_NUMBER());
72-
int index = (int) (Math.abs(stepOffset * (deviceIndex + 1)) % config.getWORKLOAD_BUFFER_SIZE());
72+
int index =
73+
(int)
74+
Math.floorMod(stepOffset * (deviceIndex + 1), (long) config.getWORKLOAD_BUFFER_SIZE());
7375
if (colIndex == -1) {
7476
for (int i = 0; i < config.getSENSOR_NUMBER(); i++) {
7577
values.add(workloadValues[i][index]);

core/src/main/java/cn/edu/tsinghua/iot/benchmark/workload/SingletonWorkDataWorkLoad.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,28 @@
3333
import java.util.ArrayList;
3434
import java.util.Collections;
3535
import java.util.List;
36-
import java.util.concurrent.atomic.AtomicInteger;
3736
import java.util.concurrent.atomic.AtomicLong;
3837

3938
public class SingletonWorkDataWorkLoad extends GenerateDataWorkLoad {
4039
private static final Logger LOGGER = LoggerFactory.getLogger(SingletonWorkDataWorkLoad.class);
4140
private static final List<Sensor> SENSORS = Collections.synchronizedList(config.getSENSORS());
4241
private static SingletonWorkDataWorkLoad singletonWorkDataWorkLoad = null;
43-
private static final AtomicInteger sensorIndex = new AtomicInteger();
4442
private final AtomicLong insertLoop = new AtomicLong(0);
43+
44+
/**
45+
* The value insertLoop starts at; the sensor cursor is derived as (curLoop - insertStartIndex) % SENSOR_NUMBER, so it begins at 0 even when out-of-order mode offsets insertLoop.
46+
*/
47+
private final long insertStartIndex;
48+
4549
private static final List<Integer> deviceIds = MetaUtil.sortDeviceId();
4650

4751
private SingletonWorkDataWorkLoad() {
4852
if (config.isIS_OUT_OF_ORDER()) {
4953
long startIndex = (long) (config.getLOOP() * config.getOUT_OF_ORDER_RATIO());
5054
this.insertLoop.set(startIndex);
55+
this.insertStartIndex = startIndex;
56+
} else {
57+
this.insertStartIndex = 0L;
5158
}
5259
}
5360

@@ -79,15 +86,20 @@ public IBatch getOneBatch() throws WorkloadException {
7986
if (config.isIS_SENSOR_TS_ALIGNMENT()) {
8087
sensors = SENSORS;
8188
} else {
82-
int sensorId = sensorIndex.getAndIncrement() % config.getSENSOR_NUMBER();
89+
// Derive the sensor cursor from the SAME curLoop reserved above so the (device, sensor)
90+
// pairing is atomic: a single insertLoop.getAndIncrement() fixes both. A separate counter
91+
// could be incremented by a concurrent caller between the two reads, mis-pairing the device
92+
// with another caller's sensor (defect #3). Single-threaded this yields the same sequence,
93+
// because insertLoop and the old sensor counter advanced in lockstep on this branch.
94+
int sensorId = (int) ((curLoop - insertStartIndex) % config.getSENSOR_NUMBER());
8395
batch.setColIndex(sensorId);
8496
sensors.add(SENSORS.get(sensorId));
8597
}
8698
DeviceSchema deviceSchema =
8799
new DeviceSchema(
88-
MetaUtil.getDeviceId(deviceIds.get((int) curLoop % config.getDEVICE_NUMBER())),
100+
MetaUtil.getDeviceId(deviceIds.get((int) (curLoop % config.getDEVICE_NUMBER()))),
89101
sensors,
90-
MetaUtil.getTags(deviceIds.get((int) curLoop % config.getDEVICE_NUMBER())));
102+
MetaUtil.getTags(deviceIds.get((int) (curLoop % config.getDEVICE_NUMBER()))));
91103
// create data of batch
92104
List<Record> records = new ArrayList<>();
93105
// Sometimes the current time is used as the timestamp of records, considering the requirement
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package cn.edu.tsinghua.iot.benchmark.workload;
21+
22+
import cn.edu.tsinghua.iot.benchmark.BenchmarkTestBase;
23+
import cn.edu.tsinghua.iot.benchmark.conf.Config;
24+
import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor;
25+
import cn.edu.tsinghua.iot.benchmark.entity.Batch.IBatch;
26+
import org.junit.After;
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
30+
import java.util.Set;
31+
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.CopyOnWriteArrayList;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.CyclicBarrier;
35+
36+
import static org.junit.Assert.assertEquals;
37+
import static org.junit.Assert.assertTrue;
38+
39+
/**
40+
* Concurrency regression test for defect #3 — {@code SingletonWorkDataWorkLoad} pairing atomicity.
41+
*
42+
* <p>Before the fix, {@code getOneBatch()} drew the device cursor ({@code insertLoop}) and the
43+
* sensor cursor from two <em>independent</em> atomic counters. Each increment was atomic, but the
44+
* pairing was not: thread A could take device cursor {@code L} while thread B raced ahead and took
45+
* the sensor cursor meant for {@code L}, shuffling the {@code (device, sensor)} mapping. The fix
46+
* derives both cursors from the single {@code insertLoop} value reserved for the call.
47+
*
48+
* <p>With {@code DEVICE_NUMBER == SENSOR_NUMBER} the correct mapping is a clean bijection: device
49+
* {@code p} is always paired with the same sensor (regardless of the counter's starting offset). So
50+
* the number of distinct {@code (device, colIndex)} pairs must equal the number of distinct devices
51+
* seen. Under the bug a device gets paired with several sensors and the pair count exceeds the
52+
* device count. This holds single-threaded for both the buggy and fixed code; only concurrency
53+
* breaks it, which is exactly what this test exercises.
54+
*/
55+
public class SingletonWorkDataWorkLoadConcurrencyTest extends BenchmarkTestBase {
56+
57+
private static final Config config = ConfigDescriptor.getInstance().getConfig();
58+
59+
/** DEVICE_NUMBER == SENSOR_NUMBER so the correct device→sensor mapping is a bijection. */
60+
private static final int SIZE = 200;
61+
62+
private static final int THREADS = 16;
63+
private static final int CALLS_PER_THREAD = 600;
64+
65+
private int origDeviceNumber;
66+
private boolean origSensorTsAlignment;
67+
private int origDeviceNumPerWrite;
68+
private int origBatchSize;
69+
private boolean origOutOfOrder;
70+
private int origDataClientNumber;
71+
private int origSchemaClientNumber;
72+
private String origOperationProportion;
73+
74+
@Before
75+
public void setUp() {
76+
origDeviceNumber = config.getDEVICE_NUMBER();
77+
origSensorTsAlignment = config.isIS_SENSOR_TS_ALIGNMENT();
78+
origDeviceNumPerWrite = config.getDEVICE_NUM_PER_WRITE();
79+
origBatchSize = config.getBATCH_SIZE_PER_WRITE();
80+
origOutOfOrder = config.isIS_OUT_OF_ORDER();
81+
origDataClientNumber = config.getDATA_CLIENT_NUMBER();
82+
origSchemaClientNumber = config.getSCHEMA_CLIENT_NUMBER();
83+
origOperationProportion = config.getOPERATION_PROPORTION();
84+
85+
// Write must be enabled BEFORE the workload classes load: GenerateDataWorkLoad builds its
86+
// static final workloadValues array once, in initWorkloadValues(), which returns null when
87+
// config.hasWrite() (OPERATION_PROPORTION's first field) is 0. getOneBatch() then dereferences
88+
// that null array -> NPE. A preceding test (e.g. OperationControllerTest) can leave
89+
// OPERATION_PROPORTION on a read-only value, so we force a write workload here rather than
90+
// relying on the shared singleton's current state.
91+
config.setOPERATION_PROPORTION("1:0:0:0:0:0:0:0:0:0:0:0:0");
92+
// SENSOR_NUMBER stays at its default (200) so the pre-populated SENSORS list is consistent.
93+
config.setDEVICE_NUMBER(SIZE);
94+
// Force the non-aligned else-branch (where defect #3 lived), which pairs a device with a single sensor cursor.
95+
config.setIS_SENSOR_TS_ALIGNMENT(false);
96+
// One device per batch → one colIndex per batch, the unit we collect.
97+
config.setDEVICE_NUM_PER_WRITE(1);
98+
// Keep each getOneBatch() cheap; we care about the (device, sensor) pairing, not the records.
99+
config.setBATCH_SIZE_PER_WRITE(1);
100+
config.setIS_OUT_OF_ORDER(false);
101+
// Keep the singletons that DataWorkLoad eagerly builds consistent with the end-to-end test.
102+
config.setDATA_CLIENT_NUMBER(1);
103+
config.setSCHEMA_CLIENT_NUMBER(1);
104+
}
105+
106+
@After
107+
public void tearDown() {
108+
config.setDEVICE_NUMBER(origDeviceNumber);
109+
config.setIS_SENSOR_TS_ALIGNMENT(origSensorTsAlignment);
110+
config.setDEVICE_NUM_PER_WRITE(origDeviceNumPerWrite);
111+
config.setBATCH_SIZE_PER_WRITE(origBatchSize);
112+
config.setIS_OUT_OF_ORDER(origOutOfOrder);
113+
config.setDATA_CLIENT_NUMBER(origDataClientNumber);
114+
config.setSCHEMA_CLIENT_NUMBER(origSchemaClientNumber);
115+
config.setOPERATION_PROPORTION(origOperationProportion);
116+
}
117+
118+
@Test
119+
public void devicePairsWithExactlyOneSensorUnderConcurrency() throws Exception {
120+
SingletonWorkDataWorkLoad workload = SingletonWorkDataWorkLoad.getInstance();
121+
122+
Set<String> distinctDevices = ConcurrentHashMap.newKeySet();
123+
Set<String> distinctPairs = ConcurrentHashMap.newKeySet();
124+
CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<>();
125+
126+
CyclicBarrier startLine = new CyclicBarrier(THREADS);
127+
CountDownLatch done = new CountDownLatch(THREADS);
128+
for (int t = 0; t < THREADS; t++) {
129+
new Thread(
130+
() -> {
131+
try {
132+
startLine.await();
133+
for (int i = 0; i < CALLS_PER_THREAD; i++) {
134+
IBatch batch = workload.getOneBatch();
135+
String device = batch.getDeviceSchema().getDevice();
136+
int sensor = batch.getColIndex();
137+
distinctDevices.add(device);
138+
distinctPairs.add(device + "#" + sensor);
139+
}
140+
} catch (Throwable e) {
141+
failures.add(e);
142+
} finally {
143+
done.countDown();
144+
}
145+
})
146+
.start();
147+
}
148+
done.await();
149+
150+
assertTrue("getOneBatch threw under concurrency: " + failures, failures.isEmpty());
151+
// Sanity: enough calls actually hit a spread of devices, otherwise the assertion is vacuous.
152+
assertTrue(
153+
"expected to exercise many devices but only saw " + distinctDevices.size(),
154+
distinctDevices.size() > 1);
155+
assertEquals(
156+
"each device must pair with exactly one sensor; more pairs than devices means the device "
157+
+ "and sensor cursors were mismatched by a concurrent caller (defect #3)",
158+
distinctDevices.size(),
159+
distinctPairs.size());
160+
}
161+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package cn.edu.tsinghua.iot.benchmark.workload;
21+
22+
import org.junit.Test;
23+
24+
import static org.junit.Assert.assertTrue;
25+
26+
/**
27+
* Documents the integer-overflow fixes (issue #13) in workload index calculation: why {@code
28+
* Math.floorMod} replaces {@code Math.abs(...) % n} in {@code GenerateDataWorkLoad.generateOneRow},
29+
* and why the cast must happen after the modulo in {@code SingletonWorkDataWorkLoad.getOneBatch}.
30+
* These assertions capture the JDK behaviour that makes the old expressions produce negative
31+
* (out-of-bounds) indices and the new ones safe.
32+
*/
33+
public class WorkloadIndexTest {
34+
35+
@Test
36+
public void testFloorModNeverNegativeOnLongOverflow() {
37+
int bufferSize = 100;
38+
// stepOffset * (deviceIndex + 1) can overflow to Long.MIN_VALUE
39+
long overflowed = Long.MIN_VALUE;
40+
// old approach was buggy: Math.abs(Long.MIN_VALUE) stays negative -> negative index
41+
assertTrue("Math.abs(Long.MIN_VALUE) is still negative", Math.abs(overflowed) < 0);
42+
// fixed approach: floorMod with a positive divisor is always in [0, bufferSize)
43+
long index = Math.floorMod(overflowed, (long) bufferSize);
44+
assertTrue("floorMod index out of range", index >= 0 && index < bufferSize);
45+
}
46+
47+
@Test
48+
public void testModuloBeforeCastNeverNegativeDeviceIndex() {
49+
int deviceNumber = 50;
50+
long curLoop = (long) Integer.MAX_VALUE + 100; // exceeds int range
51+
// old approach was buggy: cast happens before modulo -> overflow to negative
52+
assertTrue("cast-before-mod overflows to negative", ((int) curLoop) % deviceNumber < 0);
53+
// fixed approach: modulo first keeps the index in [0, deviceNumber)
54+
int index = (int) (curLoop % deviceNumber);
55+
assertTrue("mod-before-cast index out of range", index >= 0 && index < deviceNumber);
56+
}
57+
}

0 commit comments

Comments
 (0)