Pig Job Auto Tuning Integration #290

Open · wants to merge 5 commits into base: master
Changes from 4 commits
5 changes: 4 additions & 1 deletion plugins/jobtype/jobtypes/pig-0.12.0/plugin.properties
@@ -1,4 +1,7 @@
pig.listener.visualizer=false

jobtype.classpath=${pig.home}/lib/*,${pig.home}/*
pig.home=
pig.enable.tuning=false
#tuning.api.end.point=http://hostname:8080/rest/getCurrentRunParameters
auto.tuning.job.type=PIG
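
Tuning is off by default and must be enabled at two levels: pig.enable.tuning in the jobtype's plugin.properties is the global gate (with tuning.api.end.point pointing at the REST endpoint that serves suggested parameters), while job.enable.tuning opts an individual job in (see HadoopPigJob below). A minimal sketch of a job file that opts in; the file name and script path are hypothetical, the property names come from this patch:

    # word-count.job (hypothetical example)
    type=pig
    pig.script=scripts/word_count.pig
    # Takes effect only when pig.enable.tuning=true and tuning.api.end.point
    # are also configured in the jobtype's plugin.properties.
    job.enable.tuning=true
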
Binary file added plugins/jobtype/lib/commons-httpclient-3.1.jar
Binary file added plugins/jobtype/lib/gson-2.8.1.jar
Binary file added plugins/jobtype/lib/httpclient-4.5.3.jar
Binary file added plugins/jobtype/lib/httpcore-4.4.6.jar
Binary file added plugins/jobtype/lib/javassist-3.21.0-GA.jar
Binary file added plugins/jobtype/lib/mockito-core-1.10.19.jar
Binary file added plugins/jobtype/lib/powermock-core-1.6.2.jar
Binary file added plugins/jobtype/lib/powermock-reflect-1.6.2.jar
plugins/jobtype/src/azkaban/jobtype/HadoopConfigurationInjector.java
@@ -82,8 +82,9 @@ public static void injectResources(Props props) {
*
* @param props The Azkaban properties
* @param workingDir The Azkaban job working directory
* @param fileName File name to write the XML configuration
*/
-public static void prepareResourcesToInject(Props props, String workingDir) {
+public static void prepareResourcesToInject(Props props, String workingDir, String fileName) {
try {
Configuration conf = new Configuration(false);

@@ -104,7 +105,7 @@ public static void prepareResourcesToInject(Props props, String workingDir) {
}

// Now write out the configuration file to inject.
-File file = getConfFile(props, workingDir, INJECT_FILE);
+File file = getConfFile(props, workingDir, fileName);
OutputStream xmlOut = new FileOutputStream(file);
conf.writeXml(xmlOut);
xmlOut.close();
@@ -113,17 +114,31 @@ public static void prepareResourcesToInject(Props props, String workingDir) {
}
}

/**
* Writes out the XML configuration file that will be injected by the client
* as a configuration resource.
* <p>
* This file will include a series of links injected by Azkaban as well as
* any job properties that begin with the designated injection prefix.
*
* @param props The Azkaban properties
* @param workingDir The Azkaban job working directory
*/
public static void prepareResourcesToInject(Props props, String workingDir) {
prepareResourcesToInject(props, workingDir, INJECT_FILE);
}

private static void addHadoopProperty(Props props, String propertyName) {
props.put(INJECT_PREFIX + propertyName, props.get(propertyName));
}

private static void addHadoopWorkflowProperty(Props props, String propertyName) {
-String workflowID = props.get(CommonJobProperties.PROJECT_NAME)
-    + WORKFLOW_ID_SEPERATOR + props.get(CommonJobProperties.FLOW_ID);
+String workflowID =
+    props.get(CommonJobProperties.PROJECT_NAME) + WORKFLOW_ID_SEPERATOR + props.get(CommonJobProperties.FLOW_ID);
props.put(INJECT_PREFIX + propertyName, workflowID);
}

-private static void addHadoopProperties(Props props) {
+public static void addHadoopProperties(Props props) {
String[] propsToInject = new String[]{
CommonJobProperties.EXEC_ID,
CommonJobProperties.FLOW_ID,
@@ -142,7 +157,7 @@ private static void addHadoopProperties(Props props) {
CommonJobProperties.SUBMIT_USER
};

-for(String propertyName : propsToInject) {
+for (String propertyName : propsToInject) {
addHadoopProperty(props, propertyName);
}
addHadoopWorkflowProperty(props, WORKFLOW_ID_CONFIG);
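
The new fileName parameter is what lets the tuning path reuse this injector: callers can write the injected Hadoop configuration to a file of their choosing, while the restored two-argument overload keeps the old behavior by delegating with INJECT_FILE. A small caller sketch (the custom file name is illustrative; both overloads are from this patch):

    // Default behavior, unchanged: writes INJECT_FILE into workingDir.
    HadoopConfigurationInjector.prepareResourcesToInject(props, workingDir);

    // New overload: same links and prefixed properties, custom target file
    // ("my-custom-inject.xml" is a hypothetical name).
    HadoopConfigurationInjector.prepareResourcesToInject(props, workingDir, "my-custom-inject.xml");
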
25 changes: 20 additions & 5 deletions plugins/jobtype/src/azkaban/jobtype/HadoopPigJob.java
@@ -59,6 +59,12 @@ public class HadoopPigJob extends JavaProcessJob {
public static final String HADOOP_UGI = "hadoop.job.ugi";
public static final String DEBUG = "debug";

// Global tuning switch for Pig; decides whether this Azkaban deployment supports tuning for Pig jobs at all
public static final String PIG_ENABLE_TUNING = "pig.enable.tuning";

// Job-level tuning switch; set this in the individual job's properties
public static final String JOB_ENABLE_TUNING = "job.enable.tuning";

public static String HADOOP_SECURE_PIG_WRAPPER =
"azkaban.jobtype.HadoopSecurePigWrapper";

File tokenFile = null;

private final boolean userPigJar;

private final boolean enableTuning;
private HadoopSecurityManager hadoopSecurityManager;

private final String securePigWrapper;
private File pigLogFile = null;

public HadoopPigJob(String jobid, Props sysProps, Props jobProps, Logger log)
throws IOException {
super(jobid, sysProps, jobProps, log);

HADOOP_SECURE_PIG_WRAPPER = HadoopSecurePigWrapper.class.getName();

if (jobProps.containsKey(JOB_ENABLE_TUNING) && jobProps.containsKey(PIG_ENABLE_TUNING)) {
enableTuning = jobProps.getBoolean(JOB_ENABLE_TUNING) && jobProps.getBoolean(PIG_ENABLE_TUNING);
} else {
enableTuning = false;
}
if (enableTuning) {
securePigWrapper = HadoopTuningSecurePigWrapper.class.getName();
} else {
securePigWrapper = HadoopSecurePigWrapper.class.getName();
}
getJobProps().put(CommonJobProperties.JOB_ID, jobid);
shouldProxy =
getSysProps().getBoolean(HadoopSecurityManager.ENABLE_PROXYING, false);
@@ -138,7 +152,7 @@ public void run() throws Exception {

@Override
protected String getJavaClass() {
-return HADOOP_SECURE_PIG_WRAPPER;
+return securePigWrapper;
}

@Override
@@ -390,4 +404,5 @@ public void cancel() throws InterruptedException {
HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps,
tokenFile, getLog());
}

}
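
Note that enableTuning is the conjunction of both flags: a job that sets job.enable.tuning=true still runs through the plain HadoopSecurePigWrapper unless the jobtype also sets pig.enable.tuning=true. A self-contained sketch of the same resolution logic, with java.util.Properties standing in for Azkaban's Props:

    import java.util.Properties;

    class TuningFlagDemo {
      public static void main(String[] args) {
        Properties jobProps = new Properties();
        jobProps.setProperty("pig.enable.tuning", "true");  // jobtype-level gate
        jobProps.setProperty("job.enable.tuning", "true");  // per-job opt-in

        // Mirrors HadoopPigJob: tuning only when both flags are present and true.
        boolean enableTuning =
            Boolean.parseBoolean(jobProps.getProperty("pig.enable.tuning", "false"))
                && Boolean.parseBoolean(jobProps.getProperty("job.enable.tuning", "false"));

        System.out.println(enableTuning
            ? "azkaban.jobtype.HadoopTuningSecurePigWrapper"
            : "azkaban.jobtype.HadoopSecurePigWrapper");
      }
    }
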
83 changes: 7 additions & 76 deletions plugins/jobtype/src/azkaban/jobtype/HadoopSecurePigWrapper.java
@@ -23,25 +23,17 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.pig.PigRunner;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;

import azkaban.jobExecutor.ProcessJob;
import azkaban.jobtype.pig.PigUtil;
import azkaban.utils.Props;

public class HadoopSecurePigWrapper {
-
-  private static final String PIG_DUMP_HADOOP_COUNTER_PROPERTY = "pig.dump.hadoopCounter";

private static File pigLogFile;

@@ -57,15 +49,14 @@ public static void main(final String[] args) throws Exception {
Properties jobProps = HadoopSecureWrapperUtils.loadAzkabanProps();
props = new Props(null, jobProps);
HadoopConfigurationInjector.injectResources(props);

// special feature of secure pig wrapper: we will append the pig error file
// onto system out
pigLogFile = new File(System.getenv("PIG_LOG_FILE"));

if (HadoopSecureWrapperUtils.shouldProxy(jobProps)) {
String tokenFile = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
-UserGroupInformation proxyUser =
-    HadoopSecureWrapperUtils.setupProxyUser(jobProps, tokenFile, logger);
+UserGroupInformation proxyUser = HadoopSecureWrapperUtils.setupProxyUser(jobProps, tokenFile, logger);
proxyUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
@@ -80,14 +71,9 @@ public Void run() throws Exception {

@SuppressWarnings("deprecation")
public static void runPigJob(String[] args) throws Exception {
-PigStats stats = null;
-if (props.getBoolean("pig.listener.visualizer", false) == true) {
-  stats = PigRunner.run(args, new AzkabanPigListener(props));
-} else {
-  stats = PigRunner.run(args, null);
-}
+PigStats stats = PigUtil.runPigJob(args, props);

-dumpHadoopCounters(stats);
+PigUtil.dumpHadoopCounters(stats, props);

if (stats.isSuccessful()) {
return;
@@ -97,62 +83,7 @@ public static void runPigJob(String[] args) throws Exception {
handleError(pigLogFile);
}

-// see jira ticket PIG-3313. Will remove these when we use pig binary with
-// that patch.
-// /////////////////////
-System.out.println("Trying to do self kill, in case pig could not.");
-Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
-Thread[] threadArray = threadSet.toArray(new Thread[threadSet.size()]);
-for (Thread t : threadArray) {
-  if (!t.isDaemon() && !t.equals(Thread.currentThread())) {
-    System.out.println("Killing thread " + t);
-    t.stop();
-  }
-}
-System.exit(1);
-// ////////////////////
-throw new RuntimeException("Pig job failed.");
}

-/**
- * Dump Hadoop counters for each of the M/R jobs in the given PigStats.
- *
- * @param pigStats
- */
-private static void dumpHadoopCounters(PigStats pigStats) {
-  try {
-    if (props.getBoolean(PIG_DUMP_HADOOP_COUNTER_PROPERTY, false)) {
-      if (pigStats != null) {
-        JobGraph jGraph = pigStats.getJobGraph();
-        Iterator<JobStats> iter = jGraph.iterator();
-        while (iter.hasNext()) {
-          JobStats jobStats = iter.next();
-          System.out.println("\n === Counters for job: "
-              + jobStats.getJobId() + " ===");
-          Counters counters = jobStats.getHadoopCounters();
-          if (counters != null) {
-            for (Counters.Group group : counters) {
-              System.out.println(" Counter Group: " + group.getDisplayName()
-                  + " (" + group.getName() + ")");
-              System.out.println(" number of counters in this group: "
-                  + group.size());
-              for (Counters.Counter counter : group) {
-                System.out.println(" - " + counter.getDisplayName() + ": "
-                    + counter.getCounter());
-              }
-            }
-          } else {
-            System.out.println("There are no counters");
-          }
-        }
-      } else {
-        System.out.println("pigStats is null, can't dump Hadoop counters");
-      }
-    }
-  } catch (Exception e) {
-    System.out.println("Unexpected error: " + e.getMessage());
-    e.printStackTrace(System.out);
-  }
-}
+PigUtil.selfKill();
}

private static void handleError(File pigLog) throws Exception {
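
PigUtil itself is not part of this diff hunk; judging from the call sites above and the code deleted from this file, the visualizer branch and the PIG-3313 self-kill workaround have simply moved into it. A plausible reconstruction from those deleted lines follows (not the PR's actual source; dumpHadoopCounters is omitted for brevity):

    package azkaban.jobtype.pig;

    import java.util.Set;

    import org.apache.pig.PigRunner;
    import org.apache.pig.tools.pigstats.PigStats;

    import azkaban.jobtype.AzkabanPigListener;
    import azkaban.utils.Props;

    public class PigUtil {

      // Formerly inlined in HadoopSecurePigWrapper.runPigJob: attach the
      // visualizer listener only when pig.listener.visualizer is set.
      public static PigStats runPigJob(String[] args, Props props) {
        if (props.getBoolean("pig.listener.visualizer", false)) {
          return PigRunner.run(args, new AzkabanPigListener(props));
        }
        return PigRunner.run(args, null);
      }

      // PIG-3313 workaround: stop stray non-daemon threads so the JVM can
      // exit, then exit with a failure code.
      @SuppressWarnings("deprecation")
      public static void selfKill() {
        System.out.println("Trying to do self kill, in case pig could not.");
        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
        for (Thread t : threadSet.toArray(new Thread[threadSet.size()])) {
          if (!t.isDaemon() && !t.equals(Thread.currentThread())) {
            System.out.println("Killing thread " + t);
            t.stop();
          }
        }
        System.exit(1);
      }
    }
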
plugins/jobtype/src/azkaban/jobtype/HadoopTuningConfigurationInjector.java
@@ -0,0 +1,64 @@
/*
* Copyright 2018 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.jobtype;

import org.apache.hadoop.conf.Configuration;

import azkaban.utils.Props;


/**
* HadoopTuningConfigurationInjector is responsible for inserting links back to the
* Azkaban UI in configurations and for automatically injecting designated job
* properties into the Hadoop configuration.
* <p>
* It is assumed that the necessary links have already been loaded into the
* properties. After writing the necessary links as an XML file as required by
* Hadoop's configuration, clients may add the links as a default resource
* using injectResources() so that they are included in any Configuration
* constructed.
*/
public class HadoopTuningConfigurationInjector {
Contributor: This class needs to leverage HadoopConfigurationInjector. There is a huge amount of code duplication here. Through refactoring if necessary, this class should only be adding logic for the tuning injections, not anything else.

Contributor: +1. It seems like almost all of the code is duplicated apart from a few lines.

Author: Reused HadoopConfigurationInjector and refactored some methods.

Contributor: Nice! Looks great.

// File to which the Hadoop configuration to inject will be written.
public static final String INJECT_TUNING_FILE = "hadoop-tuning-inject.xml";

/*
* To be called by the forked process to load the generated links and Hadoop
* configuration properties to automatically inject.
*
* @param props The Azkaban properties
*/
public static void injectResources(Props props) {
HadoopConfigurationInjector.injectResources(props);
Configuration.addDefaultResource(INJECT_TUNING_FILE);
}

/**
* Writes out the XML configuration file that will be injected by the client
* as a configuration resource.
* <p>
* This file will include a series of links injected by Azkaban as well as
* any job properties that begin with the designated injection prefix.
*
* @param props The Azkaban properties
* @param workingDir The Azkaban job working directory
*/
public static void prepareResourcesToInject(Props props, String workingDir) {
HadoopConfigurationInjector.prepareResourcesToInject(props, workingDir, INJECT_TUNING_FILE);
}

}
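
The intended call pattern mirrors HadoopConfigurationInjector's: Azkaban writes the tuning inject file into the job's working directory before forking, and the forked process registers it as a default resource before any Configuration is built. A hedged usage sketch (both methods are from this patch; the surrounding sequence is illustrative):

    // In the job type, before launching the forked JVM: writes
    // hadoop-tuning-inject.xml into workingDir.
    HadoopTuningConfigurationInjector.prepareResourcesToInject(props, workingDir);

    // In the forked process (e.g. HadoopTuningSecurePigWrapper), before any
    // Configuration object is created, so the file is picked up as a
    // default resource.
    HadoopTuningConfigurationInjector.injectResources(props);
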