-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpostgres.py
More file actions
149 lines (119 loc) · 4.78 KB
/
postgres.py
File metadata and controls
149 lines (119 loc) · 4.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""
Minimum code to upload data taken from:
https://github.com/status-im/ift-data-py/blob/master/ift_data/clients/postgres.py
"""
from typing import Optional, Union

import pandas as pd
import psycopg2
from psycopg2 import sql
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import JSONB
class Postgres:
    """
    Thin PostgreSQL client.

    Uses psycopg2 for raw queries and SQLAlchemy + pandas for DataFrame
    uploads. Identifiers (schema/table/column names) are quoted with
    ``psycopg2.sql.Identifier`` and values are passed as query parameters,
    so caller-supplied names cannot be used for SQL injection.
    """

    def __init__(self, username: str, password: str, port: Union[int, str], database: str, host: str):
        """
        Open a connection to the given PostgreSQL database.

        Parameters:
        - `username` / `password` - credentials
        - `port` - server port; a string is coerced to `int`
        - `database` - database name
        - `host` - server host
        """
        if isinstance(port, str):
            port = int(port)
        self.__params = {
            "host": host,
            "user": username,
            "password": password,
            "port": port,
            "database": database
        }
        # NOTE(review): the URL embeds the password in plain text; it is only
        # used locally to build the SQLAlchemy engine.
        self.__url = f"postgresql://{username}:{password}@{host}:{port}/{database}"
        self.__conn: psycopg2.extensions.connection = psycopg2.connect(**self.__params)
        self.__cursor: psycopg2.extensions.cursor = self.__conn.cursor()

    def insert(self, data: pd.DataFrame, table_name: str, schema: str, json_columns: Optional[list] = None):
        """
        Insert the DataFrame in the specified schema > table.
        If the schema / table name does not exist, it will be created.

        Parameters:
        - `data` - the data to be inserted in Postgres
        - `table_name` - the name of the table
        - `schema` - the name of the schema
        - `json_columns` - when creating the table, `dict` columns will be turned into JSON objects in Postgres
        """
        # Quote the schema name instead of interpolating it raw (injection-safe).
        self.execute(sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(sql.Identifier(schema)))
        # to_sql lowercases nothing itself; normalise here so new and existing
        # columns compare consistently below.
        data.columns = [column.lower() for column in data.columns]
        params = {
            "name": table_name,
            "con": None,  # set below once the engine exists
            "schema": schema,
            "if_exists": "append",
            "index": False
        }
        if json_columns:
            params["dtype"] = {
                json_column: JSONB
                for json_column in json_columns
            }
        # Add new columns as they come
        existing_columns = self.get_columns(schema, table_name)
        if existing_columns:
            for column in data.columns:
                if column in existing_columns:
                    continue
                # NOTE: New values will have to be transformed
                self.execute(
                    sql.SQL("ALTER TABLE {}.{} ADD COLUMN {} TEXT").format(
                        sql.Identifier(schema),
                        sql.Identifier(table_name),
                        sql.Identifier(column),
                    )
                )
        engine = create_engine(self.__url)
        try:
            params["con"] = engine
            data.to_sql(**params)
        finally:
            # Dispose the engine so its pooled connections are not leaked
            # (one engine was previously created per insert and never closed).
            engine.dispose()

    def execute(self, query: str, params: Optional[tuple] = None):
        """
        Execute queries such as INSERT, UPDATE, DELETE etc. and commit.

        Parameters:
        - `query` - the PostgreSQL query (a `str` or `psycopg2.sql.Composed`)
        - `params` - optional query parameters bound server-side via `%s` placeholders
        """
        self.__execute(query, params)
        self.__conn.commit()

    def to_pandas(self, query: str, batch_size: int = 50_000, uppercase: bool = True,
                  params: Optional[tuple] = None) -> pd.DataFrame:
        """
        Create a DataFrame from the given query.

        Parameters:
        - `query` - the PostgreSQL query
        - `batch_size` - how many rows will be fetched at once
        - `uppercase` - if `True` then the columns will be uppercase. If `False` the columns will be lowercase
        - `params` - optional query parameters bound server-side via `%s` placeholders

        Output:
        - DataFrame for the executed query (empty, but with columns, when there are no rows)
        """
        self.__execute(query, params)
        columns = [column.name.upper() if uppercase else column.name.lower() for column in self.__cursor.description]
        chunks = []
        # Fetch in batches so very large results do not need one giant fetchall.
        while True:
            rows = self.__cursor.fetchmany(batch_size)
            if not rows:
                break
            chunks.append(pd.DataFrame(rows, columns=columns))
        return pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame(columns=columns)

    def close(self):
        """Close the cursor and the connection. Safe to call repeatedly."""
        # __dict__.get guards against a partially-initialised instance
        # (e.g. psycopg2.connect raised inside __init__).
        cursor = self.__dict__.get("_Postgres__cursor")
        if cursor is not None and not cursor.closed:
            cursor.close()
        conn = self.__dict__.get("_Postgres__conn")
        if conn is not None and not conn.closed:
            conn.close()

    def __del__(self):
        # Best-effort cleanup; never let destructor errors propagate
        # (e.g. during interpreter shutdown).
        try:
            self.close()
        except Exception:
            pass

    def __execute(self, query, params: Optional[tuple] = None):
        """Run `query` on the shared cursor, reconnecting / rolling back as needed."""
        if self.__conn.closed:
            # Connection was closed (or dropped) — reconnect transparently.
            self.__conn = psycopg2.connect(**self.__params)
            self.__cursor = self.__conn.cursor()
        try:
            self.__cursor.execute(query, params)
        except psycopg2.errors.InFailedSqlTransaction:
            # A previous statement left the transaction aborted; roll it
            # back and retry this statement once.
            self.__conn.rollback()
            self.__cursor.execute(query, params)

    def get_columns(self, schema: str, table_name: str) -> list[str]:
        """
        Get the column names in the correct order for the given table.

        Parameters:
        - `table_name` - the name of the table
        - `schema` - the name of the schema

        Output:
        - the table's columns in the correct order (empty list if the table does not exist)
        """
        # Values are bound as parameters instead of f-string interpolation.
        query = """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = %s
                AND table_schema = %s
            ORDER BY ordinal_position ASC
        """
        return self.to_pandas(query, params=(table_name, schema))["COLUMN_NAME"].to_list()