Skip to content

Commit 36706de

Browse files
add savepoint
1 parent 81fe1d2 commit 36706de

File tree

5 files changed

+189
-93
lines changed

5 files changed

+189
-93
lines changed

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterMode.java renamed to core/src/main/java/com/dtstack/flink/sql/ClusterMode.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
12
/**
23
* Licensed to the Apache Software Foundation (ASF) under one
34
* or more contributor license agreements. See the NOTICE file
@@ -16,20 +17,18 @@
1617
* limitations under the License.
1718
*/
1819

19-
package com.dtstack.flink.sql.launcher;
20+
package com.dtstack.flink.sql;
2021

2122
/**
22-
* This class defines three running mode of FlinkX
23-
*
24-
* Company: www.dtstack.com
25-
23+
* Created by sishu.yss on 2018/10/10.
2624
*/
27-
public class ClusterMode {
28-
29-
public static final String MODE_LOCAL = "local";
25+
public enum ClusterMode {
3026

31-
public static final String MODE_STANDALONE = "standalone";
27+
local(0),standalone(1),yarn(2),yarnPer(3);
3228

33-
public static final String MODE_YARN = "yarn";
29+
private int type;
3430

31+
ClusterMode(int type){
32+
this.type = type;
33+
}
3534
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/ClusterClientFactory.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,33 +22,27 @@
2222
import org.apache.flink.client.deployment.ClusterRetrieveException;
2323
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
2424
import org.apache.flink.client.program.ClusterClient;
25-
import org.apache.flink.client.program.StandaloneClusterClient;
2625
import org.apache.flink.client.program.rest.RestClusterClient;
2726
import org.apache.flink.configuration.ConfigConstants;
2827
import org.apache.flink.configuration.Configuration;
2928
import org.apache.flink.configuration.GlobalConfiguration;
3029
import org.apache.flink.core.fs.FileSystem;
3130
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
32-
import org.apache.flink.yarn.YarnClusterClient;
3331
import org.apache.flink.yarn.YarnClusterDescriptor;
3432
import org.apache.hadoop.yarn.api.records.ApplicationId;
3533
import org.apache.hadoop.yarn.api.records.ApplicationReport;
3634
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
3735
import org.apache.hadoop.yarn.client.api.YarnClient;
3836
import org.apache.hadoop.yarn.conf.YarnConfiguration;
39-
4037
import java.io.File;
4138
import java.io.FilenameFilter;
42-
import java.lang.reflect.Field;
4339
import java.util.EnumSet;
4440
import java.util.HashSet;
4541
import java.util.Iterator;
4642
import java.util.List;
4743
import java.util.Map;
48-
import java.util.Properties;
4944
import java.util.Set;
50-
51-
import static com.dtstack.flink.sql.launcher.LauncherOptions.*;
45+
import com.dtstack.flink.sql.ClusterMode;
5246

5347
/**
5448
* The Factory of ClusterClient
@@ -58,29 +52,29 @@
5852
*/
5953
public class ClusterClientFactory {
6054

61-
public static ClusterClient createClusterClient(Properties props) throws ClusterRetrieveException {
62-
String clientType = props.getProperty(OPTION_MODE);
63-
if(clientType.equals(ClusterMode.MODE_STANDALONE)) {
64-
return createStandaloneClient(props);
65-
} else if(clientType.equals(ClusterMode.MODE_YARN)) {
66-
return createYarnClient(props);
55+
public static ClusterClient createClusterClient(LauncherOptions launcherOptions) throws ClusterRetrieveException {
56+
String mode = launcherOptions.getMode();
57+
if(mode.equals(ClusterMode.standalone.name())) {
58+
return createStandaloneClient(launcherOptions);
59+
} else if(mode.equals(ClusterMode.yarn.name())) {
60+
return createYarnClient(launcherOptions);
6761
}
6862
throw new IllegalArgumentException("Unsupported cluster client type: ");
6963
}
7064

71-
public static RestClusterClient createStandaloneClient(Properties props) throws ClusterRetrieveException {
72-
String flinkConfDir = props.getProperty(LauncherOptions.OPTION_FLINK_CONF_DIR);
65+
public static RestClusterClient createStandaloneClient(LauncherOptions launcherOptions) throws ClusterRetrieveException {
66+
String flinkConfDir = launcherOptions.getFlinkconf();
7367
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
7468
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
7569
RestClusterClient clusterClient = descriptor.retrieve(null);
7670
clusterClient.setDetached(true);
7771
return clusterClient;
7872
}
7973

80-
public static ClusterClient createYarnClient(Properties props) {
81-
String flinkConfDir = props.getProperty(LauncherOptions.OPTION_FLINK_CONF_DIR);
74+
public static ClusterClient createYarnClient(LauncherOptions launcherOptions) {
75+
String flinkConfDir = launcherOptions.getFlinkconf();
8276
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
83-
String yarnConfDir = props.getProperty(LauncherOptions.OPTION_YARN_CONF_DIR);
77+
String yarnConfDir = launcherOptions.getYarnconf();
8478
YarnConfiguration yarnConf = new YarnConfiguration();
8579
if(StringUtils.isNotBlank(yarnConfDir)) {
8680
try {

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,12 @@
2424
import com.dtstack.flink.sql.Main;
2525
import org.apache.flink.client.program.ClusterClient;
2626
import org.apache.flink.client.program.PackagedProgram;
27-
2827
import java.io.File;
2928
import java.util.List;
30-
31-
import static com.dtstack.flink.sql.launcher.ClusterMode.MODE_LOCAL;
32-
import static com.dtstack.flink.sql.launcher.LauncherOptions.OPTION_LOCAL_SQL_PLUGIN_PATH;
33-
import static com.dtstack.flink.sql.launcher.LauncherOptions.OPTION_MODE;
29+
import com.dtstack.flink.sql.ClusterMode;
30+
import org.apache.flink.table.shaded.org.apache.commons.lang.StringUtils;
31+
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
32+
import org.apache.flink.table.shaded.org.apache.commons.lang.BooleanUtils;
3433

3534
/**
3635
* Date: 2017/2/20
@@ -39,7 +38,6 @@
3938
*/
4039

4140
public class LauncherMain {
42-
4341
private static final String CORE_JAR = "core.jar";
4442

4543
private static String SP = File.separator;
@@ -51,18 +49,21 @@ private static String getLocalCoreJarPath(String localSqlRootJar){
5149

5250
public static void main(String[] args) throws Exception {
5351
LauncherOptionParser optionParser = new LauncherOptionParser(args);
54-
String mode = (String) optionParser.getVal(OPTION_MODE);
52+
LauncherOptions launcherOptions = optionParser.getLauncherOptions();
53+
String mode = launcherOptions.getMode();
5554
List<String> argList = optionParser.getProgramExeArgList();
56-
57-
if(mode.equals(MODE_LOCAL)) {
55+
if(mode.equals(ClusterMode.local.name())) {
5856
String[] localArgs = argList.toArray(new String[argList.size()]);
5957
Main.main(localArgs);
6058
} else {
61-
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(optionParser.getProperties());
62-
String pluginRoot = (String) optionParser.getVal(OPTION_LOCAL_SQL_PLUGIN_PATH);
59+
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
60+
String pluginRoot = launcherOptions.getLocalSqlPluginPath();
6361
File jarFile = new File(getLocalCoreJarPath(pluginRoot));
6462
String[] remoteArgs = argList.toArray(new String[argList.size()]);
6563
PackagedProgram program = new PackagedProgram(jarFile, Lists.newArrayList(), remoteArgs);
64+
if(StringUtils.isNotBlank(launcherOptions.getSavePointPath())){
65+
program.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(launcherOptions.getSavePointPath(), BooleanUtils.toBoolean(launcherOptions.getAllowNonRestoredState())));
66+
}
6667
clusterClient.run(program, 1);
6768
clusterClient.shutdown();
6869
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherOptionParser.java

Lines changed: 55 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,14 @@
2525
import org.apache.commons.lang.StringUtils;
2626
import org.apache.flink.hadoop.shaded.com.google.common.base.Charsets;
2727
import org.apache.flink.hadoop.shaded.com.google.common.base.Preconditions;
28-
28+
import com.dtstack.flink.sql.util.PluginUtil;
2929
import java.io.File;
3030
import java.io.FileInputStream;
3131
import java.net.URLEncoder;
3232
import java.util.List;
3333
import java.util.Map;
34-
import java.util.Properties;
34+
import com.dtstack.flink.sql.ClusterMode;
3535

36-
import static com.dtstack.flink.sql.launcher.LauncherOptions.*;
37-
import static com.dtstack.flink.sql.launcher.ClusterMode.*;
3836

3937

4038
/**
@@ -45,14 +43,36 @@
4543
*/
4644
public class LauncherOptionParser {
4745

46+
public static final String OPTION_MODE = "mode";
47+
48+
public static final String OPTION_NAME = "name";
49+
50+
public static final String OPTION_SQL = "sql";
51+
52+
public static final String OPTION_FLINK_CONF_DIR = "flinkconf";
53+
54+
public static final String OPTION_YARN_CONF_DIR = "yarnconf";
55+
56+
public static final String OPTION_LOCAL_SQL_PLUGIN_PATH = "localSqlPluginPath";
57+
58+
public static final String OPTION_REMOTE_SQL_PLUGIN_PATH = "remoteSqlPluginPath";
59+
60+
public static final String OPTION_ADDJAR = "addjar";
61+
62+
public static final String OPTION_CONF_PROP = "confProp";
63+
64+
public static final String OPTION_SAVE_POINT_PATH = "savePointPath";
65+
66+
public static final String OPTION_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";
67+
4868
private Options options = new Options();
4969

5070
private BasicParser parser = new BasicParser();
5171

52-
private Properties properties = new Properties();
72+
private LauncherOptions properties = new LauncherOptions();
5373

5474
public LauncherOptionParser(String[] args) {
55-
options.addOption(LauncherOptions.OPTION_MODE, true, "Running mode");
75+
options.addOption(OPTION_MODE, true, "Running mode");
5676
options.addOption(OPTION_SQL, true, "Job sql file");
5777
options.addOption(OPTION_NAME, true, "Job name");
5878
options.addOption(OPTION_FLINK_CONF_DIR, true, "Flink configuration directory");
@@ -62,11 +82,14 @@ public LauncherOptionParser(String[] args) {
6282
options.addOption(OPTION_CONF_PROP, true, "sql ref prop,eg specify event time");
6383
options.addOption(OPTION_YARN_CONF_DIR, true, "Yarn and hadoop configuration directory");
6484

85+
options.addOption(OPTION_SAVE_POINT_PATH, true, "Savepoint restore path");
86+
options.addOption(OPTION_ALLOW_NON_RESTORED_STATE, true, "Flag indicating whether non restored state is allowed if the savepoint");
87+
6588
try {
6689
CommandLine cl = parser.parse(options, args);
67-
String mode = cl.getOptionValue(OPTION_MODE, MODE_LOCAL);
90+
String mode = cl.getOptionValue(OPTION_MODE, ClusterMode.local.name());
6891
//check mode
69-
properties.put(OPTION_MODE, mode);
92+
properties.setMode(mode);
7093

7194
String job = Preconditions.checkNotNull(cl.getOptionValue(OPTION_SQL),
7295
"Must specify job file using option '" + OPTION_SQL + "'");
@@ -76,78 +99,65 @@ public LauncherOptionParser(String[] args) {
7699
in.read(filecontent);
77100
String content = new String(filecontent, "UTF-8");
78101
String sql = URLEncoder.encode(content, Charsets.UTF_8.name());
79-
properties.put(OPTION_SQL, sql);
80-
102+
properties.setSql(sql);
81103
String localPlugin = Preconditions.checkNotNull(cl.getOptionValue(OPTION_LOCAL_SQL_PLUGIN_PATH));
82-
properties.put(OPTION_LOCAL_SQL_PLUGIN_PATH, localPlugin);
83-
104+
properties.setLocalSqlPluginPath(localPlugin);
84105
String remotePlugin = cl.getOptionValue(OPTION_REMOTE_SQL_PLUGIN_PATH);
85-
if(!mode.equalsIgnoreCase(ClusterMode.MODE_LOCAL)){
106+
if(!ClusterMode.local.name().equals(mode)){
86107
Preconditions.checkNotNull(remotePlugin);
87-
properties.put(OPTION_REMOTE_SQL_PLUGIN_PATH, remotePlugin);
108+
properties.setRemoteSqlPluginPath(remotePlugin);
88109
}
89-
90110
String name = Preconditions.checkNotNull(cl.getOptionValue(OPTION_NAME));
91-
properties.put(OPTION_NAME, name);
92-
111+
properties.setName(name);
93112
String addJar = cl.getOptionValue(OPTION_ADDJAR);
94113
if(StringUtils.isNotBlank(addJar)){
95-
properties.put(OPTION_ADDJAR, addJar);
114+
properties.setAddjar(addJar);
96115
}
97-
98116
String confProp = cl.getOptionValue(OPTION_CONF_PROP);
99117
if(StringUtils.isNotBlank(confProp)){
100-
properties.put(OPTION_CONF_PROP, confProp);
118+
properties.setConfProp(confProp);
101119
}
102-
103120
String flinkConfDir = cl.getOptionValue(OPTION_FLINK_CONF_DIR);
104121
if(StringUtils.isNotBlank(flinkConfDir)) {
105-
properties.put(OPTION_FLINK_CONF_DIR, flinkConfDir);
122+
properties.setFlinkconf(flinkConfDir);
106123
}
107124

108125
String yarnConfDir = cl.getOptionValue(OPTION_YARN_CONF_DIR);
109126
if(StringUtils.isNotBlank(yarnConfDir)) {
110-
properties.put(OPTION_YARN_CONF_DIR, yarnConfDir);
127+
properties.setYarnconf(yarnConfDir);
128+
}
129+
130+
String savePointPath = cl.getOptionValue(OPTION_SAVE_POINT_PATH);
131+
if(StringUtils.isNotBlank(savePointPath)) {
132+
properties.setSavePointPath(savePointPath);
133+
}
134+
135+
String allow_non = cl.getOptionValue(OPTION_ALLOW_NON_RESTORED_STATE);
136+
if(StringUtils.isNotBlank(allow_non)) {
137+
properties.setAllowNonRestoredState(allow_non);
111138
}
112139

113140
} catch (Exception e) {
114141
throw new RuntimeException(e);
115142
}
116-
117143
}
118144

119-
public Properties getProperties(){
145+
public LauncherOptions getLauncherOptions(){
120146
return properties;
121147
}
122148

123-
public Object getVal(String key){
124-
return properties.get(key);
125-
}
126-
127-
public List<String> getAllArgList(){
128-
List<String> args = Lists.newArrayList();
129-
for(Map.Entry<Object, Object> one : properties.entrySet()){
130-
args.add("-" + one.getKey().toString());
131-
args.add(one.getValue().toString());
132-
}
133-
134-
return args;
135-
}
136-
137-
public List<String> getProgramExeArgList(){
149+
public List<String> getProgramExeArgList() throws Exception {
150+
Map<String,Object> mapConf = PluginUtil.ObjectToMap(properties);
138151
List<String> args = Lists.newArrayList();
139-
for(Map.Entry<Object, Object> one : properties.entrySet()){
140-
String key = one.getKey().toString();
152+
for(Map.Entry<String, Object> one : mapConf.entrySet()){
153+
String key = one.getKey();
141154
if(OPTION_FLINK_CONF_DIR.equalsIgnoreCase(key)
142155
|| OPTION_YARN_CONF_DIR.equalsIgnoreCase(key)){
143156
continue;
144157
}
145-
146158
args.add("-" + key);
147159
args.add(one.getValue().toString());
148160
}
149-
150161
return args;
151162
}
152-
153163
}

0 commit comments

Comments
 (0)