Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9b65334
finsih e2e prototype
20001020ycx Nov 24, 2025
ef7028a
refactor: Improve ClpYamlMetadataProvider cache organization and fix …
jackluo923 Nov 12, 2025
0f69ac3
stash change before message pack
20001020ycx Nov 26, 2025
7ad5045
adding msgpack serialization
20001020ycx Nov 26, 2025
1f8ed95
feature completion
20001020ycx Nov 27, 2025
b789e2f
add all unit tests
20001020ycx Nov 27, 2025
5e4053f
Merge branch 'release-0.293-clp-connector-snapshot-metadata-refactor'…
20001020ycx Nov 27, 2025
76ef1d8
revert communication protocol change, I will bundle them in next pr
20001020ycx Nov 27, 2025
bf2b905
Merge branch '11-25-metadata-projection' of github.com:20001020ycx/pr…
20001020ycx Nov 27, 2025
45828e9
did a pass on unit tests, need for final cleanning
20001020ycx Nov 27, 2025
e2d31c1
defer communication to another pr
20001020ycx Nov 27, 2025
668141d
a pass on clean up
20001020ycx Nov 27, 2025
14c1d78
resolve Jack's comment
20001020ycx Nov 28, 2025
1a4de0e
Update presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSpl…
20001020ycx Nov 28, 2025
de904e9
Update presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimi…
20001020ycx Nov 28, 2025
bf90701
a pass on rabbit comment
20001020ycx Nov 28, 2025
6361313
Merge branch '11-25-metadata-projection' of github.com:20001020ycx/pr…
20001020ycx Nov 28, 2025
10ca0b4
rabbits comment
20001020ycx Nov 28, 2025
61c6f37
current change
20001020ycx Nov 28, 2025
a6e9673
Revert "defer communication to another pr"
20001020ycx Nov 28, 2025
27c5965
Merge branch 'release-0.293-clp-connector-snapshot-metadata-refactor'…
jackluo923 Nov 29, 2025
a664c0b
e2e all pass
20001020ycx Nov 30, 2025
f195c3c
Merge branch '11-25-metadata-projection' of github.com:20001020ycx/pr…
20001020ycx Nov 30, 2025
2c5e696
final pass
20001020ycx Nov 30, 2025
991efd7
nits
20001020ycx Nov 30, 2025
82bbb47
Merge branch 'release-0.293-clp-connector-snapshot-metadata-refactor'…
jackluo923 Nov 30, 2025
becfcf7
Merge branch 'release-0.293-clp-connector-snapshot-metadata-refactor'…
20001020ycx Dec 1, 2025
beb7cb3
add docstring for getQueryResult
20001020ycx Dec 1, 2025
acd8423
Merge branch '11-25-metadata-projection' of github.com:20001020ycx/pr…
20001020ycx Dec 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions presto-clp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@
</properties>

<dependencies>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
<version>0.9.8</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.6.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ public enum ClpErrorCode
CLP_UNSUPPORTED_TYPE(3, EXTERNAL),
CLP_UNSUPPORTED_CONFIG_OPTION(4, EXTERNAL),
CLP_UNSUPPORTED_TABLE_SCHEMA_YAML(5, EXTERNAL),
CLP_UNSUPPORTED_METADATA_PROJECTION(6, EXTERNAL),

