Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core persistence refactor phase 2 - Distinguish "InCurrentTxn" variations of all BasePersistence methods in TransactionalPersistence #1135

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,25 @@
*/
package org.apache.polaris.core.persistence;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Nonnull;
import java.util.Map;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntityConstants;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.dao.entity.GenerateEntityIdResult;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;

/** Shared basic PolarisMetaStoreManager logic for transactional and non-transactional impls. */
public abstract class BaseMetaStoreManager implements PolarisMetaStoreManager {
/** mapper, allows to serialize/deserialize properties to/from JSON */
private static final ObjectMapper MAPPER = new ObjectMapper();

public static PolarisStorageConfigurationInfo extractStorageConfiguration(
@Nonnull PolarisCallContext callCtx, PolarisBaseEntity reloadedEntity) {
Map<String, String> propMap =
Expand All @@ -45,4 +56,221 @@ public static PolarisStorageConfigurationInfo extractStorageConfiguration(
return PolarisStorageConfigurationInfo.deserialize(
callCtx.getDiagServices(), storageConfigInfoStr);
}

/**
* Given the internal property as a map of key/value pairs, serialize it to a String
*
* @param properties a map of key/value pairs
* @return a String, the JSON representation of the map
*/
public String serializeProperties(PolarisCallContext callCtx, Map<String, String> properties) {

String jsonString = null;
try {
// Deserialize the JSON string to a Map<String, String>
jsonString = MAPPER.writeValueAsString(properties);
} catch (JsonProcessingException ex) {
callCtx.getDiagServices().fail("got_json_processing_exception", "ex={}", ex);
}

return jsonString;
}

/**
* Given the serialized properties, deserialize those to a {@code Map<String, String>}
*
* @param properties a JSON string representing the set of properties
* @return a Map of string
*/
public Map<String, String> deserializeProperties(PolarisCallContext callCtx, String properties) {

Map<String, String> retProperties = null;
try {
// Deserialize the JSON string to a Map<String, String>
retProperties = MAPPER.readValue(properties, new TypeReference<>() {});
} catch (JsonMappingException ex) {
callCtx.getDiagServices().fail("got_json_mapping_exception", "ex={}", ex);
} catch (JsonProcessingException ex) {
callCtx.getDiagServices().fail("got_json_processing_exception", "ex={}", ex);
}

return retProperties;
}

/**
* Performs basic validation of expected invariants on a new entity, then returns the entity with
* fields filled out for which the persistence layer is responsible.
*
* @param callCtx call context
* @param ms meta store in read/write mode
* @param entity entity we need a new persisted record for
*/
protected PolarisBaseEntity prepareToPersistNewEntity(
@Nonnull PolarisCallContext callCtx,
@Nonnull BasePersistence ms,
@Nonnull PolarisBaseEntity entity) {

// validate the entity type and subtype
callCtx.getDiagServices().checkNotNull(entity, "unexpected_null_entity");
callCtx
.getDiagServices()
.checkNotNull(entity.getName(), "unexpected_null_name", "entity={}", entity);
PolarisEntityType type = PolarisEntityType.fromCode(entity.getTypeCode());
callCtx.getDiagServices().checkNotNull(type, "unknown_type", "entity={}", entity);
PolarisEntitySubType subType = PolarisEntitySubType.fromCode(entity.getSubTypeCode());
callCtx.getDiagServices().checkNotNull(subType, "unexpected_null_subType", "entity={}", entity);
callCtx
.getDiagServices()
.check(
subType.getParentType() == null || subType.getParentType() == type,
"invalid_subtype",
"type={} subType={}",
type,
subType);

// if top-level entity, its parent should be the account
callCtx
.getDiagServices()
.check(
!type.isTopLevel() || entity.getParentId() == PolarisEntityConstants.getRootEntityId(),
"top_level_parent_should_be_account",
"entity={}",
entity);

// id should not be null
callCtx
.getDiagServices()
.check(
entity.getId() != 0 || type == PolarisEntityType.ROOT,
"id_not_set",
"entity={}",
entity);

// creation timestamp must be filled
callCtx.getDiagServices().check(entity.getCreateTimestamp() != 0, "null_create_timestamp");

// this is the first change
entity.setLastUpdateTimestamp(entity.getCreateTimestamp());

// set all other timestamps to 0
entity.setDropTimestamp(0);
entity.setPurgeTimestamp(0);
entity.setToPurgeTimestamp(0);
return entity;
}

/**
* Persist the specified new entity.
*
* @param callCtx call context
* @param ms meta store in read/write mode
* @param entity entity we need a new persisted record for
*/
protected void persistNewEntity(
@Nonnull PolarisCallContext callCtx,
@Nonnull BasePersistence ms,
@Nonnull PolarisBaseEntity entity) {
// Invoke shared logic for validation and filling out remaining fields.
entity = prepareToPersistNewEntity(callCtx, ms, entity);

// write it
ms.writeEntityAtomically(callCtx, entity, true, null);
}

/**
* Performs basic validation of expected invariants on a changed entity, then returns the entity
* with fields filled out for which the persistence layer is responsible.
*
* @param callCtx call context
* @param ms meta store
* @param entity the entity which has been changed
* @param nameOrParentChanged indicates if parent or name changed
* @param originalEntity the original state of the entity before changes
* @return the entity with its version and lastUpdateTimestamp updated
*/
protected @Nonnull PolarisBaseEntity prepareToPersistEntityAfterChange(
@Nonnull PolarisCallContext callCtx,
@Nonnull BasePersistence ms,
@Nonnull PolarisBaseEntity entity,
boolean nameOrParentChanged,
@Nonnull PolarisBaseEntity originalEntity) {

// validate the entity type and subtype
callCtx.getDiagServices().checkNotNull(entity, "unexpected_null_entity");
callCtx
.getDiagServices()
.checkNotNull(entity.getName(), "unexpected_null_name", "entity={}", entity);
PolarisEntityType type = entity.getType();
callCtx.getDiagServices().checkNotNull(type, "unexpected_null_type", "entity={}", entity);
PolarisEntitySubType subType = entity.getSubType();
callCtx.getDiagServices().checkNotNull(subType, "unexpected_null_subType", "entity={}", entity);
callCtx
.getDiagServices()
.check(
subType.getParentType() == null || subType.getParentType() == type,
"invalid_subtype",
"type={} subType={} entity={}",
type,
subType,
entity);

// entity should not have been dropped
callCtx
.getDiagServices()
.check(entity.getDropTimestamp() == 0, "entity_dropped", "entity={}", entity);

// creation timestamp must be filled
long createTimestamp = entity.getCreateTimestamp();
callCtx
.getDiagServices()
.check(createTimestamp != 0, "null_create_timestamp", "entity={}", entity);

// ensure time is not moving backward...
long now = System.currentTimeMillis();
if (now < entity.getCreateTimestamp()) {
now = entity.getCreateTimestamp() + 1;
}

// update last update timestamp and increment entity version
entity.setLastUpdateTimestamp(now);
entity.setEntityVersion(entity.getEntityVersion() + 1);
return entity;
}

/**
* Persist the specified entity after it has been changed. We will update the last changed time,
* increment the entity version and persist it in one atomic operation.
*
* @param callCtx call context
* @param ms meta store
* @param entity the entity which has been changed
* @param nameOrParentChanged indicates if parent or name changed
* @param originalEntity the original state of the entity before changes
* @return the entity with its version and lastUpdateTimestamp updated
*/
protected @Nonnull PolarisBaseEntity persistEntityAfterChange(
@Nonnull PolarisCallContext callCtx,
@Nonnull BasePersistence ms,
@Nonnull PolarisBaseEntity entity,
boolean nameOrParentChanged,
@Nonnull PolarisBaseEntity originalEntity) {
// Invoke shared logic for validation and updating expected fields.
entity =
prepareToPersistEntityAfterChange(callCtx, ms, entity, nameOrParentChanged, originalEntity);

// persist it to the various slices
ms.writeEntityAtomically(callCtx, entity, nameOrParentChanged, originalEntity);

// return it
return entity;
}

/** {@inheritDoc} */
@Override
public @Nonnull GenerateEntityIdResult generateNewEntityId(@Nonnull PolarisCallContext callCtx) {
// get meta store we should be using
BasePersistence ms = callCtx.getMetaStore();

return new GenerateEntityIdResult(ms.generateNewIdAtomically(callCtx));
}
}
Loading