diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 24372ef5e40..d5deaafa9c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -236,6 +236,11 @@ private ExecConstants() { public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write"; public static final String PROFILES_STORE_INMEMORY = "drill.exec.profiles.store.inmemory"; public static final String PROFILES_STORE_CAPACITY = "drill.exec.profiles.store.capacity"; + public static final String PROFILES_STORE_CACHE_SIZE = "drill.exec.profiles.store.cache.size"; + public static final String PROFILES_STORE_INDEX_ENABLED = "drill.exec.profiles.store.index.enabled"; + public static final String PROFILES_STORE_INDEX_FORMAT = "drill.exec.profiles.store.index.format"; + public static final String PROFILES_STORE_INDEX_MAX = "drill.exec.profiles.store.index.max"; + public static final String PROFILES_STORE_INDEX_SUPPORTED_FS = "drill.exec.profiles.store.index.supported.fs"; public static final String IMPERSONATION_ENABLED = "drill.exec.impersonation.enabled"; public static final String IMPERSONATION_MAX_CHAINED_USER_HOPS = "drill.exec.impersonation.max_chained_user_hops"; public static final String AUTHENTICATION_MECHANISMS = "drill.exec.security.auth.mechanisms"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 215fbb0d830..f8cfc72e222 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -33,16 +33,20 @@ import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; +import org.apache.drill.exec.proto.UserBitShared.QueryProfile; import org.apache.drill.exec.server.DrillbitStateManager.DrillbitState; import org.apache.drill.exec.server.options.OptionDefinition; import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.server.options.OptionValue.OptionScope; +import org.apache.drill.exec.server.profile.ProfileIndexer; import org.apache.drill.exec.server.options.SystemOptionManager; import org.apache.drill.exec.server.rest.WebServer; import org.apache.drill.exec.service.ServiceEngine; import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.sys.PersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.store.sys.PersistentStoreRegistry; +import org.apache.drill.exec.store.sys.store.LocalPersistentStore; import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider; import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider; import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; @@ -235,6 +239,15 @@ public void run() throws Exception { shutdownHook = new ShutdownThread(this, new StackTrace()); Runtime.getRuntime().addShutdownHook(shutdownHook); gracefulShutdownThread.start(); + + // Launch an archiving job that is # files and time bound + PersistentStore queryProfileStore = 
drillbitContext.getProfileStoreContext().getCompletedProfileStore(); + if (queryProfileStore instanceof LocalPersistentStore + && context.getConfig().getBoolean(ExecConstants.PROFILES_STORE_INDEX_ENABLED)) { + ProfileIndexer profileIndexer = new ProfileIndexer(coord, drillbitContext); + profileIndexer.indexProfiles(); + } + logger.info("Startup completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java new file mode 100644 index 00000000000..679861d0c1f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.server.profile; + +import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; + +import java.io.IOException; +import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; +import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; +import org.apache.drill.exec.coord.zk.ZkDistributedSemaphore; +import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.proto.UserBitShared.QueryProfile; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.QueryProfileStoreContext; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.store.DrillSysFilePathFilter; +import org.apache.drill.exec.store.sys.store.LocalPersistentStore; +import org.apache.drill.exec.store.sys.store.ProfileSet; +import org.apache.drill.exec.store.sys.store.provider.ZookeeperPersistentStoreProvider; +import org.apache.drill.exec.util.DrillFileSystemUtil; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manage profiles by archiving + */ +public class ProfileIndexer { + private static final Logger logger = 
LoggerFactory.getLogger(ProfileIndexer.class); + private static final String lockPathString = "/profileIndexer"; + private static final int DRILL_SYS_FILE_EXT_SIZE = DRILL_SYS_FILE_SUFFIX.length(); + + private final ZKClusterCoordinator zkCoord; + private final DrillFileSystem fs; + private final Path basePath; + private final ProfileSet profiles; + private final int indexingRate; + private final PathFilter sysFileSuffixFilter; + private SimpleDateFormat indexedPathFormat; + private final boolean useZkCoordinatedManagement; + private DrillConfig drillConfig; + + private PersistentStoreConfig pStoreConfig; + private LocalPersistentStore completedProfileStore; + private Stopwatch indexWatch; + private int indexedCount; + private int currentProfileCount; + + + /** + * ProfileIndexer + */ + public ProfileIndexer(ClusterCoordinator coord, DrillbitContext context) throws StoreException, IOException { + drillConfig = context.getConfig(); + + // FileSystem + try { + this.fs = inferFileSystem(drillConfig); + } catch (IOException ex) { + throw new StoreException("Unable to get filesystem", ex); + } + + //Use Zookeeper for coordinated management + final List supportedFS = drillConfig.getStringList(ExecConstants.PROFILES_STORE_INDEX_SUPPORTED_FS); + if (this.useZkCoordinatedManagement = supportedFS.contains(fs.getScheme())) { + this.zkCoord = (ZKClusterCoordinator) coord; + } else { + this.zkCoord = null; + } + + // Query Profile Store + QueryProfileStoreContext pStoreContext = context.getProfileStoreContext(); + this.completedProfileStore = (LocalPersistentStore) pStoreContext.getCompletedProfileStore(); + this.pStoreConfig = pStoreContext.getProfileStoreConfig(); + this.basePath = completedProfileStore.getBasePath(); + + this.indexingRate = drillConfig.getInt(ExecConstants.PROFILES_STORE_INDEX_MAX); + this.profiles = new ProfileSet(indexingRate); + this.indexWatch = Stopwatch.createUnstarted(); + this.sysFileSuffixFilter = new DrillSysFilePathFilter(); + String indexPathPattern = drillConfig.getString(ExecConstants.PROFILES_STORE_INDEX_FORMAT); + this.indexedPathFormat = new SimpleDateFormat(indexPathPattern); + logger.info("Organizing any existing unindexed profiles"); + } + + + /** + * Index profiles + */ + public void indexProfiles() { + this.indexWatch.start(); + + // Acquire lock IFF required + if (useZkCoordinatedManagement) { + DistributedSemaphore indexerMutex = new ZkDistributedSemaphore(zkCoord.getCurator(), lockPathString, 1); + try (DistributedLease lease = indexerMutex.acquire(0, TimeUnit.SECONDS)) { + if (lease != null) { + listAndIndex(); + } else { + logger.debug("Couldn't get a lease acquisition"); + } + } catch (Exception e) { + //DoNothing since lease acquisition failed + logger.error("Exception during lease-acquisition:: {}", e); + } + } else { + try { + listAndIndex(); + } catch (IOException e) { + logger.error("Failed to index: {}", e); + } + } + logger.info("Successfully indexed {} of {} profiles during startup in {} seconds", indexedCount, currentProfileCount, this.indexWatch.stop().elapsed(TimeUnit.SECONDS)); + } + + + //Lists and Indexes the latest profiles + private void listAndIndex() throws IOException { + currentProfileCount = listForArchiving(); + indexedCount = 0; + logger.info("Found {} profiles that need to be indexed. Will attempt to index {} profiles", currentProfileCount, + (currentProfileCount > this.indexingRate) ? 
this.indexingRate : currentProfileCount); + + // Track MRU index paths + Map mruIndexPath = new HashMap<>(); + if (currentProfileCount > 0) { + while (!this.profiles.isEmpty()) { + String profileToIndex = profiles.removeYoungest() + DRILL_SYS_FILE_SUFFIX; + Path srcPath = new Path(basePath, profileToIndex); + long profileStartTime = getProfileStart(srcPath); + if (profileStartTime < 0) { + logger.debug("Will skip indexing {}", srcPath); + continue; + } + String indexPath = indexedPathFormat.format(new Date(profileStartTime)); + //Check if dest dir exists + Path indexDestPath = null; + if (!mruIndexPath.containsKey(indexPath)) { + indexDestPath = new Path(basePath, indexPath); + if (!fs.isDirectory(indexDestPath)) { + // Build dir + if (fs.mkdirs(indexDestPath)) { + mruIndexPath.put(indexPath, indexDestPath); + } else { + //Creation failed. Did someone else create? + if (fs.isDirectory(indexDestPath)) { + mruIndexPath.put(indexPath, indexDestPath); + } + } + } else { + mruIndexPath.put(indexPath, indexDestPath); + } + } else { + indexDestPath = mruIndexPath.get(indexPath); + } + + //Attempt Move + boolean renameStatus = false; + if (indexDestPath != null) { + Path destPath = new Path(indexDestPath, profileToIndex); + renameStatus = DrillFileSystemUtil.rename(fs, srcPath, destPath); + if (renameStatus) { + indexedCount++; + } + } + if (indexDestPath == null || !renameStatus) { + // Stop attempting any more archiving since other StoreProviders might be archiving + logger.error("Move failed for {} [{} | {}]", srcPath, indexDestPath == null, renameStatus); + continue; + } + } + } + } + + // Deserialized and extract the profile's start time + private long getProfileStart(Path srcPath) { + try (InputStream is = fs.open(srcPath)) { + QueryProfile profile = pStoreConfig.getSerializer().deserialize(IOUtils.toByteArray(is)); + return profile.getStart(); + } catch (IOException e) { + logger.info("Unable to deserialize {}\n---{}====", srcPath, e.getMessage()); //Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: [B@f76ca5b; line: 1, column: 65538] + logger.info("deserialization RCA==> \n {}", ExceptionUtils.getRootCause(e)); + } + return Long.MIN_VALUE; + } + + // List all profiles in store's root and identify potential candidates for archiving + private int listForArchiving() throws IOException { + // Not performing recursive search of profiles + List fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter ); + + int numProfilesInStore = 0; + for (FileStatus stat : fileStatuses) { + String profileName = stat.getPath().getName(); + //Strip extension and store only query ID + profiles.add(profileName.substring(0, profileName.length() - DRILL_SYS_FILE_EXT_SIZE), false); + numProfilesInStore++; + } + + return numProfilesInStore; + } + + // Infers File System of Local Store + private DrillFileSystem inferFileSystem(DrillConfig drillConfig) throws IOException { + boolean hasZkBlobRoot = drillConfig.hasPath(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT); + final Path blobRoot = hasZkBlobRoot ? 
+ new org.apache.hadoop.fs.Path(drillConfig.getString(ZookeeperPersistentStoreProvider.DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) : + LocalPersistentStore.getLogDir(); + + return LocalPersistentStore.getFileSystem(drillConfig, blobRoot); + } + +} \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java index 1819962425b..a3998ff631a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java @@ -37,6 +37,7 @@ import javax.ws.rs.core.UriInfo; import javax.xml.bind.annotation.XmlRootElement; + import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; @@ -90,7 +91,7 @@ public static class ProfileInfo implements Comparable { public ProfileInfo(DrillConfig drillConfig, String queryId, long startTime, long endTime, String foreman, String query, String state, String user, double totalCost, String queueName) { - this.queryId = queryId; + this.queryId = queryId.substring(queryId.lastIndexOf('/') + 1); this.startTime = startTime; this.endTime = endTime; this.time = new Date(startTime); @@ -378,7 +379,7 @@ public Viewable getProfile(@PathParam("queryid") String queryId){ ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId), work.getContext().getConfig()); return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/profile.ftl", sc, wrapper); } catch (Exception | Error e) { - logger.error("Exception was thrown when fetching profile {} :\n{}", queryId, e); + logger.error("Exception was thrown when fetching profile {} :\n{}\n====\n", queryId, e); return ViewableWithPermissions.create(authEnabled.get(), "/rest/errorMessage.ftl", sc, e); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DrillSysFilePathFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DrillSysFilePathFilter.java new file mode 100644 index 00000000000..b6958f7dae2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DrillSysFilePathFilter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.sys.store; + +import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +/** + * Filter for Drill System Files + */ +public class DrillSysFilePathFilter implements PathFilter { + + //NOTE: The filename is a combination of query ID (which is monotonically + //decreasing value derived off epoch timestamp) and a random value. This + //filter helps eliminate that list + String cutoffFileName = null; + public DrillSysFilePathFilter() {} + + public DrillSysFilePathFilter(String cutoffSysFileName) { + if (cutoffSysFileName != null) { + this.cutoffFileName = cutoffSysFileName + DRILL_SYS_FILE_SUFFIX; + } + } + + /* (non-Javadoc) + * @see org.apache.hadoop.fs.PathFilter#accept(org.apache.hadoop.fs.Path) + */ + @Override + public boolean accept(Path file){ + if (file.getName().endsWith(DRILL_SYS_FILE_SUFFIX)) { + if (cutoffFileName != null) { + return (file.getName().compareTo(cutoffFileName) <= 0); + } else { + return true; + } + } + return false; + } +} \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java index 5d9e7dcd500..76f82f22587 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java @@ -23,8 +23,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; +import java.util.Date; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -32,8 +37,13 @@ import javax.annotation.Nullable; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.time.DateUtils; import org.apache.drill.common.collections.ImmutableEntry; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryProfile; +import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.exec.store.sys.BasePersistentStore; @@ -46,8 +56,10 @@ import org.apache.drill.shaded.guava.com.google.common.base.Function; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder; +import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader; +import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache; import org.apache.drill.shaded.guava.com.google.common.collect.Iterables; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,11 +67,33 @@ public class LocalPersistentStore extends BasePersistentStore { private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class); + //Provides a threshold above which we report an event's time + private static final long RESPONSE_TIME_THRESHOLD_MSEC = /*200*/0L; + + 
private static final int DRILL_SYS_FILE_EXT_SIZE = DRILL_SYS_FILE_SUFFIX.length(); + private final Path basePath; private final PersistentStoreConfig config; private final DrillFileSystem fs; - public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig config) { + private SimpleDateFormat indexedPathFormat; + + private PathFilter sysFileSuffixFilter; + + private Comparator profilePathComparator; + + private Function> stringTransformer; + + private Function> fileStatusTransformer; + + private CacheLoader cacheLoader; + private LoadingCache deserializedVCache; + + private int deserializedCacheCapacity; + + private String indexPathPattern; + + public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig config/*, DrillConfig drillConfig*/) { this.basePath = new Path(base, config.getName()); this.config = config; this.fs = fs; @@ -68,9 +102,68 @@ public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig } catch (IOException e) { throw new RuntimeException("Failure setting pstore configuration path."); } + + //TODO: int cacheCapacity = drillConfig.getInt(ExecConstants.HTTP_MAX_PROFILES); + deserializedCacheCapacity = 100; //drillConfig.getInt(ExecConstants.PROFILES_STORE_CACHE_SIZE); + + indexPathPattern = "yyyy/MM/dd"; //TODO ExecConstants + indexedPathFormat = new SimpleDateFormat(indexPathPattern); + + this.sysFileSuffixFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX); + } + }; + + this.profilePathComparator = new Comparator() { + @Override + public int compare(String path1, String path2) { + return path1.substring(path1.lastIndexOf('/')+1).compareTo(path2.substring(path2.lastIndexOf('/')+1)); + } + }; + + // Transformer function to extract profile based on query ID String + this.stringTransformer = new Function>() { + @Nullable + @Override + public Entry apply(String key) { + return new ImmutableEntry<>(key, getViaAbsolutePath(key)); + } + }; + + /*// Transformer function to extract profile based on FileStatus + this.fileStatusTransformer = new Function>() { + @Nullable + @Override + public Entry apply(FileStatus fStatus) { + Path fPath = fStatus.getPath(); + String sanSuffixName = fPath.getName().substring(0, fPath.getName().length() - DRILL_SYS_FILE_EXT_SIZE); + return new ImmutableEntry<>(sanSuffixName, get(fStatus)); + } + };*/ + + //Defining Cache loader for handling missing entries + this.cacheLoader = new CacheLoader() { + @Override + public V load(String srcPathAsStr) { + //Cache miss to force loading from FS + //logger.info("cacheMiss::fetchFromFS:: {}", srcPathAsStr); + return deserializeFromFileSystem(srcPathAsStr); + } + }; + + //Creating the cache + this.deserializedVCache = CacheBuilder.newBuilder() + .initialCapacity(Math.max(deserializedCacheCapacity/5, 20)) //startingCapacity: 20% or 20 + .maximumSize(deserializedCacheCapacity) + .recordStats() + .build(cacheLoader); + + } - protected Path getBasePath() { + public Path getBasePath() { return basePath; } @@ -108,47 +201,88 @@ public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throw } @Override - public Iterator> getRange(int skip, int take) { - try { - // list only files with sys file suffix - PathFilter sysFileSuffixFilter = new PathFilter() { - @Override - public boolean accept(Path path) { - return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX); - } - }; + public Iterator> getRange/*Neo*/(int skip, int take) { + try { + List files = new LinkedList<>(); 
+ // Sort and explore Directory stack using DepthFirstSearch + LinkedList profileDirStack = new LinkedList(DrillFileSystemUtil.listDirectoriesSafe(fs, basePath, false)); + Collections.sort(profileDirStack); + logger.info("dirSize:: {} ", profileDirStack.size()); - List fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter); - if (fileStatuses.isEmpty()) { - return Collections.emptyIterator(); - } + int collectedProfileCount = 0; + while (!profileDirStack.isEmpty()) { + // Explore dir from top of stack + FileStatus latestDir = profileDirStack.removeLast(); - List files = Lists.newArrayList(); - for (FileStatus stat : fileStatuses) { - String s = stat.getPath().getName(); - files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); - } + // Read all profiles in last dir + List profileStatus = DrillFileSystemUtil.listFiles(fs, latestDir.getPath(), false, sysFileSuffixFilter); + if (!profileStatus.isEmpty()) { + List additions = new LinkedList<>(); + for (FileStatus stat : profileStatus) { + String filePathStr = stat.getPath().toUri().getPath(); + additions.add(filePathStr.substring(0, filePathStr.length() - DRILL_SYS_FILE_EXT_SIZE)); + } + //Sort additions & append (saves time in resorting entire list) + Collections.sort(additions, profilePathComparator); + files.addAll(additions); + + //[sodBug] + if (!files.isEmpty()) { + logger.info("First is {}", files.get(0)); + } - Collections.sort(files); + int _pCount = profileStatus.size(); + if (_pCount > 0) { + collectedProfileCount += _pCount; + logger.info("# profiles added = {} [Total: {} (act) / {} (est)] ", _pCount, files.size(), collectedProfileCount); + } + //[eodBug] + } - return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function>() { - @Nullable - @Override - public Entry apply(String key) { - return new ImmutableEntry<>(key, get(key)); + // Explore subdirectories + List childSubdirStack = DrillFileSystemUtil.listDirectoriesSafe(fs, latestDir.getPath(), false); + // Sorting list before addition to stack + Collections.sort(childSubdirStack); + if (!childSubdirStack.isEmpty()) { + profileDirStack.addAll(childSubdirStack); + } else { + logger.info("foundLeaf:: {}", latestDir.getPath().toUri()); + } + + // Terminate exploration if required count has been met + if ( collectedProfileCount >= (skip + take) ) { + profileDirStack.clear(); + } } - }).iterator(); - } catch (IOException e) { - throw new RuntimeException(e); + + //Sorting not required since preSorted //dBug + logger.info("Post Scan First is {}", files.get(0)); + + Iterator> rangeIterator = Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), this.stringTransformer).iterator(); + logger.info("CacheSTATS::{}:: {}", (take+skip), this.deserializedVCache.stats().toString()); + return rangeIterator; + } catch (IOException e) { + throw new RuntimeException(e); + } } - } private Path makePath(String name) { + Preconditions.checkArgument( + //!name.contains("/") && + !name.contains(":") && + !name.contains("..")); +// return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); + return new Path(name + DRILL_SYS_FILE_SUFFIX); + } + + // Using timestamp to infer correct pigeon-hole for writing destination + private Path makeIndexedPath(String name, long timestamp) { Preconditions.checkArgument( !name.contains("/") && - !name.contains(":") && - !name.contains("..")); - return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); + !name.contains(":") && + !name.contains("..")); + Path indexedPath = new 
Path(basePath, indexedPathFormat.format(timestamp)); + return new Path(indexedPath, name + DRILL_SYS_FILE_SUFFIX); } @Override @@ -162,25 +296,34 @@ public boolean contains(String key) { @Override public V get(String key) { + Path actualPath = makePath(key); try { - Path path = makePath(key); - if (!fs.exists(path)) { - return null; + //logger.info("key2make::{}", key); + if (!fs.exists(actualPath)) { + //Generate paths within upper and lower bounds to test + List possibleDirs = getPossiblePaths(key.substring(key.lastIndexOf('/') + 1)); + actualPath = getPathFromPossibleDirList(key, possibleDirs); + if (actualPath == null) { + return null; + } } } catch (IOException e) { throw new RuntimeException(e); } - final Path path = makePath(key); - try (InputStream is = fs.open(path)) { - return config.getSerializer().deserialize(IOUtils.toByteArray(is)); - } catch (IOException e) { - throw new RuntimeException("Unable to deserialize \"" + path + "\"", e); - } +// logger.info("DeSerializing {}", actualPath.toUri().getPath()); + return deserializedVCache.getUnchecked(actualPath.toString()); } @Override public void put(String key, V value) { - try (OutputStream os = fs.create(makePath(key))) { + Path writePath = null; + if (value instanceof QueryProfile) { + QueryProfile profile = (QueryProfile) value; + writePath = makeIndexedPath(key, profile.getStart()); + } else { + writePath = makePath(key); + } + try (OutputStream os = fs.create(writePath)) { IOUtils.write(config.getSerializer().serialize(value), os); } catch (IOException e) { throw new RuntimeException(e); @@ -217,4 +360,132 @@ public void delete(String key) { @Override public void close() { } + + // Gets deserialized by exact path (Used for listing) + private V getViaAbsolutePath(String key) { + try { +// logger.info("key2make::{}", key); + Path path = makePath(key); + if (!fs.exists(path)) { +// logger.info("gotNullPath for {} ", key); + return null; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + final Path path = makePath(key); + return deserializedVCache.getUnchecked(path.toString()); + } + + // Returns path if profile is found within list of possible direct (Used for blind lookup of key) + private Path getPathFromPossibleDirList(String key, List possibleDirList) { + for (String possibleDir : possibleDirList) { + Path testPath = new Path(basePath + "/" + possibleDir, key + DRILL_SYS_FILE_SUFFIX); + try { + if (fs.exists(testPath)) { + return testPath; + } + } catch (IOException e) { + /*DoNothing*/ + } + } + return null; + } + + // Infers the list of possible directories where the profile is located (Used for blind lookup of key) + private List getPossiblePaths(String queryIdString) { + //Reqd:: + QueryId queryId = QueryIdHelper.getQueryIdFromString(queryIdString); + long lowerBoundTime = (Integer.MAX_VALUE - ((queryId.getPart1() + Integer.MAX_VALUE) >> 32)) * 1000; // +/- 1000 for border cases + long upperBoundTime = (Integer.MAX_VALUE - ((queryId.getPart1() + Integer.MIN_VALUE) >> 32)) * 1000; // +/- 1000 for border cases + //[sodBug] + Date lowerBoundDate = new Date(lowerBoundTime); + String lowerBoundPath = indexedPathFormat.format(lowerBoundDate); + logger.info("Inferred LowerBound Time is {} . Look from {}", lowerBoundDate, lowerBoundPath); + Date upperBoundDate = new Date(upperBoundTime); + logger.info("Inferred UpperBound Time is {} . 
Look until {}", upperBoundDate, indexedPathFormat.format(upperBoundDate)); + //[eodBug] + + if (lowerBoundPath.equals(indexedPathFormat.format(upperBoundDate))) { + return Collections.singletonList(lowerBoundPath); + } + + final IncrementType incrementType = + indexPathPattern.contains("m") ? IncrementType.Minute : + indexPathPattern.contains("H") ? IncrementType.Hour : + indexPathPattern.contains("d") ? IncrementType.Day : + indexPathPattern.contains("M") ? IncrementType.Month : + indexPathPattern.contains("y") ? IncrementType.Year : null; + + if (incrementType == null) { + return new ArrayList<>(0); //Empty + } + + Date currDate = lowerBoundDate; + logger.info("currDate.after(upperBoundDate) : {}", currDate.after(upperBoundDate)); + int increment = 0; + List possibleSrcDirs = new ArrayList<>(); + do { + // Adding + possibleSrcDirs.add(indexedPathFormat.format(currDate)); + + // Incrementing + switch (incrementType) { + case Minute: + currDate = DateUtils.addMinutes(lowerBoundDate, ++increment); + break; + + case Hour: + currDate = DateUtils.addHours(lowerBoundDate, ++increment); + break; + + case Day: + currDate = DateUtils.addDays(lowerBoundDate, ++increment); + break; + + case Month: + currDate = DateUtils.addMonths(lowerBoundDate, ++increment); + break; + + case Year: + currDate = DateUtils.addYears(lowerBoundDate, ++increment); + break; + + default: + break; + } + } while (!currDate.after(upperBoundDate)); + + //[sodBug] + for (String possibility : possibleSrcDirs) { + logger.info("Possibility :: {}", possibility); + } + //[eodBug] + + return possibleSrcDirs; + + /* + // create a new queryid where the first four bytes are a growing time (each new value comes earlier in sequence). Last 12 bytes are random. + final long time = (int) (System.currentTimeMillis()/1000); + final long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt(); + */ + } + + //TODO | FIXME: Guava to Handle RuntimeException by + //Deserialize path's contents (leveraged by Guava Cache) + private V deserializeFromFileSystem(String srcPath) { + final Path path = new Path(srcPath); + try (InputStream is = fs.open(path)) { + return config.getSerializer().deserialize(IOUtils.toByteArray(is)); + } catch (IOException e) { + logger.info("Unable to deserialize {}\n---{}====", path, e.getMessage()); //Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: [B@f76ca5b; line: 1, column: 65538] + logger.info("deserialization RCA==> \n {}", ExceptionUtils.getRootCause(e)); + throw new RuntimeException("Unable TO deSerialize \"" + path, ExceptionUtils.getRootCause(e)); + } + } + + //Enumerator + private enum IncrementType { + Minute, Hour, Day, Month, Year + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ProfileSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ProfileSet.java new file mode 100644 index 00000000000..68a0dc88970 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ProfileSet.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys.store; + +import java.util.Iterator; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Wrapper around TreeSet to mimic a size-bounded set ordered by name (implicitly the profiles' age) + */ +public class ProfileSet implements Iterable<String> { + private TreeSet<String> store; + private int maxCapacity; + //Dedicated counter for tracking the set's size + private AtomicInteger size; + + @SuppressWarnings("unused") + @Deprecated + private ProfileSet() {} + + public ProfileSet(int capacity) { + this.store = new TreeSet<>(); + this.maxCapacity = capacity; + this.size = new AtomicInteger(); + } + + public int size() { + return size.get(); + } + + /** + * Get max capacity of the profile set + * @return max capacity + */ + public int capacity() { + return maxCapacity; + } + + /** + * Add a profile name to the set, evicting the oldest if capacity is exceeded + * @param profile profile name to add + * @return evicted (oldest) profile, or null if nothing was evicted + */ + public String add(String profile) { + return add(profile, false); + } + + /** + * Add a profile name to the set, evicting the oldest or youngest entry when capacity is exceeded, based on the flag + * @param profile profile name to add + * @param retainOldest if true, retain the oldest entries and evict the youngest + * @return evicted (youngest/oldest) profile, or null if nothing was evicted + */ + public String add(String profile, boolean retainOldest) { + store.add(profile); + if (size.incrementAndGet() > maxCapacity) { + if (retainOldest) { + return removeYoungest(); + } else { + return removeOldest(); + } + } + return null; + } + + /** + * Remove the oldest profile + * @return oldest profile + */ + public String removeOldest() { + size.decrementAndGet(); + return store.pollLast(); + } + + /** + * Remove the youngest profile + * @return youngest profile + */ + public String removeYoungest() { + size.decrementAndGet(); + return store.pollFirst(); + } + + /** + * Retrieve the oldest profile without removal + * @return oldest profile + */ + public String getOldest() { + return store.last(); + } + + /** + * Retrieve the youngest profile without removal + * @return youngest profile + */ + public String getYoungest() { + return store.first(); + } + + /** + * Clear the set + */ + public void clear() { + size.set(0); + store.clear(); + } + + /** + * Clear the set, growing its capacity if the given value is larger + * @param capacity desired capacity + */ + public void clear(int capacity) { + clear(capacity, false); + } + + /** + * Clear the set, resizing to the given capacity if forced or if larger than the current maximum + * @param capacity desired capacity + * @param forceResize if true, resize even when the new capacity is smaller + */ + public void clear(int capacity, boolean forceResize) { + clear(); + if (forceResize || capacity > maxCapacity) { + maxCapacity = capacity; + } + } + + public boolean isEmpty() { + return store.isEmpty(); + } + + @Override + public Iterator<String> iterator() { + return store.iterator(); + } +} \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java index bfb83e07c6a..f82900ec39a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java @@ -144,4 +144,16 @@ public static Path createPathSafe(String path) { return Strings.isNullOrEmpty(path) ? new Path("/") : new Path(path); } + /** + * Renames/moves the file or directory at the source path to the destination path, returning the status of the operation + * + * @param fs current file system + * @param src path to source + * @param dst path to destination + * @return true if the rename/move succeeded, false otherwise + */ + public static boolean rename(FileSystem fs, Path src, Path dst) throws IOException { + return FileSystemUtil.rename(fs, src, dst); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java index 82500da30f6..9d8c368d2f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/FileSystemUtil.java @@ -191,6 +191,18 @@ public static PathFilter mergeFilters(PathFilter... filters) { return path -> Stream.of(filters).allMatch(filter -> filter.accept(path)); } + /** + * Helper method that renames/moves the file or directory at the source path to the destination path + * + * @param fs current file system + * @param src path to source + * @param dst path to destination + * @return true if the rename/move succeeded, false otherwise + */ + public static boolean rename(FileSystem fs, Path src, Path dst) throws IOException { + return fs.rename(src, dst); + } + /** * Helper method that merges given filters into one and * determines which listing method should be called based on recursive flag value. diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index b2ff4a594cb..0adfce9a5d3 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -208,6 +208,12 @@ drill.exec: { } }, profiles.store: { + index: { + enabled: true, + max: 1000, + supported.fs: ["hdfs","maprfs","s3a"], + format: "yyyy/MM/dd" + }, inmemory: false, capacity: 1000 },
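
For reference, the index.format value above ("yyyy/MM/dd") drives the pigeon-holing performed by LocalPersistentStore.makeIndexedPath() and by the ProfileIndexer: a completed profile is written to (or moved into) a dated sub-directory of the profile store's base path, derived from the profile's start time. The snippet below is a minimal standalone sketch of that layout, not Drill code; the class name, timestamp, and query id are hypothetical, and the actual implementation builds Hadoop Path objects against a DrillFileSystem rather than plain strings.

    // Illustrative sketch: maps a profile's start time to the dated sub-directory
    // it would be indexed under, using the default "yyyy/MM/dd" format.
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class IndexedProfilePathSketch {
      public static void main(String[] args) {
        String indexPathPattern = "yyyy/MM/dd";  // drill.exec.profiles.store.index.format
        long profileStartTime = 1552651200000L;  // hypothetical QueryProfile.getStart() value
        String queryId = "2cff2f1e-5d70-4b4f-1fd2-9bf531c41528";  // hypothetical query id
        String datedSubDir = new SimpleDateFormat(indexPathPattern).format(new Date(profileStartTime));
        // Prints something like 2019/03/15/2cff2f1e-....sys.drill, i.e. the profile
        // lands under <profile-store-root>/<yyyy>/<MM>/<dd>/.
        System.out.println(datedSubDir + "/" + queryId + ".sys.drill");
      }
    }

Point lookups (LocalPersistentStore.get) reverse this mapping: getPossiblePaths() derives lower/upper bounds on the start time from the query id and probes the candidate dated directories until the profile file is found.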