Skip to content

Commit 4edcfb0

Browse files
DLPX-94056 HOP: Performance Tuning causing Unit Test case to break (#8)
PR URL: https://www.github.com/delphix/hop/pull/8
1 parent 1ccade5 commit 4edcfb0

File tree

2 files changed

+13
-37
lines changed

2 files changed

+13
-37
lines changed

engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1579,23 +1579,11 @@ private Object[] handleGetRow() throws HopException {
15791579
// The buffer to grow beyond "a few" entries.
15801580
// We'll only do that if the previous transform has not ended...
15811581

1582-
if (!isStopped() && row == null) {
1583-
boolean streamReady = false;
1584-
// Check each other input stream to see that stream meets the threshold
1585-
for (int r = 0; r < inputRowSets.size(); r++) {
1586-
if (inputRowSet.isDone() || inputRowSet.size() > lowerBufferBoundary) {
1587-
streamReady = true;
1588-
break;
1589-
}
1590-
nextInputStream();
1591-
inputRowSet = currentInputStream();
1592-
}
1593-
if (!streamReady) {
1594-
try {
1595-
Thread.sleep(0, 1); // Minimum sleeps vary by OS scheduler, this could be 1ms or more
1596-
} catch (InterruptedException e) {
1597-
// Ignore sleep interruption exception
1598-
}
1582+
if (!inputRowSet.isDone() && inputRowSet.size() <= lowerBufferBoundary && !isStopped()) {
1583+
try {
1584+
Thread.sleep(0, 1);
1585+
} catch (InterruptedException e) {
1586+
// Ignore sleep interruption exception
15991587
}
16001588
}
16011589

@@ -1938,23 +1926,11 @@ public Object[] handleGetRowFrom(IRowSet rowSet) throws HopTransformException {
19381926
// The buffer to grow beyond "a few" entries.
19391927
// We'll only do that if the previous transform has not ended...
19401928

1941-
if (!isStopped()) {
1942-
boolean streamReady = false;
1943-
// Check each other input stream to see that stream meets the threshold
1944-
for (int r = 0; r < inputRowSets.size(); r++) {
1945-
if (rowSet.isDone() || rowSet.size() > lowerBufferBoundary) {
1946-
streamReady = true;
1947-
break;
1948-
}
1949-
nextInputStream();
1950-
rowSet = currentInputStream();
1951-
}
1952-
if (!streamReady) {
1953-
try {
1954-
Thread.sleep(0, 1); // Minimum sleeps vary by OS scheduler, this could be 1ms or more
1955-
} catch (InterruptedException e) {
1956-
// Ignore sleep interruption exception
1957-
}
1929+
if (!rowSet.isDone() && rowSet.size() <= lowerBufferBoundary && !isStopped()) {
1930+
try {
1931+
Thread.sleep(0, 1);
1932+
} catch (InterruptedException e) {
1933+
// Ignore sleep interruption exception
19581934
}
19591935
}
19601936

engine/src/test/java/org/apache/hop/pipeline/transform/DynamicWaitTimesTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,21 @@ public class DynamicWaitTimesTest extends TestCase {
3434

3535
public void testSingleStreamStatus() {
3636
IRowSet rowSet = new BlockingRowSet(3);
37-
status = DynamicWaitTimes.build(Collections.singletonList(rowSet), () -> 0);
37+
status = DynamicWaitTimes.build(Collections.singletonList(rowSet), () -> 0, 20);
3838
assertEquals(1, status.get());
3939
status.adjust(true, rowSet);
4040
assertEquals(2, status.get());
4141
for (int i = 0; i < 10; i++) {
4242
status.adjust(true, rowSet);
4343
}
44-
assertEquals(DynamicWaitTimes.MAX_TIMEOUT, status.get());
44+
assertEquals(20, status.get());
4545
}
4646

4747
public void testMultiStreamStatus() {
4848
List<IRowSet> rowSetList =
4949
new ArrayList<>(
5050
Arrays.asList(new BlockingRowSet(1), new BlockingRowSet(2), new BlockingRowSet(7)));
51-
status = DynamicWaitTimes.build(rowSetList, () -> activeStreamIndex.get());
51+
status = DynamicWaitTimes.build(rowSetList, () -> activeStreamIndex.get(), 20);
5252
for (IRowSet iRowSet : rowSetList) {
5353
status.adjust(false, iRowSet);
5454
assertEquals(1, status.get());

0 commit comments

Comments
 (0)