Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added freshness aware table loading using entityId:entityVersion for ETag #1037

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableMap;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.Invocation;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.types.Types;
import org.apache.polaris.core.PolarisConfiguration;
Expand Down Expand Up @@ -634,6 +636,131 @@ public void testLoadTableWithAccessDelegationForExternalCatalogWithConfigEnabled
}
}

/**
* Register a table. Then, invoke an initial loadTable request to fetch and ensure ETag is present.
* Then, invoke a second loadTable to ensure that ETag is matched.
*/
@Test
public void testLoadTableTwiceWithETag() {
Namespace ns1 = Namespace.of("ns1");
restCatalog.createNamespace(ns1);
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(
new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType()))),
PartitionSpec.unpartitioned(),
"file:///tmp/ns1/my_table",
Map.of());
try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
resolvingFileIO.initialize(Map.of());
resolvingFileIO.setConf(new Configuration());
String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
TableMetadataParser.write(tableMetadata, resolvingFileIO.newOutputFile(fileLocation));
restCatalog.registerTable(TableIdentifier.of(ns1, "my_table_etagged"), fileLocation);
Invocation invocation = catalogApi.request("v1/" + currentCatalogName + "/namespaces/ns1/tables/my_table_etagged").build("GET");
try (Response initialLoadTable = invocation.invoke()) {
assertThat(initialLoadTable.getHeaders()).containsKey(HttpHeaders.ETAG);
String etag = initialLoadTable.getHeaders().getFirst(HttpHeaders.ETAG).toString();

Invocation etaggedInvocation = catalogApi
.request("v1/" + currentCatalogName + "/namespaces/ns1/tables/my_table_etagged")
.header(HttpHeaders.IF_NONE_MATCH, etag)
.build("GET");

try (Response etaggedLoadTable = etaggedInvocation.invoke()) {
assertThat(etaggedLoadTable.getStatus()).isEqualTo(Response.Status.NOT_MODIFIED.getStatusCode());
}
} finally {
resolvingFileIO.deleteFile(fileLocation);
}
}
}

/**
* Invoke an initial registerTable request to fetch and ensure ETag is present.
* Then, invoke a second loadTable to ensure that ETag is matched.
*/
@Test
public void testRegisterAndLoadTableWithReturnedETag() {
Namespace ns1 = Namespace.of("ns1");
restCatalog.createNamespace(ns1);
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(
new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType()))),
PartitionSpec.unpartitioned(),
"file:///tmp/ns1/my_table",
Map.of());
try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
resolvingFileIO.initialize(Map.of());
resolvingFileIO.setConf(new Configuration());
String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
TableMetadataParser.write(tableMetadata, resolvingFileIO.newOutputFile(fileLocation));

Invocation registerInvocation = catalogApi
.request("v1/" + currentCatalogName + "/namespaces/ns1/register")
.buildPost(Entity.json(Map.of("name", "my_etagged_table", "metadata-location", fileLocation)));
try (Response registerResponse = registerInvocation.invoke()) {
assertThat(registerResponse.getHeaders()).containsKey(HttpHeaders.ETAG);
String etag = registerResponse.getHeaders().getFirst(HttpHeaders.ETAG).toString();

Invocation etaggedInvocation = catalogApi
.request("v1/" + currentCatalogName + "/namespaces/ns1/tables/my_etagged_table")
.header(HttpHeaders.IF_NONE_MATCH, etag)
.build("GET");

try (Response etaggedLoadTable = etaggedInvocation.invoke()) {
assertThat(etaggedLoadTable.getStatus()).isEqualTo(Response.Status.NOT_MODIFIED.getStatusCode());
}

} finally {
resolvingFileIO.deleteFile(fileLocation);
}
}
}

@Test
public void testCreateAndLoadTableWithReturnedEtag() {
Namespace ns1 = Namespace.of("ns1");
restCatalog.createNamespace(ns1);
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(
new Schema(List.of(Types.NestedField.of(1, false, "col1", new Types.StringType()))),
PartitionSpec.unpartitioned(),
"file:///tmp/ns1/my_table",
Map.of());
try (ResolvingFileIO resolvingFileIO = new ResolvingFileIO()) {
resolvingFileIO.initialize(Map.of());
resolvingFileIO.setConf(new Configuration());
String fileLocation = "file:///tmp/ns1/my_table/metadata/v1.metadata.json";
TableMetadataParser.write(tableMetadata, resolvingFileIO.newOutputFile(fileLocation));

Invocation createInvocation = catalogApi
.request("v1/" + currentCatalogName + "/namespaces/ns1/tables")
.buildPost(Entity.json(CreateTableRequest.builder()
.withName("my_etagged_table")
.withLocation(tableMetadata.location())
.withPartitionSpec(tableMetadata.spec())
.withSchema(tableMetadata.schema())
.withWriteOrder(tableMetadata.sortOrder())
.build()
));
try (Response createResponse = createInvocation.invoke()) {
assertThat(createResponse.getHeaders()).containsKey(HttpHeaders.ETAG);
String etag = createResponse.getHeaders().getFirst(HttpHeaders.ETAG).toString();

Invocation etaggedInvocation = catalogApi
.request("v1/" + currentCatalogName + "/namespaces/ns1/tables/my_etagged_table")
.header(HttpHeaders.IF_NONE_MATCH, etag)
.build("GET");

try (Response etaggedLoadTable = etaggedInvocation.invoke()) {
assertThat(etaggedLoadTable.getStatus()).isEqualTo(Response.Status.NOT_MODIFIED.getStatusCode());
}
} finally {
resolvingFileIO.deleteFile(fileLocation);
}
}
}

