Skip to content
This repository has been archived by the owner on Dec 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3 from ffaraone/LITE-28642-speedup-reports
Browse files (browse the repository at this point in the history)
LITE-28642 speedup reports generation
  • Loading branch information
vgrebenschikov authored Sep 19, 2023
2 parents 6f9cd8b + 4430f1e commit 06311a7
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 49 deletions.
69 changes: 49 additions & 20 deletions reports/all_transactions/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ def generate(
renderer_type=None,
extra_context_callback=None,
):
client.default_limit = 1000

subscriptions_rql = R()
if parameters.get("date"):
subscriptions_rql &= R().events.created.at.ge(parameters['date']['after'])
Expand All @@ -45,7 +47,7 @@ def generate(
client.ns('subscriptions')
.collection('requests')
.filter(subscriptions_rql)
.order_by("-events.created.at")
.order_by("events.created.at")
)
total_subscriptions = subscriptions.count()

Expand All @@ -59,32 +61,37 @@ def generate(
for product in PRODUCTS_TO_SKIP:
requests_rql &= R().asset.product.id.ne(product)
requests_rql &= R().type.oneof(request_types)
requests = client.requests.filter(requests_rql).order_by("-created")
requests = client.requests.filter(requests_rql).order_by("created")

total_requests = requests.count()

progress = Progress(progress_callback, total_subscriptions + total_requests)

ex = futures.ThreadPoolExecutor(
max_workers=6,
)
ex = futures.ThreadPoolExecutor()

if renderer_type == 'csv':
yield HEADERS

wait_for = []
for request in requests:

for i in range(0, total_requests, 1000):
wait_for.append(
ex.submit(
get_request_record,
process_requests_page,
client,
request,
progress,
requests[i:min(i + 1000, total_requests)],
progress,
)
)
progress.increment()

for future in futures.as_completed(wait_for):
results = future.result()
results = []
try:
results = future.result()
except Exception as e:
ex.shutdown(wait=False, cancel_futures=True)
raise e

for result in results:
if renderer_type == 'json':
yield {
Expand All @@ -95,18 +102,26 @@ def generate(
yield result

wait_for = []
for subscription in subscriptions:

for i in range(0, total_subscriptions, 1000):
wait_for.append(
ex.submit(
get_subscription_record,
process_subscriptions_page,
client,
subscription,
progress,
subscriptions[i:min(i + 1000, total_subscriptions)],
progress,
)
)


for future in futures.as_completed(wait_for):
results = future.result()
results = []
try:
results = future.result()
except Exception as e:
ex.shutdown(wait=False, cancel_futures=True)
raise e

for result in results:
if renderer_type == 'json':
yield {
Expand All @@ -117,7 +132,23 @@ def generate(
yield result


def get_request_record(client, request, progress):
def process_requests_page(client, requests, progress):
    """Collect the report rows for one page of requests.

    Every request in ``requests`` is handed to ``get_request_record`` and the
    rows it returns are appended, in order, to one flat list.
    ``progress.increment()`` is called once per request, after that request's
    rows have been collected.

    Returns the flat list of rows for the whole page.
    """
    rows = []
    for entry in requests:
        rows += get_request_record(client, entry)
        progress.increment()
    return rows


def process_subscriptions_page(client, requests, progress):
    """Collect the report rows for one page of subscription requests.

    Every item in ``requests`` is handed to ``get_subscription_record`` and
    the rows it returns are appended, in order, to one flat list.
    ``progress.increment()`` is called once per item, after that item's rows
    have been collected.

    Returns the flat list of rows for the whole page.
    """
    rows = []
    for subscription in requests:
        rows += get_subscription_record(client, subscription)
        progress.increment()
    return rows


def get_request_record(client, request):
param_values = get_product_specifics(request, client)
output = []
for item in request["asset"]["items"]:
Expand Down Expand Up @@ -193,11 +224,10 @@ def get_request_record(client, request, progress):
)
except Exception:
pass
progress.increment()
return output


def get_subscription_record(client, subscription, progress):
def get_subscription_record(client, subscription):
param_values = get_product_specifics(subscription, client)
output = []
try:
Expand Down Expand Up @@ -278,7 +308,6 @@ def get_subscription_record(client, subscription, progress):
)
except Exception:
pass
progress.increment()
return output


Expand Down
76 changes: 47 additions & 29 deletions reports/ms_products/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
get_ta_parameter,
)
from concurrent import futures
from threading import Lock


HEADERS = (
Expand All @@ -24,6 +25,8 @@

TC_CACHE = {}

CACHE_LOCK = Lock()

PRODUCTS = [
'PRD-814-505-018',
'PRD-561-716-033',
Expand All @@ -50,10 +53,7 @@ def generate(
extra_context_callback=None,
):
init_tc_cache()
#limit = client.default_limit
client.default_limit = 1000
# populate_ta_cache(parameters, client)
#client.default_limit = limit
subscriptions_rql = R()

if parameters.get("date"):
Expand All @@ -69,7 +69,7 @@ def generate(
client.ns('subscriptions')
.collection('requests')
.filter(subscriptions_rql)
.order_by("-events.created.at")
.order_by("events.created.at")
)
total_subscriptions = subscriptions.count()

Expand All @@ -84,7 +84,7 @@ def generate(
if parameters.get('mkp') and parameters['mkp']['all'] is False:
requests_rql &= R().asset.marketplace.id.oneof(parameters['mkp']['choices'])
requests_rql &= R().type.oneof(request_types)
requests = client.requests.filter(requests_rql)
requests = client.requests.filter(requests_rql).order_by('created')

total_requests = requests.count()
total_progress = total_subscriptions + total_requests
Expand All @@ -97,19 +97,25 @@ def generate(


wait_for = []
for request in requests:

for i in range(0, total_requests, 1000):
wait_for.append(
ex.submit(
get_request_record,
process_requests_page,
client,
request,
progress,
requests[i:min(i + 1000, total_requests)],
progress,
)
)
progress.increment()

for future in futures.as_completed(wait_for):
results = future.result()
results = []
try:
results = future.result()
except Exception as e:
ex.shutdown(wait=False, cancel_futures=True)
raise e

for result in results:
if renderer_type == 'json':
yield {
Expand All @@ -120,19 +126,25 @@ def generate(
yield result

wait_for = []
for subscription in subscriptions:

for i in range(0, total_subscriptions, 1000):
wait_for.append(
ex.submit(
get_subscription_record,
process_subscriptions_page,
client,
subscription,
progress,
subscriptions[i:min(i + 1000, total_subscriptions)],
progress,
)
)
progress.increment()

for future in futures.as_completed(wait_for):
results = future.result()
results = []
try:
results = future.result()
except Exception as e:
ex.shutdown(wait=False, cancel_futures=True)
raise e

for result in results:
if renderer_type == 'json':
yield {
Expand All @@ -143,6 +155,22 @@ def generate(
yield result


def process_requests_page(client, requests, progress):
    """Collect the report rows for one page of requests.

    Every request in ``requests`` is handed to ``get_request_record`` (which
    also receives ``progress``) and the rows it returns are appended, in
    order, to one flat list. ``progress.increment()`` is called once per
    request, after that request's rows have been collected.

    Returns the flat list of rows for the whole page.
    """
    rows = []
    for entry in requests:
        rows += get_request_record(client, entry, progress)
        progress.increment()
    return rows


def process_subscriptions_page(client, requests, progress):
    """Collect the report rows for one page of subscription requests.

    Every item in ``requests`` is handed to ``get_subscription_record``
    (which also receives ``progress``) and the rows it returns are appended,
    in order, to one flat list. ``progress.increment()`` is called once per
    item, after that item's rows have been collected.

    Returns the flat list of rows for the whole page.
    """
    rows = []
    for subscription in requests:
        rows += get_subscription_record(client, subscription, progress)
        progress.increment()
    return rows


def get_request_record(client, request, progress):
param_values = get_product_specifics(request, client)
output = []
Expand Down Expand Up @@ -336,20 +364,10 @@ def get_param_mpn(request, client):
if request['asset']['tiers']['tier1']['id'] in TC_CACHE[request['asset']['product']['id']]:
return TC_CACHE[request['asset']['product']['id']][request['asset']['tiers']['tier1']['id']]
mpn = get_ta_parameter(request, 'tier1', 'tier1_mpn', client)
TC_CACHE[request['asset']['product']['id']][request['asset']['tiers']['tier1']['id']] = mpn
with CACHE_LOCK:
TC_CACHE[request['asset']['product']['id']][request['asset']['tiers']['tier1']['id']] = mpn
return mpn

def populate_ta_cache(parameters, client):
    """Fill ``TC_CACHE`` with the ``tier1_mpn`` value of each tier config.

    Queries the ``tier``/``configs`` collection for the products listed in
    ``PRODUCTS``; when ``parameters['mkp']`` is present and its ``all`` flag
    is ``False``, the query is further restricted to the chosen marketplaces.
    Each config's value is stored under
    ``TC_CACHE[product id][account id]``.
    """
    query = R()
    query &= R().product.id.oneof(PRODUCTS)

    mkp = parameters.get('mkp')
    if mkp and mkp['all'] is False:
        query &= R().marketplace.id.oneof(mkp['choices'])

    for config in client.ns('tier').collection('configs').filter(query):
        # Create the per-product bucket on first sight, then record the value.
        per_product = TC_CACHE.setdefault(config['product']['id'], {})
        per_product[config['account']['id']] = get_param_value(config['params'], 'tier1_mpn')

def get_param_value(params, param_id):
for param in params:
if param_id == param['id']:
Expand Down

0 comments on commit 06311a7

Please sign in to comment.