KYLIN-5539 need close FSDataInputStream #2119
Open
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Proposed changes
1.初始发现:线上告警某节点存在大量的CLOSE_WAIT,通过 netstat -anp 发现来自于Kylin4 JobServer 进程,CLOSE_WAIT数达到9000多。并且 CLOSE_WAIT 来自的外部地址端口都是 50010,而该端口是 Hadoop DataNode 数据传输使用,故此怀疑是 JobServer在每次作业构建时 fileSystem.open() 一个流后没有进行close。
2.模拟复现:在研测环境提交cube构建任务,并观察 CLOSE_WAIT 数及增长情况,发现每次cube构建结束后,CLOSE_WAIT 数增加1,至此可以确定是JobServer代码中未关闭流导致。
3.定位代码:深入kylin4 构建代码进行debug,最终定位到 org.apache.kylin.engine.spark.utils.UpdateMetadataUtil#syncLocalMetadataToRemote 94行 (Apache Kylin main分支)
String resKey = toUpdateSeg.getStatisticsResourcePath();
String statisticsDir = config.getJobTmpDir(currentInstanceCopy.getProject()) + "/"
Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getWorkingFileSystem();
if (fs.exists(statisticsFile)) {
FSDataInputStream is = fs.open(statisticsFile); //未关闭流
ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis());
}
CubeUpdate update = new CubeUpdate(currentInstanceCopy);
update.setCuboids(distCube.getCuboids());
List toRemoveSegs = Lists.newArrayList();
4. 研测验证:
Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getWorkingFileSystem();
if (fs.exists(statisticsFile)) {
try (FSDataInputStream is = fs.open(statisticsFile)) { // 关闭流
ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis());
}
}
修改代码后,在研测环境进行多轮cube构建测试,CLOSE_WAIT 均无增加,验证解决。
Branch to commit
Types of changes
What types of changes does your code introduce to Kylin?
Put an
x
in the boxes that applyChecklist
Put an
x
in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code.Further comments
If this is a relatively large or complex change, kick off the discussion at [email protected] or [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...