Skip to content

Commit

Permalink
Add ThreadCount representing number of threads
Browse files Browse the repository at this point in the history
relative to the number of physical CPUs.

On ARM64 and non-linux systems this is resolved using java's Runtime.
For AMD64 on Linux /proc/cpuinfo is parsed to detect number of physical CPUs.

Both implementations are container-aware and will return the number of
requested CPUs if specified for the container.
  • Loading branch information
wendigo committed Apr 19, 2023
1 parent 37b4e0b commit 38de9d0
Show file tree
Hide file tree
Showing 2 changed files with 321 additions and 0 deletions.
201 changes: 201 additions & 0 deletions src/main/java/io/airlift/units/ThreadCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.units;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.OptionalInt;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static io.airlift.units.Preconditions.checkArgument;
import static java.lang.Math.min;
import static java.lang.Math.round;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.nio.file.Files.exists;
import static java.nio.file.Files.lines;

public class ThreadCount
implements Comparable<ThreadCount>
{
private static final String PER_CORE_SUFFIX = "C";
private static final Supplier<Integer> AVAILABLE_PROCESSORS = MachineInfo::getAvailablePhysicalProcessorCount;
private final int threadCount;

ThreadCount(int threadCount)
{
checkArgument(threadCount >= 0, "Thread count cannot be negative");
this.threadCount = threadCount;
}

public int getThreadCount()
{
return threadCount;
}

public ThreadCount nextPowerOfTwo()
{
return new ThreadCount(nextPowerOfTwo(getThreadCount()));
}

public static ThreadCount exactValueOf(int value)
{
return new ThreadCount(value);
}

public static ThreadCount valueOf(String value)
{
if (value.endsWith(PER_CORE_SUFFIX)) {
float parsedMultiplier = parseFloat(value.substring(0, value.lastIndexOf(PER_CORE_SUFFIX)).trim());
float threadCount = parsedMultiplier * AVAILABLE_PROCESSORS.get();
checkArgument(threadCount <= Integer.MAX_VALUE, "Thread count is greater than 2^32 - 1");
return new ThreadCount(round(threadCount));
}

return new ThreadCount(parseInteger(value));
}

public static ThreadCount boundedValueOf(String value, String minValue, String maxValue)
{
ThreadCount parsed = ThreadCount.valueOf(value);
ThreadCount min = ThreadCount.valueOf(minValue);
ThreadCount max = ThreadCount.valueOf(maxValue);

if (parsed.compareTo(min) < 0) {
return min;
}

if (parsed.compareTo(max) > 0) {
return max;
}
return parsed;
}

private static float parseFloat(String value)
{
try {
return Float.parseFloat(value);
}
catch (NumberFormatException e) {
throw new IllegalArgumentException(format("Cannot parse value '%s' as float", value), e);
}
}

private static int parseInteger(String value)
{
try {
long parsed = Long.parseLong(value);
checkArgument(parsed <= Integer.MAX_VALUE, "Thread count is greater than 2^32 - 1");
return toIntExact(parsed);
}
catch (NumberFormatException e) {
throw new IllegalArgumentException(format("Cannot parse value '%s' as integer", value), e);
}
}

static int nextPowerOfTwo(int x)
{
if (x > (1 << 30)) {
throw new IllegalStateException("Thread count is too large for next power of two: " + x);
}
return 1 << -Integer.numberOfLeadingZeros(x - 1);
}

@Override
public int compareTo(ThreadCount o)
{
return Integer.compare(threadCount, o.threadCount);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

ThreadCount that = (ThreadCount) o;
return threadCount == that.threadCount;
}

@Override
public int hashCode()
{
return threadCount;
}

@Override
public String toString()
{
return (threadCount == 1) ? "1 thread" : (threadCount + " threads");
}

static final class MachineInfo
{
private static final Path CPU_INFO_PATH = Paths.get("/proc/cpuinfo");

// cache physical processor count, so that it's not queried multiple times during tests
private static volatile int physicalProcessorCount = -1;

private MachineInfo() {}

public static int getAvailablePhysicalProcessorCount()
{
if (physicalProcessorCount != -1) {
return physicalProcessorCount;
}

String osArch = System.getProperty("os.arch");
// logical core count (including container cpu quota if there is any)
int availableProcessorCount = Runtime.getRuntime().availableProcessors();
int totalPhysicalProcessorCount;
if ("amd64".equals(osArch) || "x86_64".equals(osArch)) {
OptionalInt procInfo = tryReadFromProcCpuinfo();
if (procInfo.isPresent()) {
totalPhysicalProcessorCount = procInfo.getAsInt();
}
else {
totalPhysicalProcessorCount = availableProcessorCount;
}
}
else {
totalPhysicalProcessorCount = availableProcessorCount;
}

// cap available processor count to container cpu quota (if there is any).
physicalProcessorCount = min(totalPhysicalProcessorCount, availableProcessorCount);
return physicalProcessorCount;
}

private static OptionalInt tryReadFromProcCpuinfo()
{
if (!exists(CPU_INFO_PATH)) {
return OptionalInt.empty();
}

try (Stream<String> lines = lines(CPU_INFO_PATH)) {
return OptionalInt.of(toIntExact(lines.filter(line ->
line.matches("^processor\\s+: \\d")).count()));
}
catch (IOException e) {
return OptionalInt.empty();
}
}
}
}
120 changes: 120 additions & 0 deletions src/test/java/io/airlift/units/TestThreadsCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.units;

