diff --git a/src/base/tests/test_util.py b/src/base/tests/test_util.py index b2b169d87..157512261 100644 --- a/src/base/tests/test_util.py +++ b/src/base/tests/test_util.py @@ -659,6 +659,34 @@ def _get_cr(self): self.addCleanup(cr.close) return cr + def test_explode_format_parallel_filter(self): + cr = self._get_cr() + + cr.execute("SELECT MIN(id) FROM res_users") + min_id = cr.fetchone()[0] + + q1 = "SELECT '{} {0} {x} {x:*^30d} {x.a.b} {x[0]}, {x[1]!s:*^30} {{x}}' FROM res_users" + q2 = "SELECT '{} {0} {x} {x:*^30d} {x.a.b} {x[0]}, {x[1]!s:*^30} {{x}}' FROM res_users WHERE {parallel_filter}" + expected_out = q1 + f" WHERE res_users.id BETWEEN {min_id} AND {min_id}" + + out1 = util.explode_query_range( + cr, + q1, + table="res_users", + bucket_size=1, + format=False, + )[0] + self.assertEqual(out1, expected_out) + + out2 = util.explode_query_range( + cr, + q2, + table="res_users", + bucket_size=1, + format=False, + )[0] + self.assertEqual(out2, expected_out) + + def test_explode_mult_filters(self): cr = self._get_cr() queries = util.explode_query_range( diff --git a/src/util/pg.py b/src/util/pg.py index ea84abab5..35c96b4fb 100644 --- a/src/util/pg.py +++ b/src/util/pg.py @@ -234,7 +234,7 @@ def explode_query(cr, query, alias=None, num_buckets=8, prefix=None): return [cr.mogrify(query, [num_buckets, index]).decode() for index in range(num_buckets)] -def explode_query_range(cr, query, table, alias=None, bucket_size=10000, prefix=None): +def explode_query_range(cr, query, table, alias=None, bucket_size=10000, prefix=None, format=True): """ Explode a query to multiple queries that can be executed in parallel. @@ -297,16 +297,27 @@ def explode_query_range(cr, query, table, alias=None, bucket_size=10000, prefix= # Still, since the query may only be valid if there is no split, we force the usage of `prefix` in the query to # validate its correctness and avoid scripts that pass the CI but fail in production. 
parallel_filter = "{alias}.id IS NOT NULL".format(alias=alias) - return [query.format(parallel_filter=parallel_filter)] + return [ + ( + query.format(parallel_filter=parallel_filter) + if format + else query.replace("{parallel_filter}", parallel_filter) + ) + ] parallel_filter = "{alias}.id BETWEEN %(lower-bound)s AND %(upper-bound)s".format(alias=alias) - query = query.replace("%", "%%").format(parallel_filter=parallel_filter) + + query = query.replace("%", "%%") + query = ( + query.format(parallel_filter=parallel_filter) if format else query.replace("{parallel_filter}", parallel_filter) + ) + return [ cr.mogrify(query, {"lower-bound": ids[i], "upper-bound": ids[i + 1] - 1}).decode() for i in range(len(ids) - 1) ] -def explode_execute(cr, query, table, alias=None, bucket_size=10000, logger=_logger): +def explode_execute(cr, query, table, alias=None, bucket_size=10000, format=True, logger=_logger): """ Execute a query in parallel. @@ -336,6 +347,8 @@ def explode_execute(cr, query, table, alias=None, bucket_size=10000, logger=_log :param str table: name of the *main* table of the query, used to split the processing :param str alias: alias used for the main table in the query :param int bucket_size: size of the buckets of ids to split the processing + :param bool format: whether to use `.format` (instead of `.replace`) to replace the parallel filter; + setting it to `False` can prevent issues with hard-coded curly braces. :param logger: logger used to report the progress :type logger: :class:`logging.Logger` :return: the sum of `cr.rowcount` for each query run @@ -349,7 +362,7 @@ def explode_execute(cr, query, table, alias=None, bucket_size=10000, logger=_log """ return parallel_execute( cr, - explode_query_range(cr, query, table, alias=alias, bucket_size=bucket_size), + explode_query_range(cr, query, table, alias=alias, bucket_size=bucket_size, format=format), logger=logger, )