CLP_SPLIT_METADATA_CONFIG_NOT_FOUND(10, USER_ERROR),
CLP_MANDATORY_COLUMN_NOT_IN_FILTER(11, USER_ERROR);
CLP_MANDATORY_COLUMN_NOT_IN_FILTER(11, USER_ERROR),
CLP_SPLIT_METADATA_TYPE_MISMATCH_METADATA_DATABASE_TYPE(12, USER_ERROR);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
Optional<Set<ColumnHandle>> desiredColumns)
{
ClpTableHandle tableHandle = (ClpTableHandle) table;
ConnectorTableLayout layout = new ConnectorTableLayout(new ClpTableLayoutHandle(tableHandle, Optional.empty(), Optional.empty()));
ConnectorTableLayout layout = new ConnectorTableLayout(new ClpTableLayoutHandle(tableHandle, Optional.empty(), null));
return new ConnectorTableLayoutResult(layout, constraint.getSummary());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;

import java.io.IOException;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -36,16 +41,56 @@ public class ClpSplit
private final String path;
private final SplitType type;
private final Optional<String> kqlQuery;
private final String metadataProjectionNameValueEncoded;

/**
* Invoked by Jackson; serializes a ClpSplit to JSON format
*
* @param path the path to the split
* @param type the split type
* @param kqlQuery optional KQL query pushed down to CLP-S
* @param metadataProjectionNameValueEncoded Base64-encoded MessagePack representation of metadata projection in
* column name and value pairs
*/
@JsonCreator
public ClpSplit(
@JsonProperty("path") String path,
@JsonProperty("type") SplitType type,
@JsonProperty("kqlQuery") Optional<String> kqlQuery)
@JsonProperty("kqlQuery") Optional<String> kqlQuery,
@JsonProperty("metadataProjectionNameValue") String metadataProjectionNameValueEncoded)
{
this.path = requireNonNull(path, "Split path is null");
this.type = requireNonNull(type, "Split type is null");
this.kqlQuery = kqlQuery;
this.metadataProjectionNameValueEncoded = metadataProjectionNameValueEncoded != null ? metadataProjectionNameValueEncoded : "";
}

/**
* Creates a ClpSplit for internal use
*
* @param path the path to the split
* @param type the split type
* @param kqlQuery optional KQL query pushed down to CLP-S
* @param metadataProjectionNameValue optional map of metadata projection column names to their values
* @throws RuntimeException if encoding metadata projection name-value pairs fails
*/
public ClpSplit(
String path,
SplitType type,
Optional<String> kqlQuery,
Optional<Map<String, Object>> metadataProjectionNameValue)
{
this.path = requireNonNull(path, "Split path is null");
this.type = requireNonNull(type, "Split type is null");
this.kqlQuery = kqlQuery;

try {
this.metadataProjectionNameValueEncoded = encodeProjectionNameValue(
metadataProjectionNameValue.orElse(new HashMap<>()));
}
catch (IOException e) {
throw new RuntimeException("Failed to encode projection name value", e);
}
}

@JsonProperty
Expand All @@ -66,6 +111,48 @@ public Optional<String> getKqlQuery()
return kqlQuery;
}

@JsonProperty("metadataProjectionNameValue")
public String getMetadataProjectionNameValue()
{
return metadataProjectionNameValueEncoded;
}

/**
* @param metadataColumnNameValue map of metadata column names to their values.
* @return Base64-encoded MessagePack representation of the metadataColumnNameValue map
* @throws IOException if MessagePack serialization fails
* @throws IllegalArgumentException if a value type is not String, Long, or Double
*/
private String encodeProjectionNameValue(Map<String, Object> metadataColumnNameValue) throws IOException
{
if (metadataColumnNameValue.isEmpty()) {
return "";
}

try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) {
packer.packMapHeader(metadataColumnNameValue.size());
for (Map.Entry<String, Object> entry : metadataColumnNameValue.entrySet()) {
packer.packString(entry.getKey());

Object value = entry.getValue();
if (value instanceof String) {
packer.packString((String) value);
}
else if (value instanceof Long) {
packer.packLong((Long) value);
}
else if (value instanceof Double) {
packer.packDouble((Double) value);
}
else {
throw new IllegalArgumentException(
"Unsupported type for column " + entry.getKey() + ": " + value.getClass());
}
}
return Base64.getEncoder().encodeToString(packer.toByteArray());
}
}

@Override
public NodeSelectionStrategy getNodeSelectionStrategy()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.MoreObjects.toStringHelper;

