Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added freshness aware table loading using entityId:entityVersion for ETag #1037

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableMap;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.Invocation;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.types.Types;
import org.apache.polaris.core.PolarisConfiguration;
Expand Down Expand Up @@ -634,6 +636,131 @@ public void testLoadTableWithAccessDelegationForExternalCatalogWithConfigEnabled
}
}

/**
 * Checks that loading a table with the ETag from a previous load yields {@code 304 Not Modified}.
 *
 * <p>Steps: write a metadata file, register it as {@code my_table_etagged}, issue a first GET and
 * assert that the response carries an {@code ETag} header, then issue a second GET that sends the
 * captured value in {@code If-None-Match}. The metadata file is deleted once the load requests
 * have completed.
 */
@Test
public void testLoadTableTwiceWithETag() {
  Namespace ns1 = Namespace.of("ns1");
  restCatalog.createNamespace(ns1);

  Schema schema =
      new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType())));
  TableMetadata tableMetadata =
      TableMetadata.newTableMetadata(
          schema, PartitionSpec.unpartitioned(), "file:///tmp/ns1/my_table", Map.of());

  try (ResolvingFileIO fileIO = new ResolvingFileIO()) {
    fileIO.initialize(Map.of());
    fileIO.setConf(new Configuration());

    String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
    TableMetadataParser.write(tableMetadata, fileIO.newOutputFile(fileLocation));
    restCatalog.registerTable(TableIdentifier.of(ns1, "my_table_etagged"), fileLocation);

    // Both requests target the same table resource.
    String tablePath = "v1/" + currentCatalogName + "/namespaces/ns1/tables/my_table_etagged";

    Invocation firstLoad = catalogApi.request(tablePath).build("GET");
    try (Response firstResponse = firstLoad.invoke()) {
      assertThat(firstResponse.getHeaders()).containsKey(HttpHeaders.ETAG);
      String etag = firstResponse.getHeaders().getFirst(HttpHeaders.ETAG).toString();

      // Replay the load, this time presenting the ETag the server just handed out.
      Invocation conditionalLoad =
          catalogApi.request(tablePath).header(HttpHeaders.IF_NONE_MATCH, etag).build("GET");
      try (Response conditionalResponse = conditionalLoad.invoke()) {
        int notModified = Response.Status.NOT_MODIFIED.getStatusCode();
        assertThat(conditionalResponse.getStatus()).isEqualTo(notModified);
      }
    } finally {
      fileIO.deleteFile(fileLocation);
    }
  }
}

/**
 * Checks that the ETag returned by a register-table call can be used for a conditional load.
 *
 * <p>Steps: write a metadata file, POST it to the namespace's {@code register} endpoint and
 * assert that the response carries an {@code ETag} header, then GET the table with that value in
 * {@code If-None-Match} and expect {@code 304 Not Modified}. The metadata file is deleted once
 * the requests have completed.
 */
@Test
public void testRegisterAndLoadTableWithReturnedETag() {
  Namespace ns1 = Namespace.of("ns1");
  restCatalog.createNamespace(ns1);

  Schema schema =
      new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType())));
  TableMetadata tableMetadata =
      TableMetadata.newTableMetadata(
          schema, PartitionSpec.unpartitioned(), "file:///tmp/ns1/my_table", Map.of());

  try (ResolvingFileIO fileIO = new ResolvingFileIO()) {
    fileIO.initialize(Map.of());
    fileIO.setConf(new Configuration());

    String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
    TableMetadataParser.write(tableMetadata, fileIO.newOutputFile(fileLocation));

    // Raw register payload: table name plus the location of the metadata file written above.
    Map<String, String> registerPayload =
        Map.of("name", "my_etagged_table", "metadata-location", fileLocation);
    Invocation register =
        catalogApi
            .request("v1/" + currentCatalogName + "/namespaces/ns1/register")
            .buildPost(Entity.json(registerPayload));

    try (Response registerResponse = register.invoke()) {
      assertThat(registerResponse.getHeaders()).containsKey(HttpHeaders.ETAG);
      String etag = registerResponse.getHeaders().getFirst(HttpHeaders.ETAG).toString();

      // Load the freshly registered table, presenting the ETag from the register response.
      Invocation conditionalLoad =
          catalogApi
              .request("v1/" + currentCatalogName + "/namespaces/ns1/tables/my_etagged_table")
              .header(HttpHeaders.IF_NONE_MATCH, etag)
              .build("GET");
      try (Response conditionalResponse = conditionalLoad.invoke()) {
        int notModified = Response.Status.NOT_MODIFIED.getStatusCode();
        assertThat(conditionalResponse.getStatus()).isEqualTo(notModified);
      }
    } finally {
      fileIO.deleteFile(fileLocation);
    }
  }
}

