Skip to content
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9b65334
finsih e2e prototype
20001020ycx Nov 24, 2025
ef7028a
refactor: Improve ClpYamlMetadataProvider cache organization and fix …
jackluo923 Nov 12, 2025
0f69ac3
stash change before message pack
20001020ycx Nov 26, 2025
7ad5045
adding msgpack serialization
20001020ycx Nov 26, 2025
1f8ed95
feature completion
20001020ycx Nov 27, 2025
b789e2f
add all unit tests
20001020ycx Nov 27, 2025
5e4053f
Merge branch 'release-0.293-clp-connector-snapshot-metadata-refactor'…
20001020ycx Nov 27, 2025
76ef1d8
revert communication protocol change, I will bundle them in next pr
20001020ycx Nov 27, 2025
bf2b905
Merge branch '11-25-metadata-projection' of github.com:20001020ycx/pr…
20001020ycx Nov 27, 2025
45828e9
did a pass on unit tests, need for final cleanning
20001020ycx Nov 27, 2025
e2d31c1
defer communication to another pr
20001020ycx Nov 27, 2025
668141d
a pass on clean up
20001020ycx Nov 27, 2025
14c1d78
resolve Jack's comment
20001020ycx Nov 28, 2025
1a4de0e
Update presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSpl…
20001020ycx Nov 28, 2025
de904e9
Update presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimi…
20001020ycx Nov 28, 2025
bf90701
a pass on rabbit comment
20001020ycx Nov 28, 2025
6361313
Merge branch '11-25-metadata-projection' of github.com:20001020ycx/pr…
20001020ycx Nov 28, 2025
10ca0b4
rabbits comment
20001020ycx Nov 28, 2025
61c6f37
current change
20001020ycx Nov 28, 2025
a6e9673
Revert "defer communication to another pr"
20001020ycx Nov 28, 2025
27c5965
Merge branch 'release-0.293-clp-connector-snapshot-metadata-refactor'…
jackluo923 Nov 29, 2025
a664c0b
e2e all pass
20001020ycx Nov 30, 2025
f195c3c
Merge branch '11-25-metadata-projection' of github.com:20001020ycx/pr…
20001020ycx Nov 30, 2025
2c5e696
final pass
20001020ycx Nov 30, 2025
991efd7
nits
20001020ycx Nov 30, 2025
82bbb47
Merge branch 'release-0.293-clp-connector-snapshot-metadata-refactor'…
jackluo923 Nov 30, 2025
becfcf7
Merge branch 'release-0.293-clp-connector-snapshot-metadata-refactor'…
20001020ycx Dec 1, 2025
beb7cb3
add docstring for getQueryResult
20001020ycx Dec 1, 2025
acd8423
Merge branch '11-25-metadata-projection' of github.com:20001020ycx/pr…
20001020ycx Dec 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions presto-clp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@
</properties>

<dependencies>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
<version>0.9.8</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.6.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ public enum ClpErrorCode
CLP_UNSUPPORTED_TYPE(3, EXTERNAL),
CLP_UNSUPPORTED_CONFIG_OPTION(4, EXTERNAL),
CLP_UNSUPPORTED_TABLE_SCHEMA_YAML(5, EXTERNAL),
CLP_UNSUPPORTED_METADATA_PROJECTION(6, EXTERNAL),

CLP_SPLIT_METADATA_CONFIG_NOT_FOUND(10, USER_ERROR),
CLP_MANDATORY_COLUMN_NOT_IN_FILTER(11, USER_ERROR);
CLP_MANDATORY_COLUMN_NOT_IN_FILTER(11, USER_ERROR),
CLP_SPLIT_METADATA_TYPE_MISMATCH_METADATA_DATABASE_TYPE(12, USER_ERROR);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;

import java.io.IOException;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -36,16 +41,56 @@ public class ClpSplit
private final String path;
private final SplitType type;
private final Optional<String> kqlQuery;
private final String metadataProjectionNameValueEncoded;

