
Commit 3cb6112

committed Sep 16, 2024
[IMP] snippets: move all work from parent to mp workers
In `convert_html_columns()`, we select 100MiB worth of DB tuples and pass them to a ProcessPoolExecutor together with a converter callable. So far, the converter returned all tuples, changed or unchanged, together with a flag telling whether anything was changed. All of this was sent back through IPC to the parent process, where the caller only acts on the changed tuples; the rest is ignored. In every scenario I have seen, only a small proportion of the input tuples is actually changed, meaning that a large proportion is returned through IPC unnecessarily.

What makes it worse is that processing the converted results in the parent process is often slower than the conversion itself, leading to two effects:

1) The results of all workers pile up in the parent process's memory, possibly leading to a MemoryError (upg-2021031).
2) The parallel processing gets serialized on this feedback, defeating a large part of the intended performance gains.

To improve this, this commit
- moves all work into the workers: not just the conversion filter, but also the DB query and the DB updates;
- thereby reduces the data passed over IPC to just the query texts;
- thereby distributes the data held in memory across all worker processes;
- reduces the chunk size by one order of magnitude, which means
  - a lot less memory is used at a time, and
  - "to-be-changed" rows are distributed much better across workers when they are clustered in the table.

All in all, in my test case, this
- reduces the maximum process size in memory to 300MiB across all processes, compared to formerly >2GiB (and a MemoryError) in the parent process alone;
- cuts the runtime from 17 minutes to less than 2 minutes.
1 parent: a84c23a
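The refactor is easiest to see as a pattern: the parent fans out only query texts, and each worker opens its own connection, runs the SELECT, converts, and applies its UPDATEs locally. Below is a minimal, self-contained sketch of that shape, a simplified rendition using plain psycopg2, not the shipped code (which goes through Odoo's `db_connect` and the `Convertor` class in the diff below); the function names and the DSN handling here are hypothetical.

    import concurrent.futures
    from concurrent.futures import ProcessPoolExecutor

    import psycopg2


    def work_on_chunk(dsn, query, update_query, convert):
        # Everything happens in the worker: only `query` crossed the IPC boundary.
        with psycopg2.connect(dsn) as conn, conn.cursor() as cr:
            cr.execute(query)
            for res_id, content in cr.fetchall():
                has_changed, new_content = convert(content)
                if has_changed:
                    # Changed rows are written back here, not shipped to the parent.
                    cr.execute(update_query, {"id": res_id, "content": new_content})
        # leaving the `with` block commits the worker's transaction


    def convert_in_workers(dsn, split_queries, update_query, convert):
        # `convert` must be picklable (a top-level function, or an instance of a
        # picklable class such as Convertor below).
        with ProcessPoolExecutor() as executor:
            futures = [
                executor.submit(work_on_chunk, dsn, q, update_query, convert)
                for q in split_queries
            ]
            for future in concurrent.futures.as_completed(futures):
                future.result()  # surface any worker exception in the parent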

File tree: 2 files changed, +55 -26 lines


src/base/tests/test_util.py (+7)
@@ -1455,12 +1455,19 @@ def testsnip(self):
             }
         )
         cr = self.env.cr
+        if hasattr(self, "_savepoint_id"):
+            # As the `convert_html_columns` method uses `parallel_execute`, the cursor is committed.
+            # Re-create the lost savepoint and do manual cleanup.
+            self.addCleanup(cr.execute, f"SAVEPOINT test_{self._savepoint_id}")
+            self.addCleanup(view_id.unlink)
+        threading.current_thread().testing = False
         snippets.convert_html_content(
             cr,
             snippets.html_converter(
                 not_doing_anything_converter, selector="//*[hasclass('fake_class_not_doing_anything')]"
             ),
         )
+        threading.current_thread().testing = True
         util.invalidate(view_id)
         res = self.env["ir.ui.view"].search_read([("id", "=", view_id.id)], ["arch_db"])
         self.assertEqual(len(res), 1)
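The workaround in this hunk exists because `convert_html_columns()` now commits, and a COMMIT ends the transaction together with every savepoint in it, including the one the test framework opened around the test. A standalone sketch of that semantics (assuming a reachable PostgreSQL and psycopg2; the DSN is hypothetical):

    import psycopg2

    conn = psycopg2.connect("dbname=test")  # hypothetical DSN
    cr = conn.cursor()
    cr.execute("SAVEPOINT before_work")
    conn.commit()  # ends the transaction; all of its savepoints are gone
    try:
        cr.execute("ROLLBACK TO SAVEPOINT before_work")
    except psycopg2.errors.InvalidSavepointSpecification:
        conn.rollback()
        cr.execute("SAVEPOINT before_work")  # re-create it, as the test cleanup does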

src/util/snippets.py (+48, -26)
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+import concurrent
 import inspect
 import logging
 import re
@@ -11,6 +12,11 @@
 from psycopg2.extensions import quote_ident
 from psycopg2.extras import Json
 
+try:
+    from odoo.sql_db import db_connect
+except ImportError:
+    from openerp.sql_db import db_connect
+
 from .const import NEARLYWARN
 from .exceptions import MigrationError
 from .helpers import table_of_model
@@ -243,31 +249,39 @@ def _dumps(self, node):
 
 
 class Convertor:
-    def __init__(self, converters, callback):
+    def __init__(self, converters, callback, dbname, update_query):
         self.converters = converters
         self.callback = callback
+        self.dbname = dbname
+        self.update_query = update_query
 
-    def __call__(self, row):
+    def __call__(self, query):
         converters = self.converters
         columns = self.converters.keys()
         converter_callback = self.callback
-        res_id, *contents = row
-        changes = {}
-        for column, content in zip(columns, contents):
-            if content and converters[column]:
-                # jsonb column; convert all keys
-                new_content = {}
-                has_changed, new_content["en_US"] = converter_callback(content.pop("en_US"))
-                if has_changed:
-                    for lang, value in content.items():
-                        _, new_content[lang] = converter_callback(value)
-                new_content = Json(new_content)
-            else:
-                has_changed, new_content = converter_callback(content)
-            changes[column] = new_content
-            if has_changed:
-                changes["id"] = res_id
-        return changes if "id" in changes else None
+        update_query = self.update_query
+        with db_connect(self.dbname).cursor() as cr:
+            cr.execute(query)
+            for row in cr.fetchall():
+                res_id, *contents = row
+                changes = {}
+                for column, content in zip(columns, contents):
+                    if content and converters[column]:
+                        # jsonb column; convert all keys
+                        new_content = {}
+                        has_changed, new_content["en_US"] = converter_callback(content.pop("en_US"))
+                        if has_changed:
+                            for lang, value in content.items():
+                                _, new_content[lang] = converter_callback(value)
+                        new_content = Json(new_content)
+                    else:
+                        has_changed, new_content = converter_callback(content)
+                    changes[column] = new_content
+                    if has_changed:
+                        changes["id"] = res_id
+                if "id" in changes:
+                    cr.execute(update_query, changes)
            cr.commit()
 
 
 def convert_html_columns(cr, table, columns, converter_callback, where_column="IS NOT NULL", extra_where="true"):
@@ -305,17 +319,25 @@ def convert_html_columns(cr, table, columns, converter_callback, where_column="IS NOT NULL", extra_where="true"):
     update_sql = ", ".join(f'"{column}" = %({column})s' for column in columns)
     update_query = f"UPDATE {table} SET {update_sql} WHERE id = %(id)s"
 
+    cr.commit()
     with ProcessPoolExecutor(max_workers=get_max_workers()) as executor:
-        convert = Convertor(converters, converter_callback)
-        for query in log_progress(split_queries, logger=_logger, qualifier=f"{table} updates"):
-            cr.execute(query)
-            for data in executor.map(convert, cr.fetchall(), chunksize=1000):
-                if data:
-                    cr.execute(update_query, data)
+        convert = Convertor(converters, converter_callback, cr.dbname, update_query)
+        futures = [executor.submit(convert, query) for query in split_queries]
+        for future in log_progress(
+            concurrent.futures.as_completed(futures),
+            logger=_logger,
+            qualifier=f"{table} updates",
+            size=len(split_queries),
+            estimate=False,
+            log_hundred_percent=True,
+        ):
+            # just for raising any worker exception
+            future.result()
+    cr.commit()
 
 
 def determine_chunk_limit_ids(cr, table, column_arr, where):
-    bytes_per_chunk = 100 * 1024 * 1024
+    bytes_per_chunk = 10 * 1024 * 1024
     columns = ", ".join(quote_ident(column, cr._cnx) for column in column_arr if column != "id")
     cr.execute(
         f"""