/**
 * Checks that the ETag returned by a create-table call can be used for a conditional load.
 *
 * <p>Steps: POST a {@link CreateTableRequest} for {@code my_etagged_table} and assert that the
 * response carries an {@code ETag} header, then GET the table with that value in {@code
 * If-None-Match} and expect {@code 304 Not Modified}.
 */
@Test
public void testCreateAndLoadTableWithReturnedEtag() {
  Namespace ns1 = Namespace.of("ns1");
  restCatalog.createNamespace(ns1);

  Schema schema =
      new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType())));
  TableMetadata tableMetadata =
      TableMetadata.newTableMetadata(
          schema, PartitionSpec.unpartitioned(), "file:///tmp/ns1/my_table", Map.of());

  try (ResolvingFileIO fileIO = new ResolvingFileIO()) {
    fileIO.initialize(Map.of());
    fileIO.setConf(new Configuration());

    // NOTE(review): this metadata file is written and later deleted, but the create request
    // below never references it — confirm whether the write is actually needed here.
    String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
    TableMetadataParser.write(tableMetadata, fileIO.newOutputFile(fileLocation));

    // The create request reuses the location, spec, schema and sort order of tableMetadata.
    CreateTableRequest createRequest =
        CreateTableRequest.builder()
            .withName("my_etagged_table")
            .withLocation(tableMetadata.location())
            .withPartitionSpec(tableMetadata.spec())
            .withSchema(tableMetadata.schema())
            .withWriteOrder(tableMetadata.sortOrder())
            .build();
    Invocation create =
        catalogApi
            .request("v1/" + currentCatalogName + "/namespaces/ns1/tables")
            .buildPost(Entity.json(createRequest));

    try (Response createResponse = create.invoke()) {
      assertThat(createResponse.getHeaders()).containsKey(HttpHeaders.ETAG);
      String etag = createResponse.getHeaders().getFirst(HttpHeaders.ETAG).toString();

      // Load the freshly created table, presenting the ETag from the create response.
      Invocation conditionalLoad =
          catalogApi
              .request("v1/" + currentCatalogName + "/namespaces/ns1/tables/my_etagged_table")
              .header(HttpHeaders.IF_NONE_MATCH, etag)
              .build("GET");
      try (Response conditionalResponse = conditionalLoad.invoke()) {
        int notModified = Response.Status.NOT_MODIFIED.getStatusCode();
        assertThat(conditionalResponse.getStatus()).isEqualTo(notModified);
      }
    } finally {
      fileIO.deleteFile(fileLocation);
    }
  }
}

