1
- import json
2
1
from . import err
3
2
from .cursors import Cursor
4
3
from . import converters
4
+ from ..state import sqlitelike as chdb_stateful
5
5
6
6
DEBUG = False
7
7
VERBOSE = False
10
10
class Connection (object ):
11
11
"""
12
12
Representation of a connection with chdb.
13
-
14
- The proper way to get an instance of this class is to call
15
- connect().
16
-
17
- Accepts several arguments:
18
-
19
- :param cursorclass: Custom cursor class to use.
20
- :param path: Optional folder path to store database files on disk.
21
-
22
- See `Connection <https://www.python.org/dev/peps/pep-0249/#connection-objects>`_ in the
23
- specification.
24
13
"""
25
14
26
- _closed = False
27
- _session = None
28
-
29
- def __init__ (self , cursorclass = Cursor , path = None ):
30
-
31
- self ._resp = None
32
-
33
- # 1. pre-process params in init
34
- self .encoding = 'utf8'
35
-
36
- self .cursorclass = cursorclass
37
-
38
- self ._result = None
15
+ def __init__ (self , path = None ):
16
+ self ._closed = False
17
+ self .encoding = "utf8"
39
18
self ._affected_rows = 0
19
+ self ._resp = None
40
20
41
- self .connect (path )
21
+ # Initialize sqlitelike connection
22
+ connection_string = ":memory:" if path is None else f"file:{ path } "
23
+ self ._conn = chdb_stateful .Connection (connection_string )
42
24
43
- def connect (self , path = None ):
44
- from chdb import session as chs
45
- self ._session = chs .Session (path )
46
- self ._closed = False
47
- self ._execute_command ("select 1;" )
48
- self ._read_query_result ()
25
+ # Test connection with a simple query
26
+ cursor = self ._conn .cursor ()
27
+ cursor .execute ("SELECT 1" )
28
+ cursor .close ()
49
29
50
30
def close (self ):
51
- """
52
- Send the quit message and close the socket.
53
-
54
- See `Connection.close() <https://www.python.org/dev/peps/pep-0249/#Connection.close>`_
55
- in the specification.
56
-
57
- :raise Error: If the connection is already closed.
58
- """
31
+ """Send the quit message and close the socket."""
59
32
if self ._closed :
60
33
raise err .Error ("Already closed" )
61
34
self ._closed = True
62
- self ._session = None
35
+ self ._conn . close ()
63
36
64
37
@property
65
38
def open (self ):
66
39
"""Return True if the connection is open"""
67
40
return not self ._closed
68
41
69
42
def commit (self ):
70
- """
71
- Commit changes to stable storage.
72
-
73
- See `Connection.commit() <https://www.python.org/dev/peps/pep-0249/#commit>`_
74
- in the specification.
75
- """
76
- return
43
+ """Commit changes to stable storage."""
44
+ # No-op for ClickHouse
45
+ pass
77
46
78
47
def rollback (self ):
79
- """
80
- Roll back the current transaction.
81
-
82
- See `Connection.rollback() <https://www.python.org/dev/peps/pep-0249/#rollback>`_
83
- in the specification.
84
- """
85
- return
48
+ """Roll back the current transaction."""
49
+ # No-op for ClickHouse
50
+ pass
86
51
87
52
def cursor (self , cursor = None ):
88
- """
89
- Create a new cursor to execute queries with.
90
-
91
- :param cursor: The type of cursor to create; current only :py:class:`Cursor`
92
- None means use Cursor.
93
- """
53
+ """Create a new cursor to execute queries with."""
54
+ if self ._closed :
55
+ raise err .Error ("Connection closed" )
94
56
if cursor :
95
- return cursor (self )
96
- return self . cursorclass (self )
57
+ return Cursor (self )
58
+ return Cursor (self )
97
59
98
- # The following methods are INTERNAL USE ONLY (called from Cursor)
99
- def query (self , sql ):
100
- if isinstance (sql , str ):
101
- sql = sql .encode (self .encoding , 'surrogateescape' )
102
- self ._execute_command (sql )
103
- self ._affected_rows = self ._read_query_result ()
104
- return self ._affected_rows
105
-
106
- def _execute_command (self , sql ):
107
- """
108
- :raise InterfaceError: If the connection is closed.
109
- :raise ValueError: If no username was specified.
110
- """
60
+ def query (self , sql , fmt = "ArrowStream" ):
61
+ """Execute a query and return the raw result."""
111
62
if self ._closed :
112
63
raise err .InterfaceError ("Connection closed" )
113
64
114
65
if isinstance (sql , str ):
115
- sql = sql .encode (self .encoding )
66
+ sql = sql .encode (self .encoding , "surrogateescape" )
116
67
117
- if isinstance (sql , bytearray ):
118
- sql = bytes (sql )
119
-
120
- # drop last command return
121
- if self ._resp is not None :
122
- self ._resp = None
123
-
124
- if DEBUG :
125
- print ("DEBUG: query:" , sql )
126
68
try :
127
- res = self ._session .query (sql , fmt = "JSON" )
128
- if res .has_error ():
129
- raise err .DatabaseError (res .error_message ())
130
- self ._resp = res .data ()
69
+ result = self ._conn .query (sql .decode (), fmt )
70
+ self ._resp = result
71
+ return result
131
72
except Exception as error :
132
- raise err .InterfaceError ("query err: %s" % error )
73
+ raise err .InterfaceError (f"Query error: { error } " )
133
74
134
75
def escape (self , obj , mapping = None ):
135
- """Escape whatever value you pass to it.
136
-
137
- Non-standard, for internal use; do not use this in your applications.
138
- """
139
- if isinstance (obj , str ):
140
- return "'" + self .escape_string (obj ) + "'"
141
- if isinstance (obj , (bytes , bytearray )):
142
- ret = self ._quote_bytes (obj )
143
- return ret
144
- return converters .escape_item (obj , mapping = mapping )
76
+ """Escape whatever value you pass to it."""
77
+ return converters .escape_item (obj , mapping )
145
78
146
79
def escape_string (self , s ):
147
80
return converters .escape_string (s )
148
81
149
82
def _quote_bytes (self , s ):
150
83
return converters .escape_bytes (s )
151
84
152
- def _read_query_result (self ):
153
- self ._result = None
154
- result = CHDBResult (self )
155
- result .read ()
156
- self ._result = result
157
- return result .affected_rows
158
-
159
85
def __enter__ (self ):
160
86
"""Context manager that returns a Cursor"""
161
87
return self .cursor ()
@@ -166,52 +92,9 @@ def __exit__(self, exc, value, traceback):
166
92
self .rollback ()
167
93
else :
168
94
self .commit ()
95
+ self .close ()
169
96
170
97
@property
171
98
def resp (self ):
99
+ """Return the last query response"""
172
100
return self ._resp
173
-
174
-
175
- class CHDBResult (object ):
176
- def __init__ (self , connection ):
177
- """
178
- :type connection: Connection
179
- """
180
- self .connection = connection
181
- self .affected_rows = 0
182
- self .insert_id = None
183
- self .warning_count = 0
184
- self .message = None
185
- self .field_count = 0
186
- self .description = None
187
- self .rows = None
188
- self .has_next = None
189
-
190
- def read (self ):
191
- # Handle empty responses (for instance from CREATE TABLE)
192
- if self .connection .resp is None :
193
- return
194
-
195
- try :
196
- data = json .loads (self .connection .resp )
197
- except Exception as error :
198
- raise err .InterfaceError ("Unsupported response format:" % error )
199
-
200
- try :
201
- self .field_count = len (data ["meta" ])
202
- description = []
203
- for meta in data ["meta" ]:
204
- fields = [meta ["name" ], meta ["type" ]]
205
- description .append (tuple (fields ))
206
- self .description = tuple (description )
207
-
208
- rows = []
209
- for line in data ["data" ]:
210
- row = []
211
- for i in range (self .field_count ):
212
- column_data = converters .convert_column_data (self .description [i ][1 ], line [self .description [i ][0 ]])
213
- row .append (column_data )
214
- rows .append (tuple (row ))
215
- self .rows = tuple (rows )
216
- except Exception as error :
217
- raise err .InterfaceError ("Read return data err:" % error )
0 commit comments