Expand All @@ -29,34 +31,51 @@ public class ClpTableLayoutHandle
{
private final ClpTableHandle table;
private final Optional<String> kqlQuery;
private final Optional<RowExpression> metadataExpression;
private final RowExpression metadataExpression;
private final boolean metadataQueryOnly;
private final Optional<ClpTopNSpec> topN;

private Optional<Set<String>> splitMetadataColumnNames;

@JsonCreator
public ClpTableLayoutHandle(
@JsonProperty("table") ClpTableHandle table,
@JsonProperty("kqlQuery") Optional<String> kqlQuery,
@JsonProperty("metadataExpression") Optional<RowExpression> metadataExpression,
@JsonProperty("metadataExpression") RowExpression metadataExpression,
@JsonProperty("metadataQueryOnly") boolean metadataQueryOnly,
@JsonProperty("splitMetadataColumnNames") Optional<Set<String>> splitMetadataColumnNames,
@JsonProperty("topN") Optional<ClpTopNSpec> topN)
{
this.table = table;
this.kqlQuery = kqlQuery;
this.metadataExpression = metadataExpression;
this.metadataQueryOnly = metadataQueryOnly;
this.splitMetadataColumnNames = splitMetadataColumnNames;
this.topN = topN;
}

public ClpTableLayoutHandle(
@JsonProperty("table") ClpTableHandle table,
@JsonProperty("kqlQuery") Optional<String> kqlQuery,
@JsonProperty("metadataExpression") Optional<RowExpression> metadataExpression)
@JsonProperty("metadataExpression") RowExpression metadataExpression)
{
this.table = table;
this.kqlQuery = kqlQuery;
this.metadataExpression = metadataExpression;
this.metadataQueryOnly = false;
this.splitMetadataColumnNames = Optional.empty();
this.topN = Optional.empty();
}

public ClpTableLayoutHandle(
@JsonProperty("table") ClpTableHandle table,
@JsonProperty("splitMetadataColumnNames") Optional<Set<String>> splitMetadataColumnNames)
{
this.table = table;
this.kqlQuery = Optional.empty();
this.metadataExpression = null;
this.metadataQueryOnly = false;
this.splitMetadataColumnNames = splitMetadataColumnNames;
this.topN = Optional.empty();
}

Expand All @@ -73,7 +92,7 @@ public Optional<String> getKqlQuery()
}

@JsonProperty
public Optional<RowExpression> getMetadataExpression()
public RowExpression getMetadataExpression()
{
return metadataExpression;
}
Expand All @@ -84,6 +103,20 @@ public boolean isMetadataQueryOnly()
return metadataQueryOnly;
}

@JsonProperty
public Optional<Set<String>> getSplitMetadataColumnNames()
{
return splitMetadataColumnNames;
}

public Set<String> getOrInitializeSplitMetadataColumnNames()
{
if (!splitMetadataColumnNames.isPresent()) {
splitMetadataColumnNames = Optional.of(new HashSet<>());
}
return splitMetadataColumnNames.get();
}

@JsonProperty
public Optional<ClpTopNSpec> getTopN()
{
Expand All @@ -103,14 +136,15 @@ public boolean equals(Object o)
return Objects.equals(table, that.table) &&
Objects.equals(kqlQuery, that.kqlQuery) &&
Objects.equals(metadataExpression, that.metadataExpression) &&
Objects.equals(metadataQueryOnly, that.metadataQueryOnly) &&
Objects.equals(splitMetadataColumnNames, that.splitMetadataColumnNames) &&
Objects.equals(topN, that.topN);
}

@Override
public int hashCode()
{
return Objects.hash(table, kqlQuery, metadataExpression, metadataQueryOnly, topN);
return Objects.hash(
table, kqlQuery, metadataExpression, metadataQueryOnly, topN);
}

@Override
Expand Down
Loading
Loading