Skip to content

Commit bb504e0

Browse files
Merge pull request #625 from cloudsufi/cherry-pick/GA_Fixes
[🍒][PLUGIN-1925] : Fix OOM issue by adding SafeBigDecimalSplitter and Prevent auto security injection on TCPS URLs
2 parents 69c9716 + e1e36e0 commit bb504e0

File tree

4 files changed

+173
-6
lines changed

4 files changed

+173
-6
lines changed

database-commons/src/main/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormat.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.mapreduce.TaskAttemptContext;
2929
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
3030
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
31+
import org.apache.hadoop.mapreduce.lib.db.DBSplitter;
3132
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
3233
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
3334
import org.slf4j.Logger;
@@ -39,6 +40,7 @@
3940
import java.sql.DriverManager;
4041
import java.sql.SQLException;
4142
import java.sql.Statement;
43+
import java.sql.Types;
4244
import java.util.Properties;
4345

4446
/**
@@ -128,6 +130,15 @@ public Connection createConnection() {
128130
return getConnection();
129131
}
130132

133+
@Override
134+
protected DBSplitter getSplitter(int sqlDataType) {
135+
// Use SafeBigDecimalSplitter for columns having high precision decimal or numeric columns
136+
if (sqlDataType == Types.NUMERIC || sqlDataType == Types.DECIMAL) {
137+
return new SafeBigDecimalSplitter();
138+
}
139+
return super.getSplitter(sqlDataType);
140+
}
141+
131142
@Override
132143
public RecordReader createDBRecordReader(DBInputSplit split, Configuration conf) throws IOException {
133144
final RecordReader dbRecordReader = super.createDBRecordReader(split, conf);
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.db.source;
18+
19+
import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;
20+
import java.math.BigDecimal;
21+
import java.math.RoundingMode;
22+
23+
/**
24+
* Safe implementation of {@link BigDecimalSplitter} to ensure precise division of BigDecimal values while calculating
25+
* split points for NUMERIC and DECIMAL types.
26+
*
27+
* <p>Problem: The default {@link BigDecimalSplitter} implementation may return 0 when the numerator is smaller than the
28+
* denominator (e.g., 1 / 4 = 0), due to the lack of a defined scale for division. Since the result (0) is smaller than
29+
* {@link BigDecimalSplitter#MIN_INCREMENT} (i.e. {@code 10000 * Double.MIN_VALUE}), the split size defaults to
30+
* {@code MIN_INCREMENT}, leading to an excessive number of splits (~10M) and potential OOM errors.</p>
31+
*
32+
* <p>Fix: This implementation derives scale from column metadata, adds a buffer of 5 decimal places, and uses
33+
* {@link RoundingMode#HALF_UP} as the rounding mode.</p
34+
*
35+
* <p>Note: This class is used by {@link DataDrivenETLDBInputFormat}.</p>
36+
*/
37+
public class SafeBigDecimalSplitter extends BigDecimalSplitter {
38+
39+
/* An additional buffer of +5 digits is applied to preserve accuracy during division. */
40+
public static final int SCALE_BUFFER = 5;
41+
/**
42+
* Performs safe division with correct scale handling.
43+
*
44+
* @param numerator the dividend (BigDecimal)
45+
* @param denominator the divisor (BigDecimal)
46+
* @return quotient with derived scale
47+
* @throws ArithmeticException if denominator is zero
48+
*/
49+
@Override
50+
protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
51+
// Determine the required scale for the division and add a buffer to ensure accuracy
52+
int effectiveScale = Math.max(numerator.scale(), denominator.scale()) + SCALE_BUFFER;
53+
return numerator.divide(denominator, effectiveScale, RoundingMode.HALF_UP);
54+
}
55+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.db.source;
18+
19+
import org.apache.hadoop.conf.Configuration;
20+
import org.apache.hadoop.mapreduce.InputSplit;
21+
import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;
22+
import org.junit.Test;
23+
24+
import java.math.BigDecimal;
25+
import java.sql.ResultSet;
26+
import java.sql.SQLException;
27+
import java.util.List;
28+
29+
import static org.junit.Assert.assertEquals;
30+
import static org.junit.Assert.assertThrows;
31+
import static org.junit.Assert.assertTrue;
32+
import static org.mockito.Mockito.mock;
33+
import static org.mockito.Mockito.when;
34+
35+
/**
36+
* Test class for {@link SafeBigDecimalSplitter}
37+
*/
38+
public class SafeBigDecimalSplitterTest {
39+
private final SafeBigDecimalSplitter splitter = new SafeBigDecimalSplitter();
40+
41+
@Test
42+
public void testSmallRangeDivision() {
43+
BigDecimal result = splitter.tryDivide(BigDecimal.ONE, new BigDecimal("4"));
44+
assertEquals(new BigDecimal("0.25000"), result);
45+
}
46+
47+
@Test
48+
public void testLargePrecision() {
49+
BigDecimal numerator = new BigDecimal("1.0000000000000000001");
50+
BigDecimal denominator = new BigDecimal("3");
51+
BigDecimal result = splitter.tryDivide(numerator, denominator);
52+
assertTrue(result.compareTo(BigDecimal.ZERO) > 0);
53+
}
54+
55+
@Test
56+
public void testDivisionByZero() {
57+
assertThrows(ArithmeticException.class, () ->
58+
splitter.tryDivide(BigDecimal.ONE, BigDecimal.ZERO));
59+
}
60+
61+
@Test
62+
public void testDivisionWithZeroNumerator() {
63+
// when minVal == maxVal
64+
BigDecimal result = splitter.tryDivide(BigDecimal.ZERO, BigDecimal.ONE);
65+
assertEquals(0, result.compareTo(BigDecimal.ZERO));
66+
}
67+
68+
@Test
69+
public void testSplits() throws SQLException {
70+
BigDecimal minVal = BigDecimal.valueOf(1);
71+
BigDecimal maxVal = BigDecimal.valueOf(2);
72+
int numSplits = 4;
73+
ResultSet resultSet = mock(ResultSet.class);
74+
Configuration conf = mock(Configuration.class);
75+
when(conf.getInt("mapreduce.job.maps", 1)).thenReturn(numSplits);
76+
when(resultSet.getBigDecimal(1)).thenReturn(minVal);
77+
when(resultSet.getBigDecimal(2)).thenReturn(maxVal);
78+
BigDecimalSplitter bigDecimalSplitter = new SafeBigDecimalSplitter();
79+
List<InputSplit> actualSplits = bigDecimalSplitter.split(conf, resultSet, "id");
80+
assertEquals(numSplits, actualSplits.size());
81+
}
82+
83+
@Test
84+
public void testSplitsWithMinValueEqualToMaxValue() throws SQLException {
85+
// when minVal == maxVal
86+
BigDecimal minVal = BigDecimal.valueOf(1);
87+
BigDecimal maxVal = BigDecimal.valueOf(1);
88+
int numSplits = 1;
89+
ResultSet resultSet = mock(ResultSet.class);
90+
Configuration conf = mock(Configuration.class);
91+
when(conf.getInt("mapreduce.job.maps", 1)).thenReturn(numSplits);
92+
when(resultSet.getBigDecimal(1)).thenReturn(minVal);
93+
when(resultSet.getBigDecimal(2)).thenReturn(maxVal);
94+
BigDecimalSplitter bigDecimalSplitter = new SafeBigDecimalSplitter();
95+
List<InputSplit> actualSplits = bigDecimalSplitter.split(conf, resultSet, "id");
96+
assertEquals(numSplits, actualSplits.size());
97+
}
98+
}

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConstants.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ private OracleConstants() {
2929
public static final String PLUGIN_NAME = "Oracle";
3030
public static final String ORACLE_CONNECTION_STRING_SID_FORMAT = "jdbc:oracle:thin:@%s:%s:%s";
3131
public static final String ORACLE_CONNECTION_STRING_SERVICE_NAME_FORMAT = "jdbc:oracle:thin:@//%s:%s/%s";
32-
// Connection formats to accept protocol (e.g., jdbc:oracle:thin:@<protocol>://<host>:<port>/<SID>)
33-
public static final String ORACLE_CONNECTION_STRING_SID_FORMAT_WITH_PROTOCOL = "jdbc:oracle:thin:@%s:%s:%s/%s";
34-
public static final String ORACLE_CONNECTION_STRING_SERVICE_NAME_FORMAT_WITH_PROTOCOL =
35-
"jdbc:oracle:thin:@%s://%s:%s/%s";
32+
// Connection formats using TNS DESCRIPTOR to accept protocol
33+
public static final String ORACLE_SERVICE_NAME_FORMAT_TNS_DESCRIPTOR_WITH_PROTOCOL =
34+
"jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=%s)(HOST=%s)(PORT=%s))(CONNECT_DATA=(SERVICE_NAME=%s)))";
35+
public static final String ORACLE_SID_FORMAT_TNS_DESCRIPTOR_WITH_PROTOCOL =
36+
"jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=%s)(HOST=%s)(PORT=%s))(CONNECT_DATA=(SID=%s)))";
3637
public static final String ORACLE_CONNECTION_STRING_TNS_FORMAT = "jdbc:oracle:thin:@%s";
3738
public static final String DEFAULT_BATCH_VALUE = "defaultBatchValue";
3839
public static final String DEFAULT_ROW_PREFETCH = "defaultRowPrefetch";
@@ -102,7 +103,8 @@ private static String getConnectionStringWithService(@Nullable String host,
102103
boolean isSSLEnabled) {
103104
// Choose the appropriate format based on whether SSL is enabled.
104105
if (isSSLEnabled) {
105-
return String.format(OracleConstants.ORACLE_CONNECTION_STRING_SERVICE_NAME_FORMAT_WITH_PROTOCOL,
106+
// Use the TNS descriptor format for TCPS to prevent automatic security injection.
107+
return String.format(ORACLE_SERVICE_NAME_FORMAT_TNS_DESCRIPTOR_WITH_PROTOCOL,
106108
connectionProtocol, host, port, database);
107109
}
108110
return String.format(OracleConstants.ORACLE_CONNECTION_STRING_SERVICE_NAME_FORMAT,
@@ -126,7 +128,8 @@ private static String getConnectionStringWithSID(@Nullable String host,
126128
boolean isSSLEnabled) {
127129
// Choose the appropriate format based on whether SSL is enabled.
128130
if (isSSLEnabled) {
129-
return String.format(OracleConstants.ORACLE_CONNECTION_STRING_SID_FORMAT_WITH_PROTOCOL,
131+
// Use the TNS descriptor format for TCPS to prevent automatic security injection.
132+
return String.format(ORACLE_SID_FORMAT_TNS_DESCRIPTOR_WITH_PROTOCOL,
130133
connectionProtocol, host, port, database);
131134
}
132135
return String.format(OracleConstants.ORACLE_CONNECTION_STRING_SID_FORMAT,

0 commit comments

Comments
 (0)