support multi snowflake instance in one project #139

Open · wants to merge 1 commit into master
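This change threads a per-instance name (`snowflakeName`) through `SnowflakeIDGenImpl` and `SnowflakeZookeeperHolder`, so several Snowflake generators can coexist in one project, each registering under its own ZooKeeper path and keeping its own local workerID cache. A minimal usage sketch, assuming the four-argument constructor introduced in this PR and a reachable ZooKeeper ensemble; the instance names, ports, and ZK address below are illustrative only:

```java
import com.sankuai.inf.leaf.IDGen;
import com.sankuai.inf.leaf.common.Result;
import com.sankuai.inf.leaf.snowflake.SnowflakeIDGenImpl;

public class MultiSnowflakeExample {
    public static void main(String[] args) {
        // Two independent generators in one JVM: each registers under
        // /snowflake/<name>/forever and caches its workerID in its own local file.
        IDGen orderIdGen = new SnowflakeIDGenImpl("127.0.0.1:2181", 8080, 1288834974657L, "order-snowflake");
        IDGen userIdGen  = new SnowflakeIDGenImpl("127.0.0.1:2181", 8081, 1288834974657L, "user-snowflake");

        Result orderId = orderIdGen.get("order-key"); // the snowflake implementation ignores the key
        Result userId  = userIdGen.get("user-key");
        System.out.println(orderId.getId() + " / " + userId.getId());
    }
}
```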
SnowflakeIDGenImpl.java
@@ -2,6 +2,7 @@

import com.google.common.base.Preconditions;
import com.sankuai.inf.leaf.IDGen;
import com.sankuai.inf.leaf.common.PropertyFactory;
import com.sankuai.inf.leaf.common.Result;
import com.sankuai.inf.leaf.common.Status;
import com.sankuai.inf.leaf.common.Utils;
@@ -30,22 +31,24 @@ public boolean init() {
private long sequence = 0L;
private long lastTimestamp = -1L;
private static final Random RANDOM = new Random();
private final String snowflakeName;

public SnowflakeIDGenImpl(String zkAddress, int port) {
//Thu Nov 04 2010 09:42:54 GMT+0800 (China Standard Time)
this(zkAddress, port, 1288834974657L);
this(zkAddress, port, 1288834974657L, PropertyFactory.getProperties().getProperty("leaf.name"));
}

/**
* @param zkAddress ZooKeeper address
* @param port port the snowflake service listens on
* @param twepoch starting epoch timestamp
*/
public SnowflakeIDGenImpl(String zkAddress, int port, long twepoch) {
public SnowflakeIDGenImpl(String zkAddress, int port, long twepoch, String snowflakeName) {
this.twepoch = twepoch;
this.snowflakeName = snowflakeName;
Preconditions.checkArgument(timeGen() > twepoch, "Snowflake not support twepoch gt currentTime");
final String ip = Utils.getIp();
SnowflakeZookeeperHolder holder = new SnowflakeZookeeperHolder(ip, String.valueOf(port), zkAddress);
SnowflakeZookeeperHolder holder = new SnowflakeZookeeperHolder(ip, String.valueOf(port), zkAddress, snowflakeName);
LOGGER.info("twepoch:{} ,ip:{} ,zkAddress:{} port:{}", twepoch, ip, zkAddress, port);
boolean initFlag = holder.init();
if (initFlag) {
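For callers of the existing two-argument constructor nothing changes: the instance name now falls back to the `leaf.name` property loaded by `PropertyFactory`, which is exactly what the delegation above does. A hedged sketch of the two equivalent construction paths; the ZK address and ports are illustrative:

```java
import com.sankuai.inf.leaf.IDGen;
import com.sankuai.inf.leaf.common.PropertyFactory;
import com.sankuai.inf.leaf.snowflake.SnowflakeIDGenImpl;

public class LegacyConstructionSketch {
    public static void main(String[] args) {
        // Legacy call: the instance name is resolved from the leaf.name property.
        IDGen legacyGen = new SnowflakeIDGenImpl("127.0.0.1:2181", 8080);

        // Equivalent explicit call (twepoch value taken from the diff above).
        String name = PropertyFactory.getProperties().getProperty("leaf.name");
        IDGen explicitGen = new SnowflakeIDGenImpl("127.0.0.1:2181", 8081, 1288834974657L, name);
    }
}
```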
SnowflakeZookeeperHolder.java
@@ -11,7 +11,6 @@
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;
import com.sankuai.inf.leaf.common.*;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.zookeeper.CreateMode;
@@ -31,39 +30,44 @@ public class SnowflakeZookeeperHolder {
private String zk_AddressNode = null;//stores this node's own key, ip:port-000000001
private String listenAddress = null;//stores this node's own key, ip:port
private int workerID;
private static final String PREFIX_ZK_PATH = "/snowflake/" + PropertyFactory.getProperties().getProperty("leaf.name");
private static final String PROP_PATH = System.getProperty("java.io.tmpdir") + File.separator + PropertyFactory.getProperties().getProperty("leaf.name") + "/leafconf/{port}/workerID.properties";
private static final String PATH_FOREVER = PREFIX_ZK_PATH + "/forever";//persistent node that stores all data
private String ip;
private String port;
private String connectionString;
private final String prefixZkPath;
private final String propPath;
private final String pathForever;
private final String ip;
private final String port;
private final String connectionString;
private final String snowflakeName;
private long lastUpdateTime;

public SnowflakeZookeeperHolder(String ip, String port, String connectionString) {
public SnowflakeZookeeperHolder(String ip, String port, String connectionString, String snowflakeName) {
this.ip = ip;
this.port = port;
this.listenAddress = ip + ":" + port;
this.connectionString = connectionString;
this.snowflakeName = snowflakeName;
prefixZkPath = "/snowflake/" + snowflakeName;
propPath = System.getProperty("java.io.tmpdir") + File.separator + snowflakeName + "/leafconf/{port}/workerID.properties";
pathForever = prefixZkPath + "/forever";//persistent node that stores all data
}

public boolean init() {
try {
CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000);
curator.start();
Stat stat = curator.checkExists().forPath(PATH_FOREVER);
Stat stat = curator.checkExists().forPath(pathForever);
if (stat == null) {
//root node does not exist: first startup on this machine, create /snowflake/ip:port-000000000 and upload its data
zk_AddressNode = createNode(curator);
//worker id defaults to 0
updateLocalWorkerID(workerID);
//periodically report this machine's time to the forever node
ScheduledUploadData(curator, zk_AddressNode);
scheduledUploadData(curator, zk_AddressNode);
return true;
} else {
Map<String, Integer> nodeMap = Maps.newHashMap();//ip:port->00001
Map<String, String> realNode = Maps.newHashMap();//ip:port->(ipport-000001)
//root node exists: first check whether this machine already has its own node
List<String> keys = curator.getChildren().forPath(PATH_FOREVER);
List<String> keys = curator.getChildren().forPath(pathForever);
for (String key : keys) {
String[] nodeKey = key.split("-");
realNode.put(nodeKey[0], key);
@@ -72,7 +76,7 @@ public boolean init() {
Integer workerid = nodeMap.get(listenAddress);
if (workerid != null) {
//this machine already has a node, zk_AddressNode=ip:port
zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress);
zk_AddressNode = pathForever + "/" + realNode.get(listenAddress);
workerID = workerid;//used when the worker starts
if (!checkInitTimeStamp(curator, zk_AddressNode)) {
throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
@@ -96,7 +100,7 @@ public boolean init() {
LOGGER.error("Start node ERROR {}", e);
try {
Properties properties = new Properties();
properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
properties.load(new FileInputStream(new File(propPath.replace("{port}", port + ""))));
workerID = Integer.valueOf(properties.getProperty("workerID"));
LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
} catch (Exception e1) {
@@ -108,14 +112,14 @@
}

private void doService(CuratorFramework curator) {
ScheduledUploadData(curator, zk_AddressNode);// /snowflake_forever/ip:port-000000001
scheduledUploadData(curator, zk_AddressNode);// /snowflake_forever/ip:port-000000001
}

private void ScheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) {
private void scheduledUploadData(final CuratorFramework curator, final String zk_AddressNode) {
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "schedule-upload-time");
Thread thread = new Thread(r, snowflakeName + "-schedule-upload-time");
thread.setDaemon(true);
return thread;
}
@@ -144,7 +148,7 @@ private boolean checkInitTimeStamp(CuratorFramework curator, String zk_AddressNo
*/
private String createNode(CuratorFramework curator) throws Exception {
try {
return curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(PATH_FOREVER + "/" + listenAddress + "-", buildData().getBytes());
return curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(pathForever + "/" + listenAddress + "-", buildData().getBytes());
} catch (Exception e) {
LOGGER.error("create node error msg {} ", e.getMessage());
throw e;
@@ -187,7 +191,7 @@ private Endpoint deBuildData(String json) throws IOException {
* @param workerID
*/
private void updateLocalWorkerID(int workerID) {
File leafConfFile = new File(PROP_PATH.replace("{port}", port));
File leafConfFile = new File(propPath.replace("{port}", port));
boolean exists = leafConfFile.exists();
LOGGER.info("file exists status is {}", exists);
if (exists) {
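The net effect in `SnowflakeZookeeperHolder` is that the formerly static, `leaf.name`-based paths become per-instance fields. A sketch of the layout two hypothetical instance names would produce; the package location, IPs, and names are assumptions, and the resulting paths are shown in comments:

```java
import com.sankuai.inf.leaf.snowflake.SnowflakeZookeeperHolder;

public class HolderPathSketch {
    public static void main(String[] args) {
        // Each named holder registers under its own ZK subtree and keeps its own
        // local workerID cache, so two instances in one JVM no longer collide.
        SnowflakeZookeeperHolder orderHolder =
                new SnowflakeZookeeperHolder("10.1.2.3", "8080", "127.0.0.1:2181", "order-snowflake");
        // ZK forever path:  /snowflake/order-snowflake/forever/<ip:port>-<sequence>
        // local cache file: {java.io.tmpdir}/order-snowflake/leafconf/8080/workerID.properties

        SnowflakeZookeeperHolder userHolder =
                new SnowflakeZookeeperHolder("10.1.2.3", "8081", "127.0.0.1:2181", "user-snowflake");
        // ZK forever path:  /snowflake/user-snowflake/forever/<ip:port>-<sequence>
        // local cache file: {java.io.tmpdir}/user-snowflake/leafconf/8081/workerID.properties

        orderHolder.init(); // connects to ZooKeeper and claims or restores a workerID
        userHolder.init();
    }
}
```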