-
Notifications
You must be signed in to change notification settings - Fork 8
/
hivefdw.py
63 lines (50 loc) · 2.24 KB
/
hivefdw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from multicorn import ForeignDataWrapper
from multicorn.utils import log_to_postgres, ERROR, WARNING, DEBUG
from hive_service import ThriftHive
#from hive_service.ttypes import HiveServerException
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
class HiveForeignDataWrapper(ForeignDataWrapper):
    """
    Hive FDW for PostgreSQL.

    Sends one HiveQL statement to a Hive server through a ``ThriftHive``
    client (binary protocol over a buffered Thrift socket) and yields each
    result row as a dict keyed by the foreign table's column names.

    Foreign table options:
        host  -- Hive server host name (default ``localhost``).
        port  -- Hive server port (default ``10000``).
        table -- Hive table to read; used to build
                 ``SELECT <col1>,<col2>,... FROM <table>`` when no ``query``
                 option is given.
        query -- Explicit HiveQL statement; takes precedence over ``table``.

    At least one of ``table`` or ``query`` must be supplied.
    """

    def __init__(self, options, columns):
        """
        Validate the foreign table options and store connection settings.

        :param options: mapping of foreign table option names to values.
        :param columns: the foreign table's columns keyed by column name;
            its iteration order is the order in which tab-separated Hive
            output fields are assigned to columns.
        """
        super(HiveForeignDataWrapper, self).__init__(options, columns)
        if 'host' not in options:
            log_to_postgres('The host parameter is required and the default is localhost.', WARNING)
        self.host = options.get("host", "localhost")
        if 'port' not in options:
            log_to_postgres('The port parameter is required and the default is 10000.', WARNING)
        try:
            # Convert once here so a malformed port is reported when the
            # table is opened, not later as an obscure socket error.
            self.port = int(options.get("port", 10000))
        except (TypeError, ValueError):
            # NOTE(review): relies on log_to_postgres(..., ERROR) aborting the
            # operation (multicorn raises at ERROR level) -- confirm.
            log_to_postgres('The port parameter must be an integer.', ERROR)
        # Both options must be tested explicitly: `'table' and 'query' not in
        # options` would only ever look at 'query'.
        if 'table' not in options and 'query' not in options:
            log_to_postgres('table or query parameter is required.', ERROR)
        self.table = options.get("table", None)
        self.query = options.get("query", None)
        self.columns = columns

    def _build_statement(self):
        """Return the HiveQL to run: the ``query`` option verbatim, otherwise a
        ``SELECT`` of every foreign-table column from the ``table`` option."""
        if self.query:
            return self.query
        return "SELECT " + ",".join(self.columns.keys()) + " FROM " + self.table

    def _row_to_dict(self, row):
        """Map one tab-separated Hive output row onto the table's columns.

        Fields are assigned positionally in ``self.columns`` order. Columns
        past the end of the row get ``None`` (NULL); surplus fields are
        ignored.
        """
        values = row.split("\t")
        line = {}
        for idx, column_name in enumerate(self.columns):
            line[column_name] = values[idx] if idx < len(values) else None
        return line

    def execute(self, quals, columns):
        """
        Run the HiveQL statement and yield one dict per result row.

        ``quals`` and ``columns`` are not used: the full statement is sent to
        Hive and every column of the foreign table is filled in.

        Thrift errors are reported through ``log_to_postgres`` at ERROR
        level. The connection is closed when the generator finishes, fails,
        or is closed early by the caller.
        """
        statement = self._build_statement()
        # %-formatting yields the same text as the old unicode() concatenation
        # without failing on non-ASCII byte strings.
        log_to_postgres('Hive query: %s' % (statement,), DEBUG)
        # Build the transport before `try` so `finally` can always close it.
        transport = TTransport.TBufferedTransport(
            TSocket.TSocket(self.host, self.port))
        client = ThriftHive.Client(TBinaryProtocol.TBinaryProtocol(transport))
        try:
            transport.open()
            client.execute(statement)
            for row in client.fetchAll():
                yield self._row_to_dict(row)
        except Thrift.TException as tx:
            # Some Thrift exceptions carry no message; fall back to str().
            log_to_postgres(getattr(tx, 'message', None) or str(tx), ERROR)
        finally:
            transport.close()