Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 72 additions & 70 deletions base_bs_erf.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,73 +70,75 @@ def gen_data(nopt):
rnd.uniform(TL, TH, nopt),
)

##############################################

def run(name, alg, sizes=15, step=2, nopt=1024, nparr=True, dask=False, pass_args=False):
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--steps', required=False, default=sizes, help="Number of steps")
parser.add_argument('--step', required=False, default=step, help="Factor for each step")
parser.add_argument('--chunk', required=False, default=2000000,help="Chunk size for Dask")
parser.add_argument('--size', required=False, default=nopt, help="Initial data size")
parser.add_argument('--repeat',required=False, default=100, help="Iterations inside measured region")
parser.add_argument('--dask', required=False, default="sq", help="Dask scheduler: sq, mt, mp")
parser.add_argument('--text', required=False, default="", help="Print with each result")

args = parser.parse_args()
sizes= int(args.steps)
step = int(args.step)
nopt = int(args.size)
chunk= int(args.chunk)
repeat=int(args.repeat)
kwargs={}

if(dask):
import dask
import dask.multiprocessing
import dask.array as da
dask_modes = {
"sq": dask.async.get_sync,
"mt": dask.threaded.get,
"mp": dask.multiprocessing.get
}
kwargs = {"schd": dask_modes[args.dask]}
name += "-"+args.dask

for i in xrange(sizes):
price, strike, t = gen_data(nopt)
if not nparr:
call = [0.0 for i in range(nopt)]
put = [-1.0 for i in range(nopt)]
price=list(price)
strike=list(strike)
t=list(t)
repeat=1 # !!!!! ignore repeat count
if dask:
assert(not pass_args)
price = da.from_array(price, chunks=(chunk,), name=False)
strike = da.from_array(strike, chunks=(chunk,), name=False)
t = da.from_array(t, chunks=(chunk,), name=False)
if pass_args:
call = np.zeros(nopt, dtype=np.float64)
put = -np.ones(nopt, dtype=np.float64)
iterations = xrange(repeat)
print("ERF: {}: Size: {}".format(name, nopt)),
sys.stdout.flush()

if pass_args:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put) #warmup
t0 = now()
for _ in iterations:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put, **kwargs)
else:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY) #warmup
t0 = now()
for _ in iterations:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, **kwargs)
mops = get_mops(t0, nopt)
print("MOPS: {}".format(mops*2*repeat), args.text)
nopt *= step
repeat -= step
if repeat < 1:
repeat = 1
##############################################

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's changed here? EOLs?

def run(name, alg, sizes=15, step=2, nopt=1024, nparr=True, dask=False, pass_args=False, verbose=True):
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--steps', required=False, default=sizes, help="Number of steps")
parser.add_argument('--step', required=False, default=step, help="Factor for each step")
parser.add_argument('--chunk', required=False, default=2000000,help="Chunk size for Dask")
parser.add_argument('--size', required=False, default=nopt, help="Initial data size")
parser.add_argument('--repeat',required=False, default=100, help="Iterations inside measured region")
parser.add_argument('--dask', required=False, default="sq", help="Dask scheduler: sq, mt, mp")
parser.add_argument('--text', required=False, default="", help="Print with each result")

args = parser.parse_args()
sizes= int(args.steps)
step = int(args.step)
nopt = int(args.size)
chunk= int(args.chunk)
repeat=int(args.repeat)
kwargs={}

if(dask):
import dask
import dask.multiprocessing
import dask.array as da
dask_modes = {
"sq": dask.async.get_sync,
"mt": dask.threaded.get,
"mp": dask.multiprocessing.get
}
kwargs = {"schd": dask_modes[args.dask]}
name += "-"+args.dask

