Skip to content

Commit 2bb0202

Browse files
committed
wip
1 parent 8541347 commit 2bb0202

File tree

6 files changed

+33
-46
lines changed

6 files changed

+33
-46
lines changed

initfiles/etc/DIR_NAME/environment.xml.in

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -922,6 +922,7 @@
922922
daliServers="mydali"
923923
description="Thor process"
924924
fileCacheLimit="1800"
925+
globalMemorySize="200"
925926
heapRetainMemory="false"
926927
heapUseHugePages="false"
927928
heapUseTransparentHugePages="true"
@@ -935,7 +936,7 @@
935936
replicateAsync="false"
936937
replicateOutputs="false"
937938
slaveport="20100"
938-
slavesPerNode="1"
939+
slavesPerNode="2"
939940
watchdogEnabled="true"
940941
watchdogProgressEnabled="true">
941942
<SwapNode AutoSwapNode="false"/>
Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,20 @@
1+
#option('pickBestEngine', false);
2+
3+
numRecs := 10;
4+
rec := RECORD
5+
unsigned8 id;
6+
unsigned8 other;
7+
string984 rest := 'rest';
8+
string24 to1k := '1k';
9+
END;
10+
11+
ds := DATASET(numRecs, TRANSFORM(rec, SELF.id := HASH(COUNTER), SELF.other := COUNTER), DISTRIBUTED);
12+
13+
sd1 := SORT(NOFOLD(ds), id);
14+
sd2 := SORT(NOFOLD(ds), other);
15+
sd2p := PROJECT(sd2, TRANSFORM({sd2.id}, SELF := LEFT));
16+
17+
PARALLEL(
18+
OUTPUT(DEDUP(sd1, other, HASH));
19+
OUTPUT(DEDUP(sd2p, id, HASH));
20+
);

thorlcr/graph/thgraph.cpp

Lines changed: 4 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -2375,33 +2375,19 @@ void CGraphTempHandler::clearTemps()
23752375
tmpFiles.kill();
23762376
}
23772377

2378-
void CGraphTempHandler::getUsageStats(graph_id gid, offset_t & graphSpillSize)
2378+
offset_t CGraphTempHandler::getUsageStats()
23792379
{
23802380
CriticalBlock b(crit);
23812381
Owned<IFileUsageIterator> iter = getIterator();
2382-
graphSpillSize = 0;
2382+
offset_t activeSpillSize = 0;
23832383
ForEach(*iter)
23842384
{
23852385
CFileUsageEntry &entry = iter->query();
2386-
if (entry.queryGraphId() == gid)
2387-
graphSpillSize += entry.getSize();
2386+
activeSpillSize += entry.getSize();
23882387
}
2388+
return activeSpillSize;
23892389
}
23902390

2391-
void CGraphTempHandler::serializeUsageStats(MemoryBuffer &mb, graph_id gid)
2392-
{
2393-
offset_t graphSpillSize;
2394-
getUsageStats(gid, graphSpillSize);
2395-
mb.append(graphSpillSize);
2396-
}
2397-
2398-
void CGraphTempHandler::setUsageStats(CRuntimeStatisticCollection &rsc, graph_id gid)
2399-
{
2400-
offset_t graphSpillSize;
2401-
getUsageStats(gid, graphSpillSize);
2402-
rsc.setStatistic(StSizeActiveSpillFile, graphSpillSize);
2403-
rsc.setStatistic(StPeakSizeNodeSpillFile, graphSpillSize); // StatsMergeMax should track the peak
2404-
}
24052391
/////
24062392

24072393
class CGraphExecutor;

thorlcr/graph/thgraph.hpp

