Add partitioning push down #23432

Open · wants to merge 12 commits into master
core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/FaultTolerantPartitioningSchemeFactory.java
@@ -81,15 +81,29 @@ private FaultTolerantPartitioningScheme create(PartitioningHandle partitioningHandle
         if (partitioningHandle.equals(FIXED_HASH_DISTRIBUTION) || partitioningHandle.equals(SCALED_WRITER_HASH_DISTRIBUTION)) {
             return createSystemSchema(partitionCount.orElse(maxPartitionCount));
         }
-        if (partitioningHandle.getCatalogHandle().isPresent()) {
-            Optional<ConnectorBucketNodeMap> connectorBucketNodeMap = nodePartitioningManager.getConnectorBucketNodeMap(session, partitioningHandle);
-            if (connectorBucketNodeMap.isEmpty()) {
-                return createSystemSchema(partitionCount.orElse(maxPartitionCount));
-            }
-            ToIntFunction<Split> splitToBucket = nodePartitioningManager.getSplitToBucket(session, partitioningHandle);
-            return createConnectorSpecificSchema(partitionCount.orElse(maxPartitionCount), connectorBucketNodeMap.get(), splitToBucket);
-        }
-        return new FaultTolerantPartitioningScheme(1, Optional.empty(), Optional.empty(), Optional.empty());
+
+        // if there is no partitioning handle, use a single partition
+        if (partitioningHandle.getCatalogHandle().isEmpty()) {
+            return new FaultTolerantPartitioningScheme(1, Optional.empty(), Optional.empty(), Optional.empty());
+        }
+
+        Optional<ConnectorBucketNodeMap> optionalNodeMap = nodePartitioningManager.getConnectorBucketNodeMap(session, partitioningHandle);
+        int bucketCount;
+        if (optionalNodeMap.isPresent()) {
+            ConnectorBucketNodeMap bucketNodeMap = optionalNodeMap.get();
+            bucketCount = bucketNodeMap.getBucketCount();
+
+            if (bucketNodeMap.hasFixedMapping()) {
+                // fixed mappings have special handling in FTE to ensure that the required node assignments are respected
+                ToIntFunction<Split> splitToBucket = nodePartitioningManager.getSplitToBucket(session, partitioningHandle, bucketCount);
+                return createFixedConnectorSpecificSchema(bucketNodeMap.getFixedMapping(), splitToBucket);
+            }
+        }
+        else {
+            bucketCount = partitionCount.orElse(maxPartitionCount);
+        }
+
+        return createArbitraryConnectorSpecificSchema(partitionCount.orElse(maxPartitionCount), bucketCount, partitioningHandle);
     }

@@ -101,14 +115,6 @@ private static FaultTolerantPartitioningScheme createSystemSchema(int partitionCount)
                 Optional.empty());
     }
 
-    private static FaultTolerantPartitioningScheme createConnectorSpecificSchema(int partitionCount, ConnectorBucketNodeMap bucketNodeMap, ToIntFunction<Split> splitToBucket)
-    {
-        if (bucketNodeMap.hasFixedMapping()) {
-            return createFixedConnectorSpecificSchema(bucketNodeMap.getFixedMapping(), splitToBucket);
-        }
-        return createArbitraryConnectorSpecificSchema(partitionCount, bucketNodeMap.getBucketCount(), splitToBucket);
-    }
-
     private static FaultTolerantPartitioningScheme createFixedConnectorSpecificSchema(List<Node> fixedMapping, ToIntFunction<Split> splitToBucket)
     {
         int bucketCount = fixedMapping.size();
@@ -133,12 +139,15 @@ private static FaultTolerantPartitioningScheme createFixedConnectorSpecificSchema(List<Node> fixedMapping, ToIntFunction<Split> splitToBucket)
                 Optional.of(ImmutableList.copyOf(partitionToNodeMap)));
     }
 
-    private static FaultTolerantPartitioningScheme createArbitraryConnectorSpecificSchema(int partitionCount, int bucketCount, ToIntFunction<Split> splitToBucket)
+    private FaultTolerantPartitioningScheme createArbitraryConnectorSpecificSchema(int partitionCount, int bucketCount, PartitioningHandle partitioningHandle)
     {
         // buckets are assigned round-robin to partitions
         int[] bucketToPartition = new int[bucketCount];
         for (int bucket = 0; bucket < bucketCount; bucket++) {
             bucketToPartition[bucket] = bucket % partitionCount;
         }
+
+        ToIntFunction<Split> splitToBucket = nodePartitioningManager.getSplitToBucket(session, partitioningHandle, bucketCount);
         return new FaultTolerantPartitioningScheme(
                 // TODO: It may be possible to set the number of partitions to the number of buckets when it is known that a
                 // TODO: stage contains no remote sources and the engine doesn't have to partition any data
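For intuition about `createArbitraryConnectorSpecificSchema`: buckets are spread over partitions round-robin, so partition sizes differ by at most one bucket. A standalone sketch with concrete numbers (illustrative only, not Trino code):

```java
// Standalone illustration of the round-robin assignment above: 7 buckets
// over 3 partitions map to [0, 1, 2, 0, 1, 2, 0], so the partitions
// receive 3, 2, and 2 buckets respectively.
import java.util.Arrays;

public class RoundRobinExample
{
    public static void main(String[] args)
    {
        int bucketCount = 7;
        int partitionCount = 3;
        int[] bucketToPartition = new int[bucketCount];
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            bucketToPartition[bucket] = bucket % partitionCount;
        }
        System.out.println(Arrays.toString(bucketToPartition)); // prints [0, 1, 2, 0, 1, 2, 0]
    }
}
```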
core/trino-main/src/main/java/io/trino/metadata/InternalNode.java
@@ -13,6 +13,7 @@
  */
 package io.trino.metadata;
 
+import io.airlift.slice.XxHash64;
 import io.trino.client.NodeVersion;
 import io.trino.spi.HostAddress;
 import io.trino.spi.Node;
@@ -27,6 +28,7 @@
 import static com.google.common.base.Strings.emptyToNull;
 import static com.google.common.base.Strings.nullToEmpty;
 import static io.airlift.node.AddressToHostname.tryDecodeHostnameToAddress;
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
 
@@ -39,6 +41,7 @@ public class InternalNode
     private final URI internalUri;
     private final NodeVersion nodeVersion;
     private final boolean coordinator;
+    private final long longHashCode;
 
@@ -47,6 +50,11 @@ public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator)
         this.internalUri = requireNonNull(internalUri, "internalUri is null");
         this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
         this.coordinator = coordinator;
+        this.longHashCode = new XxHash64(coordinator ? 1 : 0)
+                .update(nodeIdentifier.getBytes(UTF_8))
+                .update(internalUri.toString().getBytes(UTF_8))
+                .update(nodeVersion.getVersion().getBytes(UTF_8))
+                .hash();
     }

@@ -115,10 +123,15 @@ public boolean equals(Object obj)
                 Objects.equals(nodeVersion, o.nodeVersion);
     }
 
+    public long longHashCode()
+    {
+        return longHashCode;
+    }
+
     @Override
     public int hashCode()
     {
-        return Objects.hash(nodeIdentifier, internalUri, nodeVersion, coordinator);
+        return (int) longHashCode;
     }
 
     @Override
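`InternalNode` now precomputes a 64-bit XxHash64 of its identity, exposed as `longHashCode()` (with `hashCode()` truncating it to 32 bits). A 64-bit node hash is the kind of input deterministic bucket-to-node placement needs. The sketch below (rendezvous hashing) is purely illustrative, not code from this PR; it assumes only that `nodeHashes` holds each candidate node's `longHashCode()`:

```java
import io.airlift.slice.XxHash64;

// Illustrative rendezvous (highest-random-weight) hashing: score every
// (node, bucket) pair by hashing the node hash combined with the bucket
// number, then send the bucket to the highest-scoring node. Adding or
// removing a node only moves the buckets that scored highest on it.
final class RendezvousSketch
{
    private RendezvousSketch() {}

    static int pickNode(long[] nodeHashes, int bucket)
    {
        int best = 0;
        long bestScore = Long.MIN_VALUE;
        for (int i = 0; i < nodeHashes.length; i++) {
            long score = XxHash64.hash(nodeHashes[i] ^ bucket);
            if (score > bestScore) {
                bestScore = score;
                best = i;
            }
        }
        return best;
    }
}
```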
17 changes: 10 additions & 7 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
@@ -121,13 +121,16 @@ Optional<TableExecuteHandle> getTableHandleForExecute(
     TableProperties getTableProperties(Session session, TableHandle handle);
 
     /**
-     * Return a table handle whose partitioning is converted to the provided partitioning handle,
-     * but otherwise identical to the provided table handle.
-     * The provided table handle must be one that the connector can transparently convert to from
-     * the original partitioning handle associated with the provided table handle,
-     * as promised by {@link #getCommonPartitioning}.
+     * Attempt to push down partitioning into the table. If a connector can provide
+     * data for the table using the specified partitioning, it should return a
+     * table handle that, when passed to {@link #getTableProperties(Session, TableHandle)},
+     * will return TableProperties compatible with the specified partitioning.
+     * The returned table handle does not have to use the exact partitioning, but
+     * it must be compatible with the specified partitioning, meaning that a table
+     * with the specified partitioning can be repartitioned on the partitioning of
+     * the returned table handle.
      */
-    TableHandle makeCompatiblePartitioning(Session session, TableHandle table, PartitioningHandle partitioningHandle);
+    Optional<TableHandle> applyPartitioning(Session session, TableHandle tableHandle, Optional<PartitioningHandle> partitioning, List<ColumnHandle> columns);
 
     /**
      * Return a partitioning handle which the connector can transparently convert both {@code left} and {@code right} into.
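To make the contract concrete, here is a hedged sketch of how a caller might use `applyPartitioning`. The helper name and parameters are illustrative; only the `Metadata.applyPartitioning` signature comes from this diff, and passing `Optional.empty()` as the partitioning presumably lets the connector choose any partitioning over the given columns:

```java
// Illustrative caller: ask the connector for a handle compatible with the
// desired partitioning; keep the original handle if the connector declines
// (Optional.empty()), in which case the engine must repartition the data.
static TableHandle pushPartitioningIfPossible(
        Metadata metadata,
        Session session,
        TableHandle tableHandle,
        PartitioningHandle desiredPartitioning,
        List<ColumnHandle> partitioningColumns)
{
    return metadata.applyPartitioning(session, tableHandle, Optional.of(desiredPartitioning), partitioningColumns)
            .orElse(tableHandle);
}
```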
core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java
@@ -399,20 +399,27 @@ public TableProperties getTableProperties(Session session, TableHandle handle)
     }
 
     @Override
-    public TableHandle makeCompatiblePartitioning(Session session, TableHandle tableHandle, PartitioningHandle partitioningHandle)
+    public Optional<TableHandle> applyPartitioning(Session session, TableHandle tableHandle, Optional<PartitioningHandle> partitioning, List<ColumnHandle> columns)
     {
-        checkArgument(partitioningHandle.getCatalogHandle().isPresent(), "Expect partitioning handle from connector, got system partitioning handle");
-        CatalogHandle catalogHandle = partitioningHandle.getCatalogHandle().get();
-        checkArgument(catalogHandle.equals(tableHandle.catalogHandle()), "ConnectorId of tableHandle and partitioningHandle does not match");
+        if (partitioning.isPresent()) {
+            PartitioningHandle partitioningHandle = partitioning.get();
+            if (!tableHandle.catalogHandle().equals(partitioningHandle.getCatalogHandle().orElse(null)) ||
+                    !tableHandle.transaction().equals(partitioningHandle.getTransactionHandle().orElse(null))) {
+                return Optional.empty();
+            }
+        }
+
+        CatalogHandle catalogHandle = tableHandle.catalogHandle();
         CatalogMetadata catalogMetadata = getCatalogMetadata(session, catalogHandle);
         ConnectorMetadata metadata = catalogMetadata.getMetadataFor(session, catalogHandle);
-        ConnectorTransactionHandle transaction = catalogMetadata.getTransactionHandleFor(catalogHandle);
+        ConnectorSession connectorSession = session.toConnectorSession(catalogHandle);
 
-        ConnectorTableHandle newTableHandle = metadata.makeCompatiblePartitioning(
-                session.toConnectorSession(catalogHandle),
-                tableHandle.connectorHandle(),
-                partitioningHandle.getConnectorHandle());
-        return new TableHandle(catalogHandle, newTableHandle, transaction);
+        return metadata.applyPartitioning(
+                connectorSession,
+                tableHandle.connectorHandle(),
+                partitioning.map(PartitioningHandle::getConnectorHandle),
+                columns)
+                .map(handle -> new TableHandle(catalogHandle, handle, tableHandle.transaction()));
     }
 
     @Override
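On the connector side, the call above implies a `ConnectorMetadata.applyPartitioning(ConnectorSession, ConnectorTableHandle, Optional<ConnectorPartitioningHandle>, List<ColumnHandle>)` SPI returning `Optional<ConnectorTableHandle>`. A hedged sketch of an implementation, with every `Example*` type and the compatibility rule invented for illustration:

```java
@Override
public Optional<ConnectorTableHandle> applyPartitioning(
        ConnectorSession session,
        ConnectorTableHandle tableHandle,
        Optional<ConnectorPartitioningHandle> partitioningHandle,
        List<ColumnHandle> columns)
{
    ExampleTableHandle table = (ExampleTableHandle) tableHandle;

    // decline unless the table is bucketed on exactly the requested columns
    if (!table.bucketColumns().equals(columns)) {
        return Optional.empty();
    }

    // if a specific partitioning was requested, it must be one this connector produced
    if (partitioningHandle.isPresent() && !(partitioningHandle.get() instanceof ExamplePartitioningHandle)) {
        return Optional.empty();
    }

    // return a handle whose getTableProperties() advertises this partitioning
    return Optional.of(table.withBucketPartitioning());
}
```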