From 77e3c3ce11eb4cb847e04975975acfadf111637c Mon Sep 17 00:00:00 2001 From: Kunal Khatua Date: Fri, 17 May 2019 14:48:00 -0700 Subject: [PATCH] Initial clean up --- .../apache/drill/exec/server/Drillbit.java | 2 +- .../exec/server/profile/ProfileIndexer.java | 4 +- .../server/rest/profile/ProfileResources.java | 4 +- .../store/sys/store/LocalPersistentStore.java | 260 +++++++----------- 4 files changed, 106 insertions(+), 164 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index f8cfc72e222..10e71e4e868 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -240,7 +240,7 @@ public void run() throws Exception { Runtime.getRuntime().addShutdownHook(shutdownHook); gracefulShutdownThread.start(); - // Launch an archiving job that is # files and time bound + // Launch an archiving job that is limited by # files (so as to be timebound) PersistentStore queryProfileStore = drillbitContext.getProfileStoreContext().getCompletedProfileStore(); if (queryProfileStore instanceof LocalPersistentStore && context.getConfig().getBoolean(ExecConstants.PROFILES_STORE_INDEX_ENABLED)) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java index 679861d0c1f..805fd49bceb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/profile/ProfileIndexer.java @@ -113,7 +113,6 @@ public ProfileIndexer(ClusterCoordinator coord, DrillbitContext context) throws this.sysFileSuffixFilter = new DrillSysFilePathFilter(); String indexPathPattern = drillConfig.getString(ExecConstants.PROFILES_STORE_INDEX_FORMAT); this.indexedPathFormat = new SimpleDateFormat(indexPathPattern); - logger.info("Organizing any existing unindexed profiles"); } @@ -211,8 +210,7 @@ private long getProfileStart(Path srcPath) { QueryProfile profile = pStoreConfig.getSerializer().deserialize(IOUtils.toByteArray(is)); return profile.getStart(); } catch (IOException e) { - logger.info("Unable to deserialize {}\n---{}====", srcPath, e.getMessage()); //Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens at [Source: [B@f76ca5b; line: 1, column: 65538] - logger.info("deserialization RCA==> \n {}", ExceptionUtils.getRootCause(e)); + logger.error("Unable to deserialize {}\n{}", srcPath, e.getMessage()); } return Long.MIN_VALUE; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java index e8fbece641c..64961c51bad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java @@ -37,7 +37,6 @@ import javax.ws.rs.core.UriInfo; import javax.xml.bind.annotation.XmlRootElement; - import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; @@ -74,7 +73,6 @@ public class ProfileResources { public static class ProfileInfo implements Comparable { private static final int QUERY_SNIPPET_MAX_CHAR = 150; private static final int QUERY_SNIPPET_MAX_LINES = 8; - public static final SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss"); private final String queryId; @@ -379,7 +377,7 @@ public Viewable getProfile(@PathParam("queryid") String queryId){ ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId), work.getContext().getConfig()); return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/profile.ftl", sc, wrapper); } catch (Exception | Error e) { - logger.error("Exception was thrown when fetching profile {} :\n{}\n====\n", queryId, e); + logger.error("Exception was thrown when fetching profile {} :\n{}", queryId, e); return ViewableWithPermissions.create(authEnabled.get(), "/rest/errorMessage.ftl", sc, e); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java index 43d4784333f..793233b0981 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java @@ -67,10 +67,7 @@ public class LocalPersistentStore extends BasePersistentStore { private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class); - - //Provides a threshold above which we report an event's time - //TODO Configurable threshold? - private static final long RESPONSE_TIME_THRESHOLD_MSEC = /*200*/0L; + //Sub directory where external profiles can be dumped for rendering private static final String DIORAMA = "diorama"; private static final int DRILL_SYS_FILE_EXT_SIZE = DRILL_SYS_FILE_SUFFIX.length(); @@ -86,8 +83,6 @@ public class LocalPersistentStore extends BasePersistentStore { private final PathFilter sysFileSuffixFilter; private final Comparator profilePathComparator; private final Function> stringTransformer; - //TODO Do we need this? - private Function> fileStatusTransformer; private final int deserializedCacheCapacity; private final CacheLoader cacheLoader; @@ -103,7 +98,6 @@ public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig throw new RuntimeException("Failure setting pstore configuration path."); } - //TODO: int cacheCapacity = drillConfig.getInt(ExecConstants.HTTP_MAX_PROFILES [or] PROFILES_STORE_CACHE_SIZE deserializedCacheCapacity = drillConfig.getInt(ExecConstants.PROFILES_STORE_CACHE_SIZE); indexPathPattern = drillConfig.getString(ExecConstants.PROFILES_STORE_INDEX_FORMAT); @@ -137,23 +131,11 @@ public Entry apply(String key) { } }; - /*// Transformer function to extract profile based on FileStatus - this.fileStatusTransformer = new Function>() { - @Nullable - @Override - public Entry apply(FileStatus fStatus) { - Path fPath = fStatus.getPath(); - String sanSuffixName = fPath.getName().substring(0, fPath.getName().length() - DRILL_SYS_FILE_EXT_SIZE); - return new ImmutableEntry<>(sanSuffixName, get(fStatus)); - } - };*/ - //Defining Cache loader for handling missing entries this.cacheLoader = new CacheLoader() { @Override public V load(String srcPathAsStr) { //Cache miss to force loading from FS - //logger.info("cacheMiss::fetchFromFS:: {}", srcPathAsStr); return deserializeFromFileSystem(srcPathAsStr); } }; @@ -162,10 +144,7 @@ public V load(String srcPathAsStr) { this.deserializedVCache = CacheBuilder.newBuilder() .initialCapacity(Math.max(deserializedCacheCapacity/5, 20)) //startingCapacity: 20% or 20 .maximumSize(deserializedCacheCapacity) - .recordStats() //TODO Should we get rid of this since we arent using it? .build(cacheLoader); - - } public Path getBasePath() { @@ -205,77 +184,59 @@ public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throw } @Override - public Iterator> getRange/*Neo*/(int skip, int take) { - try { - List files = new LinkedList<>(); - // Sort and explore Directory stack using DepthFirstSearch - LinkedList profileDirStack = new LinkedList(DrillFileSystemUtil.listDirectoriesSafe(fs, basePath, false)); - profileDirStack.sort(Comparator.naturalOrder()); - logger.info("dirSize:: {} ", profileDirStack.size()); - - int collectedProfileCount = 0; - while (!profileDirStack.isEmpty()) { - // Explore dir from top of stack - FileStatus latestDir = profileDirStack.removeLast(); - - // Read all profiles in last dir - List profileStatus = DrillFileSystemUtil.listFiles(fs, latestDir.getPath(), false, sysFileSuffixFilter); - if (!profileStatus.isEmpty()) { - List additions = new LinkedList<>(); - for (FileStatus stat : profileStatus) { - String filePathStr = stat.getPath().toUri().getPath(); - additions.add(filePathStr.substring(0, filePathStr.length() - DRILL_SYS_FILE_EXT_SIZE)); - } - //Sort additions & append (saves time in resorting entire list) - additions.sort(profilePathComparator); - files.addAll(additions); - - //[sodBug] - if (!files.isEmpty()) { - logger.info("First is {}", files.get(0)); - } - - int _pCount = profileStatus.size(); - if (_pCount > 0) { - collectedProfileCount += _pCount; - logger.info("# profiles added = {} [Total: {} (act) / {} (est)] ", _pCount, files.size(), collectedProfileCount); - } - //[eodBug] - } - - // Explore subdirectories - List childSubdirStack = DrillFileSystemUtil.listDirectoriesSafe(fs, latestDir.getPath(), false); - // Sorting list before addition to stack - childSubdirStack.sort(Comparator.naturalOrder()); - if (!childSubdirStack.isEmpty()) { - profileDirStack.addAll(childSubdirStack); - } else { - logger.info("foundLeaf:: {}", latestDir.getPath().toUri()); - } - - // Terminate exploration if required count has been met - if ( collectedProfileCount >= (skip + take) ) { - profileDirStack.clear(); + public Iterator> getRange(int skip, int take) { + try { + List files = new LinkedList<>(); + // Sort and explore Directory stack using DepthFirstSearch + LinkedList profileDirStack = new LinkedList(DrillFileSystemUtil.listDirectoriesSafe(fs, basePath, false)); + profileDirStack.sort(Comparator.naturalOrder()); + + int collectedProfileCount = 0; + while (!profileDirStack.isEmpty()) { + // Explore dir from top of stack + FileStatus latestDir = profileDirStack.removeLast(); + + // Read all profiles in last dir + List profileStatus = DrillFileSystemUtil.listFiles(fs, latestDir.getPath(), false, sysFileSuffixFilter); + if (!profileStatus.isEmpty()) { + List additions = new LinkedList<>(); + for (FileStatus stat : profileStatus) { + String filePathStr = stat.getPath().toUri().getPath(); + additions.add(filePathStr.substring(0, filePathStr.length() - DRILL_SYS_FILE_EXT_SIZE)); } + //Sort additions & append (saves time in resorting entire list) + additions.sort(profilePathComparator); + files.addAll(additions); } - //Sorting not required since preSorted //dBug - logger.info("Post Scan First is {}", files.get(0)); + // Explore subdirectories + List childSubdirStack = DrillFileSystemUtil.listDirectoriesSafe(fs, latestDir.getPath(), false); + // Sorting list before addition to stack + childSubdirStack.sort(Comparator.naturalOrder()); + if (!childSubdirStack.isEmpty()) { + profileDirStack.addAll(childSubdirStack); + } else { + //Found Leaf dir + } - Iterator> rangeIterator = Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), this.stringTransformer).iterator(); - logger.info("CacheSTATS::{}:: {}", (take+skip), this.deserializedVCache.stats().toString()); - return rangeIterator; - } catch (IOException e) { - throw new RuntimeException(e); + // Terminate exploration if required count has been met + if ( collectedProfileCount >= (skip + take) ) { + profileDirStack.clear(); + } } + + //Sorting not required since preSorted + Iterator> rangeIterator = Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), this.stringTransformer).iterator(); + return rangeIterator; + } catch (IOException e) { + throw new RuntimeException(e); } + } private Path makePath(String name) { Preconditions.checkArgument( - //!name.contains("/") && !name.contains(":") && !name.contains("..")); -// return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); return new Path(name + DRILL_SYS_FILE_SUFFIX); } @@ -302,7 +263,6 @@ public boolean contains(String key) { public V get(String key) { Path actualPath = makePath(key); try { - //logger.info("key2make::{}", key); if (!fs.exists(actualPath)) { //Generate paths within upper and lower bounds to test List possibleDirs = getPossiblePaths(key.substring(key.lastIndexOf('/') + 1)); @@ -315,7 +275,6 @@ public V get(String key) { } catch (IOException e) { throw new RuntimeException(e); } -// logger.info("DeSerializing {}", actualPath.toUri().getPath()); return deserializedVCache.getUnchecked(actualPath.toString()); } @@ -369,10 +328,8 @@ public void close() { // Gets deserialized by exact path (Used for listing) private V getViaAbsolutePath(String key) { try { -// logger.info("key2make::{}", key); Path path = makePath(key); if (!fs.exists(path)) { -// logger.info("gotNullPath for {} ", key); return null; } } catch (IOException e) { @@ -398,90 +355,79 @@ private Path getPathFromPossibleDirList(String key, List possibleDirList } // Infers the list of possible directories where the profile is located (Used for blind lookup of key) - private List getPossiblePaths(String queryIdString) { - //Reqd:: - QueryId queryId = QueryIdHelper.getQueryIdFromString(queryIdString); - long lowerBoundTime = (Integer.MAX_VALUE - ((queryId.getPart1() + Integer.MAX_VALUE) >> 32)) * 1000; // +/- 1000 for border cases - long upperBoundTime = (Integer.MAX_VALUE - ((queryId.getPart1() + Integer.MIN_VALUE) >> 32)) * 1000; // +/- 1000 for border cases - //[sodBug] - Date lowerBoundDate = new Date(lowerBoundTime); - String lowerBoundPath = indexedPathFormat.format(lowerBoundDate); - logger.info("Inferred LowerBound Time is {} . Look from {}", lowerBoundDate, lowerBoundPath); - Date upperBoundDate = new Date(upperBoundTime); - logger.info("Inferred UpperBound Time is {} . Look until {}", upperBoundDate, indexedPathFormat.format(upperBoundDate)); - //[eodBug] - - if (incrementType == null) { - return new ArrayList<>(0); //Empty - } + // Ref: https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java#L67 + private List getPossiblePaths(String queryIdString) { + QueryId queryId = QueryIdHelper.getQueryIdFromString(queryIdString); + long lowerBoundTime = (Integer.MAX_VALUE - ((queryId.getPart1() + Integer.MAX_VALUE) >> 32)) * 1000; // +/- 1000 for border cases + long upperBoundTime = (Integer.MAX_VALUE - ((queryId.getPart1() + Integer.MIN_VALUE) >> 32)) * 1000; // +/- 1000 for border cases + Date lowerBoundDate = new Date(lowerBoundTime); + Date upperBoundDate = new Date(upperBoundTime); + if (incrementType == null) { + return new ArrayList<>(0); //Empty + } - Date currDate = lowerBoundDate; - logger.info("currDate.after(upperBoundDate) : {}", currDate.after(upperBoundDate)); - int increment = 0; - - Set possibleSrcDirSet = new TreeSet<>(); - do { - //Add tokenized parents as well - String[] possibleDirTokens = indexedPathFormat.format(currDate).split("/"); - String possibleDir = ""; - for (String token : possibleDirTokens) { - if (possibleDir.isEmpty()) { - possibleDir = token; - } else { - possibleDir = possibleDir.concat("/").concat(token); - } - // Adding - possibleSrcDirSet.add(possibleDir); - //logger.info("Added {}", possibleDir); + // Iterate through possible matches + Date currDate = lowerBoundDate; + Set possibleSrcDirSet = new TreeSet<>(); + int increment = 0; + do { + //Add tokenized parents as well + String[] possibleDirTokens = indexedPathFormat.format(currDate).split("/"); + String possibleDir = ""; + for (String token : possibleDirTokens) { + if (possibleDir.isEmpty()) { + possibleDir = token; + } else { + possibleDir = possibleDir.concat("/").concat(token); } + // Adding + possibleSrcDirSet.add(possibleDir); + } - // Incrementing - switch (incrementType) { - case Minute: - currDate = DateUtils.addMinutes(lowerBoundDate, ++increment); - break; + // Incrementing + switch (incrementType) { + case Minute: + currDate = DateUtils.addMinutes(lowerBoundDate, ++increment); + break; - case Hour: - currDate = DateUtils.addHours(lowerBoundDate, ++increment); - break; + case Hour: + currDate = DateUtils.addHours(lowerBoundDate, ++increment); + break; - case Day: - currDate = DateUtils.addDays(lowerBoundDate, ++increment); - break; + case Day: + currDate = DateUtils.addDays(lowerBoundDate, ++increment); + break; - case Month: - currDate = DateUtils.addMonths(lowerBoundDate, ++increment); - break; + case Month: + currDate = DateUtils.addMonths(lowerBoundDate, ++increment); + break; - case Year: - currDate = DateUtils.addYears(lowerBoundDate, ++increment); - break; + case Year: + currDate = DateUtils.addYears(lowerBoundDate, ++increment); + break; - default: - break; - } - } while (!currDate.after(upperBoundDate)); + default: + break; + } + } while (!currDate.after(upperBoundDate)); - List sortedPossibleDirs = new ArrayList(); - sortedPossibleDirs.addAll(possibleSrcDirSet); - sortedPossibleDirs.sort(Comparator.reverseOrder()); + List sortedPossibleDirs = new ArrayList(); + sortedPossibleDirs.addAll(possibleSrcDirSet); + sortedPossibleDirs.sort(Comparator.reverseOrder()); - //TODO [sodBug] + // For Debugging only + if (logger.isDebugEnabled()) { + logger.debug("Inferred LowerBound Time is {} . Look from {}", lowerBoundDate, indexedPathFormat.format(lowerBoundDate)); + logger.debug("Inferred UpperBound Time is {} . Look until {}", upperBoundDate, indexedPathFormat.format(upperBoundDate)); for (String possibility : sortedPossibleDirs) { - logger.info("Possibility :: {}", possibility); + logger.debug("Possible dir :: {}", possibility); } - //[eodBug] - - return sortedPossibleDirs; - /* - // create a new queryid where the first four bytes are a growing time (each new value comes earlier in sequence). Last 12 bytes are random. - final long time = (int) (System.currentTimeMillis()/1000); - final long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt(); - */ } - //TODO | FIXME: Guava to Handle RuntimeException by - //Deserialize path's contents (leveraged by Guava Cache) + return sortedPossibleDirs; + } + + // Deserialize path's contents (leveraged by Guava Cache) private V deserializeFromFileSystem(String srcPath) { final Path path = new Path(srcPath); try (InputStream is = fs.open(path)) { @@ -491,7 +437,7 @@ private V deserializeFromFileSystem(String srcPath) { } } - //Enumerator + //Enumerator used for date increment during path discovery private enum IncrementType { Minute, Hour, Day, Month, Year }