Skip to content

Commit 33d39b3

Browse files
committed
Migrate threads configuration to use ThreadCount
New type was introduced in the airlift/units 1.10
1 parent 0981eac commit 33d39b3

File tree

31 files changed

+417
-211
lines changed

31 files changed

+417
-211
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.airlift.units;
15+
16+
import java.io.IOException;
17+
import java.nio.file.Path;
18+
import java.nio.file.Paths;
19+
import java.util.OptionalInt;
20+
import java.util.function.Supplier;
21+
import java.util.stream.Stream;
22+
23+
import static io.airlift.units.Preconditions.checkArgument;
24+
import static java.lang.Math.min;
25+
import static java.lang.Math.round;
26+
import static java.lang.Math.toIntExact;
27+
import static java.lang.String.format;
28+
import static java.nio.file.Files.exists;
29+
import static java.nio.file.Files.lines;
30+
31+
// This class is a copy from airlift's units due to an inability to use
32+
// newer units version in JDBC due to different JDK target.
33+
// It is temporary solution until client and JDBC are moved to JDK 11+.
34+
// This class is added to test classes, so it won't be a part of the jdbc driver.
35+
public class ThreadCount
36+
implements Comparable<ThreadCount>
37+
{
38+
private static final String PER_CORE_SUFFIX = "C";
39+
private static final Supplier<Integer> AVAILABLE_PROCESSORS = MachineInfo::getAvailablePhysicalProcessorCount;
40+
private final int threadCount;
41+
42+
ThreadCount(int threadCount)
43+
{
44+
checkArgument(threadCount >= 0, "Thread count cannot be negative");
45+
this.threadCount = threadCount;
46+
}
47+
48+
public int getThreadCount()
49+
{
50+
return threadCount;
51+
}
52+
53+
public static ThreadCount exactValueOf(int value)
54+
{
55+
return new ThreadCount(value);
56+
}
57+
58+
public static ThreadCount valueOf(String value)
59+
{
60+
if (value.endsWith(PER_CORE_SUFFIX)) {
61+
float parsedMultiplier = parseFloat(value.substring(0, value.lastIndexOf(PER_CORE_SUFFIX)).trim());
62+
checkArgument(parsedMultiplier > 0, "Thread multiplier cannot be negative");
63+
float threadCount = parsedMultiplier * AVAILABLE_PROCESSORS.get();
64+
checkArgument(threadCount <= Integer.MAX_VALUE, "Thread count is greater than 2^32 - 1");
65+
return new ThreadCount(round(threadCount));
66+
}
67+
68+
return new ThreadCount(parseInteger(value));
69+
}
70+
71+
public static ThreadCount boundedValueOf(String value, String minValue, String maxValue)
72+
{
73+
ThreadCount parsed = ThreadCount.valueOf(value);
74+
ThreadCount min = ThreadCount.valueOf(minValue);
75+
ThreadCount max = ThreadCount.valueOf(maxValue);
76+
77+
if (parsed.compareTo(min) < 0) {
78+
return min;
79+
}
80+
81+
if (parsed.compareTo(max) > 0) {
82+
return max;
83+
}
84+
return parsed;
85+
}
86+
87+
private static float parseFloat(String value)
88+
{
89+
try {
90+
return Float.parseFloat(value);
91+
}
92+
catch (NumberFormatException e) {
93+
throw new IllegalArgumentException(format("Cannot parse value '%s' as float", value), e);
94+
}
95+
}
96+
97+
private static int parseInteger(String value)
98+
{
99+
try {
100+
long parsed = Long.parseLong(value);
101+
checkArgument(parsed <= Integer.MAX_VALUE, "Thread count is greater than 2^32 - 1");
102+
return toIntExact(parsed);
103+
}
104+
catch (NumberFormatException e) {
105+
throw new IllegalArgumentException(format("Cannot parse value '%s' as integer", value), e);
106+
}
107+
}
108+
109+
@Override
110+
public int compareTo(ThreadCount o)
111+
{
112+
return Integer.compare(threadCount, o.threadCount);
113+
}
114+
115+
@Override
116+
public boolean equals(Object o)
117+
{
118+
if (this == o) {
119+
return true;
120+
}
121+
if (o == null || getClass() != o.getClass()) {
122+
return false;
123+
}
124+
125+
ThreadCount that = (ThreadCount) o;
126+
return threadCount == that.threadCount;
127+
}
128+
129+
@Override
130+
public int hashCode()
131+
{
132+
return threadCount;
133+
}
134+
135+
@Override
136+
public String toString()
137+
{
138+
return (threadCount == 1) ? "1 thread" : (threadCount + " threads");
139+
}
140+
141+
static final class MachineInfo
142+
{
143+
private static final Path CPU_INFO_PATH = Paths.get("/proc/cpuinfo");
144+
145+
// cache physical processor count, so that it's not queried multiple times during tests
146+
private static volatile int physicalProcessorCount = -1;
147+
148+
private MachineInfo() {}
149+
150+
public static int getAvailablePhysicalProcessorCount()
151+
{
152+
if (physicalProcessorCount != -1) {
153+
return physicalProcessorCount;
154+
}
155+
156+
String osArch = System.getProperty("os.arch");
157+
// logical core count (including container cpu quota if there is any)
158+
int availableProcessorCount = Runtime.getRuntime().availableProcessors();
159+
int totalPhysicalProcessorCount = availableProcessorCount;
160+
if ("amd64".equals(osArch) || "x86_64".equals(osArch)) {
161+
OptionalInt procInfo = tryReadFromProcCpuinfo();
162+
if (procInfo.isPresent()) {
163+
totalPhysicalProcessorCount = procInfo.getAsInt();
164+
}
165+
}
166+
167+
// cap available processor count to container cpu quota (if there is any).
168+
physicalProcessorCount = min(totalPhysicalProcessorCount, availableProcessorCount);
169+
return physicalProcessorCount;
170+
}
171+
172+
private static OptionalInt tryReadFromProcCpuinfo()
173+
{
174+
if (!exists(CPU_INFO_PATH)) {
175+
return OptionalInt.empty();
176+
}
177+
178+
try (Stream<String> lines = lines(CPU_INFO_PATH)) {
179+
return OptionalInt.of(toIntExact(lines.filter(line ->
180+
line.matches("^processor\\s+: \\d")).count()));
181+
}
182+
catch (IOException e) {
183+
return OptionalInt.empty();
184+
}
185+
}
186+
}
187+
}