@Test
public void testSendNotificationInternalCatalog() {
Map<String, String> payload =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.core.entity;

import net.minidev.json.annotate.JsonIgnore;

/**
* Entities that can expose an ETag that can uniquely identify their current state.
*/
public interface ETaggableEntity {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface mixes REST/HTTP concerns w/ persistence concerns. Please remove the ETag functionality from persistence.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed! Removed all references to it in the persistence layer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this separation is wise. Besides the fact that it complicates the code, we'll eventually want to expose etag semantics to the persistence layer in some form. i.e. if the entity an etag refers to is indeed still in the version specified in the etag, we don't need to pull the entity out of the metastore at all.


/**
 * Obtain an ETag that uniquely identifies the current state of the entity.
 *
 * <p>The default {@code isCurrent} implementation of this interface calls {@code equals} on the
 * returned value, so implementations should not return {@code null}.
 *
 * <p>NOTE(review): the {@code JsonIgnore} annotation used here is imported from
 * {@code net.minidev.json.annotate}; confirm that this is the annotation the project's JSON
 * serializer actually honours.
 *
 * @return the ETag
 */
@JsonIgnore
String getETag();

/**
 * Tells whether the supplied ETag matches the one this entity currently reports.
 *
 * <p>The comparison is an exact string match against {@link #getETag()}; a {@code null} argument
 * never matches.
 *
 * @param etag The etag to compare the entity against
 * @return true if the etag equals the entity's current ETag, false otherwise
 */
@JsonIgnore
default boolean isCurrent(String etag) {
  final String currentETag = getETag();
  return currentETag.equals(etag);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTUtil;

public class TableLikeEntity extends PolarisEntity {
public class TableLikeEntity extends PolarisEntity implements ETaggableEntity {
// For applicable types, this key on the "internalProperties" map will return the location
// of the internalProperties JSON file.
public static final String METADATA_LOCATION_KEY = "metadata-location";
Expand Down Expand Up @@ -79,6 +79,12 @@ public String getBaseLocation() {
return getPropertiesAsMap().get(PolarisEntityConstants.ENTITY_BASE_LOCATION);
}

/**
 * Builds this entity's ETag as {@code <id>:<entityVersion>}.
 *
 * <p>NOTE(review): the value is returned without surrounding double quotes, whereas HTTP
 * entity-tags are quoted strings — confirm whether quoting is applied where the header is
 * written.
 *
 * @return the id and entity version joined by a colon
 */
@Override
@JsonIgnore
public String getETag() {
  StringBuilder etag = new StringBuilder();
  etag.append(id);
  etag.append(":");
  etag.append(entityVersion);
  return etag.toString();
}

public static class Builder extends PolarisEntity.BaseBuilder<TableLikeEntity, Builder> {
public Builder(TableIdentifier identifier, String metadataLocation) {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,32 @@ public void testLoadTableInsufficientPermissions() {
() -> newWrapper().loadTable(TABLE_NS1A_2, "all"));
}

/**
 * Runs {@code loadTableIfStale} on {@code TABLE_NS1A_2} with ETag {@code "0:0"} through {@code
 * doTestSufficientPrivileges}, using the privileges expected to permit the call.
 */
@Test
public void testLoadTableIfStaleSufficientPrivileges() {
  List<PolarisPrivilege> sufficientPrivileges =
      List.of(
          PolarisPrivilege.TABLE_READ_PROPERTIES,
          PolarisPrivilege.TABLE_WRITE_PROPERTIES,
          PolarisPrivilege.TABLE_READ_DATA,
          PolarisPrivilege.TABLE_WRITE_DATA,
          PolarisPrivilege.TABLE_FULL_METADATA,
          PolarisPrivilege.CATALOG_MANAGE_CONTENT);

  doTestSufficientPrivileges(
      sufficientPrivileges,
      () -> newWrapper().loadTableIfStale(TABLE_NS1A_2, "0:0", "all"),
      null /* cleanupAction */);
}

/**
 * Runs {@code loadTableIfStale} on {@code TABLE_NS1A_2} with ETag {@code "0:0"} through {@code
 * doTestInsufficientPrivileges}, using privileges that are not expected to permit the call.
 */
@Test
public void testLoadTableIfStaleInsufficientPermissions() {
  List<PolarisPrivilege> insufficientPrivileges =
      List.of(
          PolarisPrivilege.NAMESPACE_FULL_METADATA,
          PolarisPrivilege.VIEW_FULL_METADATA,
          PolarisPrivilege.TABLE_CREATE,
          PolarisPrivilege.TABLE_LIST,
          PolarisPrivilege.TABLE_DROP);

  doTestInsufficientPrivileges(
      insufficientPrivileges, () -> newWrapper().loadTableIfStale(TABLE_NS1A_2, "0:0", "all"));
}

@Test
public void testLoadTableWithReadAccessDelegationSufficientPrivileges() {
doTestSufficientPrivileges(
Expand Down Expand Up @@ -920,6 +946,61 @@ public void testLoadTableWithWriteAccessDelegationInsufficientPermissions() {
() -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"));
}

/**
 * Runs {@code loadTableWithAccessDelegationIfStale} on {@code TABLE_NS1A_2} with ETag {@code
 * "0:0"} through {@code doTestSufficientPrivileges}, using the data-access privileges expected
 * to permit the call.
 */
@Test
public void testLoadTableWithReadAccessDelegationIfStaleSufficientPrivileges() {
  List<PolarisPrivilege> sufficientPrivileges =
      List.of(
          PolarisPrivilege.TABLE_READ_DATA,
          PolarisPrivilege.TABLE_WRITE_DATA,
          PolarisPrivilege.CATALOG_MANAGE_CONTENT);

  doTestSufficientPrivileges(
      sufficientPrivileges,
      () -> newWrapper().loadTableWithAccessDelegationIfStale(TABLE_NS1A_2, "0:0", "all"),
      null /* cleanupAction */);
}

/**
 * Runs {@code loadTableWithAccessDelegationIfStale} on {@code TABLE_NS1A_2} with ETag {@code
 * "0:0"} through {@code doTestInsufficientPrivileges}, using metadata-only and unrelated
 * privileges that are not expected to permit the call.
 */
@Test
public void testLoadTableWithReadAccessDelegationIfStaleInsufficientPermissions() {
  List<PolarisPrivilege> insufficientPrivileges =
      List.of(
          PolarisPrivilege.NAMESPACE_FULL_METADATA,
          PolarisPrivilege.VIEW_FULL_METADATA,
          PolarisPrivilege.TABLE_FULL_METADATA,
          PolarisPrivilege.TABLE_READ_PROPERTIES,
          PolarisPrivilege.TABLE_WRITE_PROPERTIES,
          PolarisPrivilege.TABLE_CREATE,
          PolarisPrivilege.TABLE_LIST,
          PolarisPrivilege.TABLE_DROP);

  doTestInsufficientPrivileges(
      insufficientPrivileges,
      () -> newWrapper().loadTableWithAccessDelegationIfStale(TABLE_NS1A_2, "0:0", "all"));
}

/**
 * Runs {@code loadTableWithAccessDelegationIfStale} on {@code TABLE_NS1A_2} with ETag {@code
 * "0:0"} through {@code doTestSufficientPrivileges}, using the data-access privileges expected
 * to permit the call.
 */
@Test
public void testLoadTableWithWriteAccessDelegationIfStaleSufficientPrivileges() {
  List<PolarisPrivilege> sufficientPrivileges =
      List.of(
          // TODO: Once read and write privileges yield different credentials, move
          // TABLE_READ_DATA into a special-case test; with only TABLE_READ_DATA we'd expect
          // to receive a read-only credential.
          PolarisPrivilege.TABLE_READ_DATA,
          PolarisPrivilege.TABLE_WRITE_DATA,
          PolarisPrivilege.CATALOG_MANAGE_CONTENT);

  doTestSufficientPrivileges(
      sufficientPrivileges,
      () -> newWrapper().loadTableWithAccessDelegationIfStale(TABLE_NS1A_2, "0:0", "all"),
      null /* cleanupAction */);
}

/**
 * Runs {@code loadTableWithAccessDelegationIfStale} on {@code TABLE_NS1A_2} with ETag {@code
 * "0:0"} through {@code doTestInsufficientPrivileges}, using metadata-only and unrelated
 * privileges that are not expected to permit the call.
 */
@Test
public void testLoadTableWithWriteAccessDelegationIfStaleInsufficientPermissions() {
  List<PolarisPrivilege> insufficientPrivileges =
      List.of(
          PolarisPrivilege.NAMESPACE_FULL_METADATA,
          PolarisPrivilege.VIEW_FULL_METADATA,
          PolarisPrivilege.TABLE_FULL_METADATA,
          PolarisPrivilege.TABLE_READ_PROPERTIES,
          PolarisPrivilege.TABLE_WRITE_PROPERTIES,
          PolarisPrivilege.TABLE_CREATE,
          PolarisPrivilege.TABLE_LIST,
          PolarisPrivilege.TABLE_DROP);

  doTestInsufficientPrivileges(
      insufficientPrivileges,
      () -> newWrapper().loadTableWithAccessDelegationIfStale(TABLE_NS1A_2, "0:0", "all"));
}

@Test
public void testUpdateTableSufficientPrivileges() {
doTestSufficientPrivileges(
Expand Down
Loading