Skip to content

Commit 604601b

Browse files
committed
Convert async workethreads to use classes
This refactors the worker thread class and image request class. Now there is one general submit function in the worker thread class. The ImageRequest class has been replaced with a set of AsyncRequests classes for each operation. This makes the code a bit cleaner and the inputs for each async tasks are more clearly defined.
1 parent a10c78f commit 604601b

File tree

6 files changed

+151
-166
lines changed

6 files changed

+151
-166
lines changed

imagegw/shifter_imagegw/config.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,6 @@ def __init__(self, data=None):
7171
data = config['Platforms'][platform]
7272
self.Platforms[platform] = Platform(data)
7373

74-
self.Locations = config['Locations']
75-
self.Platofrms = config['Platforms']
76-
7774
loglevel = config.get('LogLevel', 'info').lower()
7875
loglevel_map = {
7976
'debug': logging.DEBUG,

imagegw/shifter_imagegw/imagemngr.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
from pymongo import MongoClient
3535
import pymongo.errors
3636
from shifter_imagegw.imageworker import WorkerThreads
37+
from shifter_imagegw.imageworker import PullRequest
38+
from shifter_imagegw.imageworker import ImportRequest
39+
from shifter_imagegw.imageworker import ExpireRequest
3740
from shifter_imagegw.config import Config
3841
import grp
3942

@@ -170,7 +173,7 @@ def _isadmin(self, session, system=None):
170173
admins = self.platforms[system].admins
171174
user = session.user
172175
if user in admins:
173-
self.logger.debug('user {user} is an admin')
176+
self.logger.debug(f'user {user} is an admin')
174177
return True
175178
return False
176179

@@ -509,7 +512,14 @@ def pull(self, session, image):
509512
request['session'] = session
510513
self.logger.debug("Calling do pull with queue="
511514
f"{request['system']}")
512-
self.workers.dopull(ident, request)
515+
pr = PullRequest(self.config,
516+
session.system,
517+
request['tag'],
518+
ident,
519+
session,
520+
useracl=request['userACL'],
521+
groupacl=request['groupACL'])
522+
self.workers.submit(pr)
513523

514524
memo = "pull request queued " \
515525
f"s={request['system']} tag={request['tag']}"
@@ -561,7 +571,13 @@ def mngrimport(self, session, image):
561571
request['session'] = session
562572
self.logger.debug("Calling wrkimport with queue="
563573
f"{request['system']}")
564-
self.workers.dowrkimport(ident, request)
574+
ir = ImportRequest(self.config,
575+
session.system,
576+
image['tag'],
577+
ident,
578+
session,
579+
image['filepath'])
580+
self.workers.submit(ir)
565581

566582
memo = "import request queued " \
567583
f"s={request['system']} tag={request['tag']}"
@@ -774,8 +790,13 @@ def expire_id(self, rec, ident):
774790
""" Helper function to expire by id """
775791
memo = f"Calling do expire id={ident}"
776792
self.logger.debug(memo)
793+
er = ExpireRequest(self.config,
794+
rec['system'],
795+
rec['tag'],
796+
rec['id'],
797+
ident)
777798

778-
self.workers.doexpire(ident, rec)
799+
self.workers.submit(er)
779800
self.logger.info("expire request queued "
780801
f"s={rec['system']} tag={ident}")
781802

@@ -795,7 +816,13 @@ def expire(self, session, image):
795816
memo = "Calling do expire with " \
796817
f"queue={image['system']} id={ident}"
797818
self.logger.debug(memo)
798-
self.workers.doexpire(ident, rec)
819+
er = ExpireRequest(self.config,
820+
rec['system'],
821+
rec['tag'],
822+
rec['id'],
823+
ident)
824+
825+
self.workers.submit(er)
799826

800827
memo = "expire request queued " \
801828
f"s={image['system']} tag={image['tag']}"

imagegw/shifter_imagegw/transfer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ def _exec_and_log(cmd, logger):
6060
stderr = bstderr.decode("utf-8")
6161
if logger is not None:
6262
if stdout is not None and len(stdout) > 0:
63-
logger.debug("{cmd[0]} stdout: {stdout.strip()}")
63+
logger.debug(f"{cmd[0]} stdout: {stdout.strip()}")
6464
if stderr is not None and len(stderr) > 0:
65-
logger.error("{cmd[0]} stdout: {stdoerr.strip()}")
65+
logger.error(f"{cmd[0]} stderr: {stderr.strip()}")
6666
return proc.returncode
6767

6868

@@ -84,9 +84,9 @@ def _get_stdout_and_log(cmd, logger=None):
8484
stderr = bstderr.decode("utf-8")
8585
if logger is not None:
8686
if stdout is not None and len(stdout) > 0:
87-
logger.debug("{cmd[0]} stdout: {stdout.strip()}")
87+
logger.debug(f"{cmd[0]} stdout: {stdout.strip()}")
8888
if stderr is not None and len(stderr) > 0:
89-
logger.error("{cmd[0]} stdout: {stdoerr.strip()}")
89+
logger.error(f"{cmd[0]} stdout: {stderr.strip()}")
9090
# push this error back to calling function so
9191
# it can be reported sensibly
9292
rerror = "%s %s" % (cmd[0], stderr.strip())

imagegw/test/auth_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from shifter_imagegw.config import Config
2020
import warnings
2121
warnings.filterwarnings('ignore', category=UserWarning, module='pymunge.raw')
22-
from shifter_imagegw.auth import authenticate
22+
from shifter_imagegw.auth import authenticate # noqa
2323

2424

2525
def test_authenticate(mocker):

imagegw/test/imagemngr_test.py

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from shifter_imagegw.imageworker import WorkerThreads
21
import os
32
import pytest
43
import time
@@ -8,13 +7,12 @@
87
from copy import deepcopy
98
from time import sleep
109
from pymongo import MongoClient
11-
from shifter_imagegw.imagemngr import ImageMngr
1210
from multiprocessing.pool import ThreadPool
1311
from random import randint
1412
from shifter_imagegw.config import Config
15-
import warnings
16-
warnings.filterwarnings('ignore', category=UserWarning, module='pymunge.raw')
17-
from shifter_imagegw.auth import Session # noqa
13+
from shifter_imagegw.models import Session
14+
from shifter_imagegw.imageworker import WorkerThreads
15+
from shifter_imagegw.imagemngr import ImageMngr
1816

1917
"""
2018
Shifter, Copyright (c) 2015, The Regents of the University of California,
@@ -42,6 +40,7 @@ def __init__(self, q):
4240
self.mode = 1
4341
self.q = q
4442
self.pools = ThreadPool(processes=2)
43+
self.op = "pull"
4544

4645
def set_mode(self, mode):
4746
self.mode = mode
@@ -58,7 +57,7 @@ def pull(self, request, updater):
5857
states = ('PULLING', 'EXAMINATION', 'CONVERSION', 'TRANSFER')
5958
for state in states:
6059
updater.update_status(state, state)
61-
sleep(1)
60+
sleep(0.1)
6261
ret = {
6362
'id': '%x' % randint(0, 100000),
6463
'entrypoint': ['./blah'],
@@ -73,7 +72,7 @@ def wrkimport(self, request, updater):
7372
states = ('HASHING', 'TRANSFER', 'READY')
7473
for state in states:
7574
updater.update_status(state, state)
76-
sleep(1)
75+
sleep(0.1)
7776
ret = {
7877
'id': '%x' % randint(0, 100000),
7978
'entrypoint': ['./blah'],
@@ -84,6 +83,15 @@ def wrkimport(self, request, updater):
8483
updater.update_status(state, state, ret)
8584
return ret
8685

86+
def submit(self, req):
87+
req.updater.update_method = self.updater
88+
if self.op == "pull":
89+
self.pools.apply_async(self.pull, [req, req.updater],
90+
{}, None, req.updater.failed)
91+
elif self.op == "import":
92+
self.pools.apply_async(self.wrkimport, [req, req.updater],
93+
{}, None, req.updater.failed)
94+
8795

8896
@pytest.fixture(autouse=True)
8997
def set_path(monkeypatch):
@@ -642,9 +650,9 @@ def test_pull2(ctx):
642650
pr = ctx.pull
643651
# Do the pull
644652
session = ctx.session
645-
rec1 = ctx.mtm.pull(session, pr) # ,delay=False)
653+
rec1 = ctx.mtm.pull(session, pr)
646654
pr['tag'] = ctx.tag2
647-
rec2 = ctx.mtm.pull(session, pr) # ,delay=False)
655+
rec2 = ctx.mtm.pull(session, pr)
648656
assert rec1
649657
id1 = rec1['_id']
650658
assert rec2
@@ -947,6 +955,7 @@ def test_import(ctx):
947955
}
948956
# Do the pull
949957
session = ctx.session
958+
ctx.mtm.workers.op = "import"
950959
rec = ctx.mtm.mngrimport(session, pr) # ,delay=False)
951960
assert rec
952961
id = rec['_id']
@@ -968,26 +977,6 @@ def test_acl_update_denied(ctx):
968977
pass
969978

970979

971-
def test_expire_remote(ctx):
972-
system = ctx.system
973-
record = good_record(ctx)
974-
# Create a fake record in mongo
975-
id = ctx.images.insert_one(record).inserted_id
976-
assert id
977-
# Create a bogus image file
978-
file, metafile = create_fakeimage(ctx, system, record['id'],
979-
ctx.format)
980-
session = ctx.admin_session
981-
er = {'system': system, 'tag': ctx.tag, 'itype': ctx.itype}
982-
rec = ctx.m.expire(session, er) # ,delay=False)
983-
assert rec
984-
time.sleep(2)
985-
state = ctx.m.get_state(id)
986-
assert state == 'EXPIRED'
987-
assert os.path.exists(file) is False
988-
assert os.path.exists(metafile) is False
989-
990-
991980
def test_expire_local(ctx):
992981
record = good_record(ctx)
993982
system = 'systemb'
@@ -1000,7 +989,7 @@ def test_expire_local(ctx):
1000989
ctx.format)
1001990
session = ctx.admin_session
1002991
er = {'system': system, 'tag': ctx.tag, 'itype': ctx.itype}
1003-
rec = ctx.m.expire(session, er) # ,delay=False)
992+
rec = ctx.m.expire(session, er)
1004993
assert rec
1005994
time.sleep(2)
1006995
state = ctx.m.get_state(id)

0 commit comments

Comments
 (0)