Lines changed: 2 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -221,12 +221,7 @@ interface IGraphTempHandler : extends IInterface
221221
virtual void deregisterFile(const char *name, bool kept=false) = 0;
222222
virtual void clearTemps() = 0;
223223
virtual IFileUsageIterator *getIterator() = 0;
224-
virtual void serializeUsageStats(MemoryBuffer &mb, graph_id gid) = 0;
225-
static void serializeNullUsageStats(MemoryBuffer &mb)
226-
{
227-
mb.append((offset_t)0);
228-
}
229-
virtual void setUsageStats(CRuntimeStatisticCollection &rsc, graph_id gid) = 0;
224+
virtual offset_t getUsageStats() = 0;
230225
};
231226

232227
class CGraphDependency : public CInterface
@@ -560,7 +555,6 @@ class graph_decl CGraphTempHandler : implements IGraphTempHandler, public CInter
560555
mutable CriticalSection crit;
561556
bool errorOnMissing;
562557

563-
void getUsageStats(graph_id gid, offset_t & graphSpillSize);
564558
public:
565559
IMPLEMENT_IINTERFACE;
566560

@@ -592,8 +586,7 @@ class graph_decl CGraphTempHandler : implements IGraphTempHandler, public CInter
592586
};
593587
return new CIterator(tmpFiles);
594588
}
595-
virtual void serializeUsageStats(MemoryBuffer &mb, graph_id gid) override;
596-
virtual void setUsageStats(CRuntimeStatisticCollection &rsc, graph_id gid) override;
589+
virtual offset_t getUsageStats() override;
597590
};
598591

599592
class graph_decl CGraphStub : public CInterface, implements IThorChildGraph

thorlcr/graph/thgraphmaster.cpp

Lines changed: 0 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -2722,11 +2722,6 @@ void CMasterGraph::handleSlaveDone(unsigned node, MemoryBuffer &mb)
27222722
sdMb.setBuffer(len, (void *)d);
27232723
act->slaveDone(node, sdMb);
27242724
}
2725-
offset_t nodeGraphSpill;
2726-
mb.read(nodeGraphSpill);
2727-
totalActiveSpillSize += nodeGraphSpill;
2728-
if (nodeGraphSpill>peakNodeSpillFile)
2729-
peakNodeSpillFile = nodeGraphSpill;
27302725
}
27312726

27322727
void CMasterGraph::getFinalProgress()

thorlcr/graph/thgraphslave.cpp

Lines changed: 5 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -1259,14 +1259,17 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
12591259

12601260
CRuntimeStatisticCollection stats(graphStatistics);
12611261
stats.setStatistic(StNumExecutions, numExecuted);
1262+
offset_t graphSpillSize = 0;
12621263
if (!owner)
1263-
queryJob().queryTempHandler()->setUsageStats(stats, gid);
1264+
graphSpillSize = queryJob().queryTempHandler()->getUsageStats();
12641265
else
12651266
{
12661267
IGraphTempHandler *tempHandler = queryTempHandler(false);
12671268
if (tempHandler)
1268-
tempHandler->setUsageStats(stats, gid);
1269+
graphSpillSize = tempHandler->getUsageStats();
12691270
}
1271+
1272+
stats.mergeStatistic(StPeakSizeNodeSpillFile, graphSpillSize);
12701273
stats.serialize(mb);
12711274

12721275
unsigned cPos = mb.length();
@@ -1340,17 +1343,6 @@ void CSlaveGraph::serializeDone(MemoryBuffer &mb)
13401343
}
13411344
}
13421345
mb.writeDirect(cPos, sizeof(count), &count);
1343-
1344-
if (!owner)
1345-
queryJob().queryTempHandler()->serializeUsageStats(mb, gid);
1346-
else
1347-
{
1348-
IGraphTempHandler *tempHandler = queryTempHandler(false);
1349-
if (tempHandler)
1350-
tempHandler->serializeUsageStats(mb, gid);
1351-
else
1352-
IGraphTempHandler::serializeNullUsageStats(mb);
1353-
}
13541346
}
13551347

13561348
void CSlaveGraph::getDone(MemoryBuffer &doneInfoMb)

0 commit comments

Comments
 (0)