/**
* Invoked by Jackson; serializes a ClpSplit to JSON format
*
* @param path the path to the split
* @param type the split type
* @param kqlQuery optional KQL query pushed down to CLP-S
* @param metadataProjectionNameValueEncoded Base64-encoded MessagePack representation of metadata projection in
* column name and value pairs
*/
@JsonCreator
public ClpSplit(
@JsonProperty("path") String path,
@JsonProperty("type") SplitType type,
@JsonProperty("kqlQuery") Optional<String> kqlQuery)
@JsonProperty("kqlQuery") Optional<String> kqlQuery,
@JsonProperty("metadataProjectionNameValue") String metadataProjectionNameValueEncoded)
{
this.path = requireNonNull(path, "Split path is null");
this.type = requireNonNull(type, "Split type is null");
this.kqlQuery = kqlQuery;
this.metadataProjectionNameValueEncoded = metadataProjectionNameValueEncoded != null ? metadataProjectionNameValueEncoded : "";
}

/**
* Creates a ClpSplit for internal use
*
* @param path the path to the split
* @param type the split type
* @param kqlQuery optional KQL query pushed down to CLP-S
* @param metadataProjectionNameValue optional map of metadata projection column names to their values
* @throws RuntimeException if encoding metadata projection name-value pairs fails
*/
public ClpSplit(
String path,
SplitType type,
Optional<String> kqlQuery,
Optional<Map<String, Object>> metadataProjectionNameValue)
{
this.path = requireNonNull(path, "Split path is null");
this.type = requireNonNull(type, "Split type is null");
this.kqlQuery = kqlQuery;

try {
this.metadataProjectionNameValueEncoded = encodeProjectionNameValue(
metadataProjectionNameValue.orElse(new HashMap<>()));
}
catch (IOException e) {
throw new RuntimeException("Failed to encode projection name value", e);
}
}

@JsonProperty
Expand All @@ -66,6 +111,52 @@ public Optional<String> getKqlQuery()
return kqlQuery;
}

@JsonProperty("metadataProjectionNameValue")
public String getmetadataProjectionNameValue()
{
return metadataProjectionNameValueEncoded;
}

/**
* @param metadataColumnNameValue map of metadata column names to their values.
* @return Base64-encoded MessagePack representation of the metadataColumnNameValue map
* @throws IOException if MessagePack serialization fails
* @throws IllegalArgumentException if a value type is not String, Long, or Double
*/
private String encodeProjectionNameValue(Map<String, Object> metadataColumnNameValue) throws IOException
{
if (metadataColumnNameValue.isEmpty()) {
return "";
}

MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();

packer.packMapHeader(metadataColumnNameValue.size());
for (Map.Entry<String, Object> entry : metadataColumnNameValue.entrySet()) {
packer.packString(entry.getKey());

Object value = entry.getValue();
if (value instanceof String) {
packer.packString((String) value);
}
else if (value instanceof Long) {
packer.packLong((Long) value);
}
else if (value instanceof Double) {
packer.packDouble((Double) value);
}
else {
throw new IllegalArgumentException(
"Unsupported type for column " + entry.getKey() + ": " + value.getClass());
}
}

byte[] bytes = packer.toByteArray();
packer.close();

return Base64.getEncoder().encodeToString(bytes);
}

@Override
public NodeSelectionStrategy getNodeSelectionStrategy()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.MoreObjects.toStringHelper;

Expand All @@ -33,18 +35,22 @@ public class ClpTableLayoutHandle
private final boolean metadataQueryOnly;
private final Optional<ClpTopNSpec> topN;

private Optional<Set<String>> splitMetadataColumnNames;

@JsonCreator
public ClpTableLayoutHandle(
@JsonProperty("table") ClpTableHandle table,
@JsonProperty("kqlQuery") Optional<String> kqlQuery,
@JsonProperty("metadataExpression") Optional<RowExpression> metadataExpression,
@JsonProperty("metadataQueryOnly") boolean metadataQueryOnly,
@JsonProperty("splitMetadataColumnNames") Optional<Set<String>> splitMetadataColumnNames,
@JsonProperty("topN") Optional<ClpTopNSpec> topN)
{
this.table = table;
this.kqlQuery = kqlQuery;
this.metadataExpression = metadataExpression;
this.metadataQueryOnly = metadataQueryOnly;
this.splitMetadataColumnNames = splitMetadataColumnNames;
this.topN = topN;
}

