-
Notifications
You must be signed in to change notification settings - Fork 415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#6566] improvement(core): Add the cache mechanism for metalake and use cache to load in-use
information.
#6569
base: main
Are you sure you want to change the base?
Changes from 18 commits
b6e7c82
0e1c41f
302d241
bade110
1d181a8
9e11e3e
aa8aa80
1b73712
7a36a92
8262b95
d037f02
3c98a5d
70897de
b1d7ab4
a78f9ec
84f66f3
084820d
52e65ca
0a78a3a
15cfc06
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -262,7 +262,7 @@ private ModelCatalog asModels() { | |
|
||
private final Config config; | ||
|
||
@VisibleForTesting final Cache<NameIdentifier, CatalogWrapper> catalogCache; | ||
@VisibleForTesting static Cache<NameIdentifier, CatalogWrapper> catalogCache; | ||
|
||
private final EntityStore store; | ||
|
||
|
@@ -281,7 +281,7 @@ public CatalogManager(Config config, EntityStore store, IdGenerator idGenerator) | |
this.idGenerator = idGenerator; | ||
|
||
long cacheEvictionIntervalInMs = config.get(Configs.CATALOG_CACHE_EVICTION_INTERVAL_MS); | ||
this.catalogCache = | ||
catalogCache = | ||
Caffeine.newBuilder() | ||
.expireAfterAccess(cacheEvictionIntervalInMs, TimeUnit.MILLISECONDS) | ||
.removalListener( | ||
|
@@ -848,8 +848,13 @@ private static boolean catalogInUse(EntityStore store, NameIdentifier ident) | |
|
||
private static boolean getCatalogInUseValue(EntityStore store, NameIdentifier catalogIdent) { | ||
try { | ||
CatalogEntity catalogEntity = | ||
store.get(catalogIdent, EntityType.CATALOG, CatalogEntity.class); | ||
CatalogWrapper wrapper = catalogCache.getIfPresent(catalogIdent); | ||
CatalogEntity catalogEntity; | ||
if (wrapper != null) { | ||
catalogEntity = wrapper.catalog.entity(); | ||
} else { | ||
catalogEntity = store.get(catalogIdent, EntityType.CATALOG, CatalogEntity.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we put this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Loading |
||
} | ||
return (boolean) | ||
BASIC_CATALOG_PROPERTIES_METADATA.getOrDefault( | ||
catalogEntity.getProperties(), PROPERTY_IN_USE); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,13 +20,20 @@ | |
|
||
import static org.apache.gravitino.Metalake.PROPERTY_IN_USE; | ||
|
||
import com.github.benmanes.caffeine.cache.Cache; | ||
import com.github.benmanes.caffeine.cache.Caffeine; | ||
import com.github.benmanes.caffeine.cache.Scheduler; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.collect.Maps; | ||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
import java.io.IOException; | ||
import java.time.Instant; | ||
import java.util.Arrays; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ScheduledThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.gravitino.Entity.EntityType; | ||
import org.apache.gravitino.EntityAlreadyExistsException; | ||
import org.apache.gravitino.EntityStore; | ||
|
@@ -64,6 +71,26 @@ public class MetalakeManager implements MetalakeDispatcher { | |
|
||
private final IdGenerator idGenerator; | ||
|
||
@VisibleForTesting | ||
static final Cache<NameIdentifier, BaseMetalake> METALAKE_CACHE = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If our goal is to accelerate the acquisition of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Caching There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think caching the metalake is better: because the number of metalakes is quite limited, we can improve performance a lot with only a small amount of memory, so it is worth caching the metalake. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you use uppercase for this variable? Typically, we only use uppercase letters for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It indeed has |
||
Caffeine.newBuilder() | ||
.expireAfterAccess(24, TimeUnit.HOURS) | ||
.removalListener((k, v, c) -> LOG.info("Closing metalake {}.", k)) | ||
.scheduler( | ||
Scheduler.forScheduledExecutorService( | ||
new ScheduledThreadPoolExecutor( | ||
1, | ||
new ThreadFactoryBuilder() | ||
.setDaemon(true) | ||
.setNameFormat("metalake-cleaner-%d") | ||
.build()))) | ||
.build(); | ||
|
||
@VisibleForTesting | ||
public static void clearCache() { | ||
METALAKE_CACHE.invalidateAll(); | ||
} | ||
|
||
/** | ||
* Constructs a MetalakeManager instance. | ||
* | ||
|
@@ -73,6 +100,13 @@ public class MetalakeManager implements MetalakeDispatcher { | |
public MetalakeManager(EntityStore store, IdGenerator idGenerator) { | ||
this.store = store; | ||
this.idGenerator = idGenerator; | ||
|
||
// pre-load all metalakes and put them into cache, this is useful when user load schema/table | ||
// directly without list/get metalake first. | ||
BaseMetalake[] metalakes = listMetalakes(); | ||
for (BaseMetalake metalake : metalakes) { | ||
METALAKE_CACHE.put(metalake.nameIdentifier(), metalake); | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -103,10 +137,12 @@ public static void checkMetalake(NameIdentifier ident, EntityStore store) | |
public static boolean metalakeInUse(EntityStore store, NameIdentifier ident) | ||
throws NoSuchMetalakeException { | ||
try { | ||
BaseMetalake metalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class); | ||
BaseMetalake metalake = METALAKE_CACHE.getIfPresent(ident); | ||
if (metalake == null) { | ||
metalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class); | ||
} | ||
mchades marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return (boolean) | ||
metalake.propertiesMetadata().getOrDefault(metalake.properties(), PROPERTY_IN_USE); | ||
|
||
} catch (NoSuchEntityException e) { | ||
LOG.warn("Metalake {} does not exist", ident, e); | ||
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); | ||
|
@@ -149,20 +185,23 @@ public BaseMetalake[] listMetalakes() { | |
*/ | ||
@Override | ||
public BaseMetalake loadMetalake(NameIdentifier ident) throws NoSuchMetalakeException { | ||
try { | ||
BaseMetalake baseMetalake = | ||
TreeLockUtils.doWithTreeLock( | ||
ident, | ||
LockType.READ, | ||
() -> store.get(ident, EntityType.METALAKE, BaseMetalake.class)); | ||
return newMetalakeWithResolvedProperties(baseMetalake); | ||
} catch (NoSuchEntityException e) { | ||
LOG.warn("Metalake {} does not exist", ident, e); | ||
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); | ||
} catch (IOException ioe) { | ||
LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe); | ||
throw new RuntimeException(ioe); | ||
} | ||
return TreeLockUtils.doWithTreeLock( | ||
ident, | ||
LockType.READ, | ||
() -> { | ||
try { | ||
BaseMetalake baseMetalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we still need to get the metalake from store, rather than from cache? With your code, what's the meaning of |
||
baseMetalake = newMetalakeWithResolvedProperties(baseMetalake); | ||
METALAKE_CACHE.put(ident, baseMetalake); | ||
return baseMetalake; | ||
} catch (NoSuchEntityException e) { | ||
LOG.warn("Metalake {} does not exist", ident, e); | ||
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); | ||
} catch (IOException ioe) { | ||
LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe); | ||
throw new RuntimeException(ioe); | ||
} | ||
}); | ||
} | ||
|
||
private BaseMetalake newMetalakeWithResolvedProperties(BaseMetalake metalakeEntity) { | ||
|
@@ -222,6 +261,7 @@ public BaseMetalake createMetalake( | |
() -> { | ||
try { | ||
store.put(metalake, false /* overwritten */); | ||
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(metalake)); | ||
return metalake; | ||
} catch (EntityAlreadyExistsException | AlreadyExistsException e) { | ||
LOG.warn("Metalake {} already exists", ident, e); | ||
|
@@ -253,22 +293,24 @@ public BaseMetalake alterMetalake(NameIdentifier ident, MetalakeChange... change | |
throw new MetalakeNotInUseException( | ||
"Metalake %s is not in use, please enable it first", ident); | ||
} | ||
|
||
return store.update( | ||
ident, | ||
BaseMetalake.class, | ||
EntityType.METALAKE, | ||
metalake -> { | ||
BaseMetalake.Builder builder = newMetalakeBuilder(metalake); | ||
Map<String, String> newProps = | ||
metalake.properties() == null | ||
? Maps.newHashMap() | ||
: Maps.newHashMap(metalake.properties()); | ||
builder = updateEntity(builder, newProps, changes); | ||
|
||
return builder.build(); | ||
}); | ||
|
||
METALAKE_CACHE.invalidate(ident); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can the result be cached after updating? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Putting it back into the cache seems optional if this operation is not frequently called. Anyway, this is an improvement; let me check if we can add it back. |
||
BaseMetalake baseMetalake = | ||
store.update( | ||
ident, | ||
BaseMetalake.class, | ||
EntityType.METALAKE, | ||
metalake -> { | ||
BaseMetalake.Builder builder = newMetalakeBuilder(metalake); | ||
Map<String, String> newProps = | ||
metalake.properties() == null | ||
? Maps.newHashMap() | ||
: Maps.newHashMap(metalake.properties()); | ||
builder = updateEntity(builder, newProps, changes); | ||
|
||
return builder.build(); | ||
}); | ||
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(baseMetalake)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here should use ident of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, good catch |
||
return baseMetalake; | ||
} catch (NoSuchEntityException ne) { | ||
LOG.warn("Metalake {} does not exist", ident, ne); | ||
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident); | ||
|
@@ -305,6 +347,8 @@ public boolean dropMetalake(NameIdentifier ident, boolean force) | |
"Metalake %s is in use, please disable it first or use force option", ident); | ||
} | ||
|
||
METALAKE_CACHE.invalidate(ident); | ||
|
||
List<CatalogEntity> catalogEntities = | ||
store.list(Namespace.of(ident.name()), CatalogEntity.class, EntityType.CATALOG); | ||
if (!catalogEntities.isEmpty() && !force) { | ||
|
@@ -331,22 +375,25 @@ public void enableMetalake(NameIdentifier ident) throws NoSuchMetalakeException | |
try { | ||
boolean inUse = metalakeInUse(store, ident); | ||
if (!inUse) { | ||
store.update( | ||
ident, | ||
BaseMetalake.class, | ||
EntityType.METALAKE, | ||
metalake -> { | ||
BaseMetalake.Builder builder = newMetalakeBuilder(metalake); | ||
|
||
Map<String, String> newProps = | ||
metalake.properties() == null | ||
? Maps.newHashMap() | ||
: Maps.newHashMap(metalake.properties()); | ||
newProps.put(PROPERTY_IN_USE, "true"); | ||
builder.withProperties(newProps); | ||
|
||
return builder.build(); | ||
}); | ||
METALAKE_CACHE.invalidate(ident); | ||
BaseMetalake baseMetalake = | ||
store.update( | ||
ident, | ||
BaseMetalake.class, | ||
EntityType.METALAKE, | ||
metalake -> { | ||
BaseMetalake.Builder builder = newMetalakeBuilder(metalake); | ||
|
||
Map<String, String> newProps = | ||
metalake.properties() == null | ||
? Maps.newHashMap() | ||
: Maps.newHashMap(metalake.properties()); | ||
newProps.put(PROPERTY_IN_USE, "true"); | ||
builder.withProperties(newProps); | ||
|
||
return builder.build(); | ||
}); | ||
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(baseMetalake)); | ||
} | ||
|
||
return null; | ||
|
@@ -365,22 +412,25 @@ public void disableMetalake(NameIdentifier ident) throws NoSuchMetalakeException | |
try { | ||
boolean inUse = metalakeInUse(store, ident); | ||
if (inUse) { | ||
store.update( | ||
ident, | ||
BaseMetalake.class, | ||
EntityType.METALAKE, | ||
metalake -> { | ||
BaseMetalake.Builder builder = newMetalakeBuilder(metalake); | ||
|
||
Map<String, String> newProps = | ||
metalake.properties() == null | ||
? Maps.newHashMap() | ||
: Maps.newHashMap(metalake.properties()); | ||
newProps.put(PROPERTY_IN_USE, "false"); | ||
builder.withProperties(newProps); | ||
|
||
return builder.build(); | ||
}); | ||
METALAKE_CACHE.invalidate(ident); | ||
yuqi1129 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
BaseMetalake baseMetalake = | ||
store.update( | ||
ident, | ||
BaseMetalake.class, | ||
EntityType.METALAKE, | ||
metalake -> { | ||
BaseMetalake.Builder builder = newMetalakeBuilder(metalake); | ||
|
||
Map<String, String> newProps = | ||
metalake.properties() == null | ||
? Maps.newHashMap() | ||
: Maps.newHashMap(metalake.properties()); | ||
newProps.put(PROPERTY_IN_USE, "false"); | ||
builder.withProperties(newProps); | ||
|
||
return builder.build(); | ||
}); | ||
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(baseMetalake)); | ||
} | ||
return null; | ||
} catch (IOException e) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we make it static? I assume there will be only one `CatalogManager`, so there should be only one `catalogCache`, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check methods `catalogInUse` and `metalakeInUse` are both static. If we want to use the cache in them, we need to change it to `static`.
Yes, there will be only one cache and all catalogs share the same instance. It's not a big problem, I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be named `CATALOG_CACHE`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it be `final`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, let me change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we use the `static final` modifiers, then Gravitino server configurations like `gravitino.catalog.cache.evictionIntervalMs` should be removed, as we can't use them in a static code block. Is that acceptable to you?