core/trino-main/src/main/java/io/trino/FeaturesConfig.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.airlift.configuration.LegacyConfig;
2323
import io.airlift.units.DataSize;
2424
import io.airlift.units.MaxDataSize;
25+
import io.airlift.units.ThreadCount;
2526
import io.trino.sql.analyzer.RegexLibrary;
2627
import jakarta.validation.constraints.DecimalMax;
2728
import jakarta.validation.constraints.DecimalMin;
@@ -88,7 +89,7 @@ public class FeaturesConfig
8889
private boolean spillEnabled;
8990
private DataSize aggregationOperatorUnspillMemoryLimit = DataSize.of(4, DataSize.Unit.MEGABYTE);
9091
private List<Path> spillerSpillPaths = ImmutableList.of();
91-
private int spillerThreads = 4;
92+
private ThreadCount spillerThreads = ThreadCount.exactValueOf(4);
9293
private double spillMaxUsedSpaceThreshold = 0.9;
9394
private double memoryRevokingTarget = 0.5;
9495
private double memoryRevokingThreshold = 0.9;
@@ -257,14 +258,14 @@ public FeaturesConfig setSpillerSpillPaths(String spillPaths)
257258
@Min(1)
258259
public int getSpillerThreads()
259260
{
260-
return spillerThreads;
261+
return spillerThreads.getThreadCount();
261262
}
262263

263264
@Config("spiller-threads")
264265
@LegacyConfig("experimental.spiller-threads")
265-
public FeaturesConfig setSpillerThreads(int spillerThreads)
266+
public FeaturesConfig setSpillerThreads(String spillerThreads)
266267
{
267-
this.spillerThreads = spillerThreads;
268+
this.spillerThreads = ThreadCount.valueOf(spillerThreads);
268269
return this;
269270
}
270271

core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java

+17-16
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.airlift.units.Duration;
2222
import io.airlift.units.MinDataSize;
2323
import io.airlift.units.MinDuration;
24+
import io.airlift.units.ThreadCount;
2425
import io.trino.operator.RetryPolicy;
2526
import jakarta.validation.constraints.DecimalMin;
2627
import jakarta.validation.constraints.Max;
@@ -73,15 +74,15 @@ public class QueryManagerConfig
7374

7475
private Duration clientTimeout = new Duration(5, TimeUnit.MINUTES);
7576

