hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml (6 additions, 0 deletions)
@@ -158,6 +158,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -73,14 +73,16 @@ TimelineWriteResponse write(TimelineCollectorContext context,
*
 * Any errors occurring for individual write request objects will be reported
 * in the response.
 * <p>
 * This is not invoked anywhere, is untested, and all implementations return null.
*
 * @param data a {@link TimelineEntity} object to aggregate.
 * @param track a {@link TimelineAggregationTrack} enum value specifying the
 *          track or dimension along which aggregation would occur.
 *          Includes USER, FLOW, QUEUE, etc.
 * @return a {@link TimelineWriteResponse} object.
 * @return a {@link TimelineWriteResponse} object. All implementations return null.
 * @throws IOException if there is any exception encountered while aggregating
 *           entities to the backend storage.
 */
org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -39,6 +39,8 @@
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -61,12 +63,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;

/**
* File System based implementation for TimelineReader. This implementation may
* not provide a complete implementation of all the necessary features. This
* implementation is provided solely for basic testing purposes, and should not
* be used in a non-test situation.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FileSystemTimelineReaderImpl extends AbstractService
implements TimelineReader {

@@ -164,7 +170,9 @@ private static void fillFields(TimelineEntity finalEntity,
private String getFlowRunPath(String userId, String clusterId,
String flowName, Long flowRunId, String appId) throws IOException {
if (userId != null && flowName != null && flowRunId != null) {
return userId + File.separator + flowName + File.separator + "*" + File.separator + flowRunId;
return escape(userId) + File.separator
+ escape(flowName) + File.separator
+ "*" + File.separator + flowRunId;
}
if (clusterId == null || appId == null) {
throw new IOException("Unable to get flow info");
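To see why the reader now escapes the user ID and flow name, consider a flow name that contains the separator character: unescaped, it injects an extra directory level into the glob above. A minimal, self-contained sketch (all names and values invented; the local `escape()` mirrors the helper this patch imports from `FileSystemTimelineWriterImpl`):

```java
import java.io.File;

/** Standalone illustration; escape() is a local copy of the patch's helper. */
public class FlowRunPathDemo {
  static String escape(String str) {
    return str.replace(File.separatorChar, '_')
        .replace('?', '_')
        .replace(':', '_');
  }

  public static void main(String[] args) {
    String userId = "alice";
    String flowName = "distributed/grep"; // contains the separator on Linux
    long flowRunId = 1L;
    // Unescaped, the flow name adds a fourth directory level to the glob:
    System.out.println(userId + File.separator + flowName
        + File.separator + "*" + File.separator + flowRunId);
    // -> alice/distributed/grep/*/1
    // Escaped, each attribute stays a single path component:
    System.out.println(escape(userId) + File.separator + escape(flowName)
        + File.separator + "*" + File.separator + flowRunId);
    // -> alice/distributed_grep/*/1
  }
}
```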
org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
@@ -51,7 +52,27 @@
* This implements a FileSystem based backend for storing application timeline
* information. This implementation may not provide a complete implementation of
* all the necessary features. This implementation is provided solely for basic
* testing purposes, and should not be used in a non-test situation.
* testing purposes, and MUST NOT be used in a non-test situation.
* <p>
* Key limitations are:
* <ol>
* <li>Inadequate scalability and concurrency for production use</li>
* <li>Weak security: any authenticated caller can add events to any application
* timeline.</li>
* </ol>
* <p>
* To implement an atomic append it reads all the data in the original file,
* writes that to a temporary file, appends the new
* data there and renames that temporary file to the original path.
* This makes the update operation slower and slower the longer an application runs.
* If any other update comes in while an existing update is in progress,
* it will read and append to the previous state of the log, losing all changes
* from the ongoing transaction.
* <p>
* This is not a database. Apache HBase is. Use it.
* <p>
* The only realistic justification for this is if you are writing code which updates
* the timeline service and you want something easier to debug in unit tests.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
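The copy-append-rename scheme the javadoc above describes is easier to follow as code. Below is a purely illustrative sketch of the pattern, written against plain java.nio rather than the Hadoop FileSystem API; it is not the writer's actual implementation:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Illustrative sketch of copy-append-rename; not the actual writer code. */
public class CopyAppendRenameDemo {
  static void append(Path file, String record) throws IOException {
    Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
    // 1. Read the entire current file; this cost grows with every append.
    byte[] existing =
        Files.exists(file) ? Files.readAllBytes(file) : new byte[0];
    // 2. Write the old content plus the new record to a temporary file.
    //    A concurrent append that read "existing" before our rename below
    //    will later rename its own copy over ours, silently dropping this
    //    record; that is the lost-update window the javadoc warns about.
    Files.write(tmp, existing);
    Files.write(tmp, record.getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.APPEND);
    // 3. Replace the original. On a local POSIX filesystem the rename is
    //    atomic, so readers see either the old file or the new one.
    Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
  }
}
```

Step 1 is why each append costs O(current file size), and the gap between steps 1 and 3 is where a concurrent writer's changes disappear.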
@@ -89,8 +110,16 @@ public class FileSystemTimelineWriterImpl extends AbstractService
private static final Logger LOG =
LoggerFactory.getLogger(FileSystemTimelineWriter.class);

private static final Logger LOGIMPL =
LoggerFactory.getLogger(FileSystemTimelineWriterImpl.class);

public static final LogExactlyOnce WARNING_OF_USE =
new LogExactlyOnce(LOGIMPL);

FileSystemTimelineWriterImpl() {
super((FileSystemTimelineWriterImpl.class.getName()));
WARNING_OF_USE.warn("This timeline writer is neither safe nor scalable enough to"
+ " be used in production.");
}

@Override
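A note on the `LogExactlyOnce` import above: it is hadoop-common's latch-on-first-use wrapper around an SLF4J logger, so the warning fires for the first writer constructed in a JVM and is swallowed for every later one. A minimal sketch of that behaviour (class name and message invented):

```java
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogOnceDemo {
  private static final Logger LOG = LoggerFactory.getLogger(LogOnceDemo.class);
  private static final LogExactlyOnce WARN_ONCE = new LogExactlyOnce(LOG);

  public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
      // Only the first iteration emits a log line; later calls are no-ops.
      WARN_ONCE.warn("writer #{} constructed", i);
    }
  }
}
```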
@@ -126,21 +155,19 @@ private synchronized void writeInternal(String clusterId, String userId,
TimelineEntity entity,
TimelineWriteResponse response)
throws IOException {
String entityTypePathStr = clusterId + File.separator + userId +
File.separator + escape(flowName) + File.separator +
escape(flowVersion) + File.separator + flowRun + File.separator + appId
+ File.separator + entity.getType();
final String entityTypePathStr =
buildEntityTypeSubpath(clusterId, userId, flowName, flowVersion, flowRun, appId, entity.getType());
Path entityTypePath = new Path(entitiesPath, entityTypePathStr);
try {
mkdirs(entityTypePath);
Path filePath =
new Path(entityTypePath,
entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
escape(entity.getId(), "id") + TIMELINE_SERVICE_STORAGE_EXTENSION);
createFileWithRetries(filePath);

byte[] record = new StringBuilder()
.append(TimelineUtils.dumpTimelineRecordtoJSON(entity))
.append("\n").toString().getBytes(StandardCharsets.UTF_8);
byte[] record =
(TimelineUtils.dumpTimelineRecordtoJSON(entity) + "\n")
.getBytes(StandardCharsets.UTF_8);
writeFileWithRetries(filePath, record);
} catch (Exception ioe) {
LOG.warn("Interrupted operation:{}", ioe.getMessage());
@@ -153,6 +180,35 @@ private synchronized void writeInternal(String clusterId, String userId,
}
}

/**
* Given the various attributes of an entity, return the string subpath
* of the directory.
* @param clusterId cluster ID
* @param userId user ID
* @param flowName flow name
* @param flowVersion flow version
* @param flowRun flow run
* @param appId application ID
* @param type entity type
* @return the subpath for records.
*/
@VisibleForTesting
public static String buildEntityTypeSubpath(final String clusterId,
final String userId,
final String flowName,
final String flowVersion,
final long flowRun,
final String appId,
final String type) {
return clusterId
+ File.separator + userId
+ File.separator + escape(flowName, "")
+ File.separator + escape(flowVersion, "")
+ File.separator + flowRun
+ File.separator + escape(appId, "")
+ File.separator + escape(type, "type");
}

private TimelineWriteError createTimelineWriteError(TimelineEntity entity) {
TimelineWriteError error = new TimelineWriteError();
error.setEntityId(entity.getId());
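For a concrete sense of the directory layout `buildEntityTypeSubpath` produces, here is a hypothetical invocation (all argument values invented; the output shown assumes '/' as `File.separator`):

```java
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;

public class SubpathDemo {
  public static void main(String[] args) {
    // Note how the '/' inside the flow name is escaped to '_'.
    System.out.println(FileSystemTimelineWriterImpl.buildEntityTypeSubpath(
        "cluster1", "alice", "distributed/grep", "1.0", 42L,
        "application_1666000000000_0001", "YARN_APPLICATION"));
    // cluster1/alice/distributed_grep/1.0/42/application_1666000000000_0001/YARN_APPLICATION
  }
}
```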
@@ -316,8 +372,29 @@ protected void writeFile(Path outputPath, byte[] data) throws IOException {
}
}

// specifically escape the separator character
private static String escape(String str) {
return str.replace(File.separatorChar, '_');
/**
* Escape filesystem separator character and other URL-unfriendly chars.
* @param str input string
* @return a string containing none of the escaped characters.
*/
@VisibleForTesting
public static String escape(String str) {
return escape(str, "");
}

/**
* Escape filesystem separator character and other URL-unfriendly chars.
* Empty strings are mapped to a fallback string, which may itself be empty.
* @param str input string
* @param fallback fallback string to use when {@code str} is empty
* @return a string containing none of the escaped characters.
*/
@VisibleForTesting
public static String escape(String str, final String fallback) {
return str.isEmpty()
? fallback
: str.replace(File.separatorChar, '_')
.replace('?', '_')
.replace(':', '_');
}
}
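A few sample mappings for the two-argument `escape()`, as a hypothetical, self-contained snippet (run with `-ea` so the asserts fire; a '/' `File.separator` is assumed):

```java
import static org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;

public class EscapeSamples {
  public static void main(String[] args) {
    assert escape("a/b", "").equals("a_b");           // separator to '_'
    assert escape("re:run?x", "").equals("re_run_x"); // ':' and '?' to '_'
    assert escape("", "id").equals("id");             // empty maps to fallback
    assert escape("", "").isEmpty();                  // fallback may be empty
    System.out.println("all mappings as expected");
  }
}
```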