Skip to content

Commit

Permalink
add interface to specify YARN application name
Browse files Browse the repository at this point in the history
  • Loading branch information
kmaehashi committed Sep 24, 2015
1 parent 76b72ca commit 9e29cdc
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,45 @@ object JubatusYarnApplication extends HasLogger {
* @return [[JubatusYarnApplication]]
*/
def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int): Future[JubatusYarnApplication] = {
start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn"))
start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn"), null)
}

/**
* JubatusYarnApplication を起動します。
*
* juba${aLearningMachineType_proxy} がひとつ, juba${aLearningMachineType} が ${aNodeCount} だけ起動します。
* 各 juba${aLearningMachineType} の使用するリソースを ${aResource} に指定してください。
*
* @param aLearningMachineName learning machine name
* @param aLearningMachineType learning machine type
* @param aZookeepers ZooKeeper locations
* @param aConfigString config json string
* @param aResource computer resources in the cluster
* @param aNodeCount number of cluster
* @param aApplicationName yarn-application name
* @return [[JubatusYarnApplication]]
*/
def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int, aApplicationName: String): Future[JubatusYarnApplication] = {
start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn"), aApplicationName)
}

/**
* JubatusYarnApplication を起動します。
*
* juba${aLearningMachineType_proxy} がひとつ, juba${aLearningMachineType} が ${aNodeCount} だけ起動します。
* 各 juba${aLearningMachineType} の使用するリソースを ${aResource} に指定してください。
*
* @param aLearningMachineName learning machine name
* @param aLearningMachineType learning machine type
* @param aZookeepers ZooKeeper locations
* @param aConfigString config json string
* @param aResource computer resources in the cluster
* @param aNodeCount number of cluster
* @param aBasePath base path of jar and sh files
* @return [[JubatusYarnApplication]]
*/
def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int, aBasePath: Path): Future[JubatusYarnApplication] = {
start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, aBasePath, null)
}

/**
Expand All @@ -101,9 +139,10 @@ object JubatusYarnApplication extends HasLogger {
* @param aResource computer resources in the cluster
* @param aNodeCount number of cluster
* @param aBasePath base path of jar and sh files
* @param aApplicationName yarn-application name
* @return [[JubatusYarnApplication]]
*/
def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int, aBasePath: Path): Future[JubatusYarnApplication] = Future {
def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int, aBasePath: Path, aApplicationName: String): Future[JubatusYarnApplication] = Future {
require(aResource.memory > 0, "specify memory than 1MB.")
require(aNodeCount > 0, "specify node count than 1")

Expand All @@ -114,8 +153,8 @@ object JubatusYarnApplication extends HasLogger {
case None => throw new IllegalStateException("Service not running.")
case Some(tYarnClientController) =>
// ApplicationMaster 起動
logger.info(s"startJubatusApplication $aLearningMachineName, $aLearningMachineType, $aZookeepers, $aConfigString, $aResource, $aNodeCount")
val tApplicationMasterProxy = tYarnClientController.startJubatusApplication(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, aBasePath)
logger.info(s"startJubatusApplication $aLearningMachineName, $aLearningMachineType, $aZookeepers, $aConfigString, $aResource, $aNodeCount, $aApplicationName")
val tApplicationMasterProxy = tYarnClientController.startJubatusApplication(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, aBasePath, aApplicationName)
waitForStarted(ApplicationContext(tYarnClientController, tApplicationMasterProxy, tService))
}
}
Expand All @@ -135,7 +174,44 @@ object JubatusYarnApplication extends HasLogger {
* @return [[JubatusYarnApplication]]
*/
def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int): Future[JubatusYarnApplication] = {
start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn"))
start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn"), null)
}

/**
* JubatusYarnApplication を起動します。
*
* juba${aLearningMachineType_proxy} がひとつ, juba${aLearningMachineType} が ${aNodeCount} だけ起動します。
* 各 juba${aLearningMachineType} の使用するリソースを ${aResource} に指定してください。
*
* @param aLearningMachineName learning machine name
* @param aLearningMachineType learning machine type
* @param aZookeepers ZooKeeper locations
* @param aConfigFile config file
* @param aResource computer resources in the cluster
* @param aNodeCount number of cluster
* @param aApplicationName yarn-application name
* @return [[JubatusYarnApplication]]
*/
def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aApplicationName: String): Future[JubatusYarnApplication] = {
start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, new Path("hdfs:///jubatus-on-yarn"), aApplicationName)
}

/**
* JubatusYarnApplication を起動します。
*
* juba${aLearningMachineType_proxy} がひとつ, juba${aLearningMachineType} が ${aNodeCount} だけ起動します。
* 各 juba${aLearningMachineType} の使用するリソースを ${aResource} に指定してください。
*
* @param aLearningMachineName learning machine name
* @param aLearningMachineType learning machine type
* @param aZookeepers ZooKeeper locations
* @param aConfigFile config file
* @param aResource computer resources in the cluster
* @param aNodeCount number of cluster
* @return [[JubatusYarnApplication]]
*/
def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aBasePath: Path): Future[JubatusYarnApplication] = {
start(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, aBasePath, null)
}

