13
13
*/
14
14
package com .facebook .presto .tests .hive ;
15
15
16
+ import com .facebook .presto .tests .ImmutableTpchTablesRequirements .ImmutableLineItemTable ;
16
17
import com .google .common .base .MoreObjects ;
17
18
import com .google .common .base .Throwables ;
18
19
import com .google .common .collect .ImmutableMap ;
19
20
import com .teradata .tempto .ProductTest ;
21
+ import com .teradata .tempto .Requires ;
20
22
import com .teradata .tempto .assertions .QueryAssert .Row ;
21
23
import com .teradata .tempto .query .QueryResult ;
22
24
import org .testng .annotations .DataProvider ;
23
25
import org .testng .annotations .Test ;
24
26
25
27
import java .sql .Connection ;
26
28
import java .sql .SQLException ;
29
+ import java .sql .Statement ;
27
30
import java .util .List ;
28
31
import java .util .Map ;
29
32
33
+ import static com .facebook .presto .tests .TestGroups .BIG_QUERY ;
30
34
import static com .facebook .presto .tests .TestGroups .STORAGE_FORMATS ;
31
35
import static com .facebook .presto .tests .utils .JdbcDriverUtils .setSessionProperty ;
36
+ import static com .facebook .presto .tests .utils .QueryExecutors .onHive ;
32
37
import static com .facebook .presto .util .ImmutableCollectors .toImmutableList ;
33
38
import static com .teradata .tempto .assertions .QueryAssert .Row .row ;
34
39
import static com .teradata .tempto .assertions .QueryAssert .assertThat ;
@@ -41,6 +46,7 @@ public class TestHiveStorageFormats
41
46
extends ProductTest
42
47
{
43
48
private static final String TPCH_SCHEMA = "tiny" ;
49
+ private static final String TEST_TPCH_LINIETEM = "tpch." + TPCH_SCHEMA + ".lineitem" ;
44
50
45
51
@ DataProvider (name = "storage_formats" )
46
52
public static Object [][] storageFormats ()
@@ -97,7 +103,7 @@ public void testInsertIntoTable(StorageFormat storageFormat)
97
103
query (insertInto );
98
104
99
105
// SELECT FROM TABLE
100
- assertSelect ("select sum(tax), sum(discount), sum(linenumber) from %s" , tableName );
106
+ assertSelect ("select sum(tax), sum(discount), sum(linenumber) from %s" , tableName , TEST_TPCH_LINIETEM );
101
107
102
108
// DROP TABLE
103
109
query (format ("DROP TABLE %s" , tableName ));
@@ -125,7 +131,7 @@ public void testCreateTableAs(StorageFormat storageFormat)
125
131
query (createTableAsSelect );
126
132
127
133
// SELECT FROM TABLE
128
- assertSelect ("select sum(extendedprice), sum(suppkey), count(partkey) from %s" , tableName );
134
+ assertSelect ("select sum(extendedprice), sum(suppkey), count(partkey) from %s" , tableName , TEST_TPCH_LINIETEM );
129
135
130
136
// DROP TABLE
131
137
query (format ("DROP TABLE %s" , tableName ));
@@ -171,7 +177,7 @@ public void testInsertIntoPartitionedTable(StorageFormat storageFormat)
171
177
query (insertInto );
172
178
173
179
// SELECT FROM TABLE
174
- assertSelect ("select sum(tax), sum(discount), sum(length(returnflag)) from %s" , tableName );
180
+ assertSelect ("select sum(tax), sum(discount), sum(length(returnflag)) from %s" , tableName , TEST_TPCH_LINIETEM );
175
181
176
182
// DROP TABLE
177
183
query (format ("DROP TABLE %s" , tableName ));
@@ -199,15 +205,72 @@ public void testCreatePartitionedTableAs(StorageFormat storageFormat)
199
205
query (createTableAsSelect );
200
206
201
207
// SELECT FROM TABLE
202
- assertSelect ("select sum(tax), sum(discount), sum(length(returnflag)) from %s" , tableName );
208
+ assertSelect ("select sum(tax), sum(discount), sum(length(returnflag)) from %s" , tableName , TEST_TPCH_LINIETEM );
203
209
204
210
// DROP TABLE
205
211
query (format ("DROP TABLE %s" , tableName ));
206
212
}
207
213
208
- private static void assertSelect (String query , String tableName )
214
+ @ Requires (ImmutableLineItemTable .class )
215
+ @ Test (groups = {STORAGE_FORMATS , BIG_QUERY })
216
+ public void testSelectFromPartitionedBzipTable () throws Exception
209
217
{
210
- QueryResult expected = query (format (query , "tpch." + TPCH_SCHEMA + ".lineitem" ));
218
+ // This test is marked as "big_query" because INSERT OVERWRITE TABLE is very slow, but that
219
+ // is the only way to get bzip tables in Hive.
220
+
221
+ String tableName = "storage_formats_test_select_partitioned_bzip" ;
222
+ query (format ("DROP TABLE IF EXISTS %s" , tableName ));
223
+
224
+ // The BZIP part of the table comes from the configs that are set during insert
225
+ String createTable = format (
226
+ "CREATE TABLE %s(" +
227
+ " l_orderkey BIGINT," +
228
+ " l_partkey BIGINT," +
229
+ " l_suppkey BIGINT," +
230
+ " l_linenumber INT," +
231
+ " l_quantity DOUBLE," +
232
+ " l_extendedprice DOUBLE," +
233
+ " l_discount DOUBLE," +
234
+ " l_tax DOUBLE," +
235
+ " l_linestatus VARCHAR(1)," +
236
+ " l_shipinstruct VARCHAR(25)," +
237
+ " l_shipmode VARCHAR(10)," +
238
+ " l_comment VARCHAR(44)" +
239
+ ") PARTITIONED BY (l_returnflag VARCHAR(1)) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE" ,
240
+ tableName );
241
+ onHive ().executeQuery (createTable );
242
+
243
+ try {
244
+ String insertInto = format (
245
+ "INSERT OVERWRITE TABLE %s PARTITION(l_returnflag) " +
246
+ "SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, " +
247
+ "l_linestatus, l_shipinstruct, l_shipmode, l_comment, l_returnflag " +
248
+ "FROM default.lineitem" , tableName );
249
+ Statement statement = onHive ().getConnection ().createStatement ();
250
+ setHiveConfigsForBzipInsert (statement );
251
+ statement .execute (insertInto );
252
+ statement .close ();
253
+
254
+ assertSelect ("select sum(l_tax), sum(l_discount), sum(length(l_returnflag)) from %s" , tableName , "hive.default.lineitem" );
255
+ }
256
+ finally {
257
+ query (format ("DROP TABLE %s" , tableName ));
258
+ }
259
+ }
260
+
261
+ private void setHiveConfigsForBzipInsert (Statement statement )
262
+ throws SQLException
263
+ {
264
+ statement .execute ("SET hive.exec.compress.output=true;" );
265
+ statement .execute ("SET mapreduce.output.fileoutputformat.compress=true;" );
266
+ statement .execute ("SET mapred.output.compress=true" );
267
+ statement .execute ("SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec" );
268
+ statement .execute ("SET hive.exec.dynamic.partition.mode=nonstrict;" );
269
+ }
270
+
271
+ private static void assertSelect (String query , String tableName , String expectedTable )
272
+ {
273
+ QueryResult expected = query (format (query , expectedTable ));
211
274
List <Row > expectedRows = expected .rows ().stream ()
212
275
.map ((columns ) -> row (columns .toArray ()))
213
276
.collect (toImmutableList ());
0 commit comments