
Commit 1c914a1

mgaido91 authored and Marcelo Vanzin committed
[LIVY-572] Avoid usage of spark classes in ColumnBuffer
## What changes were proposed in this pull request?

`ColumnBuffer`s can be created both inside Spark jobs and in the Livy server. The latter happens when operation logs are returned; before this patch that path failed because, after the refactor in LIVY-503, this code used Spark classes. Unfortunately, we have no test coverage for operation log retrieval, which is why this wasn't spotted earlier. Since beeline retrieves operation logs for each query, every query run through beeline fails unless `livy.server.thrift.logging.operation.enabled` is set to `false`.

## How was this patch tested?

Manual tests using beeline.

Author: Marco Gaido <[email protected]>

Closes apache#162 from mgaido91/LIVY-572.
1 parent 07d216d commit 1c914a1
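To make the failure mode concrete: the server-side path only ever adds plain operation-log strings, but executing a `ColumnBuffer` method whose body references a Spark type forces the JVM to resolve that type, which fails on the Livy server because Spark jars are not on its classpath. A minimal sketch of that mechanism, with hypothetical class and method names (this is not Livy code):

```java
import org.apache.spark.sql.Row;  // present in Spark jobs, absent on the Livy server

public class NeedsSpark {
  // Hypothetical stand-in for the pre-patch ColumnBuffer.add/toHiveString path.
  static String describe(Object value) {
    // The Row reference is resolved when this instruction executes,
    // even though value is a plain String.
    if (value instanceof Row) {
      return "spark row";
    }
    return String.valueOf(value);
  }

  public static void main(String[] args) {
    // On a JVM without Spark jars this throws
    // java.lang.NoClassDefFoundError: org/apache/spark/sql/Row
    System.out.println(describe("an operation log line"));
  }
}
```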

File tree

2 files changed (+76 -67 lines)

thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ColumnBuffer.java (+1 -66)
```diff
@@ -16,26 +16,12 @@
  */
 package org.apache.livy.thriftserver.session;
 
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Spliterator;
-import java.util.Spliterators;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-import scala.Tuple2;
-import scala.collection.Map;
-import scala.collection.Seq;
-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.StructField;
 
 /**
  * Container for the contents of a single column in a result set.
@@ -180,7 +166,7 @@ public void add(Object value) {
       buffers[currentSize] = (byte[]) value;
       break;
     case STRING:
-      strings[currentSize] = toHiveString(value, false);
+      strings[currentSize] = (String) value;
       break;
     }
 
@@ -264,57 +250,6 @@ private void setNull(int index) {
     nulls[byteIdx] = (byte) (nulls[byteIdx] | (1 << bitIdx));
   }
 
-  /**
-   * Converts a value from a Spark dataset into a string that looks like what Hive would
-   * generate. Because Spark generates rows that contain Scala types for non-primitive
-   * columns, this code depends on Scala and is thus succeptible to binary compatibility
-   * changes in the Scala libraries.
-   *
-   * The supported types are described in Spark's SQL programming guide, in the table
-   * listing the mapping of SQL types to Scala types.
-   *
-   * @param value The object to stringify.
-   * @param quoteStrings Whether to wrap String instances in quotes.
-   */
-  private String toHiveString(Object value, boolean quoteStrings) {
-    if (quoteStrings && value instanceof String) {
-      return "\"" + value + "\"";
-    } else if (value instanceof BigDecimal) {
-      return ((BigDecimal) value).stripTrailingZeros().toString();
-    } else if (value instanceof Map) {
-      return stream(new ScalaIterator<>(((Map<?,?>) value).iterator()))
-        .map(o -> toHiveString(o, true))
-        .sorted()
-        .collect(Collectors.joining(",", "{", "}"));
-    } else if (value instanceof Seq) {
-      return stream(new ScalaIterator<>(((Seq<?>) value).iterator()))
-        .map(o -> toHiveString(o, true))
-        .collect(Collectors.joining(",", "[", "]"));
-    } else if (value instanceof Tuple2) {
-      Tuple2 t = (Tuple2) value;
-      return String.format("%s:%s", toHiveString(t._1(), true), toHiveString(t._2(), true));
-    } else if (value instanceof Row) {
-      Row r = (Row) value;
-      final StructField[] fields = r.schema().fields();
-      final AtomicInteger idx = new AtomicInteger();
-
-      return stream(new ScalaIterator<>(r.toSeq().iterator()))
-        .map(o -> {
-          String fname = fields[idx.getAndIncrement()].name();
-          String fval = toHiveString(o, true);
-          return String.format("\"%s\":%s", fname, fval);
-        })
-        .collect(Collectors.joining(",", "{", "}"));
-    } else {
-      return value.toString();
-    }
-  }
-
-  private Stream<?> stream(Iterator<?> it) {
-    return StreamSupport.stream(
-      Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
-  }
-
   private void ensureCapacity() {
     int nextSize = (currentSize + DEFAULT_SIZE);
     nextSize = nextSize - (nextSize % DEFAULT_SIZE);
```
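After this change, `ColumnBuffer` references only JDK types, so loading and using it on the Livy server is safe; the STRING case now assumes the caller has already converted the value to a `java.lang.String`. A hedged sketch of the server-side path (the `ColumnBuffer(DataType)` constructor is assumed from the surrounding code, not shown in this diff):

```java
// Sketch: buffering operation-log lines on the Livy server.
// Nothing on this path references Spark or Scala classes after the patch.
ColumnBuffer logColumn = new ColumnBuffer(DataType.STRING);
logColumn.add("INFO SparkContext: Running Spark version 2.x.x");   // plain String, no conversion
logColumn.add("INFO SecurityManager: Changing view acls to: livy");
```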

thriftserver/session/src/main/java/org/apache/livy/thriftserver/session/ResultSet.java (+75 -1)
```diff
@@ -16,6 +16,22 @@
  */
 package org.apache.livy.thriftserver.session;
 
+import java.math.BigDecimal;
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import scala.Tuple2;
+import scala.collection.Map;
+import scala.collection.Seq;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructField;
+
 /**
  * Utility class used for transferring results from the Spark application to the Livy server.
  */
@@ -43,7 +59,13 @@ public void addRow(Object[] fields) {
     }
 
     for (int i = 0; i < fields.length; i++) {
-      columns[i].add(fields[i]);
+      Object value;
+      if (columns[i].getType() == DataType.STRING) {
+        value = toHiveString(fields[i], false);
+      } else {
+        value = fields[i];
+      }
+      columns[i].add(value);
     }
   }
 
```
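The conversion therefore moves to `addRow`, which only runs inside the Spark job where Scala and Spark classes are available; by the time the buffers are shipped to the server, every STRING cell already holds a plain `String`. A sketch of that job-side path (the `ResultSet(DataType[])` constructor and the `seq` value are assumptions for illustration):

```java
// Sketch: inside the Spark job, Scala values are stringified before buffering.
DataType[] schema = { DataType.STRING };
ResultSet rs = new ResultSet(schema);       // constructor assumed, not shown in this diff
rs.addRow(new Object[] { seq });            // a scala.collection.Seq, e.g. Seq(1, 2, 3)
ColumnBuffer[] buffers = rs.getColumns();   // the STRING cell now holds "[1,2,3]"
```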

```diff
@@ -55,4 +77,56 @@ public ColumnBuffer[] getColumns() {
     return columns;
   }
 
+  /**
+   * Converts a value from a Spark dataset into a string that looks like what Hive would
+   * generate. Because Spark generates rows that contain Scala types for non-primitive
+   * columns, this code depends on Scala and is thus susceptible to binary compatibility
+   * changes in the Scala libraries.
+   *
+   * The supported types are described in Spark's SQL programming guide, in the table
+   * listing the mapping of SQL types to Scala types.
+   *
+   * @param value The object to stringify.
+   * @param quoteStrings Whether to wrap String instances in quotes.
+   */
+  private String toHiveString(Object value, boolean quoteStrings) {
+    if (value == null) {
+      return null;
+    } else if (quoteStrings && value instanceof String) {
+      return "\"" + value + "\"";
+    } else if (value instanceof BigDecimal) {
+      return ((BigDecimal) value).stripTrailingZeros().toString();
+    } else if (value instanceof Map) {
+      return stream(new ScalaIterator<>(((Map<?,?>) value).iterator()))
+        .map(o -> toHiveString(o, true))
+        .sorted()
+        .collect(Collectors.joining(",", "{", "}"));
+    } else if (value instanceof Seq) {
+      return stream(new ScalaIterator<>(((Seq<?>) value).iterator()))
+        .map(o -> toHiveString(o, true))
+        .collect(Collectors.joining(",", "[", "]"));
+    } else if (value instanceof Tuple2) {
+      Tuple2 t = (Tuple2) value;
+      return String.format("%s:%s", toHiveString(t._1(), true), toHiveString(t._2(), true));
+    } else if (value instanceof Row) {
+      Row r = (Row) value;
+      final StructField[] fields = r.schema().fields();
+      final AtomicInteger idx = new AtomicInteger();
+
+      return stream(new ScalaIterator<>(r.toSeq().iterator()))
+        .map(o -> {
+          String fname = fields[idx.getAndIncrement()].name();
+          String fval = toHiveString(o, true);
+          return String.format("\"%s\":%s", fname, fval);
+        })
+        .collect(Collectors.joining(",", "{", "}"));
+    } else {
+      return value.toString();
+    }
+  }
+
+  private Stream<?> stream(Iterator<?> it) {
+    return StreamSupport.stream(
+      Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
+  }
 }
```
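For reference, `Seq` and `Map` values come out as `[elem,elem]` and `{"key":value}`, matching Hive's rendering, and the new `null` guard covers the case the old code never hit. The runnable fragment below hand-traces the two branches that need no Spark session, assuming only scala-library on the classpath:

```java
import java.math.BigDecimal;
import scala.Tuple2;

public class ToHiveStringTrace {
  public static void main(String[] args) {
    // BigDecimal branch: trailing zeros are stripped.
    System.out.println(new BigDecimal("1.2500").stripTrailingZeros());  // prints: 1.25

    // Tuple2 branch: "%s:%s" with both sides converted with quoteStrings=true,
    // so String keys get quoted while numeric values fall through to toString().
    Tuple2<String, Integer> t = new Tuple2<>("a", 1);
    System.out.println("\"" + t._1() + "\":" + t._2());  // prints: "a":1
  }
}
```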
