Skip to content

Commit fe960ae

Browse files
author
zzhenzeng
committed
fix multi-thread exception
1 parent 3245b78 commit fe960ae

4 files changed

Lines changed: 62 additions & 47 deletions

File tree

src/main/java/org/elasticflow/piper/Breaker.java

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@
44

55
/**
66
* pipe end connect breaker control
7-
*
7+
* When a serious error occurs, temporarily interrupt the operation
88
* @author chengwen
99
* @version 1.0
1010
* @date 2019-01-24 10:55
@@ -13,13 +13,13 @@
1313
@NotThreadSafe
1414
public class Breaker {
1515

16-
private long earlyFail;
16+
private volatile long earlyFail;
1717

18-
private long checkFail;
18+
private volatile long checkFail;
1919

2020
private int failTimes;
2121

22-
private int failSpan = 3000;
22+
private int failSpan = 300;
2323

2424
private int recheckSpan = 60000;
2525

src/main/java/org/elasticflow/piper/PipePump.java

Lines changed: 46 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -130,13 +130,10 @@ private void processFlow(Task task, String mainName, String storeId, List<String
130130
try {
131131
task.setL2seq(L2seq);
132132
Resource.FLOW_INFOS.get(task.getInstance(), task.getJobType().name()).put(task.getInstance() + L2seq,
133-
"start count page...");
134-
getReader().lock.lock();
135-
ConcurrentLinkedDeque<String> pageList = getReader().getPageSplit(task,
136-
getInstanceConfig().getPipeParams().getReadPageSize());
137-
getReader().lock.unlock();
133+
"start count page...");
134+
ConcurrentLinkedDeque<String> pageList = this.getPageLists(task);
138135
if (pageList == null)
139-
throw new EFException("read data get page split exception!", ELEVEL.Termination);
136+
throw new EFException("Reader page split exception!", ELEVEL.Termination);
140137
processListsPages(task, writeTo, pageList, storeId);
141138

142139
} catch (EFException e) {
@@ -174,8 +171,8 @@ private void processListsPages(Task task, String writeTo, ConcurrentLinkedDeque<
174171
" no data!"));
175172
} else {
176173
log.info(Common.formatLog("start",
177-
getInstanceConfig().getPipeParams().isMultiThread() ? "MultiThread"
178-
: "SingleThread" + " Start " + task.getJobType().name(),
174+
(getInstanceConfig().getPipeParams().isMultiThread() ? "MultiThread"
175+
: "SingleThread") + " Start " + task.getJobType().name(),
179176
mainName, storeId, task.getL1seq(), 0, "",
180177
GlobalParam.SCAN_POSITION.get(mainName).getL2SeqPos(task.getL2seq()), 0, ",totalpage:" + pageNum));
181178

@@ -188,9 +185,9 @@ private void processListsPages(Task task, String writeTo, ConcurrentLinkedDeque<
188185
long start = Common.getNow();
189186
AtomicInteger total = new AtomicInteger(0);
190187
if (getInstanceConfig().getPipeParams().isMultiThread()) {
191-
final CountDownLatch synThreads = new CountDownLatch(estimateThreads(pageNum));
188+
CountDownLatch synThreads = new CountDownLatch(estimateThreads(pageNum));
192189
Resource.ThreadPools
193-
.submitJobPage(new Pump(synThreads, task, storeId, pageList, writeTo, computeModel, total));
190+
.submitJobPage(new PumpThread(synThreads, task, storeId, pageList, writeTo, computeModel, total));
194191
try {
195192
synThreads.await();
196193
} catch (Exception e) {
@@ -237,13 +234,8 @@ private void singleThread(Task task, String storeId, ConcurrentLinkedDeque<Strin
237234
if (Common.checkFlowStatus(task.getInstance(), task.getL1seq(), task.getJobType(), STATUS.Termination)) {
238235
break;
239236
} else {
240-
getReader().lock.lock();
241-
DataPage pagedata = (DataPage) CPU.RUN(
242-
getID(), "Pipe", "fetchPage", false, Page.getInstance(keyField, scanField, startId,
243-
dataBoundary, this.readHandler, getInstanceConfig().getWriteFields(), dataScanDSL),
244-
getReader());
245-
getReader().freeJobPage();
246-
getReader().lock.unlock();
237+
DataPage pagedata = this.getPageData(Page.getInstance(keyField, scanField, startId,
238+
dataBoundary, this.readHandler, getInstanceConfig().getWriteFields(), dataScanDSL));
247239
if (getInstanceConfig().openCompute()) {
248240
pagedata = (DataPage) CPU.RUN(getID(), "ML", computeModel, false, getID(), task.getJobType().name(),
249241
writeTo, pagedata);
@@ -289,7 +281,7 @@ int estimateThreads(int numJobs) {
289281
* @date 2019-01-11 10:45
290282
* @modify 2019-01-11 10:45
291283
*/
292-
public class Pump implements Runnable {
284+
public class PumpThread implements Runnable {
293285
long start = Common.getNow();
294286
final int pageSize;
295287
final String ID = CPU.getUUID();
@@ -306,7 +298,7 @@ public class Pump implements Runnable {
306298
ConcurrentLinkedDeque<String> pageList;
307299
boolean isUpdate = getInstanceConfig().getPipeParams().getWriteType().equals("increment") ? true : false;
308300

309-
public Pump(CountDownLatch synThreads, Task task, String storeId, ConcurrentLinkedDeque<String> pageList,
301+
public PumpThread(CountDownLatch synThreads, Task task, String storeId, ConcurrentLinkedDeque<String> pageList,
310302
String writeTo, String computeModel, AtomicInteger total) {
311303
this.pageList = pageList;
312304
this.writeTo = writeTo;
@@ -342,15 +334,10 @@ public void run() {
342334
Resource.ThreadPools.cleanWaitJob(getId());
343335
Common.LOG.warn(task.getInstance() + " " + task.getJobType().name() + " job has been Terminated!");
344336
break;
345-
} else {
346-
getReader().lock.lock();
347-
DataPage pagedata = (DataPage) CPU.RUN(getID(), "Pipe", "fetchPage", false,
348-
Page.getInstance(task.getScanParam().getKeyField(), task.getScanParam().getScanField(),
349-
startId, dataBoundary, readHandler, getInstanceConfig().getWriteFields(),
350-
dataScanDSL),
351-
getReader());
352-
getReader().freeJobPage();
353-
getReader().lock.unlock();
337+
} else {
338+
DataPage pagedata = getPageData(Page.getInstance(task.getScanParam().getKeyField(), task.getScanParam().getScanField(),
339+
startId, dataBoundary, readHandler, getInstanceConfig().getWriteFields(),
340+
dataScanDSL));
354341
if (getInstanceConfig().openCompute()) {
355342
pagedata = (DataPage) CPU.RUN(getID(), "ML", computeModel, false, getID(),
356343
task.getJobType().name(), writeTo, pagedata);
@@ -407,4 +394,35 @@ protected Integer compute() {
407394
}
408395

409396
}
397+
398+
//thread safe get page list
399+
private ConcurrentLinkedDeque<String> getPageLists(Task task) {
400+
ConcurrentLinkedDeque<String> pageList = null;
401+
getReader().lock.lock();
402+
try {
403+
pageList = getReader().getPageSplit(task,
404+
getInstanceConfig().getPipeParams().getReadPageSize());
405+
} catch (Exception e) {
406+
log.error("get Page lists Exception]", e);
407+
}finally {
408+
getReader().lock.unlock();
409+
}
410+
return pageList;
411+
}
412+
//thread safe get page data
413+
private DataPage getPageData(Page pager){
414+
getReader().lock.lock();
415+
DataPage pagedata=null;
416+
try {
417+
pagedata = (DataPage) CPU.RUN(
418+
getID(), "Pipe", "fetchPage", false, pager,
419+
getReader());
420+
getReader().freeJobPage();
421+
} catch (Exception e) {
422+
log.error("get Page Data Exception]", e);
423+
}finally {
424+
getReader().lock.unlock();
425+
}
426+
return pagedata;
427+
}
410428
}

src/main/java/org/elasticflow/util/Common.java

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -409,16 +409,16 @@ public static String formatLog(String types,String heads,String instanceName, St
409409
}
410410
switch (types) {
411411
case "complete":
412-
str.append(" docs:" + total);
413-
str.append(" position:" + update);
412+
str.append(" Docs:" + total);
413+
str.append(" scanAt:" + update);
414414
str.append(" useTime:" + useTimeFormat);
415415
break;
416416
case "start":
417-
str.append(" position:" + update);
417+
str.append(" scanAt:" + update);
418418
break;
419419
default:
420-
str.append(" docs:" + total+ (dataBoundary.length()<1 ? "" : " dataBoundary:" + dataBoundary)
421-
+ " position:" + update + " useTime:" + useTimeFormat);
420+
str.append(" Docs:" + total+ (total==0 || dataBoundary.length()<1 ? "" : " dataBoundary:" + dataBoundary)
421+
+ " scanAt:" + update + " useTime:" + useTimeFormat);
422422
break;
423423
}
424424
return str.append(moreinfo).toString();

src/main/java/org/elasticflow/yarn/ThreadPools.java

Lines changed: 7 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -19,15 +19,15 @@
1919
*/
2020
public class ThreadPools {
2121

22-
private ArrayBlockingQueue<PipePump.Pump> waitJob = new ArrayBlockingQueue<>(GlobalParam.POOL_SIZE * 10);
22+
private ArrayBlockingQueue<PipePump.PumpThread> waitJob = new ArrayBlockingQueue<>(GlobalParam.POOL_SIZE * 10);
2323

2424
private int maxThreadNums = GlobalParam.POOL_SIZE;
2525

26-
ThreadPoolExecutor cachedThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
27-
30L, TimeUnit.SECONDS,
26+
ThreadPoolExecutor cachedThreadPool = new ThreadPoolExecutor(maxThreadNums, maxThreadNums,
27+
1L, TimeUnit.SECONDS,
2828
new SynchronousQueue<Runnable>());
2929

30-
public void submitJobPage(PipePump.Pump jobPage) {
30+
public void submitJobPage(PipePump.PumpThread jobPage) {
3131
try {
3232
waitJob.put(jobPage);
3333
} catch (Exception e) {
@@ -36,8 +36,8 @@ public void submitJobPage(PipePump.Pump jobPage) {
3636
}
3737

3838
public void cleanWaitJob(String id) {
39-
Iterator<PipePump.Pump> iter = waitJob.iterator();
40-
PipePump.Pump job;
39+
Iterator<PipePump.PumpThread> iter = waitJob.iterator();
40+
PipePump.PumpThread job;
4141
while(iter.hasNext()) {
4242
job = iter.next();
4343
if(job.getId().equals(id))
@@ -49,10 +49,7 @@ public void start() {
4949
new Thread(() -> {
5050
try {
5151
while(true) {
52-
PipePump.Pump job = waitJob.take();
53-
while(cachedThreadPool.getTaskCount()>=maxThreadNums) {
54-
Thread.sleep(900);
55-
}
52+
PipePump.PumpThread job = waitJob.take();
5653
for(int i=0;i<job.needThreads();i++)
5754
cachedThreadPool.execute(job);
5855
}

0 commit comments

Comments
 (0)