import org.testng.annotations.Test;

import static io.airlift.units.ThreadCount.MachineInfo.getAvailablePhysicalProcessorCount;
import static io.airlift.units.ThreadCount.nextPowerOfTwo;
import static java.lang.Math.round;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

public class TestThreadsCount
{
public static final int AVAILABLE_PROCESSORS = getAvailablePhysicalProcessorCount();

@Test
public void testParsingIntegerValues()
{
assertThreadsCount("1", 1);
assertThreadsCount("2", 2);
assertThreadsCount("67", 67);
assertThreadsCount("0", 0);
assertThreadsCount(Integer.valueOf(Integer.MAX_VALUE).toString(), Integer.MAX_VALUE);
assertInvalidValue("-1", "Thread count cannot be negative");
assertInvalidValue("67.0", "Cannot parse value '67.0' as integer");
assertInvalidValue(Long.valueOf(((long) Integer.MAX_VALUE) + 1).toString(), "Thread count is greater than 2^32 - 1");
}

@Test
public void testParsingMultiplierPerCore()
{
assertThreadsCount("1C", AVAILABLE_PROCESSORS);
assertThreadsCount("0.5 C", AVAILABLE_PROCESSORS / 2);
assertThreadsCount("0.2 C", round(AVAILABLE_PROCESSORS / 5.0f));
assertThreadsCount("1.5C", round(AVAILABLE_PROCESSORS * 1.5f));
assertThreadsCount("2 C", AVAILABLE_PROCESSORS * 2);
assertThreadsCount("-0.0001 C", 0);
assertInvalidValue("-1C", "Thread count cannot be negative");
assertInvalidValue("-1SC", "Cannot parse value '-1S' as float");
assertInvalidValue("2147483647C", "Thread count is greater than 2^32 - 1");
assertInvalidValue("3147483648C", "Thread count is greater than 2^32 - 1");
}

@Test
public void testParsingMultiplierPerCorePowerOfTwo()
{
assertThreadsCountNextPowerOf2("1C", nextPowerOfTwo(AVAILABLE_PROCESSORS));
assertThreadsCountNextPowerOf2("0.5C", nextPowerOfTwo(AVAILABLE_PROCESSORS / 2));
assertThreadsCountNextPowerOf2("0.2C", nextPowerOfTwo(round(AVAILABLE_PROCESSORS / 5.0f)));
assertThreadsCountNextPowerOf2("1.5C", nextPowerOfTwo(round(AVAILABLE_PROCESSORS * 1.5f)));
assertThreadsCountNextPowerOf2("2C", nextPowerOfTwo(AVAILABLE_PROCESSORS * 2));
assertThreadsCountNextPowerOf2("0C", 1);
assertThreadsCountNextPowerOf2("0.0001C", 1);
}

@Test
public void testParsingBoundedValue()
{
assertBoundedThreadsCount("3", "1", "1", 1);
assertBoundedThreadsCount("256C", "4", "16", 16);
assertBoundedThreadsCount("3", "4", "16", 4);
}

@Test
public void testNextPowerOfTwo()
{
assertThat(nextPowerOfTwo(-100)).isEqualTo(1);
assertThat(nextPowerOfTwo(-1)).isEqualTo(1);
assertThat(nextPowerOfTwo(0)).isEqualTo(1);
assertThat(nextPowerOfTwo(1)).isEqualTo(1);
assertThat(nextPowerOfTwo(2)).isEqualTo(2);
assertThat(nextPowerOfTwo(3)).isEqualTo(4);
assertThat(nextPowerOfTwo(5)).isEqualTo(8);
assertThat(nextPowerOfTwo(8)).isEqualTo(8);
assertThat(nextPowerOfTwo(1 << 30)).isEqualTo(1 << 30);

assertThatThrownBy(() -> nextPowerOfTwo((1 << 30) + 1))
.hasMessageContaining("Thread count is too large for next power of two: 1073741825");

assertThatThrownBy(() -> nextPowerOfTwo(Integer.MAX_VALUE))
.hasMessageContaining("Thread count is too large for next power of two: 2147483647");
}

private void assertThreadsCount(String value, int expected)
{
ThreadCount threadsCount = ThreadCount.valueOf(value);
assertThat(threadsCount).isEqualTo(ThreadCount.exactValueOf(expected));
}

private void assertThreadsCountNextPowerOf2(String value, int expected)
{
ThreadCount threadsCount = ThreadCount.valueOf(value).nextPowerOfTwo();
assertThat(threadsCount).isEqualTo(new ThreadCount(expected));
}

private void assertBoundedThreadsCount(String value, String min, String max, int expected)
{
ThreadCount threadsCount = ThreadCount.boundedValueOf(value, min, max);
assertThat(threadsCount).isEqualTo(new ThreadCount(expected));
}

private void assertInvalidValue(String value, String expectedMessage)
{
assertThatThrownBy(() -> ThreadCount.valueOf(value))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(expectedMessage);
}
}

0 comments on commit 38de9d0

Please sign in to comment.