Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support remove storage by hand or heartbeat #482

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ username=root
password=root

# 时序数据库列表,使用','分隔不同实例
storageEngineList=127.0.0.1#6667#iotdb11#username=root#password=root#sessionPoolSize=20#has_data=false#is_read_only=false
storageEngineList=127.0.0.1#6667#iotdb11#username=root#password=root#sessionPoolSize=50#has_data=false#is_read_only=false,127.0.0.1#6668#iotdb11#username=root#password=root#sessionPoolSize=50#has_data=false#is_read_only=false
#storageEngineList=127.0.0.1#8086#influxdb#url=http://localhost:8086/#token=your-token#organization=your-organization#has_data=false
#storageEngineList=127.0.0.1#4242#opentsdb#url=http://127.0.0.1
#storageEngineList=11.101.17.21#5432#timescaledb#username=postgres#password=123456
Expand Down Expand Up @@ -75,6 +75,8 @@ migrationPolicyClassName=cn.edu.tsinghua.iginx.migration.GreedyMigrationPolicy
# parquet是否为本地存储
isLocalParquetStorage=true

migration_thread_pool_size=20

##########################
### simple policy 策略配置
##########################
Expand Down Expand Up @@ -184,3 +186,28 @@ batchSize=50
transformTaskThreadPoolSize=10
# Transform最大重试次数
transformMaxRetryTimes=3

##########################
### 容错相关
##########################

# 是否开启容错
enable_storage_heartbeat=false

# 存储节点心跳包间隔
storage_heartbeat_interval=10s

# 存储节点单个心跳包最大重试次数
storage_heartbeat_max_retry_times=2

# 存储节点心跳包超时时间
storage_heartbeat_timeout=1s

# 存储节点宕机后重连的时间间隔
storage_retry_connect_interval=30s

# 存储心跳包检测调度线程池大小
storage_heartbeat_threshold_pool_size=20

# 尝试重连概率
storage_restore_heartbeat_probability=0.2
11 changes: 8 additions & 3 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,22 @@
<dependency>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>iginx-thrift</artifactId>
<version>${project.version}</version>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>iginx-shared</artifactId>
<version>${project.version}</version>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>iginx-session</artifactId>
<version>${project.version}</version>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>cn.edu.tsinghua</groupId>
<artifactId>iginx-sync</artifactId>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
Expand Down
31 changes: 30 additions & 1 deletion core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import cn.edu.tsinghua.iginx.engine.physical.PhysicalEngineImpl;
import cn.edu.tsinghua.iginx.engine.physical.storage.StorageManager;
import cn.edu.tsinghua.iginx.engine.shared.RequestContext;
import cn.edu.tsinghua.iginx.exceptions.StatusCode;
import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
import cn.edu.tsinghua.iginx.metadata.IMetaManager;
import cn.edu.tsinghua.iginx.metadata.entity.*;
import cn.edu.tsinghua.iginx.migration.MigrationManager;
import cn.edu.tsinghua.iginx.migration.storage.StorageMigrationExecutor;
import cn.edu.tsinghua.iginx.utils.JsonUtils;
import cn.edu.tsinghua.iginx.resource.QueryResourceManager;
import cn.edu.tsinghua.iginx.thrift.*;
Expand All @@ -44,7 +47,6 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -182,6 +184,33 @@ public QueryDataResp queryData(QueryDataReq req) {
return ctx.getResult().getQueryDataResp();
}

/**
 * Handles a request to remove a storage engine.
 *
 * <p>The session must pass the {@code AuthType.Cluster} check and the storage id must resolve
 * to a known storage engine. The actual work is delegated to
 * {@code StorageMigrationExecutor.migration(storageId, req.sync)}.
 *
 * @param req the removal request carrying the session id, the storage id and the sync flag
 * @return {@code RpcUtils.ACCESS_DENY} when the session check fails, {@code RpcUtils.SUCCESS}
 *         when the migration call returns {@code true}, otherwise a
 *         {@code STATEMENT_EXECUTION_ERROR} status describing the failure
 */