76-
private int queryManagerExecutorPoolSize = 5;
77-
private int queryExecutorPoolSize = 1000;
78-
private int maxStateMachineCallbackThreads = 5;
77+
private ThreadCount queryManagerExecutorPoolSize = ThreadCount.exactValueOf(5);
78+
private ThreadCount queryExecutorPoolSize = ThreadCount.exactValueOf(1000);
79+
private ThreadCount maxStateMachineCallbackThreads = ThreadCount.exactValueOf(5);
7980

8081
/**
8182
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}
8283
*/
8384
private Duration remoteTaskMaxErrorDuration = new Duration(5, TimeUnit.MINUTES);
84-
private int remoteTaskMaxCallbackThreads = 1000;
85+
private ThreadCount remoteTaskMaxCallbackThreads = ThreadCount.exactValueOf(1000);
8586

8687
private String queryExecutionPolicy = "phased";
8788
private Duration queryMaxRunTime = new Duration(100, TimeUnit.DAYS);
@@ -350,40 +351,40 @@ public QueryManagerConfig setClientTimeout(Duration clientTimeout)
350351
@Min(1)
351352
public int getQueryManagerExecutorPoolSize()
352353
{
353-
return queryManagerExecutorPoolSize;
354+
return queryManagerExecutorPoolSize.getThreadCount();
354355
}
355356

356357
@Config("query.manager-executor-pool-size")
357-
public QueryManagerConfig setQueryManagerExecutorPoolSize(int queryManagerExecutorPoolSize)
358+
public QueryManagerConfig setQueryManagerExecutorPoolSize(String queryManagerExecutorPoolSize)
358359
{
359-
this.queryManagerExecutorPoolSize = queryManagerExecutorPoolSize;
360+
this.queryManagerExecutorPoolSize = ThreadCount.valueOf(queryManagerExecutorPoolSize);
360361
return this;
361362
}
362363

363364
@Min(1)
364365
public int getQueryExecutorPoolSize()
365366
{
366-
return queryExecutorPoolSize;
367+
return queryExecutorPoolSize.getThreadCount();
367368
}
368369

369370
@Config("query.executor-pool-size")
370-
public QueryManagerConfig setQueryExecutorPoolSize(int queryExecutorPoolSize)
371+
public QueryManagerConfig setQueryExecutorPoolSize(String queryExecutorPoolSize)
371372
{
372-
this.queryExecutorPoolSize = queryExecutorPoolSize;
373+
this.queryExecutorPoolSize = ThreadCount.valueOf(queryExecutorPoolSize);
373374
return this;
374375
}
375376

376377
@Min(1)
377378
public int getMaxStateMachineCallbackThreads()
378379
{
379-
return maxStateMachineCallbackThreads;
380+
return maxStateMachineCallbackThreads.getThreadCount();
380381
}
381382

382383
@Config("query.max-state-machine-callback-threads")
383384
@ConfigDescription("The maximum number of threads allowed to run query and stage state machine listener callbacks concurrently for each query")
384-
public QueryManagerConfig setMaxStateMachineCallbackThreads(int maxStateMachineCallbackThreads)
385+
public QueryManagerConfig setMaxStateMachineCallbackThreads(String maxStateMachineCallbackThreads)
385386
{
386-
this.maxStateMachineCallbackThreads = maxStateMachineCallbackThreads;
387+
this.maxStateMachineCallbackThreads = ThreadCount.valueOf(maxStateMachineCallbackThreads);
387388
return this;
388389
}
389390

@@ -483,13 +484,13 @@ public QueryManagerConfig setQueryReportedRuleStatsLimit(int queryReportedRuleSt
483484
@Min(1)
484485
public int getRemoteTaskMaxCallbackThreads()
485486
{
486-
return remoteTaskMaxCallbackThreads;
487+
return remoteTaskMaxCallbackThreads.getThreadCount();
487488
}
488489

489490
@Config("query.remote-task.max-callback-threads")
490-
public QueryManagerConfig setRemoteTaskMaxCallbackThreads(int remoteTaskMaxCallbackThreads)
491+
public QueryManagerConfig setRemoteTaskMaxCallbackThreads(String remoteTaskMaxCallbackThreads)
491492
{
492-
this.remoteTaskMaxCallbackThreads = remoteTaskMaxCallbackThreads;
493+
this.remoteTaskMaxCallbackThreads = ThreadCount.valueOf(remoteTaskMaxCallbackThreads);
493494
return this;
494495
}
495496

0 commit comments

Comments
 (0)