@@ -193,15 +193,15 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
193193 /** the rowset for the error rows */
194194 private IRowSet errorRowSet ;
195195
196- private AtomicBoolean running ;
196+ private final AtomicBoolean running ;
197197
198- private AtomicBoolean stopped ;
198+ private final AtomicBoolean stopped ;
199199
200200 protected AtomicBoolean safeStopped ;
201201
202202 private AtomicBoolean paused ;
203203
204- private boolean init ;
204+ private final boolean init ;
205205
206206 /** the copy number of this thread */
207207 private int copyNr ;
@@ -282,13 +282,13 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
282282 * The upper buffer size boundary after which we manage the thread priority a little bit to
283283 * prevent excessive locking
284284 */
285- private int upperBufferBoundary ;
285+ private final int upperBufferBoundary ;
286286
287287 /**
288288 * The lower buffer size boundary after which we manage the thread priority a little bit to
289289 * prevent excessive locking
290290 */
291- private int lowerBufferBoundary ;
291+ private final int lowerBufferBoundary ;
292292
293293 /** maximum number of errors to allow */
294294 private Long maxErrors = -1L ;
@@ -316,7 +316,7 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
316316 */
317317 private IRowHandler rowHandler ;
318318
319- private AtomicBoolean markStopped ;
319+ private final AtomicBoolean markStopped ;
320320
321321 /**
322322 * This is the base transform that forms that basis for all transforms. You can derive from this
@@ -381,8 +381,8 @@ public BaseTransform(
381381 linesOutput = 0L ;
382382 }
383383
384- inputRowSets = null ;
385- outputRowSets = null ;
384+ inputRowSets = new ArrayList <>() ;
385+ outputRowSets = new ArrayList <>() ;
386386 nextTransforms = null ;
387387
388388 terminator = transformMeta .hasTerminator ();
@@ -418,8 +418,15 @@ public BaseTransform(
418418
419419 dispatch ();
420420
421- upperBufferBoundary = (int ) (pipeline .getRowSetSize () * 0.99 );
422- lowerBufferBoundary = (int ) (pipeline .getRowSetSize () * 0.01 );
421+ if (pipeline != null ) {
422+ upperBufferBoundary = (int ) (pipeline .getRowSetSize () * 0.99 );
423+ lowerBufferBoundary = (int ) (pipeline .getRowSetSize () * 0.01 );
424+ } else {
425+ upperBufferBoundary = 100 ;
426+ lowerBufferBoundary = 10 ;
427+ }
428+
429+ setInternalVariables ();
423430 }
424431
425432 @ Override
@@ -441,7 +448,7 @@ public boolean init() {
441448 .getPartitionSchema ()
442449 .calculatePartitionIds (this );
443450
444- if (partitionIdList .size () > 0 ) {
451+ if (! partitionIdList .isEmpty () ) {
445452 String partitionId = partitionIdList .get (partitionNr );
446453 setVariable (Const .INTERNAL_VARIABLE_TRANSFORM_PARTITION_ID , partitionId );
447454 } else {
@@ -1023,13 +1030,19 @@ private void handlePutRow(IRowMeta rowMeta, Object[] row) throws HopTransformExc
10231030 // Are we running yet? If not, wait a bit until all threads have been
10241031 // started.
10251032 //
1026- if (this .checkPipelineRunning == false ) {
1033+ if (!this .checkPipelineRunning ) {
1034+ int counter = 0 ;
10271035 while (!pipeline .isRunning () && !stopped .get ()) {
10281036 try {
1029- Thread .sleep (1 );
1037+ Thread .sleep (1000 );
1038+ counter ++;
10301039 } catch (InterruptedException e ) {
10311040 // Ignore
10321041 }
1042+ // wait 3s max
1043+ if (counter >= 3 ) {
1044+ break ;
1045+ }
10331046 }
10341047 this .checkPipelineRunning = true ;
10351048 }
@@ -1100,7 +1113,7 @@ private void specialPartitioning(IRowMeta rowMeta, Object[] row) throws HopTrans
11001113 // This is the case for non-clustered partitioning...
11011114 //
11021115 List <TransformMeta > nextTransforms = pipelineMeta .findNextTransforms (transformMeta );
1103- if (nextTransforms .size () > 0 ) {
1116+ if (! nextTransforms .isEmpty () ) {
11041117 nextTransformPartitioningMeta = nextTransforms .get (0 ).getTransformPartitioningMeta ();
11051118 }
11061119
@@ -1287,11 +1300,9 @@ public void handlePutRowTo(IRowMeta rowMeta, Object[] row, IRowSet rowSet)
12871300 }
12881301 }
12891302
1290- // call all row listeners...
1291- //
1292- for (IRowListener listener : rowListeners ) {
1293- listener .rowWrittenEvent (rowMeta , row );
1294- }
1303+ // Do not call the row listeners for targeted rows.
1304+ // It can cause rows with varying layouts to arrive at the same listener without a way to keep
1305+ // them apart.
12951306
12961307 // Keep adding to terminator_rows buffer...
12971308 if (terminator && terminatorRows != null ) {
@@ -1479,7 +1490,7 @@ protected void waitUntilPipelineIsStarted() {
14791490 // Are we running yet? If not, wait a bit until all threads have been
14801491 // started.
14811492 //
1482- if (this .checkPipelineRunning == false ) {
1493+ if (! this .checkPipelineRunning ) {
14831494 while (!pipeline .isRunning () && !stopped .get ()) {
14841495 try {
14851496 Thread .sleep (1 );
@@ -1556,6 +1567,7 @@ private Object[] handleGetRow() throws HopException {
15561567 row = inputRowSet .getRowImmediate ();
15571568 }
15581569 if (row != null ) {
1570+ obtainInputRowMeta (row , inputRowSet );
15591571 incrementLinesRead ();
15601572 }
15611573 } else {
@@ -1567,11 +1579,23 @@ private Object[] handleGetRow() throws HopException {
15671579 // The buffer to grow beyond "a few" entries.
15681580 // We'll only do that if the previous transform has not ended...
15691581
1570- if (!inputRowSet .isDone () && inputRowSet .size () <= lowerBufferBoundary && !isStopped ()) {
1571- try {
1572- Thread .sleep (0 , 1 );
1573- } catch (InterruptedException e ) {
1574- // Ignore sleep interruption exception
1582+ if (!isStopped () && row == null ) {
1583+ boolean streamReady = false ;
1584+ // Check each other input stream to see that stream meets the threshold
1585+ for (int r = 0 ; r < inputRowSets .size (); r ++) {
1586+ if (inputRowSet .isDone () || inputRowSet .size () > lowerBufferBoundary ) {
1587+ streamReady = true ;
1588+ break ;
1589+ }
1590+ nextInputStream ();
1591+ inputRowSet = currentInputStream ();
1592+ }
1593+ if (!streamReady ) {
1594+ try {
1595+ Thread .sleep (0 , 1 ); // Minimum sleeps vary by OS scheduler, this could be 1ms or more
1596+ } catch (InterruptedException e ) {
1597+ // Ignore sleep interruption exception
1598+ }
15751599 }
15761600 }
15771601
@@ -1588,7 +1612,7 @@ private Object[] handleGetRow() throws HopException {
15881612 // We can use timeouts to switch from one to another...
15891613 //
15901614 if (waitingTime == null ) {
1591- waitingTime = DynamicWaitTimes .build (inputRowSets , this ::getCurrentInputRowSetNr );
1615+ waitingTime = DynamicWaitTimes .build (inputRowSets , this ::getCurrentInputRowSetNr , 20 );
15921616 }
15931617 while (row == null && !isStopped ()) {
15941618 // Get a row from the input in row set ...
@@ -1598,6 +1622,7 @@ private Object[] handleGetRow() throws HopException {
15981622 row = inputRowSet .getRowWait (waitingTime .get (), TimeUnit .MILLISECONDS );
15991623 boolean timeout = false ;
16001624 if (row != null ) {
1625+ obtainInputRowMeta (row , inputRowSet );
16011626 incrementLinesRead ();
16021627 blockPointer ++;
16031628 waitingTime .reset ();
@@ -1632,6 +1657,7 @@ private Object[] handleGetRow() throws HopException {
16321657 inputRowSetsLock .writeLock ().unlock ();
16331658 }
16341659 } else {
1660+ obtainInputRowMeta (row , inputRowSet );
16351661 incrementLinesRead ();
16361662 }
16371663 } else {
@@ -1660,17 +1686,12 @@ private Object[] handleGetRow() throws HopException {
16601686 nextInputStream ();
16611687 inputRowSet = currentInputStream ();
16621688 row = getRowFrom (inputRowSet );
1689+ obtainInputRowMeta (row , inputRowSet );
16631690 }
16641691 } finally {
16651692 inputRowSetsLock .readLock ().unlock ();
16661693 }
16671694
1668- // Also set the meta data on the first occurrence.
1669- // or if prevTransforms.length > 1 inputRowMeta can be changed
1670- if (inputRowMeta == null || prevTransforms .length > 1 ) {
1671- inputRowMeta = inputRowSet .getRowMeta ();
1672- }
1673-
16741695 if (row != null ) {
16751696 // OK, before we return the row, let's see if we need to check on mixing
16761697 // row compositions...
@@ -1690,6 +1711,54 @@ private Object[] handleGetRow() throws HopException {
16901711 return row ;
16911712 }
16921713
1714+ /**
1715+ * The first non-null row we get we'll lock in the row metadata. For scenarios with multiple
1716+ * inputs, we move the metadata around (e.g. Merge Rows).
1717+ *
1718+ * @param row The input row (not null!)
1719+ * @param inputRowSet The row set we're reading from right now
1720+ */
1721+ private void obtainInputRowMeta (Object [] row , IRowSet inputRowSet ) {
1722+ if (row == null ) {
1723+ return ;
1724+ }
1725+
1726+ // Set the row metadata on the first occurrence.
1727+ // If prevTransforms.length > 1, inputRowMeta can be changed as well.
1728+ //
1729+ if (inputRowMeta == null || prevTransforms .length > 1 ) {
1730+ inputRowMeta = inputRowSet .getRowMeta ();
1731+ }
1732+
1733+ // Extra sanity check
1734+ //
1735+ if (inputRowMeta == null ) {
1736+ int nr = 0 ;
1737+ for (IRowSet rowSet : inputRowSets ) {
1738+ log .logMinimal (
1739+ "===> Input row set #"
1740+ + nr
1741+ + ", done? "
1742+ + rowSet .isDone ()
1743+ + ", size="
1744+ + rowSet .size ()
1745+ + ", metadata? "
1746+ + (rowSet .getRowMeta () != null ));
1747+ nr ++;
1748+ }
1749+ log .logMinimal ("===> Current input row set nr=" + currentInputRowSetNr );
1750+
1751+ throw new RuntimeException (
1752+ "No row metadata obtained for row "
1753+ + Arrays .toString (row )
1754+ + Const .CR
1755+ + "inputRowSet.getRowMeta()="
1756+ + inputRowSet .getRowMeta ()
1757+ + ", inputRowSets.size()="
1758+ + inputRowSets .size ());
1759+ }
1760+ }
1761+
16931762 /**
16941763 * IRowHandler controls how getRow/putRow are handled. The default IRowHandler will simply call
16951764 * {@link #handleGetRow()} and {@link #handlePutRow(IRowMeta, Object[])}
@@ -1853,7 +1922,7 @@ public Object[] handleGetRowFrom(IRowSet rowSet) throws HopTransformException {
18531922 // Have all threads started?
18541923 // Are we running yet? If not, wait a bit until all threads have been
18551924 // started.
1856- if (this .checkPipelineRunning == false ) {
1925+ if (! this .checkPipelineRunning ) {
18571926 while (!pipeline .isRunning () && !stopped .get ()) {
18581927 try {
18591928 Thread .sleep (1 );
@@ -1869,11 +1938,23 @@ public Object[] handleGetRowFrom(IRowSet rowSet) throws HopTransformException {
18691938 // The buffer to grow beyond "a few" entries.
18701939 // We'll only do that if the previous transform has not ended...
18711940
1872- if (!rowSet .isDone () && rowSet .size () <= lowerBufferBoundary && !isStopped ()) {
1873- try {
1874- Thread .sleep (0 , 1 );
1875- } catch (InterruptedException e ) {
1876- // Ignore sleep interruption exception
1941+ if (!isStopped ()) {
1942+ boolean streamReady = false ;
1943+ // Check each other input stream to see that stream meets the threshold
1944+ for (int r = 0 ; r < inputRowSets .size (); r ++) {
1945+ if (rowSet .isDone () || rowSet .size () > lowerBufferBoundary ) {
1946+ streamReady = true ;
1947+ break ;
1948+ }
1949+ nextInputStream ();
1950+ rowSet = currentInputStream ();
1951+ }
1952+ if (!streamReady ) {
1953+ try {
1954+ Thread .sleep (0 , 1 ); // Minimum sleeps vary by OS scheduler, this could be 1ms or more
1955+ } catch (InterruptedException e ) {
1956+ // Ignore sleep interruption exception
1957+ }
18771958 }
18781959 }
18791960
@@ -1907,7 +1988,7 @@ public Object[] handleGetRowFrom(IRowSet rowSet) throws HopTransformException {
19071988 rowData = rowSet .getRow ();
19081989 if (rowData == null ) {
19091990 if (waitingTime == null ) {
1910- waitingTime = DynamicWaitTimes .build (inputRowSets , this ::getCurrentInputRowSetNr );
1991+ waitingTime = DynamicWaitTimes .build (inputRowSets , this ::getCurrentInputRowSetNr , 20 );
19111992 }
19121993 // Must release the read lock before acquisition of the write lock to prevent deadlocks.
19131994 //
@@ -2170,9 +2251,6 @@ public void dispatch() {
21702251 inputRowSetsLock .writeLock ().lock ();
21712252 outputRowSetsLock .writeLock ().lock ();
21722253 try {
2173- inputRowSets = new ArrayList <>();
2174- outputRowSets = new ArrayList <>();
2175-
21762254 errorRowSet = null ;
21772255 prevTransforms = new TransformMeta [nrInput ];
21782256 nextTransforms = new TransformMeta [nrOutput ];
@@ -2723,15 +2801,15 @@ public synchronized void markStop() {
27232801 Calendar cal = Calendar .getInstance ();
27242802 stopTime = cal .getTime ();
27252803
2804+ // We're finally completely done with this transform.
2805+ //
2806+ setRunning (false );
2807+
27262808 // Here we are completely done with the pipeline.
27272809 // Call all the attached listeners and notify the outside world that the transform has
27282810 // finished.
27292811 //
27302812 fireTransformFinishedListeners ();
2731-
2732- // We're finally completely done with this transform.
2733- //
2734- setRunning (false );
27352813 }
27362814 }
27372815
0 commit comments