Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configure thread pools size relative to the number of available cores #16303

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions client/trino-jdbc/src/test/java/io/airlift/units/ThreadCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.units;

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.OptionalInt;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static io.airlift.units.Preconditions.checkArgument;
import static java.lang.Math.min;
import static java.lang.Math.round;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.nio.file.Files.exists;
import static java.nio.file.Files.lines;

// This class is a copy from airlift's units due to an inability to use
// newer units version in JDBC due to different JDK target.
// It is temporary solution until client and JDBC are moved to JDK 11+.
// This class is added to test classes, so it won't be a part of the jdbc driver.
public class ThreadCount
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we need this copy?
plase remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jdk 8 target and newer units is on 11

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i know. but trino-jdbc should avoid having classes in io.airlift

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s the matter of test scope

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this class resides in trino-jdbc tests, it is unused there, and must remain API-compatible with the newly added ThreadCount class in units. Does this sounds like we're doing something wrong here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Keeping JDBC on JDK 8 which prohibits upgrading dependencies like airlift units

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If i understand correctly, trino-jdbc ships units 1.7 for its own use and it downgrades units version for test scope resulting in NoClassDefFoundError when we try to start trino server.

unless maven allows us to package units 1.7 but still use a newer version for test classpath, i see the following options:

  • stop using airlift units in trino-jdbc (perhaps copying & repackaging necessary classed if this is indeed warranted)
  • move jdbc tests that require trino server to a separate module, where we will use modern units.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving tests seems like a better option

implements Comparable<ThreadCount>
{
private static final String PER_CORE_SUFFIX = "C";
private static final Supplier<Integer> AVAILABLE_PROCESSORS = MachineInfo::getAvailablePhysicalProcessorCount;
private final int threadCount;

ThreadCount(int threadCount)
{
checkArgument(threadCount >= 0, "Thread count cannot be negative");
this.threadCount = threadCount;
}

public int getThreadCount()
{
return threadCount;
}

public static ThreadCount exactValueOf(int value)
{
return new ThreadCount(value);
}

public static ThreadCount valueOf(String value)
{
if (value.endsWith(PER_CORE_SUFFIX)) {
float parsedMultiplier = parseFloat(value.substring(0, value.lastIndexOf(PER_CORE_SUFFIX)).trim());
checkArgument(parsedMultiplier > 0, "Thread multiplier cannot be negative");
float threadCount = parsedMultiplier * AVAILABLE_PROCESSORS.get();
checkArgument(threadCount <= Integer.MAX_VALUE, "Thread count is greater than 2^32 - 1");
return new ThreadCount(round(threadCount));
}

return new ThreadCount(parseInteger(value));
}

public static ThreadCount boundedValueOf(String value, String minValue, String maxValue)
{
ThreadCount parsed = ThreadCount.valueOf(value);
ThreadCount min = ThreadCount.valueOf(minValue);
ThreadCount max = ThreadCount.valueOf(maxValue);

if (parsed.compareTo(min) < 0) {
return min;
}

if (parsed.compareTo(max) > 0) {
return max;
}
return parsed;
}

private static float parseFloat(String value)
{
try {
return Float.parseFloat(value);
}
catch (NumberFormatException e) {
throw new IllegalArgumentException(format("Cannot parse value '%s' as float", value), e);
}
}

private static int parseInteger(String value)
{
try {
long parsed = Long.parseLong(value);
checkArgument(parsed <= Integer.MAX_VALUE, "Thread count is greater than 2^32 - 1");
return toIntExact(parsed);
}
catch (NumberFormatException e) {
throw new IllegalArgumentException(format("Cannot parse value '%s' as integer", value), e);
}
}

@Override
public int compareTo(ThreadCount o)
{
return Integer.compare(threadCount, o.threadCount);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

ThreadCount that = (ThreadCount) o;
return threadCount == that.threadCount;
}

@Override
public int hashCode()
{
return threadCount;
}

@Override
public String toString()
{
return (threadCount == 1) ? "1 thread" : (threadCount + " threads");
}

static final class MachineInfo
{
private static final Path CPU_INFO_PATH = Paths.get("/proc/cpuinfo");

// cache physical processor count, so that it's not queried multiple times during tests
private static volatile int physicalProcessorCount = -1;

private MachineInfo() {}

public static int getAvailablePhysicalProcessorCount()
{
if (physicalProcessorCount != -1) {
return physicalProcessorCount;
}

String osArch = System.getProperty("os.arch");
// logical core count (including container cpu quota if there is any)
int availableProcessorCount = Runtime.getRuntime().availableProcessors();
int totalPhysicalProcessorCount = availableProcessorCount;
if ("amd64".equals(osArch) || "x86_64".equals(osArch)) {
OptionalInt procInfo = tryReadFromProcCpuinfo();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is it better than Runtime.getRuntime().availableProcessors() and why take min of the two instead of just using "the better one"?
This should be documented.

if (procInfo.isPresent()) {
totalPhysicalProcessorCount = procInfo.getAsInt();
}
}

// cap available processor count to container cpu quota (if there is any).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why container quota is inspected for certain architectures only?

physicalProcessorCount = min(totalPhysicalProcessorCount, availableProcessorCount);
return physicalProcessorCount;
}

private static OptionalInt tryReadFromProcCpuinfo()
{
if (!exists(CPU_INFO_PATH)) {
return OptionalInt.empty();
}

try (Stream<String> lines = lines(CPU_INFO_PATH)) {
return OptionalInt.of(toIntExact(lines.filter(line ->
line.matches("^processor\\s+: \\d")).count()));
}
catch (IOException e) {
return OptionalInt.empty();
}
}
}
}
9 changes: 5 additions & 4 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;
import io.airlift.units.MaxDataSize;
import io.airlift.units.ThreadCount;
import io.trino.sql.analyzer.RegexLibrary;
import jakarta.validation.constraints.DecimalMax;
import jakarta.validation.constraints.DecimalMin;
Expand Down Expand Up @@ -88,7 +89,7 @@ public class FeaturesConfig
private boolean spillEnabled;
private DataSize aggregationOperatorUnspillMemoryLimit = DataSize.of(4, DataSize.Unit.MEGABYTE);
private List<Path> spillerSpillPaths = ImmutableList.of();
private int spillerThreads = 4;
private ThreadCount spillerThreads = ThreadCount.exactValueOf(4);
private double spillMaxUsedSpaceThreshold = 0.9;
private double memoryRevokingTarget = 0.5;
private double memoryRevokingThreshold = 0.9;
Expand Down Expand Up @@ -257,14 +258,14 @@ public FeaturesConfig setSpillerSpillPaths(String spillPaths)
@Min(1)
public int getSpillerThreads()
{
return spillerThreads;
return spillerThreads.getThreadCount();
}

