@@ -126,51 +126,51 @@ def _connect_to_server(self):
126
126
127
127
:return: Connection object if successful, or `None` if an error occurred.
128
128
:rtype: Connection or None
129
- ...
130
- :raises `psycopg2.OperationalError`: Failed to connect to the server.
129
+ :raises `psycopg.OperationalError`: Failed to connect to the server.
131
130
"""
132
131
try :
133
- conn = psycopg2 .connect (
132
+ conn = psycopg .connect (
134
133
dbname = 'stackql' ,
135
134
user = 'stackql' ,
136
135
host = self .server_address ,
137
- port = self .server_port
136
+ port = self .server_port ,
137
+ autocommit = True ,
138
+ row_factory = dict_row # Use dict_row to get rows as dictionaries
138
139
)
139
140
return conn
140
- except psycopg2 .OperationalError as oe :
141
+ except psycopg .OperationalError as oe :
141
142
print (f"OperationalError while connecting to the server: { oe } " )
142
143
except Exception as e :
143
- # Catching all other possible psycopg2 exceptions (and possibly other unexpected exceptions).
144
- # You might want to log this or handle it differently in a real-world scenario.
145
144
print (f"Unexpected error while connecting to the server: { e } " )
146
145
return None
147
146
148
147
def _run_server_query (self , query , is_statement = False ):
149
- """Runs a query against the server using psycopg2.
150
-
151
- :param query: SQL query to be executed on the server.
152
- :type query: str
153
- :return: List of result rows if the query fetches results; empty list if there are no results.
154
- :rtype: list of dict objects
155
- :raises: psycopg2.ProgrammingError for issues related to the SQL query,
156
- unless the error is "no results to fetch", in which case an empty list is returned.
157
- """
148
+ """Run a query against the server using the existing connection in server mode."""
149
+ if not self ._conn :
150
+ raise ConnectionError ("No active connection found. Ensure _connect_to_server is called." )
151
+
158
152
try :
159
- cur = self ._conn .cursor (cursor_factory = RealDictCursor )
160
- cur .execute (query )
161
- if is_statement :
162
- # If the query is a statement, there are no results to fetch.
163
- result_msg = cur .statusmessage
164
- cur .close ()
165
- return [{'message' : result_msg }]
166
- rows = cur .fetchall ()
167
- cur .close ()
168
- return rows
169
- except psycopg2 .ProgrammingError as e :
170
- if str (e ) == "no results to fetch" :
171
- return []
172
- else :
173
- raise
153
+ with self ._conn .cursor () as cur :
154
+ cur .execute (query )
155
+ if is_statement :
156
+ # Return status message for non-SELECT statements
157
+ result_msg = cur .statusmessage
158
+ return [{'message' : result_msg }]
159
+ try :
160
+ # Fetch results for SELECT queries
161
+ rows = cur .fetchall ()
162
+ return rows
163
+ except psycopg .ProgrammingError as e :
164
+ # Handle cases with no results
165
+ if "no results to fetch" in str (e ):
166
+ return []
167
+ else :
168
+ raise
169
+ except psycopg .OperationalError as oe :
170
+ print (f"OperationalError during query execution: { oe } " )
171
+ except Exception as e :
172
+ print (f"Unexpected error during query execution: { e } " )
173
+
174
174
175
175
def _run_query (self , query , custom_auth = None , env_vars = None ):
176
176
"""Internal method to execute a StackQL query using a subprocess.
@@ -330,13 +330,13 @@ def __init__(self,
330
330
331
331
if self .server_mode :
332
332
# server mode, connect to a server via the postgres wire protocol
333
- # Attempt to import psycopg2 only if server_mode is True
334
- global psycopg2 , RealDictCursor
333
+ # Attempt to import psycopg only if server_mode is True
334
+ global psycopg , dict_row
335
335
try :
336
- import psycopg2
337
- from psycopg2 . extras import RealDictCursor
336
+ import psycopg
337
+ from psycopg . rows import dict_row # For returning results as dictionaries
338
338
except ImportError :
339
- raise ImportError ("psycopg2 is required in server mode but is not installed. Please install psycopg2 and try again." )
339
+ raise ImportError ("psycopg is required in server mode but is not installed. Please install psycopg and try again." )
340
340
341
341
self .server_address = server_address
342
342
self .server_port = server_port
@@ -670,34 +670,30 @@ def execute(self, query, suppress_errors=True, custom_auth=None, env_vars=None):
670
670
def _run_server_query_with_new_connection (self , query ):
671
671
"""Run a query against a StackQL postgres wire protocol server with a new connection.
672
672
"""
673
- conn = None
674
673
try :
675
674
# Establish a new connection using credentials and configurations
676
- conn = psycopg2 .connect (
675
+ with psycopg .connect (
677
676
dbname = 'stackql' ,
678
677
user = 'stackql' ,
679
678
host = self .server_address ,
680
- port = self .server_port
681
- )
682
- # Create a new cursor and execute the query
683
- with conn .cursor (cursor_factory = RealDictCursor ) as cur :
684
- cur .execute (query )
685
- try :
686
- rows = cur .fetchall ()
687
- except psycopg2 .ProgrammingError as e :
688
- if str (e ) == "no results to fetch" :
689
- rows = []
690
- else :
691
- raise
692
- return rows
693
- except psycopg2 .OperationalError as oe :
679
+ port = self .server_port ,
680
+ row_factory = dict_row
681
+ ) as conn :
682
+ # Execute the query with a new cursor
683
+ with conn .cursor () as cur :
684
+ cur .execute (query )
685
+ try :
686
+ rows = cur .fetchall ()
687
+ except psycopg .ProgrammingError as e :
688
+ if str (e ) == "no results to fetch" :
689
+ rows = []
690
+ else :
691
+ raise
692
+ return rows
693
+ except psycopg .OperationalError as oe :
694
694
print (f"OperationalError while connecting to the server: { oe } " )
695
695
except Exception as e :
696
696
print (f"Unexpected error while connecting to the server: { e } " )
697
- finally :
698
- # Ensure the connection is always closed, even if an error occurs
699
- if conn is not None :
700
- conn .close ()
701
697
702
698
def _sync_query (self , query , new_connection = False ):
703
699
"""Synchronous function to perform the query.
0 commit comments