Skip to content

Commit

Permalink
Add dummysource / xgpu averaging & downselect
Browse files Browse the repository at this point in the history
Dummysource replaces the ethernet input for throughput
testing, and is enabled with the commandline switch --fakesource

Add xGPU averaging and subselection. The former has only been lightly
tested: it outputs the expected data when the pipeline is fed with
all ones.

With all threads active, the pipeline runs at ~40Gb/s on my
old Xeon machine, seemingly limited by the processing capacity of my RTX 2060 GPU.

NB: Some synchronization barriers are probably needed, certainly
in the block which copies data to the GPU. See
ledatelescope/bifrost#138
  • Loading branch information
JackH committed Jun 22, 2020
1 parent d1f52b5 commit db2d0ef
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 51 deletions.
2 changes: 1 addition & 1 deletion bifrost
194 changes: 154 additions & 40 deletions test-scripts/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,62 @@ def main(self):
#print status
del capture

class DummySource(object):
    """
    Stand-in for the capture block, used for throughput testing.

    Instead of receiving data, this block repeatedly writes a constant
    buffer of ones into the output ring -- one gulp per sequence -- and
    prints the achieved rate every REPORT_PERIOD gulps.
    """
    def __init__(self, log, oring, ntime_gulp=2500,
                 core=-1, nchans=192, npols=704):
        self.log = log
        self.oring = oring
        self.ntime_gulp = ntime_gulp
        self.core = core
        self.nchans = nchans
        self.npols = npols

        # One ProcLog per monitoring category, all prefixed by the class name
        block_name = type(self).__name__
        self.bind_proclog = ProcLog(block_name + "/bind")
        self.in_proclog = ProcLog(block_name + "/in")
        self.out_proclog = ProcLog(block_name + "/out")
        self.size_proclog = ProcLog(block_name + "/size")
        self.sequence_proclog = ProcLog(block_name + "/sequence0")
        self.perf_proclog = ProcLog(block_name + "/perf")

        self.out_proclog.update({'nring': 1, 'ring0': self.oring.name})
        self.size_proclog.update({'nseq_per_gulp': self.ntime_gulp})

        # One byte per (time, channel, pol) sample (complex8)
        self.gulp_size = self.ntime_gulp * nchans * npols * 1

        # Constant payload (all ones) written into every output span
        self.test_data = np.full(self.gulp_size, 1, dtype=np.uint8)

    def main(self):
        """
        Bind to the configured core, then write test gulps forever.

        Never returns: the write loop has no exit condition.
        """
        cpu_affinity.set_core(self.core)
        self.bind_proclog.update({'ncore': 1,
                                  'core0': cpu_affinity.get_core(),})

        time.sleep(0.1)
        self.oring.resize(self.gulp_size)

        header = {'nchans': self.nchans, 'npols': self.npols, 'seq0': 0}

        REPORT_PERIOD = 100
        bytes_per_report = REPORT_PERIOD * self.gulp_size

        with self.oring.begin_writing() as oring:
            gulps_written = 0  # doubles as the time_tag of the next sequence
            tick = time.time()
            while True:
                # Each gulp goes out as its own sequence with a fresh header
                with oring.begin_sequence(time_tag=gulps_written,
                                          header=json.dumps(header)) as oseq:
                    with oseq.reserve(self.gulp_size) as ospan:
                        ospan.data[...] = self.test_data
                gulps_written += 1
                header['seq0'] += self.ntime_gulp

                if gulps_written % REPORT_PERIOD != 0:
                    continue

                # Report throughput over the last REPORT_PERIOD gulps
                tock = time.time()
                dt = tock - tick
                print('Send %d bytes in %.2f seconds (%.2f Gb/s)' % (bytes_per_report, dt, (8*bytes_per_report / dt / 1e9)))
                tick = tock