@Override
public Status removeStorageEngine(RemoveStorageEngineReq req) {
    boolean authorized = sessionManager.checkSession(req.getSessionId(), AuthType.Cluster);
    if (!authorized) {
        return RpcUtils.ACCESS_DENY;
    }

    long storageId = req.getStorageId();
    // The lookup is only used as an existence check; the metadata itself is not needed here.
    if (metaManager.getStorageEngine(storageId) == null) {
        Status notFound = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
        notFound.setMessage("storage engine is not exists.");
        return notFound;
    }

    try {
        boolean migrated = StorageMigrationExecutor.getInstance().migration(storageId, req.sync);
        if (!migrated) {
            Status failed = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
            failed.setMessage("unexpected error during storage migration");
            return failed;
        }
        return RpcUtils.SUCCESS;
    } catch (Exception e) {
        // Boundary of the RPC handler: report the failure to the client instead of propagating.
        logger.error("unexpected error during storage migration: ", e);
        Status errored = new Status(StatusCode.STATEMENT_EXECUTION_ERROR.getStatusCode());
        errored.setMessage("unexpected error during storage migration: " + e.getMessage());
        return errored;
    }
}

@Override
public Status addStorageEngines(AddStorageEnginesReq req) {
if (!sessionManager.checkSession(req.getSessionId(), AuthType.Cluster)) {
Expand Down
105 changes: 105 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,47 @@ public class Config {

private boolean isLocalParquetStorage = true;

////////////////////////////////////////
// Fault-tolerance configuration      //
////////////////////////////////////////

/**
 * Whether storage liveness detection is enabled.
 */
private boolean enableStorageHeartbeat = false;

/**
 * Interval between storage-node heartbeats, in ms. A value of 0 means storage-node
 * liveness is not checked. Defaults to 10s.
 */
private long storageHeartbeatInterval = 10000;

/**
 * Maximum number of retries when sending a single heartbeat to a storage node.
 */
private int storageHeartbeatMaxRetryTimes = 5;

/**
 * Timeout of a single storage-node heartbeat, in ms. Defaults to 1s.
 */
private long storageHeartbeatTimeout = 1000;

/**
 * Interval between reconnect attempts after a storage node goes down, in ms. Defaults to 50s.
 */
private long storageRetryConnectInterval = 50000;

/**
 * Size of the storage-node heartbeat thread pool. Defaults to 10.
 */
private int storageHeartbeatThresholdPoolSize = 10;

/**
 * Probability of attempting a reconnect after a storage node goes down. Defaults to 0.05.
 */
private double storageRestoreHeartbeatProbability = 0.05;

/**
 * Migration thread pool size. Defaults to 20.
 */
private int migrationThreadPoolSize = 20;

public int getMaxTimeseriesLength() {
return maxTimeseriesLength;
}
Expand Down Expand Up @@ -712,4 +753,68 @@ public boolean isLocalParquetStorage() {
public void setLocalParquetStorage(boolean localParquetStorage) {
isLocalParquetStorage = localParquetStorage;
}

/**
 * @return whether storage liveness detection is enabled
 */
public boolean isEnableStorageHeartbeat() {
return enableStorageHeartbeat;
}

/**
 * @param enableStorageHeartbeat whether storage liveness detection is enabled
 */
public void setEnableStorageHeartbeat(boolean enableStorageHeartbeat) {
this.enableStorageHeartbeat = enableStorageHeartbeat;
}

/**
 * @return the interval between storage-node heartbeats, in ms
 */
public long getStorageHeartbeatInterval() {
return storageHeartbeatInterval;
}

/**
 * @param storageHeartbeatInterval the interval between storage-node heartbeats, in ms
 */
public void setStorageHeartbeatInterval(long storageHeartbeatInterval) {
this.storageHeartbeatInterval = storageHeartbeatInterval;
}

/**
 * @return the maximum number of retries for a single storage-node heartbeat
 */
public int getStorageHeartbeatMaxRetryTimes() {
return storageHeartbeatMaxRetryTimes;
}

/**
 * @param storageHeartbeatMaxRetryTimes the maximum number of retries for a single heartbeat
 */
public void setStorageHeartbeatMaxRetryTimes(int storageHeartbeatMaxRetryTimes) {
this.storageHeartbeatMaxRetryTimes = storageHeartbeatMaxRetryTimes;
}

/**
 * @return the timeout of a single storage-node heartbeat, in ms
 */
public long getStorageHeartbeatTimeout() {
return storageHeartbeatTimeout;
}

/**
 * @param storageHeartbeatTimeout the timeout of a single storage-node heartbeat, in ms
 */
public void setStorageHeartbeatTimeout(long storageHeartbeatTimeout) {
this.storageHeartbeatTimeout = storageHeartbeatTimeout;
}

/**
 * @return the interval between reconnect attempts to a downed storage node, in ms
 */
public long getStorageRetryConnectInterval() {
return storageRetryConnectInterval;
}

/**
 * @param storageRetryConnectInterval the interval between reconnect attempts, in ms
 */
public void setStorageRetryConnectInterval(long storageRetryConnectInterval) {
this.storageRetryConnectInterval = storageRetryConnectInterval;
}

/**
 * @return the size of the storage-node heartbeat thread pool
 */
public int getStorageHeartbeatThresholdPoolSize() {
return storageHeartbeatThresholdPoolSize;
}

/**
 * @param storageHeartbeatThresholdPoolSize the size of the storage-node heartbeat thread pool
 */
public void setStorageHeartbeatThresholdPoolSize(int storageHeartbeatThresholdPoolSize) {
this.storageHeartbeatThresholdPoolSize = storageHeartbeatThresholdPoolSize;
}

/**
 * @return the probability of attempting a reconnect to a downed storage node
 */
public double getStorageRestoreHeartbeatProbability() {
return storageRestoreHeartbeatProbability;
}

/**
 * @param storageRestoreHeartbeatProbability the probability of attempting a reconnect
 */
public void setStorageRestoreHeartbeatProbability(double storageRestoreHeartbeatProbability) {
this.storageRestoreHeartbeatProbability = storageRestoreHeartbeatProbability;
}

/**
 * @return the migration thread pool size
 */
public int getMigrationThreadPoolSize() {
return migrationThreadPoolSize;
}

/**
 * @param migrationThreadPoolSize the migration thread pool size
 */
public void setMigrationThreadPoolSize(int migrationThreadPoolSize) {
this.migrationThreadPoolSize = migrationThreadPoolSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ private void loadPropsFromFile() {
config.setHistoricalPrefixList(properties.getProperty("historicalPrefixList", ""));
config.setExpectedStorageUnitNum(Integer.parseInt(properties.getProperty("expectedStorageUnitNum", "0")));
config.setLocalParquetStorage(Boolean.parseBoolean(properties.getProperty("isLocalParquetStorage", "true")));

// 容错相关
config.setEnableStorageHeartbeat(Boolean.parseBoolean(properties.getProperty("enable_storage_heartbeat", "false")));
config.setStorageHeartbeatInterval(ConfigUtils.parseTime(properties.getProperty("storage_heartbeat_interval", "10s")));
config.setStorageHeartbeatMaxRetryTimes(Integer.parseInt(properties.getProperty("storage_heartbeat_max_retry_times", "5")));
config.setStorageHeartbeatTimeout(ConfigUtils.parseTime(properties.getProperty("storage_heartbeat_timeout", "1s")));
config.setStorageRetryConnectInterval(ConfigUtils.parseTime(properties.getProperty("storage_retry_connect_interval", "50s")));
config.setStorageHeartbeatThresholdPoolSize(Integer.parseInt(properties.getProperty("storage_heartbeat_threshold_pool_size", "10")));
config.setStorageRestoreHeartbeatProbability(Double.parseDouble(properties.getProperty("storage_restore_heartbeat_probability", "0.05")));
config.setMigrationThreadPoolSize(Integer.parseInt(properties.getProperty("migration_thread_pool_size", "20")));
} catch (IOException e) {
logger.error("Fail to load properties: ", e);
}
Expand Down Expand Up @@ -205,6 +215,15 @@ private void loadPropsFromEnv() {
config.setHistoricalPrefixList(EnvUtils.loadEnv("historicalPrefixList", config.getHistoricalPrefixList()));
config.setExpectedStorageUnitNum(EnvUtils.loadEnv("expectedStorageUnitNum", config.getExpectedStorageUnitNum()));
config.setLocalParquetStorage(EnvUtils.loadEnv("isLocalParquetStorage", config.isLocalParquetStorage()));

// 容错相关
config.setEnableStorageHeartbeat(EnvUtils.loadEnv("enable_storage_heartbeat", config.isEnableStorageHeartbeat()));
config.setStorageHeartbeatInterval(ConfigUtils.parseTime(EnvUtils.loadEnv("storage_heartbeat_interval", ConfigUtils.toTimeString(config.getStorageHeartbeatInterval()))));
config.setStorageHeartbeatMaxRetryTimes(EnvUtils.loadEnv("storage_heartbeat_max_retry_times", config.getStorageHeartbeatMaxRetryTimes()));
config.setStorageHeartbeatTimeout(ConfigUtils.parseTime(EnvUtils.loadEnv("storage_heartbeat_timeout", ConfigUtils.toTimeString(config.getStorageHeartbeatTimeout()))));
config.setStorageRetryConnectInterval(ConfigUtils.parseTime(EnvUtils.loadEnv("storage_retry_connect_interval", ConfigUtils.toTimeString(config.getStorageRetryConnectInterval()))));
config.setStorageHeartbeatThresholdPoolSize(EnvUtils.loadEnv("storageHeartbeatThresholdPoolSize", config.getStorageHeartbeatThresholdPoolSize()));
config.setStorageRestoreHeartbeatProbability(EnvUtils.loadEnv("storageRestoreHeartbeatProbability", config.getStorageRestoreHeartbeatProbability()));
}

private void loadUDFListFromFile() {
Expand Down
38 changes: 38 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package cn.edu.tsinghua.iginx.conf;

/**
 * Helpers for converting duration settings between their textual form (for example
 * {@code "10s"}) and a number of milliseconds.
 */
public class ConfigUtils {

    /**
     * Recognised unit suffixes, checked in order. {@code "ms"} must come before {@code "s"}
     * because a value such as {@code "10ms"} also ends with {@code "s"}.
     */
    private static final String[] suffixList = new String[] {
        "ms", "s", "m", "h", "min", "hour", "day"
    };

    /** Milliseconds per unit, index-aligned with {@link #suffixList}. */
    private static final long[] factors = new long[] {
        1, 1000, 1000 * 60, 1000 * 60 * 60, 1000 * 60, 1000 * 60 * 60, 1000 * 60 * 60 * 24
    };

    /**
     * Parses a duration string into milliseconds.
     *
     * <p>Supported suffixes are {@code ms}, {@code s}, {@code m}/{@code min},
     * {@code h}/{@code hour} and {@code day}; a value without a suffix is taken as
     * milliseconds. Whitespace around the value and between the number and its suffix is
     * ignored, so a trailing space in a properties file does not make the value unparsable.
     *
     * @param s the duration string, e.g. {@code "10s"}, {@code "500"} or {@code "2min"}
     * @return the duration in milliseconds
     * @throws NumberFormatException if the numeric part is not a valid {@code long}
     * @throws NullPointerException if {@code s} is {@code null}
     */
    public static long parseTime(String s) {
        String text = s.trim();
        long factor = 1;
        int suffixLen = 0;
        for (int i = 0; i < suffixList.length; i++) {
            if (text.endsWith(suffixList[i])) {
                factor = factors[i];
                suffixLen = suffixList[i].length();
                break;
            }
        }
        // Trim again so that "10 s" parses the same way as "10s".
        String number = text.substring(0, text.length() - suffixLen).trim();
        long value = Long.parseLong(number);
        return value * factor;
    }

    /**
     * Formats a millisecond duration in a form that {@link #parseTime(String)} accepts.
     *
     * @param time the duration in milliseconds
     * @return the value followed by the {@code ms} suffix, e.g. {@code "1500ms"}
     */
    public static String toTimeString(long time) {
        return Long.toString(time) + "ms";
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import cn.edu.tsinghua.iginx.engine.physical.optimizer.ReplicaDispatcher;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.metadata.DefaultMetaManager;
import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitMeta;
import cn.edu.tsinghua.iginx.metadata.entity.StorageUnitState;

public class NaiveReplicaDispatcher implements ReplicaDispatcher {

Expand All @@ -33,7 +36,12 @@ public String chooseReplica(StoragePhysicalTask task) {
if (task == null) {
return null;
}
return task.getTargetFragment().getMasterStorageUnitId();
String masterStorageUnitId = task.getTargetFragment().getMasterStorageUnitId();
StorageUnitMeta masterStorageUnit = DefaultMetaManager.getInstance().getStorageUnit(masterStorageUnitId);
if (masterStorageUnit.getState() == StorageUnitState.DISCARD) {
return masterStorageUnit.getMigrationTo();
}
return masterStorageUnitId;
}

public static NaiveReplicaDispatcher getInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Timeseries;
import cn.edu.tsinghua.iginx.engine.physical.storage.fault_tolerance.Connector;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.metadata.entity.TimeInterval;
Expand All @@ -31,6 +32,8 @@

public interface IStorage {

Connector getConnector();

TaskExecuteResult execute(StoragePhysicalTask task);

List<Timeseries> getTimeSeries() throws PhysicalException;
Expand Down
Loading