From 83ae973907bf499234f8dc9904b54533e83b7c5b Mon Sep 17 00:00:00 2001 From: Kunal Khatua Date: Mon, 15 Apr 2019 10:38:03 -0700 Subject: [PATCH] DRILL-2362: Profile Mgmt 1. Write to Index 2. Read chronologically from Partitioned Dirs 3. Leverage Guava Cache 4. Infer which partitioned dir has a profile based on queryId alone 5. Trace Exception [qId: 259432dc-7f8e-8fc5-af69-16a1ca817689 ] TODO: 1. Auto Index for 1st time (In batches of 10000) from root dir (sync if Distributed) 2. Figure out if need to maintain profileSet cache and how! a. Flag to indicate change i.e. writeIncrementor redefine ranges Short circuit path exploration Init for Indexer Clean up Stats Range added s3 support --- .../org/apache/drill/exec/ExecConstants.java | 5 + .../apache/drill/exec/server/Drillbit.java | 13 + .../exec/server/profile/ProfileIndexer.java | 246 ++++++++++++ .../server/rest/profile/ProfileResources.java | 5 +- .../sys/store/DrillSysFilePathFilter.java | 56 +++ .../store/sys/store/LocalPersistentStore.java | 359 +++++++++++++++--- .../exec/store/sys/store/ProfileSet.java | 152 ++++++++ .../drill/exec/util/DrillFileSystemUtil.java | 12 + .../drill/exec/util/FileSystemUtil.java | 12 + .../src/main/resources/drill-module.conf | 6 + 10 files changed, 820 insertions(+), 46 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DrillSysFilePathFilter.java create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ProfileSet.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 24372ef5e40..d5deaafa9c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -236,6 +236,11 @@ private ExecConstants() { public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write"; public static final String PROFILES_STORE_INMEMORY = "drill.exec.profiles.store.inmemory"; public static final String PROFILES_STORE_CAPACITY = "drill.exec.profiles.store.capacity"; + public static final String PROFILES_STORE_CACHE_SIZE = "drill.exec.profiles.store.cache.size"; + public static final String PROFILES_STORE_INDEX_ENABLED = "drill.exec.profiles.store.index.enabled"; + public static final String PROFILES_STORE_INDEX_FORMAT = "drill.exec.profiles.store.index.format"; + public static final String PROFILES_STORE_INDEX_MAX = "drill.exec.profiles.store.index.max"; + public static final String PROFILES_STORE_INDEX_SUPPORTED_FS = "drill.exec.profiles.store.index.supported.fs"; public static final String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled"; public static final String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops"; public static final String AUTHENTICATION_MECHANISMS = "drill.exec.security.auth.mechanisms"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 215fbb0d830..f8cfc72e222 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -33,16 +33,20 @@ import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; +import org.apache.drill.exec.proto.UserBitShared.QueryProfile; import org.apache.drill.exec.server.DrillbitStateManager.DrillbitState; import org.apache.drill.exec.server.options.OptionDefinition; import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.server.options.OptionValue.OptionScope; +import org.apache.drill.exec.server.profile.ProfileIndexer; import org.apache.drill.exec.server.options.SystemOptionManager; import org.apache.drill.exec.server.rest.WebServer; import org.apache.drill.exec.service.ServiceEngine; import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.sys.PersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.store.sys.PersistentStoreRegistry; +import org.apache.drill.exec.store.sys.store.LocalPersistentStore; import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider; import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider; import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; @@ -235,6 +239,15 @@ public void run() throws Exception { shutdownHook = new ShutdownThread(this, new StackTrace()); Runtime.getRuntime().addShutdownHook(shutdownHook); gracefulShutdownThread.start(); + + // Launch an archiving job that is # files and time bound + PersistentStore queryProfileStore = drillbitContext.getProfileStoreContext().getCompletedProfileStore(); + if (queryProfileStore instanceof LocalPersistentStore + && context.getConfig().getBoolean(ExecConstants.PROFILES_STORE_INDEX_ENABLED)) { + ProfileIndexer profileIndexer = new ProfileIndexer(coord, drillbitContext); + profileIndexer.indexProfiles(); + } + logger.info("Startup completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java new file mode 100644 index 00000000000..679861d0c1f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.server.profile; + +import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; + +import java.io.IOException; +import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; +import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; +import org.apache.drill.exec.coord.zk.ZkDistributedSemaphore; +import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.proto.UserBitShared.QueryProfile; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.QueryProfileStoreContext; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.store.DrillSysFilePathFilter; +import org.apache.drill.exec.store.sys.store.LocalPersistentStore; +import org.apache.drill.exec.store.sys.store.ProfileSet; +import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider; +import org.apache.drill.exec.util.DrillFileSystemUtil; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manage profiles by archiving + */ +public class ProfileIndexer { + private static final Logger logger = LoggerFactory.getLogger(ProfileIndexer.class); + private static final String lockPathString = "/profileIndexer"; + private static final int DRILL_SYS_FILE_EXT_SIZE = DRILL_SYS_FILE_SUFFIX.length(); + + private final ZKClusterCoordinator zkCoord; + private final DrillFileSystem fs; + private final Path basePath; + private final ProfileSet profiles; + private final int indexingRate; + private final PathFilter sysFileSuffixFilter; + private SimpleDateFormat indexedPathFormat; + private final boolean useZkCoordinatedManagement; + private DrillConfig drillConfig; + + private PersistentStoreConfig pStoreConfig; + private LocalPersistentStore completedProfileStore; + private Stopwatch indexWatch; + private int indexedCount; + private int currentProfileCount; + + + /** + * ProfileIndexer + */ + public ProfileIndexer(ClusterCoordinator coord, DrillbitContext context) throws StoreException, IOException { + drillConfig = context.getConfig(); + + // FileSystem + try { + this.fs = inferFileSystem(drillConfig); + } catch (IOException ex) { + throw new StoreException("Unable to get filesystem", ex); + } + + //Use Zookeeper for coordinated management + final List supportedFS = drillConfig.getStringList(ExecConstants.PROFILES_STORE_INDEX_SUPPORTED_FS); + if (this.useZkCoordinatedManagement = supportedFS.contains(fs.getScheme())) { + this.zkCoord = (ZKClusterCoordinator) coord; + } else { + this.zkCoord = null; + } + + // Query Profile Store + QueryProfileStoreContext pStoreContext = context.getProfileStoreContext(); + this.completedProfileStore = (LocalPersistentStore) pStoreContext.getCompletedProfileStore(); + this.pStoreConfig = pStoreContext.getProfileStoreConfig(); + this.basePath = completedProfileStore.getBasePath(); + + this.indexingRate = drillConfig.getInt(ExecConstants.PROFILES_STORE_INDEX_MAX); + this.profiles = new ProfileSet(indexingRate); + this.indexWatch = Stopwatch.createUnstarted(); + this.sysFileSuffixFilter = new DrillSysFilePathFilter(); + String indexPathPattern = drillConfig.getString(ExecConstants.PROFILES_STORE_INDEX_FORMAT); + this.indexedPathFormat = new SimpleDateFormat(indexPathPattern); + logger.info("Organizing any existing unindexed profiles"); + } + + + /** + * Index profiles + */ + public void indexProfiles() { + this.indexWatch.start(); + + // Acquire lock IFF required + if (useZkCoordinatedManagement) { + DistributedSemaphore indexerMutex = new ZkDistributedSemaphore(zkCoord.getCurator(), lockPathString, 1); + try (DistributedLease lease = indexerMutex.acquire(0, TimeUnit.SECONDS)) { + if (lease != null) { + listAndIndex(); + } else { + logger.debug("Couldn't get a lease acquisition"); + } + } catch (Exception e) { + //DoNothing since lease acquisition failed + logger.error("Exception during lease-acquisition:: {}", e); + } + } else { + try { + listAndIndex(); + } catch (IOException e) { + logger.error("Failed to index: {}", e); + } + } + logger.info("Successfully indexed {} of {} profiles during startup in {} seconds", indexedCount, currentProfileCount, this.indexWatch.stop().elapsed(TimeUnit.SECONDS)); + } + + + //Lists and Indexes the latest profiles + private void listAndIndex() throws IOException { + currentProfileCount = listForArchiving(); + indexedCount = 0; + logger.info("Found {} profiles that need to be indexed. Will attempt to index {} profiles", currentProfileCount, + (currentProfileCount > this.indexingRate) ? this.indexingRate : currentProfileCount); + + // Track MRU index paths + Map mruIndexPath = new HashMap<>(); + if (currentProfileCount > 0) { + while (!this.profiles.isEmpty()) { + String profileToIndex = profiles.removeYoungest() + DRILL_SYS_FILE_SUFFIX; + Path srcPath = new Path(basePath, profileToIndex); + long profileStartTime = getProfileStart(srcPath); + if (profileStartTime < 0) { + logger.debug("Will skip indexing {}", srcPath); + continue; + } + String indexPath = indexedPathFormat.format(new Date(profileStartTime)); + //Check if dest dir exists + Path indexDestPath = null; + if (!mruIndexPath.containsKey(indexPath)) { + indexDestPath = new Path(basePath, indexPath); + if (!fs.isDirectory(indexDestPath)) { + // Build dir + if (fs.mkdirs(indexDestPath)) { + mruIndexPath.put(indexPath, indexDestPath); + } else { + //Creation failed. Did someone else create? + if (fs.isDirectory(indexDestPath)) { + mruIndexPath.put(indexPath, indexDestPath); + } + } + } else { + mruIndexPath.put(indexPath, indexDestPath); + } + } else { + indexDestPath = mruIndexPath.get(indexPath); + } + + //Attempt Move + boolean renameStatus = false; + if (indexDestPath != null) { + Path destPath = new Path(indexDestPath, profileToIndex); + renameStatus = DrillFileSystemUtil.rename(fs, srcPath, destPath); + if (renameStatus) { + indexedCount++; + } + } + if (indexDestPath == null || !renameStatus) { + // Stop attempting any more archiving since other StoreProviders might be archiving + logger.error("Move failed for {} [{} | {}]", srcPath, indexDestPath == null, renameStatus); + continue; + } + } + } + } + + // Deserialized and extract the profile's start time + private long getProfileStart(Path srcPath) { + try (InputStream is = fs.open(srcPath)) { + QueryProfile profile = pStoreConfig.getSerializer().deserialize(IOUtils.toByteArray(is)); + return profile.getStart(); + } catch (IOException e) { + logger.info("Unable to deserialize {}\n---{}====", srcPath, e.getMessage()); //Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: [B@f76ca5b; line: 1, column: 65538] + logger.info("deserialization RCA==> \n {}", ExceptionUtils.getRootCause(e)); + } + return Long.MIN_VALUE; + } + + // List all profiles in store's root and identify potential candidates for archiving + private int listForArchiving() throws IOException { + // Not performing recursive search of profiles + List fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter ); + + int numProfilesInStore = 0; + for (FileStatus stat : fileStatuses) { + String profileName = stat.getPath().getName(); + //Strip extension and store only query ID + profiles.add(profileName.substring(0, profileName.length() - DRILL_SYS_FILE_EXT_SIZE), false); + numProfilesInStore++; + } + + return numProfilesInStore; + } + + // Infers File System of Local Store + private DrillFileSystem inferFileSystem(DrillConfig drillConfig) throws IOException { + boolean hasZkBlobRoot = drillConfig.hasPath(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT); + final Path blobRoot = hasZkBlobRoot ? + new org.apache.hadoop.fs.Path(drillConfig.getString(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) : + LocalPersistentStore.getLogDir(); + + return LocalPersistentStore.getFileSystem(drillConfig, blobRoot); + } + +} \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java index 1819962425b..a3998ff631a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java @@ -37,6 +37,7 @@ import javax.ws.rs.core.UriInfo; import javax.xml.bind.annotation.XmlRootElement; + import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; @@ -90,7 +91,7 @@ public static class ProfileInfo implements Comparable { public ProfileInfo(DrillConfig drillConfig, String queryId, long startTime, long endTime, String foreman, String query, String state, String user, double totalCost, String queueName) { - this.queryId = queryId; + this.queryId = queryId.substring(queryId.lastIndexOf('/') + 1); this.startTime = startTime; this.endTime = endTime; this.time = new Date(startTime); @@ -378,7 +379,7 @@ public Viewable getProfile(@PathParam("queryid") String queryId){ ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId), work.getContext().getConfig()); return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/profile.ftl", sc, wrapper); } catch (Exception | Error e) { - logger.error("Exception was thrown when fetching profile {} :\n{}", queryId, e); + logger.error("Exception was thrown when fetching profile {} :\n{}\n====\n", queryId, e); return ViewableWithPermissions.create(authEnabled.get(), "/rest/errorMessage.ftl", sc, e); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DrillSysFilePathFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DrillSysFilePathFilter.java new file mode 100644 index 00000000000..b6958f7dae2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DrillSysFilePathFilter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys.store; + +import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +/** + * Filter for Drill System Files + */ +public class DrillSysFilePathFilter implements PathFilter { + + //NOTE: The filename is a combination of query ID (which is monotonically + //decreasing value derived off epoch timestamp) and a random value. This + //filter helps eliminate that list + String cutoffFileName = null; + public DrillSysFilePathFilter() {} + + public DrillSysFilePathFilter(String cutoffSysFileName) { + if (cutoffSysFileName != null) { + this.cutoffFileName = cutoffSysFileName + DRILL_SYS_FILE_SUFFIX; + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.fs.PathFilter#accept(org.apache.hadoop.fs.Path) + */ + @Override + public boolean accept(Path file){ + if (file.getName().endsWith(DRILL_SYS_FILE_SUFFIX)) { + if (cutoffFileName != null) { + return (file.getName().compareTo(cutoffFileName) <= 0); + } else { + return true; + } + } + return false; + } +} \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java index 5d9e7dcd500..76f82f22587 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java @@ -23,8 +23,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; +import java.util.Date; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -32,8 +37,13 @@ import javax.annotation.Nullable; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.time.DateUtils; import org.apache.drill.common.collections.ImmutableEntry; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryProfile; +import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.exec.store.sys.BasePersistentStore; @@ -46,8 +56,10 @@ import org.apache.drill.shaded.guava.com.google.common.base.Function; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder; +import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader; +import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache; import org.apache.drill.shaded.guava.com.google.common.collect.Iterables; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,11 +67,33 @@ public class LocalPersistentStore extends BasePersistentStore { private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class); + //Provides a threshold above which we report an event's time + private static final long RESPONSE_TIME_THRESHOLD_MSEC = /*200*/0L; + + private static final int DRILL_SYS_FILE_EXT_SIZE = DRILL_SYS_FILE_SUFFIX.length(); + private final Path basePath; private final PersistentStoreConfig config; private final DrillFileSystem fs; - public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig config) { + private SimpleDateFormat indexedPathFormat; + + private PathFilter sysFileSuffixFilter; + + private Comparator profilePathComparator; + + private Function> stringTransformer; + + private Function> fileStatusTransformer; + + private CacheLoader cacheLoader; + private LoadingCache deserializedVCache; + + private int deserializedCacheCapacity; + + private String indexPathPattern; + + public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig config/*, DrillConfig drillConfig*/) { this.basePath = new Path(base, config.getName()); this.config = config; this.fs = fs; @@ -68,9 +102,68 @@ public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig } catch (IOException e) { throw new RuntimeException("Failure setting pstore configuration path."); } + + //TODO: int cacheCapacity = drillConfig.getInt(ExecConstants.HTTP_MAX_PROFILES); + deserializedCacheCapacity = 100; //drillConfig.getInt(ExecConstants.PROFILES_STORE_CACHE_SIZE); + + indexPathPattern = "yyyy/MM/dd"; //TODO ExecConstants + indexedPathFormat = new SimpleDateFormat(indexPathPattern); + + this.sysFileSuffixFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX); + } + }; + + this.profilePathComparator = new Comparator() { + @Override + public int compare(String path1, String path2) { + return path1.substring(path1.lastIndexOf('/')+1).compareTo(path2.substring(path2.lastIndexOf('/')+1)); + } + }; + + // Transformer function to extract profile based on query ID String + this.stringTransformer = new Function>() { + @Nullable + @Override + public Entry apply(String key) { + return new ImmutableEntry<>(key, getViaAbsolutePath(key)); + } + }; + + /*// Transformer function to extract profile based on FileStatus + this.fileStatusTransformer = new Function>() { + @Nullable + @Override + public Entry apply(FileStatus fStatus) { + Path fPath = fStatus.getPath(); + String sanSuffixName = fPath.getName().substring(0, fPath.getName().length() - DRILL_SYS_FILE_EXT_SIZE); + return new ImmutableEntry<>(sanSuffixName, get(fStatus)); + } + };*/ + + //Defining Cache loader for handling missing entries + this.cacheLoader = new CacheLoader() { + @Override + public V load(String srcPathAsStr) { + //Cache miss to force loading from FS + //logger.info("cacheMiss::fetchFromFS:: {}", srcPathAsStr); + return deserializeFromFileSystem(srcPathAsStr); + } + }; + + //Creating the cache + this.deserializedVCache = CacheBuilder.newBuilder() + .initialCapacity(Math.max(deserializedCacheCapacity/5, 20)) //startingCapacity: 20% or 20 + .maximumSize(deserializedCacheCapacity) + .recordStats() + .build(cacheLoader); + + } - protected Path getBasePath() { + public Path getBasePath() { return basePath; } @@ -108,47 +201,88 @@ public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throw } @Override - public Iterator> getRange(int skip, int take) { - try { - // list only files with sys file suffix - PathFilter sysFileSuffixFilter = new PathFilter() { - @Override - public boolean accept(Path path) { - return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX); - } - }; + public Iterator> getRange/*Neo*/(int skip, int take) { + try { + List files = new LinkedList<>(); + // Sort and explore Directory stack using DepthFirstSearch + LinkedList profileDirStack = new LinkedList(DrillFileSystemUtil.listDirectoriesSafe(fs, basePath, false)); + Collections.sort(profileDirStack); + logger.info("dirSize:: {} ", profileDirStack.size()); - List fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter); - if (fileStatuses.isEmpty()) { - return Collections.emptyIterator(); - } + int collectedProfileCount = 0; + while (!profileDirStack.isEmpty()) { + // Explore dir from top of stack + FileStatus latestDir = profileDirStack.removeLast(); - List files = Lists.newArrayList(); - for (FileStatus stat : fileStatuses) { - String s = stat.getPath().getName(); - files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); - } + // Read all profiles in last dir + List profileStatus = DrillFileSystemUtil.listFiles(fs, latestDir.getPath(), false, sysFileSuffixFilter); + if (!profileStatus.isEmpty()) { + List additions = new LinkedList<>(); + for (FileStatus stat : profileStatus) { + String filePathStr = stat.getPath().toUri().getPath(); + additions.add(filePathStr.substring(0, filePathStr.length() - DRILL_SYS_FILE_EXT_SIZE)); + } + //Sort additions & append (saves time in resorting entire list) + Collections.sort(additions, profilePathComparator); + files.addAll(additions); + + //[sodBug] + if (!files.isEmpty()) { + logger.info("First is {}", files.get(0)); + } - Collections.sort(files); + int _pCount = profileStatus.size(); + if (_pCount > 0) { + collectedProfileCount += _pCount; + logger.info("# profiles added = {} [Total: {} (act) / {} (est)] ", _pCount, files.size(), collectedProfileCount); + } + //[eodBug] + } - return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function>() { - @Nullable - @Override - public Entry apply(String key) { - return new ImmutableEntry<>(key, get(key)); + // Explore subdirectories + List childSubdirStack = DrillFileSystemUtil.listDirectoriesSafe(fs, latestDir.getPath(), false); + // Sorting list before addition to stack + Collections.sort(childSubdirStack); + if (!childSubdirStack.isEmpty()) { + profileDirStack.addAll(childSubdirStack); + } else { + logger.info("foundLeaf:: {}", latestDir.getPath().toUri()); + } + + // Terminate exploration if required count has been met + if ( collectedProfileCount >= (skip + take) ) { + profileDirStack.clear(); + } } - }).iterator(); - } catch (IOException e) { - throw new RuntimeException(e); + + //Sorting not required since preSorted //dBug + logger.info("Post Scan First is {}", files.get(0)); + + Iterator> rangeIterator = Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), this.stringTransformer).iterator(); + logger.info("CacheSTATS::{}:: {}", (take+skip), this.deserializedVCache.stats().toString()); + return rangeIterator; + } catch (IOException e) { + throw new RuntimeException(e); + } } - } private Path makePath(String name) { + Preconditions.checkArgument( + //!name.contains("/") && + !name.contains(":") && + !name.contains("..")); +// return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); + return new Path(name + DRILL_SYS_FILE_SUFFIX); + } + + // Using timestamp to infer correct pigeon-hole for writing destination + private Path makeIndexedPath(String name, long timestamp) { Preconditions.checkArgument( !name.contains("/") && - !name.contains(":") && - !name.contains("..")); - return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); + !name.contains(":") && + !name.contains("..")); + Path indexedPath = new Path(basePath, indexedPathFormat.format(timestamp)); + return new Path(indexedPath, name + DRILL_SYS_FILE_SUFFIX); } @Override @@ -162,25 +296,34 @@ public boolean contains(String key) { @Override public V get(String key) { + Path actualPath = makePath(key); try { - Path path = makePath(key); - if (!fs.exists(path)) { - return null; + //logger.info("key2make::{}", key); + if (!fs.exists(actualPath)) { + //Generate paths within upper and lower bounds to test + List possibleDirs = getPossiblePaths(key.substring(key.lastIndexOf('/') + 1)); + actualPath = getPathFromPossibleDirList(key, possibleDirs); + if (actualPath == null) { + return null; + } } } catch (IOException e) { throw new RuntimeException(e); } - final Path path = makePath(key); - try (InputStream is = fs.open(path)) { - return config.getSerializer().deserialize(IOUtils.toByteArray(is)); - } catch (IOException e) { - throw new RuntimeException("Unable to deserialize \"" + path + "\"", e); - } +// logger.info("DeSerializing {}", actualPath.toUri().getPath()); + return deserializedVCache.getUnchecked(actualPath.toString()); } @Override public void put(String key, V value) { - try (OutputStream os = fs.create(makePath(key))) { + Path writePath = null; + if (value instanceof QueryProfile) { + QueryProfile profile = (QueryProfile) value; + writePath = makeIndexedPath(key, profile.getStart()); + } else { + writePath = makePath(key); + } + try (OutputStream os = fs.create(writePath)) { IOUtils.write(config.getSerializer().serialize(value), os); } catch (IOException e) { throw new RuntimeException(e); @@ -217,4 +360,132 @@ public void delete(String key) { @Override public void close() { } + + // Gets deserialized by exact path (Used for listing) + private V getViaAbsolutePath(String key) { + try { +// logger.info("key2make::{}", key); + Path path = makePath(key); + if (!fs.exists(path)) { +// logger.info("gotNullPath for {} ", key); + return null; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + final Path path = makePath(key); + return deserializedVCache.getUnchecked(path.toString()); + } + + // Returns path if profile is found within list of possible direct (Used for blind lookup of key) + private Path getPathFromPossibleDirList(String key, List possibleDirList) { + for (String possibleDir : possibleDirList) { + Path testPath = new Path(basePath + "/" + possibleDir, key + DRILL_SYS_FILE_SUFFIX); + try { + if (fs.exists(testPath)) { + return testPath; + } + } catch (IOException e) { + /*DoNothing*/ + } + } + return null; + } + + // Infers the list of possible directories where the profile is located (Used for blind lookup of key) + private List getPossiblePaths(String queryIdString) { + //Reqd:: + QueryId queryId = QueryIdHelper.getQueryIdFromString(queryIdString); + long lowerBoundTime = (Integer.MAX_VALUE - ((queryId.getPart1() + Integer.MAX_VALUE) >> 32)) * 1000; // +/- 1000 for border cases + long upperBoundTime = (Integer.MAX_VALUE - ((queryId.getPart1() + Integer.MIN_VALUE) >> 32)) * 1000; // +/- 1000 for border cases + //[sodBug] + Date lowerBoundDate = new Date(lowerBoundTime); + String lowerBoundPath = indexedPathFormat.format(lowerBoundDate); + logger.info("Inferred LowerBound Time is {} . Look from {}", lowerBoundDate, lowerBoundPath); + Date upperBoundDate = new Date(upperBoundTime); + logger.info("Inferred UpperBound Time is {} . Look until {}", upperBoundDate, indexedPathFormat.format(upperBoundDate)); + //[eodBug] + + if (lowerBoundPath.equals(indexedPathFormat.format(upperBoundDate))) { + return Collections.singletonList(lowerBoundPath); + } + + final IncrementType incrementType = + indexPathPattern.contains("m") ? IncrementType.Minute : + indexPathPattern.contains("H") ? IncrementType.Hour : + indexPathPattern.contains("d") ? IncrementType.Day : + indexPathPattern.contains("M") ? IncrementType.Month : + indexPathPattern.contains("y") ? IncrementType.Year : null; + + if (incrementType == null) { + return new ArrayList<>(0); //Empty + } + + Date currDate = lowerBoundDate; + logger.info("currDate.after(upperBoundDate) : {}", currDate.after(upperBoundDate)); + int increment = 0; + List possibleSrcDirs = new ArrayList<>(); + do { + // Adding + possibleSrcDirs.add(indexedPathFormat.format(currDate)); + + // Incrementing + switch (incrementType) { + case Minute: + currDate = DateUtils.addMinutes(lowerBoundDate, ++increment); + break; + + case Hour: + currDate = DateUtils.addHours(lowerBoundDate, ++increment); + break; + + case Day: + currDate = DateUtils.addDays(lowerBoundDate, ++increment); + break; + + case Month: + currDate = DateUtils.addMonths(lowerBoundDate, ++increment); + break; + + case Year: + currDate = DateUtils.addYears(lowerBoundDate, ++increment); + break; + + default: + break; + } + } while (!currDate.after(upperBoundDate)); + + //[sodBug] + for (String possibility : possibleSrcDirs) { + logger.info("Possibility :: {}", possibility); + } + //[eodBug] + + return possibleSrcDirs; + + /* + // create a new queryid where the first four bytes are a growing time (each new value comes earlier in sequence). Last 12 bytes are random. + final long time = (int) (System.currentTimeMillis()/1000); + final long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt(); + */ + } + + //TODO | FIXME: Guava to Handle RuntimeException by + //Deserialize path's contents (leveraged by Guava Cache) + private V deserializeFromFileSystem(String srcPath) { + final Path path = new Path(srcPath); + try (InputStream is = fs.open(path)) { + return config.getSerializer().deserialize(IOUtils.toByteArray(is)); + } catch (IOException e) { + logger.info("Unable to deserialize {}\n---{}====", path, e.getMessage()); //Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: [B@f76ca5b; line: 1, column: 65538] + logger.info("deserialization RCA==> \n {}", ExceptionUtils.getRootCause(e)); + throw new RuntimeException("Unable TO deSerialize \"" + path, ExceptionUtils.getRootCause(e)); + } + } + + //Enumerator + private enum IncrementType { + Minute, Hour, Day, Month, Year + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ProfileSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ProfileSet.java new file mode 100644 index 00000000000..68a0dc88970 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ProfileSet.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys.store; + +import java.util.Iterator; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Wrapper around TreeSet to mimic a size-bound set ordered by name (implicitly the profiles' age) + */ +public class ProfileSet implements Iterable { + private TreeSet store; + private int maxCapacity; + //Using a dedicated counter to avoid + private AtomicInteger size; + + @SuppressWarnings("unused") + @Deprecated + private ProfileSet() {} + + public ProfileSet(int capacity) { + this.store = new TreeSet(); + this.maxCapacity = capacity; + this.size = new AtomicInteger(); + } + + public int size() { + return size.get(); + } + + /** + * Get max capacity of the profile set + * @return max capacity + */ + public int capacity() { + return maxCapacity; + } + + /** + * Add a profile name to the set, while removing the oldest, if exceeding capacity + * @param profile + * @return oldest profile + */ + public String add(String profile) { + return add(profile, false); + } + + /** + * Add a profile name to the set, while removing the oldest or youngest, based on flag + * @param profile + * @param retainOldest indicate retaining policy as oldest + * @return youngest/oldest profile + */ + public String add(String profile, boolean retainOldest) { + store.add(profile); + if (size.incrementAndGet() > maxCapacity) { + if (retainOldest) { + return removeYoungest(); + } else { + return removeOldest(); + } + } + return null; + } + + /** + * Remove the oldest profile + * @return oldest profile + */ + public String removeOldest() { + size.decrementAndGet(); + return store.pollLast(); + } + + /** + * Remove the youngest profile + * @return youngest profile + */ + public String removeYoungest() { + size.decrementAndGet(); + return store.pollFirst(); + } + + /** + * Retrieve the oldest profile without removal + * @return oldest profile + */ + public String getOldest() { + return store.last(); + } + + /** + * Retrieve the youngest profile without removal + * @return youngest profile + */ + public String getYoungest() { + return store.first(); + } + + /** + * Clear the set + */ + public void clear() { + size.set(0); + store.clear(); + } + + /** + * Clear the set with the initial capacity + * @param capacity + */ + public void clear(int capacity) { + clear(maxCapacity, false); + } + + /** + * Clear the set with the initial capacity + * @param capacity + * @param forceResize + */ + public void clear(int capacity, boolean forceResize) { + clear(); + if (forceResize || capacity > maxCapacity) { + maxCapacity = capacity; + } + } + + public boolean isEmpty() { + return store.isEmpty(); + } + + @Override + public Iterator iterator() { + return store.iterator(); + } +} \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java index bfb83e07c6a..f82900ec39a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java @@ -144,4 +144,16 @@ public static Path createPathSafe(String path) { return Strings.isNullOrEmpty(path) ? new Path("/") : new Path(path); } + /** + * Returns the status of a file/directory specified in source path to be renamed/moved to a destination path + * + * @param fs current file system + * @param src path to source + * @param dst path to destination + * @return status of rename/move + */ + public static boolean rename(FileSystem fs, Path src, Path dst) throws IOException { + return FileSystemUtil.rename(fs, src, dst); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java index 82500da30f6..9d8c368d2f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java @@ -191,6 +191,18 @@ public static PathFilter mergeFilters(PathFilter... filters) { return path -> Stream.of(filters).allMatch(filter -> filter.accept(path)); } + /** + * Helper method that will rename/move file specified in the source path to a destination path + * + * @param fs current file system + * @param src path to source + * @param dst path to destination + * @return status of rename/move + */ + public static boolean rename(FileSystem fs, Path src, Path dst) throws IOException { + return fs.rename(src, dst); + } + /** * Helper method that merges given filters into one and * determines which listing method should be called based on recursive flag value. diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index b2ff4a594cb..0adfce9a5d3 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -208,6 +208,12 @@ drill.exec: { } }, profiles.store: { + index: { + enabled: true, + max: 1000, + supported.fs: ["hdfs","maprfs","s3a"], + format: "yyyy/MM/dd" + }, inmemory: false, capacity: 1000 },