class CopyOp(object):
"""
Copy data from one buffer to another.
Expand Down Expand Up @@ -137,7 +193,8 @@ def main(self):
with oring.begin_sequence(time_tag=iseq.time_tag, header=ohdr_str, nringlet=iseq.nringlet) as oseq:
for ispan in iseq.read(self.igulp_size):
with oseq.reserve(self.igulp_size) as ospan:
self.log.info("Copying output")
#self.log.debug("Copying output")
odata = ospan.data_view('ci4')
copy_array(ospan.data, ispan.data)

class Corr(object):
Expand Down Expand Up @@ -171,11 +228,13 @@ def __init__(self, log, iring, oring, ntime_gulp=2500,
self.ogulp_size = 47849472 * 8 # complex64

# initialize xGPU. Arrays passed as inputs don't really matter here
# but passing something prevents xGPU from trying to allocate
# host memory
ibuf = BFArray(self.igulp_size, dtype='ci4')
obuf = BFArray(self.ogulp_size // 8, dtype='ci32')
_bf.xgpuInitialize(ibuf.as_BFarray(), obuf.as_BFarray(), 0)
# but we need to pass something
ibuf = BFArray(0, dtype='i8', space='cuda')
obuf = BFArray(0, dtype='i64', space='cuda')
rv = _bf.xgpuInitialize(ibuf.as_BFarray(), obuf.as_BFarray(), 0)
if (rv != _bf.BF_STATUS_SUCCESS):
self.log.error("xgpuIntialize returned %d" % rv)
raise RuntimeError

def main(self):
cpu_affinity.set_core(self.core)
Expand All @@ -185,7 +244,7 @@ def main(self):
self.oring.resize(self.ogulp_size)
with self.oring.begin_writing() as oring:
for iseq in self.iring.read(guarantee=self.guarantee):
self.log.info("Correlating output")
self.log.debug("Correlating output")
ihdr = json.loads(iseq.header.tostring())
subacc_id = ihdr['seq0'] % self.acc_len
first = subacc_id == 0
Expand All @@ -197,11 +256,63 @@ def main(self):
if first:
oseq = oring.begin_sequence(time_tag=iseq.time_tag, header=ohdr_str, nringlet=iseq.nringlet)
ospan = WriteSpan(oseq.ring, self.ogulp_size, nonblocking=False)
_bf.xgpuCorrelate(ispan.data.as_BFarray(), ospan.data.as_BFarray(), int(last))
_bf.xgpuKernel(ispan.data.as_BFarray(), ospan.data.as_BFarray(), int(last))
if last:
ospan.close()
oseq.end()

class CorrSubSel(object):
    """
    Grab arbitrary entries from a GPU buffer and copy them to the CPU.

    For every input span, `_bf.xgpuSubSelect` is called with the input data,
    a GPU-side scratch buffer (`self.obuf_gpu`) and the index list
    `self.subsel`. One output sequence containing a single span is opened
    per input span.

    Parameters
    ----------
    log : logger used for debug / error messages.
    iring : input ring read by this block.
    oring : output ring written by this block.
    guarantee : passed through to `iring.read`.
    core : CPU core to bind to in `main`.
    nchans : first dimension of the GPU scratch buffer.
    """

    # Number of 64-bit entries in one input gulp.
    # NOTE(review): same value as MATLEN in xgpu_test.py -- presumably the
    # xGPU output matrix length; confirm.
    _MATLEN = 47849472

    def __init__(self, log, iring, oring,
                 guarantee=True, core=-1, nchans=192):
        self.log = log
        self.iring = iring
        self.oring = oring
        self.guarantee = guarantee
        self.core = core

        self.bind_proclog = ProcLog(type(self).__name__+"/bind")
        self.in_proclog = ProcLog(type(self).__name__+"/in")
        self.out_proclog = ProcLog(type(self).__name__+"/out")
        self.size_proclog = ProcLog(type(self).__name__+"/size")
        self.sequence_proclog = ProcLog(type(self).__name__+"/sequence0")
        self.perf_proclog = ProcLog(type(self).__name__+"/perf")

        self.in_proclog.update( {'nring':1, 'ring0':self.iring.name})
        self.out_proclog.update( {'nring':1, 'ring0':self.oring.name})
        self.igulp_size = self._MATLEN * 8 # complex64

        # Indices to select; currently simply the first 4656 entries
        self.subsel = BFArray(np.arange(4656, dtype=np.int32), dtype='i32', space='cuda')
        self.nvis_out = len(self.subsel)
        self.obuf_gpu = BFArray(shape=[nchans, self.nvis_out], dtype='i64', space='cuda')
        # NOTE(review): obuf_gpu holds nchans * nvis_out entries, but the
        # output gulp is sized for nvis_out entries only -- confirm intent.
        self.ogulp_size = self.nvis_out * 8

    def main(self):
        """
        Bind to the configured core and run the sub-selection loop.

        Raises
        ------
        RuntimeError
            If `_bf.xgpuSubSelect` returns anything other than
            `_bf.BF_STATUS_SUCCESS`.
        """
        cpu_affinity.set_core(self.core)
        self.bind_proclog.update({'ncore': 1,
                                  'core0': cpu_affinity.get_core(),})

        self.oring.resize(self.ogulp_size)
        with self.oring.begin_writing() as oring:
            for iseq in self.iring.read(guarantee=self.guarantee):
                ihdr = json.loads(iseq.header.tostring())
                ohdr = ihdr.copy()
                # Mash header in here if you want
                ohdr_str = json.dumps(ohdr)
                for ispan in iseq.read(self.igulp_size):
                    self.log.debug("GRabbing subselection")
                    idata = ispan.data_view('i64').reshape(self._MATLEN)
                    with oring.begin_sequence(time_tag=iseq.time_tag, header=ohdr_str, nringlet=iseq.nringlet) as oseq:
                        with oseq.reserve(self.ogulp_size) as ospan:
                            rv = _bf.xgpuSubSelect(idata.as_BFarray(), self.obuf_gpu.as_BFarray(), self.subsel.as_BFarray())
                            if rv != _bf.BF_STATUS_SUCCESS:
                                # Name the call that actually failed
                                self.log.error("xgpuSubSelect returned %d" % rv)
                                raise RuntimeError("xgpuSubSelect returned %d" % rv)
                            # TODO: copy self.obuf_gpu into ospan.data. Until
                            # then the output span is committed without
                            # being written.

class CorrAcc(object):
"""
Perform GPU side accumulation and then transfer to CPU
Expand Down Expand Up @@ -232,11 +343,8 @@ def __init__(self, log, iring, oring, ntime_gulp=2400,
self.ogulp_size = self.igulp_size
# integration buffer
self.accdata = BFArray(shape=(self.igulp_size // 4), dtype='i32', space='cuda')
self.zeros = BFArray(np.zeros(self.igulp_size // 4), dtype='i32', space='cuda')

self.bfbf = LinAlg()


def main(self):
cpu_affinity.set_core(self.core)
self.bind_proclog.update({'ncore': 1,
Expand All @@ -254,22 +362,23 @@ def main(self):
# Mash header in here if you want
ohdr_str = json.dumps(ohdr)
for ispan in iseq.read(self.igulp_size):
self.log.info("Accumulating correlation")
self.log.debug("Accumulating correlation")
idata = ispan.data_view('i32')
if first:
oseq = oring.begin_sequence(time_tag=iseq.time_tag, header=ohdr_str, nringlet=iseq.nringlet)
ospan = WriteSpan(oseq.ring, self.ogulp_size, nonblocking=False)
self.accdata = self.zeros
# do accumulation
idata = ispan.data_view('i32')
BFMap("a += b", data={'a': self.accdata, 'b': idata})
# TODO: surely there are more sensible ways to implement a vacc
BFMap("a = b", data={'a': self.accdata, 'b': idata})
else:
BFMap("a += b", data={'a': self.accdata, 'b': idata})
if last:
if oseq is None:
print("Skipping output because oseq isn't open")
else:
# copy to CPU
odata = ospan.data_view('i32')
print(self.accdata.shape, odata.shape)
odata = self.accdata
print(odata[0:10])
ospan.close()
oseq.end()
oseq = None
Expand Down Expand Up @@ -356,7 +465,7 @@ def main(self):

igulp_size = self.ntime_gulp*64*704*1 # complex8
for iseq in self.iring.read(guarantee=True):
self.log.info("Dumping output")
self.log.debug("Dumping output")
#for ispan in iseq.read(igulp_size):
# if ispan.size < igulp_size:
# print('ignoring final gulp')
Expand All @@ -371,6 +480,7 @@ def main(argv):
parser.add_argument('-d', '--dryrun', action='store_true', help='Test without acting')
parser.add_argument('-l', '--logfile', default=None, help='Specify log file')
parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity')
parser.add_argument('--fakesource', action='store_true', help='Use a dummy source for testing')
parser.add_argument('-q', '--quiet', action='count', default=0, help='Decrease verbosity')
args = parser.parse_args()

Expand Down Expand Up @@ -431,17 +541,11 @@ def handle_signal_terminate(signum, frame):
log.info("Hostname: %s", hostname)
log.info("Server index: %i", server_idx)

iaddr = Address('100.100.100.101', 10000)
isock = UDPSocket()
isock.bind(iaddr)
isock.timeout = 0.5

print("binding input to", iaddr)

capture_ring = Ring(name="capture", space='cuda_host')
gpu_input_ring = Ring(name="gpu-input", space='cuda')
corr_output_ring = Ring(name="corr-output", space='cuda')
corr_slow_output_ring = Ring(name="corr-output", space='cuda_host')
corr_slow_output_ring = Ring(name="corr-slow-output", space='cuda_host')
corr_fast_output_ring = Ring(name="corr-fast-output", space='cuda_host')

# TODO: Figure out what to do with this resize
GSIZE = 480#1200
Expand All @@ -452,32 +556,42 @@ def handle_signal_terminate(signum, frame):
nroach = 11
nfreqblocks = 2
roach0 = 0
ops.append(CaptureOp(log, fmt="snap2", sock=isock, ring=capture_ring,
nsrc=nroach*nfreqblocks, src0=0, max_payload_size=9000,
buffer_ntime=GSIZE, slot_ntime=SLOT_NTIME, core=cores.pop(0),
utc_start=datetime.datetime.now()))
if not args.fakesource:
print("binding input to", iaddr)
iaddr = Address('100.100.100.101', 10000)
isock = UDPSocket()
isock.bind(iaddr)
isock.timeout = 0.5
ops.append(CaptureOp(log, fmt="snap2", sock=isock, ring=capture_ring,
nsrc=nroach*nfreqblocks, src0=0, max_payload_size=9000,
buffer_ntime=GSIZE, slot_ntime=SLOT_NTIME, core=cores.pop(0),
utc_start=datetime.datetime.now()))
else:
print('Using dummy source...')
ops.append(DummySource(log, oring=capture_ring, ntime_gulp=GSIZE, core=cores.pop(0)))

# capture_ring -> triggered buffer
## capture_ring -> triggered buffer

ops.append(CopyOp(log, iring=capture_ring, oring=gpu_input_ring, ntime_gulp=GSIZE,
core=cores.pop(0), guarantee=True))

# gpu_input_ring -> beamformer
# beamformer -> UDP
## gpu_input_ring -> beamformer
## beamformer -> UDP

ops.append(Corr(log, iring=gpu_input_ring, oring=corr_output_ring, ntime_gulp=GSIZE,
core=cores.pop(0), guarantee=True))
core=cores.pop(0), guarantee=True, acc_len=2400))

#ops.append(CorrSubOutput(log, iring=corr_output_ring, oring=corr_fast_output_ring,
# core=cores.pop(0), guarantee=True))
ops.append(CorrSubSel(log, iring=corr_output_ring, oring=corr_fast_output_ring,
core=cores.pop(0), guarantee=True))

ops.append(CorrAcc(log, iring=corr_output_ring, oring=corr_slow_output_ring,
core=cores.pop(0), guarantee=True, acc_len=24000))

# corr_slow_output -> UDP
# corr_fast_output -> UDP
#
## corr_slow_output -> UDP
## corr_fast_output -> UDP

final_ring = corr_slow_output_ring
final_ring = corr_fast_output_ring
#final_ring = corr_output_ring

ops.append(DummyOp(log=log, iring=final_ring,
core=cores.pop(0), ntime_gulp=GSIZE))
Expand Down
28 changes: 19 additions & 9 deletions test-scripts/xgpu_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import numpy as np

NSTATION=352; NFREQUENCY=192; NTIME=512;
NSTATION=352; NFREQUENCY=192; NTIME=480;
DOSIM=False
#NSTATION=32; NFREQUENCY=32; NTIME=32;
MATLEN = 47849472
Expand Down Expand Up @@ -64,41 +64,51 @@ def regtile_index(in0, in1):
print("Not populating test vectors")

print('allocating input')
ibuf = bf.ndarray(invec, dtype='ci4', space=SPACE)
ibuf = bf.ndarray(invec, dtype='i8', space=SPACE)
print('allocating output')
obuf = bf.ndarray(np.zeros([MATLEN], dtype=np.int32), dtype='ci32', space=SPACE)
#obuf = bf.ndarray(np.zeros([NFREQUENCY, MATLEN//NFREQUENCY], dtype=np.int32), dtype='ci32', space=SPACE)
#time.sleep(20)
print(obuf[0:10])

if SPACE == 'cuda':
print('running kernel as_GPUarray')
_bf.xgpuInitialize(ibuf.as_BFarray(), obuf.as_BFarray(), 0)
print('initialized')
#time.sleep(20)
for i in range(4):
_bf.xgpuKernel(ibuf.as_BFarray(), obuf.as_BFarray(), 0)
print(i)
print(_bf.xgpuKernel(ibuf.as_BFarray(), obuf.as_BFarray(), 0))
#time.sleep(20)
_bf.xgpuKernel(ibuf.as_BFarray(), obuf.as_BFarray(), 1)
print(obuf[0:10])
else:
print('running kernel as_BFarray')
_bf.xgpuInitialize(ibuf.as_BFarray(), obuf.as_BFarray(), 0)
for i in range(4):
_bf.xgpuCorrelate(ibuf.as_BFarray(), obuf.as_BFarray(), 0)
_bf.xgpuCorrelate(ibuf.as_BFarray(), obuf.as_BFarray(), 1)
print(obuf[0:10])

obuf_cpu = obuf.copy(space='system')
print('copied')
#view as real/imag x chan x station
o = obuf_cpu.view(dtype=np.int32).reshape(2, NFREQUENCY, MATLEN//NFREQUENCY)
oc = o[0,0,:] + 1j*o[1,0,:]
print('is all zero?', np.all(o==0))
print(o[o!=0])

acc_len = 5 * NTIME
ibuf_c = ibuf.view(dtype='ci4')
for s0 in range(5):
for s1 in range(s0, 5):
ar = ibuf[0,0,s0,0].real[0]
ai = ibuf[0,0,s0,0].imag[0]
#ar = ibuf[0,0,s0,0].real[0]
#ai = ibuf[0,0,s0,0].imag[0]
ar = ibuf[0,0,s0,0] >> 4
ai = ibuf[0,0,s0,0] & 0b1111
a = ar + 1j*ai
br = ibuf[0,0,s1,0].real[0]
bi = ibuf[0,0,s1,0].imag[0]
#br = ibuf[0,0,s1,0].real[0]
#bi = ibuf[0,0,s1,0].imag[0]
br = ibuf[0,0,s1,0] >> 4
bi = ibuf[0,0,s1,0] & 0b1111
b = br + 1j*bi
v = a * np.conj(b)
v *= acc_len
Expand Down
2 changes: 1 addition & 1 deletion xgpu
Submodule xgpu updated 2 files
+68 −17 src/cuda_xengine.cu
+12 −1 src/xgpu.h

0 comments on commit db2d0ef

Please sign in to comment.