+ * This is not invoked anywhere, is not tested, and all implementations return null.
  *
  * @param data
  *          a {@link TimelineEntity} object
@@ -80,7 +82,7 @@ TimelineWriteResponse write(TimelineCollectorContext context,
  *          value.
  * @param track Specifies the track or dimension along which aggregation would
  *          occur. Includes USER, FLOW, QUEUE, etc.
- * @return a {@link TimelineWriteResponse} object.
+ * @return a {@link TimelineWriteResponse} object. All implementations return null.
  * @throws IOException if there is any exception encountered while aggregating
  *           entities to the backend storage.
  */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
similarity index 97%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index 2e771fc77e8f1..d9bee2db93f10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -39,6 +39,8 @@
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -61,12 +63,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;
+
 /**
  * File System based implementation for TimelineReader. This implementation may
  * not provide a complete implementation of all the necessary features. This
  * implementation is provided solely for basic testing purposes, and should not
  * be used in a non-test situation.
  */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
 public class FileSystemTimelineReaderImpl extends AbstractService
     implements TimelineReader {
@@ -164,7 +170,9 @@ private static void fillFields(TimelineEntity finalEntity,
   private String getFlowRunPath(String userId, String clusterId,
       String flowName, Long flowRunId, String appId) throws IOException {
     if (userId != null && flowName != null && flowRunId != null) {
-      return userId + File.separator + flowName + File.separator + "*" + File.separator + flowRunId;
+      return escape(userId) + File.separator
+          + escape(flowName) + File.separator
+          + "*" + File.separator + flowRunId;
     }
     if (clusterId == null || appId == null) {
       throw new IOException("Unable to get flow info");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
similarity index 75%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index e658aad1eaa1c..82d15de380d07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
@@ -51,7 +52,27 @@
  * This implements a FileSystem based backend for storing application timeline
  * information. This implementation may not provide a complete implementation of
  * all the necessary features. This implementation is provided solely for basic
- * testing purposes, and should not be used in a non-test situation.
+ * testing purposes, and MUST NOT be used in a non-test situation.
+ *
+ * Key limitations are:
+ *
+ * To implement an atomic append it reads all the data in the original file,
+ * writes that to a temporary file, appends the new
+ * data there and renames that temporary file to the original path.
+ * This makes the update operation slower and slower the longer an application runs.
+ * If any other update comes in while an existing update is in progress,
+ * it will read and append to the previous state of the log, losing all changes
+ * from the ongoing transaction.
+ *
+ * This is not a database. Apache HBase is. Use it.
+ *
+ * The only realistic justification for this is if you are writing code which updates
+ * the timeline service and you want something easier to debug in unit tests.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
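To make the limitation described in the new javadoc concrete, here is an illustrative sketch, not part of the patch and with hypothetical names, of the copy-to-temporary-file-then-rename update pattern it warns about; the real writeInternal/writeFileWithRetries code differs in detail:

    // Illustrative sketch only, with hypothetical names: the "atomic" append
    // pattern the javadoc warns about. Every update re-reads the whole file, so
    // updates get slower as the file grows, and a writer racing between the copy
    // and the rename silently loses its changes.
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    final class CopyAppendRenameSketch {
      static void append(FileSystem fs, Path file, byte[] newRecord) throws IOException {
        Path tmp = new Path(file.getParent(), file.getName() + ".tmp"); // hypothetical temp name
        try (FSDataOutputStream out = fs.create(tmp, true)) {
          if (fs.exists(file)) {
            try (FSDataInputStream in = fs.open(file)) {
              IOUtils.copyBytes(in, out, 4096, false); // copy all existing data
            }
          }
          out.write(newRecord); // append the new entity record
        }
        fs.delete(file, false);
        fs.rename(tmp, file); // replace the original with the rewritten copy
      }
    }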
@@ -89,8 +110,16 @@ public class FileSystemTimelineWriterImpl extends AbstractService
private static final Logger LOG =
LoggerFactory.getLogger(FileSystemTimelineWriter.class);
+ private static final Logger LOGIMPL =
+ LoggerFactory.getLogger(FileSystemTimelineWriterImpl.class);
+
+ public static final LogExactlyOnce WARNING_OF_USE =
+ new LogExactlyOnce(LOGIMPL);
+
FileSystemTimelineWriterImpl() {
super((FileSystemTimelineWriterImpl.class.getName()));
+ WARNING_OF_USE.warn("This timeline writer is neither safe nor scaleable enough to"
+ + " be used in production.");
}
@Override
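For background, not part of the patch: LogExactlyOnce wraps an SLF4J logger and emits a given message only on the first call, so repeated construction of this writer in a test run yields a single warning. A minimal usage sketch with hypothetical names, assuming only that documented behaviour:

    // Hypothetical demo of LogExactlyOnce: the second call is suppressed.
    Logger log = LoggerFactory.getLogger("sketch");
    LogExactlyOnce once = new LogExactlyOnce(log);
    once.warn("emitted the first time only");
    once.warn("emitted the first time only"); // silently dropped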
@@ -126,21 +155,19 @@ private synchronized void writeInternal(String clusterId, String userId,
TimelineEntity entity,
TimelineWriteResponse response)
throws IOException {
- String entityTypePathStr = clusterId + File.separator + userId +
- File.separator + escape(flowName) + File.separator +
- escape(flowVersion) + File.separator + flowRun + File.separator + appId
- + File.separator + entity.getType();
+ final String entityTypePathStr =
+ buildEntityTypeSubpath(clusterId, userId, flowName, flowVersion, flowRun, appId, entity.getType());
Path entityTypePath = new Path(entitiesPath, entityTypePathStr);
try {
mkdirs(entityTypePath);
Path filePath =
new Path(entityTypePath,
- entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+ escape(entity.getId(), "id") + TIMELINE_SERVICE_STORAGE_EXTENSION);
createFileWithRetries(filePath);
- byte[] record = new StringBuilder()
- .append(TimelineUtils.dumpTimelineRecordtoJSON(entity))
- .append("\n").toString().getBytes(StandardCharsets.UTF_8);
+ byte[] record =
+ (TimelineUtils.dumpTimelineRecordtoJSON(entity) + "\n")
+ .getBytes(StandardCharsets.UTF_8);
writeFileWithRetries(filePath, record);
} catch (Exception ioe) {
LOG.warn("Interrupted operation:{}", ioe.getMessage());
@@ -153,6 +180,35 @@ private synchronized void writeInternal(String clusterId, String userId,
}
}
+ /**
+ * Given the various attributes of an entity, return the string subpath
+ * of the directory.
+ * @param clusterId cluster ID
+ * @param userId user ID
+ * @param flowName flow name
+ * @param flowVersion flow version
+ * @param flowRun flow run
+ * @param appId application ID
+ * @param type entity type
+ * @return the subpath for records.
+ */
+ @VisibleForTesting
+ public static String buildEntityTypeSubpath(final String clusterId,
+ final String userId,
+ final String flowName,
+ final String flowVersion,
+ final long flowRun,
+ final String appId,
+ final String type) {
+ return clusterId
+ + File.separator + userId
+ + File.separator + escape(flowName, "")
+ + File.separator + escape(flowVersion, "")
+ + File.separator + flowRun
+ + File.separator + escape(appId, "")
+ + File.separator + escape(type, "type");
+ }
+
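As an illustration, not part of the patch and with hypothetical values, the helper above produces one directory level per attribute; assuming a Unix File.separator of "/", escaping only affects components containing separators or the other replaced characters:

    // Hypothetical inputs; the expected output assumes File.separator is "/".
    String subpath = FileSystemTimelineWriterImpl.buildEntityTypeSubpath(
        "cluster1", "user1", "flow/name", "v1", 1002345678919L,
        "application_1111111111_2222", "YARN_CONTAINER");
    // subpath: "cluster1/user1/flow_name/v1/1002345678919/application_1111111111_2222/YARN_CONTAINER"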
private TimelineWriteError createTimelineWriteError(TimelineEntity entity) {
TimelineWriteError error = new TimelineWriteError();
error.setEntityId(entity.getId());
@@ -316,8 +372,29 @@ protected void writeFile(Path outputPath, byte[] data) throws IOException {
}
}
- // specifically escape the separator character
- private static String escape(String str) {
- return str.replace(File.separatorChar, '_');
+ /**
+ * Escape filesystem separator character and other URL-unfriendly chars.
+ * @param str input string
+ * @return a string with none of the escaped characters.
+ */
+ @VisibleForTesting
+ public static String escape(String str) {
+ return escape(str, "");
+ }
+
+ /**
+ * Escape filesystem separator character and other URL-unfriendly chars.
+ * Empty strings are mapped to a fallback string, which may itself be empty.
+ * @param str input string
+   * @param fallback fallback string
+ * @return a string with none of the escaped characters.
+ */
+ @VisibleForTesting
+ public static String escape(String str, final String fallback) {
+ return str.isEmpty()
+ ? fallback
+ : str.replace(File.separatorChar, '_')
+ .replace('?', '_')
+ .replace(':', '_');
}
}
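A short sketch, not part of the patch, of how the two escape overloads behave, again assuming a Unix File.separator:

    // Assuming File.separatorChar == '/': separators, '?' and ':' all become '_'.
    FileSystemTimelineWriterImpl.escape("a/b?c:d");   // -> "a_b_c_d"
    FileSystemTimelineWriterImpl.escape("");          // -> ""      (fallback defaults to "")
    FileSystemTimelineWriterImpl.escape("", "type");  // -> "type"  (empty input maps to the fallback)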
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index efed104eeea76..8297f17e4b166 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -27,14 +27,17 @@
import java.util.List;
import java.util.Map;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
@@ -43,11 +46,16 @@
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.buildEntityTypeSubpath;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;
+
+public class TestFileSystemTimelineWriterImpl extends AbstractHadoopTestBase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestFileSystemTimelineWriterImpl.class);
+
+ public static final String UP = ".." + File.separator;
-public class TestFileSystemTimelineWriterImpl {
@TempDir
private File tmpFolder;
@@ -84,12 +92,10 @@ void testWriteEntityToFile() throws Exception {
te.addEntity(entity2);
Map