for i in xrange(sizes):
price, strike, t = gen_data(nopt)
if not nparr:
call = [0.0 for i in range(nopt)]
put = [-1.0 for i in range(nopt)]
price=list(price)
strike=list(strike)
t=list(t)
repeat=1 # !!!!! ignore repeat count
if dask:
assert(not pass_args)
price = da.from_array(price, chunks=(chunk,), name=False)
strike = da.from_array(strike, chunks=(chunk,), name=False)
t = da.from_array(t, chunks=(chunk,), name=False)
if pass_args:
call = np.zeros(nopt, dtype=np.float64)
put = -np.ones(nopt, dtype=np.float64)
iterations = xrange(repeat)
if verbose:
print("ERF: {}: Size: {}".format(name, nopt)),
sys.stdout.flush()

if pass_args:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put) #warmup
t0 = now()
for _ in iterations:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, call, put, **kwargs)
else:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY) #warmup
t0 = now()
for _ in iterations:
alg(nopt, price, strike, t, RISK_FREE, VOLATILITY, **kwargs)
mops = get_mops(t0, nopt)
if verbose:
print("MOPS: {}".format(mops*2*repeat), args.text)
nopt *= step
repeat -= step
if repeat < 1:
repeat = 1
16 changes: 16 additions & 0 deletions bs_apply.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import numpy as np

class bs_runner(object):
def __init__(self, bs_impl, pool, nump):
self.bs_impl = bs_impl
self.pool = pool
self.nump = nump

def __call__(self, nopt, price, strike, t, rate, vol):
noptpp = int(nopt/self.nump)
call = np.empty(nopt, dtype=np.float64)
put = np.empty(nopt, dtype=np.float64)
asyncs = [self.pool.apply_async(self.bs_impl, (noptpp, price[i:i+noptpp], strike[i:i+noptpp], t[i:i+noptpp], rate, vol)) for i in range(0, nopt, noptpp)]
for a,i in zip(asyncs, range(len(asyncs))):
call[i:i+noptpp], put[i:i+noptpp] = a.get()
return call, put
25 changes: 25 additions & 0 deletions bs_apply_ipyparallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import ipyparallel
from ipyparallel import Client


class bs(object):
def __init__(self, bs_impl):
self.bs_impl = bs_impl

def __call__(self, nopt, rate, vol):
return self.bs_impl(nopt, price, strike, t, rate, vol)


class bs_runner(object):
def __init__(self, bs_impl):
self.bs_impl = bs_impl
self.client = Client()
self.dview = self.client[:]

def __call__(self, nopt, price, strike, t, rate, vol):
self.dview.scatter('price', price)
self.dview.scatter('strike', strike)
self.dview.scatter('t', t)
self.dview.push(dict(call=0,put=-1))
r = self.dview.apply(bs(self.bs_impl), t, rate, vol)
return self.dview.gather('call').get(), self.dview.gather('put').get()
51 changes: 51 additions & 0 deletions bs_erf_distarray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import distarray.globalapi as da
from distarray.globalapi import log, sqrt, exp
import numpy as np
invsqrt = lambda x: 1.0/sqrt(x)
from base_bs_erf import run, erf


def black_scholes(ctxt, nopt, price, strike, t, rate, vol):
mr = -rate
sig_sig_two = vol * vol * 2

P = price
S = strike
T = t

a = log(P / S)
b = T * mr

z = T * sig_sig_two
c = 0.25 * z
y = invsqrt(z)

w1 = (a - b + c) * y
w2 = (a - b - c) * y

d1 = 0.5 + 0.5 * ctxt.fromarray(erf(np.asarray(w1)))
d2 = 0.5 + 0.5 * ctxt.fromarray(erf(np.asarray(w2)))

Se = exp(b) * S

call = P * d1 - Se * d2
put = call - P + Se

return call, put


class bs_runner(object):
def __init__(self, ctxt):
self.ctxt = ctxt

def __call__(self, nopt, price, strike, t, rate, vol):
dprice = self.ctxt.fromarray(price)
dstrike = self.ctxt.fromarray(strike)
dt = self.ctxt.fromarray(t)
ret = black_scholes(self.ctxt, nopt, dprice, dstrike, dt, rate, vol)
return ret[0].toarray(), ret[1].toarray()