@Test
public void testSendNotificationInternalCatalog() {
Map<String, String> payload =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.context.CallContextCatalogFactory;
import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
import org.apache.polaris.service.http.IfNoneMatch;
import org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase;
import org.apache.polaris.service.types.NotificationRequest;
import org.apache.polaris.service.types.NotificationType;
Expand Down Expand Up @@ -865,6 +866,32 @@ public void testLoadTableInsufficientPermissions() {
() -> newWrapper().loadTable(TABLE_NS1A_2, "all"));
}

@Test
public void testLoadTableIfStaleSufficientPrivileges() {
doTestSufficientPrivileges(
List.of(
PolarisPrivilege.TABLE_READ_PROPERTIES,
PolarisPrivilege.TABLE_WRITE_PROPERTIES,
PolarisPrivilege.TABLE_READ_DATA,
PolarisPrivilege.TABLE_WRITE_DATA,
PolarisPrivilege.TABLE_FULL_METADATA,
PolarisPrivilege.CATALOG_MANAGE_CONTENT),
() -> newWrapper().loadTableIfStale(TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"),
null /* cleanupAction */);
}

@Test
public void testLoadTableIfStaleInsufficientPermissions() {
doTestInsufficientPrivileges(
List.of(
PolarisPrivilege.NAMESPACE_FULL_METADATA,
PolarisPrivilege.VIEW_FULL_METADATA,
PolarisPrivilege.TABLE_CREATE,
PolarisPrivilege.TABLE_LIST,
PolarisPrivilege.TABLE_DROP),
() -> newWrapper().loadTableIfStale(TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"));
}

@Test
public void testLoadTableWithReadAccessDelegationSufficientPrivileges() {
doTestSufficientPrivileges(
Expand Down Expand Up @@ -920,6 +947,61 @@ public void testLoadTableWithWriteAccessDelegationInsufficientPermissions() {
() -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"));
}

@Test
public void testLoadTableWithReadAccessDelegationIfStaleSufficientPrivileges() {
doTestSufficientPrivileges(
List.of(
PolarisPrivilege.TABLE_READ_DATA,
PolarisPrivilege.TABLE_WRITE_DATA,
PolarisPrivilege.CATALOG_MANAGE_CONTENT),
() -> newWrapper().loadTableWithAccessDelegationIfStale(TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"),
null /* cleanupAction */);
}

@Test
public void testLoadTableWithReadAccessDelegationIfStaleInsufficientPermissions() {
doTestInsufficientPrivileges(
List.of(
PolarisPrivilege.NAMESPACE_FULL_METADATA,
PolarisPrivilege.VIEW_FULL_METADATA,
PolarisPrivilege.TABLE_FULL_METADATA,
PolarisPrivilege.TABLE_READ_PROPERTIES,
PolarisPrivilege.TABLE_WRITE_PROPERTIES,
PolarisPrivilege.TABLE_CREATE,
PolarisPrivilege.TABLE_LIST,
PolarisPrivilege.TABLE_DROP),
() -> newWrapper().loadTableWithAccessDelegationIfStale(TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"));
}

@Test
public void testLoadTableWithWriteAccessDelegationIfStaleSufficientPrivileges() {
doTestSufficientPrivileges(
List.of(
// TODO: Once we give different creds for read/write privilege, move this
// TABLE_READ_DATA into a special-case test; with only TABLE_READ_DATA we'd expet
// to receive a read-only credential.
PolarisPrivilege.TABLE_READ_DATA,
PolarisPrivilege.TABLE_WRITE_DATA,
PolarisPrivilege.CATALOG_MANAGE_CONTENT),
() -> newWrapper().loadTableWithAccessDelegationIfStale(TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"),
null /* cleanupAction */);
}

@Test
public void testLoadTableWithWriteAccessDelegationIfStaleInsufficientPermissions() {
doTestInsufficientPrivileges(
List.of(
PolarisPrivilege.NAMESPACE_FULL_METADATA,
PolarisPrivilege.VIEW_FULL_METADATA,
PolarisPrivilege.TABLE_FULL_METADATA,
PolarisPrivilege.TABLE_READ_PROPERTIES,
PolarisPrivilege.TABLE_WRITE_PROPERTIES,
PolarisPrivilege.TABLE_CREATE,
PolarisPrivilege.TABLE_LIST,
PolarisPrivilege.TABLE_DROP),
() -> newWrapper().loadTableWithAccessDelegationIfStale(TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"));
}

@Test
public void testUpdateTableSufficientPrivileges() {
doTestSufficientPrivileges(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.common.collect.ImmutableSet;
import jakarta.enterprise.context.RequestScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
import java.net.URLEncoder;
Expand Down Expand Up @@ -53,6 +55,7 @@
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.polaris.core.PolarisConfigurationStore;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
Expand All @@ -68,7 +71,9 @@
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService;
import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService;
import org.apache.polaris.service.catalog.response.ETaggedResponse;
import org.apache.polaris.service.context.CallContextCatalogFactory;
import org.apache.polaris.service.http.IfNoneMatch;
import org.apache.polaris.service.types.CommitTableRequest;
import org.apache.polaris.service.types.CommitViewRequest;
import org.apache.polaris.service.types.NotificationRequest;
Expand Down Expand Up @@ -300,15 +305,15 @@ public Response createTable(
.build();
}
} else if (delegationModes.isEmpty()) {
return Response.ok(
newHandlerWrapper(realmContext, securityContext, prefix)
.createTableDirect(ns, createTableRequest))
ETaggedResponse<LoadTableResponse> createResult = newHandlerWrapper(realmContext, securityContext, prefix)
.createTableDirect(ns, createTableRequest);
return Response.ok(createResult.response()).header(HttpHeaders.ETAG, createResult.eTag())
.build();
} else {
return Response.ok(
newHandlerWrapper(realmContext, securityContext, prefix)
.createTableDirectWithWriteDelegation(ns, createTableRequest))
.build();
ETaggedResponse<LoadTableResponse> createResult = newHandlerWrapper(realmContext, securityContext, prefix)
.createTableDirectWithWriteDelegation(ns, createTableRequest);
return Response.ok(createResult.response()).header(HttpHeaders.ETAG, createResult.eTag())
.build();
}
}

Expand All @@ -331,24 +336,32 @@ public Response loadTable(
String namespace,
String table,
String accessDelegationMode,
String ifNoneMatchHeader,
String snapshots,
RealmContext realmContext,
SecurityContext securityContext) {
EnumSet<AccessDelegationMode> delegationModes =
parseAccessDelegationModes(accessDelegationMode);
Namespace ns = decodeNamespace(namespace);
TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table));
if (delegationModes.isEmpty()) {
return Response.ok(
newHandlerWrapper(realmContext, securityContext, prefix)
.loadTable(tableIdentifier, snapshots))
.build();
} else {
return Response.ok(
newHandlerWrapper(realmContext, securityContext, prefix)
.loadTableWithAccessDelegation(tableIdentifier, snapshots))
ETaggedResponse<LoadTableResponse> loadTableResult;

IfNoneMatch ifNoneMatch = IfNoneMatch.fromHeader(ifNoneMatchHeader);

if (ifNoneMatch.isWildcard())
throw new BadRequestException("If-None-Match may not take the value of '*'");
Comment on lines +351 to +352
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use braces here


if (delegationModes.isEmpty()) {
loadTableResult = newHandlerWrapper(realmContext, securityContext, prefix)
.loadTableIfStale(tableIdentifier, ifNoneMatch, snapshots)
.orElseThrow(() -> new WebApplicationException(Response.Status.NOT_MODIFIED));
} else {
loadTableResult = newHandlerWrapper(realmContext, securityContext, prefix)
.loadTableWithAccessDelegationIfStale(tableIdentifier, ifNoneMatch, snapshots)
.orElseThrow(() -> new WebApplicationException(Response.Status.NOT_MODIFIED));
}
return Response.ok(loadTableResult.response()).header(HttpHeaders.ETAG, loadTableResult.eTag())
.build();
}
}

@Override
Expand Down Expand Up @@ -392,9 +405,9 @@ public Response registerTable(
RealmContext realmContext,
SecurityContext securityContext) {
Namespace ns = decodeNamespace(namespace);
return Response.ok(
newHandlerWrapper(realmContext, securityContext, prefix)
.registerTable(ns, registerTableRequest))
ETaggedResponse<LoadTableResponse> registerTableResult = newHandlerWrapper(realmContext, securityContext, prefix)
.registerTable(ns, registerTableRequest);
return Response.ok(registerTableResult.response()).header(HttpHeaders.ETAG, registerTableResult.eTag())
.build();
}

Expand Down
Loading