Expand All @@ -57,6 +63,19 @@ public ClpTableLayoutHandle(
this.kqlQuery = kqlQuery;
this.metadataExpression = metadataExpression;
this.metadataQueryOnly = false;
this.splitMetadataColumnNames = Optional.empty();
this.topN = Optional.empty();
}

public ClpTableLayoutHandle(
@JsonProperty("table") ClpTableHandle table,
@JsonProperty("splitMetadataColumnNames") Optional<Set<String>> splitMetadataColumnNames)
{
this.table = table;
this.kqlQuery = Optional.empty();
this.metadataExpression = Optional.empty();
this.metadataQueryOnly = false;
this.splitMetadataColumnNames = splitMetadataColumnNames;
this.topN = Optional.empty();
}

Expand Down Expand Up @@ -84,6 +103,15 @@ public boolean isMetadataQueryOnly()
return metadataQueryOnly;
}

@JsonProperty
public Set<String> getSplitMetadataColumnNames()
{
if (!splitMetadataColumnNames.isPresent()) {
splitMetadataColumnNames = Optional.of(new HashSet<>());
}
return splitMetadataColumnNames.get();
}

@JsonProperty
public Optional<ClpTopNSpec> getTopN()
{
Expand All @@ -104,13 +132,15 @@ public boolean equals(Object o)
Objects.equals(kqlQuery, that.kqlQuery) &&
Objects.equals(metadataExpression, that.metadataExpression) &&
Objects.equals(metadataQueryOnly, that.metadataQueryOnly) &&
Objects.equals(splitMetadataColumnNames, that.splitMetadataColumnNames) &&
Objects.equals(topN, that.topN);
}

@Override
public int hashCode()
{
return Objects.hash(table, kqlQuery, metadataExpression, metadataQueryOnly, topN);
return Objects.hash(
table, kqlQuery, metadataExpression, metadataQueryOnly, splitMetadataColumnNames, topN);
}

@Override
Expand All @@ -121,6 +151,7 @@ public String toString()
.add("kqlQuery", kqlQuery)
.add("metadataExpression", metadataExpression)
.add("metadataQueryOnly", metadataQueryOnly)
.add("splitMetaColumnNames", splitMetadataColumnNames)
.add("topN", topN)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.facebook.presto.spi.ConnectorPlanRewriter;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.VariableAllocator;
Expand All @@ -47,12 +48,14 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_METADATA_PROJECTION;
import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
Expand Down Expand Up @@ -100,7 +103,91 @@ public PlanNode visitFilter(FilterNode node, RewriteContext<Void> context)
return node;
}

return processFilter(node, (TableScanNode) node.getSource());
PlanNode processedNode = processFilter(node, (TableScanNode) node.getSource());

if (processedNode instanceof TableScanNode) {
return context.rewrite(processedNode, null);
}

return processedNode;
}