if __name__ == '__main__':
bsr = bs_runner(da.Context())
run(__file__, bsr, pass_args=False)
33 changes: 0 additions & 33 deletions bs_erf_naive.py

This file was deleted.

7 changes: 7 additions & 0 deletions bs_erf_naive_apply_ipyparallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from bs_apply_ipyparallel import bs_runner
from bs_naive import black_scholes
import base_bs_erf

if __name__ == '__main__':
bsr = bs_runner(black_scholes)
base_bs_erf.run(__file__, bsr, pass_args=False)
10 changes: 10 additions & 0 deletions bs_erf_naive_apply_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from multiprocessing import Pool
from multiprocessing import cpu_count
from bs_apply import bs_runner
from bs_naive import black_scholes
import base_bs_erf

if __name__ == '__main__':
n = int(cpu_count()/2)
bsr = bs_runner(black_scholes, Pool(n), n)
base_bs_erf.run(__file__, bsr, pass_args=False)
9 changes: 9 additions & 0 deletions bs_erf_naive_apply_threadpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
from bs_apply import bs_runner
from bs_naive import black_scholes
import base_bs_erf

if __name__ == '__main__':
bsr = bs_runner(black_scholes, ThreadPool(cpu_count()), cpu_count())
base_bs_erf.run(__file__, bsr, pass_args=False)
10 changes: 10 additions & 0 deletions bs_erf_naive_map_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from multiprocessing import Pool
from multiprocessing import cpu_count
from bs_map import bs_runner
from bs_naive import black_scholes_map
import base_bs_erf

if __name__ == '__main__':
n = int(cpu_count()/2)
bsr = bs_runner(black_scholes_map, Pool(n))
base_bs_erf.run(__file__, bsr, pass_args=False)
11 changes: 11 additions & 0 deletions bs_erf_naive_map_processpoolexecutor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
from bs_executor import bs_runner
from bs_naive import black_scholes_map
import base_bs_erf

if __name__ == '__main__':
n = int(cpu_count()/2)
with ProcessPoolExecutor(n) as executor:
bsr = bs_runner(black_scholes_map, executor, n)
base_bs_erf.run(__file__, bsr, pass_args=False)
9 changes: 9 additions & 0 deletions bs_erf_naive_map_threadpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
from bs_map import bs_runner
from bs_naive import black_scholes_map
import base_bs_erf

if __name__ == '__main__':
bsr = bs_runner(black_scholes_map, ThreadPool(cpu_count()))
base_bs_erf.run(__file__, bsr, pass_args=False)
11 changes: 11 additions & 0 deletions bs_erf_naive_map_threadpoolexecutor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
from bs_executor import bs_runner
from bs_naive import black_scholes_map
import base_bs_erf

if __name__ == '__main__':
n = cpu_count()
with ThreadPoolExecutor(n) as executor:
bsr = bs_runner(black_scholes_map, executor, n)
base_bs_erf.run(__file__, bsr, pass_args=False)
8 changes: 8 additions & 0 deletions bs_erf_naive_mpi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from mpi4py import MPI
from bs_mpi import bs_runner
from bs_naive import black_scholes
import base_bs_erf

if __name__ == '__main__':
bsr = bs_runner(black_scholes)
base_bs_erf.run(__file__, bsr, pass_args=True, verbose=MPI.COMM_WORLD.Get_rank()==0)
5 changes: 5 additions & 0 deletions bs_erf_naive_sequential.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import base_bs_erf
from bs_naive import black_scholes_args

if __name__ == '__main__':
base_bs_erf.run(__file__, black_scholes_args, nparr=False, pass_args=True)
8 changes: 8 additions & 0 deletions bs_erf_naive_threading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from multiprocessing import cpu_count
from bs_threading import bs_runner
from bs_naive import black_scholes_args
import base_bs_erf

if __name__ == '__main__':
bsr = bs_runner(black_scholes_args, cpu_count())
base_bs_erf.run(__file__, bsr, pass_args=True)
Loading