Skip to content

Commit

Permalink
Add ThreadCount representing number of threads
Browse files Browse the repository at this point in the history
relative to the number of physical CPUs.

On ARM64 and non-linux systems this is resolved using java's Runtime.
For AMD64 on Linux /proc/cpuinfo is parsed to detect number of physical CPUs.

Both implementations are container-aware and will return the number of
requested CPUs if specified for the container.
  • Loading branch information
wendigo authored and electrum committed Jul 5, 2023
1 parent 9600101 commit 1a6a60e
Show file tree
Hide file tree
Showing 10 changed files with 840 additions and 0 deletions.
44 changes: 44 additions & 0 deletions src/main/java/io/airlift/units/MaxThreadCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.units;

import jakarta.validation.Constraint;
import jakarta.validation.Payload;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.ANNOTATION_TYPE;
import static java.lang.annotation.ElementType.CONSTRUCTOR;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.ElementType.TYPE_USE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Target({METHOD, FIELD, ANNOTATION_TYPE, CONSTRUCTOR, PARAMETER, TYPE_USE})
@Retention(RUNTIME)
@Documented
@Constraint(validatedBy = MaxThreadCountValidator.class)
public @interface MaxThreadCount
{
String value();

String message() default "{io.airlift.units.MaxThreadCount.message}";

Class<?>[] groups() default {};

Class<? extends Payload>[] payload() default {};
}
41 changes: 41 additions & 0 deletions src/main/java/io/airlift/units/MaxThreadCountValidator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.units;

import jakarta.validation.ConstraintValidator;
import jakarta.validation.ConstraintValidatorContext;

public class MaxThreadCountValidator
implements ConstraintValidator<MaxThreadCount, ThreadCount>
{
private ThreadCount max;

@Override
public void initialize(MaxThreadCount annotation)
{
this.max = ThreadCount.valueOf(annotation.value());
}

@Override
public boolean isValid(ThreadCount threadCount, ConstraintValidatorContext constraintValidatorContext)
{
return threadCount == null || threadCount.compareTo(max) <= 0;
}

@Override
public String toString()
{
return "max: " + max;
}
}
44 changes: 44 additions & 0 deletions src/main/java/io/airlift/units/MinThreadCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.units;

import jakarta.validation.Constraint;
import jakarta.validation.Payload;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.ANNOTATION_TYPE;
import static java.lang.annotation.ElementType.CONSTRUCTOR;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.ElementType.TYPE_USE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Target({METHOD, FIELD, ANNOTATION_TYPE, CONSTRUCTOR, PARAMETER, TYPE_USE})
@Retention(RUNTIME)
@Documented
@Constraint(validatedBy = MinThreadCountValidator.class)
public @interface MinThreadCount
{
String value();

String message() default "{io.airlift.units.MinThreadCount.message}";

Class<?>[] groups() default {};

Class<? extends Payload>[] payload() default {};
}
41 changes: 41 additions & 0 deletions src/main/java/io/airlift/units/MinThreadCountValidator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.units;

import jakarta.validation.ConstraintValidator;
import jakarta.validation.ConstraintValidatorContext;

public class MinThreadCountValidator
implements ConstraintValidator<MinThreadCount, ThreadCount>
{
private ThreadCount min;

@Override
public void initialize(MinThreadCount annotation)
{
this.min = ThreadCount.valueOf(annotation.value());
}

@Override
public boolean isValid(ThreadCount threadCount, ConstraintValidatorContext constraintValidatorContext)
{
return threadCount == null || threadCount.compareTo(min) >= 0;
}

@Override
public String toString()
{
return "min: " + min;
}
}
183 changes: 183 additions & 0 deletions src/main/java/io/airlift/units/ThreadCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.units;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.OptionalInt;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static io.airlift.units.Preconditions.checkArgument;
import static java.lang.Math.min;
import static java.lang.Math.round;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.nio.file.Files.exists;
import static java.nio.file.Files.lines;

