From 33d39b394de1719cf1772bce1d302cafc200f764 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Tue, 28 Feb 2023 12:25:42 +0100 Subject: [PATCH] Migrate threads configuration to use ThreadCount New type was introduced in the airlift/units 1.10 --- .../java/io/airlift/units/ThreadCount.java | 187 ++++++++++++++++++ .../main/java/io/trino/FeaturesConfig.java | 9 +- .../trino/execution/QueryManagerConfig.java | 33 ++-- .../io/trino/execution/TaskManagerConfig.java | 92 +++++---- .../operator/DirectExchangeClientConfig.java | 17 +- .../io/trino/testing/LocalQueryRunner.java | 2 +- .../execution/TestQueryManagerConfig.java | 16 +- .../execution/TestTaskManagerConfig.java | 47 ++--- .../TestDirectExchangeClientConfig.java | 8 +- .../TestGenericPartitioningSpiller.java | 2 +- .../sql/analyzer/TestFeaturesConfig.java | 4 +- .../elasticsearch/ElasticsearchConfig.java | 10 +- .../TestElasticsearchConfig.java | 4 +- .../java/io/trino/plugin/hive/HiveConfig.java | 25 +-- .../glue/GlueHiveMetastoreConfig.java | 25 +-- .../io/trino/plugin/hive/TestHiveConfig.java | 12 +- .../glue/TestGlueHiveMetastoreConfig.java | 12 +- .../raptor/legacy/backup/BackupConfig.java | 17 +- .../legacy/metadata/ShardCleanerConfig.java | 9 +- .../legacy/storage/StorageManagerConfig.java | 27 ++- .../legacy/backup/TestBackupConfig.java | 8 +- .../metadata/TestShardCleanerConfig.java | 4 +- .../storage/TestStorageManagerConfig.java | 14 +- .../plugin/thrift/ThriftConnectorConfig.java | 9 +- .../thrift/TestThriftConnectorConfig.java | 4 +- plugin/trino-tpcds/pom.xml | 5 + .../io/trino/plugin/tpcds/TpcdsConfig.java | 7 +- pom.xml | 2 +- .../io/trino/verifier/VerifierConfig.java | 9 +- .../io/trino/verifier/TestVerifierConfig.java | 4 +- .../verifier/TestVerifierRewriteQueries.java | 4 +- 31 files changed, 417 insertions(+), 211 deletions(-) create mode 100644 client/trino-jdbc/src/test/java/io/airlift/units/ThreadCount.java diff --git a/client/trino-jdbc/src/test/java/io/airlift/units/ThreadCount.java b/client/trino-jdbc/src/test/java/io/airlift/units/ThreadCount.java new file mode 100644 index 0000000000000..719ea8b3e45fa --- /dev/null +++ b/client/trino-jdbc/src/test/java/io/airlift/units/ThreadCount.java @@ -0,0 +1,187 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.airlift.units; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.OptionalInt; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import static io.airlift.units.Preconditions.checkArgument; +import static java.lang.Math.min; +import static java.lang.Math.round; +import static java.lang.Math.toIntExact; +import static java.lang.String.format; +import static java.nio.file.Files.exists; +import static java.nio.file.Files.lines; + +// This class is a copy from airlift's units due to an inability to use +// newer units version in JDBC due to different JDK target. +// It is temporary solution until client and JDBC are moved to JDK 11+. 
+// This class is added to test classes, so it won't be a part of the jdbc driver. +public class ThreadCount + implements Comparable +{ + private static final String PER_CORE_SUFFIX = "C"; + private static final Supplier AVAILABLE_PROCESSORS = MachineInfo::getAvailablePhysicalProcessorCount; + private final int threadCount; + + ThreadCount(int threadCount) + { + checkArgument(threadCount >= 0, "Thread count cannot be negative"); + this.threadCount = threadCount; + } + + public int getThreadCount() + { + return threadCount; + } + + public static ThreadCount exactValueOf(int value) + { + return new ThreadCount(value); + } + + public static ThreadCount valueOf(String value) + { + if (value.endsWith(PER_CORE_SUFFIX)) { + float parsedMultiplier = parseFloat(value.substring(0, value.lastIndexOf(PER_CORE_SUFFIX)).trim()); + checkArgument(parsedMultiplier > 0, "Thread multiplier cannot be negative"); + float threadCount = parsedMultiplier * AVAILABLE_PROCESSORS.get(); + checkArgument(threadCount <= Integer.MAX_VALUE, "Thread count is greater than 2^32 - 1"); + return new ThreadCount(round(threadCount)); + } + + return new ThreadCount(parseInteger(value)); + } + + public static ThreadCount boundedValueOf(String value, String minValue, String maxValue) + { + ThreadCount parsed = ThreadCount.valueOf(value); + ThreadCount min = ThreadCount.valueOf(minValue); + ThreadCount max = ThreadCount.valueOf(maxValue); + + if (parsed.compareTo(min) < 0) { + return min; + } + + if (parsed.compareTo(max) > 0) { + return max; + } + return parsed; + } + + private static float parseFloat(String value) + { + try { + return Float.parseFloat(value); + } + catch (NumberFormatException e) { + throw new IllegalArgumentException(format("Cannot parse value '%s' as float", value), e); + } + } + + private static int parseInteger(String value) + { + try { + long parsed = Long.parseLong(value); + checkArgument(parsed <= Integer.MAX_VALUE, "Thread count is greater than 2^32 - 1"); + return toIntExact(parsed); + } + catch (NumberFormatException e) { + throw new IllegalArgumentException(format("Cannot parse value '%s' as integer", value), e); + } + } + + @Override + public int compareTo(ThreadCount o) + { + return Integer.compare(threadCount, o.threadCount); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ThreadCount that = (ThreadCount) o; + return threadCount == that.threadCount; + } + + @Override + public int hashCode() + { + return threadCount; + } + + @Override + public String toString() + { + return (threadCount == 1) ? 
"1 thread" : (threadCount + " threads"); + } + + static final class MachineInfo + { + private static final Path CPU_INFO_PATH = Paths.get("/proc/cpuinfo"); + + // cache physical processor count, so that it's not queried multiple times during tests + private static volatile int physicalProcessorCount = -1; + + private MachineInfo() {} + + public static int getAvailablePhysicalProcessorCount() + { + if (physicalProcessorCount != -1) { + return physicalProcessorCount; + } + + String osArch = System.getProperty("os.arch"); + // logical core count (including container cpu quota if there is any) + int availableProcessorCount = Runtime.getRuntime().availableProcessors(); + int totalPhysicalProcessorCount = availableProcessorCount; + if ("amd64".equals(osArch) || "x86_64".equals(osArch)) { + OptionalInt procInfo = tryReadFromProcCpuinfo(); + if (procInfo.isPresent()) { + totalPhysicalProcessorCount = procInfo.getAsInt(); + } + } + + // cap available processor count to container cpu quota (if there is any). + physicalProcessorCount = min(totalPhysicalProcessorCount, availableProcessorCount); + return physicalProcessorCount; + } + + private static OptionalInt tryReadFromProcCpuinfo() + { + if (!exists(CPU_INFO_PATH)) { + return OptionalInt.empty(); + } + + try (Stream lines = lines(CPU_INFO_PATH)) { + return OptionalInt.of(toIntExact(lines.filter(line -> + line.matches("^processor\\s+: \\d")).count())); + } + catch (IOException e) { + return OptionalInt.empty(); + } + } + } +} diff --git a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java index 9a172e069f58b..deea3e87e7ad8 100644 --- a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java +++ b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java @@ -22,6 +22,7 @@ import io.airlift.configuration.LegacyConfig; import io.airlift.units.DataSize; import io.airlift.units.MaxDataSize; +import io.airlift.units.ThreadCount; import io.trino.sql.analyzer.RegexLibrary; import jakarta.validation.constraints.DecimalMax; import jakarta.validation.constraints.DecimalMin; @@ -88,7 +89,7 @@ public class FeaturesConfig private boolean spillEnabled; private DataSize aggregationOperatorUnspillMemoryLimit = DataSize.of(4, DataSize.Unit.MEGABYTE); private List spillerSpillPaths = ImmutableList.of(); - private int spillerThreads = 4; + private ThreadCount spillerThreads = ThreadCount.exactValueOf(4); private double spillMaxUsedSpaceThreshold = 0.9; private double memoryRevokingTarget = 0.5; private double memoryRevokingThreshold = 0.9; @@ -257,14 +258,14 @@ public FeaturesConfig setSpillerSpillPaths(String spillPaths) @Min(1) public int getSpillerThreads() { - return spillerThreads; + return spillerThreads.getThreadCount(); } @Config("spiller-threads") @LegacyConfig("experimental.spiller-threads") - public FeaturesConfig setSpillerThreads(int spillerThreads) + public FeaturesConfig setSpillerThreads(String spillerThreads) { - this.spillerThreads = spillerThreads; + this.spillerThreads = ThreadCount.valueOf(spillerThreads); return this; } diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index 8e8354ccd9c9d..784aa9cccd3ec 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -21,6 +21,7 @@ import io.airlift.units.Duration; import io.airlift.units.MinDataSize; import 
io.airlift.units.MinDuration; +import io.airlift.units.ThreadCount; import io.trino.operator.RetryPolicy; import jakarta.validation.constraints.DecimalMin; import jakarta.validation.constraints.Max; @@ -73,15 +74,15 @@ public class QueryManagerConfig private Duration clientTimeout = new Duration(5, TimeUnit.MINUTES); - private int queryManagerExecutorPoolSize = 5; - private int queryExecutorPoolSize = 1000; - private int maxStateMachineCallbackThreads = 5; + private ThreadCount queryManagerExecutorPoolSize = ThreadCount.exactValueOf(5); + private ThreadCount queryExecutorPoolSize = ThreadCount.exactValueOf(1000); + private ThreadCount maxStateMachineCallbackThreads = ThreadCount.exactValueOf(5); /** * default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()} */ private Duration remoteTaskMaxErrorDuration = new Duration(5, TimeUnit.MINUTES); - private int remoteTaskMaxCallbackThreads = 1000; + private ThreadCount remoteTaskMaxCallbackThreads = ThreadCount.exactValueOf(1000); private String queryExecutionPolicy = "phased"; private Duration queryMaxRunTime = new Duration(100, TimeUnit.DAYS); @@ -350,40 +351,40 @@ public QueryManagerConfig setClientTimeout(Duration clientTimeout) @Min(1) public int getQueryManagerExecutorPoolSize() { - return queryManagerExecutorPoolSize; + return queryManagerExecutorPoolSize.getThreadCount(); } @Config("query.manager-executor-pool-size") - public QueryManagerConfig setQueryManagerExecutorPoolSize(int queryManagerExecutorPoolSize) + public QueryManagerConfig setQueryManagerExecutorPoolSize(String queryManagerExecutorPoolSize) { - this.queryManagerExecutorPoolSize = queryManagerExecutorPoolSize; + this.queryManagerExecutorPoolSize = ThreadCount.valueOf(queryManagerExecutorPoolSize); return this; } @Min(1) public int getQueryExecutorPoolSize() { - return queryExecutorPoolSize; + return queryExecutorPoolSize.getThreadCount(); } @Config("query.executor-pool-size") - public QueryManagerConfig setQueryExecutorPoolSize(int queryExecutorPoolSize) + public QueryManagerConfig setQueryExecutorPoolSize(String queryExecutorPoolSize) { - this.queryExecutorPoolSize = queryExecutorPoolSize; + this.queryExecutorPoolSize = ThreadCount.valueOf(queryExecutorPoolSize); return this; } @Min(1) public int getMaxStateMachineCallbackThreads() { - return maxStateMachineCallbackThreads; + return maxStateMachineCallbackThreads.getThreadCount(); } @Config("query.max-state-machine-callback-threads") @ConfigDescription("The maximum number of threads allowed to run query and stage state machine listener callbacks concurrently for each query") - public QueryManagerConfig setMaxStateMachineCallbackThreads(int maxStateMachineCallbackThreads) + public QueryManagerConfig setMaxStateMachineCallbackThreads(String maxStateMachineCallbackThreads) { - this.maxStateMachineCallbackThreads = maxStateMachineCallbackThreads; + this.maxStateMachineCallbackThreads = ThreadCount.valueOf(maxStateMachineCallbackThreads); return this; } @@ -483,13 +484,13 @@ public QueryManagerConfig setQueryReportedRuleStatsLimit(int queryReportedRuleSt @Min(1) public int getRemoteTaskMaxCallbackThreads() { - return remoteTaskMaxCallbackThreads; + return remoteTaskMaxCallbackThreads.getThreadCount(); } @Config("query.remote-task.max-callback-threads") - public QueryManagerConfig setRemoteTaskMaxCallbackThreads(int remoteTaskMaxCallbackThreads) + public QueryManagerConfig setRemoteTaskMaxCallbackThreads(String remoteTaskMaxCallbackThreads) { - this.remoteTaskMaxCallbackThreads = 
remoteTaskMaxCallbackThreads; + this.remoteTaskMaxCallbackThreads = ThreadCount.valueOf(remoteTaskMaxCallbackThreads); return this; } diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java index 1738003830edb..82d329f9722de 100644 --- a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java @@ -22,6 +22,7 @@ import io.airlift.units.Duration; import io.airlift.units.MaxDuration; import io.airlift.units.MinDuration; +import io.airlift.units.ThreadCount; import io.trino.util.PowerOfTwo; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; @@ -29,10 +30,7 @@ import java.math.BigDecimal; import java.util.concurrent.TimeUnit; -import static io.trino.util.MachineInfo.getAvailablePhysicalProcessorCount; -import static it.unimi.dsi.fastutil.HashCommon.nextPowerOfTwo; -import static java.lang.Math.max; -import static java.lang.Math.min; +import static com.google.common.math.IntMath.ceilingPowerOfTwo; @DefunctConfig({ "experimental.big-query-max-task-memory", @@ -53,7 +51,7 @@ public class TaskManagerConfig private DataSize maxLocalExchangeBufferSize = DataSize.of(128, Unit.MEGABYTE); private DataSize maxIndexMemoryUsage = DataSize.of(64, Unit.MEGABYTE); private boolean shareIndexLoading; - private int maxWorkerThreads = Runtime.getRuntime().availableProcessors() * 2; + private ThreadCount maxWorkerThreads = ThreadCount.valueOf("2C"); private Integer minDrivers; private Integer initialSplitsPerNode; private int minDriversPerTask = 3; @@ -63,7 +61,7 @@ public class TaskManagerConfig private DataSize sinkMaxBufferSize = DataSize.of(32, Unit.MEGABYTE); private DataSize sinkMaxBroadcastBufferSize = DataSize.of(200, Unit.MEGABYTE); private DataSize maxPagePartitioningBufferSize = DataSize.of(32, Unit.MEGABYTE); - private int pagePartitioningBufferPoolSize = 8; + private ThreadCount pagePartitioningBufferPoolSize = ThreadCount.exactValueOf(8); private Duration clientTimeout = new Duration(2, TimeUnit.MINUTES); private Duration infoMaxAge = new Duration(15, TimeUnit.MINUTES); @@ -82,14 +80,17 @@ public class TaskManagerConfig // because preferred write partitioning is always enabled for local exchange thus partitioned inserts will never // use this property. Hence, there is no risk in terms of more numbers of physical writers which can cause high // resource utilization. - private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32) * 2; + + private ThreadCount scaleWritersMaxWriterCount = ThreadCount.boundedValueOf(nextPowerOfTwo(), "2", "64"); + private int writerCount = 1; // Default value of partitioned task writer count should be above 1, otherwise it can create a plan // with a single gather exchange node on the coordinator due to a single available processor. Whereas, // on the worker nodes due to more available processors, the default value could be above 1. Therefore, // it can cause error due to config mismatch during execution. Additionally, cap it to 32 in order to // avoid small pages produced by local partitioning exchanges. 
- private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32) * 2; + private ThreadCount partitionedWriterCount = ThreadCount.boundedValueOf(nextPowerOfTwo(), "2", "32"); + // Default value of task concurrency should be above 1, otherwise it can create a plan with a single gather // exchange node on the coordinator due to a single available processor. Whereas, on the worker nodes due to // more available processors, the default value could be above 1. Therefore, it can cause error due to config @@ -98,12 +99,12 @@ public class TaskManagerConfig /** * default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}} */ - private int taskConcurrency = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32); - private int httpResponseThreads = 100; - private int httpTimeoutThreads = 3; + private ThreadCount taskConcurrency = ThreadCount.boundedValueOf(nextPowerOfTwo(), "2", "32"); + private ThreadCount httpResponseThreads = ThreadCount.exactValueOf(100); + private ThreadCount httpTimeoutThreads = ThreadCount.exactValueOf(3); - private int taskNotificationThreads = 5; - private int taskYieldThreads = 3; + private ThreadCount taskNotificationThreads = ThreadCount.exactValueOf(5); + private ThreadCount taskYieldThreads = ThreadCount.exactValueOf(3); private BigDecimal levelTimeMultiplier = new BigDecimal(2.0); @@ -272,14 +273,14 @@ public TaskManagerConfig setLevelTimeMultiplier(BigDecimal levelTimeMultiplier) @Min(1) public int getMaxWorkerThreads() { - return maxWorkerThreads; + return maxWorkerThreads.getThreadCount(); } @LegacyConfig("task.shard.max-threads") @Config("task.max-worker-threads") - public TaskManagerConfig setMaxWorkerThreads(int maxWorkerThreads) + public TaskManagerConfig setMaxWorkerThreads(String maxWorkerThreads) { - this.maxWorkerThreads = maxWorkerThreads; + this.maxWorkerThreads = ThreadCount.valueOf(maxWorkerThreads); return this; } @@ -287,7 +288,7 @@ public TaskManagerConfig setMaxWorkerThreads(int maxWorkerThreads) public int getInitialSplitsPerNode() { if (initialSplitsPerNode == null) { - return maxWorkerThreads; + return maxWorkerThreads.getThreadCount(); } return initialSplitsPerNode; } @@ -316,7 +317,7 @@ public TaskManagerConfig setSplitConcurrencyAdjustmentInterval(Duration splitCon public int getMinDrivers() { if (minDrivers == null) { - return 2 * maxWorkerThreads; + return 2 * maxWorkerThreads.getThreadCount(); } return minDrivers; } @@ -397,14 +398,14 @@ public TaskManagerConfig setMaxPagePartitioningBufferSize(DataSize size) @Min(0) public int getPagePartitioningBufferPoolSize() { - return pagePartitioningBufferPoolSize; + return pagePartitioningBufferPoolSize.getThreadCount(); } @Config("driver.page-partitioning-buffer-pool-size") @ConfigDescription("Maximum number of free buffers in the per task partitioned page buffer pool. 
Setting this to zero effectively disables the pool") - public TaskManagerConfig setPagePartitioningBufferPoolSize(int pagePartitioningBufferPoolSize) + public TaskManagerConfig setPagePartitioningBufferPoolSize(String pagePartitioningBufferPoolSize) { - this.pagePartitioningBufferPoolSize = pagePartitioningBufferPoolSize; + this.pagePartitioningBufferPoolSize = ThreadCount.valueOf(pagePartitioningBufferPoolSize); return this; } @@ -451,14 +452,14 @@ public TaskManagerConfig setScaleWritersEnabled(boolean scaleWritersEnabled) @Min(1) public int getScaleWritersMaxWriterCount() { - return scaleWritersMaxWriterCount; + return scaleWritersMaxWriterCount.getThreadCount(); } @Config("task.scale-writers.max-writer-count") @ConfigDescription("Maximum number of writers per task up to which scaling will happen if task.scale-writers.enabled is set") - public TaskManagerConfig setScaleWritersMaxWriterCount(int scaleWritersMaxWriterCount) + public TaskManagerConfig setScaleWritersMaxWriterCount(String scaleWritersMaxWriterCount) { - this.scaleWritersMaxWriterCount = scaleWritersMaxWriterCount; + this.scaleWritersMaxWriterCount = ThreadCount.valueOf(scaleWritersMaxWriterCount); return this; } @@ -480,14 +481,14 @@ public TaskManagerConfig setWriterCount(int writerCount) @PowerOfTwo public int getPartitionedWriterCount() { - return partitionedWriterCount; + return partitionedWriterCount.getThreadCount(); } @Config("task.partitioned-writer-count") @ConfigDescription("Number of local parallel table writers per task when prefer partitioning is used") - public TaskManagerConfig setPartitionedWriterCount(int partitionedWriterCount) + public TaskManagerConfig setPartitionedWriterCount(String partitionedWriterCount) { - this.partitionedWriterCount = partitionedWriterCount; + this.partitionedWriterCount = ThreadCount.valueOf(partitionedWriterCount); return this; } @@ -495,68 +496,68 @@ public TaskManagerConfig setPartitionedWriterCount(int partitionedWriterCount) @PowerOfTwo public int getTaskConcurrency() { - return taskConcurrency; + return taskConcurrency.getThreadCount(); } @Config("task.concurrency") @ConfigDescription("Default number of local parallel jobs per worker") - public TaskManagerConfig setTaskConcurrency(int taskConcurrency) + public TaskManagerConfig setTaskConcurrency(String taskConcurrency) { - this.taskConcurrency = taskConcurrency; + this.taskConcurrency = ThreadCount.valueOf(taskConcurrency); return this; } @Min(1) public int getHttpResponseThreads() { - return httpResponseThreads; + return httpResponseThreads.getThreadCount(); } @Config("task.http-response-threads") - public TaskManagerConfig setHttpResponseThreads(int httpResponseThreads) + public TaskManagerConfig setHttpResponseThreads(String httpResponseThreads) { - this.httpResponseThreads = httpResponseThreads; + this.httpResponseThreads = ThreadCount.valueOf(httpResponseThreads); return this; } @Min(1) public int getHttpTimeoutThreads() { - return httpTimeoutThreads; + return httpTimeoutThreads.getThreadCount(); } @Config("task.http-timeout-threads") - public TaskManagerConfig setHttpTimeoutThreads(int httpTimeoutThreads) + public TaskManagerConfig setHttpTimeoutThreads(String httpTimeoutThreads) { - this.httpTimeoutThreads = httpTimeoutThreads; + this.httpTimeoutThreads = ThreadCount.valueOf(httpTimeoutThreads); return this; } @Min(1) public int getTaskNotificationThreads() { - return taskNotificationThreads; + return taskNotificationThreads.getThreadCount(); } @Config("task.task-notification-threads") @ConfigDescription("Number of 
threads used for internal task event notifications") - public TaskManagerConfig setTaskNotificationThreads(int taskNotificationThreads) + public TaskManagerConfig setTaskNotificationThreads(String taskNotificationThreads) { - this.taskNotificationThreads = taskNotificationThreads; + this.taskNotificationThreads = ThreadCount.valueOf(taskNotificationThreads); return this; } @Min(1) public int getTaskYieldThreads() { - return taskYieldThreads; + return taskYieldThreads.getThreadCount(); } @Config("task.task-yield-threads") @ConfigDescription("Number of threads used for setting yield signals") - public TaskManagerConfig setTaskYieldThreads(int taskYieldThreads) + public TaskManagerConfig setTaskYieldThreads(String taskYieldThreads) { - this.taskYieldThreads = taskYieldThreads; + this.taskYieldThreads = ThreadCount.valueOf(taskYieldThreads); return this; } @@ -616,6 +617,11 @@ public TaskManagerConfig setInterruptStuckSplitTasksDetectionInterval(Duration i public void applyFaultTolerantExecutionDefaults() { - taskConcurrency = 8; + taskConcurrency = ThreadCount.exactValueOf(8); + } + + private static String nextPowerOfTwo() + { + return String.valueOf(ceilingPowerOfTwo(ThreadCount.valueOf("1C").getThreadCount())); } } diff --git a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientConfig.java b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientConfig.java index 615f5ed75a4bf..d144cec9a5e45 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientConfig.java +++ b/core/trino-main/src/main/java/io/trino/operator/DirectExchangeClientConfig.java @@ -21,6 +21,7 @@ import io.airlift.units.Duration; import io.airlift.units.MinDataSize; import io.airlift.units.MinDuration; +import io.airlift.units.ThreadCount; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; @@ -33,8 +34,8 @@ public class DirectExchangeClientConfig private int concurrentRequestMultiplier = 3; private Duration maxErrorDuration = new Duration(5, TimeUnit.MINUTES); private DataSize maxResponseSize = new HttpClientConfig().getMaxContentLength(); - private int clientThreads = 25; - private int pageBufferClientMaxCallbackThreads = 25; + private ThreadCount clientThreads = ThreadCount.exactValueOf(25); + private ThreadCount pageBufferClientMaxCallbackThreads = ThreadCount.exactValueOf(25); private boolean acknowledgePages = true; private DataSize deduplicationBufferSize = DataSize.of(32, Unit.MEGABYTE); @@ -95,26 +96,26 @@ public DirectExchangeClientConfig setMaxResponseSize(DataSize maxResponseSize) @Min(1) public int getClientThreads() { - return clientThreads; + return clientThreads.getThreadCount(); } @Config("exchange.client-threads") - public DirectExchangeClientConfig setClientThreads(int clientThreads) + public DirectExchangeClientConfig setClientThreads(String clientThreads) { - this.clientThreads = clientThreads; + this.clientThreads = ThreadCount.valueOf(clientThreads); return this; } @Min(1) public int getPageBufferClientMaxCallbackThreads() { - return pageBufferClientMaxCallbackThreads; + return pageBufferClientMaxCallbackThreads.getThreadCount(); } @Config("exchange.page-buffer-client.max-callback-threads") - public DirectExchangeClientConfig setPageBufferClientMaxCallbackThreads(int pageBufferClientMaxCallbackThreads) + public DirectExchangeClientConfig setPageBufferClientMaxCallbackThreads(String pageBufferClientMaxCallbackThreads) { - this.pageBufferClientMaxCallbackThreads = pageBufferClientMaxCallbackThreads; + 
this.pageBufferClientMaxCallbackThreads = ThreadCount.valueOf(pageBufferClientMaxCallbackThreads); return this; } diff --git a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java index 2ed2ebaf2f990..8176281f8e635 100644 --- a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java +++ b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java @@ -350,7 +350,7 @@ private LocalQueryRunner( checkArgument(defaultSession.getTransactionId().isEmpty() || !withInitialTransaction, "Already in transaction"); Tracer tracer = noopTracer(); - this.taskManagerConfig = new TaskManagerConfig().setTaskConcurrency(4); + this.taskManagerConfig = new TaskManagerConfig().setTaskConcurrency("4"); requireNonNull(nodeSpillConfig, "nodeSpillConfig is null"); this.maxSpillPerNode = nodeSpillConfig.getMaxSpillPerNode(); this.queryMaxSpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode(); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java index 73bd6afd9b4b5..63dd2cc54377f 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java @@ -53,11 +53,11 @@ public void testDefaults() .setMaxHashPartitionCount(100) .setMinHashPartitionCount(4) .setMinHashPartitionCountForWrite(50) - .setQueryManagerExecutorPoolSize(5) - .setQueryExecutorPoolSize(1000) - .setMaxStateMachineCallbackThreads(5) + .setQueryManagerExecutorPoolSize("5") + .setQueryExecutorPoolSize("1000") + .setMaxStateMachineCallbackThreads("5") .setRemoteTaskMaxErrorDuration(new Duration(5, MINUTES)) - .setRemoteTaskMaxCallbackThreads(1000) + .setRemoteTaskMaxCallbackThreads("1000") .setQueryExecutionPolicy("phased") .setQueryMaxRunTime(new Duration(100, DAYS)) .setQueryMaxExecutionTime(new Duration(100, DAYS)) @@ -194,11 +194,11 @@ public void testExplicitPropertyMappings() .setMaxHashPartitionCount(16) .setMinHashPartitionCount(2) .setMinHashPartitionCountForWrite(88) - .setQueryManagerExecutorPoolSize(11) - .setQueryExecutorPoolSize(111) - .setMaxStateMachineCallbackThreads(112) + .setQueryManagerExecutorPoolSize("11") + .setQueryExecutorPoolSize("111") + .setMaxStateMachineCallbackThreads("112") .setRemoteTaskMaxErrorDuration(new Duration(60, SECONDS)) - .setRemoteTaskMaxCallbackThreads(10) + .setRemoteTaskMaxCallbackThreads("10") .setQueryExecutionPolicy("foo-bar-execution-policy") .setQueryMaxRunTime(new Duration(2, HOURS)) .setQueryMaxExecutionTime(new Duration(3, HOURS)) diff --git a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java index 97628cd9b274b..9e350029744a8 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java @@ -16,26 +16,27 @@ import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; import io.airlift.units.Duration; +import io.airlift.units.ThreadCount; import org.testng.annotations.Test; import java.math.BigDecimal; import java.util.Map; import java.util.concurrent.TimeUnit; +import static com.google.common.math.IntMath.ceilingPowerOfTwo; import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static 
io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.airlift.units.DataSize.Unit; import static io.trino.util.MachineInfo.getAvailablePhysicalProcessorCount; -import static it.unimi.dsi.fastutil.HashCommon.nextPowerOfTwo; import static java.lang.Math.max; import static java.lang.Math.min; public class TestTaskManagerConfig { - private static final int DEFAULT_PROCESSOR_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32); - private static final int DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT = min(getAvailablePhysicalProcessorCount(), 32) * 2; - private static final int DEFAULT_PARTITIONED_WRITER_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32) * 2; + private static final String DEFAULT_PROCESSOR_COUNT = String.valueOf(min(max(ceilingPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32)); + private static final String DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT = String.valueOf(min(ceilingPowerOfTwo(getAvailablePhysicalProcessorCount()), 32)); + private static final String DEFAULT_PARTITIONED_WRITER_COUNT = String.valueOf(min(max(ceilingPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32)); @Test public void testDefaults() @@ -48,8 +49,8 @@ public void testDefaults() .setTaskTerminationTimeout(new Duration(1, TimeUnit.MINUTES)) .setPerOperatorCpuTimerEnabled(true) .setTaskCpuTimerEnabled(true) - .setMaxWorkerThreads(Runtime.getRuntime().availableProcessors() * 2) - .setMinDrivers(Runtime.getRuntime().availableProcessors() * 2 * 2) + .setMaxWorkerThreads("2C") + .setMinDrivers(ThreadCount.valueOf("2C").getThreadCount() * 2) .setMinDriversPerTask(3) .setMaxDriversPerTask(Integer.MAX_VALUE) .setInfoMaxAge(new Duration(15, TimeUnit.MINUTES)) @@ -62,16 +63,16 @@ public void testDefaults() .setSinkMaxBufferSize(DataSize.of(32, Unit.MEGABYTE)) .setSinkMaxBroadcastBufferSize(DataSize.of(200, Unit.MEGABYTE)) .setMaxPagePartitioningBufferSize(DataSize.of(32, Unit.MEGABYTE)) - .setPagePartitioningBufferPoolSize(8) + .setPagePartitioningBufferPoolSize("8") .setScaleWritersEnabled(true) .setScaleWritersMaxWriterCount(DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT) .setWriterCount(1) .setPartitionedWriterCount(DEFAULT_PARTITIONED_WRITER_COUNT) .setTaskConcurrency(DEFAULT_PROCESSOR_COUNT) - .setHttpResponseThreads(100) - .setHttpTimeoutThreads(3) - .setTaskNotificationThreads(5) - .setTaskYieldThreads(3) + .setHttpResponseThreads("100") + .setHttpTimeoutThreads("3") + .setTaskNotificationThreads("5") + .setTaskYieldThreads("3") .setLevelTimeMultiplier(new BigDecimal("2")) .setStatisticsCpuTimerEnabled(true) .setInterruptStuckSplitTasksEnabled(true) @@ -83,9 +84,9 @@ public void testDefaults() @Test public void testExplicitPropertyMappings() { - int processorCount = DEFAULT_PROCESSOR_COUNT == 32 ? 16 : 32; - int maxWriterCount = DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT == 32 ? 16 : 32; - int partitionedWriterCount = DEFAULT_PARTITIONED_WRITER_COUNT == 64 ? 32 : 64; + String processorCount = DEFAULT_PROCESSOR_COUNT.contentEquals("32") ? "16" : "32"; + String maxWriterCount = DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT.contentEquals("32") ? "16" : "32"; + String partitionedWriterCount = DEFAULT_PARTITIONED_WRITER_COUNT.contentEquals("64") ? 
"32" : "64"; Map properties = ImmutableMap.builder() .put("task.initial-splits-per-node", "1") .put("task.split-concurrency-adjustment-interval", "1s") @@ -110,10 +111,10 @@ public void testExplicitPropertyMappings() .put("driver.max-page-partitioning-buffer-size", "40MB") .put("driver.page-partitioning-buffer-pool-size", "0") .put("task.scale-writers.enabled", "false") - .put("task.scale-writers.max-writer-count", Integer.toString(maxWriterCount)) + .put("task.scale-writers.max-writer-count", maxWriterCount) .put("task.writer-count", "4") - .put("task.partitioned-writer-count", Integer.toString(partitionedWriterCount)) - .put("task.concurrency", Integer.toString(processorCount)) + .put("task.partitioned-writer-count", partitionedWriterCount) + .put("task.concurrency", processorCount) .put("task.http-response-threads", "4") .put("task.http-timeout-threads", "10") .put("task.task-notification-threads", "13") @@ -139,7 +140,7 @@ public void testExplicitPropertyMappings() .setMaxPartialAggregationMemoryUsage(DataSize.of(32, Unit.MEGABYTE)) .setMaxPartialTopNMemory(DataSize.of(32, Unit.MEGABYTE)) .setMaxLocalExchangeBufferSize(DataSize.of(33, Unit.MEGABYTE)) - .setMaxWorkerThreads(3) + .setMaxWorkerThreads("3") .setMinDrivers(2) .setMinDriversPerTask(5) .setMaxDriversPerTask(13) @@ -148,16 +149,16 @@ public void testExplicitPropertyMappings() .setSinkMaxBufferSize(DataSize.of(42, Unit.MEGABYTE)) .setSinkMaxBroadcastBufferSize(DataSize.of(128, Unit.MEGABYTE)) .setMaxPagePartitioningBufferSize(DataSize.of(40, Unit.MEGABYTE)) - .setPagePartitioningBufferPoolSize(0) + .setPagePartitioningBufferPoolSize("0") .setScaleWritersEnabled(false) .setScaleWritersMaxWriterCount(maxWriterCount) .setWriterCount(4) .setPartitionedWriterCount(partitionedWriterCount) .setTaskConcurrency(processorCount) - .setHttpResponseThreads(4) - .setHttpTimeoutThreads(10) - .setTaskNotificationThreads(13) - .setTaskYieldThreads(8) + .setHttpResponseThreads("4") + .setHttpTimeoutThreads("10") + .setTaskNotificationThreads("13") + .setTaskYieldThreads("8") .setLevelTimeMultiplier(new BigDecimal("2.1")) .setStatisticsCpuTimerEnabled(false) .setInterruptStuckSplitTasksEnabled(false) diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClientConfig.java b/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClientConfig.java index 75d66074d33b6..f0a4ba11d52d8 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClientConfig.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClientConfig.java @@ -37,8 +37,8 @@ public void testDefaults() .setConcurrentRequestMultiplier(3) .setMaxErrorDuration(new Duration(5, TimeUnit.MINUTES)) .setMaxResponseSize(new HttpClientConfig().getMaxContentLength()) - .setPageBufferClientMaxCallbackThreads(25) - .setClientThreads(25) + .setPageBufferClientMaxCallbackThreads("25") + .setClientThreads("25") .setAcknowledgePages(true) .setDeduplicationBufferSize(DataSize.of(32, Unit.MEGABYTE))); } @@ -62,8 +62,8 @@ public void testExplicitPropertyMappings() .setConcurrentRequestMultiplier(13) .setMaxErrorDuration(new Duration(33, TimeUnit.SECONDS)) .setMaxResponseSize(DataSize.of(1, Unit.MEGABYTE)) - .setClientThreads(2) - .setPageBufferClientMaxCallbackThreads(16) + .setClientThreads("2") + .setPageBufferClientMaxCallbackThreads("16") .setAcknowledgePages(false) .setDeduplicationBufferSize(DataSize.of(2, Unit.MEGABYTE)); diff --git a/core/trino-main/src/test/java/io/trino/spiller/TestGenericPartitioningSpiller.java 
b/core/trino-main/src/test/java/io/trino/spiller/TestGenericPartitioningSpiller.java index c16fcbac77ca6..106a75391cb85 100644 --- a/core/trino-main/src/test/java/io/trino/spiller/TestGenericPartitioningSpiller.java +++ b/core/trino-main/src/test/java/io/trino/spiller/TestGenericPartitioningSpiller.java @@ -73,7 +73,7 @@ public void setUp() tempDirectory = createTempDirectory(getClass().getSimpleName()); FeaturesConfig featuresConfig = new FeaturesConfig(); featuresConfig.setSpillerSpillPaths(tempDirectory.toString()); - featuresConfig.setSpillerThreads(8); + featuresConfig.setSpillerThreads("8"); featuresConfig.setSpillMaxUsedSpaceThreshold(1.0); SingleStreamSpillerFactory singleStreamSpillerFactory = new FileSingleStreamSpillerFactory( new TestingBlockEncodingSerde(), diff --git a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java index 2226d98ac87f3..6fbe0e7f2dd7d 100644 --- a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java +++ b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java @@ -46,7 +46,7 @@ public void testDefaults() .setSpillEnabled(false) .setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("4MB")) .setSpillerSpillPaths("") - .setSpillerThreads(4) + .setSpillerThreads("4") .setSpillMaxUsedSpaceThreshold(0.9) .setMemoryRevokingThreshold(0.9) .setMemoryRevokingTarget(0.5) @@ -113,7 +113,7 @@ public void testExplicitPropertyMappings() .setSpillEnabled(true) .setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("100MB")) .setSpillerSpillPaths("/tmp/custom/spill/path1,/tmp/custom/spill/path2") - .setSpillerThreads(42) + .setSpillerThreads("42") .setSpillMaxUsedSpaceThreshold(0.8) .setMemoryRevokingThreshold(0.2) .setMemoryRevokingTarget(0.8) diff --git a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConfig.java b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConfig.java index 339dd233e6af5..e7a523882a9ce 100644 --- a/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConfig.java +++ b/plugin/trino-elasticsearch/src/main/java/io/trino/plugin/elasticsearch/ElasticsearchConfig.java @@ -20,6 +20,7 @@ import io.airlift.configuration.validation.FileExists; import io.airlift.units.Duration; import io.airlift.units.MinDuration; +import io.airlift.units.ThreadCount; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; @@ -66,7 +67,7 @@ public enum Security private Duration maxRetryTime = new Duration(30, SECONDS); private Duration nodeRefreshInterval = new Duration(1, MINUTES); private int maxHttpConnections = 25; - private int httpThreadCount = Runtime.getRuntime().availableProcessors(); + private ThreadCount httpThreadCount = ThreadCount.valueOf("1C"); private boolean tlsEnabled; private File keystorePath; @@ -247,16 +248,15 @@ public int getMaxHttpConnections() @Config("elasticsearch.http-thread-count") @ConfigDescription("Number of threads handling HTTP connections to Elasticsearch") - public ElasticsearchConfig setHttpThreadCount(int count) + public ElasticsearchConfig setHttpThreadCount(String count) { - this.httpThreadCount = count; + this.httpThreadCount = ThreadCount.valueOf(count); return this; } - @NotNull public int getHttpThreadCount() { - return httpThreadCount; + return httpThreadCount.getThreadCount(); } public boolean isTlsEnabled() diff --git 
a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchConfig.java b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchConfig.java index 9b3b8081b24cc..418494dfd7503 100644 --- a/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchConfig.java +++ b/plugin/trino-elasticsearch/src/test/java/io/trino/plugin/elasticsearch/TestElasticsearchConfig.java @@ -49,7 +49,7 @@ public void testDefaults() .setMaxRetryTime(new Duration(30, SECONDS)) .setNodeRefreshInterval(new Duration(1, MINUTES)) .setMaxHttpConnections(25) - .setHttpThreadCount(Runtime.getRuntime().availableProcessors()) + .setHttpThreadCount("1C") .setTlsEnabled(false) .setKeystorePath(null) .setKeystorePassword(null) @@ -104,7 +104,7 @@ public void testExplicitPropertyMappings() .setMaxRetryTime(new Duration(10, SECONDS)) .setNodeRefreshInterval(new Duration(10, MINUTES)) .setMaxHttpConnections(100) - .setHttpThreadCount(30) + .setHttpThreadCount("30") .setTlsEnabled(true) .setKeystorePath(keystoreFile.toFile()) .setKeystorePassword("keystore-password") diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java index 626cfadde1977..a9b6229295093 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java @@ -24,6 +24,7 @@ import io.airlift.units.Duration; import io.airlift.units.MaxDataSize; import io.airlift.units.MinDataSize; +import io.airlift.units.ThreadCount; import io.trino.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior; import jakarta.annotation.Nullable; import jakarta.validation.constraints.AssertTrue; @@ -73,7 +74,7 @@ public class HiveConfig private int maxPartitionsForEagerLoad = 100_000; private int maxOutstandingSplits = 1_000; private DataSize maxOutstandingSplitsSize = DataSize.of(256, MEGABYTE); - private int maxSplitIteratorThreads = 1_000; + private ThreadCount maxSplitIteratorThreads = ThreadCount.exactValueOf(1_000); private int minPartitionBatchSize = 10; private int maxPartitionBatchSize = 100; private int maxInitialSplits = 200; @@ -103,7 +104,7 @@ public class HiveConfig // to avoid deleting those files if Trino is unable to check. 
private boolean deleteSchemaLocationsFallback; private int maxPartitionsPerWriter = 100; - private int writeValidationThreads = 16; + private ThreadCount writeValidationThreads = ThreadCount.exactValueOf(16); private boolean validateBucketing = true; private boolean parallelPartitionedBucketedWrites = true; @@ -150,7 +151,7 @@ public class HiveConfig private boolean hiveViewsRunAsInvoker; private Optional hiveTransactionHeartbeatInterval = Optional.empty(); - private int hiveTransactionHeartbeatThreads = 5; + private ThreadCount hiveTransactionHeartbeatThreads = ThreadCount.exactValueOf(5); private boolean allowRegisterPartition; private boolean queryPartitionFilterRequired; @@ -432,13 +433,13 @@ public HiveConfig setMaxOutstandingSplitsSize(DataSize maxOutstandingSplits) @Min(1) public int getMaxSplitIteratorThreads() { - return maxSplitIteratorThreads; + return maxSplitIteratorThreads.getThreadCount(); } @Config("hive.max-split-iterator-threads") - public HiveConfig setMaxSplitIteratorThreads(int maxSplitIteratorThreads) + public HiveConfig setMaxSplitIteratorThreads(String maxSplitIteratorThreads) { - this.maxSplitIteratorThreads = maxSplitIteratorThreads; + this.maxSplitIteratorThreads = ThreadCount.valueOf(maxSplitIteratorThreads); return this; } @@ -594,14 +595,14 @@ public HiveConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter) public int getWriteValidationThreads() { - return writeValidationThreads; + return writeValidationThreads.getThreadCount(); } @Config("hive.write-validation-threads") @ConfigDescription("Number of threads used for verifying data after a write") - public HiveConfig setWriteValidationThreads(int writeValidationThreads) + public HiveConfig setWriteValidationThreads(String writeValidationThreads) { - this.writeValidationThreads = writeValidationThreads; + this.writeValidationThreads = ThreadCount.valueOf(writeValidationThreads); return this; } @@ -1082,14 +1083,14 @@ public Optional getHiveTransactionHeartbeatInterval() public int getHiveTransactionHeartbeatThreads() { - return hiveTransactionHeartbeatThreads; + return hiveTransactionHeartbeatThreads.getThreadCount(); } @Config("hive.transaction-heartbeat-threads") @ConfigDescription("Number of threads to run in the Hive transaction heartbeat service") - public HiveConfig setHiveTransactionHeartbeatThreads(int hiveTransactionHeartbeatThreads) + public HiveConfig setHiveTransactionHeartbeatThreads(String hiveTransactionHeartbeatThreads) { - this.hiveTransactionHeartbeatThreads = hiveTransactionHeartbeatThreads; + this.hiveTransactionHeartbeatThreads = ThreadCount.valueOf(hiveTransactionHeartbeatThreads); return this; } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastoreConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastoreConfig.java index 9d7873f730872..8c2b1711241eb 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastoreConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastoreConfig.java @@ -17,6 +17,7 @@ import io.airlift.configuration.ConfigDescription; import io.airlift.configuration.ConfigSecuritySensitive; import io.airlift.configuration.DefunctConfig; +import io.airlift.units.ThreadCount; import jakarta.annotation.PostConstruct; import jakarta.validation.constraints.Max; import jakarta.validation.constraints.Min; @@ -44,9 +45,9 @@ public class GlueHiveMetastoreConfig private Optional awsCredentialsProvider = 
Optional.empty(); private Optional catalogId = Optional.empty(); private int partitionSegments = 5; - private int getPartitionThreads = 20; - private int readStatisticsThreads = 5; - private int writeStatisticsThreads = 20; + private ThreadCount getPartitionThreads = ThreadCount.exactValueOf(20); + private ThreadCount readStatisticsThreads = ThreadCount.exactValueOf(5); + private ThreadCount writeStatisticsThreads = ThreadCount.exactValueOf(20); private boolean assumeCanonicalPartitionKeys; public Optional getGlueRegion() @@ -265,14 +266,14 @@ public GlueHiveMetastoreConfig setPartitionSegments(int partitionSegments) @Min(1) public int getGetPartitionThreads() { - return getPartitionThreads; + return getPartitionThreads.getThreadCount(); } @Config("hive.metastore.glue.get-partition-threads") @ConfigDescription("Number of threads for parallel partition fetches from Glue") - public GlueHiveMetastoreConfig setGetPartitionThreads(int getPartitionThreads) + public GlueHiveMetastoreConfig setGetPartitionThreads(String getPartitionThreads) { - this.getPartitionThreads = getPartitionThreads; + this.getPartitionThreads = ThreadCount.valueOf(getPartitionThreads); return this; } @@ -292,28 +293,28 @@ public GlueHiveMetastoreConfig setAssumeCanonicalPartitionKeys(boolean assumeCan @Min(1) public int getReadStatisticsThreads() { - return readStatisticsThreads; + return readStatisticsThreads.getThreadCount(); } @Config("hive.metastore.glue.read-statistics-threads") @ConfigDescription("Number of threads for parallel statistics reads from Glue") - public GlueHiveMetastoreConfig setReadStatisticsThreads(int getReadStatisticsThreads) + public GlueHiveMetastoreConfig setReadStatisticsThreads(String getReadStatisticsThreads) { - this.readStatisticsThreads = getReadStatisticsThreads; + this.readStatisticsThreads = ThreadCount.valueOf(getReadStatisticsThreads); return this; } @Min(1) public int getWriteStatisticsThreads() { - return writeStatisticsThreads; + return writeStatisticsThreads.getThreadCount(); } @Config("hive.metastore.glue.write-statistics-threads") @ConfigDescription("Number of threads for parallel statistics writes to Glue") - public GlueHiveMetastoreConfig setWriteStatisticsThreads(int writeStatisticsThreads) + public GlueHiveMetastoreConfig setWriteStatisticsThreads(String writeStatisticsThreads) { - this.writeStatisticsThreads = writeStatisticsThreads; + this.writeStatisticsThreads = ThreadCount.valueOf(writeStatisticsThreads); return this; } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java index 0d06f9e792cfa..204183f367d30 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java @@ -45,7 +45,7 @@ public void testDefaults() .setMaxPartitionsForEagerLoad(100_000) .setMaxOutstandingSplits(1_000) .setMaxOutstandingSplitsSize(DataSize.of(256, Unit.MEGABYTE)) - .setMaxSplitIteratorThreads(1_000) + .setMaxSplitIteratorThreads("1000") .setPerTransactionMetastoreCacheMaximumSize(1000) .setMinPartitionBatchSize(10) .setMaxPartitionBatchSize(100) @@ -72,7 +72,7 @@ public void testDefaults() .setSortedWritingEnabled(true) .setPropagateTableScanSortingProperties(false) .setMaxPartitionsPerWriter(100) - .setWriteValidationThreads(16) + .setWriteValidationThreads("16") .setValidateBucketing(true) .setParallelPartitionedBucketedWrites(true) .setTextMaxLineLength(DataSize.of(100, 
Unit.MEGABYTE)) @@ -104,7 +104,7 @@ public void testDefaults() .setLegacyHiveViewTranslation(false) .setHiveViewsRunAsInvoker(false) .setHiveTransactionHeartbeatInterval(null) - .setHiveTransactionHeartbeatThreads(5) + .setHiveTransactionHeartbeatThreads("5") .setAllowRegisterPartition(false) .setQueryPartitionFilterRequired(false) .setQueryPartitionFilterRequiredSchemas("") @@ -214,7 +214,7 @@ public void testExplicitPropertyMappings() .setMaxPartitionsForEagerLoad(122) .setMaxOutstandingSplits(10) .setMaxOutstandingSplitsSize(DataSize.of(32, Unit.MEGABYTE)) - .setMaxSplitIteratorThreads(10) + .setMaxSplitIteratorThreads("10") .setPerTransactionMetastoreCacheMaximumSize(500) .setMinPartitionBatchSize(1) .setMaxPartitionBatchSize(1000) @@ -239,7 +239,7 @@ public void testExplicitPropertyMappings() .setCreateEmptyBucketFiles(true) .setDeleteSchemaLocationsFallback(true) .setMaxPartitionsPerWriter(222) - .setWriteValidationThreads(11) + .setWriteValidationThreads("11") .setValidateBucketing(false) .setParallelPartitionedBucketedWrites(false) .setTextMaxLineLength(DataSize.of(13, Unit.MEGABYTE)) @@ -273,7 +273,7 @@ public void testExplicitPropertyMappings() .setLegacyHiveViewTranslation(true) .setHiveViewsRunAsInvoker(true) .setHiveTransactionHeartbeatInterval(new Duration(10, TimeUnit.SECONDS)) - .setHiveTransactionHeartbeatThreads(10) + .setHiveTransactionHeartbeatThreads("10") .setAllowRegisterPartition(true) .setQueryPartitionFilterRequired(true) .setQueryPartitionFilterRequiredSchemas("foo, bar") diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueHiveMetastoreConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueHiveMetastoreConfig.java index a5c6a4472fa85..e83be0560a069 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueHiveMetastoreConfig.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueHiveMetastoreConfig.java @@ -44,10 +44,10 @@ public void testDefaults() .setAwsCredentialsProvider(null) .setCatalogId(null) .setPartitionSegments(5) - .setGetPartitionThreads(20) + .setGetPartitionThreads("20") .setAssumeCanonicalPartitionKeys(false) - .setReadStatisticsThreads(5) - .setWriteStatisticsThreads(20)); + .setReadStatisticsThreads("5") + .setWriteStatisticsThreads("20")); } @Test @@ -93,10 +93,10 @@ public void testExplicitPropertyMapping() .setAwsCredentialsProvider("custom") .setCatalogId("0123456789") .setPartitionSegments(10) - .setGetPartitionThreads(42) + .setGetPartitionThreads("42") .setAssumeCanonicalPartitionKeys(true) - .setReadStatisticsThreads(42) - .setWriteStatisticsThreads(43); + .setReadStatisticsThreads("42") + .setWriteStatisticsThreads("43"); assertFullMapping(properties, expected); } diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupConfig.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupConfig.java index b081961f8f4e3..1da61a11b5ea3 100644 --- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupConfig.java +++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupConfig.java @@ -18,6 +18,7 @@ import io.airlift.units.Duration; import io.airlift.units.MaxDuration; import io.airlift.units.MinDuration; +import io.airlift.units.ThreadCount; import jakarta.annotation.Nullable; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; @@ -27,9 
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupConfig.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupConfig.java
index b081961f8f4e3..1da61a11b5ea3 100644
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupConfig.java
+++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/backup/BackupConfig.java
@@ -18,6 +18,7 @@
 import io.airlift.units.Duration;
 import io.airlift.units.MaxDuration;
 import io.airlift.units.MinDuration;
+import io.airlift.units.ThreadCount;
 import jakarta.annotation.Nullable;
 import jakarta.validation.constraints.Min;
 import jakarta.validation.constraints.NotNull;
@@ -27,9 +28,9 @@ public class BackupConfig
 {
     private Duration timeout = new Duration(1, MINUTES);
-    private int timeoutThreads = 1000;
+    private ThreadCount timeoutThreads = ThreadCount.exactValueOf(1000);
     private String provider;
-    private int backupThreads = 5;
+    private ThreadCount backupThreads = ThreadCount.exactValueOf(5);
 
     @NotNull
     @MinDuration("1s")
@@ -50,14 +51,14 @@ public BackupConfig setTimeout(Duration timeout)
     @Min(1)
     public int getTimeoutThreads()
     {
-        return timeoutThreads;
+        return timeoutThreads.getThreadCount();
     }
 
     @Config("backup.timeout-threads")
     @ConfigDescription("Maximum number of timeout threads for backup operations")
-    public BackupConfig setTimeoutThreads(int timeoutThreads)
+    public BackupConfig setTimeoutThreads(String timeoutThreads)
     {
-        this.timeoutThreads = timeoutThreads;
+        this.timeoutThreads = ThreadCount.valueOf(timeoutThreads);
         return this;
     }
 
@@ -78,14 +79,14 @@ public BackupConfig setProvider(String provider)
     @Min(1)
     public int getBackupThreads()
     {
-        return backupThreads;
+        return backupThreads.getThreadCount();
     }
 
     @Config("backup.threads")
     @ConfigDescription("Maximum number of shards to backup at once")
-    public BackupConfig setBackupThreads(int backupThreads)
+    public BackupConfig setBackupThreads(String backupThreads)
     {
-        this.backupThreads = backupThreads;
+        this.backupThreads = ThreadCount.valueOf(backupThreads);
         return this;
     }
 }
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/ShardCleanerConfig.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/ShardCleanerConfig.java
index 9c112b2773df7..b997b9f3ab366 100644
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/ShardCleanerConfig.java
+++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/metadata/ShardCleanerConfig.java
@@ -18,6 +18,7 @@
 import io.airlift.units.Duration;
 import io.airlift.units.MaxDuration;
 import io.airlift.units.MinDuration;
+import io.airlift.units.ThreadCount;
 import jakarta.validation.constraints.Min;
 import jakarta.validation.constraints.NotNull;
 
@@ -33,7 +34,7 @@ public class ShardCleanerConfig
     private Duration localCleanTime = new Duration(4, HOURS);
     private Duration backupCleanerInterval = new Duration(5, MINUTES);
     private Duration backupCleanTime = new Duration(1, DAYS);
-    private int backupDeletionThreads = 50;
+    private ThreadCount backupDeletionThreads = ThreadCount.exactValueOf(50);
     private Duration maxCompletedTransactionAge = new Duration(1, DAYS);
 
     @NotNull
@@ -128,14 +129,14 @@ public ShardCleanerConfig setBackupCleanTime(Duration backupCleanTime)
     @Min(1)
     public int getBackupDeletionThreads()
     {
-        return backupDeletionThreads;
+        return backupDeletionThreads.getThreadCount();
     }
 
     @Config("raptor.backup-deletion-threads")
     @ConfigDescription("Maximum number of threads to use for deleting shards from backup store")
-    public ShardCleanerConfig setBackupDeletionThreads(int backupDeletionThreads)
+    public ShardCleanerConfig setBackupDeletionThreads(String backupDeletionThreads)
     {
-        this.backupDeletionThreads = backupDeletionThreads;
+        this.backupDeletionThreads = ThreadCount.valueOf(backupDeletionThreads);
         return this;
     }
 
diff --git a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/StorageManagerConfig.java b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/StorageManagerConfig.java
index 2ce5a87410cbe..f40851b9b9fe4 100644
--- a/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/StorageManagerConfig.java
+++ b/plugin/trino-raptor-legacy/src/main/java/io/trino/plugin/raptor/legacy/storage/StorageManagerConfig.java
@@ -22,6 +22,7 @@
 import io.airlift.units.MaxDataSize;
 import io.airlift.units.MinDataSize;
 import io.airlift.units.MinDuration;
+import io.airlift.units.ThreadCount;
 import io.trino.orc.OrcReaderOptions;
 import jakarta.validation.constraints.Max;
 import jakarta.validation.constraints.Min;
@@ -31,8 +32,6 @@
 import java.util.concurrent.TimeUnit;
 
 import static io.airlift.units.DataSize.Unit.MEGABYTE;
-import static java.lang.Math.max;
-import static java.lang.Runtime.getRuntime;
 
 @DefunctConfig({
         "storage.backup-directory",
@@ -48,9 +47,9 @@ public class StorageManagerConfig
     private Duration compactionInterval = new Duration(1, TimeUnit.HOURS);
     private Duration shardEjectorInterval = new Duration(4, TimeUnit.HOURS);
     private OrcReaderOptions options = new OrcReaderOptions();
-    private int deletionThreads = max(1, getRuntime().availableProcessors() / 2);
-    private int recoveryThreads = 10;
-    private int organizationThreads = 5;
+    private ThreadCount deletionThreads = ThreadCount.valueOf("0.5C");
+    private ThreadCount recoveryThreads = ThreadCount.exactValueOf(10);
+    private ThreadCount organizationThreads = ThreadCount.exactValueOf(5);
     private boolean organizationEnabled = true;
     private Duration organizationDiscoveryInterval = new Duration(6, TimeUnit.HOURS);
     private Duration organizationInterval = new Duration(7, TimeUnit.DAYS);
@@ -178,14 +177,14 @@ public StorageManagerConfig setOrcNestedLazy(boolean nestedLazy)
     @Min(1)
     public int getDeletionThreads()
     {
-        return deletionThreads;
+        return deletionThreads.getThreadCount();
     }
 
     @Config("storage.max-deletion-threads")
     @ConfigDescription("Maximum number of threads to use for deletions")
-    public StorageManagerConfig setDeletionThreads(int deletionThreads)
+    public StorageManagerConfig setDeletionThreads(String deletionThreads)
     {
-        this.deletionThreads = deletionThreads;
+        this.deletionThreads = ThreadCount.valueOf(deletionThreads);
         return this;
     }
 
@@ -278,30 +277,30 @@ public StorageManagerConfig setShardEjectorInterval(Duration shardEjectorInterva
     @Min(1)
     public int getRecoveryThreads()
     {
-        return recoveryThreads;
+        return recoveryThreads.getThreadCount();
     }
 
     @Config("storage.max-recovery-threads")
     @ConfigDescription("Maximum number of threads to use for recovery")
-    public StorageManagerConfig setRecoveryThreads(int recoveryThreads)
+    public StorageManagerConfig setRecoveryThreads(String recoveryThreads)
     {
-        this.recoveryThreads = recoveryThreads;
+        this.recoveryThreads = ThreadCount.valueOf(recoveryThreads);
         return this;
     }
 
     @LegacyConfig("storage.max-compaction-threads")
     @Config("storage.max-organization-threads")
     @ConfigDescription("Maximum number of threads to use for organization")
-    public StorageManagerConfig setOrganizationThreads(int organizationThreads)
+    public StorageManagerConfig setOrganizationThreads(String organizationThreads)
     {
-        this.organizationThreads = organizationThreads;
+        this.organizationThreads = ThreadCount.valueOf(organizationThreads);
         return this;
     }
 
     @Min(1)
     public int getOrganizationThreads()
     {
-        return organizationThreads;
+        return organizationThreads.getThreadCount();
     }
 
     @Min(1)
diff --git a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/backup/TestBackupConfig.java b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/backup/TestBackupConfig.java
index 9c8084fdaa35b..7febcf0eaebe7 100644
--- a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/backup/TestBackupConfig.java
+++ b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/backup/TestBackupConfig.java
@@ -32,9 +32,9 @@ public void testDefaults()
     {
         assertRecordedDefaults(recordDefaults(BackupConfig.class)
                 .setProvider(null)
-                .setTimeoutThreads(1000)
+                .setTimeoutThreads("1000")
                 .setTimeout(new Duration(1, MINUTES))
-                .setBackupThreads(5));
+                .setBackupThreads("5"));
     }
 
     @Test
@@ -50,8 +50,8 @@ public void testExplicitPropertyMappings()
         BackupConfig expected = new BackupConfig()
                 .setProvider("file")
                 .setTimeout(new Duration(42, SECONDS))
-                .setTimeoutThreads(13)
-                .setBackupThreads(3);
+                .setTimeoutThreads("13")
+                .setBackupThreads("3");
 
         assertFullMapping(properties, expected);
     }
diff --git a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/metadata/TestShardCleanerConfig.java b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/metadata/TestShardCleanerConfig.java
index 9c4710d7acaf6..2d5ed927861d1 100644
--- a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/metadata/TestShardCleanerConfig.java
+++ b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/metadata/TestShardCleanerConfig.java
@@ -38,7 +38,7 @@ public void testDefaults()
                 .setLocalCleanTime(new Duration(4, HOURS))
                 .setBackupCleanerInterval(new Duration(5, MINUTES))
                 .setBackupCleanTime(new Duration(1, DAYS))
-                .setBackupDeletionThreads(50)
+                .setBackupDeletionThreads("50")
                 .setMaxCompletedTransactionAge(new Duration(1, DAYS)));
     }
 
@@ -63,7 +63,7 @@ public void testExplicitPropertyMappings()
                 .setLocalCleanTime(new Duration(32, MINUTES))
                 .setBackupCleanerInterval(new Duration(34, MINUTES))
                 .setBackupCleanTime(new Duration(35, MINUTES))
-                .setBackupDeletionThreads(37)
+                .setBackupDeletionThreads("37")
                 .setMaxCompletedTransactionAge(new Duration(39, MINUTES));
 
         assertFullMapping(properties, expected);
diff --git a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/storage/TestStorageManagerConfig.java b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/storage/TestStorageManagerConfig.java
index 57116c729648c..ad21fc78c8f34 100644
--- a/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/storage/TestStorageManagerConfig.java
+++ b/plugin/trino-raptor-legacy/src/test/java/io/trino/plugin/raptor/legacy/storage/TestStorageManagerConfig.java
@@ -29,8 +29,6 @@
 import static io.airlift.units.DataSize.Unit.GIGABYTE;
 import static io.airlift.units.DataSize.Unit.KILOBYTE;
 import static io.airlift.units.DataSize.Unit.MEGABYTE;
-import static java.lang.Math.max;
-import static java.lang.Runtime.getRuntime;
 import static java.util.concurrent.TimeUnit.DAYS;
 import static java.util.concurrent.TimeUnit.HOURS;
 import static java.util.concurrent.TimeUnit.MINUTES;
@@ -50,13 +48,13 @@ public void testDefaults()
                 .setOrcTinyStripeThreshold(DataSize.of(8, MEGABYTE))
                 .setOrcLazyReadSmallRanges(true)
                 .setOrcNestedLazy(true)
-                .setDeletionThreads(max(1, getRuntime().availableProcessors() / 2))
+                .setDeletionThreads("0.5C")
                 .setShardRecoveryTimeout(new Duration(30, SECONDS))
                 .setMissingShardDiscoveryInterval(new Duration(5, MINUTES))
                 .setCompactionInterval(new Duration(1, HOURS))
                 .setShardEjectorInterval(new Duration(4, HOURS))
-                .setRecoveryThreads(10)
-                .setOrganizationThreads(5)
+                .setRecoveryThreads("10")
+                .setOrganizationThreads("5")
                 .setCompactionEnabled(true)
                 .setOrganizationEnabled(true)
                 .setOrganizationInterval(new Duration(7, DAYS))
@@ -105,7 +103,7 @@ public void testExplicitPropertyMappings()
                 .setOrcTinyStripeThreshold(DataSize.of(15, KILOBYTE))
                 .setOrcLazyReadSmallRanges(false)
                 .setOrcNestedLazy(false)
-                .setDeletionThreads(999)
+                .setDeletionThreads("999")
                 .setShardRecoveryTimeout(new Duration(1, MINUTES))
                 .setMissingShardDiscoveryInterval(new Duration(4, MINUTES))
                 .setCompactionEnabled(false)
@@ -114,8 +112,8 @@ public void testExplicitPropertyMappings()
                 .setOrganizationInterval(new Duration(4, HOURS))
                 .setOrganizationDiscoveryInterval(new Duration(2, HOURS))
                 .setShardEjectorInterval(new Duration(9, HOURS))
-                .setRecoveryThreads(12)
-                .setOrganizationThreads(12)
+                .setRecoveryThreads("12")
+                .setOrganizationThreads("12")
                 .setMaxShardRows(10_000)
                 .setMaxShardSize(DataSize.of(10, MEGABYTE))
                 .setMaxBufferSize(DataSize.of(512, MEGABYTE))
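
A side note on the Raptor default above (not part of the patch): storage.max-deletion-threads used to default to max(1, availableProcessors() / 2) and now defaults to "0.5C", which the copied ThreadCount source earlier in this patch resolves by rounding half of the reported physical core count. A rough sketch of the arithmetic, assuming a host that reports 6 cores:

    int oldDefault = Math.max(1, 6 / 2);   // integer division -> 3
    int newDefault = Math.round(0.5f * 6); // ThreadCount-style rounding -> 3; odd core counts round up instead of down
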
diff --git a/plugin/trino-thrift/src/main/java/io/trino/plugin/thrift/ThriftConnectorConfig.java b/plugin/trino-thrift/src/main/java/io/trino/plugin/thrift/ThriftConnectorConfig.java
index 5f7c972d099b2..53da5039b6ab7 100644
--- a/plugin/trino-thrift/src/main/java/io/trino/plugin/thrift/ThriftConnectorConfig.java
+++ b/plugin/trino-thrift/src/main/java/io/trino/plugin/thrift/ThriftConnectorConfig.java
@@ -17,6 +17,7 @@
 import io.airlift.units.DataSize;
 import io.airlift.units.MaxDataSize;
 import io.airlift.units.MinDataSize;
+import io.airlift.units.ThreadCount;
 import jakarta.validation.constraints.Min;
 import jakarta.validation.constraints.NotNull;
 
@@ -25,7 +26,7 @@ public class ThriftConnectorConfig
 {
     private DataSize maxResponseSize = DataSize.of(16, MEGABYTE);
-    private int metadataRefreshThreads = 1;
+    private ThreadCount metadataRefreshThreads = ThreadCount.exactValueOf(1);
     private int lookupRequestsConcurrency = 1;
 
     @NotNull
@@ -46,13 +47,13 @@ public ThriftConnectorConfig setMaxResponseSize(DataSize maxResponseSize)
     @Min(1)
     public int getMetadataRefreshThreads()
     {
-        return metadataRefreshThreads;
+        return metadataRefreshThreads.getThreadCount();
     }
 
     @Config("trino-thrift.metadata-refresh-threads")
-    public ThriftConnectorConfig setMetadataRefreshThreads(int metadataRefreshThreads)
+    public ThriftConnectorConfig setMetadataRefreshThreads(String metadataRefreshThreads)
     {
-        this.metadataRefreshThreads = metadataRefreshThreads;
+        this.metadataRefreshThreads = ThreadCount.valueOf(metadataRefreshThreads);
         return this;
     }
 
diff --git a/plugin/trino-thrift/src/test/java/io/trino/plugin/thrift/TestThriftConnectorConfig.java b/plugin/trino-thrift/src/test/java/io/trino/plugin/thrift/TestThriftConnectorConfig.java
index 63603f5d3efbb..3e113857f3440 100644
--- a/plugin/trino-thrift/src/test/java/io/trino/plugin/thrift/TestThriftConnectorConfig.java
+++ b/plugin/trino-thrift/src/test/java/io/trino/plugin/thrift/TestThriftConnectorConfig.java
@@ -31,7 +31,7 @@ public void testDefaults()
     {
         assertRecordedDefaults(recordDefaults(ThriftConnectorConfig.class)
                 .setMaxResponseSize(DataSize.of(16, MEGABYTE))
-                .setMetadataRefreshThreads(1)
+                .setMetadataRefreshThreads("1")
                 .setLookupRequestsConcurrency(1));
     }
 
@@ -46,7 +46,7 @@ public void testExplicitPropertyMappings()
         ThriftConnectorConfig expected = new ThriftConnectorConfig()
                 .setMaxResponseSize(DataSize.of(2, MEGABYTE))
-                .setMetadataRefreshThreads(10)
+                .setMetadataRefreshThreads("10")
                 .setLookupRequestsConcurrency(8);
 
         assertFullMapping(properties, expected);
diff --git a/plugin/trino-tpcds/pom.xml b/plugin/trino-tpcds/pom.xml
index 09d797f644030..ad2074b025f25 100644
--- a/plugin/trino-tpcds/pom.xml
+++ b/plugin/trino-tpcds/pom.xml
@@ -47,6 +47,11 @@
             <artifactId>configuration</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>units</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-plugin-toolkit</artifactId>
diff --git a/plugin/trino-tpcds/src/main/java/io/trino/plugin/tpcds/TpcdsConfig.java b/plugin/trino-tpcds/src/main/java/io/trino/plugin/tpcds/TpcdsConfig.java
index ee068000c926f..e45394e892a09 100644
--- a/plugin/trino-tpcds/src/main/java/io/trino/plugin/tpcds/TpcdsConfig.java
+++ b/plugin/trino-tpcds/src/main/java/io/trino/plugin/tpcds/TpcdsConfig.java
@@ -15,24 +15,25 @@
 import io.airlift.configuration.Config;
 import io.airlift.configuration.ConfigDescription;
+import io.airlift.units.ThreadCount;
 import jakarta.validation.constraints.Min;
 
 public class TpcdsConfig
 {
-    private int splitsPerNode = Runtime.getRuntime().availableProcessors();
+    private ThreadCount splitsPerNode = ThreadCount.valueOf("1C");
     private boolean withNoSexism;
     private Integer splitCount;
 
     @Min(1)
     public int getSplitsPerNode()
     {
-        return splitsPerNode;
+        return splitsPerNode.getThreadCount();
     }
 
     @Config("tpcds.splits-per-node")
     public TpcdsConfig setSplitsPerNode(int splitsPerNode)
     {
-        this.splitsPerNode = splitsPerNode;
+        this.splitsPerNode = ThreadCount.exactValueOf(splitsPerNode);
         return this;
     }
 
diff --git a/pom.xml b/pom.xml
index bd1a6fe2be381..837a4fe8b7e91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -674,7 +674,7 @@
             <dependency>
                 <groupId>io.airlift</groupId>
                 <artifactId>units</artifactId>
-                <version>1.9</version>
+                <version>1.10</version>
             </dependency>
 
diff --git a/service/trino-verifier/src/main/java/io/trino/verifier/VerifierConfig.java b/service/trino-verifier/src/main/java/io/trino/verifier/VerifierConfig.java
index 22f7b4da045c5..72c98671d2758 100644
--- a/service/trino-verifier/src/main/java/io/trino/verifier/VerifierConfig.java
+++ b/service/trino-verifier/src/main/java/io/trino/verifier/VerifierConfig.java
@@ -21,6 +21,7 @@
 import io.airlift.configuration.ConfigDescription;
 import io.airlift.configuration.LegacyConfig;
 import io.airlift.units.Duration;
+import io.airlift.units.ThreadCount;
 import io.trino.sql.tree.Identifier;
 import io.trino.sql.tree.QualifiedName;
 import jakarta.annotation.Nullable;
@@ -52,7 +53,7 @@ public class VerifierConfig
     private String source;
     private String runId = new DateTime().toString("yyyy-MM-dd");
     private Set<String> eventClients = ImmutableSet.of("human-readable");
-    private int threadCount = 10;
+    private ThreadCount threadCount = ThreadCount.exactValueOf(10);
     private String queryDatabase;
     private String controlGateway;
     private String testGateway;
@@ -249,14 +250,14 @@ public VerifierConfig setSuites(String suites)
     @Min(1)
     public int getThreadCount()
     {
-        return threadCount;
+        return threadCount.getThreadCount();
     }
 
     @ConfigDescription("The concurrency level")
     @Config("thread-count")
-    public VerifierConfig setThreadCount(int threadCount)
+    public VerifierConfig setThreadCount(String threadCount)
     {
-        this.threadCount = threadCount;
+        this.threadCount = ThreadCount.valueOf(threadCount);
         return this;
     }
 
diff --git a/service/trino-verifier/src/test/java/io/trino/verifier/TestVerifierConfig.java b/service/trino-verifier/src/test/java/io/trino/verifier/TestVerifierConfig.java
index ddda7a5826664..a374c82906fee 100644
--- a/service/trino-verifier/src/test/java/io/trino/verifier/TestVerifierConfig.java
+++ b/service/trino-verifier/src/test/java/io/trino/verifier/TestVerifierConfig.java
@@ -46,7 +46,7 @@ public void testDefaults()
                 .setSource(null)
                 .setRunId(new DateTime().toString("yyyy-MM-dd"))
                 .setEventClients("human-readable")
-                .setThreadCount(10)
+                .setThreadCount("10")
                 .setQueryDatabase(null)
                 .setControlGateway(null)
                 .setTestGateway(null)
@@ -153,7 +153,7 @@ public void testExplicitPropertyMappings()
                 .setSource("my_source")
                 .setRunId("my_run_id")
                 .setEventClients("file,human-readable")
-                .setThreadCount(1)
+                .setThreadCount("1")
                 .setBannedQueries("1,2")
                 .setAllowedQueries("3,4")
                 .setMaxRowCount(1)
diff --git a/service/trino-verifier/src/test/java/io/trino/verifier/TestVerifierRewriteQueries.java b/service/trino-verifier/src/test/java/io/trino/verifier/TestVerifierRewriteQueries.java
index f1e093d11e74e..c3f694c7f7be4 100644
--- a/service/trino-verifier/src/test/java/io/trino/verifier/TestVerifierRewriteQueries.java
+++ b/service/trino-verifier/src/test/java/io/trino/verifier/TestVerifierRewriteQueries.java
@@ -93,7 +93,7 @@ public void close()
     public void testSingleThread()
     {
         config.setControlGateway(URL);
-        config.setThreadCount(1);
+        config.setThreadCount("1");
         List<QueryPair> rewrittenQueries = rewriteQueries(parser, config, queryPairs);
         assertEquals(rewrittenQueries.size(), queryPairs.size());
     }
@@ -102,7 +102,7 @@ public void testSingleThread()
     public void testMultipleThreads()
    {
         config.setControlGateway(URL);
-        config.setThreadCount(5);
+        config.setThreadCount("5");
         List<QueryPair> rewrittenQueries = rewriteQueries(parser, config, queryPairs);
         assertEquals(rewrittenQueries.size(), queryPairs.size());
     }
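
A usage sketch (not part of the patch): how the migrated, string-valued thread settings are interpreted, using only the ThreadCount API copied earlier in this patch; the 8-core host is an assumption made for the arithmetic.

    import io.airlift.units.ThreadCount;

    class ThreadCountExamples
    {
        public static void main(String[] args)
        {
            // A plain integer behaves like the old int-typed properties.
            ThreadCount exact = ThreadCount.valueOf("10");
            System.out.println(exact.getThreadCount());   // 10

            // A "C" suffix scales with the machine: on a host reporting 8 cores,
            // "0.5C" resolves to round(0.5 * 8) = 4 threads.
            ThreadCount perCore = ThreadCount.valueOf("0.5C");
            System.out.println(perCore.getThreadCount());

            // boundedValueOf clamps the parsed value to a range: "4C" would be
            // 32 threads on the same host, but is capped at the maximum of 16.
            ThreadCount bounded = ThreadCount.boundedValueOf("4C", "2", "16");
            System.out.println(bounded.getThreadCount());
        }
    }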