6 changes: 6 additions & 0 deletions hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -158,6 +158,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -73,14 +73,16 @@ TimelineWriteResponse write(TimelineCollectorContext context,
*
* Any errors occurring for individual write request objects will be reported
* in the response.
 * <p>
 * This method is not invoked anywhere, is untested, and all implementations
 * return null.
*
 * @param data a {@link TimelineEntity} object.
 * @param track a {@link TimelineAggregationTrack} enum value. Specifies the
 *          track or dimension along which aggregation would occur. Includes
 *          USER, FLOW, QUEUE, etc.
* @return a {@link TimelineWriteResponse} object.
* @return a {@link TimelineWriteResponse} object. All implementations return null.
* @throws IOException if there is any exception encountered while aggregating
* entities to the backend storage.
*/
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -39,6 +39,8 @@
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -61,12 +63,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl.escape;

/**
* File System based implementation for TimelineReader. This implementation may
* not provide a complete implementation of all the necessary features. This
* implementation is provided solely for basic testing purposes, and should not
* be used in a non-test situation.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FileSystemTimelineReaderImpl extends AbstractService
implements TimelineReader {

@@ -164,7 +170,9 @@ private static void fillFields(TimelineEntity finalEntity,
private String getFlowRunPath(String userId, String clusterId,
String flowName, Long flowRunId, String appId) throws IOException {
if (userId != null && flowName != null && flowRunId != null) {
return userId + File.separator + flowName + File.separator + "*" + File.separator + flowRunId;
return escape(userId) + File.separator
+ escape(flowName) + File.separator
+ "*" + File.separator + flowRunId;
}
if (clusterId == null || appId == null) {
throw new IOException("Unable to get flow info");
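Why the escaping matters here: a user ID or flow name containing the filesystem
separator would otherwise inject extra components into the user/flow/*/run glob
that getFlowRunPath builds. A minimal, self-contained sketch of the difference,
with made-up values and a local copy of the single-argument escape():

import java.io.File;

/** Illustrative only: how an unescaped flow name corrupts the glob path. */
public class FlowRunPathExample {
  // local copy of FileSystemTimelineWriterImpl.escape(String)
  static String escape(String str) {
    return str.replace(File.separatorChar, '_')
        .replace('?', '_')
        .replace(':', '_');
  }

  public static void main(String[] args) {
    String userId = "alice";
    String flowName = "distcp /tmp/source";   // flow names may contain '/'
    long flowRunId = 42L;

    // unescaped: two extra path components appear in the glob
    String before = userId + File.separator + flowName
        + File.separator + "*" + File.separator + flowRunId;
    System.out.println(before);  // alice/distcp /tmp/source/*/42

    // escaped: separators inside the name collapse to '_'
    String after = escape(userId) + File.separator + escape(flowName)
        + File.separator + "*" + File.separator + flowRunId;
    System.out.println(after);   // alice/distcp _tmp_source/*/42
  }
}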
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
@@ -51,7 +52,27 @@
* This implements a FileSystem based backend for storing application timeline
* information. This implementation may not provide a complete implementation of
* all the necessary features. This implementation is provided solely for basic
* testing purposes, and should not be used in a non-test situation.
* testing purposes, and MUST NOT be used in a non-test situation.
* <p>
* Key limitations are:
* <ol>
* <li>Inadequate scalability and concurrency for production use</li>
* <li>Weak security: any authenticated caller can add events to any application
* timeline.</li>
* </ol>
* <p>
 * To implement an atomic append, it reads all the data in the original file,
 * writes it to a temporary file, appends the new data there, and renames the
 * temporary file to the original path.
 * The update operation therefore becomes progressively slower the longer an
 * application runs.
 * If another update arrives while one is in progress, it reads and appends to
 * the previous state of the log, losing every change from the in-flight
 * transaction.
* <p>
* This is not a database. Apache HBase is. Use it.
* <p>
* The only realistic justification for this is if you are writing code which updates
* the timeline service and you want something easier to debug in unit tests.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
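The copy-append-rename cycle described in the javadoc above, reduced to a
self-contained sketch. It uses java.nio.file rather than the Hadoop FileSystem
API the class actually uses, omits the retries, and still exhibits the
lost-update race the javadoc warns about:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Sketch of the append-by-rewrite cycle; not the class's real code. */
public final class RewriteAppendSketch {
  static void append(Path file, byte[] newData) throws IOException {
    // 1. copy the whole current file to a temporary sibling;
    //    cost grows linearly with the file's size
    Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
    Files.copy(file, tmp, StandardCopyOption.REPLACE_EXISTING);
    // 2. append the new record to the temporary copy
    Files.write(tmp, newData, StandardOpenOption.APPEND);
    // 3. rename the copy over the original path; a concurrent writer that
    //    copied the old file before this rename silently loses everything
    //    appended here. (Whether an existing target is replaced atomically
    //    is filesystem-specific.)
    Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
  }
}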
@@ -89,8 +110,16 @@ public class FileSystemTimelineWriterImpl extends AbstractService
private static final Logger LOG =
LoggerFactory.getLogger(FileSystemTimelineWriter.class);

private static final Logger LOGIMPL =
LoggerFactory.getLogger(FileSystemTimelineWriterImpl.class);

public static final LogExactlyOnce WARNING_OF_USE =
new LogExactlyOnce(LOGIMPL);

FileSystemTimelineWriterImpl() {
super((FileSystemTimelineWriterImpl.class.getName()));
WARNING_OF_USE.warn("This timeline writer is neither safe nor scalable enough to"
+ " be used in production.");
}
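LogExactlyOnce (a small Hadoop utility from org.apache.hadoop.fs.store) logs
only the first invocation on a given instance, so the warning above appears
once per JVM however many writers are constructed. A rough usage sketch,
assuming SLF4J on the classpath:

import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LogOnceSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LogOnceSketch.class);
  private static final LogExactlyOnce WARN_ONCE = new LogExactlyOnce(LOG);

  public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
      // only the i == 0 call is actually logged
      WARN_ONCE.warn("instance {} created", i);
    }
  }
}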

@Override
@@ -126,21 +155,19 @@ private synchronized void writeInternal(String clusterId, String userId,
TimelineEntity entity,
TimelineWriteResponse response)
throws IOException {
String entityTypePathStr = clusterId + File.separator + userId +
File.separator + escape(flowName) + File.separator +
escape(flowVersion) + File.separator + flowRun + File.separator + appId
+ File.separator + entity.getType();
final String entityTypePathStr =
buildEntityTypeSubpath(clusterId, userId, flowName, flowVersion, flowRun, appId, entity.getType());
Path entityTypePath = new Path(entitiesPath, entityTypePathStr);
try {
mkdirs(entityTypePath);
Path filePath =
new Path(entityTypePath,
entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
escape(entity.getId(), "id") + TIMELINE_SERVICE_STORAGE_EXTENSION);
createFileWithRetries(filePath);

byte[] record = new StringBuilder()
.append(TimelineUtils.dumpTimelineRecordtoJSON(entity))
.append("\n").toString().getBytes(StandardCharsets.UTF_8);
byte[] record =
(TimelineUtils.dumpTimelineRecordtoJSON(entity) + "\n")
.getBytes(StandardCharsets.UTF_8);
writeFileWithRetries(filePath, record);
} catch (Exception ioe) {
LOG.warn("Interrupted operation:{}", ioe.getMessage());
@@ -153,6 +180,35 @@ private synchronized void writeInternal(String clusterId, String userId,
}
}

/**
 * Given the various attributes of an entity, build the string subpath of
 * the directory under which its records are stored.
* @param clusterId cluster ID
* @param userId user ID
* @param flowName flow name
* @param flowVersion flow version
* @param flowRun flow run
* @param appId application ID
* @param type entity type
* @return the subpath for records.
*/
@VisibleForTesting
public static String buildEntityTypeSubpath(final String clusterId,
final String userId,
final String flowName,
final String flowVersion,
final long flowRun,
final String appId,
final String type) {
return clusterId
+ File.separator + userId
+ File.separator + escape(flowName, "")
+ File.separator + escape(flowVersion, "")
+ File.separator + flowRun
+ File.separator + escape(appId, "")
+ File.separator + escape(type, "type");
}
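A sample call, with made-up argument values, on a platform where
File.separator is "/":

String subpath = buildEntityTypeSubpath("cluster1", "alice",
    "distcp /tmp", "1", 42L, "application_1_0001", "YARN_APPLICATION");
// subpath: cluster1/alice/distcp _tmp/1/42/application_1_0001/YARN_APPLICATION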

private TimelineWriteError createTimelineWriteError(TimelineEntity entity) {
TimelineWriteError error = new TimelineWriteError();
error.setEntityId(entity.getId());
@@ -316,8 +372,29 @@ protected void writeFile(Path outputPath, byte[] data) throws IOException {
}
}

// specifically escape the separator character
private static String escape(String str) {
return str.replace(File.separatorChar, '_');
/**
* Escape filesystem separator character and other URL-unfriendly chars.
* @param str input string
* @return a string with none of the escaped characters.
*/
@VisibleForTesting
public static String escape(String str) {
return escape(str, "");
}

/**
* Escape filesystem separator character and other URL-unfriendly chars.
* Empty strings are mapped to a fallback string, which may itself be empty.
* @param str input string
 * @param fallback fallback string, returned when {@code str} is empty
* @return a string with none of the escaped characters.
*/
@VisibleForTesting
public static String escape(String str, final String fallback) {
return str.isEmpty()
? fallback
: str.replace(File.separatorChar, '_')
.replace('?', '_')
.replace(':', '_');
}
}
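Expected behaviour of the two overloads, written as JUnit-style assertions
(illustrative values, and assuming File.separatorChar is '/'):

assertEquals("a_b", escape("a/b"));              // separator replaced
assertEquals("q_", escape("q?"));                // '?' replaced
assertEquals("host_8020", escape("host:8020"));  // ':' replaced
assertEquals("", escape(""));                    // empty string, empty fallback
assertEquals("type", escape("", "type"));        // empty string maps to fallback
assertEquals("x_y", escape("x:y", "type"));      // fallback ignored when non-empty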