/**
* Rewrites a TableScanNode to attach metadata projection column.
*
* @param node the original TableScanNode to rewrite.
* @param context
* @return a new TableScanNode with metadata projection in the layout handle
* @throw PrestoException if a metadata column maps to a range-bound column - this is an unsupported feature
*/
@Override
public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)
{
// Retrieve projection column names
Set<String> projectionColumns = new HashSet<>();
for (VariableReferenceExpression variable : node.getOutputVariables()) {
projectionColumns.add(variable.getName());
}

// Retrieve metadata column
TableHandle tableHandle = node.getTable();
ClpTableHandle clpTableHandle = (ClpTableHandle) tableHandle.getConnectorHandle();
SchemaTableName schemaTableName = clpTableHandle.getSchemaTableName();
Set<String> metadataColumns = metadataConfig.getMetadataColumns(schemaTableName).keySet();

// Metadata Projection: intersection between the projection column and metadata column
Set<String> metadataProjections = new HashSet<>();
for (String columnName : projectionColumns) {
if (metadataColumns.contains(columnName)) {
Set<String> metadataColumnsWithRangeBound =
metadataConfig.getMetadataColumnsWithRangeBounds(schemaTableName);
Map<String, String> exposedToOriginalMap =
metadataConfig.getExposedToOriginalMapping(schemaTableName);

// Resolve exposed column names to their original names in the metadata database.
// After extracting values from the metadata database, these will be mapped back to exposed names
// for projection.
String originalColumnName = exposedToOriginalMap.get(columnName);
if (metadataColumnsWithRangeBound.contains(originalColumnName)) {
throw new PrestoException(CLP_UNSUPPORTED_METADATA_PROJECTION,
format("Unsupported metadata projection column: %s", columnName));
}
metadataProjections.add(originalColumnName);
}
}

if (metadataProjections.isEmpty()) {
return node;
}

// TableScan optimization happens late in planning; append to existing layout if present.
Optional<ConnectorTableLayoutHandle> layout = tableHandle.getLayout();
if (layout.isPresent() && layout.get() instanceof ClpTableLayoutHandle) {
ClpTableLayoutHandle cl = (ClpTableLayoutHandle) layout.get();
for (String metadataProjection : metadataProjections) {
cl.getSplitMetadataColumnNames().add(metadataProjection);
}
return node;
}

ClpTableLayoutHandle newLayout = new ClpTableLayoutHandle(
clpTableHandle, Optional.of(metadataProjections));

// TableScanNode is immutable, we need to copy-on-write
return new TableScanNode(
node.getSourceLocation(),
idAllocator.getNextId(),
new TableHandle(
tableHandle.getConnectorId(),
clpTableHandle,
tableHandle.getTransaction(),
Optional.of(newLayout)),
node.getOutputVariables(),
node.getAssignments(),
node.getTableConstraints(),
node.getCurrentConstraint(),
node.getEnforcedConstraint(),
node.getCteMaterializationInfo());
}

@Override
Expand Down Expand Up @@ -191,7 +278,7 @@ public PlanNode visitTopN(TopNNode node, RewriteContext<Void> context)
ClpTopNSpec tightened = new ClpTopNSpec(mergedLimit, ex.getOrderings());
ClpTableHandle clpHandle = (ClpTableHandle) tableHandle.getConnectorHandle();
ClpTableLayoutHandle newLayout =
new ClpTableLayoutHandle(clpHandle, kql, metadataSql, true, Optional.of(tightened));
new ClpTableLayoutHandle(clpHandle, kql, metadataSql, true, Optional.empty(), Optional.of(tightened));

TableScanNode newScan = new TableScanNode(
scan.getSourceLocation(),
Expand Down Expand Up @@ -227,7 +314,7 @@ public PlanNode visitTopN(TopNNode node, RewriteContext<Void> context)
ClpTopNSpec spec = new ClpTopNSpec(node.getCount(), newOrderings);
ClpTableHandle clpHandle = (ClpTableHandle) tableHandle.getConnectorHandle();
ClpTableLayoutHandle newLayout =
new ClpTableLayoutHandle(clpHandle, kql, metadataSql, true, Optional.of(spec));
new ClpTableLayoutHandle(clpHandle, kql, metadataSql, true, Optional.empty(), Optional.of(spec));

TableScanNode newScanNode = new TableScanNode(
scan.getSourceLocation(),
Expand Down Expand Up @@ -283,7 +370,7 @@ private PlanNode processFilter(FilterNode filterNode, TableScanNode tableScanNod
kqlQuery.ifPresent(s -> log.debug("KQL query: %s", s));

ClpTableLayoutHandle layoutHandle = new ClpTableLayoutHandle(
clpTableHandle, kqlQuery, metadataExpression, allInMetadata, Optional.empty());
clpTableHandle, kqlQuery, metadataExpression, allInMetadata, Optional.empty(), Optional.empty());
TableHandle newTableHandle = new TableHandle(
tableHandle.getConnectorId(),
clpTableHandle,
Expand Down
Loading
Loading