public class ThreadCount
implements Comparable<ThreadCount>
{
private static final String PER_CORE_SUFFIX = "C";
private static final Supplier<Integer> AVAILABLE_PROCESSORS = MachineInfo::getAvailablePhysicalProcessorCount;
private final int threadCount;

ThreadCount(int threadCount)
{
checkArgument(threadCount >= 0, "Thread count cannot be negative");
this.threadCount = threadCount;
}

public int getThreadCount()
{
return threadCount;
}

public static ThreadCount exactValueOf(int value)
{
return new ThreadCount(value);
}

public static ThreadCount valueOf(String value)
{
if (value.endsWith(PER_CORE_SUFFIX)) {
float parsedMultiplier = parseFloat(value.substring(0, value.lastIndexOf(PER_CORE_SUFFIX)).trim());
checkArgument(parsedMultiplier > 0, "Thread multiplier cannot be negative");
float threadCount = parsedMultiplier * AVAILABLE_PROCESSORS.get();
checkArgument(threadCount <= Integer.MAX_VALUE, "Thread count is greater than 2^32 - 1");
return new ThreadCount(round(threadCount));
}

return new ThreadCount(parseInteger(value));
}

public static ThreadCount boundedValueOf(String value, String minValue, String maxValue)
{
ThreadCount parsed = ThreadCount.valueOf(value);
ThreadCount min = ThreadCount.valueOf(minValue);
ThreadCount max = ThreadCount.valueOf(maxValue);

if (parsed.compareTo(min) < 0) {
return min;
}

if (parsed.compareTo(max) > 0) {
return max;
}
return parsed;
}

private static float parseFloat(String value)
{
try {
return Float.parseFloat(value);
}
catch (NumberFormatException e) {
throw new IllegalArgumentException(format("Cannot parse value '%s' as float", value), e);
}
}

private static int parseInteger(String value)
{
try {
long parsed = Long.parseLong(value);
checkArgument(parsed <= Integer.MAX_VALUE, "Thread count is greater than 2^32 - 1");
return toIntExact(parsed);
}
catch (NumberFormatException e) {
throw new IllegalArgumentException(format("Cannot parse value '%s' as integer", value), e);
}
}

@Override
public int compareTo(ThreadCount o)
{
return Integer.compare(threadCount, o.threadCount);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

ThreadCount that = (ThreadCount) o;
return threadCount == that.threadCount;
}

@Override
public int hashCode()
{
return threadCount;
}

@Override
public String toString()
{
return (threadCount == 1) ? "1 thread" : (threadCount + " threads");
}

static final class MachineInfo
{
private static final Path CPU_INFO_PATH = Paths.get("/proc/cpuinfo");

// cache physical processor count, so that it's not queried multiple times during tests
private static volatile int physicalProcessorCount = -1;

private MachineInfo() {}

public static int getAvailablePhysicalProcessorCount()
{
if (physicalProcessorCount != -1) {
return physicalProcessorCount;
}

String osArch = System.getProperty("os.arch");
// logical core count (including container cpu quota if there is any)
int availableProcessorCount = Runtime.getRuntime().availableProcessors();
int totalPhysicalProcessorCount = availableProcessorCount;
if ("amd64".equals(osArch) || "x86_64".equals(osArch)) {
OptionalInt procInfo = tryReadFromProcCpuinfo();
if (procInfo.isPresent()) {
totalPhysicalProcessorCount = procInfo.getAsInt();
}
}

// cap available processor count to container cpu quota (if there is any).
physicalProcessorCount = min(totalPhysicalProcessorCount, availableProcessorCount);
return physicalProcessorCount;
}

private static OptionalInt tryReadFromProcCpuinfo()
{
if (!exists(CPU_INFO_PATH)) {
return OptionalInt.empty();
}

try (Stream<String> lines = lines(CPU_INFO_PATH)) {
return OptionalInt.of(toIntExact(lines.filter(line ->
line.matches("^processor\\s+: \\d")).count()));
}
catch (IOException e) {
return OptionalInt.empty();
}
}
}
}
2 changes: 2 additions & 0 deletions src/main/resources/ValidationMessages.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ io.airlift.units.MinDuration.message=must be greater than or equal to {value}
io.airlift.units.MaxDuration.message=must be less than or equal to {value}
io.airlift.units.MinDataSize.message=must be greater than or equal to {value}
io.airlift.units.MaxDataSize.message=must be less than or equal to {value}
io.airlift.units.MinThreadCount.message=must be greater than or equal to {value}
io.airlift.units.MaxThreadCount.message=must be less than or equal to {value}
Loading

0 comments on commit 1a6a60e

Please sign in to comment.