/**
Expand All @@ -150,9 +226,10 @@ object JubatusYarnApplication extends HasLogger {
* @param aConfigFile config file
* @param aResource computer resources in the cluster
* @param aNodeCount number of cluster
* @param aApplicationName yarn-application name
* @return [[JubatusYarnApplication]]
*/
def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aBasePath: Path): Future[JubatusYarnApplication] = Future {
def start(aLearningMachineName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aBasePath: Path, aApplicationName: String): Future[JubatusYarnApplication] = Future {
require(aResource.memory > 0, "specify memory than 1MB.")
require(aNodeCount > 0, "specify node count than 1")

Expand All @@ -163,8 +240,8 @@ object JubatusYarnApplication extends HasLogger {
case None => throw new IllegalStateException("Service not running.")
case Some(tYarnClientController) =>
// ApplicationMaster 起動
logger.info(s"startJubatusApplication $aLearningMachineName, $aLearningMachineType, $aZookeepers, $aConfigFile, $aResource, $aNodeCount")
val tApplicationMasterProxy = tYarnClientController.startJubatusApplication(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, aBasePath)
logger.info(s"startJubatusApplication $aLearningMachineName, $aLearningMachineType, $aZookeepers, $aConfigFile, $aResource, $aNodeCount, $aApplicationName")
val tApplicationMasterProxy = tYarnClientController.startJubatusApplication(aLearningMachineName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, aBasePath, aApplicationName)
waitForStarted(ApplicationContext(tYarnClientController, tApplicationMasterProxy, tService))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ class YarnClientController(location: Location, yarnClient: YarnClient = new Defa
}

def startJubatusApplication(aName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int, aBasePath: Path): ApplicationMasterProxy = {
val tFullName = getFullName(aName, aLearningMachineType, aZookeepers)
startJubatusApplication(aName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, aBasePath, null)
}

def startJubatusApplication(aName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigString: String, aResource: Resource, aNodeCount: Int, aBasePath: Path, aApplicationName: String): ApplicationMasterProxy = {
val tFullName = Option(aApplicationName).getOrElse(getFullName(aName, aLearningMachineType, aZookeepers))
logger.info(s"starting $tFullName")

val tApplicationId = yarnClient.submitApplicationMaster(tFullName, aName, aLearningMachineType, aZookeepers, aConfigString, aResource, aNodeCount, location, aBasePath)
Expand All @@ -56,7 +60,11 @@ class YarnClientController(location: Location, yarnClient: YarnClient = new Defa
}

def startJubatusApplication(aName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aBasePath: Path): ApplicationMasterProxy = {
val tFullName = getFullName(aName, aLearningMachineType, aZookeepers)
startJubatusApplication(aName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, aBasePath, null)
}

def startJubatusApplication(aName: String, aLearningMachineType: LearningMachineType, aZookeepers: List[Location], aConfigFile: Path, aResource: Resource, aNodeCount: Int, aBasePath: Path, aApplicationName: String): ApplicationMasterProxy = {
val tFullName = Option(aApplicationName).getOrElse(getFullName(aName, aLearningMachineType, aZookeepers))
logger.info(s"starting $tFullName")

val tApplicationId = yarnClient.submitApplicationMaster(tFullName, aName, aLearningMachineType, aZookeepers, aConfigFile, aResource, aNodeCount, location, aBasePath)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
core-site.xml
yarn-site.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0"?>

<!--
This file is a template file of core-site.xml.
A user should replace [value]s in this file to actual values and rename to core-site.xml.
-->
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://[host]:[port]</value>
</property>
<property>
<name>hadoop.proxyuser.mapred.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.mapred.hosts</name>
<value>*</value>
</property>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"method":"AROW","parameter":{"regularization_weight":1.0},"converter":{"num_filter_types":{},"num_filter_rules":[],"string_filter_types":{},"string_filter_rules":[],"num_types":{},"num_rules":[{"key":"*","type":"num"}],"string_types":{"unigram":{"method":"ngram","char_num":"1"}},"string_rules":[{"key":"*","type":"unigram","sample_weight":"bin","global_weight":"bin"}]}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?xml version="1.0"?>
<!--
This file is a template file of yarn-site.xml.
A user should replace path in this file to actual values and rename to yarn-site.xml.
-->
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>

<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<property>
<description>List of directories to store localized files in.</description>
<name>yarn.nodemanager.local-dirs</name>
<value>file:///var/lib/hadoop-yarn/cache/${user.name}/nm-local-dir</value>
</property>

<property>
<description>Where to store container logs.</description>
<name>yarn.nodemanager.log-dirs</name>
<value>file:///var/log/hadoop-yarn/containers</value>
</property>
<!--
<property>
<description>Where to aggregate logs to.</description>
<name>yarn.nodemanager.remote-app-log-dir</name>
<value>hdfs://var/log/hadoop-yarn/apps</value>
</property>
-->
<property>
<description>Classpath for typical applications.</description>
<name>yarn.application.classpath</name>
<value>
$HADOOP_CONF_DIR,
$HADOOP_COMMON_HOME/*,$HADOOP_COMMON_HOME/lib/*,
$HADOOP_HDFS_HOME/*,$HADOOP_HDFS_HOME/lib/*,
$HADOOP_MAPRED_HOME/*,$HADOOP_MAPRED_HOME/lib/*,
$HADOOP_YARN_HOME/*,$HADOOP_YARN_HOME/lib/*
</value>
</property>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package us.jubat.yarn.client

import org.scalatest._
import us.jubat.yarn.common.Location
import us.jubat.yarn.common.LearningMachineType
import org.apache.hadoop.fs.Path
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util._
import scala.concurrent.duration.Duration
import scala.sys.process.{Process, ProcessBuilder}
import org.apache.hadoop.conf.Configuration

class JubatusYarnApplicationSpec extends FlatSpec with Matchers with BeforeAndAfterAll {

val machineType = LearningMachineType.Classifier
val zookeeper = new Location("localhost", 2181)
val configString = """{"method":"AROW","parameter":{"regularization_weight":1.0},"converter":{"num_filter_types":{},"num_filter_rules":[],"string_filter_types":{},"string_filter_rules":[],"num_types":{},"num_rules":[{"key":"*","type":"num"}],"string_types":{"unigram":{"method":"ngram","char_num":"1"}},"string_rules":[{"key":"*","type":"unigram","sample_weight":"bin","global_weight":"bin"}]}}"""
val basePath = new Path("hdfs:///jubatus-on-yarn/")
val configPath = new Path("hdfs:///jubatus-on-yarn/test/jubatus_config.json")

override def beforeAll(): Unit = {
//テストデータの配置
val conf = new Configuration()
val fs = configPath.getFileSystem(conf)
if (!fs.exists(configPath)) {
val localPath = new Path("jubatus-on-yarn-client/src/test/resources/jubatus_config.json")
fs.copyFromLocalFile(localPath, configPath)
}
}

// start()結果からアプリケーション名を取得する
private def getAppicationName(future: Future[JubatusYarnApplication]): String = {
var applicationName = ""
val result = future.andThen {
case Success(j) =>
applicationName = j.status.yarnApplication.getName
j.kill()
case Failure(t) =>
print("CREATE MODEL failed: " + t.getMessage)
t.printStackTrace()
}
Await.result(result, Duration.Inf)
return applicationName
}

"start ()" should "check ApplicationName" in {
//config parameter is String
//specific paramter: No Path and No ApplicationName
var future = JubatusYarnApplication.start("model1", machineType, List(zookeeper), configString, Resource(1, 1, 1), 3)
var resultName = getAppicationName(future)
resultName shouldBe "model1:" + machineType.name + ":" + zookeeper.hostAddress + ":" + zookeeper.port

//specific paramter: Path and No ApplicationName
future = JubatusYarnApplication.start("model2", machineType, List(zookeeper), configString, Resource(1, 1, 1), 3, basePath)
resultName = getAppicationName(future)
resultName shouldBe "model2:" + machineType.name + ":" + zookeeper.hostAddress + ":" + zookeeper.port

//specific paramter: No Path and ApplicationName
future = JubatusYarnApplication.start("model3", machineType, List(zookeeper), configString, Resource(1, 1, 1), 3, "dummyApplicationName3")
resultName = getAppicationName(future)
resultName shouldBe "dummyApplicationName3"

//specific paramter: Path and ApplicationName
future = JubatusYarnApplication.start("model4", machineType, List(zookeeper), configString, Resource(1, 1, 1), 3, basePath, "dummyApplicationName4")
resultName = getAppicationName(future)
resultName shouldBe "dummyApplicationName4"

//config parameter is Path
//specific paramter: No Path and No ApplicationName
future = JubatusYarnApplication.start("model5", machineType, List(zookeeper), configPath, Resource(1, 1, 1), 3)
resultName = getAppicationName(future)
resultName shouldBe "model5:" + machineType.name + ":" + zookeeper.hostAddress + ":" + zookeeper.port

//specific paramter: Path and No ApplicationName
future = JubatusYarnApplication.start("model6", machineType, List(zookeeper), configPath, Resource(1, 1, 1), 3, basePath)
resultName = getAppicationName(future)
resultName shouldBe "model6:" + machineType.name + ":" + zookeeper.hostAddress + ":" + zookeeper.port

//specific paramter: No Path and ApplicationName
future = JubatusYarnApplication.start("model7", machineType, List(zookeeper), configPath, Resource(1, 1, 1), 3, "dummyApplicationName7")
resultName = getAppicationName(future)
resultName shouldBe "dummyApplicationName7"

//specific paramter: Path and ApplicationName
future = JubatusYarnApplication.start("model8", machineType, List(zookeeper), configPath, Resource(1, 1, 1), 3, basePath, "dummyApplicationName8")
resultName = getAppicationName(future)
resultName shouldBe "dummyApplicationName8"
}

}
Loading

0 comments on commit 9e29cdc

Please sign in to comment.