Pig Job Auto Tuning Integration #290

Open
wants to merge 5 commits into base: master
Changes from 1 commit
5 changes: 4 additions & 1 deletion plugins/jobtype/jobtypes/pig-0.12.0/plugin.properties
@@ -1,4 +1,7 @@
pig.listener.visualizer=false

jobtype.classpath=${pig.home}/lib/*,${pig.home}/*
pig.home=
#pig.enable.tuning=false
#tuning.api.end.point=http://hostname:8080/rest/getCurrentRunParameters
#auto.tuning.job.type=PIG
Contributor

What's going on here? Why are we adding commented-out properties?

Author

These are just kept for reference; if users want to enable tuning they can do so. I have un-commented two of them.
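For reference, the enabled form of these properties (values taken from the commented-out examples in this diff; the hostname and port are placeholders) would look like:

```properties
pig.enable.tuning=true
tuning.api.end.point=http://hostname:8080/rest/getCurrentRunParameters
auto.tuning.job.type=PIG
```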

Binary file added plugins/jobtype/lib/commons-httpclient-3.1.jar
Binary file added plugins/jobtype/lib/gson-2.8.1.jar
Binary file added plugins/jobtype/lib/httpclient-4.5.3.jar
Binary file added plugins/jobtype/lib/httpcore-4.4.6.jar
27 changes: 25 additions & 2 deletions plugins/jobtype/src/azkaban/jobtype/HadoopPigJob.java
Expand Up @@ -59,6 +59,15 @@ public class HadoopPigJob extends JavaProcessJob {
public static final String HADOOP_UGI = "hadoop.job.ugi";
public static final String DEBUG = "debug";

// Global tuning flag for Pig; decides whether Azkaban supports tuning for Pig jobs at all
public static final String PIG_ENABLE_TUNING = "pig.enable.tuning";

// Job-level tuning flag; should be set at the job level
public static final String JOB_ENABLE_TUNING = "job.enable.tuning";

// Internal flag recording whether this run is a retry after a failure caused by auto tuning
public static final String AUTO_TUNING_RETRY = "auto.tuning.retry";
Contributor

Is there no cleaner way to pass this around than as a job prop?

Also, if this is an internal flag, it should not be public.

Author

This variable was not required here.

All other parameters are passed through job props, so I thought of using the same approach.


public static String HADOOP_SECURE_PIG_WRAPPER =
"azkaban.jobtype.HadoopSecurePigWrapper";

Expand All @@ -68,7 +77,7 @@ public class HadoopPigJob extends JavaProcessJob {
File tokenFile = null;

private final boolean userPigJar;

private final boolean enableTuning;
private HadoopSecurityManager hadoopSecurityManager;

private File pigLogFile = null;
Expand All @@ -77,8 +86,20 @@ public HadoopPigJob(String jobid, Props sysProps, Props jobProps, Logger log)
throws IOException {
super(jobid, sysProps, jobProps, log);

HADOOP_SECURE_PIG_WRAPPER = HadoopSecurePigWrapper.class.getName();
if (jobProps.containsKey(JOB_ENABLE_TUNING) && jobProps.containsKey(PIG_ENABLE_TUNING)) {
enableTuning = jobProps.getBoolean(JOB_ENABLE_TUNING) && jobProps.getBoolean(PIG_ENABLE_TUNING);
}else
Contributor

Style does not match convention. }else -> } else {, if( -> if (, and opening curly brace should appear on the same line as the if.

Author

Done.

{
enableTuning = false;
}
if(enableTuning)
{
HADOOP_SECURE_PIG_WRAPPER = HadoopTuningSecurePigWrapper.class.getName();
Contributor

We should not be modifying a static variable here. IIUC this could affect other running jobs.

HADOOP_SECURE_PIG_WRAPPER should really be a constant (final), and there should be a new field like private final String securePigWrapper which contains the wrapper to use for this specific instance.

Author

Yes, you are right here. Great catch. Done.
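The suggested refactor could be sketched as follows. This is a minimal, self-contained illustration: the real class reads the two flags from `Props` in the constructor, and only the wrapper class names are taken from the PR.

```java
public class HadoopPigJob {
    // Wrapper class names stay immutable constants.
    private static final String SECURE_PIG_WRAPPER =
        "azkaban.jobtype.HadoopSecurePigWrapper";
    private static final String TUNING_SECURE_PIG_WRAPPER =
        "azkaban.jobtype.HadoopTuningSecurePigWrapper";

    private final boolean enableTuning;
    // Per-instance selection: no static state is mutated, so concurrently
    // running jobs cannot clobber each other's wrapper choice.
    private final String securePigWrapper;

    public HadoopPigJob(boolean jobEnableTuning, boolean pigEnableTuning) {
        this.enableTuning = jobEnableTuning && pigEnableTuning;
        this.securePigWrapper =
            enableTuning ? TUNING_SECURE_PIG_WRAPPER : SECURE_PIG_WRAPPER;
    }

    protected String getJavaClass() {
        return securePigWrapper;
    }
}
```

Tuning is only in effect when both the global and the job-level flag are set, matching the `&&` in the PR's constructor.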


}else
{
HADOOP_SECURE_PIG_WRAPPER = HadoopSecurePigWrapper.class.getName();
}
getJobProps().put(CommonJobProperties.JOB_ID, jobid);
shouldProxy =
getSysProps().getBoolean(HadoopSecurityManager.ENABLE_PROXYING, false);
Expand Down Expand Up @@ -248,6 +269,7 @@ protected List<String> getClassPaths() {
classPath.add(HadoopConfigurationInjector.getPath(getJobProps(),
getWorkingDirectory()));


Contributor

remove unnecessary whitespace changes

Author

Done.

// assuming pig 0.8 and up
if (!userPigJar) {
classPath.add(getSourcePathFromClass(PigRunner.class));
Expand Down Expand Up @@ -390,4 +412,5 @@ public void cancel() throws InterruptedException {
HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps,
tokenFile, getLog());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright 2014 LinkedIn Corp.
Contributor

update copyright year

Author

Done.

*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.jobtype;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

import azkaban.flow.CommonJobProperties;
import azkaban.utils.Props;


/**
* HadoopTuningConfigurationInjector is responsible for inserting links back to the
* Azkaban UI in configurations and for automatically injecting designated job
* properties into the Hadoop configuration.
* <p>
* It is assumed that the necessary links have already been loaded into the
* properties. After writing the necessary links as an XML file, as required by
* Hadoop's configuration, clients may add the links as a default resource
* using injectResources() so that they are included in any Configuration
* constructed.
*/
public class HadoopTuningConfigurationInjector {
Contributor

This class needs to leverage HadoopConfigurationInjector. There is a huge amount of code duplication here. Through refactoring if necessary, this class should only be adding logic for the tuning injections, not anything else.

Contributor

+1
It seems like almost all of the code is duplicated apart from a few lines.

Author

Reused HadoopConfigurationInjector and refactored some methods.

Contributor

Nice! Looks great

private static Logger _logger = Logger.getLogger(HadoopTuningConfigurationInjector.class);

// File to which the Hadoop configuration to inject will be written.
public static final String INJECT_FILE = "hadoop-inject.xml";
// File to which the Hadoop tuning configuration to inject will be written.
public static final String INJECT_TUNING_FILE = "hadoop-tuning-inject.xml";

// Prefix for properties to be automatically injected into the Hadoop conf.
public static final String INJECT_PREFIX = "hadoop-inject.";

public static final String WORKFLOW_ID_SEPERATOR = "$";
private static final String WORKFLOW_ID_CONFIG = "yarn.workflow.id";

/*
* To be called by the forked process to load the generated links and Hadoop
* configuration properties to automatically inject.
*
* @param props The Azkaban properties
*/
public static void injectResources(Props props, String workingDir) {
// Add mapred, yarn and hdfs site configs (in addition to core-site, which
// is automatically added) as default resources before we add the injected
// configuration. This will cause the injected properties to override the
// default site properties (instead of vice-versa). This is safe to do,
// even when these site files don't exist for your Hadoop installation.
if (props.getBoolean("azkaban.inject.hadoop-site.configs", true)) {
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
Configuration.addDefaultResource("yarn-default.xml");
Configuration.addDefaultResource("yarn-site.xml");
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
}
Configuration.addDefaultResource(INJECT_FILE);
Configuration.addDefaultResource(INJECT_TUNING_FILE);
}

/**
* Writes out the XML configuration file that will be injected by the client
* as a configuration resource.
* <p>
* This file will include a series of links injected by Azkaban as well as
* any job properties that begin with the designated injection prefix.
*
* @param props The Azkaban properties
* @param workingDir The Azkaban job working directory
*/
public static void prepareResourcesToInject(Props props, String workingDir) {
try {
Configuration conf = new Configuration(false);

// First, inject a series of Azkaban links. These are equivalent to
// CommonJobProperties.[EXECUTION,WORKFLOW,JOB,JOBEXEC,ATTEMPT]_LINK
addHadoopProperties(props);

// Next, automatically inject any properties that begin with the
// designated injection prefix.
Map<String, String> confProperties = props.getMapByPrefix(INJECT_PREFIX);

for (Map.Entry<String, String> entry : confProperties.entrySet()) {
String confKey = entry.getKey().replace(INJECT_PREFIX, "");
String confVal = entry.getValue();
if (confVal != null) {
conf.set(confKey, confVal);
}
}

// Now write out the configuration file to inject.
File file = getConfFile(props, workingDir, INJECT_TUNING_FILE);
OutputStream xmlOut = new FileOutputStream(file);
conf.writeXml(xmlOut);
xmlOut.close();
} catch (Throwable e) {
_logger.error("Encountered error while preparing the Hadoop configuration resource file", e);
}
}

private static void addHadoopProperty(Props props, String propertyName) {
props.put(INJECT_PREFIX + propertyName, props.get(propertyName));
}

private static void addHadoopWorkflowProperty(Props props, String propertyName) {
String workflowID =
props.get(CommonJobProperties.PROJECT_NAME) + WORKFLOW_ID_SEPERATOR + props.get(CommonJobProperties.FLOW_ID);
props.put(INJECT_PREFIX + propertyName, workflowID);
}

private static void addHadoopProperties(Props props) {
String[] propsToInject = new String[] {
    CommonJobProperties.EXEC_ID,
    CommonJobProperties.FLOW_ID,
    CommonJobProperties.JOB_ID,
    CommonJobProperties.PROJECT_NAME,
    CommonJobProperties.PROJECT_VERSION,
    CommonJobProperties.EXECUTION_LINK,
    CommonJobProperties.JOB_LINK,
    CommonJobProperties.WORKFLOW_LINK,
    CommonJobProperties.JOBEXEC_LINK,
    CommonJobProperties.ATTEMPT_LINK,
    CommonJobProperties.OUT_NODES,
    CommonJobProperties.IN_NODES,
    CommonJobProperties.PROJECT_LAST_CHANGED_DATE,
    CommonJobProperties.PROJECT_LAST_CHANGED_BY,
    CommonJobProperties.SUBMIT_USER };

for (String propertyName : propsToInject) {
addHadoopProperty(props, propertyName);
}
addHadoopWorkflowProperty(props, WORKFLOW_ID_CONFIG);
}

/**
* Resolve the location of the file containing the configuration file.
*
* @param props The Azkaban properties
* @param workingDir The Azkaban job working directory
* @param fileName The desired configuration file name
*/
public static File getConfFile(Props props, String workingDir, String fileName) {
File jobDir = new File(workingDir, getDirName(props));
if (!jobDir.exists()) {
jobDir.mkdir();
}
return new File(jobDir, fileName);
}

/**
* For classpath reasons, we'll put each link file in a separate directory.
* This must be called only after the job id has been inserted by the job.
*
* @param props The Azkaban properties
*/
public static String getDirName(Props props) {
String dirSuffix = props.get(CommonJobProperties.NESTED_FLOW_PATH);

if ((dirSuffix == null) || (dirSuffix.length() == 0)) {
dirSuffix = props.get(CommonJobProperties.JOB_ID);
if ((dirSuffix == null) || (dirSuffix.length() == 0)) {
throw new RuntimeException("azkaban.flow.nested.path and azkaban.job.id were not set");
}
}

return "_resources_" + dirSuffix.replace(':', '_');
}

/**
* Gets the path to the directory in which the generated links and Hadoop
* conf properties files are written.
*
* @param props The Azkaban properties
* @param workingDir The Azkaban job working directory
*/
public static String getPath(Props props, String workingDir) {
return new File(workingDir, getDirName(props)).toString();
}

/**
* Loads an Azkaban property into the Hadoop configuration.
*
* @param props The Azkaban properties
* @param conf The Hadoop configuration
* @param name The property name to load from the Azkaban properties into the Hadoop configuration
*/
public static void loadProp(Props props, Configuration conf, String name) {
String prop = props.get(name);
if (prop != null) {
conf.set(name, prop);
}
}
}
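The core of `prepareResourcesToInject` above — stripping the `hadoop-inject.` prefix from matching keys and copying their values into the Hadoop `Configuration` — can be illustrated with a plain `Map` standing in for both `Props` and `Configuration`. This is a sketch of the mechanism, not the PR's code:

```java
import java.util.HashMap;
import java.util.Map;

public class InjectPrefixDemo {
    static final String INJECT_PREFIX = "hadoop-inject.";

    // Mirrors the injection loop: keys carrying the prefix are stripped
    // of it and copied into the target configuration; all other keys
    // (and null values) are ignored.
    static Map<String, String> resolveInjected(Map<String, String> props) {
        Map<String, String> conf = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(INJECT_PREFIX) && e.getValue() != null) {
                conf.put(e.getKey().substring(INJECT_PREFIX.length()),
                         e.getValue());
            }
        }
        return conf;
    }
}
```

So a job property such as `hadoop-inject.mapreduce.job.queuename=default` ends up in the generated XML as `mapreduce.job.queuename`, overriding the site defaults loaded earlier by `injectResources`.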