|
1 | 1 | # -*- coding: utf-8 -*- |
| 2 | +import concurrent.futures
2 | 3 | import inspect |
3 | 4 | import logging |
4 | 5 | import re |
|
11 | 12 | from psycopg2.extensions import quote_ident |
12 | 13 | from psycopg2.extras import Json |
13 | 14 |
|
| 15 | +try: |
| 16 | + from odoo.sql_db import db_connect |
| 17 | +except ImportError: |
| 18 | + from openerp.sql_db import db_connect |
| 19 | + |
14 | 20 | from .const import NEARLYWARN |
15 | 21 | from .exceptions import MigrationError |
16 | 22 | from .helpers import table_of_model |
@@ -243,31 +249,39 @@ def _dumps(self, node): |
243 | 249 |
|
244 | 250 |
|
class Convertor:
    """Convert the HTML content of database rows, as a picklable worker task.

    An instance is submitted to a ``ProcessPoolExecutor``: each call opens its
    own cursor on ``dbname`` (worker processes cannot share the parent's
    connection), runs the ``SELECT`` statement it is given, converts the
    fetched contents, writes the changed rows back via ``update_query``, and
    commits its own transaction.
    """

    def __init__(self, converters, callback, dbname, update_query):
        # Mapping: column name -> truthy flag for jsonb (translated) columns.
        # NOTE(review): the key order is assumed to match the column order of
        # the SELECT queries handed to __call__ (after the leading `id`) —
        # confirm against the caller that builds those queries.
        self.converters = converters
        # Conversion callback: content -> (has_changed, new_content).
        self.callback = callback
        self.dbname = dbname
        self.update_query = update_query

    def __call__(self, query):
        """Execute `query`, convert every fetched row, and persist the changes."""
        converters = self.converters
        columns = self.converters.keys()
        converter_callback = self.callback
        update_query = self.update_query
        # Fresh connection/cursor per worker; committed once at the end so each
        # submitted query chunk is persisted independently of the others.
        with db_connect(self.dbname).cursor() as cr:
            cr.execute(query)
            for row in cr.fetchall():
                res_id, *contents = row
                changes = {}
                for column, content in zip(columns, contents):
                    if content and converters[column]:
                        # jsonb column; convert all keys
                        # The "en_US" value decides whether the other languages
                        # are worth converting (assumes an "en_US" key is always
                        # present — TODO confirm).
                        new_content = {}
                        has_changed, new_content["en_US"] = converter_callback(content.pop("en_US"))
                        if has_changed:
                            for lang, value in content.items():
                                _, new_content[lang] = converter_callback(value)
                        new_content = Json(new_content)
                    else:
                        has_changed, new_content = converter_callback(content)
                    changes[column] = new_content
                    if has_changed:
                        # Presence of the "id" key marks the row as modified.
                        changes["id"] = res_id
                if "id" in changes:
                    cr.execute(update_query, changes)
            cr.commit()
271 | 285 |
|
272 | 286 |
|
273 | 287 | def convert_html_columns(cr, table, columns, converter_callback, where_column="IS NOT NULL", extra_where="true"): |
@@ -305,17 +319,25 @@ def convert_html_columns(cr, table, columns, converter_callback, where_column="I |
305 | 319 | update_sql = ", ".join(f'"{column}" = %({column})s' for column in columns) |
306 | 320 | update_query = f"UPDATE {table} SET {update_sql} WHERE id = %(id)s" |
307 | 321 |
|
| 322 | + cr.commit() |
308 | 323 | with ProcessPoolExecutor(max_workers=get_max_workers()) as executor: |
309 | | - convert = Convertor(converters, converter_callback) |
310 | | - for query in log_progress(split_queries, logger=_logger, qualifier=f"{table} updates"): |
311 | | - cr.execute(query) |
312 | | - for data in executor.map(convert, cr.fetchall(), chunksize=1000): |
313 | | - if "id" in data: |
314 | | - cr.execute(update_query, data) |
| 324 | + convert = Convertor(converters, converter_callback, cr.dbname, update_query) |
| 325 | + futures = [executor.submit(convert, query) for query in split_queries] |
| 326 | + for future in log_progress( |
| 327 | + concurrent.futures.as_completed(futures), |
| 328 | + logger=_logger, |
| 329 | + qualifier=f"{table} updates", |
| 330 | + size=len(split_queries), |
| 331 | + estimate=False, |
| 332 | + log_hundred_percent=True, |
| 333 | + ): |
| 334 | + # just for raising any worker exception |
| 335 | + future.result() |
| 336 | + cr.commit() |
315 | 337 |
|
316 | 338 |
|
317 | 339 | def determine_chunk_limit_ids(cr, table, column_arr, where): |
318 | | - bytes_per_chunk = 100 * 1024 * 1024 |
| 340 | + bytes_per_chunk = 10 * 1024 * 1024 |
319 | 341 | columns = ", ".join(quote_ident(column, cr._cnx) for column in column_arr if column != "id") |
320 | 342 | cr.execute( |
321 | 343 | f""" |
|
0 commit comments