Skip to content

Commit

Permalink
GEODE-9640: Initiate threadId in EventID. (#6905)
Browse files Browse the repository at this point in the history
  * This is to make sure a new EventID can be generated after server restarted
    after a whole cluster is shut down.

 * Wrap around original threadID before it interferes with bulkOp or wan generated threadID.

(cherry picked from commit 4b3c49e)
  • Loading branch information
pivotal-eshu committed Oct 11, 2021
1 parent 5a176a9 commit 227ce9c
Show file tree
Hide file tree
Showing 3 changed files with 379 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -825,10 +825,25 @@ static class ThreadAndSequenceIDWrapper {
long sequenceID = (HARegionQueue.INIT_OF_SEQUENCEID + 1);

@MakeNotStatic
private static final AtomicLong atmLong = new AtomicLong(0);
private static final AtomicLong atmLong = new AtomicLong(System.currentTimeMillis() %
ThreadIdentifier.MAX_THREAD_PER_CLIENT);

ThreadAndSequenceIDWrapper() {
threadID = atmLong.incrementAndGet();
long id = atmLong.incrementAndGet();
// wrap around before hitting 1,000,000 as higher number will interfere with bulkOp threadID
// generation.
if (id < ThreadIdentifier.MAX_THREAD_PER_CLIENT) {
threadID = id;
} else if (id == ThreadIdentifier.MAX_THREAD_PER_CLIENT) {
atmLong.set(0);
threadID = atmLong.incrementAndGet();
} else {
id = atmLong.incrementAndGet();
while (id > ThreadIdentifier.MAX_THREAD_PER_CLIENT) {
id = atmLong.incrementAndGet();
}
threadID = id;
}
}

long getAndIncrementSequenceID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,32 @@
*/
package org.apache.geode.internal.cache;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.DataSerializer;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.serialization.VersionedDataInputStream;
import org.apache.geode.test.junit.Repeat;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
import org.apache.geode.test.junit.rules.RepeatRule;

public class EventIDTest {
@Rule
public ExecutorServiceRule executorService = new ExecutorServiceRule();

@Rule
public RepeatRule repeat = new RepeatRule();

@Test
public void emptyEventIdCanBeSerializedWithCurrentVersion()
Expand All @@ -48,7 +62,37 @@ private void emptyEventIdCanBeSerialized(Version version)
EventID result = DataSerializer.readObject(
new VersionedDataInputStream(new ByteArrayInputStream(out.toByteArray()), version));

Assertions.assertThat(result.getMembershipID()).isEqualTo(eventID.getMembershipID());
assertThat(result.getMembershipID()).isEqualTo(eventID.getMembershipID());
}

@Test
@Repeat(10)
public void threadIDIsWrappedAround() throws Exception {
EventID.ThreadAndSequenceIDWrapper wrapper = new EventID.ThreadAndSequenceIDWrapper();
long start = wrapper.threadID;

int numberOfThreads = 100000;

List<Future<Long>> futures = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
futures.add(executorService.submit(this::getThreadID));
}
for (Future<Long> future : futures) {
future.get();
}
long lastThreadID = executorService.submit(this::getThreadID).get();
long expected = start + numberOfThreads + 1;
if (expected >= ThreadIdentifier.MAX_THREAD_PER_CLIENT) {
// wrap around ThreadIdentifier.MAX_THREAD_PER_CLIENT (1,000,000) and 1,000,000
// is never used.
assertThat(lastThreadID).isEqualTo(expected - ThreadIdentifier.MAX_THREAD_PER_CLIENT + 1);
} else {
assertThat(lastThreadID).isEqualTo(expected);
}
}

private long getThreadID() {
EventID.ThreadAndSequenceIDWrapper wrapper = new EventID.ThreadAndSequenceIDWrapper();
return wrapper.threadID;
}
}
Loading

0 comments on commit 227ce9c

Please sign in to comment.