@Config("spiller-threads")
@LegacyConfig("experimental.spiller-threads")
public FeaturesConfig setSpillerThreads(int spillerThreads)
public FeaturesConfig setSpillerThreads(String spillerThreads)
{
this.spillerThreads = spillerThreads;
this.spillerThreads = ThreadCount.valueOf(spillerThreads);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airlift.units.Duration;
import io.airlift.units.MinDataSize;
import io.airlift.units.MinDuration;
import io.airlift.units.ThreadCount;
import io.trino.operator.RetryPolicy;
import jakarta.validation.constraints.DecimalMin;
import jakarta.validation.constraints.Max;
Expand Down Expand Up @@ -73,15 +74,15 @@ public class QueryManagerConfig

private Duration clientTimeout = new Duration(5, TimeUnit.MINUTES);

private int queryManagerExecutorPoolSize = 5;
private int queryExecutorPoolSize = 1000;
private int maxStateMachineCallbackThreads = 5;
private ThreadCount queryManagerExecutorPoolSize = ThreadCount.exactValueOf(5);
private ThreadCount queryExecutorPoolSize = ThreadCount.exactValueOf(1000);
private ThreadCount maxStateMachineCallbackThreads = ThreadCount.exactValueOf(5);

/**
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}
*/
private Duration remoteTaskMaxErrorDuration = new Duration(5, TimeUnit.MINUTES);
private int remoteTaskMaxCallbackThreads = 1000;
private ThreadCount remoteTaskMaxCallbackThreads = ThreadCount.exactValueOf(1000);

private String queryExecutionPolicy = "phased";
private Duration queryMaxRunTime = new Duration(100, TimeUnit.DAYS);
Expand Down Expand Up @@ -350,40 +351,40 @@ public QueryManagerConfig setClientTimeout(Duration clientTimeout)
@Min(1)
public int getQueryManagerExecutorPoolSize()
{
return queryManagerExecutorPoolSize;
return queryManagerExecutorPoolSize.getThreadCount();
}

@Config("query.manager-executor-pool-size")
public QueryManagerConfig setQueryManagerExecutorPoolSize(int queryManagerExecutorPoolSize)
public QueryManagerConfig setQueryManagerExecutorPoolSize(String queryManagerExecutorPoolSize)
{
this.queryManagerExecutorPoolSize = queryManagerExecutorPoolSize;
this.queryManagerExecutorPoolSize = ThreadCount.valueOf(queryManagerExecutorPoolSize);
return this;
}

@Min(1)
public int getQueryExecutorPoolSize()
{
return queryExecutorPoolSize;
return queryExecutorPoolSize.getThreadCount();
}

@Config("query.executor-pool-size")
public QueryManagerConfig setQueryExecutorPoolSize(int queryExecutorPoolSize)
public QueryManagerConfig setQueryExecutorPoolSize(String queryExecutorPoolSize)
{
this.queryExecutorPoolSize = queryExecutorPoolSize;
this.queryExecutorPoolSize = ThreadCount.valueOf(queryExecutorPoolSize);
return this;
}

@Min(1)
public int getMaxStateMachineCallbackThreads()
{
return maxStateMachineCallbackThreads;
return maxStateMachineCallbackThreads.getThreadCount();
}

@Config("query.max-state-machine-callback-threads")
@ConfigDescription("The maximum number of threads allowed to run query and stage state machine listener callbacks concurrently for each query")
public QueryManagerConfig setMaxStateMachineCallbackThreads(int maxStateMachineCallbackThreads)
public QueryManagerConfig setMaxStateMachineCallbackThreads(String maxStateMachineCallbackThreads)
{
this.maxStateMachineCallbackThreads = maxStateMachineCallbackThreads;
this.maxStateMachineCallbackThreads = ThreadCount.valueOf(maxStateMachineCallbackThreads);
return this;
}

Expand Down Expand Up @@ -483,13 +484,13 @@ public QueryManagerConfig setQueryReportedRuleStatsLimit(int queryReportedRuleSt
@Min(1)
public int getRemoteTaskMaxCallbackThreads()
{
return remoteTaskMaxCallbackThreads;
return remoteTaskMaxCallbackThreads.getThreadCount();
}

@Config("query.remote-task.max-callback-threads")
public QueryManagerConfig setRemoteTaskMaxCallbackThreads(int remoteTaskMaxCallbackThreads)
public QueryManagerConfig setRemoteTaskMaxCallbackThreads(String remoteTaskMaxCallbackThreads)
{
this.remoteTaskMaxCallbackThreads = remoteTaskMaxCallbackThreads;
this.remoteTaskMaxCallbackThreads = ThreadCount.valueOf(remoteTaskMaxCallbackThreads);
return this;
}

Expand Down
Loading